Async Jobs, Queue and Connector Contracts
Status: verified (P0)
Runtime: Docker Compose + PHP 8.3
This document describes the async queue baseline available in PSFS core, including queue backends, the queue job contract, CLI commands, and connector hardening.
1) What is available now
Queue transports
PSFS\base\queue\JobQueueInterfacePSFS\base\queue\SyncJobQueuePSFS\base\queue\RedisJobQueuePSFS\base\queue\FileJobQueue
Transport contract methods:
enqueue(string $queue, array $payload): booldequeue(string $queue): ?arrayisAvailable(): bool
Queue runtime components
PSFS\base\queue\QueueJobInterfacePSFS\base\queue\JobRegistryPSFS\base\queue\QueueDispatcherPSFS\base\queue\QueueWorkerPSFS\base\queue\ParallelQueueRunnerPSFS\base\queue\QueueBackendFactoryPSFS\Queue\NotificationJob
Queue job contract methods:
public static function code(): stringpublic static function fromPayload(array $payload): selfpublic function handle(): void
Queue messages are stored as envelopes, not raw business objects. The runtime envelope uses this shape:
[
'code' => 'notifications',
'queue' => 'notifications',
'payload' => ['message' => 'Deploy completed'],
'queued_at' => 1710000000,
'attempts' => 0,
]
Connector hardening
PSFS\base\types\CurlService- default connect timeout:
3s - default request timeout:
10s
- default connect timeout:
PSFS\base\types\helpers\SensitiveDataHelper- redacts sensitive payload fields before external logging or alerts
PSFS\base\types\helpers\SlackHelper- sends redacted payload and extra info
2) Configuration
Optional keys in config.json:
{
"curl.connectTimeout": 3,
"curl.timeout": 10,
"job.queue.file.path": "cache/queue",
"job.queue.redis.prefix": "psfs:queue",
"redis.host": "core-redis-1",
"redis.port": 6379,
"redis.timeout": 0.2
}
Environment overrides:
PSFS_REDIS_HOSTPSFS_REDIS_PORTPSFS_REDIS_TIMEOUT
3) Backend selection and fallback
PSFS resolves the persistent queue backend through QueueBackendFactory::createPersistent().
Selection order:
RedisJobQueuewhen Redis is reachable.FileJobQueueas the persistent fallback.
SyncJobQueue remains useful for isolated unit tests or fully in-process execution, but it is not a cross-process transport.
4) Queue job flow
The queue runtime follows this path:
- Application code or CLI dispatches a job by
code. JobRegistryresolvescode -> class.QueueDispatcherwraps the payload into an envelope and enqueues it.QueueWorkerconsumes a queue, rebuilds the job withfromPayload(), and executeshandle().ParallelQueueRunnerstarts multiple worker processes for the same queue.
5) CLI commands
All commands must run through Docker:
docker compose up -d
docker exec core-php-1 php /var/www/src/bin/psfs psfs:queue:dispatch --code=notifications --payload='{"message":"Deploy completed"}'
docker exec core-php-1 php /var/www/src/bin/psfs psfs:queue:work --queue=notifications --max-jobs=1 --stop-when-empty=1
docker exec core-php-1 php /var/www/src/bin/psfs psfs:queue:run-parallel --queue=notifications --workers=2 --max-jobs=10 --stop-when-empty=1
Command responsibilities:
psfs:queue:dispatch- validates
code - parses the JSON payload
- enqueues the message through the persistent backend
- validates
psfs:queue:work- consumes one queue
- executes
QueueJobInterface::fromPayload(...)->handle() - can stop on empty queues for batch-style execution
psfs:queue:run-parallel- spawns multiple worker processes for the same queue
- is suitable for batch or catch-up execution
6) Queue job implementation example
Minimal job contract:
use PSFS\base\queue\QueueJobInterface;
final class NotificationJob implements QueueJobInterface
{
public static function code(): string
{
return 'notifications';
}
public static function fromPayload(array $payload): self
{
return new self($payload);
}
public function handle(): void
{
// perform the work
}
}
The queue name defaults to the job code. If needed, psfs:queue:dispatch can override the queue name explicitly with --queue=....
7) Usage examples
7.1 Sync queue (test-friendly mode)
Use when you need deterministic behavior without external dependencies.
use PSFS\base\queue\SyncJobQueue;
$queue = new SyncJobQueue();
$queue->enqueue('notifications', [
'type' => 'slack.error',
'message' => 'Job failed',
]);
$job = $queue->dequeue('notifications');
if (null !== $job) {
// process job synchronously
}
7.2 Persistent queue backend (recommended runtime path)
Use the backend factory when you want Redis first and a file-based fallback automatically.
use PSFS\base\queue\JobRegistry;
use PSFS\base\queue\QueueBackendFactory;
use PSFS\base\queue\QueueDispatcher;
$dispatcher = new QueueDispatcher(
QueueBackendFactory::createPersistent(),
new JobRegistry()
);
$dispatcher->dispatch('notifications', [
'message' => 'Deploy completed',
]);
7.3 Redis queue (optional direct usage)
Use when Redis is available and you want decoupled request/processing flow.
use PSFS\base\queue\FileJobQueue;
use PSFS\base\queue\RedisJobQueue;
$redisQueue = new RedisJobQueue();
$queue = $redisQueue->isAvailable() ? $redisQueue : new FileJobQueue();
$ok = $queue->enqueue('notifications', [
'type' => 'slack.error',
'uid' => 'abc-123',
]);
if (!$ok) {
// fallback or retry strategy
}
7.4 Recommended fallback strategy
use PSFS\base\queue\FileJobQueue;
use PSFS\base\queue\RedisJobQueue;
$redisQueue = new RedisJobQueue();
$fileQueue = new FileJobQueue();
$payload = ['type' => 'audit.log', 'event' => 'user.login'];
if (!$redisQueue->enqueue('audit', $payload)) {
$fileQueue->enqueue('audit', $payload);
}
8) Sensitive data redaction
Before sending payloads to external systems, redact secrets.
use PSFS\base\types\helpers\SensitiveDataHelper;
$payload = [
'username' => 'demo',
'password' => 'super-secret',
'Authorization' => 'Bearer x.y.z',
];
$safePayload = SensitiveDataHelper::redact($payload);
// password + authorization are masked with "***"
Masked key patterns include:
password,passwd,secret,tokenauthorization,cookieapi_key,apikey,access_token,refresh_token
9) Connector timeouts
CurlService applies defaults even if not explicitly configured.
- connect timeout:
3seconds - request timeout:
10seconds
Override with config when required:
[
'curl.connectTimeout' => 2,
'curl.timeout' => 6,
]
10) Operational recommendations
- Use
QueueBackendFactory::createPersistent()unless you have a strong reason to choose the backend manually. - Start with
FileJobQueueor the backend factory for local/dev predictability. - Enable
RedisJobQueuein non-local environments with monitoring. - Keep payloads small and serializable (
arrayonly). - Use idempotent handlers to avoid duplicate side effects on retries.
- Keep
code()stable. Queue consumers depend on that identifier. - Reserve
SyncJobQueuefor tests or in-process flows. It is not a worker transport. - Always redact sensitive data before external delivery.
11) Validation commands (Docker only)
docker compose up -d
docker exec core-php-1 sh -lc 'cd /var/www && vendor/bin/phpunit tests/base/queue tests/base/helpers/SensitiveDataHelperTest.php tests/base/ServiceTest.php'