Background-Jobs in PHP
Technologie & Architektur

Background-Jobs in PHP: Queues, Cron & lange Prozesse

Carola Schulte
10. November 2025
28 min

"Die PDF-Generierung blockiert den Request 8 Sekunden." "Der Newsletter-Versand bringt den Server in die Knie." "Warum dauert der CSV-Import so lange?" – Klassische Symptome für fehlende Background-Jobs. PHP ist für Request-Response optimiert, aber mit den richtigen Patterns läuft auch asynchrone Verarbeitung stabil.

TL;DR – Die Kurzfassung

  • Cron: Für zeitgesteuerte Tasks (Reports, Cleanup, Sync)
  • Message Queues: Für asynchrone Verarbeitung (E-Mails, PDFs, Webhooks)
  • Worker-Prozesse: Long-running PHP-Prozesse mit Supervisord
  • Goldene Regel: Jobs müssen idempotent sein (mehrfach ausführbar ohne Seiteneffekte)
  • Memory-Management: PHP-Worker regelmäßig recyclen (Memory Leaks!)

Warum Background-Jobs?

HTTP-Requests sollten schnell sein. Alles über 200ms fühlt sich langsam an. Aber manche Operationen dauern länger:

┌─────────────────────────────────────────────────────────────────┐
│              SYNCHRON vs. ASYNCHRON                             │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  SYNCHRON (schlecht für UX)                                     │
│  ┌────────┐    ┌─────────────────────────┐    ┌────────┐       │
│  │ User   │───▶│ PDF generieren (5s)     │───▶│ Response│       │
│  └────────┘    │ E-Mail senden (2s)      │    └────────┘       │
│                │ Webhook triggern (1s)   │                      │
│                └─────────────────────────┘                      │
│                        ↓ 8 Sekunden warten!                     │
│                                                                  │
│  ASYNCHRON (gute UX)                                            │
│  ┌────────┐    ┌─────────────┐    ┌────────┐                   │
│  │ User   │───▶│ Job queuen  │───▶│ Response│  ← 50ms          │
│  └────────┘    └──────┬──────┘    └────────┘                   │
│                       │                                         │
│                       ▼                                         │
│               ┌───────────────┐                                 │
│               │ Worker        │  ← Verarbeitet im Hintergrund   │
│               │ PDF, E-Mail.. │                                 │
│               └───────────────┘                                 │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Typische Use Cases

Use CaseAnsatzWarum?
E-Mail-VersandQueueSMTP kann langsam sein, Retry bei Fehler
PDF-GenerierungQueueCPU-intensiv, blockiert Worker
Bild-ResizeQueueI/O-intensiv, parallelisierbar
Webhook-CallbacksQueueExterne APIs können timeout
CSV/Excel-ImportQueue + ChunksGroße Dateien, Progress-Tracking
Nightly ReportsCronZeitgesteuert, nicht Event-getrieben
Datenbank-CleanupCronRegelmäßig, off-peak
API-Sync (extern)Cron + QueueRegelmäßig triggern, einzelne Syncs queuen

Cron-Jobs: Der Klassiker

Cron ist seit Jahrzehnten das Arbeitstier für zeitgesteuerte Tasks. Einfach, zuverlässig, überall verfügbar.

Crontab-Syntax

# ┌───────────── Minute (0-59)
# │ ┌───────────── Stunde (0-23)
# │ │ ┌───────────── Tag des Monats (1-31)
# │ │ │ ┌───────────── Monat (1-12)
# │ │ │ │ ┌───────────── Wochentag (0-7, 0 und 7 = Sonntag)
# │ │ │ │ │
# * * * * * command

# Beispiele:
0 2 * * *     /usr/bin/php /var/www/app/bin/console reports:daily
0 3 * * 0     /usr/bin/php /var/www/app/bin/console cleanup:old-sessions
*/15 * * * *  /usr/bin/php /var/www/app/bin/console sync:external-api

# ⚠️ Fallback für Mini-Setups OHNE echten Worker:
# */5 * * * * /usr/bin/php /var/www/app/bin/console queue:work --once
# Das ist "Cron-Polling" – NICHT empfohlen für Produktion!
# Besser: Long-running Worker + Supervisord (siehe unten)

Cron-Job Wrapper mit Locking

Problem: Was wenn ein Job länger läuft als das Intervall? Dann laufen mehrere Instanzen parallel.

<?php

declare(strict_types=1);

final class CronJobRunner
{
    public function __construct(
        private string $lockDir = '/var/run/cron',
    ) {
        if (!is_dir($this->lockDir)) {
            mkdir($this->lockDir, 0755, true);
        }
    }

    public function run(string $jobName, callable $job): int
    {
        $lockFile = "{$this->lockDir}/{$jobName}.lock";

        // Exklusives Lock holen (non-blocking)
        $fp = fopen($lockFile, 'c');
        if (!flock($fp, LOCK_EX | LOCK_NB)) {
            echo "[{$jobName}] Already running, skipping.\n";
            fclose($fp);
            return 0;
        }

        // PID schreiben für Debugging
        ftruncate($fp, 0);
        fwrite($fp, (string) getmypid());
        fflush($fp);

        try {
            $startTime = microtime(true);
            echo "[{$jobName}] Starting at " . date('Y-m-d H:i:s') . "\n";

            $result = $job();

            $duration = round(microtime(true) - $startTime, 2);
            echo "[{$jobName}] Completed in {$duration}s\n";

            return $result ?? 0;
        } catch (\Throwable $e) {
            echo "[{$jobName}] FAILED: {$e->getMessage()}\n";
            // Error-Logging, Alerting, etc.
            return 1;
        } finally {
            flock($fp, LOCK_UN);
            fclose($fp);
        }
    }
}

// Nutzung in bin/cron/daily-report.php
$runner = new CronJobRunner();
exit($runner->run('daily-report', function () use ($container) {
    $reportGenerator = $container->get(DailyReportGenerator::class);
    $reportGenerator->generate(new DateTimeImmutable('yesterday'));
    return 0;
}));

Cron-Monitoring

<?php

// Heartbeat an Monitoring-Service senden
final class CronMonitor
{
    public function __construct(
        private HttpClientInterface $http,
        private string $healthchecksUrl,  // z.B. healthchecks.io
    ) {}

    public function wrap(string $jobId, callable $job): mixed
    {
        // Start-Signal
        $this->ping($jobId, '/start');

        try {
            $result = $job();
            // Success-Signal
            $this->ping($jobId);
            return $result;
        } catch (\Throwable $e) {
            // Fail-Signal
            $this->ping($jobId, '/fail');
            throw $e;
        }
    }

    private function ping(string $jobId, string $suffix = ''): void
    {
        try {
            $this->http->request('GET', "{$this->healthchecksUrl}/{$jobId}{$suffix}", [
                'timeout' => 5,
            ]);
        } catch (\Throwable) {
            // Monitoring-Fehler nicht propagieren
        }
    }
}

Message Queues: Asynchrone Verarbeitung

Für Event-getriebene Jobs sind Message Queues der richtige Ansatz. Ein Producer erstellt Jobs, ein Worker verarbeitet sie.

Queue-Backend Vergleich

BackendProContraUse Case
RedisSchnell, einfach, LPUSH/BRPOPKein Ack, Persistence optionalEinfache Jobs, hoher Durchsatz
RabbitMQRobust, Routing, Dead LetterKomplexer, eigener ServiceEnterprise, komplexe Workflows
PostgreSQLSchon da, ACID, SKIP LOCKEDPolling statt Push, Last auf DBKleine Apps, bereits PostgreSQL
BeanstalkdSimpel, Delays, PrioritiesWeniger verbreitetEinfache Queue mit Features
Amazon SQSManaged, skaliert, zuverlässigAWS-Lock-in, LatenzCloud-native Apps

Database Queue mit PostgreSQL

Kein Redis? PostgreSQL mit FOR UPDATE SKIP LOCKED ist ein solides Queue-Backend.

CREATE TABLE job_queue (
    id BIGSERIAL PRIMARY KEY,
    queue VARCHAR(100) NOT NULL DEFAULT 'default',
    payload JSONB NOT NULL,

    -- Status-Tracking
    status VARCHAR(20) NOT NULL DEFAULT 'pending'
        CHECK (status IN ('pending', 'processing', 'completed', 'failed')),
    attempts INT NOT NULL DEFAULT 0,
    max_attempts INT NOT NULL DEFAULT 3,

    -- Timing
    available_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    reserved_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),

    -- Error-Tracking
    last_error TEXT,

    -- Worker-Info
    worker_id VARCHAR(100)
);

CREATE INDEX idx_queue_pending ON job_queue(queue, available_at)
    WHERE status = 'pending';
CREATE INDEX idx_queue_failed ON job_queue(queue)
    WHERE status = 'failed';
<?php

declare(strict_types=1);

final class DatabaseQueue
{
    public function __construct(
        private PDO $db,
        private string $workerId,
    ) {}

    public function push(string $queue, array $payload, int $delaySeconds = 0): int
    {
        // make_interval() ist stabiler als `:delay * INTERVAL '1 second'`
        // (PDO-Driver-Kompatibilität)
        $sql = "INSERT INTO job_queue (queue, payload, available_at)
                VALUES (:queue, :payload, NOW() + make_interval(secs => :delay))
                RETURNING id";

        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            'queue' => $queue,
            'payload' => json_encode($payload),
            'delay' => $delaySeconds,
        ]);

        return (int) $stmt->fetchColumn();
    }

    public function pop(string $queue): ?array
    {
        // Atomares Claim mit SKIP LOCKED (keine Blockierung anderer Worker!)
        $sql = "UPDATE job_queue SET
                    status = 'processing',
                    reserved_at = NOW(),
                    attempts = attempts + 1,
                    worker_id = :worker_id
                WHERE id = (
                    SELECT id FROM job_queue
                    WHERE queue = :queue
                      AND status = 'pending'
                      AND available_at <= NOW()
                      AND attempts < max_attempts  -- Wichtig: keine Retry-Loops!
                    ORDER BY available_at ASC
                    FOR UPDATE SKIP LOCKED
                    LIMIT 1
                )
                RETURNING *";

        $stmt = $this->db->prepare($sql);
        $stmt->execute([
            'queue' => $queue,
            'worker_id' => $this->workerId,
        ]);

        $job = $stmt->fetch(PDO::FETCH_ASSOC);

        if (!$job) {
            return null;
        }

        $job['payload'] = json_decode($job['payload'], true);
        return $job;
    }

    public function complete(int $jobId): void
    {
        $sql = "UPDATE job_queue SET
                    status = 'completed',
                    completed_at = NOW()
                WHERE id = :id";

        $this->db->prepare($sql)->execute(['id' => $jobId]);
    }

    public function fail(int $jobId, string $error): void
    {
        // Retry oder endgültig failed?
        $sql = "UPDATE job_queue SET
                    status = CASE
                        WHEN attempts < max_attempts THEN 'pending'
                        ELSE 'failed'
                    END,
                    reserved_at = NULL,
                    worker_id = NULL,
                    available_at = CASE
                        WHEN attempts < max_attempts
                        THEN NOW() + make_interval(secs => attempts * 30)
                        ELSE available_at
                    END,
                    last_error = :error
                WHERE id = :id";

        $this->db->prepare($sql)->execute([
            'id' => $jobId,
            'error' => $error,
        ]);
    }

    public function release(int $jobId, int $delaySeconds = 0): void
    {
        // Job zurück in Queue (z.B. bei graceful shutdown)
        $sql = "UPDATE job_queue SET
                    status = 'pending',
                    reserved_at = NULL,
                    worker_id = NULL,
                    available_at = NOW() + make_interval(secs => :delay)
                WHERE id = :id";

        $this->db->prepare($sql)->execute([
            'id' => $jobId,
            'delay' => $delaySeconds,
        ]);
    }

    /**
     * Stale Jobs zurücksetzen (Worker crashed/gestorben).
     * Per Cron alle 5 Minuten aufrufen!
     */
    public function requeueStaleJobs(int $staleMinutes = 15): int
    {
        $sql = "UPDATE job_queue SET
                    status = 'pending',
                    reserved_at = NULL,
                    worker_id = NULL
                WHERE status = 'processing'
                  AND reserved_at < NOW() - make_interval(mins => :stale_minutes)
                  AND attempts < max_attempts";

        $stmt = $this->db->prepare($sql);
        $stmt->execute(['stale_minutes' => $staleMinutes]);

        return $stmt->rowCount();
    }
}

// ⚠️ WICHTIG: Stale-Job-Recovery per Cron:
// */5 * * * * php bin/console queue:recover-stale

Redis Queue (schneller)

⚠️ Wichtig: LPUSH/BRPOP ist fire-and-forget: Wenn der Worker nach BRPOP crashed, ist der Job weg. Für Produktion besser:

  • BRPOPLPUSH-Pattern: Job in "processing"-Liste verschieben, nach Erfolg via LREM entfernen
  • Redis Streams (ab Redis 5.0): Consumer Groups mit Acknowledgment
  • Library nutzen: php-enqueue/redis, symfony/messenger mit Redis-Transport

Das folgende Beispiel ist nur für unkritische Jobs (Thumbnails, Logs) oder als Einstieg:

<?php

final class RedisQueue
{
    public function __construct(
        private \Redis $redis,
        private string $prefix = 'queue:',
    ) {}

    public function push(string $queue, array $payload): string
    {
        $job = [
            'id' => uniqid('job_', true),
            'payload' => $payload,
            'created_at' => time(),
            'attempts' => 0,
        ];

        $this->redis->lPush(
            $this->prefix . $queue,
            json_encode($job)
        );

        return $job['id'];
    }

    public function pushDelayed(string $queue, array $payload, int $delaySeconds): string
    {
        $job = [
            'id' => uniqid('job_', true),
            'payload' => $payload,
            'queue' => $queue,
            'created_at' => time(),
            'attempts' => 0,
        ];

        // Sorted Set mit Score = Ausführungszeit
        $this->redis->zAdd(
            $this->prefix . 'delayed',
            time() + $delaySeconds,
            json_encode($job)
        );

        return $job['id'];
    }

    public function pop(string $queue, int $timeout = 30): ?array
    {
        // BRPOP blockiert bis Job verfügbar (effizient!)
        $result = $this->redis->brPop(
            [$this->prefix . $queue],
            $timeout
        );

        if (!$result) {
            return null;
        }

        return json_decode($result[1], true);
    }

    public function migrateDelayed(): int
    {
        // Scheduled Jobs in Queue verschieben (per Cron alle 30s aufrufen)
        $now = time();
        $jobs = $this->redis->zRangeByScore(
            $this->prefix . 'delayed',
            '-inf',
            (string) $now
        );

        $count = 0;
        foreach ($jobs as $jobJson) {
            $job = json_decode($jobJson, true);

            // In Queue verschieben
            $this->redis->lPush($this->prefix . $job['queue'], $jobJson);
            $this->redis->zRem($this->prefix . 'delayed', $jobJson);
            $count++;
        }

        return $count;
    }
}

Worker-Prozesse

Worker sind long-running PHP-Prozesse, die Jobs aus der Queue abarbeiten. Das ist ungewöhnlich für PHP, aber funktioniert mit den richtigen Patterns.

Einfacher Worker

<?php

declare(strict_types=1);

final class QueueWorker
{
    private bool $shouldStop = false;
    private int $processedJobs = 0;
    private int $startTime;

    public function __construct(
        private DatabaseQueue $queue,
        private JobDispatcher $dispatcher,
        private LoggerInterface $logger,
        private int $maxJobs = 1000,        // Restart nach N Jobs
        private int $maxMemoryMb = 128,     // Restart bei Memory-Limit
        private int $maxRuntimeSeconds = 3600,  // Restart nach 1h
    ) {
        $this->startTime = time();
    }

    public function run(string $queueName): void
    {
        $this->registerSignalHandlers();

        $this->logger->info("Worker started", [
            'queue' => $queueName,
            'pid' => getmypid(),
        ]);

        while (!$this->shouldStop()) {
            $job = $this->queue->pop($queueName, timeout: 30);

            if ($job === null) {
                // Keine Jobs, kurz warten
                continue;
            }

            $this->processJob($job);
        }

        $this->logger->info("Worker stopping gracefully", [
            'processed_jobs' => $this->processedJobs,
            'reason' => $this->getStopReason(),
        ]);
    }

    private function processJob(array $job): void
    {
        $jobId = $job['id'];

        try {
            $this->logger->debug("Processing job", ['job_id' => $jobId]);

            $this->dispatcher->dispatch($job['payload']);

            $this->queue->complete($jobId);
            $this->processedJobs++;

            $this->logger->debug("Job completed", ['job_id' => $jobId]);

        } catch (\Throwable $e) {
            $this->logger->error("Job failed", [
                'job_id' => $jobId,
                'error' => $e->getMessage(),
                'attempt' => $job['attempts'],
            ]);

            $this->queue->fail($jobId, $e->getMessage());
        }
    }

    private function shouldStop(): bool
    {
        if ($this->shouldStop) {
            return true;
        }

        // Memory-Limit prüfen
        $memoryMb = memory_get_usage(true) / 1024 / 1024;
        if ($memoryMb >= $this->maxMemoryMb) {
            $this->logger->warning("Memory limit reached", ['memory_mb' => $memoryMb]);
            return true;
        }

        // Max Jobs erreicht
        if ($this->processedJobs >= $this->maxJobs) {
            return true;
        }

        // Max Runtime erreicht
        if ((time() - $this->startTime) >= $this->maxRuntimeSeconds) {
            return true;
        }

        return false;
    }

    private function registerSignalHandlers(): void
    {
        // ⚠️ Benötigt pcntl-Extension! Nicht verfügbar auf:
        // - Shared Hosting, Alpine-Images ohne pcntl, Windows
        // Fallback: Worker per Supervisord stoppen (stopwaitsecs)
        if (!extension_loaded('pcntl')) {
            return;
        }

        // Graceful Shutdown bei SIGTERM/SIGINT
        pcntl_async_signals(true);

        pcntl_signal(SIGTERM, function () {
            $this->logger->info("Received SIGTERM, finishing current job...");
            $this->shouldStop = true;
        });

        pcntl_signal(SIGINT, function () {
            $this->logger->info("Received SIGINT, finishing current job...");
            $this->shouldStop = true;
        });
    }

    private function getStopReason(): string
    {
        if ($this->shouldStop) {
            return 'signal';
        }
        if (memory_get_usage(true) / 1024 / 1024 >= $this->maxMemoryMb) {
            return 'memory';
        }
        if ($this->processedJobs >= $this->maxJobs) {
            return 'max_jobs';
        }
        return 'max_runtime';
    }
}

Job-Dispatcher mit Handlers

<?php

interface JobHandler
{
    public function handle(array $payload): void;
}

final class JobDispatcher
{
    /** @var array */
    private array $handlers = [];

    public function register(string $jobType, JobHandler $handler): void
    {
        $this->handlers[$jobType] = $handler;
    }

    public function dispatch(array $payload): void
    {
        $jobType = $payload['type'] ?? null;

        if (!$jobType || !isset($this->handlers[$jobType])) {
            throw new \RuntimeException("Unknown job type: {$jobType}");
        }

        $this->handlers[$jobType]->handle($payload);
    }
}

// Beispiel-Handler
final class SendEmailHandler implements JobHandler
{
    public function __construct(
        private MailerInterface $mailer,
    ) {}

    public function handle(array $payload): void
    {
        $email = (new Email())
            ->to($payload['to'])
            ->subject($payload['subject'])
            ->html($payload['body']);

        $this->mailer->send($email);
    }
}

final class GeneratePdfHandler implements JobHandler
{
    public function __construct(
        private PdfGenerator $pdf,
        private FileStorage $storage,
    ) {}

    public function handle(array $payload): void
    {
        $content = $this->pdf->generate($payload['template'], $payload['data']);

        $this->storage->put($payload['output_path'], $content);

        // Optional: Webhook/Notification wenn fertig
        if (isset($payload['callback_url'])) {
            // HTTP-Request an Callback...
        }
    }
}

Supervisord: Worker am Leben halten

Supervisord startet Worker-Prozesse neu, wenn sie crashen oder sich beenden. Essentiell für Produktion.

Installation

apt install supervisor
systemctl enable supervisor
systemctl start supervisor

Worker-Konfiguration

; /etc/supervisor/conf.d/app-worker.conf

[program:app-worker]
process_name=%(program_name)s_%(process_num)02d
command=/usr/bin/php /var/www/app/bin/console queue:work default
directory=/var/www/app
user=www-data
numprocs=4                          ; 4 Worker parallel
autostart=true
autorestart=true
startsecs=5
startretries=3
stopwaitsecs=30                     ; Zeit für graceful shutdown
redirect_stderr=true
stdout_logfile=/var/log/app/worker.log
stdout_logfile_maxbytes=10MB
stdout_logfile_backups=5

; Environment
environment=APP_ENV="production"

[program:app-worker-priority]
; Separate Queue für wichtige Jobs
command=/usr/bin/php /var/www/app/bin/console queue:work priority
numprocs=2
priority=10                         ; Startet vor normalen Workern
; ... Rest wie oben

Supervisord-Befehle

# Status aller Prozesse
supervisorctl status

# Worker neustarten (nach Deployment!)
supervisorctl restart app-worker:*

# Einzelnen Worker stoppen
supervisorctl stop app-worker:app-worker_00

# Konfiguration neu laden
supervisorctl reread
supervisorctl update

# Alle stoppen
supervisorctl stop all

Deployment-Integration

#!/bin/bash
# deploy.sh

set -e

echo "Deploying..."

# 1. Code aktualisieren
git pull origin main

# 2. Dependencies
composer install --no-dev --optimize-autoloader

# 3. Migrations
php bin/console migrate --force

# 4. Cache leeren
php bin/console cache:clear

# 5. Worker graceful restart (warten auf laufende Jobs)
supervisorctl restart app-worker:*

echo "Done!"

Wichtige Patterns

1. Idempotenz: Jobs mehrfach ausführbar

// ❌ SCHLECHT: Nicht idempotent
final class ChargeCustomerHandler implements JobHandler
{
    public function handle(array $payload): void
    {
        // Lädt erneut → doppelte Belastung möglich!
        $this->paymentGateway->charge(
            $payload['customer_id'],
            $payload['amount']
        );
    }
}

// ✅ GUT: Idempotent mit Idempotency-Key
final class ChargeCustomerHandler implements JobHandler
{
    public function handle(array $payload): void
    {
        $idempotencyKey = $payload['idempotency_key'];

        // Prüfen ob schon ausgeführt
        if ($this->paymentLog->exists($idempotencyKey)) {
            return; // Already processed
        }

        $this->paymentGateway->charge(
            $payload['customer_id'],
            $payload['amount'],
            $idempotencyKey
        );

        $this->paymentLog->record($idempotencyKey);
    }
}

2. Retry mit Exponential Backoff

<?php

final class RetryableJob
{
    public static function calculateDelay(int $attempt, int $baseDelay = 30): int
    {
        // Exponential: 30s, 60s, 120s, 240s, 480s...
        // Mit Jitter um Thundering Herd zu vermeiden
        $delay = $baseDelay * (2 ** ($attempt - 1));
        $jitter = random_int(0, (int) ($delay * 0.1));

        return min($delay + $jitter, 3600); // Max 1h
    }
}

// Im Queue-Backend
public function fail(int $jobId, string $error): void
{
    $job = $this->getJob($jobId);

    if ($job['attempts'] < $job['max_attempts']) {
        $delay = RetryableJob::calculateDelay($job['attempts']);
        $this->release($jobId, $delay);
    } else {
        $this->markFailed($jobId, $error);
    }
}

3. Dead Letter Queue (DLQ)

<?php

final class DeadLetterHandler
{
    public function __construct(
        private PDO $db,
        private AlertService $alerts,
    ) {}

    public function moveToDeadLetter(int $jobId, string $error): void
    {
        $this->db->beginTransaction();

        try {
            // Job in DLQ kopieren
            $this->db->exec("
                INSERT INTO dead_letter_queue
                    (original_id, queue, payload, error, attempts, failed_at)
                SELECT id, queue, payload, :error, attempts, NOW()
                FROM job_queue WHERE id = :id
            ");

            // Aus normaler Queue entfernen
            $this->db->exec("DELETE FROM job_queue WHERE id = :id");

            $this->db->commit();

            // Alert bei zu vielen Failed Jobs
            $this->checkAndAlert();

        } catch (\Throwable $e) {
            $this->db->rollBack();
            throw $e;
        }
    }

    private function checkAndAlert(): void
    {
        $count = $this->db->query("
            SELECT COUNT(*) FROM dead_letter_queue
            WHERE failed_at > NOW() - INTERVAL '1 hour'
        ")->fetchColumn();

        if ($count > 10) {
            $this->alerts->critical("High DLQ rate: {$count} failed jobs in last hour");
        }
    }

    public function retry(int $dlqId): void
    {
        // Job aus DLQ zurück in Queue
        $this->db->exec("
            INSERT INTO job_queue (queue, payload, attempts, max_attempts)
            SELECT queue, payload, 0, 3
            FROM dead_letter_queue WHERE id = :id
        ");

        $this->db->exec("DELETE FROM dead_letter_queue WHERE id = :id");
    }
}

4. Job-Batching mit Progress

<?php

// Für große Imports: Batch erstellen, Progress tracken

final class BatchImportHandler implements JobHandler
{
    public function handle(array $payload): void
    {
        $batchId = $payload['batch_id'];
        $chunk = $payload['chunk'];
        $totalChunks = $payload['total_chunks'];

        // Chunk verarbeiten
        foreach ($chunk as $row) {
            $this->processRow($row);
        }

        // Progress updaten
        $this->updateBatchProgress($batchId, $totalChunks);
    }

    private function updateBatchProgress(string $batchId, int $totalChunks): void
    {
        $sql = "UPDATE import_batches SET
                    completed_chunks = completed_chunks + 1,
                    status = CASE
                        WHEN completed_chunks + 1 >= :total THEN 'completed'
                        ELSE 'processing'
                    END,
                    completed_at = CASE
                        WHEN completed_chunks + 1 >= :total THEN NOW()
                        ELSE NULL
                    END
                WHERE id = :batch_id";

        $this->db->prepare($sql)->execute([
            'batch_id' => $batchId,
            'total' => $totalChunks,
        ]);
    }
}

// Batch erstellen
final class BatchCreator
{
    public function createImportBatch(string $filePath, int $chunkSize = 100): string
    {
        $batchId = Uuid::v4();
        $rows = $this->parseFile($filePath);
        $chunks = array_chunk($rows, $chunkSize);

        // Batch-Record
        $this->db->prepare("
            INSERT INTO import_batches (id, total_chunks, status)
            VALUES (:id, :total, 'pending')
        ")->execute([
            'id' => $batchId,
            'total' => count($chunks),
        ]);

        // Jobs für jeden Chunk
        foreach ($chunks as $index => $chunk) {
            $this->queue->push('imports', [
                'type' => 'batch_import',
                'batch_id' => $batchId,
                'chunk' => $chunk,
                'chunk_index' => $index,
                'total_chunks' => count($chunks),
            ]);
        }

        return $batchId;
    }
}

Memory-Management

PHP ist nicht für Long-Running Prozesse designed. Memory Leaks sind das Hauptproblem.

Goldene Regel: Restart nach X Jobs

// Worker-Defaults für stabile Long-Running Prozesse:
$maxJobs = 1000;           // Nach 1000 Jobs: Restart
$maxMemoryMb = 128;        // Bei 128MB: Restart
$maxRuntimeSeconds = 3600; // Nach 1h: Restart

// Nach JEDEM Job (wenn DI-Container/ORM):
$entityManager->clear();   // Doctrine IdentityMap leeren
gc_collect_cycles();       // Optional: GC forcieren

Typische Memory-Leak-Quellen

// ❌ Doctrine EntityManager cached Entities
foreach ($jobs as $job) {
    $entity = $em->find(Invoice::class, $job['invoice_id']);
    // Entity bleibt im IdentityMap → Memory wächst!
}

// ✅ EntityManager regelmäßig clearen
foreach ($jobs as $i => $job) {
    $entity = $em->find(Invoice::class, $job['invoice_id']);
    $this->process($entity);

    if ($i % 100 === 0) {
        $em->clear(); // IdentityMap leeren
        gc_collect_cycles(); // Garbage Collection forcieren
    }
}

// ❌ Monolog Handler sammeln
// (Standard-Handler cacht alle Log-Einträge im Memory)

// ✅ In Worker: Nur StreamHandler ohne Buffer
$logger = new Logger('worker');
$logger->pushHandler(new StreamHandler('php://stdout'));
// Kein BufferHandler, kein FingersCrossedHandler!

Memory-Monitoring im Worker

<?php

final class MemoryAwareWorker
{
    private int $initialMemory;

    public function __construct(
        private int $maxMemoryMb = 128,
        private int $leakThresholdMb = 10,
    ) {
        $this->initialMemory = memory_get_usage(true);
    }

    public function shouldRestart(): bool
    {
        $currentMemory = memory_get_usage(true);
        $usedMb = $currentMemory / 1024 / 1024;

        // Hard limit
        if ($usedMb >= $this->maxMemoryMb) {
            return true;
        }

        // Leak detection: zu viel Wachstum seit Start
        $growth = ($currentMemory - $this->initialMemory) / 1024 / 1024;
        if ($growth > $this->leakThresholdMb) {
            error_log("Memory leak detected: +{$growth}MB since start");
            return true;
        }

        return false;
    }

    public function logMemoryUsage(): void
    {
        $current = memory_get_usage(true) / 1024 / 1024;
        $peak = memory_get_peak_usage(true) / 1024 / 1024;

        error_log(sprintf(
            "Memory: %.1fMB (peak: %.1fMB)",
            $current,
            $peak
        ));
    }
}

Monitoring & Alerting

<?php

final class QueueMetrics
{
    public function __construct(
        private PDO $db,
        private MetricsClient $metrics,  // Prometheus, StatsD, etc.
    ) {}

    public function collect(): void
    {
        // Queue-Tiefe
        $depths = $this->db->query("
            SELECT queue, status, COUNT(*) as count
            FROM job_queue
            GROUP BY queue, status
        ")->fetchAll();

        foreach ($depths as $row) {
            $this->metrics->gauge(
                'queue_depth',
                $row['count'],
                ['queue' => $row['queue'], 'status' => $row['status']]
            );
        }

        // Ältester Job (Latency)
        $oldest = $this->db->query("
            SELECT queue,
                   EXTRACT(EPOCH FROM (NOW() - MIN(created_at))) as age_seconds
            FROM job_queue
            WHERE status = 'pending'
            GROUP BY queue
        ")->fetchAll();

        foreach ($oldest as $row) {
            $this->metrics->gauge(
                'queue_oldest_job_seconds',
                $row['age_seconds'],
                ['queue' => $row['queue']]
            );
        }

        // Failed Jobs letzte Stunde
        $failed = $this->db->query("
            SELECT COUNT(*) FROM job_queue
            WHERE status = 'failed'
              AND completed_at > NOW() - INTERVAL '1 hour'
        ")->fetchColumn();

        // Gauge für Dashboard-Anzeige
        $this->metrics->gauge('queue_failed_jobs_hourly', $failed);

        // Counter für rate()-Berechnung in Prometheus
        // (Counter wird NIE zurückgesetzt, nur inkrementiert)
        static $lastFailedCount = 0;
        if ($failed > $lastFailedCount) {
            $this->metrics->counter('queue_jobs_failed_total', $failed - $lastFailedCount);
        }
        $lastFailedCount = $failed;
    }
}

// Prometheus-Endpunkt
// GET /metrics
$metrics = new QueueMetrics($db, $prometheusClient);
$metrics->collect();
echo $prometheusClient->render();

Alerting-Regeln (Prometheus)

# prometheus/alerts.yml

groups:
  - name: queue_alerts
    rules:
      - alert: QueueBacklogHigh
        expr: queue_depth{status="pending"} > 1000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Queue backlog too high"
          description: "Queue {{ $labels.queue }} has {{ $value }} pending jobs"

      - alert: QueueJobStuck
        expr: queue_oldest_job_seconds > 3600
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Jobs stuck in queue"
          description: "Oldest job in {{ $labels.queue }} is {{ $value }}s old"

      - alert: QueueHighFailureRate
        # rate() nur mit Counter, nicht mit Gauge!
        expr: rate(queue_jobs_failed_total[5m]) * 60 > 10
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High job failure rate (>10/min)"

Jobs testen

<?php

final class SendEmailHandlerTest extends TestCase
{
    public function testSendsEmail(): void
    {
        $mailer = $this->createMock(MailerInterface::class);
        $mailer->expects($this->once())
               ->method('send')
               ->with($this->callback(function (Email $email) {
                   return $email->getTo()[0]->getAddress() === 'user@example.com'
                       && $email->getSubject() === 'Welcome!';
               }));

        $handler = new SendEmailHandler($mailer);

        $handler->handle([
            'type' => 'send_email',
            'to' => 'user@example.com',
            'subject' => 'Welcome!',
            'body' => '<p>Hello!</p>',
        ]);
    }

    public function testRetryOnTemporaryFailure(): void
    {
        $mailer = $this->createMock(MailerInterface::class);
        $mailer->method('send')
               ->willThrowException(new TransportException('Connection timeout'));

        $handler = new SendEmailHandler($mailer);

        $this->expectException(TransportException::class);

        // Handler wirft Exception → Queue wird retry machen
        $handler->handle([
            'type' => 'send_email',
            'to' => 'user@example.com',
            'subject' => 'Test',
            'body' => 'Test',
        ]);
    }
}

// Integration-Test für Queue
final class DatabaseQueueTest extends TestCase
{
    public function testPushAndPop(): void
    {
        $queue = new DatabaseQueue($this->db, 'test-worker');

        $jobId = $queue->push('emails', ['to' => 'test@example.com']);

        $this->assertGreaterThan(0, $jobId);

        $job = $queue->pop('emails');

        $this->assertNotNull($job);
        $this->assertEquals($jobId, $job['id']);
        $this->assertEquals('test@example.com', $job['payload']['to']);
        $this->assertEquals('processing', $job['status']);
    }

    public function testConcurrentWorkersWithSkipLocked(): void
    {
        $queue = new DatabaseQueue($this->db, 'worker-1');

        // 3 Jobs erstellen
        $queue->push('test', ['n' => 1]);
        $queue->push('test', ['n' => 2]);
        $queue->push('test', ['n' => 3]);

        // Simuliere 2 Worker gleichzeitig
        $job1 = $queue->pop('test');
        $this->db->exec("SET lock_timeout = '1s'"); // Timeout für Test

        $queue2 = new DatabaseQueue($this->db, 'worker-2');
        $job2 = $queue2->pop('test'); // Sollte SKIP LOCKED nutzen

        // Beide bekommen unterschiedliche Jobs
        $this->assertNotEquals($job1['id'], $job2['id']);
    }
}

Multi-Tenant & Audit-Kontext

In Multi-Tenant-Apps oder bei Audit-Trails: Kontext muss in den Job-Payload! Sonst: Cross-Tenant-Chaos oder fehlende Audit-Logs.

// ✅ Job mit vollständigem Kontext erstellen
$queue->push('emails', [
    'type' => 'send_email',
    'to' => $user->email,
    'subject' => 'Welcome!',

    // Kontext für Worker
    'tenant_id' => $currentTenant->id,    // Multi-Tenant Isolation
    'actor_id' => $currentUser->id,       // Audit: wer hat ausgelöst?
    'request_id' => $request->getId(),    // Tracing/Correlation
]);

// ✅ Im Worker: Kontext wiederherstellen
final class TenantAwareJobHandler implements JobHandler
{
    public function handle(array $payload): void
    {
        // Tenant-Kontext setzen (für RLS, Logging, etc.)
        $this->tenantContext->set($payload['tenant_id'] ?? null);
        $this->auditContext->setActor($payload['actor_id'] ?? null);
        $this->logger->withContext(['request_id' => $payload['request_id'] ?? null]);

        try {
            $this->innerHandler->handle($payload);
        } finally {
            $this->tenantContext->clear();
        }
    }
}

Checkliste

┌─────────────────────────────────────────────────────────────────┐
│               BACKGROUND-JOBS CHECKLISTE                        │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  □ Jobs sind idempotent (Idempotency-Key bei kritischen Ops)   │
│  □ Retries mit Exponential Backoff implementiert               │
│  □ Dead Letter Queue für fehlgeschlagene Jobs                  │
│  □ Stale-Job-Recovery (gecrasht Worker) per Cron               │
│  □ Supervisord (oder systemd) für Worker-Prozesse              │
│  □ Graceful Shutdown bei SIGTERM (pcntl oder stopwaitsecs)     │
│  □ Memory-Limits im Worker (Auto-Restart bei Threshold)        │
│  □ Nach jedem Job: EntityManager->clear(), gc_collect_cycles() │
│  □ Monitoring: Queue-Tiefe, Failure-Rate, Latency              │
│  □ Alerting bei Queue-Backlog oder hoher Failure-Rate          │
│  □ Deployment: Worker nach Code-Update neustarten              │
│  □ Multi-Tenant: tenant_id, actor_id im Job-Payload            │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Fazit

Background-Jobs machen PHP-Apps skalierbar und responsive:

  • Cron: Einfach für zeitgesteuerte Tasks, mit Locking gegen Überlappung
  • Database Queue: Solide mit FOR UPDATE SKIP LOCKED, kein extra Service nötig
  • Redis Queue: Schneller, gut für hohen Durchsatz
  • Supervisord: Hält Worker am Leben, essentiell für Produktion
  • Memory-Management: Worker regelmäßig recyclen, Leaks monitoren
  • Idempotenz: Jobs müssen mehrfach ausführbar sein ohne Seiteneffekte

Mein Stack: PostgreSQL Queue für einfache Apps (schon da, ACID, zuverlässig), Redis wenn Performance kritisch wird. Supervisord immer, Memory-Limit bei 128MB, Restart nach 1000 Jobs.


Weiterführende Ressourcen

Carola Schulte

Über Carola Schulte

Software-Architektin mit 25+ Jahren Erfahrung. Spezialisiert auf robuste Business-Apps mit PHP/PostgreSQL, Security-by-Design und DSGVO-konforme Systeme. 1,8M+ Lines of Code in Produktion.

Hintergrundverarbeitung für Ihre App?

Lassen Sie uns besprechen, wie Queues und Worker Ihre Applikation schneller und skalierbarer machen – kostenlos und unverbindlich.

Kostenloses Erstgespräch