Background Jobs in PHP: Queues, Cron & Long-Running Processes
"Die PDF-Generierung blockiert den Request 8 Sekunden." "Der Newsletter-Versand bringt den Server in die Knie." "Warum dauert der CSV-Import so lange?" – Klassische Symptome für fehlende Background-Jobs. PHP ist für Request-Response optimiert, aber mit den richtigen Patterns läuft auch asynchrone Verarbeitung stabil.
TL;DR – The Short Version
- Cron: for scheduled tasks (reports, cleanup, sync)
- Message queues: for asynchronous processing (e-mails, PDFs, webhooks)
- Worker processes: long-running PHP processes managed by Supervisord
- Golden rule: jobs must be idempotent (safe to run multiple times without side effects)
- Memory management: recycle PHP workers regularly (memory leaks!)
Why Background Jobs?
HTTP requests should be fast. Anything above 200 ms feels slow. But some operations simply take longer:
┌─────────────────────────────────────────────────────────────────┐
│ SYNCHRONOUS vs. ASYNCHRONOUS                                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│ SYNCHRONOUS (bad UX)                                            │
│ ┌────────┐    ┌─────────────────────────┐    ┌──────────┐      │
│ │  User  │───▶│ Generate PDF (5s)       │───▶│ Response │      │
│ └────────┘    │ Send e-mail (2s)        │    └──────────┘      │
│               │ Trigger webhook (1s)    │                      │
│               └─────────────────────────┘                      │
│                 ↓ 8 seconds of waiting!                        │
│                                                                 │
│ ASYNCHRONOUS (good UX)                                          │
│ ┌────────┐    ┌─────────────┐    ┌──────────┐                  │
│ │  User  │───▶│ Queue job   │───▶│ Response │ ← 50ms           │
│ └────────┘    └──────┬──────┘    └──────────┘                  │
│                      │                                         │
│                      ▼                                         │
│               ┌───────────────┐                                │
│               │ Worker        │ ← processes in the background  │
│               │ PDF, e-mail...│                                │
│               └───────────────┘                                │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Typical Use Cases
| Use Case | Approach | Why? |
|---|---|---|
| E-mail sending | Queue | SMTP can be slow, retry on failure |
| PDF generation | Queue | CPU-intensive, blocks the worker |
| Image resizing | Queue | I/O-intensive, parallelizable |
| Webhook callbacks | Queue | External APIs can time out |
| CSV/Excel import | Queue + chunks | Large files, progress tracking |
| Nightly reports | Cron | Time-driven, not event-driven |
| Database cleanup | Cron | Regular, off-peak |
| External API sync | Cron + Queue | Triggered on a schedule, individual syncs queued |
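Before going into the individual building blocks, here is what the producer side of the asynchronous path from the diagram above can look like: a hypothetical controller action that only enqueues the job and responds immediately. Class name, payload keys and the push() call are illustrative and anticipate the queue implementations shown further down.
<?php
// Hypothetical controller action: enqueue the job, respond immediately.
// $queue is any backend exposing push() – see the queue implementations below.
final class InvoiceExportAction
{
    public function __construct(private DatabaseQueue $queue) {}

    public function __invoke(int $invoiceId): array
    {
        $this->queue->push('pdf', [
            'type' => 'generate_pdf',
            'template' => 'invoice',
            'data' => ['invoice_id' => $invoiceId],
            'output_path' => "invoices/{$invoiceId}.pdf",
        ]);

        // ~50 ms instead of 8 s: the worker renders the PDF in the background
        return ['status' => 'queued'];
    }
}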
Cron Jobs: The Classic
Cron has been the workhorse for scheduled tasks for decades. Simple, reliable, available everywhere.
Crontab Syntax
# ┌───────────── minute (0-59)
# │ ┌───────────── hour (0-23)
# │ │ ┌───────────── day of month (1-31)
# │ │ │ ┌───────────── month (1-12)
# │ │ │ │ ┌───────────── day of week (0-7, 0 and 7 = Sunday)
# │ │ │ │ │
# * * * * * command
# Examples:
0 2 * * * /usr/bin/php /var/www/app/bin/console reports:daily
0 3 * * 0 /usr/bin/php /var/www/app/bin/console cleanup:old-sessions
*/15 * * * * /usr/bin/php /var/www/app/bin/console sync:external-api
# ⚠️ Fallback for mini setups WITHOUT a real worker:
# */5 * * * * /usr/bin/php /var/www/app/bin/console queue:work --once
# This is "cron polling" – NOT recommended for production!
# Better: long-running worker + Supervisord (see below)
Cron Job Wrapper with Locking
Problem: what if a job runs longer than its interval? Then multiple instances run in parallel.
<?php
declare(strict_types=1);
final class CronJobRunner
{
public function __construct(
private string $lockDir = '/var/run/cron',
) {
if (!is_dir($this->lockDir)) {
mkdir($this->lockDir, 0755, true);
}
}
public function run(string $jobName, callable $job): int
{
$lockFile = "{$this->lockDir}/{$jobName}.lock";
// Acquire an exclusive lock (non-blocking)
$fp = fopen($lockFile, 'c');
if ($fp === false) {
throw new \RuntimeException("Cannot open lock file: {$lockFile}");
}
if (!flock($fp, LOCK_EX | LOCK_NB)) {
echo "[{$jobName}] Already running, skipping.\n";
fclose($fp);
return 0;
}
// Write the PID for debugging
ftruncate($fp, 0);
fwrite($fp, (string) getmypid());
fflush($fp);
try {
$startTime = microtime(true);
echo "[{$jobName}] Starting at " . date('Y-m-d H:i:s') . "\n";
$result = $job();
$duration = round(microtime(true) - $startTime, 2);
echo "[{$jobName}] Completed in {$duration}s\n";
return $result ?? 0;
} catch (\Throwable $e) {
echo "[{$jobName}] FAILED: {$e->getMessage()}\n";
// Error-Logging, Alerting, etc.
return 1;
} finally {
flock($fp, LOCK_UN);
fclose($fp);
}
}
}
// Usage in bin/cron/daily-report.php
$runner = new CronJobRunner();
exit($runner->run('daily-report', function () use ($container) {
$reportGenerator = $container->get(DailyReportGenerator::class);
$reportGenerator->generate(new DateTimeImmutable('yesterday'));
return 0;
}));
Cron Monitoring
<?php
// Send heartbeats to a monitoring service
final class CronMonitor
{
public function __construct(
private HttpClientInterface $http,
private string $healthchecksUrl, // e.g. healthchecks.io
) {}
public function wrap(string $jobId, callable $job): mixed
{
// Start signal
$this->ping($jobId, '/start');
try {
$result = $job();
// Success signal
$this->ping($jobId);
return $result;
} catch (\Throwable $e) {
// Fail signal
$this->ping($jobId, '/fail');
throw $e;
}
}
private function ping(string $jobId, string $suffix = ''): void
{
try {
$this->http->request('GET', "{$this->healthchecksUrl}/{$jobId}{$suffix}", [
'timeout' => 5,
]);
} catch (\Throwable) {
// Don't let monitoring errors propagate
}
}
}
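Both helpers combine naturally. A possible wiring that extends the daily-report example from above – $httpClient, $container, the ping URL and the check ID are placeholders:
<?php
// bin/cron/daily-report.php – locking and heartbeat monitoring combined (illustrative)
$runner = new CronJobRunner();
$monitor = new CronMonitor($httpClient, 'https://hc-ping.com'); // placeholder URL

exit($runner->run('daily-report', function () use ($monitor, $container) {
    return (int) $monitor->wrap('your-check-uuid', function () use ($container) {
        $container->get(DailyReportGenerator::class)
            ->generate(new DateTimeImmutable('yesterday'));
        return 0;
    });
}));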
Message Queues: Asynchronous Processing
For event-driven jobs, message queues are the right approach. A producer creates jobs, a worker processes them.
Queue Backend Comparison
| Backend | Pros | Cons | Use Case |
|---|---|---|---|
| Redis | Fast, simple, LPUSH/BRPOP | No ack, persistence optional | Simple jobs, high throughput |
| RabbitMQ | Robust, routing, dead letters | More complex, separate service | Enterprise, complex workflows |
| PostgreSQL | Already there, ACID, SKIP LOCKED | Polling instead of push, load on the DB | Small apps that already run PostgreSQL |
| Beanstalkd | Simple, delays, priorities | Less widely used | Simple queue with extra features |
| Amazon SQS | Managed, scales, reliable | AWS lock-in, latency | Cloud-native apps |
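Whichever backend you pick, the worker only needs a handful of operations. As a sketch (not something the classes below literally implement), a minimal contract could look like this:
<?php
// Minimal queue contract so the worker doesn't care which backend it talks to.
// Sketch only – the DatabaseQueue/RedisQueue examples below could implement it
// with small adjustments.
interface QueueInterface
{
    /** Enqueue a job payload, optionally delayed; returns a job id. */
    public function push(string $queue, array $payload, int $delaySeconds = 0): int|string;

    /** Claim the next available job or return null if the queue is empty. */
    public function pop(string $queue): ?array;

    /** Mark a claimed job as done. */
    public function complete(int|string $jobId): void;

    /** Record a failure; the backend decides between retry and giving up. */
    public function fail(int|string $jobId, string $error): void;
}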
Database Queue with PostgreSQL
No Redis? PostgreSQL with FOR UPDATE SKIP LOCKED makes a solid queue backend.
CREATE TABLE job_queue (
id BIGSERIAL PRIMARY KEY,
queue VARCHAR(100) NOT NULL DEFAULT 'default',
payload JSONB NOT NULL,
-- Status-Tracking
status VARCHAR(20) NOT NULL DEFAULT 'pending'
CHECK (status IN ('pending', 'processing', 'completed', 'failed')),
attempts INT NOT NULL DEFAULT 0,
max_attempts INT NOT NULL DEFAULT 3,
-- Timing
available_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
reserved_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
-- Error-Tracking
last_error TEXT,
-- Worker-Info
worker_id VARCHAR(100)
);
CREATE INDEX idx_queue_pending ON job_queue(queue, available_at)
WHERE status = 'pending';
CREATE INDEX idx_queue_failed ON job_queue(queue)
WHERE status = 'failed';
<?php
declare(strict_types=1);
final class DatabaseQueue
{
public function __construct(
private PDO $db,
private string $workerId,
) {}
public function push(string $queue, array $payload, int $delaySeconds = 0): int
{
// make_interval() is more robust than `:delay * INTERVAL '1 second'`
// (PDO driver compatibility)
$sql = "INSERT INTO job_queue (queue, payload, available_at)
VALUES (:queue, :payload, NOW() + make_interval(secs => :delay))
RETURNING id";
$stmt = $this->db->prepare($sql);
$stmt->execute([
'queue' => $queue,
'payload' => json_encode($payload),
'delay' => $delaySeconds,
]);
return (int) $stmt->fetchColumn();
}
public function pop(string $queue): ?array
{
// Atomic claim via SKIP LOCKED (doesn't block other workers!)
$sql = "UPDATE job_queue SET
status = 'processing',
reserved_at = NOW(),
attempts = attempts + 1,
worker_id = :worker_id
WHERE id = (
SELECT id FROM job_queue
WHERE queue = :queue
AND status = 'pending'
AND available_at <= NOW()
AND attempts < max_attempts -- important: no endless retry loops!
ORDER BY available_at ASC
FOR UPDATE SKIP LOCKED
LIMIT 1
)
RETURNING *";
$stmt = $this->db->prepare($sql);
$stmt->execute([
'queue' => $queue,
'worker_id' => $this->workerId,
]);
$job = $stmt->fetch(PDO::FETCH_ASSOC);
if (!$job) {
return null;
}
$job['payload'] = json_decode($job['payload'], true);
return $job;
}
public function complete(int $jobId): void
{
$sql = "UPDATE job_queue SET
status = 'completed',
completed_at = NOW()
WHERE id = :id";
$this->db->prepare($sql)->execute(['id' => $jobId]);
}
public function fail(int $jobId, string $error): void
{
// Retry, or permanently failed?
$sql = "UPDATE job_queue SET
status = CASE
WHEN attempts < max_attempts THEN 'pending'
ELSE 'failed'
END,
reserved_at = NULL,
worker_id = NULL,
available_at = CASE
WHEN attempts < max_attempts
THEN NOW() + make_interval(secs => attempts * 30)
ELSE available_at
END,
last_error = :error
WHERE id = :id";
$this->db->prepare($sql)->execute([
'id' => $jobId,
'error' => $error,
]);
}
public function release(int $jobId, int $delaySeconds = 0): void
{
// Put the job back into the queue (e.g. during a graceful shutdown)
$sql = "UPDATE job_queue SET
status = 'pending',
reserved_at = NULL,
worker_id = NULL,
available_at = NOW() + make_interval(secs => :delay)
WHERE id = :id";
$this->db->prepare($sql)->execute([
'id' => $jobId,
'delay' => $delaySeconds,
]);
}
/**
* Reset stale jobs (worker crashed/died).
* Call every 5 minutes via cron!
*/
public function requeueStaleJobs(int $staleMinutes = 15): int
{
$sql = "UPDATE job_queue SET
status = 'pending',
reserved_at = NULL,
worker_id = NULL
WHERE status = 'processing'
AND reserved_at < NOW() - make_interval(mins => :stale_minutes)
AND attempts < max_attempts";
$stmt = $this->db->prepare($sql);
$stmt->execute(['stale_minutes' => $staleMinutes]);
return $stmt->rowCount();
}
}
// ⚠️ IMPORTANT: stale-job recovery via cron:
// */5 * * * * php bin/console queue:recover-stale
Redis Queue (Faster)
⚠️ Important: LPUSH/BRPOP is fire-and-forget – if the worker crashes after BRPOP, the job is gone. For production, prefer one of these instead:
- The BRPOPLPUSH pattern: move the job into a "processing" list, remove it via LREM after success (sketched right after the example below)
- Redis Streams (Redis 5.0+): consumer groups with acknowledgments
- A library: php-enqueue/redis, symfony/messenger with the Redis transport
The following example is only for non-critical jobs (thumbnails, logs) or as a starting point:
<?php
final class RedisQueue
{
public function __construct(
private \Redis $redis,
private string $prefix = 'queue:',
) {}
public function push(string $queue, array $payload): string
{
$job = [
'id' => uniqid('job_', true),
'payload' => $payload,
'created_at' => time(),
'attempts' => 0,
];
$this->redis->lPush(
$this->prefix . $queue,
json_encode($job)
);
return $job['id'];
}
public function pushDelayed(string $queue, array $payload, int $delaySeconds): string
{
$job = [
'id' => uniqid('job_', true),
'payload' => $payload,
'queue' => $queue,
'created_at' => time(),
'attempts' => 0,
];
// Sorted set, score = execution time
$this->redis->zAdd(
$this->prefix . 'delayed',
time() + $delaySeconds,
json_encode($job)
);
return $job['id'];
}
public function pop(string $queue, int $timeout = 30): ?array
{
// BRPOP blocks until a job is available (efficient!)
$result = $this->redis->brPop(
[$this->prefix . $queue],
$timeout
);
if (!$result) {
return null;
}
return json_decode($result[1], true);
}
public function migrateDelayed(): int
{
// Move due jobs into their queues (call every 30s via cron)
$now = time();
$jobs = $this->redis->zRangeByScore(
$this->prefix . 'delayed',
'-inf',
(string) $now
);
$count = 0;
foreach ($jobs as $jobJson) {
$job = json_decode($jobJson, true);
// Move into the target queue
$this->redis->lPush($this->prefix . $job['queue'], $jobJson);
$this->redis->zRem($this->prefix . 'delayed', $jobJson);
$count++;
}
return $count;
}
}
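The reliable variant mentioned above, as a rough sketch with phpredis: the job is moved atomically into a per-queue processing list and only removed after the handler succeeded. Method names follow phpredis; newer Redis versions offer LMOVE/BLMOVE as a successor to BRPOPLPUSH, and for production a maintained library remains the safer choice.
<?php
// Reliable-queue sketch: the raw job is moved atomically into a
// "processing" list and only removed after successful handling.
final class ReliableRedisQueue
{
    public function __construct(
        private \Redis $redis,
        private string $prefix = 'queue:',
    ) {}

    public function reserve(string $queue, int $timeout = 30): ?string
    {
        // Atomically pop from the queue and push onto the processing list
        $job = $this->redis->brpoplpush(
            $this->prefix . $queue,
            $this->prefix . $queue . ':processing',
            $timeout
        );
        return $job ?: null;
    }

    public function acknowledge(string $queue, string $rawJob): void
    {
        // Remove exactly this job from the processing list after success
        $this->redis->lRem($this->prefix . $queue . ':processing', $rawJob, 1);
    }

    // Jobs still sitting in ":processing" after a crash must be re-queued
    // by a separate recovery task (analogous to requeueStaleJobs() above).
}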
Worker Processes
Workers are long-running PHP processes that pull jobs off the queue. That is unusual for PHP, but it works with the right patterns.
A Simple Worker
<?php
declare(strict_types=1);
final class QueueWorker
{
private bool $shouldStop = false;
private int $processedJobs = 0;
private int $startTime;
public function __construct(
private DatabaseQueue $queue,
private JobDispatcher $dispatcher,
private LoggerInterface $logger,
private int $maxJobs = 1000, // restart after N jobs
private int $maxMemoryMb = 128, // restart when the memory limit is reached
private int $maxRuntimeSeconds = 3600, // restart after 1h
) {
$this->startTime = time();
}
public function run(string $queueName): void
{
$this->registerSignalHandlers();
$this->logger->info("Worker started", [
'queue' => $queueName,
'pid' => getmypid(),
]);
while (!$this->shouldStop()) {
$job = $this->queue->pop($queueName);
if ($job === null) {
// No job available – pause briefly instead of busy-polling the database
sleep(1);
continue;
}
$this->processJob($job);
}
$this->logger->info("Worker stopping gracefully", [
'processed_jobs' => $this->processedJobs,
'reason' => $this->getStopReason(),
]);
}
private function processJob(array $job): void
{
$jobId = $job['id'];
try {
$this->logger->debug("Processing job", ['job_id' => $jobId]);
$this->dispatcher->dispatch($job['payload']);
$this->queue->complete($jobId);
$this->processedJobs++;
$this->logger->debug("Job completed", ['job_id' => $jobId]);
} catch (\Throwable $e) {
$this->logger->error("Job failed", [
'job_id' => $jobId,
'error' => $e->getMessage(),
'attempt' => $job['attempts'],
]);
$this->queue->fail($jobId, $e->getMessage());
}
}
private function shouldStop(): bool
{
if ($this->shouldStop) {
return true;
}
// Check the memory limit
$memoryMb = memory_get_usage(true) / 1024 / 1024;
if ($memoryMb >= $this->maxMemoryMb) {
$this->logger->warning("Memory limit reached", ['memory_mb' => $memoryMb]);
return true;
}
// Max jobs reached
if ($this->processedJobs >= $this->maxJobs) {
return true;
}
// Max runtime reached
if ((time() - $this->startTime) >= $this->maxRuntimeSeconds) {
return true;
}
return false;
}
private function registerSignalHandlers(): void
{
// ⚠️ Requires the pcntl extension! Not available on:
// - shared hosting, Alpine images without pcntl, Windows
// Fallback: have Supervisord stop the worker (stopwaitsecs)
if (!extension_loaded('pcntl')) {
return;
}
// Graceful shutdown on SIGTERM/SIGINT
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function () {
$this->logger->info("Received SIGTERM, finishing current job...");
$this->shouldStop = true;
});
pcntl_signal(SIGINT, function () {
$this->logger->info("Received SIGINT, finishing current job...");
$this->shouldStop = true;
});
}
private function getStopReason(): string
{
if ($this->shouldStop) {
return 'signal';
}
if (memory_get_usage(true) / 1024 / 1024 >= $this->maxMemoryMb) {
return 'memory';
}
if ($this->processedJobs >= $this->maxJobs) {
return 'max_jobs';
}
return 'max_runtime';
}
}
Job Dispatcher with Handlers
<?php
interface JobHandler
{
public function handle(array $payload): void;
}
final class JobDispatcher
{
/** @var array<string, JobHandler> */
private array $handlers = [];
public function register(string $jobType, JobHandler $handler): void
{
$this->handlers[$jobType] = $handler;
}
public function dispatch(array $payload): void
{
$jobType = $payload['type'] ?? null;
if (!$jobType || !isset($this->handlers[$jobType])) {
throw new \RuntimeException("Unknown job type: {$jobType}");
}
$this->handlers[$jobType]->handle($payload);
}
}
// Example handlers
final class SendEmailHandler implements JobHandler
{
public function __construct(
private MailerInterface $mailer,
) {}
public function handle(array $payload): void
{
$email = (new Email())
->to($payload['to'])
->subject($payload['subject'])
->html($payload['body']);
$this->mailer->send($email);
}
}
final class GeneratePdfHandler implements JobHandler
{
public function __construct(
private PdfGenerator $pdf,
private FileStorage $storage,
) {}
public function handle(array $payload): void
{
$content = $this->pdf->generate($payload['template'], $payload['data']);
$this->storage->put($payload['output_path'], $content);
// Optional: webhook/notification once finished
if (isset($payload['callback_url'])) {
// HTTP request to the callback...
}
}
}
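To see how queue, dispatcher and worker fit together: a possible entry point for the queue:work command that Supervisord starts in the next section. The bootstrap details ($mailer, $pdfGenerator, $storage, $logger, the DSN) are placeholders for whatever your container provides.
<?php
// bin/console queue:work – illustrative entry point; the real bootstrap
// (autoloading, DI container, $mailer/$pdfGenerator/$storage/$logger) depends on your setup.
require __DIR__ . '/../vendor/autoload.php';

$db = new PDO(getenv('DATABASE_DSN') ?: 'pgsql:host=localhost;dbname=app');
$queue = new DatabaseQueue($db, workerId: gethostname() . '-' . getmypid());

$dispatcher = new JobDispatcher();
$dispatcher->register('send_email', new SendEmailHandler($mailer));
$dispatcher->register('generate_pdf', new GeneratePdfHandler($pdfGenerator, $storage));

$worker = new QueueWorker(
    $queue,
    $dispatcher,
    $logger,
    maxJobs: 1000,
    maxMemoryMb: 128,
    maxRuntimeSeconds: 3600,
);
$worker->run($argv[1] ?? 'default');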
Supervisord: Keeping Workers Alive
Supervisord restarts worker processes when they crash or exit. Essential for production.
Installation
apt install supervisor
systemctl enable supervisor
systemctl start supervisor
Worker Configuration
; /etc/supervisor/conf.d/app-worker.conf
[program:app-worker]
process_name=%(program_name)s_%(process_num)02d
command=/usr/bin/php /var/www/app/bin/console queue:work default
directory=/var/www/app
user=www-data
numprocs=4 ; 4 workers in parallel
autostart=true
autorestart=true
startsecs=5
startretries=3
stopwaitsecs=30 ; time allowed for graceful shutdown
redirect_stderr=true
stdout_logfile=/var/log/app/worker.log
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=5
; Environment
environment=APP_ENV="production"
[program:app-worker-priority]
; Separate queue for high-priority jobs
command=/usr/bin/php /var/www/app/bin/console queue:work priority
numprocs=2
priority=10 ; starts before the regular workers
; ... rest as above
Supervisord Commands
# Status of all processes
supervisorctl status
# Restart the workers (after a deployment!)
supervisorctl restart app-worker:*
# Stop a single worker
supervisorctl stop app-worker:app-worker_00
# Reload the configuration
supervisorctl reread
supervisorctl update
# Stop everything
supervisorctl stop all
Deployment Integration
#!/bin/bash
# deploy.sh
set -e
echo "Deploying..."
# 1. Update the code
git pull origin main
# 2. Dependencies
composer install --no-dev --optimize-autoloader
# 3. Migrations
php bin/console migrate --force
# 4. Clear the cache
php bin/console cache:clear
# 5. Graceful worker restart (waits for running jobs)
supervisorctl restart app-worker:*
echo "Done!"
Key Patterns
1. Idempotency: Jobs Must Be Safely Re-runnable
// ❌ BAD: not idempotent
final class ChargeCustomerHandler implements JobHandler
{
public function handle(array $payload): void
{
// A retry charges again → possible duplicate charge!
$this->paymentGateway->charge(
$payload['customer_id'],
$payload['amount']
);
}
}
// ✅ GOOD: idempotent thanks to an idempotency key
final class ChargeCustomerHandler implements JobHandler
{
public function handle(array $payload): void
{
$idempotencyKey = $payload['idempotency_key'];
// Check whether this has already been executed
if ($this->paymentLog->exists($idempotencyKey)) {
return; // Already processed
}
$this->paymentGateway->charge(
$payload['customer_id'],
$payload['amount'],
$idempotencyKey
);
$this->paymentLog->record($idempotencyKey);
}
}
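What the paymentLog used above might look like: a small store backed by a table whose unique idempotency key makes the "already executed?" check race-safe even with parallel workers. Table and column names as well as the ON CONFLICT approach are illustrative, not a fixed schema.
<?php
// Illustrative idempotency store – assumes a payment_log table with
// idempotency_key as PRIMARY KEY (the unique constraint does the real work).
final class PaymentLog
{
    public function __construct(private PDO $db) {}

    public function exists(string $idempotencyKey): bool
    {
        $stmt = $this->db->prepare(
            "SELECT 1 FROM payment_log WHERE idempotency_key = :key"
        );
        $stmt->execute(['key' => $idempotencyKey]);
        return (bool) $stmt->fetchColumn();
    }

    public function record(string $idempotencyKey): void
    {
        // ON CONFLICT DO NOTHING: a concurrent duplicate simply inserts nothing
        $this->db->prepare(
            "INSERT INTO payment_log (idempotency_key, charged_at)
             VALUES (:key, NOW())
             ON CONFLICT (idempotency_key) DO NOTHING"
        )->execute(['key' => $idempotencyKey]);
    }
}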
2. Retries with Exponential Backoff
<?php
final class RetryableJob
{
public static function calculateDelay(int $attempt, int $baseDelay = 30): int
{
// Exponential: 30s, 60s, 120s, 240s, 480s...
// With jitter to avoid a thundering herd
$delay = $baseDelay * (2 ** ($attempt - 1));
$jitter = random_int(0, (int) ($delay * 0.1));
return min($delay + $jitter, 3600); // Max 1h
}
}
// In the queue backend (getJob()/markFailed() are illustrative helpers)
public function fail(int $jobId, string $error): void
{
$job = $this->getJob($jobId);
if ($job['attempts'] < $job['max_attempts']) {
$delay = RetryableJob::calculateDelay($job['attempts']);
$this->release($jobId, $delay);
} else {
$this->markFailed($jobId, $error);
}
}
3. Dead Letter Queue (DLQ)
<?php
final class DeadLetterHandler
{
public function __construct(
private PDO $db,
private AlertService $alerts,
) {}
public function moveToDeadLetter(int $jobId, string $error): void
{
$this->db->beginTransaction();
try {
// Copy the job into the DLQ
// (PDO::exec() cannot bind parameters – use prepare/execute)
$this->db->prepare("
INSERT INTO dead_letter_queue
(original_id, queue, payload, error, attempts, failed_at)
SELECT id, queue, payload, :error, attempts, NOW()
FROM job_queue WHERE id = :id
")->execute(['error' => $error, 'id' => $jobId]);
// Remove it from the regular queue
$this->db->prepare("DELETE FROM job_queue WHERE id = :id")
->execute(['id' => $jobId]);
$this->db->commit();
// Alert if too many jobs are failing
$this->checkAndAlert();
} catch (\Throwable $e) {
$this->db->rollBack();
throw $e;
}
}
private function checkAndAlert(): void
{
$count = $this->db->query("
SELECT COUNT(*) FROM dead_letter_queue
WHERE failed_at > NOW() - INTERVAL '1 hour'
")->fetchColumn();
if ($count > 10) {
$this->alerts->critical("High DLQ rate: {$count} failed jobs in last hour");
}
}
public function retry(int $dlqId): void
{
// Move the job from the DLQ back into the queue
$this->db->prepare("
INSERT INTO job_queue (queue, payload, attempts, max_attempts)
SELECT queue, payload, 0, 3
FROM dead_letter_queue WHERE id = :id
")->execute(['id' => $dlqId]);
$this->db->prepare("DELETE FROM dead_letter_queue WHERE id = :id")
->execute(['id' => $dlqId]);
}
}
4. Job Batching with Progress
<?php
// For large imports: create a batch, track its progress
final class BatchImportHandler implements JobHandler
{
public function handle(array $payload): void
{
$batchId = $payload['batch_id'];
$chunk = $payload['chunk'];
$totalChunks = $payload['total_chunks'];
// Process the chunk
foreach ($chunk as $row) {
$this->processRow($row);
}
// Update the progress
$this->updateBatchProgress($batchId, $totalChunks);
}
private function updateBatchProgress(string $batchId, int $totalChunks): void
{
$sql = "UPDATE import_batches SET
completed_chunks = completed_chunks + 1,
status = CASE
WHEN completed_chunks + 1 >= :total THEN 'completed'
ELSE 'processing'
END,
completed_at = CASE
WHEN completed_chunks + 1 >= :total THEN NOW()
ELSE NULL
END
WHERE id = :batch_id";
$this->db->prepare($sql)->execute([
'batch_id' => $batchId,
'total' => $totalChunks,
]);
}
}
// Creating the batch
final class BatchCreator
{
public function createImportBatch(string $filePath, int $chunkSize = 100): string
{
$batchId = Uuid::v4();
$rows = $this->parseFile($filePath);
$chunks = array_chunk($rows, $chunkSize);
// Batch-Record
$this->db->prepare("
INSERT INTO import_batches (id, total_chunks, status)
VALUES (:id, :total, 'pending')
")->execute([
'id' => $batchId,
'total' => count($chunks),
]);
// One job per chunk
foreach ($chunks as $index => $chunk) {
$this->queue->push('imports', [
'type' => 'batch_import',
'batch_id' => $batchId,
'chunk' => $chunk,
'chunk_index' => $index,
'total_chunks' => count($chunks),
]);
}
return $batchId;
}
}
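For the progress tracking mentioned in the use-case table, the frontend can simply poll the batch record. A possible read model against the import_batches table assumed above (this presumes completed_chunks has a DEFAULT of 0):
<?php
// Progress endpoint for the UI: percentage of processed chunks per batch
final class BatchProgress
{
    public function __construct(private PDO $db) {}

    public function forBatch(string $batchId): ?array
    {
        $stmt = $this->db->prepare(
            "SELECT status, completed_chunks, total_chunks,
                    ROUND(100.0 * completed_chunks / NULLIF(total_chunks, 0), 1) AS percent
             FROM import_batches
             WHERE id = :id"
        );
        $stmt->execute(['id' => $batchId]);
        $row = $stmt->fetch(PDO::FETCH_ASSOC);
        return $row ?: null;
    }
}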
Memory Management
PHP was not designed for long-running processes. Memory leaks are the main problem.
Golden Rule: Restart After N Jobs
// Worker defaults for stable long-running processes:
$maxJobs = 1000; // restart after 1000 jobs
$maxMemoryMb = 128; // restart at 128 MB
$maxRuntimeSeconds = 3600; // restart after 1h
// After EVERY job (when using a DI container/ORM):
$entityManager->clear(); // clear Doctrine's identity map
gc_collect_cycles(); // optional: force garbage collection
Typical Memory Leak Sources
// ❌ The Doctrine EntityManager caches entities
foreach ($jobs as $job) {
$entity = $em->find(Invoice::class, $job['invoice_id']);
// Entity stays in the identity map → memory keeps growing!
}
// ✅ Clear the EntityManager regularly
foreach ($jobs as $i => $job) {
$entity = $em->find(Invoice::class, $job['invoice_id']);
$this->process($entity);
if ($i % 100 === 0) {
$em->clear(); // clear the identity map
gc_collect_cycles(); // force garbage collection
}
}
// ❌ Buffering Monolog handlers accumulate records
// (BufferHandler / FingersCrossedHandler keep log entries in memory until flushed)
// ✅ In workers: a plain StreamHandler without buffering
$logger = new Logger('worker');
$logger->pushHandler(new StreamHandler('php://stdout'));
// No BufferHandler, no FingersCrossedHandler!
Memory Monitoring in the Worker
<?php
final class MemoryAwareWorker
{
private int $initialMemory;
public function __construct(
private int $maxMemoryMb = 128,
private int $leakThresholdMb = 10,
) {
$this->initialMemory = memory_get_usage(true);
}
public function shouldRestart(): bool
{
$currentMemory = memory_get_usage(true);
$usedMb = $currentMemory / 1024 / 1024;
// Hard limit
if ($usedMb >= $this->maxMemoryMb) {
return true;
}
// Leak detection: too much growth since startup
$growth = ($currentMemory - $this->initialMemory) / 1024 / 1024;
if ($growth > $this->leakThresholdMb) {
error_log("Memory leak detected: +{$growth}MB since start");
return true;
}
return false;
}
public function logMemoryUsage(): void
{
$current = memory_get_usage(true) / 1024 / 1024;
$peak = memory_get_peak_usage(true) / 1024 / 1024;
error_log(sprintf(
"Memory: %.1fMB (peak: %.1fMB)",
$current,
$peak
));
}
}
Monitoring & Alerting
<?php
final class QueueMetrics
{
public function __construct(
private PDO $db,
private MetricsClient $metrics, // Prometheus, StatsD, etc.
) {}
public function collect(): void
{
// Queue depth
$depths = $this->db->query("
SELECT queue, status, COUNT(*) as count
FROM job_queue
GROUP BY queue, status
")->fetchAll();
foreach ($depths as $row) {
$this->metrics->gauge(
'queue_depth',
$row['count'],
['queue' => $row['queue'], 'status' => $row['status']]
);
}
// Oldest job (latency)
$oldest = $this->db->query("
SELECT queue,
EXTRACT(EPOCH FROM (NOW() - MIN(created_at))) as age_seconds
FROM job_queue
WHERE status = 'pending'
GROUP BY queue
")->fetchAll();
foreach ($oldest as $row) {
$this->metrics->gauge(
'queue_oldest_job_seconds',
$row['age_seconds'],
['queue' => $row['queue']]
);
}
// Failed jobs in the last hour
$failed = $this->db->query("
SELECT COUNT(*) FROM job_queue
WHERE status = 'failed'
AND completed_at > NOW() - INTERVAL '1 hour'
")->fetchColumn();
// Gauge for dashboard display
$this->metrics->gauge('queue_failed_jobs_hourly', $failed);
// Counter for rate() calculations in Prometheus
// (a counter is NEVER reset, only incremented)
// Note: the static delta below only works while this collector
// runs inside a single long-running process
static $lastFailedCount = 0;
if ($failed > $lastFailedCount) {
$this->metrics->counter('queue_jobs_failed_total', $failed - $lastFailedCount);
}
$lastFailedCount = $failed;
}
}
// Prometheus endpoint
// GET /metrics
$metrics = new QueueMetrics($db, $prometheusClient);
$metrics->collect();
echo $prometheusClient->render();
Alerting Rules (Prometheus)
# prometheus/alerts.yml
groups:
- name: queue_alerts
rules:
- alert: QueueBacklogHigh
expr: queue_depth{status="pending"} > 1000
for: 5m
labels:
severity: warning
annotations:
summary: "Queue backlog too high"
description: "Queue {{ $labels.queue }} has {{ $value }} pending jobs"
- alert: QueueJobStuck
expr: queue_oldest_job_seconds > 3600
for: 5m
labels:
severity: critical
annotations:
summary: "Jobs stuck in queue"
description: "Oldest job in {{ $labels.queue }} is {{ $value }}s old"
- alert: QueueHighFailureRate
# rate() only works on counters, not on gauges!
expr: rate(queue_jobs_failed_total[5m]) * 60 > 10
for: 5m
labels:
severity: critical
annotations:
summary: "High job failure rate (>10/min)"
Testing Jobs
<?php
final class SendEmailHandlerTest extends TestCase
{
public function testSendsEmail(): void
{
$mailer = $this->createMock(MailerInterface::class);
$mailer->expects($this->once())
->method('send')
->with($this->callback(function (Email $email) {
return $email->getTo()[0]->getAddress() === 'user@example.com'
&& $email->getSubject() === 'Welcome!';
}));
$handler = new SendEmailHandler($mailer);
$handler->handle([
'type' => 'send_email',
'to' => 'user@example.com',
'subject' => 'Welcome!',
'body' => '<p>Hello!</p>',
]);
}
public function testRetryOnTemporaryFailure(): void
{
$mailer = $this->createMock(MailerInterface::class);
$mailer->method('send')
->willThrowException(new TransportException('Connection timeout'));
$handler = new SendEmailHandler($mailer);
$this->expectException(TransportException::class);
// The handler throws → the queue will retry the job
$handler->handle([
'type' => 'send_email',
'to' => 'user@example.com',
'subject' => 'Test',
'body' => 'Test',
]);
}
}
// Integration test for the queue
final class DatabaseQueueTest extends TestCase
{
public function testPushAndPop(): void
{
$queue = new DatabaseQueue($this->db, 'test-worker');
$jobId = $queue->push('emails', ['to' => 'test@example.com']);
$this->assertGreaterThan(0, $jobId);
$job = $queue->pop('emails');
$this->assertNotNull($job);
$this->assertEquals($jobId, $job['id']);
$this->assertEquals('test@example.com', $job['payload']['to']);
$this->assertEquals('processing', $job['status']);
}
public function testConcurrentWorkersWithSkipLocked(): void
{
$queue = new DatabaseQueue($this->db, 'worker-1');
// Create 3 jobs
$queue->push('test', ['n' => 1]);
$queue->push('test', ['n' => 2]);
$queue->push('test', ['n' => 3]);
// Simulate 2 workers at the same time
$job1 = $queue->pop('test');
$this->db->exec("SET lock_timeout = '1s'"); // timeout for the test
$queue2 = new DatabaseQueue($this->db, 'worker-2');
$job2 = $queue2->pop('test'); // should use SKIP LOCKED
// Each worker gets a different job
$this->assertNotEquals($job1['id'], $job2['id']);
}
}
Multi-Tenancy & Audit Context
In multi-tenant apps or with audit trails, the context must go into the job payload! Otherwise: cross-tenant chaos or missing audit logs.
// ✅ Create the job with its full context
$queue->push('emails', [
'type' => 'send_email',
'to' => $user->email,
'subject' => 'Welcome!',
// Context for the worker
'tenant_id' => $currentTenant->id, // multi-tenant isolation
'actor_id' => $currentUser->id, // audit: who triggered this?
'request_id' => $request->getId(), // tracing/correlation
]);
// ✅ In the worker: restore the context
final class TenantAwareJobHandler implements JobHandler
{
public function handle(array $payload): void
{
// Set the tenant context (for RLS, logging, etc.)
$this->tenantContext->set($payload['tenant_id'] ?? null);
$this->auditContext->setActor($payload['actor_id'] ?? null);
$this->logger->withContext(['request_id' => $payload['request_id'] ?? null]);
try {
$this->innerHandler->handle($payload);
} finally {
$this->tenantContext->clear();
}
}
}
Checklist
┌─────────────────────────────────────────────────────────────────┐
│ BACKGROUND JOBS CHECKLIST                                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│ □ Jobs are idempotent (idempotency keys for critical ops)      │
│ □ Retries with exponential backoff implemented                 │
│ □ Dead letter queue for failed jobs                            │
│ □ Stale-job recovery (crashed workers) via cron                │
│ □ Supervisord (or systemd) for worker processes                │
│ □ Graceful shutdown on SIGTERM (pcntl or stopwaitsecs)         │
│ □ Memory limits in the worker (auto-restart at threshold)      │
│ □ After each job: EntityManager->clear(), gc_collect_cycles()  │
│ □ Monitoring: queue depth, failure rate, latency               │
│ □ Alerting on queue backlog or high failure rate               │
│ □ Deployment: restart workers after every code update          │
│ □ Multi-tenant: tenant_id, actor_id in the job payload         │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Conclusion
Background jobs make PHP apps scalable and responsive:
- Cron: simple for scheduled tasks, with locking to prevent overlapping runs
- Database queue: solid with FOR UPDATE SKIP LOCKED, no extra service needed
- Redis queue: faster, good for high throughput
- Supervisord: keeps workers alive, essential for production
- Memory management: recycle workers regularly, monitor for leaks
- Idempotency: jobs must be safe to run more than once without side effects
My stack: a PostgreSQL queue for simple apps (already there, ACID, reliable), Redis when performance becomes critical. Supervisord always, memory limit at 128 MB, restart after 1000 jobs.
About Carola Schulte
Software architect with 25+ years of experience. Specialized in robust business apps built with PHP/PostgreSQL, security by design, and GDPR-compliant systems. 1.8M+ lines of code in production.
Background processing for your app?
Let's discuss how queues and workers can make your application faster and more scalable – free of charge and with no obligation.
Book a free initial consultation