CRM & ERP Integration: Connecting Salesforce, SAP, and DATEV

Carola Schulte
June 9, 2025
28 min read

CRM & ERP Integration: How to Connect Your Business Systems

87% of all companies use at least 3 different business systems. CRM here, ERP there, accounting somewhere else. And in between? Manual data transfer, Excel exports, copy-paste marathons. That costs time, causes errors, and frustrates everyone involved.

The solution: automated integration. But how do you connect Salesforce to SAP? How do you get customer data into DATEV? And what do you do when the legacy system only speaks SOAP?

In this article I show you the integration patterns that work in practice, with real PHP code, proven error handling, and the pitfalls that no documentation mentions.


The Three Integration Approaches

Overview:

REST API - modern standard (Salesforce, HubSpot, Shopify)

SOAP/XML - enterprise legacy (SAP, DATEV, Microsoft Dynamics)

Batch import - file-based (CSV, XML, EDIFACT)

When to Use Which Approach?

Criterion        | REST API               | SOAP             | Batch Import
Real-time        | ✅ Yes                 | ✅ Yes           | ❌ No
Complexity       | Low                    | High             | Medium
Legacy support   | Rarely                 | ✅ Very good     | ✅ Always possible
Debugging        | Easy                   | Difficult        | Easy
Large data sets  | ⚠️ Pagination required | ⚠️ Timeout risk  | ✅ Optimal

Rule of thumb: REST for real-time, SOAP for enterprise legacy, batch for bulk - but in practice you often need all three.


REST API Integration

Basic Pattern: API Client Class

<?php

declare(strict_types=1);

use Psr\Log\LoggerInterface; // PSR-3 logger interface (psr/log)

abstract class ApiClient
{
    protected string $baseUrl;
    protected array $defaultHeaders = [];
    protected int $timeout = 30;
    protected int $maxRetries = 3;

    public function __construct(
        protected readonly string $apiKey,
        protected readonly LoggerInterface $logger
    ) {}

    protected function request(
        string $method,
        string $endpoint,
        array $data = [],
        array $headers = []
    ): array {
        $url = $this->baseUrl . $endpoint;
        $allHeaders = array_merge($this->defaultHeaders, $headers);

        $attempt = 0;
        $lastException = null;

        while ($attempt < $this->maxRetries) {
            $attempt++;

            try {
                $response = $this->executeRequest($method, $url, $data, $allHeaders);

                // Evaluate rate-limit headers
                $this->handleRateLimitHeaders($response['headers']);

                return $response['body'];

            } catch (RateLimitException $e) {
                $lastException = $e;
                // 429: wait as instructed, but do not sleep after the final attempt
                if ($attempt < $this->maxRetries) {
                    $waitSeconds = $e->getRetryAfter() ?? (2 ** $attempt);
                    $this->logger->warning("Rate limit hit, waiting {$waitSeconds}s", [
                        'endpoint' => $endpoint,
                        'attempt' => $attempt
                    ]);
                    sleep($waitSeconds);
                }

            } catch (ServerException $e) {
                // 5xx: exponential backoff + full jitter (prevents a stampede)
                if ($attempt < $this->maxRetries) {
                    $base = 2 ** $attempt;
                    $waitSeconds = random_int(0, $base); // full jitter
                    $this->logger->warning("Server error, retry in {$waitSeconds}s (jitter)", [
                        'endpoint' => $endpoint,
                        'status' => $e->getCode()
                    ]);
                    sleep($waitSeconds);
                }
                $lastException = $e;

            } catch (ClientException $e) {
                // 4xx: do not retry (except 429)
                throw $e;
            }
        }

        throw new ApiException(
            "Max retries exceeded for {$endpoint}",
            0,
            $lastException
        );
    }

    private function executeRequest(
        string $method,
        string $url,
        array $data,
        array $headers
    ): array {
        $ch = curl_init();

        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TIMEOUT => $this->timeout,
            CURLOPT_CONNECTTIMEOUT => 10,
            CURLOPT_HTTPHEADER => $this->formatHeaders($headers),
            CURLOPT_HEADER => true,
            // TLS security: ALWAYS verify certificates!
            CURLOPT_SSL_VERIFYPEER => true,
            CURLOPT_SSL_VERIFYHOST => 2,
        ]);

        if ($method === 'POST') {
            curl_setopt($ch, CURLOPT_POST, true);
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        } elseif ($method === 'PUT' || $method === 'PATCH') {
            curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
            curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
        } elseif ($method === 'DELETE') {
            curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'DELETE');
        } elseif ($method === 'GET' && !empty($data)) {
            curl_setopt($ch, CURLOPT_URL, $url . '?' . http_build_query($data));
        }

        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        $headerSize = curl_getinfo($ch, CURLINFO_HEADER_SIZE);

        if ($response === false) {
            $error = curl_error($ch);
            curl_close($ch);
            throw new NetworkException("cURL error: {$error}");
        }

        curl_close($ch);

        $headerStr = substr($response, 0, $headerSize);
        $bodyStr = substr($response, $headerSize);
        $parsedHeaders = $this->parseHeaders($headerStr);

        // Pass the headers to checkStatusCode so it can read Retry-After
        $this->checkStatusCode($httpCode, $bodyStr, $parsedHeaders);

        return [
            'headers' => $parsedHeaders,
            'body' => json_decode($bodyStr, true) ?? []
        ];
    }

    private function checkStatusCode(int $code, string $body, array $headers = []): void
    {
        if ($code >= 200 && $code < 300) {
            return;
        }

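        if ($code === 429) {
            // Retry-After is either a number of seconds or an HTTP-date;
            // normalize both to whole seconds so sleep() always gets an int
            $raw = $headers['retry-after'] ?? null;
            $retryAfter = null;
            if ($raw !== null && is_numeric($raw)) {
                $retryAfter = max(0, (int)$raw);
            } elseif ($raw !== null && ($ts = strtotime($raw)) !== false) {
                $retryAfter = max(0, $ts - time());
            }
            throw new RateLimitException("Rate limit exceeded", $code, $retryAfter);
        }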

        if ($code >= 400 && $code < 500) {
            throw new ClientException("Client error: {$body}", $code);
        }

        if ($code >= 500) {
            throw new ServerException("Server error: {$body}", $code);
        }
    }

    private function handleRateLimitHeaders(array $headers): void
    {
        $remaining = $headers['x-rate-limit-remaining'] ?? null;

        if ($remaining !== null && (int)$remaining < 10) {
            $this->logger->notice("Rate limit low: {$remaining} requests remaining");
        }
    }

    private function formatHeaders(array $headers): array
    {
        return array_map(
            fn($k, $v) => "{$k}: {$v}",
            array_keys($headers),
            array_values($headers)
        );
    }

    private function parseHeaders(string $headerStr): array
    {
        $headers = [];
        foreach (explode("\r\n", $headerStr) as $line) {
            if (str_contains($line, ':')) {
                [$key, $value] = explode(':', $line, 2);
                $headers[strtolower(trim($key))] = trim($value);
            }
        }
        return $headers;
    }
}
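
The client above throws several exception types that the listing does not define. A minimal sketch of what they could look like follows. The hierarchy and the constructor of RateLimitException are my assumptions, derived only from how request() and checkStatusCode() use them. The sketch also assumes the retry delay has already been normalized to whole seconds.

```php
<?php

declare(strict_types=1);

// Hypothetical exception hierarchy as assumed by ApiClient::request()
class ApiException extends RuntimeException {}
class NetworkException extends ApiException {}  // transport-level failure (cURL)
class ClientException extends ApiException {}   // 4xx, not retried
class ServerException extends ApiException {}   // 5xx, retried with backoff

class RateLimitException extends ApiException
{
    public function __construct(
        string $message,
        int $code = 429,
        private readonly ?int $retryAfter = null,
        ?Throwable $previous = null
    ) {
        parent::__construct($message, $code, $previous);
    }

    // Seconds to wait as announced by the server, or null if it sent none
    public function getRetryAfter(): ?int
    {
        return $this->retryAfter;
    }
}
```

RateLimitException deliberately does not extend ClientException here, so the order of the catch blocks in request() does not matter.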

Securing Incoming Webhooks

When Salesforce, HubSpot, or other systems send webhooks to your app, you must secure them:

<?php

class WebhookVerifier
{
    public function __construct(
        private readonly string $secret,
        private readonly \Redis $redis, // phpredis connection used for deduplication
        private readonly int $replayWindowSeconds = 300 // ±5 minutes
    ) {}

    public function verify(
        string $payload,
        string $signature,
        string $timestamp,
        string $eventId
    ): void {
        // 1. Replay protection: check the timestamp (Unix seconds)
        $eventTime = (int)$timestamp;
        $now = time();

        if (abs($now - $eventTime) > $this->replayWindowSeconds) {
            throw new WebhookReplayException(
                "Timestamp outside the allowed window"
            );
        }

        // 2. Verify the HMAC signature (hex HMAC-SHA256 over "timestamp.payload")
        $expectedSignature = hash_hmac('sha256', $timestamp . '.' . $payload, $this->secret);

        if (!hash_equals($expectedSignature, $signature)) {
            throw new WebhookSignatureException("Invalid signature");
        }

        // 3. Detect duplicates via the event ID
        if ($this->isDuplicate($eventId)) {
            throw new WebhookDuplicateException("Event already processed");
        }

        $this->markAsProcessed($eventId, $eventTime);
    }

    private function isDuplicate(string $eventId): bool
    {
        // Redis or DB: store the event_id with a TTL
        return (bool)$this->redis->exists("webhook:processed:{$eventId}");
    }

    private function markAsProcessed(string $eventId, int $timestamp): void
    {
        // TTL = replay window + buffer
        $this->redis->setex(
            "webhook:processed:{$eventId}",
            $this->replayWindowSeconds + 60,
            (string)$timestamp
        );
    }
}

// Validate an incoming webhook. The header names are placeholders: every
// provider defines its own headers and signed string. HubSpot's v3 signature,
// for example, is Base64-encoded and uses a millisecond timestamp, so
// verify() has to be adapted before it can check HubSpot requests.
$payload = file_get_contents('php://input');
$data = json_decode($payload, true) ?? [];

$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$verifier = new WebhookVerifier($_ENV['WEBHOOK_SECRET'], $redis);
$verifier->verify(
    payload: $payload,
    signature: $_SERVER['HTTP_X_WEBHOOK_SIGNATURE'] ?? '',
    timestamp: $_SERVER['HTTP_X_WEBHOOK_TIMESTAMP'] ?? '',
    eventId: (string)($data['eventId'] ?? '')
);

Webhook security:

  • ALWAYS verify the HMAC signature (not optional!)
  • A replay window of ±5 minutes rejects stale requests
  • Deduplicating by event ID prevents double processing
  • On a signature failure: respond with HTTP 401, not 200!
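
Signature schemes differ between providers, so the signed string and the encoding have to match the sender exactly. As an illustration, here is a sketch of a check for HubSpot's v3 signature as I understand it: HMAC-SHA256 over method + full URI + raw body + timestamp, Base64-encoded, with the timestamp in milliseconds. The function name and the $nowMs parameter are my own. Please verify the details, including HubSpot's URI normalization rules, against the current HubSpot documentation before relying on it.

```php
<?php

declare(strict_types=1);

/**
 * Sketch of a HubSpot v3 signature check. Assumption: the signed string is
 * method + full URI + raw body + timestamp (ms), HMAC-SHA256, Base64-encoded.
 */
function verifyHubSpotV3Signature(
    string $method,
    string $uri,
    string $body,
    string $timestampMs,
    string $signature,
    string $secret,
    ?int $nowMs = null // injectable clock, mainly for tests
): bool {
    $nowMs ??= (int) floor(microtime(true) * 1000);

    // Reject anything outside a 5-minute window (timestamp is in milliseconds)
    if (!ctype_digit($timestampMs) || abs($nowMs - (int) $timestampMs) > 300_000) {
        return false;
    }

    $expected = base64_encode(
        hash_hmac('sha256', $method . $uri . $body . $timestampMs, $secret, true)
    );

    // Constant-time comparison
    return hash_equals($expected, $signature);
}
```

The function returns a boolean instead of throwing, so the caller decides how to respond; a failed check should still end in HTTP 401.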

Salesforce Integration

Salesforce uses OAuth 2.0. For production, prefer the JWT Bearer Flow (Connected App + certificate) over the Username-Password Flow: fewer secrets and no password-rotation problem. The example below uses the Username-Password Flow because it is the shortest to show:

<?php

class SalesforceClient extends ApiClient
{
    private string $instanceUrl;
    private ?string $accessToken = null;
    private ?int $tokenExpiresAt = null;

    public function __construct(
        private readonly string $clientId,
        private readonly string $clientSecret,
        private readonly string $username,
        private readonly string $password,
        private readonly string $securityToken,
        LoggerInterface $logger,
        private readonly string $loginUrl = 'https://login.salesforce.com'
    ) {
        parent::__construct('', $logger);
    }

    private function authenticate(): void
    {
        if ($this->accessToken && $this->tokenExpiresAt > time()) {
            return;
        }

        $ch = curl_init($this->loginUrl . '/services/oauth2/token');

        curl_setopt_array($ch, [
            CURLOPT_POST => true,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POSTFIELDS => http_build_query([
                'grant_type' => 'password',
                'client_id' => $this->clientId,
                'client_secret' => $this->clientSecret,
                'username' => $this->username,
                'password' => $this->password . $this->securityToken,
            ]),
        ]);

        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if ($httpCode !== 200) {
            throw new AuthenticationException("Salesforce auth failed: {$response}");
        }

        $data = json_decode($response, true);

        $this->accessToken = $data['access_token'];
        $this->instanceUrl = $data['instance_url'];
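        // Assumption: 2 hours. The token response carries no expiry;
        // the actual session timeout is configured per org.
        $this->tokenExpiresAt = time() + 7200;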

        $this->baseUrl = $this->instanceUrl . '/services/data/v59.0';
        $this->defaultHeaders = [
            'Authorization' => 'Bearer ' . $this->accessToken,
            'Content-Type' => 'application/json',
        ];
    }

    public function query(string $soql): array
    {
        $this->authenticate();

        $results = [];
        $endpoint = '/query?q=' . urlencode($soql);

        do {
            $response = $this->request('GET', $endpoint);
            $results = array_merge($results, $response['records'] ?? []);

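            // Pagination: follow nextRecordsUrl
            $endpoint = $response['nextRecordsUrl'] ?? null;
            if ($endpoint) {
                // nextRecordsUrl is a path that already starts with /services/data/v59.0;
                // strip that prefix because request() prepends $this->baseUrl
                $apiPath = (string)parse_url($this->baseUrl, PHP_URL_PATH);
                if ($apiPath !== '' && str_starts_with($endpoint, $apiPath)) {
                    $endpoint = substr($endpoint, strlen($apiPath));
                }
            }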

        } while ($endpoint !== null);

        return $results;
    }

    public function createRecord(string $object, array $data): string
    {
        $this->authenticate();

        $response = $this->request('POST', "/sobjects/{$object}", $data);

        if (!($response['success'] ?? false)) {
            throw new ApiException(
                "Failed to create {$object}: " . json_encode($response['errors'] ?? [])
            );
        }

        return $response['id'];
    }

    public function updateRecord(string $object, string $id, array $data): void
    {
        $this->authenticate();

        $this->request('PATCH', "/sobjects/{$object}/{$id}", $data);
    }

    public function upsertByExternalId(
        string $object,
        string $externalIdField,
        string $externalIdValue,
        array $data
    ): array {
        $this->authenticate();

        // External IDs may contain characters that are not URL-safe
        $endpoint = "/sobjects/{$object}/{$externalIdField}/" . rawurlencode($externalIdValue);

        return $this->request('PATCH', $endpoint, $data);
    }

    // Bulk API 2.0 for large data volumes
    public function bulkUpsert(string $object, string $externalId, array $records): string
    {
        $this->authenticate();

        // Create the job
        $job = $this->request('POST', '/jobs/ingest', [
            'object' => $object,
            'externalIdFieldName' => $externalId,
            'operation' => 'upsert',
            'contentType' => 'CSV',
        ]);

        $jobId = $job['id'];

        // Upload the CSV data
        $csv = $this->arrayToCsv($records);

        $ch = curl_init($this->instanceUrl . "/services/data/v59.0/jobs/ingest/{$jobId}/batches");
        curl_setopt_array($ch, [
            // CUSTOMREQUEST instead of CURLOPT_PUT (which expects a file stream)
            CURLOPT_CUSTOMREQUEST => 'PUT',
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_HTTPHEADER => [
                'Authorization: Bearer ' . $this->accessToken,
                'Content-Type: text/csv',
            ],
            CURLOPT_POSTFIELDS => $csv,
            CURLOPT_SSL_VERIFYPEER => true,
            CURLOPT_SSL_VERIFYHOST => 2,
        ]);

        curl_exec($ch);
        $uploadStatus = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        // Do not close the job if the upload was rejected
        if ($uploadStatus < 200 || $uploadStatus >= 300) {
            throw new ApiException("Bulk upload failed for job {$jobId} (HTTP {$uploadStatus})");
        }

        // Close the job
        $this->request('PATCH', "/jobs/ingest/{$jobId}", [
            'state' => 'UploadComplete'
        ]);

        return $jobId;
    }

    private function arrayToCsv(array $records): string
    {
        if (empty($records)) {
            return '';
        }

        $output = fopen('php://temp', 'r+');
        fputcsv($output, array_keys($records[0]));

        foreach ($records as $record) {
            fputcsv($output, array_values($record));
        }

        rewind($output);
        $csv = stream_get_contents($output);
        fclose($output);

        return $csv;
    }
}

Mind the Salesforce limits:

  • API calls: 15,000 to 5,000,000 per 24h, depending on edition
  • Bulk API: max. 10,000 records per batch (Bulk API 1.0); Bulk API 2.0, as used above, limits the upload size per job instead
  • Query: max. 2,000 records per response (use pagination!)
  • Concurrent requests: max. 25 long-running requests at a time
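
One more point about query(): it passes the SOQL string through unchanged, so any value you interpolate into a query has to be escaped first. The helper below is a small sketch of that; the name soqlQuote is my own. It covers string literals only (backslash and single quote) and does not handle the extra wildcard escaping that LIKE patterns need.

```php
<?php

declare(strict_types=1);

// Quote a value as a SOQL string literal.
// Backslashes are escaped first, then single quotes.
function soqlQuote(string $value): string
{
    return "'" . str_replace(['\\', "'"], ['\\\\', "\\'"], $value) . "'";
}

// Usage: build a query from untrusted input
$name = "O'Brien GmbH";
$soql = 'SELECT Id, Name FROM Account WHERE Name = ' . soqlQuote($name);
```

The resulting string can be handed to SalesforceClient::query() as it is.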

HubSpot Integration

<?php

class HubSpotClient extends ApiClient
{
    protected string $baseUrl = 'https://api.hubapi.com';

    public function __construct(
        string $apiKey,
        LoggerInterface $logger
    ) {
        parent::__construct($apiKey, $logger);

        $this->defaultHeaders = [
            'Authorization' => 'Bearer ' . $apiKey,
            'Content-Type' => 'application/json',
        ];
    }

    public function searchContacts(array $filters, array $properties = []): array
    {
        $allResults = [];
        $after = null;

        do {
            $body = [
                'filterGroups' => [['filters' => $filters]],
                'properties' => $properties,
                'limit' => 100,
            ];

            if ($after) {
                $body['after'] = $after;
            }

            $response = $this->request('POST', '/crm/v3/objects/contacts/search', $body);

            $allResults = array_merge($allResults, $response['results'] ?? []);
            $after = $response['paging']['next']['after'] ?? null;

        } while ($after !== null);

        return $allResults;
    }

    public function createOrUpdateContact(string $email, array $properties): array
    {
        // First check whether the contact already exists
        $existing = $this->searchContacts([
            ['propertyName' => 'email', 'operator' => 'EQ', 'value' => $email]
        ]);

        if (!empty($existing)) {
            $contactId = $existing[0]['id'];
            return $this->request(
                'PATCH',
                "/crm/v3/objects/contacts/{$contactId}",
                ['properties' => $properties]
            );
        }

        $properties['email'] = $email;
        return $this->request('POST', '/crm/v3/objects/contacts', [
            'properties' => $properties
        ]);
    }

    public function createDeal(array $properties, ?string $contactId = null): array
    {
        $deal = $this->request('POST', '/crm/v3/objects/deals', [
            'properties' => $properties
        ]);

        if ($contactId) {
            $this->request('PUT', "/crm/v3/objects/deals/{$deal['id']}/associations/contacts/{$contactId}/deal_to_contact", []);
        }

        return $deal;
    }

    // Batch endpoint: up to 100 objects per call (fewer API calls!)
    public function batchCreateContacts(array $contacts): array
    {
        $inputs = array_map(fn($c) => ['properties' => $c], $contacts);

        return $this->request('POST', '/crm/v3/objects/contacts/batch/create', [
            'inputs' => $inputs
        ]);
    }

    public function batchUpdateContacts(array $updates): array
    {
        // $updates = [['id' => '123', 'properties' => [...]], ...]
        return $this->request('POST', '/crm/v3/objects/contacts/batch/update', [
            'inputs' => $updates
        ]);
    }
}

HubSpot rate limits:

  • Standard: 100 requests / 10 seconds (private apps)
  • Secondary (burst): short-term spikes are limited additionally
  • Use batch endpoints: /batch/create, /batch/update - up to 100 records per call, which saves API calls!
  • On 429: honor the Retry-After header

Rule of thumb: With REST APIs, always implement pagination - most APIs return only 100-1000 records per request.
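
Both clients above collect every page into one array before returning. For large result sets, a generator that yields record by record keeps memory usage flat. The following is a sketch under my own assumptions: paginate() and the page shape (keys 'results' and 'next') are hypothetical and would need to be mapped to the API at hand, for example to HubSpot's paging.next.after cursor.

```php
<?php

declare(strict_types=1);

/**
 * Yield records one by one from a cursor-paginated endpoint.
 * $fetchPage is a hypothetical callable: it receives the cursor (null for the
 * first page) and returns ['results' => [...], 'next' => ?string].
 */
function paginate(callable $fetchPage): Generator
{
    $cursor = null;

    do {
        $page = $fetchPage($cursor);

        foreach ($page['results'] ?? [] as $record) {
            yield $record;
        }

        $cursor = $page['next'] ?? null;
    } while ($cursor !== null);
}

// Usage with canned pages standing in for real API responses
$pages = [
    ''   => ['results' => [1, 2], 'next' => 'p2'],
    'p2' => ['results' => [3], 'next' => null],
];

foreach (paginate(fn(?string $c) => $pages[$c ?? '']) as $record) {
    echo $record, PHP_EOL;
}
```

The caller can stop iterating at any time, and no further pages are requested after that.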


SOAP Integration

The SOAP Problem

SOAP is complex, XML-based, and error-prone. But SAP, DATEV, and many banks and public authorities speak only SOAP. You cannot avoid it.

<?php

class SoapClientWrapper
{
    private SoapClient $client;
    private LoggerInterface $logger;

    public function __construct(
        string $wsdlUrl,
        LoggerInterface $logger,
        array $options = []
    ) {
        $this->logger = $logger;

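        // Read timeout: SoapClient has no constructor option for it; it follows
        // the default_socket_timeout ini setting
        ini_set('default_socket_timeout', '60');

        $defaultOptions = [
            'trace' => true,
            'exceptions' => true,
            'cache_wsdl' => WSDL_CACHE_BOTH,
            'connection_timeout' => 30, // connect timeout in seconds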
            'stream_context' => stream_context_create([
                'ssl' => [
                    'verify_peer' => true,
                    'verify_peer_name' => true,
                ],
                'http' => [
                    'timeout' => 60,
                ],
            ]),
        ];

        try {
            $this->client = new SoapClient(
                $wsdlUrl,
                array_merge($defaultOptions, $options)
            );
        } catch (SoapFault $e) {
            throw new SoapConnectionException(
                "Could not load WSDL: {$e->getMessage()}"
            );
        }
    }

    public function call(string $method, array $params = []): mixed
    {
        $startTime = microtime(true);

        try {
            $result = $this->client->__soapCall($method, [$params]);

            $this->logger->info("SOAP call successful", [
                'method' => $method,
                'duration_ms' => round((microtime(true) - $startTime) * 1000),
            ]);

            return $result;

        } catch (SoapFault $e) {
            $this->logger->error("SOAP call failed", [
                'method' => $method,
                'fault_code' => $e->faultcode,
                'fault_string' => $e->faultstring,
                'request' => $this->client->__getLastRequest(),
                'response' => $this->client->__getLastResponse(),
            ]);

            throw $this->mapSoapException($e);
        }
    }

    private function mapSoapException(SoapFault $e): Exception
    {
        // Map typical SOAP faults to readable exceptions
        return match (true) {
            str_contains($e->faultstring, 'Authentication') =>
                new AuthenticationException("SOAP Auth failed: {$e->faultstring}"),
            str_contains($e->faultstring, 'Timeout') =>
                new TimeoutException("SOAP Timeout: {$e->faultstring}"),
            str_contains($e->faultcode, 'Client') =>
                new ClientException("SOAP Client Error: {$e->faultstring}"),
            default =>
                new SoapException("SOAP Error: {$e->faultstring}", 0, $e)
        };
    }

    public function getLastRequest(): ?string
    {
        return $this->client->__getLastRequest();
    }

    public function getLastResponse(): ?string
    {
        return $this->client->__getLastResponse();
    }
}

SAP Integration via RFC/BAPI

SAP offers several interfaces: RFC (Remote Function Call), BAPI (Business API), IDoc, OData. In PHP, SOAP is the most pragmatic route:

<?php

class SapClient
{
    private SoapClientWrapper $soap;

    public function __construct(
        string $wsdlUrl,
        string $username,
        string $password,
        LoggerInterface $logger
    ) {
        $this->soap = new SoapClientWrapper($wsdlUrl, $logger, [
            'login' => $username,
            'password' => $password,
            'authentication' => SOAP_AUTHENTICATION_BASIC,
        ]);
    }

    public function getCustomer(string $customerNumber): array
    {
        $result = $this->soap->call('BAPI_CUSTOMER_GETDETAIL2', [
            'CUSTOMERNO' => str_pad($customerNumber, 10, '0', STR_PAD_LEFT),
        ]);

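        // RETURN also carries success and info messages;
        // only types E (error) and A (abort) count as failures
        $errors = $this->extractErrors($result->RETURN ?? []);
        if (!empty($errors)) {
            throw new SapException("SAP Error: " . implode(', ', $errors));
        }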

        return $this->mapCustomerData($result);
    }

    public function createSalesOrder(array $orderData): string
    {
        $header = [
            'DOC_TYPE' => $orderData['type'] ?? 'TA',
            'SALES_ORG' => $orderData['sales_org'],
            'DISTR_CHAN' => $orderData['distribution_channel'],
            'DIVISION' => $orderData['division'],
            'PURCH_NO_C' => $orderData['customer_reference'] ?? '',
        ];

        $items = [];
        foreach ($orderData['items'] as $idx => $item) {
            $items[] = [
                'ITM_NUMBER' => str_pad((string)(($idx + 1) * 10), 6, '0', STR_PAD_LEFT),
                'MATERIAL' => $item['material_number'],
                'TARGET_QTY' => $item['quantity'],
                'TARGET_QU' => $item['unit'] ?? 'ST',
            ];
        }

        $partners = [
            [
                'PARTN_ROLE' => 'AG', // sold-to party (Auftraggeber)
                'PARTN_NUMB' => str_pad($orderData['customer_number'], 10, '0', STR_PAD_LEFT),
            ],
        ];

        $result = $this->soap->call('BAPI_SALESORDER_CREATEFROMDAT2', [
            'ORDER_HEADER_IN' => $header,
            'ORDER_ITEMS_IN' => $items,
            'ORDER_PARTNERS' => $partners,
        ]);

        // Check for errors BEFORE commit/rollback
        $errors = $this->extractErrors($result->RETURN);

        if (!empty($errors) || empty($result->SALESDOCUMENT)) {
            // On errors: ROLLBACK!
            $this->soap->call('BAPI_TRANSACTION_ROLLBACK', []);
            throw new SapException("Order creation failed: " . implode(', ', $errors));
        }

        // Only on success: COMMIT (WAIT = synchronous).
        // Caveat: this only takes effect if both calls share one SAP session (LUW);
        // with stateless SOAP calls a wrapper function module may be needed.
        $this->soap->call('BAPI_TRANSACTION_COMMIT', ['WAIT' => 'X']);

        return $result->SALESDOCUMENT;
    }

    private function extractErrors($return): array
    {
        $errors = [];
        $messages = is_array($return) ? $return : [$return];

        foreach ($messages as $msg) {
            if (in_array($msg->TYPE ?? '', ['E', 'A'])) {
                $errors[] = $msg->MESSAGE ?? 'Unknown error';
            }
        }

        return $errors;
    }

    private function mapCustomerData($result): array
    {
        return [
            'number' => ltrim($result->CUSTOMERNO, '0'),
            'name' => trim($result->CUSTOMERADDRESS->NAME ?? ''),
            'street' => trim($result->CUSTOMERADDRESS->STREET ?? ''),
            'city' => trim($result->CUSTOMERADDRESS->CITY ?? ''),
            'postal_code' => trim($result->CUSTOMERADDRESS->POSTL_CODE ?? ''),
            'country' => trim($result->CUSTOMERADDRESS->COUNTRY ?? ''),
            'email' => trim($result->CUSTOMERADDRESS->E_MAIL ?? ''),
        ];
    }
}

SAP pitfalls:

BAPI_TRANSACTION_COMMIT/ROLLBACK: COMMIT on success, ROLLBACK on error - otherwise you end up with inconsistent data!

Number formats: SAP expects leading zeros (customer number 123 = 0000000123)

Character encoding: SAP uses SAP-internal code pages (often CP1252 or ISO-8859-1) - convert explicitly, otherwise umlauts get mangled!

Time zone: mind the SAP server time - always send timestamps in UTC and convert!

Alternative: SAP OData (Gateway): If SOAP is too complex, SAP Gateway offers modern OData APIs (REST-like). Ask your SAP Basis admin which services are available under /sap/opu/odata/.
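
The number-format pitfall comes up in every SAP call, so it is worth centralizing. The two helpers below are a sketch with names of my own choosing. They assume that only purely numeric keys are zero-padded while alphanumeric keys stay untouched, which is how I understand SAP's ALPHA conversion; check this against your system's field definitions.

```php
<?php

declare(strict_types=1);

// Pad purely numeric keys with leading zeros; leave alphanumeric keys as they are
function sapPadNumber(string $value, int $length = 10): string
{
    $value = trim($value);

    if ($value !== '' && ctype_digit($value)) {
        return str_pad($value, $length, '0', STR_PAD_LEFT);
    }

    return $value;
}

// Strip leading zeros for display, but keep a lone "0"
function sapStripZeros(string $value): string
{
    $stripped = ltrim($value, '0');

    return $stripped === '' && $value !== '' ? '0' : $stripped;
}
```

With these in place, calls such as getCustomer() can use sapPadNumber($customerNumber) instead of repeating str_pad() at every call site.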


DATEV Integration

DATEV offers several interfaces: DATEV-Connect (classic), DATEVconnect online (REST API, OAuth 2.0), and the classic ASCII interface. The exporter below targets the ASCII format:

<?php

class DatevExporter
{
    private const HEADER_VERSION = '700';
    private const FORMAT_VERSION = '4';

    public function exportBookings(
        array $bookings,
        string $consultantNumber,
        string $clientNumber,
        int $fiscalYearStart,
        \DateTimeInterface $from,
        \DateTimeInterface $to
    ): string {
        $lines = [];

        // Header line
        $lines[] = $this->buildHeader(
            $consultantNumber,
            $clientNumber,
            $fiscalYearStart,
            $from,
            $to
        );

        // Column headings
        $lines[] = $this->buildColumnHeaders();

        // Booking lines
        foreach ($bookings as $booking) {
            $lines[] = $this->buildBookingLine($booking);
        }

        // DATEV expects Windows line endings and ANSI encoding
        $content = implode("\r\n", $lines);

        return mb_convert_encoding($content, 'Windows-1252', 'UTF-8');
    }

    private function buildHeader(
        string $consultantNumber,
        string $clientNumber,
        int $fiscalYearStart,
        \DateTimeInterface $from,
        \DateTimeInterface $to
    ): string {
        $fields = [
            'EXTF',                              // format identifier
            self::HEADER_VERSION,                // header version number
            '21',                                // data category (Buchungsstapel = booking batch)
            'Buchungsstapel',                    // format name
            self::FORMAT_VERSION,                // format version
            // Created at, with milliseconds (date() always yields 000 for "v")
            (new \DateTimeImmutable())->format('YmdHisv'),
            '',                                  // imported (empty)
            'RE',                                // origin
            '',                                  // exported by
            '',                                  // imported by
            $consultantNumber,                   // consultant (Berater)
            $clientNumber,                       // client (Mandant)
            $fiscalYearStart . '0101',           // fiscal year start
            '4',                                 // G/L account number length
            $from->format('Ymd'),                // date from
            $to->format('Ymd'),                  // date to
            '',                                  // description
            '',                                  // initials (Diktatkuerzel)
            '1',                                 // booking type (1 = financial accounting)
            '0',                                 // accounting purpose
            '',                                  // lock flag (Festschreibung)
            'EUR',                               // currency code
            '',                                  // reserved
            '',                                  // derivatives flag
            '',                                  // reserved
            '',                                  // reserved
            '',                                  // chart of accounts (SKR)
            '',                                  // industry solution ID
            '',                                  // reserved
            '',                                  // reserved
            '',                                  // application info
        ];

        return implode(';', array_map([$this, 'escapeField'], $fields));
    }

    private function buildColumnHeaders(): string
    {
        return implode(';', [
            'Umsatz (ohne Soll/Haben-Kz)',
            'Soll/Haben-Kennzeichen',
            'WKZ Umsatz',
            'Kurs',
            'Basis-Umsatz',
            'WKZ Basis-Umsatz',
            'Konto',
            'Gegenkonto (ohne BU-Schluessel)',
            'BU-Schluessel',
            'Belegdatum',
            'Belegfeld 1',
            'Belegfeld 2',
            'Skonto',
            'Buchungstext',
        ]);
    }

    private function buildBookingLine(array $booking): string
    {
        $fields = [
            $this->formatAmount($booking['amount']),
            $booking['amount'] >= 0 ? 'S' : 'H',
            'EUR',
            '',                                  // Kurs
            '',                                  // Basis-Umsatz
            '',                                  // WKZ Basis-Umsatz
            $booking['account'],                 // Konto
            $booking['counter_account'],         // Gegenkonto
            $booking['tax_key'] ?? '',           // BU-Schluessel
            $this->formatDate($booking['date']), // Belegdatum
            $booking['document_number'] ?? '',   // Belegfeld 1
            $booking['reference'] ?? '',         // Belegfeld 2
            '',                                  // Skonto
            $booking['text'] ?? '',              // Buchungstext
        ];

        // Escape every field, not just the text: document numbers and
        // references are free text as well
        return implode(';', array_map(
            fn($field) => $this->escapeField((string)$field),
            $fields
        ));
    }

    private function formatAmount(float $amount): string
    {
        // DATEV expects a comma as decimal separator and no thousands separator
        return str_replace('.', ',', number_format(abs($amount), 2, '.', ''));
    }

    private function formatDate(\DateTimeInterface|string $date): string
    {
        if (is_string($date)) {
            $date = new \DateTime($date);
        }
        // DATEV format: DDMM (no year, it comes from the header)
        return $date->format('dm');
    }

    private function escapeField(string $value): string
    {
        // Prevent CSV injection: prefix fields that start with = + - @
        // (Excel/LibreOffice would otherwise interpret them as formulas!)
        if (preg_match('/^[=+\-@]/', $value)) {
            $value = "'" . $value; // the apostrophe prevents formula interpretation
        }

        // Escape semicolons and double quotes
        if (str_contains($value, ';') || str_contains($value, '"')) {
            return '"' . str_replace('"', '""', $value) . '"';
        }
        return $value;
    }
}

// Usage
$exporter = new DatevExporter();
$csv = $exporter->exportBookings(
    bookings: [
        [
            'amount' => 119.00,
            'account' => '1200',
            'counter_account' => '8400',
            'tax_key' => '3',
            'date' => '2025-06-15',
            'document_number' => 'RE-2025-0042',
            'text' => 'Warenverkauf an Kunde Mueller',
        ],
    ],
    consultantNumber: '12345',
    clientNumber: '67890',
    fiscalYearStart: 2025,
    from: new DateTime('2025-06-01'),
    to: new DateTime('2025-06-30')
);

file_put_contents('EXTF_Buchungsstapel.csv', $csv);

Key takeaway: DATEV is picky. It expects Windows line breaks (CRLF), ANSI encoding (Windows-1252), and a comma as the decimal separator. A single wrong character and the import fails.
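
The encoding and line-break requirements can be handled in one small step before the file is written. The following is a minimal sketch, assuming the CSV string is UTF-8 and that the mbstring extension is available; `toDatevEncoding()` is a hypothetical helper name, not part of the exporter shown above, and it is only needed if the exporter does not already convert its output.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: prepares a UTF-8 CSV string for a DATEV ASCII import.
 */
function toDatevEncoding(string $utf8Csv): string
{
    // Normalize every line ending (\r\n, \r, or \n) to CRLF
    $crlf = preg_replace('/\r\n|\r|\n/', "\r\n", $utf8Csv);
    if ($crlf === null) {
        throw new RuntimeException('Could not normalize line endings');
    }

    // Convert from UTF-8 to Windows-1252 (requires ext-mbstring)
    return mb_convert_encoding($crlf, 'Windows-1252', 'UTF-8');
}
```

A call such as `file_put_contents('EXTF_Buchungsstapel.csv', toDatevEncoding($csv));` would then write the converted bytes. Characters that do not exist in Windows-1252 are replaced by mbstring's substitute character, so it is worth validating booking texts beforehand.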

DATEV best practices:

  • Always test with the DATEV import assistant before going to production
  • DATEVconnect online: REST API with OAuth 2.0, a modern alternative to the ASCII export
  • Limit scopes (request only what is needed)
  • When switching to a different DATEV client (Mandant): the OAuth client ID stays the same, but a new token must be requested

Batch Import Strategies

When to use batch instead of an API?

  • Large data volumes: roughly 10,000 records and up
  • No real-time requirement: nightly synchronization is sufficient
  • Legacy systems: only a file export is available
  • Cost: API calls cost money, files do not

Robust CSV Importer

<?php

class BatchImporter
{
    private const CHUNK_SIZE = 1000;

    public function __construct(
        private readonly PDO $db,
        private readonly LoggerInterface $logger
    ) {}

    public function importCsv(
        string $filePath,
        string $targetTable,
        array $columnMapping,
        ?callable $transformer = null
    ): ImportResult {
        $result = new ImportResult();
        $result->startedAt = new \DateTimeImmutable();

        if (!file_exists($filePath)) {
            throw new \InvalidArgumentException("File not found: {$filePath}");
        }

        $handle = fopen($filePath, 'r');
        if ($handle === false) {
            throw new \RuntimeException("Cannot open file: {$filePath}");
        }

        try {
            // Read the header row
            $header = fgetcsv($handle, 0, ';', '"', '\\');
            if ($header === false) {
                throw new \RuntimeException("Cannot read CSV header");
            }

            // Normalize the header (strip the UTF-8 BOM, trim whitespace)
            $header = array_map(fn($h) => trim(preg_replace('/^\xEF\xBB\xBF/', '', $h)), $header);

            $this->validateHeader($header, $columnMapping);

            $batch = [];
            $lineNumber = 1;

            while (($row = fgetcsv($handle, 0, ';', '"', '\\')) !== false) {
                $lineNumber++;

                try {
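                    // Skip empty lines (fgetcsv returns [null] for a blank line);
                    // compare explicitly so that a literal "0" still counts as data
                    if (count(array_filter($row, fn($v) => $v !== null && $v !== '')) === 0) {
                        continue;
                    }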

                    $data = array_combine($header, $row);

                    if ($transformer) {
                        $data = $transformer($data, $lineNumber);
                        if ($data === null) {
                            $result->skipped++;
                            continue;
                        }
                    }

                    $mapped = $this->mapColumns($data, $columnMapping);
                    $batch[] = $mapped;

                    if (count($batch) >= self::CHUNK_SIZE) {
                        $this->insertBatch($targetTable, $batch);
                        $result->imported += count($batch);
                        $batch = [];

                        $this->logger->info("Imported {$result->imported} rows");
                    }

                } catch (\Throwable $e) {
                    $result->errors[] = [
                        'line' => $lineNumber,
                        'error' => $e->getMessage(),
                        'data' => $row,
                    ];

                    if (count($result->errors) > 100) {
                        throw new \RuntimeException("Too many errors, aborting import");
                    }
                }
            }

            // Import the remaining rows
            if (!empty($batch)) {
                $this->insertBatch($targetTable, $batch);
                $result->imported += count($batch);
            }

        } finally {
            fclose($handle);
        }

        $result->finishedAt = new \DateTimeImmutable();

        $this->logger->info("Import completed", [
            'imported' => $result->imported,
            'skipped' => $result->skipped,
            'errors' => count($result->errors),
            'duration_s' => $result->getDurationSeconds(),
        ]);

        return $result;
    }

    private function validateHeader(array $header, array $columnMapping): void
    {
        $required = array_keys($columnMapping);
        $missing = array_diff($required, $header);

        if (!empty($missing)) {
            throw new \RuntimeException(
                "Missing columns in CSV: " . implode(', ', $missing)
            );
        }
    }

    private function mapColumns(array $data, array $columnMapping): array
    {
        $result = [];

        foreach ($columnMapping as $sourceColumn => $config) {
            $value = $data[$sourceColumn] ?? null;

            if (is_string($config)) {
                // Simple mapping: 'csv_col' => 'db_col'
                $result[$config] = $value;
            } else {
                // Complex mapping with a transformation
                $targetColumn = $config['column'];

                if (isset($config['transform'])) {
                    $value = $config['transform']($value);
                }

                if (isset($config['default']) && ($value === null || $value === '')) {
                    $value = $config['default'];
                }

                $result[$targetColumn] = $value;
            }
        }

        return $result;
    }

    private function insertBatch(string $table, array $rows): void
    {
        if (empty($rows)) {
            return;
        }

        $columns = array_keys($rows[0]);
        $placeholders = '(' . implode(',', array_fill(0, count($columns), '?')) . ')';
        $allPlaceholders = implode(',', array_fill(0, count($rows), $placeholders));

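        // UPSERT with ON CONFLICT (PostgreSQL). $table and the column names are
        // interpolated into the SQL, so they must come from trusted configuration,
        // never from the CSV file itself.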
        $sql = sprintf(
            'INSERT INTO %s (%s) VALUES %s
             ON CONFLICT (external_id) DO UPDATE SET %s, updated_at = NOW()',
            $table,
            implode(',', $columns),
            $allPlaceholders,
            implode(',', array_map(fn($c) => "{$c} = EXCLUDED.{$c}",
                array_filter($columns, fn($c) => $c !== 'external_id')
            ))
        );

        $values = [];
        foreach ($rows as $row) {
            foreach ($columns as $col) {
                $values[] = $row[$col];
            }
        }

        $stmt = $this->db->prepare($sql);
        $stmt->execute($values);
    }
}

class ImportResult
{
    public \DateTimeImmutable $startedAt;
    public ?\DateTimeImmutable $finishedAt = null;
    public int $imported = 0;
    public int $skipped = 0;
    public array $errors = [];

    public function getDurationSeconds(): int
    {
        if (!$this->finishedAt) {
            return 0;
        }
        return $this->finishedAt->getTimestamp() - $this->startedAt->getTimestamp();
    }

    public function isSuccessful(): bool
    {
        return empty($this->errors);
    }
}
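
Because `insertBatch()` interpolates the table and column names into the statement, it can help to validate and quote identifiers before building the SQL. The following is a minimal sketch, assuming PostgreSQL-style double-quoted identifiers and lowercase snake_case names; `quoteIdentifier()` and `buildUpsertSql()` are hypothetical helpers, not part of the importer above.

```php
<?php

declare(strict_types=1);

/** Hypothetical helper: accepts only plain identifiers and double-quotes them. */
function quoteIdentifier(string $name): string
{
    if (preg_match('/\A[A-Za-z_][A-Za-z0-9_]*\z/', $name) !== 1) {
        throw new InvalidArgumentException("Unsafe SQL identifier: {$name}");
    }
    return '"' . $name . '"';
}

/** Hypothetical helper: builds a multi-row UPSERT with positional placeholders. */
function buildUpsertSql(
    string $table,
    array $columns,
    int $rowCount,
    string $conflictColumn = 'external_id'
): string {
    $rowPlaceholder = '(' . implode(',', array_fill(0, count($columns), '?')) . ')';
    $placeholders = implode(',', array_fill(0, $rowCount, $rowPlaceholder));

    // Every column except the conflict column is overwritten on conflict
    $updates = [];
    foreach ($columns as $column) {
        if ($column !== $conflictColumn) {
            $quoted = quoteIdentifier($column);
            $updates[] = "{$quoted} = EXCLUDED.{$quoted}";
        }
    }
    // Always present, so the SET list is never empty
    $updates[] = 'updated_at = NOW()';

    return sprintf(
        'INSERT INTO %s (%s) VALUES %s ON CONFLICT (%s) DO UPDATE SET %s',
        quoteIdentifier($table),
        implode(',', array_map('quoteIdentifier', $columns)),
        $placeholders,
        quoteIdentifier($conflictColumn),
        implode(', ', $updates)
    );
}
```

Note that quoted identifiers are case-sensitive in PostgreSQL, so this sketch only fits schemas whose table and column names are stored in lowercase.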

Integration Patterns

Pattern 1: Point-to-Point

[System A] <-----> [System B]

Advantages: simple, direct, quick to implement

Disadvantages: does not scale (n systems = n*(n-1)/2 connections)
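
To make the scaling argument concrete, the formula can be evaluated for a few system counts. This is a small illustrative sketch; the function names are hypothetical, and the hub figure assumes exactly one connection per system to a central hub.

```php
<?php

declare(strict_types=1);

/** Number of links when every system talks directly to every other system. */
function pointToPointConnections(int $systems): int
{
    if ($systems < 0) {
        throw new InvalidArgumentException('System count must not be negative');
    }
    return intdiv($systems * ($systems - 1), 2);
}

/** Number of links when every system talks only to a central hub (assumption). */
function hubConnections(int $systems): int
{
    if ($systems < 0) {
        throw new InvalidArgumentException('System count must not be negative');
    }
    return $systems;
}

foreach ([2, 3, 4, 6, 10] as $n) {
    printf(
        "%2d systems: %2d point-to-point links, %2d hub links\n",
        $n,
        pointToPointConnections($n),
        hubConnections($n)
    );
}
```

With three systems both approaches need three links; from four systems on, the point-to-point count grows faster.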

<?php

// Direct integration: webshop -> ERP
class OrderSyncService
{
    public function __construct(
        private readonly WebshopApi $webshop,
        private readonly SapClient $sap
    ) {}

    public function syncNewOrders(): void
    {
        $orders = $this->webshop->getUnsynced();

        foreach ($orders as $order) {
            $sapOrderId = $this->sap->createSalesOrder([
                'customer_number' => $order['customer_erp_id'],
                'items' => $order['items'],
                // ...
            ]);

            $this->webshop->markAsSynced($order['id'], $sapOrderId);
        }
    }
}

Pattern 2: Hub-and-Spoke (Integration Hub)

           [System A]
               |
[System B] -- [HUB] -- [System C]
               |
           [System D]

Advantages: central control, transformation, monitoring

Disadvantages: single point of failure, complexity concentrated in the hub

<?php

class IntegrationHub
{
    private array $adapters = [];
    private array $transformers = [];

    public function registerAdapter(string $systemId, SystemAdapter $adapter): void
    {
        $this->adapters[$systemId] = $adapter;
    }

    public function registerTransformer(
        string $sourceSystem,
        string $targetSystem,
        string $entityType,
        callable $transformer
    ): void {
        $key = "{$sourceSystem}:{$targetSystem}:{$entityType}";
        $this->transformers[$key] = $transformer;
    }

    public function sync(
        string $sourceSystem,
        string $targetSystem,
        string $entityType,
        array $entities
    ): SyncResult {
        $result = new SyncResult();

        $transformer = $this->transformers["{$sourceSystem}:{$targetSystem}:{$entityType}"]
            ?? throw new \RuntimeException("No transformer registered");

        $targetAdapter = $this->adapters[$targetSystem]
            ?? throw new \RuntimeException("Unknown target system: {$targetSystem}");

        foreach ($entities as $entity) {
            try {
                $transformed = $transformer($entity);
                $targetAdapter->upsert($entityType, $transformed);
                $result->success++;
            } catch (\Throwable $e) {
                $result->failures[] = [
                    'entity' => $entity,
                    'error' => $e->getMessage()
                ];
            }
        }

        return $result;
    }
}

// Usage
$hub = new IntegrationHub();

$hub->registerAdapter('salesforce', new SalesforceAdapter($sfClient));
$hub->registerAdapter('sap', new SapAdapter($sapClient));

$hub->registerTransformer('salesforce', 'sap', 'customer', function(array $sfCustomer) {
    return [
        'name' => $sfCustomer['Name'],
        'email' => $sfCustomer['Email'],
        'external_id' => $sfCustomer['Id'],
        // SAP-specific fields
        'sales_org' => '1000',
        'distribution_channel' => '10',
    ];
});
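
The hub code refers to `SystemAdapter` and `SyncResult` without showing them. The following is a minimal sketch of what they might look like, assuming only what the hub actually calls (`upsert()`, `$success`, `$failures`); `InMemoryAdapter` and `hasFailures()` are hypothetical additions that make the hub easy to exercise in tests.

```php
<?php

declare(strict_types=1);

/** Assumed contract: the hub only ever calls upsert() on a target system. */
interface SystemAdapter
{
    public function upsert(string $entityType, array $data): void;
}

/** Assumed result object: a success counter plus a list of failed entities. */
final class SyncResult
{
    public int $success = 0;

    /** @var list<array{entity: array, error: string}> */
    public array $failures = [];

    public function hasFailures(): bool
    {
        return $this->failures !== [];
    }
}

/** Hypothetical test double that stores records in memory, keyed by external_id. */
final class InMemoryAdapter implements SystemAdapter
{
    /** @var array<string, array<string, array>> */
    public array $records = [];

    public function upsert(string $entityType, array $data): void
    {
        if (!isset($data['external_id'])) {
            throw new InvalidArgumentException('external_id is required');
        }
        // Writing the same external_id twice overwrites instead of duplicating
        $this->records[$entityType][(string) $data['external_id']] = $data;
    }
}
```

Registering an `InMemoryAdapter` as the target system allows transformers to be tested without touching a real SAP or Salesforce instance.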

Pattern 3: Event-Driven (Message Queue)

[System A] --> [Queue] --> [Consumer] --> [System B]

Advantages: decoupling, scalability, resilience

Disadvantages: eventual consistency, more complex debugging

<?php

// Event Producer
class OrderEventProducer
{
    public function __construct(
        private readonly MessageQueueInterface $queue
    ) {}

    public function publishOrderCreated(array $order): void
    {
        $this->queue->publish('orders.created', [
            'event_id' => Uuid::uuid4()->toString(),
            'event_type' => 'order.created',
            'timestamp' => (new \DateTimeImmutable())->format('c'),
            'payload' => $order,
        ]);
    }
}

// Event Consumer
class SapOrderConsumer
{
    public function __construct(
        private readonly SapClient $sap,
        private readonly ProcessedEventStore $processedEvents
    ) {}

    public function handle(array $message): void
    {
        $eventId = $message['event_id'];

        // Idempotency: already processed?
        if ($this->processedEvents->exists($eventId)) {
            return;
        }

        $order = $message['payload'];

        $sapOrderId = $this->sap->createSalesOrder($order);

        $this->processedEvents->store($eventId, [
            'processed_at' => date('c'),
            'sap_order_id' => $sapOrderId,
        ]);
    }
}
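
The effect of the idempotency check is easiest to see when the same event is delivered twice. The following is a minimal sketch with an in-memory store; `InMemoryProcessedEventStore` and `CountingConsumer` are hypothetical stand-ins for `ProcessedEventStore` and the SAP call, and a production store would need to be persistent and shared between consumers.

```php
<?php

declare(strict_types=1);

/** Hypothetical in-memory stand-in for the ProcessedEventStore used above. */
final class InMemoryProcessedEventStore
{
    /** @var array<string, array> */
    private array $events = [];

    public function exists(string $eventId): bool
    {
        return isset($this->events[$eventId]);
    }

    public function store(string $eventId, array $meta): void
    {
        $this->events[$eventId] = $meta;
    }
}

/** Hypothetical consumer that counts side effects instead of calling SAP. */
final class CountingConsumer
{
    public int $ordersCreated = 0;

    public function __construct(
        private readonly InMemoryProcessedEventStore $processedEvents
    ) {}

    public function handle(array $message): void
    {
        if ($this->processedEvents->exists($message['event_id'])) {
            return; // Duplicate delivery: nothing to do
        }

        $this->ordersCreated++; // Stands in for $sap->createSalesOrder()

        $this->processedEvents->store($message['event_id'], [
            'processed_at' => date('c'),
        ]);
    }
}

$consumer = new CountingConsumer(new InMemoryProcessedEventStore());
$message = ['event_id' => 'evt-1', 'payload' => ['id' => 42]];

$consumer->handle($message);
$consumer->handle($message); // Redelivery of the same event
```

After both calls, `$consumer->ordersCreated` is 1. The check and the store are not atomic here, so a crash between the side effect and `store()` can still cause a repeat; closing that gap needs a transactional store or an idempotent target API.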

Key takeaway: point-to-point for 2-3 systems, hub-and-spoke from 4 systems upward, event-driven when real-time behavior and scaling are critical.


Circuit Breaker Pattern

When a downstream API fails repeatedly, there is no point in continuing to bombard it. The circuit breaker protects both sides:

<?php

class CircuitBreaker
{
    private const STATE_CLOSED = 'closed';     // Normal operation
    private const STATE_OPEN = 'open';         // Calls are blocked
    private const STATE_HALF_OPEN = 'half';    // Trial phase

    public function __construct(
        private readonly string $service,
        private readonly CacheInterface $cache,
        private readonly int $failureThreshold = 5,
        private readonly int $recoveryTimeout = 30,
        private readonly int $windowSeconds = 60
    ) {}

    public function call(callable $operation): mixed
    {
        $state = $this->getState();

        if ($state === self::STATE_OPEN) {
            // Circuit open: do not even attempt the call
            throw new CircuitOpenException(
                "Circuit for {$this->service} is open - please retry later"
            );
        }

        try {
            $result = $operation();
            $this->recordSuccess();
            return $result;

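        } catch (\Throwable $e) {
            $this->recordFailure();

            // A failed trial call in half-open re-opens the circuit immediately;
            // otherwise trip only once the failure threshold is reached
            if ($state === self::STATE_HALF_OPEN || $this->shouldTrip()) {
                $this->trip();
            }

            throw $e;
        }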
    }

    private function getState(): string
    {
        $state = $this->cache->get("circuit:{$this->service}:state") ?? self::STATE_CLOSED;

        if ($state === self::STATE_OPEN) {
            $trippedAt = (int) ($this->cache->get("circuit:{$this->service}:tripped_at") ?? 0);
            if (time() - $trippedAt >= $this->recoveryTimeout) {
                return self::STATE_HALF_OPEN; // Allow a recovery attempt
            }
        }

        return $state;
    }

    private function shouldTrip(): bool
    {
        $failures = (int) ($this->cache->get("circuit:{$this->service}:failures") ?? 0);
        return $failures >= $this->failureThreshold;
    }

    private function trip(): void
    {
        $this->cache->set("circuit:{$this->service}:state", self::STATE_OPEN);
        $this->cache->set("circuit:{$this->service}:tripped_at", time());
    }

    private function recordFailure(): void
    {
        $key = "circuit:{$this->service}:failures";
        $this->cache->increment($key);
        $this->cache->expire($key, $this->windowSeconds);
    }

    private function recordSuccess(): void
    {
        // A success while half-open closes the circuit and resets its bookkeeping
        if ($this->getState() === self::STATE_HALF_OPEN) {
            $this->cache->delete("circuit:{$this->service}:state");
            $this->cache->delete("circuit:{$this->service}:tripped_at");
            $this->cache->delete("circuit:{$this->service}:failures");
        }
    }
}

// Usage
$circuit = new CircuitBreaker('salesforce', $redis);

try {
    $result = $circuit->call(fn() => $salesforce->createRecord('Account', $data));
} catch (CircuitOpenException $e) {
    // Put the payload into a queue for later processing
    $queue->push('salesforce_retry', $data);
}
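
The breaker relies on a cache that offers `get`, `set`, `increment`, `expire`, and `delete`. For unit tests, a small in-memory implementation with an injectable clock is usually enough. The following is a minimal sketch; `ArrayCache` is a hypothetical class that mirrors only the methods the breaker calls, and it would still need to implement the article's `CacheInterface` to be passed to the constructor.

```php
<?php

declare(strict_types=1);

/** Hypothetical in-memory cache with TTL support for testing the circuit breaker. */
final class ArrayCache
{
    /** @var array<string, mixed> */
    private array $values = [];

    /** @var array<string, int> Unix timestamps at which keys expire */
    private array $expiresAt = [];

    /** @var callable(): int */
    private $clock;

    public function __construct(?callable $clock = null)
    {
        // Tests can inject a fake clock; the default is the system time
        $this->clock = $clock ?? 'time';
    }

    public function get(string $key): mixed
    {
        $this->evictIfExpired($key);
        return $this->values[$key] ?? null;
    }

    public function set(string $key, mixed $value): void
    {
        $this->values[$key] = $value;
        unset($this->expiresAt[$key]);
    }

    public function increment(string $key): int
    {
        $this->evictIfExpired($key);
        $this->values[$key] = (int) ($this->values[$key] ?? 0) + 1;
        return $this->values[$key];
    }

    public function expire(string $key, int $seconds): void
    {
        if (array_key_exists($key, $this->values)) {
            $this->expiresAt[$key] = ($this->clock)() + $seconds;
        }
    }

    public function delete(string $key): void
    {
        unset($this->values[$key], $this->expiresAt[$key]);
    }

    private function evictIfExpired(string $key): void
    {
        if (isset($this->expiresAt[$key]) && ($this->clock)() >= $this->expiresAt[$key]) {
            $this->delete($key);
        }
    }
}
```

With a fake clock, a test can advance time past the failure window or the recovery timeout without sleeping.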

Dead Letter Queue (DLQ)

Poison messages, meaning events that fail again and again, must be taken out of the queue; otherwise they block everything:

<?php

class DLQHandler
{
    private const MAX_RETRIES = 3;

    public function __construct(
        private readonly MessageQueueInterface $mainQueue,
        private readonly MessageQueueInterface $dlq,
        private readonly LoggerInterface $logger
    ) {}

    public function processWithDLQ(string $queue, callable $handler): void
    {
        while ($message = $this->mainQueue->pop($queue)) {
            $retryCount = $message['_retry_count'] ?? 0;

            try {
                $handler($message);

            } catch (\Throwable $e) {
                $retryCount++;

                if ($retryCount >= self::MAX_RETRIES) {
                    // Move to the DLQ for manual inspection later
                    $this->dlq->push("{$queue}_dlq", [
                        ...$message,
                        '_retry_count' => $retryCount,
                        '_last_error' => $e->getMessage(),
                        '_failed_at' => date('c'),
                    ]);

                    $this->logger->error("Message moved to DLQ after {$retryCount} retries", [
                        'queue' => $queue,
                        'error' => $e->getMessage(),
                        'message_id' => $message['event_id'] ?? 'unknown',
                    ]);

                } else {
                    // Back into the queue with an incremented counter
                    $message['_retry_count'] = $retryCount;
                    $this->mainQueue->push($queue, $message);
                }
            }
        }
    }

    // Reprocess the DLQ manually (after a fix)
    public function reprocessDLQ(string $queue): int
    {
        $reprocessed = 0;

        while ($message = $this->dlq->pop("{$queue}_dlq")) {
            unset($message['_retry_count'], $message['_last_error'], $message['_failed_at']);
            $this->mainQueue->push($queue, $message);
            $reprocessed++;
        }

        return $reprocessed;
    }
}

Circuit breaker + DLQ best practice:

  • Open the circuit breaker after 5 failures within 60 seconds
  • Wait 30 seconds, then go half-open (one trial call)
  • After 3 retries: move the message to the DLQ
  • Monitor DLQ growth and investigate when it exceeds 10 messages/day
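
The DLQ handler above puts a failed message straight back into the queue, so all retries can happen within milliseconds. If the queue supports delayed delivery, a backoff delay per attempt spreads them out. The following is a minimal sketch of exponential backoff with full jitter; `backoffDelayMs()` and its default values are assumptions, not part of the handler.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: delay in milliseconds before retry number $attempt (1-based).
 * Full jitter: a random value between 0 and the capped exponential ceiling.
 */
function backoffDelayMs(int $attempt, int $baseMs = 500, int $capMs = 30000): int
{
    if ($attempt < 1 || $baseMs < 1 || $capMs < 1) {
        throw new InvalidArgumentException('attempt, baseMs and capMs must be >= 1');
    }

    // Limit the exponent so the multiplication cannot overflow an int
    $exponent = min($attempt - 1, 20);
    $ceiling = min($capMs, $baseMs * (2 ** $exponent));

    return random_int(0, $ceiling);
}

foreach ([1, 2, 3] as $attempt) {
    printf("attempt %d: wait up to %d ms\n", $attempt, min(30000, 500 * 2 ** ($attempt - 1)));
}
```

The jitter keeps many consumers that failed at the same moment from retrying in lockstep against a system that is just recovering.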

Idempotency and Error Handling

Why idempotency is critical

Networks are unreliable. Requests can:

  • Time out without a response (even though they executed successfully)
  • Be repeated by the client (retry logic)
  • Be received twice by the server

Without idempotency: duplicate orders, duplicate payments, chaos.

<?php

class IdempotentSyncService
{
    public function __construct(
        private readonly PDO $db,
        private readonly ExternalApi $api
    ) {}

    public function syncEntity(string $entityType, string $localId, array $data): void
    {
        // Generate the idempotency key
        $idempotencyKey = $this->generateIdempotencyKey($entityType, $localId, $data);

        // Already synchronized?
        $existing = $this->findSyncRecord($idempotencyKey);

        if ($existing) {
            if ($existing['status'] === 'success') {
                return; // Already succeeded
            }

            // Use the most recent timestamp: created_at is only set on first insert
            $lastTouched = strtotime($existing['updated_at'] ?? $existing['created_at']);
            if ($existing['status'] === 'pending' && $lastTouched > time() - 300) {
                return; // Still in progress (touched less than 5 minutes ago)
            }
        }

        // Create or update the sync record
        $syncId = $this->upsertSyncRecord($idempotencyKey, 'pending');

        try {
            $externalId = $this->api->upsert($entityType, $data);

            $this->updateSyncRecord($syncId, 'success', $externalId);

        } catch (\Throwable $e) {
            $this->updateSyncRecord($syncId, 'failed', null, $e->getMessage());
            throw $e;
        }
    }

    private function generateIdempotencyKey(
        string $entityType,
        string $localId,
        array $data
    ): string {
        // Key built from entity type, local ID, and a hash of the data
        $dataHash = md5(json_encode($data, JSON_THROW_ON_ERROR));
        return "{$entityType}:{$localId}:{$dataHash}";
    }

    private function findSyncRecord(string $idempotencyKey): ?array
    {
        $stmt = $this->db->prepare(
            'SELECT * FROM sync_records WHERE idempotency_key = ?'
        );
        $stmt->execute([$idempotencyKey]);
        return $stmt->fetch(PDO::FETCH_ASSOC) ?: null;
    }

    private function upsertSyncRecord(string $idempotencyKey, string $status): int
    {
        $stmt = $this->db->prepare(
            'INSERT INTO sync_records (idempotency_key, status, created_at)
             VALUES (?, ?, NOW())
             ON CONFLICT (idempotency_key) DO UPDATE SET status = ?, updated_at = NOW()
             RETURNING id'
        );
        $stmt->execute([$idempotencyKey, $status, $status]);
        return (int)$stmt->fetchColumn();
    }

    private function updateSyncRecord(
        int $id,
        string $status,
        ?string $externalId,
        ?string $error = null
    ): void {
        $stmt = $this->db->prepare(
            'UPDATE sync_records
             SET status = ?, external_id = ?, error_message = ?, updated_at = NOW()
             WHERE id = ?'
        );
        $stmt->execute([$status, $externalId, $error, $id]);
    }
}
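
One subtlety of hashing `json_encode($data)` is that the same data with a different key order produces a different key, which defeats the duplicate check. The following is a minimal sketch of a key that is stable against key order, assuming PHP 8.1+ for `array_is_list()`; `canonicalize()` and `stableIdempotencyKey()` are hypothetical helpers, and SHA-256 is swapped in for MD5 here as a design choice, not a requirement.

```php
<?php

declare(strict_types=1);

/** Hypothetical helper: recursively sorts associative arrays by key. */
function canonicalize(array $data): array
{
    // Lists keep their order (it may be meaningful); maps are sorted by key
    if (!array_is_list($data)) {
        ksort($data);
    }
    foreach ($data as $key => $value) {
        if (is_array($value)) {
            $data[$key] = canonicalize($value);
        }
    }
    return $data;
}

/** Hypothetical helper: same format as above, but independent of key order. */
function stableIdempotencyKey(string $entityType, string $localId, array $data): string
{
    $json = json_encode(canonicalize($data), JSON_THROW_ON_ERROR);
    return "{$entityType}:{$localId}:" . hash('sha256', $json);
}

$a = stableIdempotencyKey('customer', '42', ['name' => 'Mueller', 'address' => ['zip' => '10115', 'city' => 'Berlin']]);
$b = stableIdempotencyKey('customer', '42', ['address' => ['city' => 'Berlin', 'zip' => '10115'], 'name' => 'Mueller']);
```

Here `$a` and `$b` are identical, while any change to a value still yields a new key and therefore a new sync.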

Monitoring and Alerting

Integration Dashboard

<?php

class IntegrationMonitor
{
    public function __construct(
        private readonly PDO $db,
        private readonly AlertService $alerts
    ) {}

    public function getStats(string $integration, int $hours = 24): array
    {
        $stmt = $this->db->prepare(
            "SELECT
                COUNT(*) as total,
                COUNT(*) FILTER (WHERE status = 'success') as success,
                COUNT(*) FILTER (WHERE status = 'failed') as failed,
                COUNT(*) FILTER (WHERE status = 'pending') as pending,
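                COUNT(*) FILTER (WHERE status = 'failed') * 100.0 / NULLIF(COUNT(*), 0) as error_rate,
                AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_duration_s,
                PERCENTILE_CONT(0.95) WITHIN GROUP (
                    ORDER BY EXTRACT(EPOCH FROM (completed_at - started_at))
                ) as p95_duration_s,
                MAX(EXTRACT(EPOCH FROM (completed_at - started_at))) as max_duration_s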
             FROM integration_jobs
             WHERE integration_name = ?
               AND started_at > NOW() - INTERVAL '{$hours} hours'"
        );
        $stmt->execute([$integration]);

        return $stmt->fetch(PDO::FETCH_ASSOC);
    }

    public function checkHealth(): array
    {
        $issues = [];

        // Check: are there stuck jobs?
        $stmt = $this->db->query(
            "SELECT integration_name, COUNT(*) as stuck
             FROM integration_jobs
             WHERE status = 'pending'
               AND started_at < NOW() - INTERVAL '30 minutes'
             GROUP BY integration_name"
        );

        foreach ($stmt->fetchAll(PDO::FETCH_ASSOC) as $row) {
            $issues[] = [
                'type' => 'stuck_jobs',
                'integration' => $row['integration_name'],
                'count' => $row['stuck'],
                'severity' => 'warning',
            ];
        }

        // Check: is the error rate too high?
        // IMPORTANT: the minimum threshold (20 jobs) prevents false positives at low traffic
        $stmt = $this->db->query(
            "SELECT
                integration_name,
                COUNT(*) as total,
                COUNT(*) FILTER (WHERE status = 'failed') * 100.0 / COUNT(*) as error_rate
             FROM integration_jobs
             WHERE started_at > NOW() - INTERVAL '1 hour'
             GROUP BY integration_name
             HAVING COUNT(*) >= 20  -- Minimum-Threshold!
               AND COUNT(*) FILTER (WHERE status = 'failed') * 100.0 / COUNT(*) > 10"
        );

        foreach ($stmt->fetchAll(PDO::FETCH_ASSOC) as $row) {
            $issues[] = [
                'type' => 'high_error_rate',
                'integration' => $row['integration_name'],
                'error_rate' => round($row['error_rate'], 1),
                'severity' => 'critical',
            ];

            $this->alerts->send(sprintf(
                'Integration %s has a %.1f%% error rate!',
                $row['integration_name'],
                (float) $row['error_rate']
            ));
        }

        return $issues;
    }

    public function getRecentErrors(string $integration, int $limit = 10): array
    {
        $stmt = $this->db->prepare(
            "SELECT id, started_at, error_message, payload
             FROM integration_jobs
             WHERE integration_name = ?
               AND status = 'failed'
             ORDER BY started_at DESC
             LIMIT ?"
        );
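        // Bind LIMIT explicitly as an integer; execute() would bind it as a string
        $stmt->bindValue(1, $integration);
        $stmt->bindValue(2, $limit, PDO::PARAM_INT);
        $stmt->execute();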

        return $stmt->fetchAll(PDO::FETCH_ASSOC);
    }
}

Logging Best Practices

<?php

class IntegrationLogger
{
    public function __construct(
        private readonly LoggerInterface $logger
    ) {}

    public function logApiCall(
        string $system,
        string $method,
        string $endpoint,
        array $request,
        mixed $response,
        float $durationMs,
        ?string $error = null
    ): void {
        $context = [
            'system' => $system,
            'method' => $method,
            'endpoint' => $endpoint,
            'duration_ms' => round($durationMs, 2),
            'request_size' => strlen(json_encode($request)),
            'correlation_id' => $this->getCorrelationId(),
        ];

        // Mask sensitive data
        $sanitizedRequest = $this->sanitize($request);
        $context['request'] = $sanitizedRequest;

        if ($error) {
            $context['error'] = $error;
            $this->logger->error("API call failed: {$system} {$method} {$endpoint}", $context);
        } else {
            $this->logger->info("API call success: {$system} {$method} {$endpoint}", $context);
        }
    }

    private function sanitize(array $data): array
    {
        // Extended list of sensitive keys
        $sensitiveKeys = [
            'password', 'secret', 'token', 'api_key', 'apikey',
            'authorization', 'bearer', 'session', 'session_id',
            'refresh_token', 'access_token', 'set-cookie', 'cookie',
            'x-api-key', 'client_secret', 'private_key'
        ];

        $sanitized = [];
        foreach ($data as $key => $value) {
            $lowerKey = strtolower((string) $key);

            if (in_array($lowerKey, $sensitiveKeys, true)) {
                $sanitized[$key] = '***REDACTED***';
            } elseif (is_array($value)) {
                $sanitized[$key] = $this->sanitize($value);
            } else {
                $sanitized[$key] = $value;
            }
        }

        return $sanitized;
    }

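    private function getCorrelationId(): string
    {
        // PHP exposes the X-Correlation-ID request header as HTTP_X_CORRELATION_ID;
        // the static fallback keeps one generated ID for the whole request
        static $fallback = null;

        return $_SERVER['HTTP_X_CORRELATION_ID'] ?? ($fallback ??= uniqid('int-', true));
    }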
}

Key takeaway: Without monitoring, integration errors fly under the radar until the customer calls. Log every API call with a correlation ID.

Correlation ID as a Response Header

Return the correlation ID in the response as well; the customer can then search the log for it when problems occur:

<?php

// In your API controller
$correlationId = $_SERVER['HTTP_X_CORRELATION_ID']
    ?? $_SERVER['HTTP_X_REQUEST_ID']
    ?? uniqid('req-', true);

// Return the correlation ID in the response header
header("X-Correlation-ID: {$correlationId}");

// Use it in all logs
$logger->info("Processing request", ['correlation_id' => $correlationId]);
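
Because the incoming header is client-controlled and ends up in both the response and the logs, it can be worth validating it before use. The following is a minimal sketch, assuming IDs are limited to 64 characters from a conservative alphabet; `resolveCorrelationId()` and that format rule are hypothetical choices.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: returns a client-supplied correlation ID only if it
 * looks harmless, otherwise generates a fresh one.
 */
function resolveCorrelationId(array $server): string
{
    foreach (['HTTP_X_CORRELATION_ID', 'HTTP_X_REQUEST_ID'] as $key) {
        $candidate = $server[$key] ?? null;

        // Letters, digits, dot, underscore, and hyphen only; 1 to 64 characters
        if (is_string($candidate) && preg_match('/\A[A-Za-z0-9._-]{1,64}\z/', $candidate) === 1) {
            return $candidate;
        }
    }

    // Fallback: 16 random hex characters with a recognizable prefix
    return 'req-' . bin2hex(random_bytes(8));
}
```

In a controller this would be called as `resolveCorrelationId($_SERVER)`. Rejecting unexpected characters keeps line breaks and control characters out of log files.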

Defining SLOs

Without SLOs (Service Level Objectives), you cannot tell whether your integration is running “well”. Define measurable targets:

Metric | SLO | Alert at
p95 latency | < 2 seconds | > 5 seconds
Error rate | < 1% | > 5%
Stuck jobs | 0 | > 0 for > 30 min
DLQ growth | < 10/day | > 50/day
Circuit breaker open | < 5 min/day | > 30 min/day

<?php

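// Integrate the SLO check into monitoring (a method of IntegrationMonitor)
public function checkSLOs(): array
{
    $violations = [];
    $stats = $this->getStats('salesforce', hours: 1);
    $total = (int) ($stats['total'] ?? 0);

    // p95 latency (requires getStats() to return a p95_duration_s column)
    $p95 = $stats['p95_duration_s'] ?? null;
    if ($p95 !== null && (float) $p95 > 2.0) {
        $violations[] = sprintf('Salesforce p95 latency: %.2fs (SLO: <2s)', (float) $p95);
    }

    // Error rate, derived from failed/total (only with enough traffic)
    $errorRate = $total > 0 ? (int) $stats['failed'] * 100.0 / $total : 0.0;
    if ($total >= 20 && $errorRate > 1.0) {
        $violations[] = sprintf('Salesforce error rate: %.1f%% (SLO: <1%%)', $errorRate);
    }

    return $violations;
}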
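
For readers unfamiliar with percentile metrics: p95 latency is the duration that 95% of calls stay at or below. The following is a minimal sketch that computes it in PHP with the nearest-rank method; `percentile()` is a hypothetical helper, and its results can differ slightly from database functions that interpolate between values.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: nearest-rank percentile of a list of numbers.
 *
 * @param list<int|float> $values
 */
function percentile(array $values, float $p): float
{
    if ($values === []) {
        throw new InvalidArgumentException('At least one value is required');
    }
    if ($p <= 0.0 || $p > 100.0) {
        throw new InvalidArgumentException('p must be in the range (0, 100]');
    }

    sort($values);

    // Nearest rank: the smallest value with at least p% of the data at or below it
    $rank = (int) ceil($p * count($values) / 100);

    return (float) $values[$rank - 1];
}

$durations = [0.2, 0.4, 3.0, 0.3, 0.5];
$p95 = percentile($durations, 95.0);
```

With only five samples, `$p95` is the slowest call (3.0 seconds), which illustrates why percentile-based alerts need a minimum sample size just like the error-rate check.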

Checklist: Integration Production-Ready

Integration Checklist

Before go-live:

  • Retry logic with exponential backoff + jitter
  • Idempotency for all write operations
  • Rate limits respected and handled
  • Timeout values configured (connect + read)
  • Error handling for all HTTP status codes
  • Circuit breaker for unstable downstream APIs
  • Dead letter queue for poison messages
  • Credentials stored securely (not in code!)
  • Logging with correlation IDs (also in the response!)
  • Monitoring with SLOs and minimum thresholds
  • Test data separated from production data
  • Rollback strategy defined

Security:

  • TLS/SSL for all connections (CURLOPT_SSL_VERIFYPEER!)
  • Validate webhooks with an HMAC signature
  • API keys can be rotated
  • Minimal permissions (principle of least privilege)
  • Input validation before API calls
  • Prevent CSV injection (prefix = + - @)
  • Sensitive data masked in logs (extended list!)
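
The HMAC check from the security list can be sketched in a few lines. This is a minimal example, assuming the sender transmits a hex-encoded HMAC-SHA256 of the raw request body; the function name and that signature format are assumptions, since each provider defines its own header name and encoding.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: verifies a hex-encoded HMAC-SHA256 webhook signature.
 */
function isValidWebhookSignature(string $rawBody, string $signature, string $secret): bool
{
    // Compute the signature over the raw, unparsed request body
    $expected = hash_hmac('sha256', $rawBody, $secret);

    // hash_equals() compares in constant time to avoid timing attacks
    return hash_equals($expected, $signature);
}

$secret = 's3cret';
$body = '{"id":1,"event":"order.created"}';
$signature = hash_hmac('sha256', $body, $secret);

$accepted = isValidWebhookSignature($body, $signature, $secret);
$rejected = isValidWebhookSignature($body . ' ', $signature, $secret);
```

In a real endpoint the body would come from `file_get_contents('php://input')` and must be verified before it is decoded, because re-encoding JSON can change the bytes.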

System-Specific Tips

Salesforce

Aspect | Recommendation
API version | Always use a current version (v59.0+)
Bulk API | From 2,000+ records, use it instead of REST
Sandbox | Always test in a sandbox first
Limits | Monitor API limit consumption
Composite API | For several related operations

SAP

Aspect | Recommendation
BAPI_TRANSACTION_COMMIT | After EVERY writing BAPI
Number formats | Mind leading zeros
Time zone | Account for the SAP server time
Error handling | Always check the RETURN structure
Performance | BAPIs are slow; batch where possible
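
The leading-zeros point is a frequent source of "customer not found" errors: SAP stores purely numeric keys padded to the full field length. The following is a minimal sketch, assuming a 10-character customer number field; the helper names and the default length are assumptions and should be checked against the actual field definition.

```php
<?php

declare(strict_types=1);

/** Hypothetical helper: pads purely numeric keys with leading zeros for SAP. */
function toSapNumber(string $value, int $length = 10): string
{
    $value = trim($value);

    // Alphanumeric keys are passed through unchanged
    if ($value === '' || !ctype_digit($value)) {
        return $value;
    }
    if (strlen($value) > $length) {
        throw new InvalidArgumentException("Value {$value} exceeds {$length} digits");
    }

    return str_pad($value, $length, '0', STR_PAD_LEFT);
}

/** Hypothetical helper: strips the padding again for display or for other systems. */
function fromSapNumber(string $value): string
{
    if (!ctype_digit($value)) {
        return $value;
    }
    $stripped = ltrim($value, '0');

    return $stripped === '' ? '0' : $stripped;
}
```

Keeping such numbers as strings throughout the integration avoids losing the zeros through an accidental integer cast.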

DATEV

Aspect | Recommendation
Encoding | Windows-1252, not UTF-8
Line break | CRLF (Windows)
Decimal separator | Comma, not period
Date format | DDMM (no year)
Validation | Test with the DATEV import assistant

Conclusion

ERP and CRM integration is not rocket science, but it is no walk in the park either. The most important success factors:

  1. Idempotency first: every operation must be safe to execute multiple times
  2. Monitoring is mandatory: without alerting, errors fly under the radar
  3. Retry with sense: exponential backoff, not an endless loop
  4. Documentation: APIs change, so your documentation must stay current

The code examples in this article are production-ready and have proven themselves in real projects. Copy them, adapt them, and build on them.

Planning an integration? We have already connected Salesforce, SAP, DATEV, and dozens of other systems. Get in touch; we know where the pitfalls are.

Carola Schulte

About Carola Schulte

Software architect with 25+ years of experience. Specializes in robust business apps with PHP/PostgreSQL, security by design, and GDPR-compliant systems. 1.8M+ lines of code in production.
