CRM & ERP Integration: Connecting Salesforce, SAP, and DATEV
CRM & ERP Integration: How to Connect Your Business Systems
87% of all companies use at least three different business systems: a CRM here, an ERP there, accounting somewhere else. And in between? Manual data transfer, Excel exports, copy-paste marathons. That costs time, causes errors, and frustrates everyone involved.
The solution is automated integration. But how do you connect Salesforce to SAP? How do you get customer data into DATEV? And what do you do when the legacy system only speaks SOAP?
In this article I show you the integration patterns that work in practice, with real PHP code, proven error handling, and the pitfalls that no documentation mentions.
The three integration approaches
- REST API: the modern standard (Salesforce, HubSpot, Shopify)
- SOAP/XML: enterprise legacy (SAP, DATEV, Microsoft Dynamics)
- Batch import: file-based (CSV, XML, EDIFACT)
When to use which approach?
| Criterion | REST API | SOAP | Batch import |
|---|---|---|---|
| Real-time | ✅ Yes | ✅ Yes | ❌ No |
| Complexity | Low | High | Medium |
| Legacy support | Rare | ✅ Very good | ✅ Always possible |
| Debugging | Easy | Hard | Easy |
| Large data volumes | ⚠️ Pagination required | ⚠️ Timeout risk | ✅ Ideal |
Rule of thumb: REST for real-time, SOAP for enterprise legacy, batch for bulk. In practice you often need all three.
REST API Integration
Basic pattern: API client class
<?php
declare(strict_types=1);
use Psr\Log\LoggerInterface; // PSR-3 logger interface (psr/log)
abstract class ApiClient
{
protected string $baseUrl;
protected array $defaultHeaders = [];
protected int $timeout = 30;
protected int $maxRetries = 3;
public function __construct(
protected readonly string $apiKey,
protected readonly LoggerInterface $logger
) {}
protected function request(
string $method,
string $endpoint,
array $data = [],
array $headers = []
): array {
$url = $this->baseUrl . $endpoint;
$allHeaders = array_merge($this->defaultHeaders, $headers);
$attempt = 0;
$lastException = null;
while ($attempt < $this->maxRetries) {
$attempt++;
try {
$response = $this->executeRequest($method, $url, $data, $allHeaders);
// Evaluate rate-limit headers
$this->handleRateLimitHeaders($response['headers']);
return $response['body'];
} catch (RateLimitException $e) {
$waitSeconds = $e->getRetryAfter() ?? (2 ** $attempt);
$this->logger->warning("Rate limit hit, waiting {$waitSeconds}s", [
'endpoint' => $endpoint,
'attempt' => $attempt
]);
sleep($waitSeconds);
$lastException = $e;
} catch (ServerException $e) {
// 5xx: exponential backoff + full jitter (prevents a stampede)
if ($attempt < $this->maxRetries) {
$base = 2 ** $attempt;
$waitSeconds = random_int(0, $base); // Full Jitter
$this->logger->warning("Server error, retry in {$waitSeconds}s (jitter)", [
'endpoint' => $endpoint,
'status' => $e->getCode()
]);
sleep($waitSeconds);
}
$lastException = $e;
} catch (ClientException $e) {
// 4xx: do not retry (except 429)
throw $e;
}
}
throw new ApiException(
"Max retries exceeded for {$endpoint}",
0,
$lastException
);
}
private function executeRequest(
string $method,
string $url,
array $data,
array $headers
): array {
$ch = curl_init();
curl_setopt_array($ch, [
CURLOPT_URL => $url,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => $this->timeout,
CURLOPT_CONNECTTIMEOUT => 10,
CURLOPT_HTTPHEADER => $this->formatHeaders($headers),
CURLOPT_HEADER => true,
// TLS security: ALWAYS verify certificates!
CURLOPT_SSL_VERIFYPEER => true,
CURLOPT_SSL_VERIFYHOST => 2,
]);
if ($method === 'POST') {
curl_setopt($ch, CURLOPT_POST, true);
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
} elseif ($method === 'PUT' || $method === 'PATCH') {
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
curl_setopt($ch, CURLOPT_POSTFIELDS, json_encode($data));
} elseif ($method === 'DELETE') {
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'DELETE');
} elseif ($method === 'GET' && !empty($data)) {
curl_setopt($ch, CURLOPT_URL, $url . '?' . http_build_query($data));
}
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
$headerSize = curl_getinfo($ch, CURLINFO_HEADER_SIZE);
if ($response === false) {
$error = curl_error($ch);
curl_close($ch);
throw new NetworkException("cURL error: {$error}");
}
curl_close($ch);
$headerStr = substr($response, 0, $headerSize);
$bodyStr = substr($response, $headerSize);
$parsedHeaders = $this->parseHeaders($headerStr);
// Pass the headers to checkStatusCode so it can read Retry-After
$this->checkStatusCode($httpCode, $bodyStr, $parsedHeaders);
return [
'headers' => $parsedHeaders,
'body' => json_decode($bodyStr, true) ?? []
];
}
private function checkStatusCode(int $code, string $body, array $headers = []): void
{
if ($code >= 200 && $code < 300) {
return;
}
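if ($code === 429) {
// Retry-After is either a number of seconds or an HTTP-date; normalize to seconds
// (a raw date string would otherwise reach sleep() and raise a TypeError)
$retryAfter = null;
$raw = $headers['retry-after'] ?? null;
if ($raw !== null && ctype_digit($raw)) {
$retryAfter = (int)$raw;
} elseif ($raw !== null && ($retryAt = strtotime($raw)) !== false) {
$retryAfter = max(0, $retryAt - time());
}
throw new RateLimitException("Rate limit exceeded", $code, $retryAfter);
}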
if ($code >= 400 && $code < 500) {
throw new ClientException("Client error: {$body}", $code);
}
if ($code >= 500) {
throw new ServerException("Server error: {$body}", $code);
}
}
private function handleRateLimitHeaders(array $headers): void
{
$remaining = $headers['x-rate-limit-remaining'] ?? null;
if ($remaining !== null && (int)$remaining < 10) {
$this->logger->notice("Rate limit low: {$remaining} requests remaining");
}
}
private function formatHeaders(array $headers): array
{
return array_map(
fn($k, $v) => "{$k}: {$v}",
array_keys($headers),
array_values($headers)
);
}
private function parseHeaders(string $headerStr): array
{
$headers = [];
foreach (explode("\r\n", $headerStr) as $line) {
if (str_contains($line, ':')) {
[$key, $value] = explode(':', $line, 2);
$headers[strtolower(trim($key))] = trim($value);
}
}
return $headers;
}
}
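The ApiClient above catches RateLimitException, ServerException, ClientException and NetworkException without defining them. The following is a minimal sketch of one possible hierarchy, plus a helper that normalizes a Retry-After value (seconds or HTTP-date) to seconds. The class bodies and the name parseRetryAfter are my assumptions for illustration, not part of any library.

```php
<?php
// Assumed exception hierarchy for the ApiClient above. The class names come
// from its catch blocks; the bodies are hypothetical.
class ApiException extends \RuntimeException {}
class NetworkException extends ApiException {}
class ServerException extends ApiException {}
class ClientException extends ApiException {}

class RateLimitException extends ClientException
{
    public function __construct(
        string $message,
        int $code = 429,
        private readonly ?int $retryAfter = null
    ) {
        parent::__construct($message, $code);
    }

    public function getRetryAfter(): ?int
    {
        return $this->retryAfter;
    }
}

/**
 * Converts a Retry-After header value (delta seconds or HTTP-date) into a
 * non-negative number of seconds, or null if it cannot be parsed.
 */
function parseRetryAfter(?string $value, ?int $now = null): ?int
{
    if ($value === null || trim($value) === '') {
        return null;
    }
    $value = trim($value);
    if (ctype_digit($value)) {
        return (int) $value;
    }
    $timestamp = strtotime($value);
    if ($timestamp === false) {
        return null;
    }
    return max(0, $timestamp - ($now ?? time()));
}
```

Because RateLimitException extends ClientException in this sketch, its catch block has to come before the ClientException one, which is the order the retry loop already uses.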
Securing webhook reception
When Salesforce, HubSpot, or other systems send webhooks to your app, you have to secure the endpoint:
<?php
class WebhookVerifier
{
public function __construct(
private readonly string $secret,
private readonly \Redis $redis, // phpredis client used for duplicate detection
private readonly int $replayWindowSeconds = 300 // ±5 minutes
) {}
public function verify(
string $payload,
string $signature,
string $timestamp,
string $eventId
): void {
// 1. Replay protection: check the timestamp
$eventTime = (int)$timestamp;
$now = time();
if (abs($now - $eventTime) > $this->replayWindowSeconds) {
throw new WebhookReplayException(
"Timestamp outside the allowed window"
);
}
// 2. Verify the HMAC signature
$expectedSignature = hash_hmac('sha256', $timestamp . '.' . $payload, $this->secret);
if (!hash_equals($expectedSignature, $signature)) {
throw new WebhookSignatureException("Invalid signature");
}
// 3. Duplicate detection via event ID
if ($this->isDuplicate($eventId)) {
throw new WebhookDuplicateException("Event already processed");
}
$this->markAsProcessed($eventId, $eventTime);
}
private function isDuplicate(string $eventId): bool
{
// Redis or DB: store the event_id with a TTL
return (bool)$this->redis->exists("webhook:processed:{$eventId}");
}
private function markAsProcessed(string $eventId, int $timestamp): void
{
// TTL = replay window + buffer
$this->redis->setex(
"webhook:processed:{$eventId}",
$this->replayWindowSeconds + 60,
(string)$timestamp
);
}
}
// Validate an incoming webhook (header names as sent by HubSpot).
// Caution: verify() implements a generic "timestamp.payload" HMAC scheme.
// HubSpot's v3 signature is computed differently (method + URI + body + timestamp,
// Base64-encoded, timestamp in milliseconds), so adapt verify() before using it there.
$payload = file_get_contents('php://input');
$data = json_decode($payload, true) ?? [];
$redis = new \Redis();
$redis->connect('127.0.0.1', 6379);
$verifier = new WebhookVerifier($_ENV['HUBSPOT_WEBHOOK_SECRET'], $redis);
$verifier->verify(
payload: $payload,
signature: $_SERVER['HTTP_X_HUBSPOT_SIGNATURE_V3'] ?? '',
timestamp: $_SERVER['HTTP_X_HUBSPOT_REQUEST_TIMESTAMP'] ?? '',
eventId: (string)($data['eventId'] ?? '')
);
Webhook security:
- ALWAYS verify the HMAC signature (this is not optional!)
- A replay window of ±5 minutes rejects old requests
- Deduplicating by event ID prevents double processing
- On a signature failure, respond with HTTP 401, not 200!
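To test such an endpoint locally it helps to produce valid signatures yourself. The sketch below signs a payload with the same "timestamp.payload" scheme the WebhookVerifier uses and maps the result to the HTTP status the endpoint should return. The function names signWebhook and webhookStatus are hypothetical, and real providers may define their signature input differently.

```php
<?php
// Sender side: sign "timestamp.payload" with the shared secret
// (same scheme as the WebhookVerifier above; names are hypothetical).
function signWebhook(string $payload, int $timestamp, string $secret): string
{
    return hash_hmac('sha256', $timestamp . '.' . $payload, $secret);
}

// Receiver side: returns the HTTP status code the endpoint should answer with.
function webhookStatus(
    string $payload,
    string $signature,
    int $timestamp,
    string $secret,
    int $now,
    int $window = 300
): int {
    if (abs($now - $timestamp) > $window) {
        return 401; // stale or future timestamp: possible replay
    }
    $expected = signWebhook($payload, $timestamp, $secret);
    // hash_equals compares in constant time and avoids timing side channels
    return hash_equals($expected, $signature) ? 200 : 401;
}

$secret = 'test-secret';
$now = 1700000000;
$body = '{"eventId":"evt_1","type":"contact.created"}';
$signature = signWebhook($body, $now, $secret);
```

Passing the current time in as a parameter keeps the check deterministic, which makes the replay window easy to cover in unit tests.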
Salesforce Integration
Salesforce uses OAuth 2.0. For production, prefer the JWT Bearer Flow (Connected App + certificate) over the Username-Password Flow: fewer secrets and no password rotation problem. To keep the example short, the client below still uses the Username-Password Flow:
<?php
class SalesforceClient extends ApiClient
{
private string $instanceUrl;
private ?string $accessToken = null;
private ?int $tokenExpiresAt = null;
public function __construct(
private readonly string $clientId,
private readonly string $clientSecret,
private readonly string $username,
private readonly string $password,
private readonly string $securityToken,
LoggerInterface $logger,
private readonly string $loginUrl = 'https://login.salesforce.com'
) {
parent::__construct('', $logger);
}
private function authenticate(): void
{
if ($this->accessToken && $this->tokenExpiresAt > time()) {
return;
}
$ch = curl_init($this->loginUrl . '/services/oauth2/token');
curl_setopt_array($ch, [
CURLOPT_POST => true,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_POSTFIELDS => http_build_query([
'grant_type' => 'password',
'client_id' => $this->clientId,
'client_secret' => $this->clientSecret,
'username' => $this->username,
'password' => $this->password . $this->securityToken,
]),
]);
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
if ($httpCode !== 200) {
throw new AuthenticationException("Salesforce auth failed: {$response}");
}
$data = json_decode($response, true);
$this->accessToken = $data['access_token'];
$this->instanceUrl = $data['instance_url'];
$this->tokenExpiresAt = time() + 7200; // assumed 2 hours; the real lifetime depends on the org's session settings
$this->baseUrl = $this->instanceUrl . '/services/data/v59.0';
$this->defaultHeaders = [
'Authorization' => 'Bearer ' . $this->accessToken,
'Content-Type' => 'application/json',
];
}
public function query(string $soql): array
{
$this->authenticate();
$results = [];
$endpoint = '/query?q=' . urlencode($soql);
do {
$response = $this->request('GET', $endpoint);
$results = array_merge($results, $response['records'] ?? []);
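// Pagination: follow nextRecordsUrl
$endpoint = $response['nextRecordsUrl'] ?? null;
if ($endpoint) {
// nextRecordsUrl is a path such as /services/data/v59.0/query/<locator>;
// strip the API prefix because request() prepends baseUrl again
$endpoint = preg_replace('#^/services/data/v[0-9.]+#', '', $endpoint);
}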
} while ($endpoint !== null);
return $results;
}
public function createRecord(string $object, array $data): string
{
$this->authenticate();
$response = $this->request('POST', "/sobjects/{$object}", $data);
if (!($response['success'] ?? false)) {
throw new ApiException(
"Failed to create {$object}: " . json_encode($response['errors'] ?? [])
);
}
return $response['id'];
}
public function updateRecord(string $object, string $id, array $data): void
{
$this->authenticate();
$this->request('PATCH', "/sobjects/{$object}/{$id}", $data);
}
public function upsertByExternalId(
string $object,
string $externalIdField,
string $externalIdValue,
array $data
): array {
$this->authenticate();
$endpoint = "/sobjects/{$object}/{$externalIdField}/" . rawurlencode($externalIdValue);
return $this->request('PATCH', $endpoint, $data);
}
// Bulk API 2.0 for large data volumes
public function bulkUpsert(string $object, string $externalId, array $records): string
{
$this->authenticate();
// Create the job
$job = $this->request('POST', '/jobs/ingest', [
'object' => $object,
'externalIdFieldName' => $externalId,
'operation' => 'upsert',
'contentType' => 'CSV',
]);
$jobId = $job['id'];
// Upload the CSV data
$csv = $this->arrayToCsv($records);
$ch = curl_init($this->instanceUrl . "/services/data/v59.0/jobs/ingest/{$jobId}/batches");
curl_setopt_array($ch, [
// CUSTOMREQUEST instead of CURLOPT_PUT (which expects a file stream)
CURLOPT_CUSTOMREQUEST => 'PUT',
CURLOPT_RETURNTRANSFER => true,
CURLOPT_HTTPHEADER => [
'Authorization: Bearer ' . $this->accessToken,
'Content-Type: text/csv',
],
CURLOPT_POSTFIELDS => $csv,
CURLOPT_SSL_VERIFYPEER => true,
CURLOPT_SSL_VERIFYHOST => 2,
]);
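$uploadResponse = curl_exec($ch);
$uploadStatus = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
// Do not close the job if the upload itself failed
if ($uploadResponse === false || $uploadStatus >= 300) {
throw new ApiException("Bulk upload failed for job {$jobId} (HTTP {$uploadStatus})");
}
// Close the job so that Salesforce starts processing it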
$this->request('PATCH', "/jobs/ingest/{$jobId}", [
'state' => 'UploadComplete'
]);
return $jobId;
}
private function arrayToCsv(array $records): string
{
if (empty($records)) {
return '';
}
$output = fopen('php://temp', 'r+');
fputcsv($output, array_keys($records[0]));
foreach ($records as $record) {
fputcsv($output, array_values($record));
}
rewind($output);
$csv = stream_get_contents($output);
fclose($output);
return $csv;
}
}
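The text recommends the JWT Bearer Flow, while the client above authenticates with username and password. As a rough sketch of the difference: instead of credentials, the token request carries a signed JWT assertion. The function names below are hypothetical; the claim set (iss = consumer key, sub = username, aud = login URL, short exp) follows the OAuth 2.0 JWT bearer profile, and the 3-minute lifetime is an assumption you should check against your org's documentation.

```php
<?php
// Base64url without padding, as required for JWT segments.
function base64UrlEncode(string $data): string
{
    return rtrim(strtr(base64_encode($data), '+/', '-_'), '=');
}

/**
 * Builds a signed JWT assertion (RS256). $privateKeyPem is the PEM-encoded
 * RSA key whose certificate was uploaded to the Connected App.
 */
function buildJwtAssertion(
    string $clientId,
    string $username,
    string $audience,
    string $privateKeyPem,
    int $now
): string {
    $header = base64UrlEncode(json_encode(['alg' => 'RS256', 'typ' => 'JWT']));
    $claims = base64UrlEncode(json_encode([
        'iss' => $clientId,
        'sub' => $username,
        'aud' => $audience,
        'exp' => $now + 180, // assumption: 3-minute lifetime
    ]));
    $signingInput = $header . '.' . $claims;
    if (!openssl_sign($signingInput, $signature, $privateKeyPem, OPENSSL_ALGO_SHA256)) {
        throw new \RuntimeException('Could not sign JWT assertion');
    }
    return $signingInput . '.' . base64UrlEncode($signature);
}

// The assertion replaces username, password and client secret in the
// POST body sent to /services/oauth2/token.
function jwtTokenRequestBody(string $assertion): string
{
    return http_build_query([
        'grant_type' => 'urn:ietf:params:oauth:grant-type:jwt-bearer',
        'assertion' => $assertion,
    ]);
}
```

In the SalesforceClient, only the POST fields of authenticate() would change; the handling of access_token and instance_url stays the same.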
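Mind the Salesforce limits:
- API calls: 15,000 to 5,000,000 per 24 h, depending on the edition
- Bulk API: max. 10,000 records per batch (Bulk API 2.0, used above, splits the uploaded CSV into batches itself)
- Query: max. 2,000 records per response (use pagination!)
- Concurrent requests: max. 25 at the same time (the limit applies to long-running requests)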
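The query() method above takes raw SOQL, so any value that comes from user input has to be escaped before it is concatenated into the statement. Here is a minimal sketch of a quoting helper; soqlQuote is a hypothetical name, and it only covers string literals (backslash and single quote), not LIKE wildcards.

```php
<?php
/**
 * Quotes a value as a SOQL string literal by escaping backslashes and
 * single quotes. Hypothetical helper, not part of any Salesforce SDK.
 */
function soqlQuote(string $value): string
{
    // strtr with an array replaces in a single pass, so the backslashes
    // added for quotes are not escaped a second time
    return "'" . strtr($value, ["\\" => "\\\\", "'" => "\\'"]) . "'";
}

$email = "o'brien@example.com";
$soql = 'SELECT Id, Name FROM Contact WHERE Email = ' . soqlQuote($email);
```

The resulting string can be passed to query() as is; the URL encoding happens there.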
HubSpot Integration
<?php
class HubSpotClient extends ApiClient
{
protected string $baseUrl = 'https://api.hubapi.com';
public function __construct(
string $apiKey,
LoggerInterface $logger
) {
parent::__construct($apiKey, $logger);
$this->defaultHeaders = [
'Authorization' => 'Bearer ' . $apiKey,
'Content-Type' => 'application/json',
];
}
public function searchContacts(array $filters, array $properties = []): array
{
$allResults = [];
$after = null;
do {
$body = [
'filterGroups' => [['filters' => $filters]],
'properties' => $properties,
'limit' => 100,
];
if ($after) {
$body['after'] = $after;
}
$response = $this->request('POST', '/crm/v3/objects/contacts/search', $body);
$allResults = array_merge($allResults, $response['results'] ?? []);
$after = $response['paging']['next']['after'] ?? null;
} while ($after !== null);
return $allResults;
}
public function createOrUpdateContact(string $email, array $properties): array
{
// First check whether the contact already exists
$existing = $this->searchContacts([
['propertyName' => 'email', 'operator' => 'EQ', 'value' => $email]
]);
if (!empty($existing)) {
$contactId = $existing[0]['id'];
return $this->request(
'PATCH',
"/crm/v3/objects/contacts/{$contactId}",
['properties' => $properties]
);
}
$properties['email'] = $email;
return $this->request('POST', '/crm/v3/objects/contacts', [
'properties' => $properties
]);
}
public function createDeal(array $properties, ?string $contactId = null): array
{
$deal = $this->request('POST', '/crm/v3/objects/deals', [
'properties' => $properties
]);
if ($contactId) {
$this->request('PUT', "/crm/v3/objects/deals/{$deal['id']}/associations/contacts/{$contactId}/deal_to_contact", []);
}
return $deal;
}
// Batch endpoint: up to 100 objects in one call (fewer API calls!)
public function batchCreateContacts(array $contacts): array
{
$inputs = array_map(fn($c) => ['properties' => $c], $contacts);
return $this->request('POST', '/crm/v3/objects/contacts/batch/create', [
'inputs' => $inputs
]);
}
public function batchUpdateContacts(array $updates): array
{
// $updates = [['id' => '123', 'properties' => [...]], ...]
return $this->request('POST', '/crm/v3/objects/contacts/batch/update', [
'inputs' => $updates
]);
}
}
HubSpot rate limits:
- Standard: 100 requests / 10 seconds (private apps)
- Secondary (burst): short-term spikes are limited additionally
- Use the batch endpoints /batch/create and /batch/update: up to 100 records per call, which saves API calls!
- On 429: respect the Retry-After header
Rule of thumb: always implement pagination for REST APIs. Most APIs return only 100 to 1,000 records per request.
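Both clients above collect all pages into one array, which can get memory-hungry for large result sets. One possible alternative is a generator that follows the cursor lazily. In this sketch, $fetchPage is a hypothetical callable that receives the cursor (null for the first page) and returns an array with 'results' and 'next'; the page cap is a safety net against endless loops.

```php
<?php
/**
 * Lazily iterates over all records of a cursor-paginated endpoint.
 * $fetchPage(?string $cursor) must return ['results' => array, 'next' => ?string].
 */
function paginate(callable $fetchPage, int $maxPages = 1000): \Generator
{
    $cursor = null;
    for ($page = 0; $page < $maxPages; $page++) {
        $response = $fetchPage($cursor);
        foreach ($response['results'] ?? [] as $record) {
            yield $record;
        }
        $cursor = $response['next'] ?? null;
        if ($cursor === null) {
            return; // last page reached
        }
    }
    throw new \RuntimeException("Pagination exceeded {$maxPages} pages");
}

// Fake three-page API for demonstration purposes.
$pages = [
    '' => ['results' => [1, 2], 'next' => 'p2'],
    'p2' => ['results' => [3, 4], 'next' => 'p3'],
    'p3' => ['results' => [5], 'next' => null],
];
$fetch = fn(?string $cursor): array => $pages[$cursor ?? ''];

$all = iterator_to_array(paginate($fetch), false);
```

With a real client, $fetchPage would wrap a single request() call and map the API's paging fields (for example paging.next.after) to 'next'.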
SOAP Integration
The SOAP problem
SOAP is complex, XML-based, and error-prone. But SAP, DATEV, many banks, and public authorities speak only SOAP. There is no way around it.
<?php
class SoapClientWrapper
{
private SoapClient $client;
private LoggerInterface $logger;
public function __construct(
string $wsdlUrl,
LoggerInterface $logger,
array $options = []
) {
$this->logger = $logger;
// The read timeout is an ini setting, not a SoapClient option
ini_set('default_socket_timeout', '60');
$defaultOptions = [
'trace' => true,
'exceptions' => true,
'cache_wsdl' => WSDL_CACHE_BOTH,
'connection_timeout' => 30,
'stream_context' => stream_context_create([
'ssl' => [
'verify_peer' => true,
'verify_peer_name' => true,
],
'http' => [
'timeout' => 60,
],
]),
];
try {
$this->client = new SoapClient(
$wsdlUrl,
array_merge($defaultOptions, $options)
);
} catch (SoapFault $e) {
throw new SoapConnectionException(
"Could not load WSDL: {$e->getMessage()}",
0,
$e
);
}
}
public function call(string $method, array $params = []): mixed
{
$startTime = microtime(true);
try {
$result = $this->client->__soapCall($method, [$params]);
$this->logger->info("SOAP call successful", [
'method' => $method,
'duration_ms' => round((microtime(true) - $startTime) * 1000),
]);
return $result;
} catch (SoapFault $e) {
$this->logger->error("SOAP call failed", [
'method' => $method,
'fault_code' => $e->faultcode,
'fault_string' => $e->faultstring,
'request' => $this->client->__getLastRequest(),
'response' => $this->client->__getLastResponse(),
]);
throw $this->mapSoapException($e);
}
}
private function mapSoapException(SoapFault $e): Exception
{
// Map typical SOAP faults to readable exceptions; the original fault is kept as "previous"
$faultString = (string)$e->faultstring;
$faultCode = (string)$e->faultcode; // may be null, str_contains() needs a string
return match (true) {
str_contains($faultString, 'Authentication') =>
new AuthenticationException("SOAP Auth failed: {$faultString}", 0, $e),
str_contains($faultString, 'Timeout') =>
new TimeoutException("SOAP Timeout: {$faultString}", 0, $e),
str_contains($faultCode, 'Client') =>
new ClientException("SOAP Client Error: {$faultString}", 0, $e),
default =>
new SoapException("SOAP Error: {$faultString}", 0, $e)
};
}
public function getLastRequest(): ?string
{
return $this->client->__getLastRequest();
}
public function getLastResponse(): ?string
{
return $this->client->__getLastResponse();
}
}
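The wrapper above logs the full request and response on every fault. That is very helpful for debugging, but SOAP envelopes often contain credentials or personal data. One way to handle this is to mask sensitive elements before logging, as in this sketch. The function name redactSoapXml and the element names are assumptions; adjust the list to your WSDL.

```php
<?php
/**
 * Masks the text content of sensitive elements in a SOAP envelope before it
 * is written to a log. The element names are examples only.
 */
function redactSoapXml(?string $xml, array $sensitive = ['Password', 'Username', 'IBAN']): string
{
    if ($xml === null || $xml === '') {
        return '';
    }
    $doc = new \DOMDocument();
    // Suppress parser warnings; a broken envelope is replaced by a placeholder
    if (!@$doc->loadXML($xml)) {
        return '[unparseable SOAP message omitted]';
    }
    $xpath = new \DOMXPath($doc);
    foreach ($sensitive as $name) {
        // local-name() ignores namespace prefixes such as wsse:Password
        foreach ($xpath->query("//*[local-name()='{$name}']") as $node) {
            $node->nodeValue = '***';
        }
    }
    return $doc->saveXML();
}

$envelope = '<e:Envelope xmlns:e="http://schemas.xmlsoap.org/soap/envelope/">'
    . '<e:Body><Login><Username>max</Username><Password>s3cret</Password></Login></e:Body>'
    . '</e:Envelope>';
$safe = redactSoapXml($envelope);
```

In call(), the 'request' and 'response' log fields would then be wrapped in redactSoapXml().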
SAP Integration via RFC/BAPI
SAP offers several interfaces: RFC (Remote Function Call), BAPI (Business Application Programming Interface), IDoc, and OData. In PHP, SOAP is the most pragmatic route:
<?php
class SapClient
{
private SoapClientWrapper $soap;
public function __construct(
string $wsdlUrl,
string $username,
string $password,
LoggerInterface $logger
) {
$this->soap = new SoapClientWrapper($wsdlUrl, $logger, [
'login' => $username,
'password' => $password,
'authentication' => SOAP_AUTHENTICATION_BASIC,
]);
}
public function getCustomer(string $customerNumber): array
{
$result = $this->soap->call('BAPI_CUSTOMER_GETDETAIL2', [
'CUSTOMERNO' => str_pad($customerNumber, 10, '0', STR_PAD_LEFT),
]);
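// RETURN may also carry success or info messages; only types E and A are errors
$errors = $this->extractErrors($result->RETURN ?? []);
if (!empty($errors)) {
throw new SapException("SAP Error: " . implode(', ', $errors));
}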
return $this->mapCustomerData($result);
}
public function createSalesOrder(array $orderData): string
{
$header = [
'DOC_TYPE' => $orderData['type'] ?? 'TA',
'SALES_ORG' => $orderData['sales_org'],
'DISTR_CHAN' => $orderData['distribution_channel'],
'DIVISION' => $orderData['division'],
'PURCH_NO_C' => $orderData['customer_reference'] ?? '',
];
$items = [];
foreach ($orderData['items'] as $idx => $item) {
$items[] = [
'ITM_NUMBER' => str_pad((string)(($idx + 1) * 10), 6, '0', STR_PAD_LEFT),
'MATERIAL' => $item['material_number'],
'TARGET_QTY' => $item['quantity'],
'TARGET_QU' => $item['unit'] ?? 'ST',
];
}
$partners = [
[
'PARTN_ROLE' => 'AG', // sold-to party (Auftraggeber)
'PARTN_NUMB' => str_pad($orderData['customer_number'], 10, '0', STR_PAD_LEFT),
],
];
$result = $this->soap->call('BAPI_SALESORDER_CREATEFROMDAT2', [
'ORDER_HEADER_IN' => $header,
'ORDER_ITEMS_IN' => $items,
'ORDER_PARTNERS' => $partners,
]);
// Check for errors BEFORE commit/rollback
$errors = $this->extractErrors($result->RETURN);
if (!empty($errors) || empty($result->SALESDOCUMENT)) {
// On error: ROLLBACK!
$this->soap->call('BAPI_TRANSACTION_ROLLBACK', []);
throw new SapException("Order creation failed: " . implode(', ', $errors));
}
// Only on success: COMMIT (WAIT = synchronous)
$this->soap->call('BAPI_TRANSACTION_COMMIT', ['WAIT' => 'X']);
return $result->SALESDOCUMENT;
}
private function extractErrors($return): array
{
$errors = [];
$messages = is_array($return) ? $return : [$return];
foreach ($messages as $msg) {
if (in_array($msg->TYPE ?? '', ['E', 'A'])) {
$errors[] = $msg->MESSAGE ?? 'Unknown error';
}
}
return $errors;
}
private function mapCustomerData($result): array
{
return [
'number' => ltrim($result->CUSTOMERNO, '0'),
'name' => trim($result->CUSTOMERADDRESS->NAME ?? ''),
'street' => trim($result->CUSTOMERADDRESS->STREET ?? ''),
'city' => trim($result->CUSTOMERADDRESS->CITY ?? ''),
'postal_code' => trim($result->CUSTOMERADDRESS->POSTL_CODE ?? ''),
'country' => trim($result->CUSTOMERADDRESS->COUNTRY ?? ''),
'email' => trim($result->CUSTOMERADDRESS->E_MAIL ?? ''),
];
}
}
SAP pitfalls:
- BAPI_TRANSACTION_COMMIT/ROLLBACK: COMMIT on success, ROLLBACK on error, otherwise you end up with inconsistent data! Both only affect the preceding BAPI call if they run in the same SAP session, so separate stateless SOAP calls usually need a stateful session or a wrapper function module.
- Number formats: SAP expects leading zeros (customer number 123 = 0000000123)
- Character encoding: SAP uses SAP-internal code pages (often CP1252 or ISO-8859-1); without conversion, umlauts get mangled!
- Time zone: mind the SAP server time; always send timestamps in UTC and convert them!
- Alternative, SAP OData (Gateway): if SOAP is too complex, SAP Gateway offers modern OData APIs (REST-like). Ask your SAP Basis admin which services are available under /sap/opu/odata/.
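The number format and encoding pitfalls can be wrapped in small helpers so that they are handled in one place. The following sketch pads only purely numeric keys, on the assumption that alphanumeric keys are stored without padding in SAP, and converts UTF-8 text to a single-byte code page. All function names are hypothetical, and the target code page depends on your system.

```php
<?php
/**
 * Pads purely numeric keys with leading zeros; alphanumeric keys are passed
 * through unchanged (assumption: this mirrors SAP's ALPHA conversion).
 */
function toSapKey(string $value, int $length = 10): string
{
    $value = trim($value);
    if (strlen($value) > $length) {
        throw new \InvalidArgumentException("Key '{$value}' exceeds {$length} characters");
    }
    return ctype_digit($value) ? str_pad($value, $length, '0', STR_PAD_LEFT) : $value;
}

// Reverse direction: strip leading zeros from numeric keys only.
function fromSapKey(string $value): string
{
    $value = trim($value);
    return ctype_digit($value) ? (ltrim($value, '0') ?: '0') : $value;
}

// Converts UTF-8 text to the code page of a non-Unicode target system.
function toSapCodepage(string $utf8, string $codepage = 'ISO-8859-1'): string
{
    if (!mb_check_encoding($utf8, 'UTF-8')) {
        throw new \InvalidArgumentException('Input is not valid UTF-8');
    }
    return mb_convert_encoding($utf8, $codepage, 'UTF-8');
}
```

Compared with the bare str_pad() calls in SapClient, toSapKey() also rejects keys that are too long instead of sending them on unchanged.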
DATEV Integration
DATEV offers several interfaces: DATEV-Connect (classic), DATEVconnect online (REST API, OAuth 2.0), and the classic ASCII interface. The exporter below targets the ASCII interface (EXTF booking batch):
<?php
class DatevExporter
{
// Assumption to verify: both versions must match the current DATEV format description
private const HEADER_VERSION = '700';
private const FORMAT_VERSION = '4';
public function exportBookings(
array $bookings,
string $consultantNumber,
string $clientNumber,
int $fiscalYearStart,
\DateTimeInterface $from,
\DateTimeInterface $to
): string {
$lines = [];
// Header line
$lines[] = $this->buildHeader(
$consultantNumber,
$clientNumber,
$fiscalYearStart,
$from,
$to
);
// Column headings
$lines[] = $this->buildColumnHeaders();
// Booking lines
foreach ($bookings as $booking) {
$lines[] = $this->buildBookingLine($booking);
}
// DATEV expects Windows line breaks and ANSI encoding
$content = implode("\r\n", $lines);
return mb_convert_encoding($content, 'Windows-1252', 'UTF-8');
}
private function buildHeader(
string $consultantNumber,
string $clientNumber,
int $fiscalYearStart,
\DateTimeInterface $from,
\DateTimeInterface $to
): string {
$fields = [
'EXTF', // Kennzeichen
self::HEADER_VERSION, // Versionsnummer
'21', // Datenkategorie (Buchungsstapel)
'Buchungsstapel', // Formatname
self::FORMAT_VERSION, // Formatversion
(new \DateTimeImmutable())->format('YmdHisv'), // Erzeugt am (date() would always yield 000 milliseconds)
'', // Importiert (leer)
'RE', // Herkunft
'', // Exportiert von
'', // Importiert von
$consultantNumber, // Berater
$clientNumber, // Mandant
$fiscalYearStart . '0101', // WJ-Beginn
'4', // Sachkontennummernlaenge
$from->format('Ymd'), // Datum von
$to->format('Ymd'), // Datum bis
'', // Bezeichnung
'', // Diktatkuerzel
'1', // Buchungstyp (1=Fibu)
'0', // Rechnungslegungszweck
'', // Festschreibung
'EUR', // WKZ
'', // Reserviert
'', // Derivatskennzeichen
'', // Reserviert
'', // Reserviert
'', // SKR
'', // Branchen-Loesungs-ID
'', // Reserviert
'', // Reserviert
'', // Anwendungsinfo
];
return implode(';', array_map([$this, 'escapeField'], $fields));
}
private function buildColumnHeaders(): string
{
return implode(';', [
'Umsatz (ohne Soll/Haben-Kz)',
'Soll/Haben-Kennzeichen',
'WKZ Umsatz',
'Kurs',
'Basis-Umsatz',
'WKZ Basis-Umsatz',
'Konto',
'Gegenkonto (ohne BU-Schluessel)',
'BU-Schluessel',
'Belegdatum',
'Belegfeld 1',
'Belegfeld 2',
'Skonto',
'Buchungstext',
]);
}
private function buildBookingLine(array $booking): string
{
$fields = [
$this->formatAmount($booking['amount']),
$booking['amount'] >= 0 ? 'S' : 'H',
'EUR',
'', // Kurs
'', // Basis-Umsatz
'', // WKZ Basis-Umsatz
$booking['account'], // Konto
$booking['counter_account'], // Gegenkonto
$booking['tax_key'] ?? '', // BU-Schluessel
$this->formatDate($booking['date']), // Belegdatum
$booking['document_number'] ?? '', // Belegfeld 1
$booking['reference'] ?? '', // Belegfeld 2
'', // Skonto
$this->escapeField($booking['text'] ?? ''),
];
return implode(';', $fields);
}
private function formatAmount(float $amount): string
{
// DATEV expects a comma as decimal separator and no thousands separator
return number_format(abs($amount), 2, ',', '');
}
private function formatDate(\DateTimeInterface|string $date): string
{
if (is_string($date)) {
$date = new \DateTime($date);
}
// DATEV format: DDMM (no year, it is taken from the header)
return $date->format('dm');
}
private function escapeField(string $value): string
{
// Prevent CSV injection: prefix fields that start with = + - @
// (Excel/LibreOffice would otherwise interpret them as formulas!)
if (preg_match('/^[=+\-@]/', $value)) {
$value = "'" . $value; // the apostrophe prevents formula interpretation
}
// Escape semicolons and double quotes
if (str_contains($value, ';') || str_contains($value, '"')) {
return '"' . str_replace('"', '""', $value) . '"';
}
return $value;
}
}
// Usage
$exporter = new DatevExporter();
$csv = $exporter->exportBookings(
bookings: [
[
'amount' => 119.00,
'account' => '1200',
'counter_account' => '8400',
'tax_key' => '3',
'date' => '2025-06-15',
'document_number' => 'RE-2025-0042',
'text' => 'Warenverkauf an Kunde Mueller',
],
],
consultantNumber: '12345',
clientNumber: '67890',
fiscalYearStart: 2025,
from: new DateTime('2025-06-01'),
to: new DateTime('2025-06-30')
);
file_put_contents('EXTF_Buchungsstapel.csv', $csv);
Rule of thumb: DATEV is picky: Windows line breaks (CRLF), ANSI encoding (Windows-1252), comma as decimal separator. One wrong character and the import fails.
DATEV best practices:
- Always test with the DATEV import assistant before going to production
- DATEVconnect online: REST API with OAuth 2.0, a modern alternative to the ASCII export
- Limit scopes (request only what is needed)
- When switching clients (Mandanten): the client ID stays the same, request a new token
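Because the exporter writes booking dates as DDMM without a year, a booking outside the header period would silently land in the wrong year. A small pre-flight check before the export can catch that. The following sketch uses the field names from the $bookings array above; validateBooking is a hypothetical helper and the rules are a minimal, non-exhaustive selection, not DATEV's official validation.

```php
<?php
/**
 * Returns a list of problems for one booking; an empty list means OK.
 * Field names follow the $bookings array used by DatevExporter.
 */
function validateBooking(array $booking, \DateTimeInterface $from, \DateTimeInterface $to): array
{
    $problems = [];
    if (!is_numeric($booking['amount'] ?? null)) {
        $problems[] = 'amount must be a number';
    }
    foreach (['account', 'counter_account'] as $field) {
        if (!ctype_digit((string) ($booking[$field] ?? ''))) {
            $problems[] = "{$field} must be numeric";
        }
    }
    // "!" resets the time part so that only the calendar date is compared
    $date = \DateTimeImmutable::createFromFormat('!Y-m-d', (string) ($booking['date'] ?? ''));
    if ($date === false) {
        $problems[] = 'date must be YYYY-MM-DD';
    } elseif ($date < $from || $date > $to) {
        // The export writes DDMM only, so the year is implied by the header period
        $problems[] = 'date lies outside the export period';
    }
    return $problems;
}

$from = new \DateTimeImmutable('2025-06-01');
$to = new \DateTimeImmutable('2025-06-30');
$booking = ['amount' => 119.00, 'account' => '1200', 'counter_account' => '8400', 'date' => '2025-06-15'];
$problems = validateBooking($booking, $from, $to);
```

Running this over all bookings before exportBookings() turns a failed import in DATEV into an error message on your side.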
Batch import strategies
When to use batch instead of an API?
- Large data volumes: from 10,000+ records
- No real-time requirement: a nightly sync is enough
- Legacy systems: only file export is available
- Cost: API calls cost money, files do not
Robust CSV importer
<?php
class BatchImporter
{
private const CHUNK_SIZE = 1000;
public function __construct(
private readonly PDO $db,
private readonly LoggerInterface $logger
) {}
public function importCsv(
string $filePath,
string $targetTable,
array $columnMapping,
?callable $transformer = null
): ImportResult {
$result = new ImportResult();
$result->startedAt = new \DateTimeImmutable();
if (!file_exists($filePath)) {
throw new \InvalidArgumentException("File not found: {$filePath}");
}
$handle = fopen($filePath, 'r');
if ($handle === false) {
throw new \RuntimeException("Cannot open file: {$filePath}");
}
try {
// Read the header
$header = fgetcsv($handle, 0, ';', '"', '\\');
if ($header === false) {
throw new \RuntimeException("Cannot read CSV header");
}
// Normalize the header (strip the BOM, trim)
$header = array_map(fn($h) => trim(preg_replace('/^\xEF\xBB\xBF/', '', $h)), $header);
$this->validateHeader($header, $columnMapping);
$batch = [];
$lineNumber = 1;
while (($row = fgetcsv($handle, 0, ';', '"', '\\')) !== false) {
$lineNumber++;
try {
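// Skip empty lines (fgetcsv returns [null] for a blank line; "0" is a valid value)
if ($row === [null] || count(array_filter($row, fn($v) => $v !== null && trim($v) !== '')) === 0) {
continue;
}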
$data = array_combine($header, $row);
if ($transformer) {
$data = $transformer($data, $lineNumber);
if ($data === null) {
$result->skipped++;
continue;
}
}
$mapped = $this->mapColumns($data, $columnMapping);
$batch[] = $mapped;
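if (count($batch) >= self::CHUNK_SIZE) {
// Reset the buffer first so that a failed chunk is not retried on every following row
$chunk = $batch;
$batch = [];
$this->insertBatch($targetTable, $chunk);
$result->imported += count($chunk);
$this->logger->info("Imported {$result->imported} rows");
}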
} catch (\Throwable $e) {
$result->errors[] = [
'line' => $lineNumber,
'error' => $e->getMessage(),
'data' => $row,
];
if (count($result->errors) > 100) {
throw new \RuntimeException("Too many errors, aborting import");
}
}
}
// Import the remainder
if (!empty($batch)) {
$this->insertBatch($targetTable, $batch);
$result->imported += count($batch);
}
} finally {
fclose($handle);
}
$result->finishedAt = new \DateTimeImmutable();
$this->logger->info("Import completed", [
'imported' => $result->imported,
'skipped' => $result->skipped,
'errors' => count($result->errors),
'duration_s' => $result->getDurationSeconds(),
]);
return $result;
}
private function validateHeader(array $header, array $columnMapping): void
{
$required = array_keys($columnMapping);
$missing = array_diff($required, $header);
if (!empty($missing)) {
throw new \RuntimeException(
"Missing columns in CSV: " . implode(', ', $missing)
);
}
}
private function mapColumns(array $data, array $columnMapping): array
{
$result = [];
foreach ($columnMapping as $sourceColumn => $config) {
$value = $data[$sourceColumn] ?? null;
if (is_string($config)) {
// Simple mapping: 'csv_col' => 'db_col'
$result[$config] = $value;
} else {
// Complex mapping with transformation
$targetColumn = $config['column'];
if (isset($config['transform'])) {
$value = $config['transform']($value);
}
if (isset($config['default']) && ($value === null || $value === '')) {
$value = $config['default'];
}
$result[$targetColumn] = $value;
}
}
return $result;
}
private function insertBatch(string $table, array $rows): void
{
if (empty($rows)) {
return;
}
$columns = array_keys($rows[0]);
$placeholders = '(' . implode(',', array_fill(0, count($columns), '?')) . ')';
$allPlaceholders = implode(',', array_fill(0, count($rows), $placeholders));
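// UPSERT with ON CONFLICT (PostgreSQL); $table and the column names are
// interpolated, so they must come from trusted code, never from user input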
$sql = sprintf(
'INSERT INTO %s (%s) VALUES %s
ON CONFLICT (external_id) DO UPDATE SET %s, updated_at = NOW()',
$table,
implode(',', $columns),
$allPlaceholders,
implode(',', array_map(fn($c) => "{$c} = EXCLUDED.{$c}",
array_filter($columns, fn($c) => $c !== 'external_id')
))
);
$values = [];
foreach ($rows as $row) {
foreach ($columns as $col) {
$values[] = $row[$col];
}
}
$stmt = $this->db->prepare($sql);
$stmt->execute($values);
}
}
class ImportResult
{
public \DateTimeImmutable $startedAt;
public ?\DateTimeImmutable $finishedAt = null;
public int $imported = 0;
public int $skipped = 0;
public array $errors = [];
public function getDurationSeconds(): int
{
if (!$this->finishedAt) {
return 0;
}
return $this->finishedAt->getTimestamp() - $this->startedAt->getTimestamp();
}
public function isSuccessful(): bool
{
return empty($this->errors);
}
}
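insertBatch() interpolates table and column names into the SQL string and binds rows * columns parameters per statement. Two small guards make that safer. This is a sketch with hypothetical helper names: quoteIdentifier() accepts only plain identifiers, and maxRowsPerStatement() keeps a chunk below PostgreSQL's limit of 65535 bind parameters per statement.

```php
<?php
/**
 * Accepts only plain identifiers (letters, digits, underscore) and wraps
 * them in double quotes for PostgreSQL.
 */
function quoteIdentifier(string $name): string
{
    if (!preg_match('/^[A-Za-z_][A-Za-z0-9_]*$/', $name)) {
        throw new \InvalidArgumentException("Invalid SQL identifier: {$name}");
    }
    return '"' . $name . '"';
}

/**
 * Largest chunk size that keeps rows * columns within the bind-parameter
 * limit of a single prepared statement.
 */
function maxRowsPerStatement(int $columnCount, int $preferred = 1000, int $paramLimit = 65535): int
{
    if ($columnCount < 1) {
        throw new \InvalidArgumentException('columnCount must be positive');
    }
    return max(1, min($preferred, intdiv($paramLimit, $columnCount)));
}

$columns = array_map('quoteIdentifier', ['external_id', 'name', 'email']);
$chunkSize = maxRowsPerStatement(count($columns));
```

Note that quoted identifiers are case-sensitive in PostgreSQL, so this only matches tables and columns that were created in lowercase or with the same quoting.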
Integration Patterns
Pattern 1: Point-to-Point
[System A] <-----> [System B]
Advantages: simple, direct, quick to implement
Disadvantages: does not scale (n systems = n*(n-1)/2 connections)
<?php
// Direct integration: webshop -> ERP
class OrderSyncService
{
public function __construct(
private readonly WebshopApi $webshop,
private readonly SapClient $sap
) {}
public function syncNewOrders(): void
{
$orders = $this->webshop->getUnsynced();
foreach ($orders as $order) {
$sapOrderId = $this->sap->createSalesOrder([
'customer_number' => $order['customer_erp_id'],
'items' => $order['items'],
// ...
]);
$this->webshop->markAsSynced($order['id'], $sapOrderId);
}
}
}
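To make the scaling argument for point-to-point concrete, the connection count n*(n-1)/2 can be compared with a hub topology, where every system needs just one link. A tiny sketch with hypothetical function names:

```php
<?php
// Number of links needed to connect $n systems directly with each other.
function pointToPointLinks(int $n): int
{
    return intdiv($n * ($n - 1), 2);
}

// With a central hub, every system needs exactly one link to the hub.
function hubLinks(int $n): int
{
    return $n;
}

foreach ([3, 4, 10] as $n) {
    printf(
        "%2d systems: %2d direct links vs. %2d hub links\n",
        $n,
        pointToPointLinks($n),
        hubLinks($n)
    );
}
```

At three systems both approaches need three links; from four systems on, the direct variant grows quadratically while the hub grows linearly.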
Pattern 2: Hub-and-Spoke (Integration Hub)
[System A]
|
[System B] -- [HUB] -- [System C]
|
[System D]
Advantages: central control, transformation, monitoring
Disadvantages: single point of failure, complexity in the hub
<?php
class IntegrationHub
{
private array $adapters = [];
private array $transformers = [];
public function registerAdapter(string $systemId, SystemAdapter $adapter): void
{
$this->adapters[$systemId] = $adapter;
}
public function registerTransformer(
string $sourceSystem,
string $targetSystem,
string $entityType,
callable $transformer
): void {
$key = "{$sourceSystem}:{$targetSystem}:{$entityType}";
$this->transformers[$key] = $transformer;
}
public function sync(
string $sourceSystem,
string $targetSystem,
string $entityType,
array $entities
): SyncResult {
$result = new SyncResult();
$transformer = $this->transformers["{$sourceSystem}:{$targetSystem}:{$entityType}"]
?? throw new \RuntimeException("No transformer registered");
$targetAdapter = $this->adapters[$targetSystem]
?? throw new \RuntimeException("Unknown target system: {$targetSystem}");
foreach ($entities as $entity) {
try {
$transformed = $transformer($entity);
$targetAdapter->upsert($entityType, $transformed);
$result->success++;
} catch (\Throwable $e) {
$result->failures[] = [
'entity' => $entity,
'error' => $e->getMessage()
];
}
}
return $result;
}
}
// Usage
$hub = new IntegrationHub();
$hub->registerAdapter('salesforce', new SalesforceAdapter($sfClient));
$hub->registerAdapter('sap', new SapAdapter($sapClient));
$hub->registerTransformer('salesforce', 'sap', 'customer', function(array $sfCustomer) {
return [
'name' => $sfCustomer['Name'],
'email' => $sfCustomer['Email'],
'external_id' => $sfCustomer['Id'],
// SAP-specific fields
'sales_org' => '1000',
'distribution_channel' => '10',
];
});
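The hub above relies on a `SystemAdapter` type and a `SyncResult` object that the listing does not define. The following is one possible shape, inferred only from how the hub uses them (`upsert()`, `$result->success`, `$result->failures`). The `InMemoryAdapter` is a hypothetical test double, and keying records by `external_id` is an assumption for this sketch.

```php
<?php
declare(strict_types=1);

// Assumed contract: the hub only ever calls upsert() on an adapter.
interface SystemAdapter
{
    public function upsert(string $entityType, array $entity): void;
}

// Assumed result object: a success counter plus a list of failures.
final class SyncResult
{
    public int $success = 0;

    /** @var list<array{entity: array, error: string}> */
    public array $failures = [];
}

// Hypothetical test double that keeps records in memory, keyed by external_id.
final class InMemoryAdapter implements SystemAdapter
{
    /** @var array<string, array<string, array>> */
    public array $records = [];

    public function upsert(string $entityType, array $entity): void
    {
        if (!isset($entity['external_id'])) {
            throw new InvalidArgumentException('external_id is required');
        }
        // The same external_id overwrites the earlier record instead of duplicating it.
        $this->records[$entityType][(string) $entity['external_id']] = $entity;
    }
}
```

An in-memory adapter like this makes it possible to test transformers and the hub's error collection without touching a real Salesforce or SAP system.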
Pattern 3: Event-Driven (Message Queue)
[System A] --> [Queue] --> [Consumer] --> [System B]
Advantages: decoupling, scalability, resilience
Disadvantages: eventual consistency, more complex debugging
<?php
use Ramsey\Uuid\Uuid; // requires the ramsey/uuid package
// Event producer
class OrderEventProducer
{
public function __construct(
private readonly MessageQueueInterface $queue
) {}
public function publishOrderCreated(array $order): void
{
$this->queue->publish('orders.created', [
'event_id' => Uuid::uuid4()->toString(),
'event_type' => 'order.created',
'timestamp' => (new \DateTimeImmutable())->format('c'),
'payload' => $order,
]);
}
}
// Event Consumer
class SapOrderConsumer
{
public function __construct(
private readonly SapClient $sap,
private readonly ProcessedEventStore $processedEvents
) {}
public function handle(array $message): void
{
$eventId = $message['event_id'];
// Idempotency: already processed?
if ($this->processedEvents->exists($eventId)) {
return;
}
$order = $message['payload'];
$sapOrderId = $this->sap->createSalesOrder($order);
$this->processedEvents->store($eventId, [
'processed_at' => date('c'),
'sap_order_id' => $sapOrderId,
]);
}
}
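The idempotency check in the consumer matters because most queues deliver at least once, so the same message can arrive twice. The sketch below replays a duplicate delivery against a hypothetical in-memory store. `InMemoryProcessedEventStore` and `handleOnce()` are illustrative names, not part of the consumer above.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for the ProcessedEventStore used above.
final class InMemoryProcessedEventStore
{
    /** @var array<string, array> */
    private array $events = [];

    public function exists(string $eventId): bool
    {
        return isset($this->events[$eventId]);
    }

    public function store(string $eventId, array $meta): void
    {
        $this->events[$eventId] = $meta;
    }
}

// Runs $sideEffect at most once per event_id, mirroring the consumer's check.
function handleOnce(
    InMemoryProcessedEventStore $store,
    array $message,
    callable $sideEffect
): bool {
    $eventId = $message['event_id'];
    if ($store->exists($eventId)) {
        return false; // duplicate delivery, nothing to do
    }
    $sideEffect($message['payload']);
    $store->store($eventId, ['processed_at' => date('c')]);
    return true;
}

$store = new InMemoryProcessedEventStore();
$created = 0;
$countOrder = function (array $order) use (&$created): void {
    $created++; // stands in for the SAP call
};
$message = ['event_id' => 'evt-1', 'payload' => ['order_id' => 42]];

// The queue delivers the same message twice.
handleOnce($store, $message, $countOrder);
handleOnce($store, $message, $countOrder);

echo "orders created: {$created}\n";
```

Note that the check and the store are two separate steps. If the process dies between the side effect and `store()`, the event will be processed again on redelivery, so a unique constraint in a persistent store or an idempotent target API is still advisable.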
Rule of thumb: point-to-point for 2-3 systems, hub-and-spoke for 4 or more systems, event-driven when real-time behavior and scalability are critical.
Circuit-Breaker Pattern
When a downstream API fails repeatedly, there is no point in continuing to hammer it. The circuit breaker protects both sides:
<?php
class CircuitBreaker
{
private const STATE_CLOSED = 'closed'; // normal operation
private const STATE_OPEN = 'open'; // blocked
private const STATE_HALF_OPEN = 'half'; // trial phase
public function __construct(
private readonly string $service,
private readonly CacheInterface $cache,
private readonly int $failureThreshold = 5,
private readonly int $recoveryTimeout = 30,
private readonly int $windowSeconds = 60
) {}
public function call(callable $operation): mixed
{
$state = $this->getState();
if ($state === self::STATE_OPEN) {
// Circuit open: do not even try
throw new CircuitOpenException(
"Circuit for {$this->service} is open - please retry later"
);
}
try {
$result = $operation();
$this->recordSuccess();
return $result;
} catch (\Throwable $e) {
$this->recordFailure();
// A failed trial call in half-open state re-opens the circuit immediately;
// otherwise the stale tripped_at would let every call through
if ($state === self::STATE_HALF_OPEN || $this->shouldTrip()) {
$this->trip();
}
throw $e;
}
}
private function getState(): string
{
$state = $this->cache->get("circuit:{$this->service}:state") ?? self::STATE_CLOSED;
if ($state === self::STATE_OPEN) {
$trippedAt = $this->cache->get("circuit:{$this->service}:tripped_at");
if (time() - (int) $trippedAt >= $this->recoveryTimeout) {
return self::STATE_HALF_OPEN; // allow a recovery attempt
}
}
return $state;
}
private function shouldTrip(): bool
{
$failures = (int)$this->cache->get("circuit:{$this->service}:failures") ?: 0;
return $failures >= $this->failureThreshold;
}
private function trip(): void
{
$this->cache->set("circuit:{$this->service}:state", self::STATE_OPEN);
$this->cache->set("circuit:{$this->service}:tripped_at", time());
}
private function recordFailure(): void
{
$key = "circuit:{$this->service}:failures";
$this->cache->increment($key);
$this->cache->expire($key, $this->windowSeconds);
}
private function recordSuccess(): void
{
// Success in half-open state: close the circuit again
if ($this->getState() === self::STATE_HALF_OPEN) {
$this->cache->delete("circuit:{$this->service}:state");
$this->cache->delete("circuit:{$this->service}:tripped_at");
$this->cache->delete("circuit:{$this->service}:failures");
}
}
}
// Usage
$circuit = new CircuitBreaker('salesforce', $redis);
try {
$result = $circuit->call(fn() => $salesforce->createRecord('Account', $data));
} catch (CircuitOpenException $e) {
// Push to a queue for later processing
$queue->push('salesforce_retry', $data);
}
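The `CacheInterface` used by the circuit breaker is not defined in the listing, and it is not PSR-16, because `increment()` and `expire()` do not exist there. The following sketch shows one possible contract, derived only from the calls the breaker makes. It also includes a hypothetical `ArrayCache` with an injectable clock for unit tests. In production a Redis-backed implementation would be the more likely choice.

```php
<?php
declare(strict_types=1);

// Assumed contract, derived from the calls the CircuitBreaker makes.
// increment() and expire() mirror Redis-style commands.
interface CacheInterface
{
    public function get(string $key): mixed;
    public function set(string $key, mixed $value): void;
    public function delete(string $key): void;
    public function increment(string $key): int;
    public function expire(string $key, int $seconds): void;
}

// Hypothetical in-memory implementation for unit tests (single process only).
final class ArrayCache implements CacheInterface
{
    private array $values = [];
    private array $expiresAt = [];
    private \Closure $clock;

    public function __construct(?callable $clock = null)
    {
        // An injectable clock lets tests move time forward without sleeping.
        $this->clock = $clock !== null
            ? \Closure::fromCallable($clock)
            : static fn (): int => time();
    }

    public function get(string $key): mixed
    {
        $this->purgeIfExpired($key);
        return $this->values[$key] ?? null;
    }

    public function set(string $key, mixed $value): void
    {
        $this->values[$key] = $value;
        unset($this->expiresAt[$key]); // a plain set clears any earlier TTL
    }

    public function delete(string $key): void
    {
        unset($this->values[$key], $this->expiresAt[$key]);
    }

    public function increment(string $key): int
    {
        $this->purgeIfExpired($key);
        $this->values[$key] = (int) ($this->values[$key] ?? 0) + 1;
        return $this->values[$key];
    }

    public function expire(string $key, int $seconds): void
    {
        $this->expiresAt[$key] = ($this->clock)() + $seconds;
    }

    private function purgeIfExpired(string $key): void
    {
        if (isset($this->expiresAt[$key]) && ($this->clock)() >= $this->expiresAt[$key]) {
            $this->delete($key);
        }
    }
}
```

Because the breaker refreshes the TTL of the failure counter on every failure, the 60-second window slides: the counter only resets after 60 seconds without any failure.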
Dead Letter Queue (DLQ)
Poison messages (events that fail again and again) have to be taken out of the queue, otherwise they block everything:
<?php
class DLQHandler
{
private const MAX_RETRIES = 3;
public function __construct(
private readonly MessageQueueInterface $mainQueue,
private readonly MessageQueueInterface $dlq,
private readonly LoggerInterface $logger
) {}
public function processWithDLQ(string $queue, callable $handler): void
{
while ($message = $this->mainQueue->pop($queue)) {
$retryCount = $message['_retry_count'] ?? 0;
try {
$handler($message);
} catch (\Throwable $e) {
$retryCount++;
if ($retryCount >= self::MAX_RETRIES) {
// Off to the DLQ for manual inspection later
$this->dlq->push("{$queue}_dlq", [
...$message,
'_retry_count' => $retryCount,
'_last_error' => $e->getMessage(),
'_failed_at' => date('c'),
]);
$this->logger->error("Message moved to DLQ after {$retryCount} retries", [
'queue' => $queue,
'error' => $e->getMessage(),
'message_id' => $message['event_id'] ?? 'unknown',
]);
} else {
// Back into the queue with an incremented counter
$message['_retry_count'] = $retryCount;
$this->mainQueue->push($queue, $message);
}
}
}
}
// Reprocess the DLQ manually (after a fix)
public function reprocessDLQ(string $queue): int
{
$reprocessed = 0;
while ($message = $this->dlq->pop("{$queue}_dlq")) {
unset($message['_retry_count'], $message['_last_error'], $message['_failed_at']);
$this->mainQueue->push($queue, $message);
$reprocessed++;
}
return $reprocessed;
}
}
Circuit breaker + DLQ best practice:
- Open the circuit breaker after 5 failures within 60 seconds
- Wait 30 seconds, then go half-open (one trial call)
- After 3 retries: move the message to the DLQ
- Monitor DLQ growth and alert at more than 10 messages/day
Idempotency and Error Handling
Why idempotency is critical
Networks are unreliable. Requests can:
- Time out without a response (even though they were executed successfully)
- Be retried by the client (retry logic)
- Be received twice by the server
Without idempotency: duplicate orders, duplicate payments, chaos.
<?php
class IdempotentSyncService
{
public function __construct(
private readonly PDO $db,
private readonly ExternalApi $api
) {}
public function syncEntity(string $entityType, string $localId, array $data): void
{
// Generate the idempotency key
$idempotencyKey = $this->generateIdempotencyKey($entityType, $localId, $data);
// Already synchronized?
$existing = $this->findSyncRecord($idempotencyKey);
if ($existing) {
if ($existing['status'] === 'success') {
return; // Already succeeded
}
// Retries only touch updated_at, so prefer it over created_at
$lastTouched = $existing['updated_at'] ?? $existing['created_at'];
if ($existing['status'] === 'pending' &&
$lastTouched > date('Y-m-d H:i:s', strtotime('-5 minutes'))) {
return; // Still in progress
}
}
// Create or update the sync record
$syncId = $this->upsertSyncRecord($idempotencyKey, 'pending');
try {
$externalId = $this->api->upsert($entityType, $data);
$this->updateSyncRecord($syncId, 'success', $externalId);
} catch (\Throwable $e) {
$this->updateSyncRecord($syncId, 'failed', null, $e->getMessage());
throw $e;
}
}
private function generateIdempotencyKey(
string $entityType,
string $localId,
array $data
): string {
// Key built from entity type, ID and a hash of the data
$dataHash = md5(json_encode($data, JSON_THROW_ON_ERROR));
return "{$entityType}:{$localId}:{$dataHash}";
}
private function findSyncRecord(string $idempotencyKey): ?array
{
$stmt = $this->db->prepare(
'SELECT * FROM sync_records WHERE idempotency_key = ?'
);
$stmt->execute([$idempotencyKey]);
return $stmt->fetch(PDO::FETCH_ASSOC) ?: null;
}
private function upsertSyncRecord(string $idempotencyKey, string $status): int
{
$stmt = $this->db->prepare(
'INSERT INTO sync_records (idempotency_key, status, created_at)
VALUES (?, ?, NOW())
ON CONFLICT (idempotency_key) DO UPDATE SET status = ?, updated_at = NOW()
RETURNING id'
);
$stmt->execute([$idempotencyKey, $status, $status]);
return (int)$stmt->fetchColumn();
}
private function updateSyncRecord(
int $id,
string $status,
?string $externalId,
?string $error = null
): void {
$stmt = $this->db->prepare(
'UPDATE sync_records
SET status = ?, external_id = ?, error_message = ?, updated_at = NOW()
WHERE id = ?'
);
$stmt->execute([$status, $externalId, $error, $id]);
}
}
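One subtlety of hashing `json_encode($data)` is that the same data with a different key order produces a different key. The sketch below is a variant that sorts associative keys recursively before hashing. `canonicalize()` and `stableIdempotencyKey()` are hypothetical names. It also swaps MD5 for SHA-256, which is a choice made for this sketch, not something the service above requires.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: sort associative keys recursively so that the same
// data always serializes to the same JSON string. List order is preserved.
function canonicalize(array $data): array
{
    foreach ($data as $key => $value) {
        if (is_array($value)) {
            $data[$key] = canonicalize($value);
        }
    }
    if (!array_is_list($data)) {
        ksort($data);
    }
    return $data;
}

// Same key layout as above: "<entityType>:<localId>:<hash of the data>".
function stableIdempotencyKey(string $entityType, string $localId, array $data): string
{
    $json = json_encode(canonicalize($data), JSON_THROW_ON_ERROR);
    return "{$entityType}:{$localId}:" . hash('sha256', $json);
}

$a = stableIdempotencyKey('customer', '42', ['name' => 'ACME', 'address' => ['zip' => '10115', 'city' => 'Berlin']]);
$b = stableIdempotencyKey('customer', '42', ['address' => ['city' => 'Berlin', 'zip' => '10115'], 'name' => 'ACME']);

var_dump($a === $b);
```

Whether key order should matter is a design decision. If the payload comes from different code paths that build the array in different orders, a canonical form avoids accidental re-syncs.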
Monitoring and Alerting
Integration Dashboard
<?php
class IntegrationMonitor
{
public function __construct(
private readonly PDO $db,
private readonly AlertService $alerts
) {}
public function getStats(string $integration, int $hours = 24): array
{
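// error_rate and p95_duration_s are consumed by the SLO check further down
$stmt = $this->db->prepare(
"SELECT
COUNT(*) as total,
COUNT(*) FILTER (WHERE status = 'success') as success,
COUNT(*) FILTER (WHERE status = 'failed') as failed,
COUNT(*) FILTER (WHERE status = 'pending') as pending,
COUNT(*) FILTER (WHERE status = 'failed') * 100.0 / NULLIF(COUNT(*), 0) as error_rate,
AVG(EXTRACT(EPOCH FROM (completed_at - started_at))) as avg_duration_s,
MAX(EXTRACT(EPOCH FROM (completed_at - started_at))) as max_duration_s,
PERCENTILE_CONT(0.95) WITHIN GROUP (
ORDER BY CAST(EXTRACT(EPOCH FROM (completed_at - started_at)) AS double precision)
) as p95_duration_s
FROM integration_jobs
WHERE integration_name = ?
AND started_at > NOW() - INTERVAL '{$hours} hours'"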
);
$stmt->execute([$integration]);
return $stmt->fetch(PDO::FETCH_ASSOC);
}
public function checkHealth(): array
{
$issues = [];
// Check: are there stuck jobs?
$stmt = $this->db->query(
"SELECT integration_name, COUNT(*) as stuck
FROM integration_jobs
WHERE status = 'pending'
AND started_at < NOW() - INTERVAL '30 minutes'
GROUP BY integration_name"
);
foreach ($stmt->fetchAll(PDO::FETCH_ASSOC) as $row) {
$issues[] = [
'type' => 'stuck_jobs',
'integration' => $row['integration_name'],
'count' => $row['stuck'],
'severity' => 'warning',
];
}
// Check: is the error rate too high?
// IMPORTANT: the minimum threshold (20 jobs) prevents false positives at low traffic
$stmt = $this->db->query(
"SELECT
integration_name,
COUNT(*) as total,
COUNT(*) FILTER (WHERE status = 'failed') * 100.0 / COUNT(*) as error_rate
FROM integration_jobs
WHERE started_at > NOW() - INTERVAL '1 hour'
GROUP BY integration_name
HAVING COUNT(*) >= 20 -- minimum threshold!
AND COUNT(*) FILTER (WHERE status = 'failed') * 100.0 / COUNT(*) > 10"
);
foreach ($stmt->fetchAll(PDO::FETCH_ASSOC) as $row) {
$issues[] = [
'type' => 'high_error_rate',
'integration' => $row['integration_name'],
'error_rate' => round($row['error_rate'], 1),
'severity' => 'critical',
];
$this->alerts->send(sprintf(
'Integration %s has an error rate of %.1f%%!',
$row['integration_name'],
$row['error_rate']
));
}
return $issues;
}
public function getRecentErrors(string $integration, int $limit = 10): array
{
$stmt = $this->db->prepare(
"SELECT id, started_at, error_message, payload
FROM integration_jobs
WHERE integration_name = ?
AND status = 'failed'
ORDER BY started_at DESC
LIMIT ?"
);
$stmt->execute([$integration, $limit]);
return $stmt->fetchAll(PDO::FETCH_ASSOC);
}
}
Logging Best Practices
<?php
class IntegrationLogger
{
public function __construct(
private readonly LoggerInterface $logger
) {}
public function logApiCall(
string $system,
string $method,
string $endpoint,
array $request,
mixed $response,
float $durationMs,
?string $error = null
): void {
$context = [
'system' => $system,
'method' => $method,
'endpoint' => $endpoint,
'duration_ms' => round($durationMs, 2),
'request_size' => strlen((string) json_encode($request)),
'correlation_id' => $this->getCorrelationId(),
];
// Mask sensitive data
$sanitizedRequest = $this->sanitize($request);
$context['request'] = $sanitizedRequest;
if ($error) {
$context['error'] = $error;
$this->logger->error("API call failed: {$system} {$method} {$endpoint}", $context);
} else {
$this->logger->info("API call success: {$system} {$method} {$endpoint}", $context);
}
}
private function sanitize(array $data): array
{
// Extended list of sensitive fields
$sensitiveKeys = [
'password', 'secret', 'token', 'api_key', 'apikey',
'authorization', 'bearer', 'session', 'session_id',
'refresh_token', 'access_token', 'set-cookie', 'cookie',
'x-api-key', 'client_secret', 'private_key'
];
$sanitized = [];
foreach ($data as $key => $value) {
$lowerKey = strtolower((string) $key); // list arrays have integer keys
if (in_array($lowerKey, $sensitiveKeys, true)) {
$sanitized[$key] = '***REDACTED***';
} elseif (is_array($value)) {
$sanitized[$key] = $this->sanitize($value);
} else {
$sanitized[$key] = $value;
}
}
return $sanitized;
}
private function getCorrelationId(): string
{
// PHP exposes request headers in $_SERVER as HTTP_* keys
return $_SERVER['HTTP_X_CORRELATION_ID'] ?? uniqid('int-', true);
}
}
Rule of thumb: without monitoring, integration errors fly under the radar until the customer calls. Log every API call with a correlation ID.
Correlation ID as Response Header
Return the correlation ID in the response as well, so the customer can search the logs precisely when problems occur:
<?php
// In your API controller
$correlationId = $_SERVER['HTTP_X_CORRELATION_ID']
?? $_SERVER['HTTP_X_REQUEST_ID']
?? uniqid('req-', true);
// Return the correlation ID in the response header
header("X-Correlation-ID: {$correlationId}");
// Use it in all log entries
$logger->info("Processing request", ['correlation_id' => $correlationId]);
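Because the correlation ID comes from the client and ends up in logs and in a response header, it can be worth validating it first. The sketch below assumes that 8 to 64 characters from a conservative alphabet are acceptable. The pattern and the function name `resolveCorrelationId()` are assumptions for this illustration and should be adapted to whatever your clients actually send.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: accept a client-supplied ID only if it looks harmless,
// otherwise generate a fresh one. The allowed pattern is an assumption.
function resolveCorrelationId(array $server): string
{
    foreach (['HTTP_X_CORRELATION_ID', 'HTTP_X_REQUEST_ID'] as $key) {
        $candidate = $server[$key] ?? null;
        if (is_string($candidate)
            && preg_match('/\A[A-Za-z0-9._-]{8,64}\z/', $candidate) === 1) {
            return $candidate;
        }
    }
    // 16 hex characters from a cryptographically secure source.
    return 'req-' . bin2hex(random_bytes(8));
}

// Usage with a hand-built array instead of the real $_SERVER superglobal.
$id = resolveCorrelationId(['HTTP_X_CORRELATION_ID' => 'order-sync-2024-0001']);
echo $id, "\n";
```

Resolving the ID once per request and passing it along also avoids the situation where several log calls in the same request each generate their own fallback ID.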
Defining SLOs
Without SLOs (service level objectives) you cannot tell whether your integration is running "well". Define measurable targets:
| Metric | SLO | Alert at |
|---|---|---|
| p95 latency | < 2 seconds | > 5 seconds |
| Error rate | < 1% | > 5% |
| Stuck jobs | 0 | > 0 for > 30 min |
| DLQ growth | < 10/day | > 50/day |
| Circuit breaker open | < 5 min/day | > 30 min/day |
<?php
// Integrate the SLO check into monitoring (method of IntegrationMonitor)
// Assumes getStats() also returns p95_duration_s and error_rate
public function checkSLOs(): array
{
$violations = [];
$stats = $this->getStats('salesforce', hours: 1);
// p95 latency
if ($stats['p95_duration_s'] > 2.0) {
$violations[] = "Salesforce p95 latency: {$stats['p95_duration_s']}s (SLO: <2s)";
}
// Error rate (only with enough traffic)
if ($stats['total'] >= 20 && $stats['error_rate'] > 1.0) {
$violations[] = "Salesforce error rate: {$stats['error_rate']}% (SLO: <1%)";
}
return $violations;
}
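To show what "p95" means, here is a small sketch that computes a percentile from raw durations using the nearest-rank method. This is one of several common definitions, and databases and monitoring tools may interpolate differently, so treat the function `percentile()` as an illustration rather than a reference implementation.

```php
<?php
declare(strict_types=1);

// Nearest-rank percentile: the smallest sample such that at least
// $percentile percent of all samples are less than or equal to it.
function percentile(array $samples, float $percentile): float
{
    if ($samples === []) {
        throw new InvalidArgumentException('At least one sample is required');
    }
    if ($percentile <= 0.0 || $percentile > 100.0) {
        throw new InvalidArgumentException('Percentile must be in (0, 100]');
    }
    sort($samples);
    // Multiply before dividing to keep the rank exact for whole-number percentiles.
    $rank = (int) ceil($percentile * count($samples) / 100);
    return (float) $samples[$rank - 1];
}

// Ten request durations in seconds, including one slow outlier.
$durations = [0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.1, 1.3, 1.8, 6.0];

printf("average: %.2fs\n", array_sum($durations) / count($durations));
printf("p50:     %.2fs\n", percentile($durations, 50.0));
printf("p95:     %.2fs\n", percentile($durations, 95.0));
```

The average hides the outlier, while p95 exposes it. That is the reason the SLO table uses p95 latency instead of the mean.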
Checklist: Production-Ready Integration
Integration checklist
Before go-live:
- Retry logic with exponential backoff + jitter
- Idempotency for all write operations
- Rate limits respected and implemented
- Timeout values configured (connect + read)
- Error handling for all HTTP status codes
- Circuit breaker for unstable downstream APIs
- Dead letter queue for poison messages
- Credentials stored securely (not in code!)
- Logging with correlation IDs (also in the response!)
- Monitoring with SLOs and minimum thresholds
- Test data separated from production data
- Rollback strategy defined
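The first checklist item, exponential backoff with jitter, can be sketched in a few lines. The base delay of 200 ms, the cap of 30 seconds and the "full jitter" variant (a random delay between zero and the exponential ceiling) are assumptions chosen for this example. The function names are hypothetical.

```php
<?php
declare(strict_types=1);

// Upper bound of the delay for a given attempt (1-based), capped at $maxMs.
function backoffCeilingMs(int $attempt, int $baseMs = 200, int $maxMs = 30_000): int
{
    if ($attempt < 1) {
        throw new InvalidArgumentException('Attempt numbers start at 1');
    }
    // Cap the exponent so the multiplication cannot overflow.
    $exponent = min($attempt - 1, 20);
    return min($maxMs, $baseMs * (2 ** $exponent));
}

// "Full jitter": pick a random delay between 0 and the exponential ceiling,
// so that many clients retrying at once do not hit the API in lockstep.
function backoffDelayMs(int $attempt, int $baseMs = 200, int $maxMs = 30_000): int
{
    return random_int(0, backoffCeilingMs($attempt, $baseMs, $maxMs));
}

foreach ([1, 2, 3, 4, 5] as $attempt) {
    printf(
        "attempt %d: wait up to %5d ms (this time: %5d ms)\n",
        $attempt,
        backoffCeilingMs($attempt),
        backoffDelayMs($attempt)
    );
}
```

In a retry loop the delay would typically be applied with `usleep(backoffDelayMs($attempt) * 1000)` before the next attempt.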
Security:
- TLS/SSL for all connections (CURLOPT_SSL_VERIFYPEER!)
- Validate webhooks with an HMAC signature
- API keys rotatable
- Minimal permissions (principle of least privilege)
- Input validation before API calls
- Prevent CSV injection (prefix cells starting with = + - @)
- Sensitive data masked in logs (extended list!)
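As an illustration of the webhook item, here is a minimal sketch of HMAC validation. It assumes that the sender signs the raw request body with HMAC-SHA256 and transmits the hex digest in a header. The header name, algorithm and encoding differ between providers, so check their documentation before adopting this.

```php
<?php
declare(strict_types=1);

// Assumption: hex-encoded HMAC-SHA256 over the raw body with a shared secret.
function isValidWebhookSignature(
    string $rawBody,
    string $signatureHeader,
    string $secret
): bool {
    $expected = hash_hmac('sha256', $rawBody, $secret);
    // hash_equals() compares in constant time to avoid timing attacks.
    return hash_equals($expected, strtolower(trim($signatureHeader)));
}

// Simulated request: in a real controller the body would come from
// file_get_contents('php://input') and the signature from a provider-specific header.
$secret = 'shared-webhook-secret';
$body = '{"event":"order.created","order_id":42}';
$signature = hash_hmac('sha256', $body, $secret);

var_dump(isValidWebhookSignature($body, $signature, $secret));
var_dump(isValidWebhookSignature($body . ' ', $signature, $secret));
```

The signature has to be computed over the raw bytes as received. Decoding and re-encoding the JSON first can change whitespace or key order and make a valid signature fail.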
System-Specific Tips
Salesforce
| Aspect | Recommendation |
|---|---|
| API version | Always use a current version (v59.0+) |
| Bulk API | From 2,000+ records, not REST |
| Sandbox | Always test in a sandbox first |
| Limits | Monitor API limit consumption |
| Composite API | For several related operations |
SAP
| Aspect | Recommendation |
|---|---|
| BAPI_TRANSACTION_COMMIT | After EVERY writing BAPI |
| Number formats | Mind leading zeros |
| Time zone | Take the SAP server time into account |
| Error handling | Always check the RETURN structure |
| Performance | BAPIs are slow, batch where possible |
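The leading-zeros item is a frequent source of "customer not found" errors. The sketch below pads purely numeric keys to a fixed width and strips the zeros again for display. The default width of 10 characters is an assumption that fits typical customer and document numbers. Check the actual field length in your system. Both function names are hypothetical.

```php
<?php
declare(strict_types=1);

// Left-pad purely numeric keys with zeros; alphanumeric keys stay unchanged.
function toSapNumber(string $value, int $length = 10): string
{
    $value = trim($value);
    if ($value === '' || !ctype_digit($value)) {
        return $value;
    }
    if (strlen($value) > $length) {
        throw new LengthException("Value '{$value}' exceeds {$length} characters");
    }
    return str_pad($value, $length, '0', STR_PAD_LEFT);
}

// Strip the padding again, e.g. before showing the number in a webshop.
function fromSapNumber(string $value): string
{
    if (!ctype_digit($value)) {
        return $value;
    }
    $stripped = ltrim($value, '0');
    return $stripped === '' ? '0' : $stripped;
}

echo toSapNumber('4711'), "\n";
echo fromSapNumber('0000004711'), "\n";
```

Keeping these numbers as strings throughout the PHP code matters as well. Casting to `int` anywhere along the way silently drops the zeros.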
DATEV
| Aspect | Recommendation |
|---|---|
| Encoding | Windows-1252, not UTF-8 |
| Line break | CRLF (Windows) |
| Decimal separator | Comma, not period |
| Date format | DDMM (TTMM in DATEV's notation, without year) |
| Validation | Test with the DATEV import assistant |
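The formatting rules from the table can be sketched as small helper functions. This covers only the value formatting named above (decimal comma, DDMM date, CRLF, Windows-1252). Field order, quoting and header rows of the real DATEV format are deliberately left out, and the semicolon separator in `datevLine()` is an assumption for this sketch. The encoding step needs the mbstring extension.

```php
<?php
declare(strict_types=1);

// Decimal comma, two decimals, no thousands separator.
function datevAmount(float $amount): string
{
    return number_format($amount, 2, ',', '');
}

// Day and month only (DDMM), without the year.
function datevDate(DateTimeInterface $date): string
{
    return $date->format('dm');
}

// Hypothetical line builder: joins already formatted fields, terminates the
// line with CRLF and converts from UTF-8 to Windows-1252.
function datevLine(array $fields): string
{
    $line = implode(';', $fields) . "\r\n";
    return mb_convert_encoding($line, 'Windows-1252', 'UTF-8');
}

$line = datevLine([
    datevAmount(1234.5),
    datevDate(new DateTimeImmutable('2024-03-07')),
    'Müller GmbH',
]);

// Show the raw bytes, since the umlaut is a single byte after conversion.
echo bin2hex($line), "\n";
```

Whatever the helper functions look like in the end, the import assistant mentioned in the table remains the final check before go-live.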
Conclusion
ERP and CRM integration is not rocket science, but it is no walk in the park either. The most important success factors:
- Idempotency first: every operation must be safe to execute multiple times
- Monitoring is mandatory: without alerting, errors fly under the radar
- Retry with care: exponential backoff, not an endless loop
- Documentation: APIs change, so your docs must stay current
The code examples in this article are production-ready and have proven themselves in real projects. Copy them, adapt them, and build on them.
Planning an integration? We have already connected Salesforce, SAP, DATEV and dozens of other systems. Get in touch: we know where the pitfalls are.
About Carola Schulte
Software architect with 25+ years of experience. Specialized in robust business apps with PHP/PostgreSQL, security by design and GDPR-compliant systems. 1.8M+ lines of code in production.