
Retry strategies, lock renewal recovery, circuit breakers, queue metrics, and graceful shutdown — keeping 20+ BullMQ workers healthy in production.
Part 1 covered the foundation — worker boot, queue definitions, and the handler registry. Part 2 covered dispatching, parent-child flows, and scheduled jobs. This final part covers what keeps the system healthy when things go wrong.
The global default is 5 attempts with exponential backoff starting at 1 second:
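```typescript
import type { JobsOptions } from 'bullmq'

// Applied to every job unless a queue or job type overrides it
const DEFAULT_JOB_OPTIONS: JobsOptions = {
  attempts: 5, // total attempts, including the first run
  backoff: { type: 'exponential', delay: 1000 }, // base delay in ms
  removeOnFail: true,
  removeOnComplete: true,
}
```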
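With exponential backoff, each retry waits twice as long as the previous one. Because attempts counts the initial run, 5 attempts means 4 retries, with delays of 1s, 2s, 4s, and 8s. That adds up to about 15 seconds of total backoff between the first failure and the final attempt. This works well for transient failures: Redis hiccups, temporary network issues, brief database locks.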
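This minimal sketch makes the arithmetic concrete. It reproduces BullMQ's built-in exponential strategy without jitter, where the n-th retry waits delay × 2^(n − 1) milliseconds. retryDelays and totalBackoffMs are hypothetical helpers, not BullMQ APIs.

```typescript
// Mirrors BullMQ's built-in 'exponential' backoff (jitter disabled):
// the n-th retry waits delayMs * 2^(n - 1). retryDelays is a hypothetical helper.
function retryDelays(attempts: number, delayMs: number): number[] {
  const delays: number[] = []
  // attempts includes the first run, so there are attempts - 1 retries
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(delayMs * 2 ** (attemptsMade - 1))
  }
  return delays
}

// Total time spent waiting between the first failure and the final attempt
function totalBackoffMs(attempts: number, delayMs: number): number {
  return retryDelays(attempts, delayMs).reduce((sum, d) => sum + d, 0)
}

console.log(retryDelays(5, 1000)) // [ 1000, 2000, 4000, 8000 ]
console.log(totalBackoffMs(5, 1000)) // 15000
console.log(retryDelays(3, 60_000)) // [ 60000, 120000 ]
```

The same helper also covers the OAuth2 row in the table below. With 3 attempts and a 60-second base delay, the two retries come 60 and 120 seconds after each failure.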
Different job types override these defaults based on their failure characteristics:
| Job type | Attempts | Backoff delay | Rationale |
|---|---|---|---|
| Default | 5 | 1s exponential | Transient failures |
| Email delivery | 3 | 5s exponential | API rate limits, wait longer between retries |
| OAuth2 refresh | 3 | 60s exponential | Token endpoints can be slow to recover |
| Document embeddings | 3 | 2s exponential | AI API timeouts |
| Polling sync | 3 | 30-60s exponential | Email provider rate limits |
| Stale check / relaunch | 1 | none | Diagnostic jobs, no point retrying |
The key insight: shorter backoff for internal operations (database, Redis), longer backoff for external API calls. If Gmail's API is returning 429s, retrying in 1 second won't help — retrying in 60 seconds might.
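Here is a minimal sketch of one way to keep these overrides in one place: a lookup keyed by job type, spread over the defaults. The job-type keys and optionsFor are hypothetical. RetryOptions is a local stand-in for the retry-related fields of BullMQ's JobsOptions. The values come from the table above.

```typescript
// Local stand-in for the retry-related fields of BullMQ's JobsOptions
type RetryOptions = {
  attempts: number
  backoff?: { type: 'exponential' | 'fixed'; delay: number }
  removeOnFail?: boolean
  removeOnComplete?: boolean
}

const DEFAULTS: RetryOptions = {
  attempts: 5,
  backoff: { type: 'exponential', delay: 1000 },
  removeOnFail: true,
  removeOnComplete: true,
}

// Hypothetical job-type keys; values follow the overrides table
const OVERRIDES: Record<string, Partial<RetryOptions>> = {
  emailDelivery: { attempts: 3, backoff: { type: 'exponential', delay: 5_000 } },
  oauth2Refresh: { attempts: 3, backoff: { type: 'exponential', delay: 60_000 } },
  documentEmbeddings: { attempts: 3, backoff: { type: 'exponential', delay: 2_000 } },
  staleCheck: { attempts: 1 }, // backoff never applies with a single attempt
}

// Shallow merge: an override replaces whole fields (e.g. the full backoff object)
function optionsFor(jobType: string): RetryOptions {
  return { ...DEFAULTS, ...OVERRIDES[jobType] }
}

console.log(optionsFor('oauth2Refresh').backoff) // { type: 'exponential', delay: 60000 }
console.log(optionsFor('unknownJob').attempts) // 5
```

Because the merge is shallow, an override replaces the whole backoff object rather than just its delay. That keeps each entry readable as a complete retry policy.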
Jobs can be cancelled via BullMQ's built-in AbortSignal support. The createJobHandler wires this through to the JobContext:
```typescript
// apps/worker/src/workers/utils/createJobHandler.ts
import { type Job, UnrecoverableError } from 'bullmq'
// JobContext<T> is defined in packages/lib/src/jobs/types/job-context.ts

export class JobCancelledError extends UnrecoverableError {
  constructor(reason?: string) {
    super(`Job cancelled: ${reason || 'No reason provided'}`)
    this.name = 'JobCancelledError'
  }
}

function createJobContext<T>(job: Job<T>, signal?: AbortSignal): JobContext<T> {
  return {
    job,
    signal,
    data: job.data,
    jobId: job.id || 'unknown',
    jobName: job.name,
    // True once the job's AbortSignal has been aborted
    isCancelled: () => signal?.aborted ?? false,
    // Call at safe checkpoints; throws a non-retryable error if cancelled
    throwIfCancelled: () => {
      if (signal?.aborted) {
        throw new JobCancelledError(signal.reason)
      }
    },
    // ...
  }
}
```
JobCancelledError extends BullMQ's UnrecoverableError. This is important — when a job throws an UnrecoverableError, BullMQ moves it to failed immediately without retrying. A cancelled job shouldn't retry. It was intentionally stopped.
Job handlers check cancellation at natural breakpoints:
```typescript
export async function processDocumentJob(ctx: JobContext<ProcessDocumentData>) {
  const segments = await parseDocument(ctx.data.content)
  for (const segment of segments) {
    ctx.throwIfCancelled() // Check before expensive work
    await generateEmbedding(segment)
    await ctx.updateProgress(/* ... */)
  }
}
```
The throwIfCancelled() call is essentially free when the job isn't cancelled, because it is a single check of signal.aborted. When the job is cancelled, it exits cleanly with a log message instead of processing more segments that would be thrown away.
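You can try the cancellation flow outside BullMQ with a plain AbortController. In this sketch:

- CancelledError stands in for JobCancelledError. The real class extends BullMQ's UnrecoverableError.
- processSegments is a hypothetical handler.
- The abort fires from the progress callback after the second segment, to simulate a cancellation arriving mid-job.

```typescript
// Local stand-in for JobCancelledError (the real one extends BullMQ's UnrecoverableError)
class CancelledError extends Error {
  constructor(reason?: unknown) {
    super(`Job cancelled: ${reason ?? 'No reason provided'}`)
    this.name = 'CancelledError'
  }
}

function throwIfCancelled(signal: AbortSignal): void {
  if (signal.aborted) throw new CancelledError(signal.reason)
}

// Hypothetical handler: checks for cancellation before each expensive step
async function processSegments(
  segments: string[],
  signal: AbortSignal,
  onDone: (segment: string) => void,
): Promise<void> {
  for (const segment of segments) {
    throwIfCancelled(signal) // one boolean read when not cancelled
    await Promise.resolve() // stand-in for generateEmbedding(segment)
    onDone(segment)
  }
}

async function demo(): Promise<{ processed: string[]; error: string }> {
  const controller = new AbortController()
  const processed: string[] = []
  try {
    await processSegments(['a', 'b', 'c', 'd'], controller.signal, (segment) => {
      processed.push(segment)
      // Simulate a cancellation request arriving after the second segment
      if (processed.length === 2) controller.abort('cancelled by user')
    })
    return { processed, error: 'none' }
  } catch (err) {
    return { processed, error: (err as Error).message }
  }
}

demo().then((result) => console.log(result))
// { processed: [ 'a', 'b' ], error: 'Job cancelled: cancelled by user' }
```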
This is the most interesting error handling pattern in the system. BullMQ uses distributed locks to ensure only one worker processes a job at a time. The worker acquires a lock, processes the job, and releases the lock when done.
Each lock has a limited lifetime (the lock duration, default: 30 seconds). BullMQ renews it in the background while the job runs, by default every half lock duration. But renewal can fail: a network partition, a Redis reconnection, or a long garbage collection pause. If it fails and the lock expires, the job becomes "stalled." Another worker can pick it up, and now you have two workers processing the same job.
For most jobs, this is fine. They're idempotent, and the stalled job will fail with a lock error when it tries to report completion. But for email sync jobs, this can leave integrations in a broken state — halfway through a sync, the integration's stage is set to "importing" but no worker is actually importing.
Our createWorker handles this:
```typescript
// apps/worker/src/workers/utils/createWorker.ts
import { Job } from 'bullmq'
import { and, eq, inArray } from 'drizzle-orm' // query helpers (import path assumed)

worker.on('lockRenewalFailed', async (jobId: string, error: Error) => {
  logger.warn('Lock renewal failed', { jobId, queue, error: error.message })
  try {
    const job = await Job.fromId(worker, jobId)
    // Only polling sync jobs carry an integrationId
    if (job?.data?.integrationId) {
      const { integrationId } = job.data
      const { recoverProcessingBatch, getImportCacheSize } = await import(
        '@auxx/lib/email/polling-import-cache'
      )
      const { database: db, schema } = await import('@auxx/database')
      const recovered = await recoverProcessingBatch(integrationId)
      const cacheSize = await getImportCacheSize(integrationId)
      // Determine which stage to reset to based on cache state
      const resetStage = cacheSize > 0
        ? 'MESSAGES_IMPORT_PENDING'
        : 'MESSAGE_LIST_FETCH_PENDING'
      await db
        .update(schema.Integration)
        .set({
          syncStage: resetStage,
          syncStageStartedAt: null,
          updatedAt: new Date(),
        })
        .where(
          and(
            eq(schema.Integration.id, integrationId),
            // Only reset integrations that are still in an active sync stage
            inArray(schema.Integration.syncStage, [
              'MESSAGE_LIST_FETCH',
              'MESSAGES_IMPORT',
            ])
          )
        )
      logger.info('Recovered integration after lock loss', {
        integrationId,
        recoveredFromProcessing: recovered,
        cacheSize,
        resetStage,
      })
    }
  } catch (err) {
    // Best-effort: the stale check will catch it in 15 minutes
    logger.error('Failed to recover integration after lock loss', {
      jobId,
      error: (err as Error).message,
    })
  }
})
```
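The recovery logic:

1. Load the job and check whether it has an integrationId (only polling sync jobs do).
2. Recover the batch that was mid-processing (recoverProcessingBatch) and read how many messages remain in the import cache.
3. Reset the integration to MESSAGES_IMPORT_PENDING if cached messages remain, otherwise to MESSAGE_LIST_FETCH_PENDING.
4. If anything throws, log the error and stop; pollingStaleCheckJob (runs every 15 minutes) will pick it up on the next scan.

The try/catch around the entire recovery is intentional: this is best-effort. We'd rather log the error and let the self-healing system handle it than crash the worker.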
The where clause is also defensive — it only resets integrations that are actually in an active sync stage. If the integration already completed or was reset by another mechanism, the update is a no-op.
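The recovery handler makes two decisions: which stage to fall back to, and whether the row is eligible for a reset at all. Both are plain logic and can be sketched without Redis or a database. chooseResetStage, shouldReset, and recover are hypothetical helpers. The stage names come from the handler above, and 'IDLE' is a made-up name standing in for any non-active stage.

```typescript
// Stages a worker is actively in; only these are eligible for a reset
const ACTIVE_STAGES = ['MESSAGE_LIST_FETCH', 'MESSAGES_IMPORT'] as const

// Messages still cached: resume the import. Empty cache: refetch the message list.
function chooseResetStage(
  cacheSize: number,
): 'MESSAGES_IMPORT_PENDING' | 'MESSAGE_LIST_FETCH_PENDING' {
  return cacheSize > 0 ? 'MESSAGES_IMPORT_PENDING' : 'MESSAGE_LIST_FETCH_PENDING'
}

// Mirrors the inArray() guard in the WHERE clause: any other stage is left alone
function shouldReset(currentStage: string): boolean {
  return (ACTIVE_STAGES as readonly string[]).includes(currentStage)
}

// Applies both decisions to an in-memory row and returns the resulting stage
function recover(row: { syncStage: string }, cacheSize: number): string {
  return shouldReset(row.syncStage) ? chooseResetStage(cacheSize) : row.syncStage
}

console.log(recover({ syncStage: 'MESSAGES_IMPORT' }, 120)) // MESSAGES_IMPORT_PENDING
console.log(recover({ syncStage: 'MESSAGE_LIST_FETCH' }, 0)) // MESSAGE_LIST_FETCH_PENDING
console.log(recover({ syncStage: 'IDLE' }, 50)) // IDLE (made-up non-active stage: no-op)
```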
The polling sync pipeline has built-in self-healing through two scheduled jobs:
Stale check (every 15 minutes): Scans for integrations stuck in an active sync stage for longer than the stale threshold (15 minutes). These are jobs that started processing but never completed — crashed, lost their lock, or hit an unhandled error. The stale check resets them to pending.
Relaunch failed (every 30 minutes): Scans for integrations in a failed state and retries them. Some failures are transient — a temporary API outage, a rate limit that's since cleared. Instead of requiring manual intervention, the relaunch job gives them another chance.
```typescript
// Scheduled in setupSchedules()
// Check for stuck jobs (default: every 15 min)
await pollingSyncQueue.upsertJobScheduler(
  'pollingStaleCheckJob',
  { every: staleCheckIntervalMs },
  {
    data: { staleThresholdMs: 900000 }, // 15 minutes
    opts: { attempts: 1, priority: 10 },
  }
)
// Relaunch failed polling integrations (default: every 30 min)
await pollingSyncQueue.upsertJobScheduler(
  'pollingRelaunchFailedJob',
  { every: relaunchFailedIntervalMs },
  { data: {}, opts: { attempts: 1, priority: 10 } }
)
```
Both use attempts: 1 — they're diagnostic/recovery jobs. If the stale check itself fails, there's no point retrying it immediately. The next scheduled run in 15 minutes will handle it.
This pattern — scheduled scanners that detect and recover from stuck states — is more reliable than trying to handle every possible failure mode in the job itself. You can't predict every way a job can fail, but you can always ask "is this integration stuck?" on a timer.
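The question the stale check asks can be sketched as a filter. An integration is stuck if it is in an active sync stage and entered that stage longer ago than the threshold. The row shape and findStale are assumptions for illustration. A real implementation would express the same condition as a database query rather than filtering an array.

```typescript
type IntegrationRow = {
  id: string
  syncStage: string
  syncStageStartedAt: Date | null
}

const ACTIVE_STAGES = new Set(['MESSAGE_LIST_FETCH', 'MESSAGES_IMPORT'])
const STALE_THRESHOLD_MS = 15 * 60 * 1000 // same as staleThresholdMs: 900000

// Hypothetical scan: active stage + started longer ago than the threshold.
// Rows without a start time are skipped here; a real check might treat them differently.
function findStale(rows: IntegrationRow[], now: Date, thresholdMs = STALE_THRESHOLD_MS): string[] {
  return rows
    .filter(
      (row) =>
        ACTIVE_STAGES.has(row.syncStage) &&
        row.syncStageStartedAt !== null &&
        now.getTime() - row.syncStageStartedAt.getTime() > thresholdMs,
    )
    .map((row) => row.id)
}

const now = new Date('2024-01-01T12:00:00Z')
const minutesAgo = (m: number) => new Date(now.getTime() - m * 60_000)

const rows: IntegrationRow[] = [
  { id: 'a', syncStage: 'MESSAGES_IMPORT', syncStageStartedAt: minutesAgo(40) }, // stuck
  { id: 'b', syncStage: 'MESSAGES_IMPORT', syncStageStartedAt: minutesAgo(5) }, // still working
  { id: 'c', syncStage: 'MESSAGES_IMPORT_PENDING', syncStageStartedAt: null }, // queued, not active
]

console.log(findStale(rows, now)) // [ 'a' ]
```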
We expose queue metrics through a tRPC API that powers an internal dashboard:
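```typescript
// packages/lib/src/health/queue-metrics.ts
// getQueue, Queues, POINTS_NEEDED, TARGET_VISUALIZATION_POINTS and the
// response types are defined elsewhere in this package.

// Hypothetical helper: merges every `factor` consecutive per-minute values
// into one point (summing each bucket is an assumption; averaging also works).
function downsample(values: number[], factor: number): number[] {
  const points: number[] = []
  for (let i = 0; i < values.length; i += factor) {
    points.push(values.slice(i, i + factor).reduce((sum, v) => sum + v, 0))
  }
  return points
}

export async function getQueueMetrics(
  queueName: string,
  timeRange: QueueMetricsTimeRange
): Promise<QueueMetricsResponse> {
  const queue = getQueue(queueName as Queues)
  const pointsNeeded = POINTS_NEEDED[timeRange]
  const samplingFactor = Math.ceil(pointsNeeded / TARGET_VISUALIZATION_POINTS)
  // Eight independent Redis reads, issued in parallel
  const [workers, failedMetrics, completedMetrics, waiting, active, delayed, failed, completed] =
    await Promise.all([
      queue.getWorkers(),
      queue.getMetrics('failed', 0, pointsNeeded - 1),
      queue.getMetrics('completed', 0, pointsNeeded - 1),
      queue.getWaitingCount(),
      queue.getActiveCount(),
      queue.getDelayedCount(),
      queue.getFailedCount(),
      queue.getCompletedCount(),
    ])
  // getMetrics returns one count per minute in `data`
  const completedData = downsample(completedMetrics.data, samplingFactor)
  const failedData = downsample(failedMetrics.data, samplingFactor)
  const totalJobs = failed + completed
  const failureRate = totalJobs > 0
    ? Number(((failed / totalJobs) * 100).toFixed(1))
    : 0
  return {
    queueName,
    workers: workers.length,
    timeRange,
    failed, completed, waiting, active, delayed,
    failureRate,
    data: [
      { id: 'Completed', data: completedData.map((y, x) => ({ x, y })) },
      { id: 'Failed', data: failedData.map((y, x) => ({ x, y })) },
    ],
  }
}
```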
All eight queries run in parallel via Promise.all. The metrics data is downsampled to a maximum of 240 points for visualization — enough resolution for trends without sending thousands of data points to the frontend.
Time ranges: 1 hour, 4 hours, 12 hours, 1 day, 7 days. A 7-day range covers 10,080 minutes of data, which gives a sampling factor of ceil(10,080 / 240) = 42. Each of the 240 points therefore represents 42 minutes.
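You can check the sampling arithmetic for each range directly. This sketch assumes one metrics point per minute (BullMQ aggregates its metrics per minute) and the 240-point target. The range keys and the summing downsample helper are illustrative, not the dashboard's exact code.

```typescript
const TARGET_POINTS = 240

// Minutes of per-minute metrics needed for each range (keys are illustrative)
const MINUTES: Record<string, number> = {
  '1h': 60,
  '4h': 4 * 60,
  '12h': 12 * 60,
  '1d': 24 * 60,
  '7d': 7 * 24 * 60,
}

// Sum every `factor` consecutive per-minute counts into one point
function downsample(values: number[], factor: number): number[] {
  const buckets: number[] = []
  for (let i = 0; i < values.length; i += factor) {
    buckets.push(values.slice(i, i + factor).reduce((sum, v) => sum + v, 0))
  }
  return buckets
}

for (const [range, minutes] of Object.entries(MINUTES)) {
  const factor = Math.ceil(minutes / TARGET_POINTS)
  const points = downsample(Array.from({ length: minutes }, () => 1), factor)
  console.log(`${range}: ${points.length} points of ${factor} min`)
}
// 1h: 60 points of 1 min
// 4h: 240 points of 1 min
// 12h: 240 points of 3 min
// 1d: 240 points of 6 min
// 7d: 240 points of 42 min
```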
We also expose cursor-paginated job run history:
```typescript
export async function getQueueRuns(
  queueName: string,
  status: 'completed' | 'failed',
  cursor: number,
  limit: number
): Promise<QueueRunsResponse> {
  const queue = getQueue(queueName as Queues)
  // getCompleted/getFailed take an inclusive [start, end] range
  const jobs = status === 'completed'
    ? await queue.getCompleted(cursor, cursor + limit - 1)
    : await queue.getFailed(cursor, cursor + limit - 1)
  return {
    runs: jobs.map((job) => ({
      id: job.id,
      name: job.name,
      finishedOn: job.finishedOn ? new Date(job.finishedOn).toISOString() : null,
      attemptsMade: job.attemptsMade,
      failedReason: job.failedReason ?? null,
      returnvalue: job.returnvalue ? summarizeReturnValue(job.returnvalue) : null,
    })),
    // A full page means there may be more; a short page is the last one
    nextCursor: jobs.length === limit ? cursor + limit : null,
  }
}
```
This powers the "recent runs" table in the dashboard. You can see which jobs failed, why, and how many attempts they made. Useful for spotting patterns — if oauth2TokenRefreshJob is failing 10 times in a row for the same integration, something is wrong with that integration's credentials, not the job system.
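On the consuming side, the cursor contract is simple: keep requesting pages until nextCursor is null. The sketch below fakes the queue with an in-memory array so it runs without Redis. fetchPage and collectAll are hypothetical names that reproduce the same range arithmetic as getQueueRuns.

```typescript
type Page<T> = { runs: T[]; nextCursor: number | null }

// In-memory stand-in for getQueueRuns: same [cursor, cursor + limit - 1] window
function fetchPage<T>(items: T[], cursor: number, limit: number): Page<T> {
  const runs = items.slice(cursor, cursor + limit) // slice end is exclusive
  return { runs, nextCursor: runs.length === limit ? cursor + limit : null }
}

// Follows nextCursor until it is null, counting round trips
function collectAll<T>(items: T[], limit: number): { all: T[]; requests: number } {
  const all: T[] = []
  let cursor: number | null = 0
  let requests = 0
  while (cursor !== null) {
    const page = fetchPage(items, cursor, limit)
    requests++
    all.push(...page.runs)
    cursor = page.nextCursor
  }
  return { all, requests }
}

console.log(collectAll([1, 2, 3, 4, 5], 2)) // { all: [ 1, 2, 3, 4, 5 ], requests: 3 }
console.log(collectAll([1, 2, 3, 4], 2).requests) // 3 (the last request returns an empty page)
```

There are two consequences of this design. First, nextCursor is non-null whenever a page is exactly full, so a history whose length is a multiple of limit costs one extra request that comes back empty. Second, the cursor is an offset, so jobs that finish between requests can shift the list, and an entry may appear on two consecutive pages.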
We deliberately didn't set up a separate monitoring tool like Bull Board. The metrics API gives us what we need and lives in our existing dashboard — no extra service to deploy and secure.
When the worker process receives SIGINT or SIGTERM, it needs to shut down without losing jobs. The order matters:
```typescript
// apps/worker/src/server.ts
let isShuttingDown = false

const gracefulShutdown = async (signal: string) => {
  // A second signal (e.g. SIGINT, then SIGTERM) must not start a second shutdown
  if (isShuttingDown) return
  isShuttingDown = true
  console.log(`Received ${signal}, closing workers and server...`)
  // Force exit if graceful shutdown hangs. Armed first so it can fire while a step
  // below is stuck; unref() keeps the timer itself from holding the process open.
  setTimeout(() => {
    console.error('Graceful shutdown timed out, forcing exit.')
    process.exit(1)
  }, 10000).unref()
  // 1. Stop email poller
  if (inboundEmailPoller) {
    await inboundEmailPoller.stop()
  }
  // 2. Close all workers (stop accepting new jobs, finish current ones)
  if (workersInstance) {
    await Promise.all(Object.values(workersInstance).map((w) => w.close()))
  }
  // 3. Flush analytics
  await shutdownPostHog()
  // 4. Close FlowProducer
  await closeFlowProducer()
  // 5. Close all queues
  await closeAllQueues()
  // 6. Close database pools
  await closePools()
  // 7. Close HTTP server
  if (server) {
    server.close()
  }
}

process.on('SIGINT', () => gracefulShutdown('SIGINT'))
process.on('SIGTERM', () => gracefulShutdown('SIGTERM'))
```
The sequence is important:
Workers close early, right after the email poller stops. worker.close() tells BullMQ to stop picking up new jobs and wait for currently processing jobs to finish. This is the most critical step: closing workers before queues ensures no job gets picked up and then dropped.
FlowProducer closes next. Any pending flow additions complete or fail.
Queues close. The closeAllQueues() function iterates the queue cache and closes each connection, then clears the cache.
Database pools close after the workers, the FlowProducer, and the queues. Workers might still be finishing database operations when the shutdown signal arrives, and closing the database before they finish would cause errors in those final operations.
10-second timeout. If graceful shutdown hangs (a job that won't complete, a connection that won't close), we force-exit after 10 seconds. Better to abandon one in-flight job, which BullMQ's stalled-job check will pick up once its lock expires, than have a zombie process that never terminates.
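This variation on the ordered sequence is a hypothetical alternative, not what server.ts does: drive the steps from a list and keep going when one fails. In the code above, a rejection from any step skips everything after it. With per-step error handling, a failed analytics flush cannot stop the database pools from closing. runShutdown and the step names are illustrative.

```typescript
type ShutdownStep = { name: string; run: () => Promise<void> }

// Runs steps strictly in order; a failing step is logged and the rest still run
async function runShutdown(steps: ShutdownStep[]): Promise<string[]> {
  const failed: string[] = []
  for (const step of steps) {
    try {
      await step.run()
    } catch (err) {
      console.error(`Shutdown step "${step.name}" failed:`, (err as Error).message)
      failed.push(step.name)
    }
  }
  return failed
}

// Demo steps that record their execution order; one of them throws
const order: string[] = []
const step = (name: string, fail = false): ShutdownStep => ({
  name,
  run: async () => {
    order.push(name)
    if (fail) throw new Error('boom')
  },
})

runShutdown([
  step('workers'),
  step('posthog', true), // simulate a failed analytics flush
  step('flowProducer'),
  step('queues'),
  step('dbPools'),
  step('httpServer'),
]).then((failed) => console.log(order, failed))
```

In the demo, every step still runs even though posthog throws. runShutdown resolves with ['posthog'], so the caller can still choose a non-zero exit code.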
The closeAllQueues function itself is straightforward:
```typescript
// packages/lib/src/jobs/queues/index.ts
export async function closeAllQueues(): Promise<void> {
  const closePromises: Promise<void>[] = []
  for (const queueName of Object.keys(queueCache)) {
    closePromises.push(queueCache[queueName].close())
  }
  await Promise.all(closePromises)
  // Clear the cache after closing
  for (const key of Object.keys(queueCache)) {
    delete queueCache[key]
  }
}
```
All queue connections close in parallel. No point doing these sequentially — they're independent Redis connections.
We also catch unhandled exceptions and rejections:
```typescript
process.on('uncaughtException', (err) => {
  console.error('UNCAUGHT EXCEPTION:', err)
})
process.on('unhandledRejection', (reason, promise) => {
  console.error('UNHANDLED REJECTION:', reason, 'at promise:', promise)
})
```
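These handlers log the error but don't force-exit, so the worker keeps running. If the process does die, the container orchestrator handles restarts in production. Logging gives us visibility into what went wrong. One caveat: Node's documentation warns that resuming normal operation after 'uncaughtException' is not safe. Exiting from that handler and letting the orchestrator restart the container is a reasonable alternative.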
Here's the full concurrency map:
| Worker | Concurrency | Lock duration | Why |
|---|---|---|---|
| Events | 1 | 30s | Order-sensitive event processing |
| Event handlers | 1 | 30s | Side effects should be sequential |
| Maintenance | 1 | 30s | Low volume, doesn't need parallelism |
| Webhooks | 1 | 30s | Sequential processing |
| Email delivery | 5 | 30s | Multiple emails in parallel is fine |
| Message sync | 5 | 5 min | Long-running sync operations |
| Polling sync | 10 | 5 min | Many integrations syncing concurrently |
| Workflow delay | 10 | 30s | High throughput, short jobs |
| App trigger | 10 | 30s | Webhook dispatching |
| Polling trigger | 10 | 30s | Scheduled poll dispatching |
| AI agent | 5 | 30s | AI API calls |
| Document processing | 5 | 30s | Document parsing |
| Embedding generation | 3 | 30s | AI API rate limits |
| Data import | 2 | 30s | Heavy DB operations, limit parallelism |
| Thumbnail generation | 1 | 30s | CPU-intensive |
| OAuth2 refresh | 1 | 30s | Low volume |
| Dataset maintenance | 1 | 30s | Low volume cleanup |
| Scheduled trigger | 1 | 30s | Sequential execution |
| Message processing | 1 | 30s | Sequential scheduled sends |
The concurrency choices reflect the nature of each job:
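Message sync and polling sync get custom lock durations (5 minutes) because they process large batches of emails. A sync job might spend minutes fetching and parsing hundreds of messages from Gmail. BullMQ keeps renewing the lock while the job runs, but heavy batches can delay those renewals through a busy event loop or slow Redis round trips. A 5-minute lock gives far more headroom than the default 30 seconds before such a delay gets the job marked as stalled.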
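The concurrency map translates directly into per-worker options. In this sketch:

- WorkerSettings is a local stand-in for the concurrency and lockDuration fields of BullMQ's WorkerOptions.
- The worker keys are hypothetical, and only a few rows of the table are shown.
- BullMQ renews a job's lock every lockDuration / 2 by default, so the helper also reports that interval.

```typescript
// Local stand-in for the relevant fields of BullMQ's WorkerOptions
type WorkerSettings = { concurrency: number; lockDuration: number }

const DEFAULT_SETTINGS: WorkerSettings = { concurrency: 1, lockDuration: 30_000 }

// Hypothetical worker keys; values come from the concurrency map
const WORKER_SETTINGS: Record<string, Partial<WorkerSettings>> = {
  events: {}, // defaults: one job at a time, 30s lock
  emailDelivery: { concurrency: 5 },
  messageSync: { concurrency: 5, lockDuration: 5 * 60_000 },
  pollingSync: { concurrency: 10, lockDuration: 5 * 60_000 },
  embeddingGeneration: { concurrency: 3 },
}

function settingsFor(worker: string): WorkerSettings & { lockRenewTime: number } {
  const settings = { ...DEFAULT_SETTINGS, ...WORKER_SETTINGS[worker] }
  // BullMQ's default renewal interval is half the lock duration
  return { ...settings, lockRenewTime: settings.lockDuration / 2 }
}

console.log(settingsFor('pollingSync')) // { concurrency: 10, lockDuration: 300000, lockRenewTime: 150000 }
console.log(settingsFor('events')) // { concurrency: 1, lockDuration: 30000, lockRenewTime: 15000 }
```

The result could be spread into the options object passed to new Worker(...), alongside the Redis connection.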
Job payload validation. Right now, job data types are enforced at compile time but not validated at runtime. If a job gets enqueued with malformed data (from a bug, a race condition, or a stale version), the handler crashes on first access. A Zod schema at the createJobHandler level would catch this early and give a clear error instead of a confusing Cannot read property of undefined.
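This sketch shows where such a check could sit. withValidation and InvalidPayloadError are hypothetical. The parse argument has the shape (data: unknown) => T and throws on bad input, the same contract as a Zod schema's parse, so (data) => schema.parse(data) would fit. A hand-written parser keeps the example dependency-free. In the worker, InvalidPayloadError would extend UnrecoverableError, since retrying malformed data cannot succeed.

```typescript
// Hypothetical: in the worker this would extend BullMQ's UnrecoverableError
class InvalidPayloadError extends Error {
  constructor(jobName: string, detail: string) {
    super(`Invalid payload for ${jobName}: ${detail}`)
    this.name = 'InvalidPayloadError'
  }
}

type Handler<T> = (data: T) => Promise<void>

// Validates job data once, before the handler sees it
function withValidation<T>(
  jobName: string,
  parse: (data: unknown) => T, // e.g. (data) => schema.parse(data) with Zod
  handler: Handler<T>,
): (data: unknown) => Promise<void> {
  return async (data) => {
    let parsed: T
    try {
      parsed = parse(data)
    } catch (err) {
      throw new InvalidPayloadError(jobName, (err as Error).message)
    }
    await handler(parsed)
  }
}

type SyncData = { integrationId: string }

// Hand-written parser standing in for a Zod schema
function parseSyncData(data: unknown): SyncData {
  const id = (data as { integrationId?: unknown } | null)?.integrationId
  if (typeof id !== 'string') throw new Error('integrationId must be a string')
  return { integrationId: id }
}

const handle = withValidation('pollingSync', parseSyncData, async (data) => {
  console.log('syncing', data.integrationId)
})

handle({ integrationId: 'int_123' }) // logs "syncing int_123"
handle({}).catch((err) => console.log(err.message))
// Invalid payload for pollingSync: integrationId must be a string
```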
Structured job results. Most of our handlers return void or an ad-hoc { success: boolean } object. A consistent result type would make the metrics dashboard richer — you could see not just "completed" but "completed with 42 emails synced" or "completed with 3 warnings."
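Here is one possible shape for such a type, purely as an illustration. JobResult, its fields, and describeResult are hypothetical names, not something the codebase defines. Handlers would return counts and warnings, and the dashboard would render the summary line.

```typescript
type JobResult = {
  counts?: Record<string, number> // e.g. { 'emails synced': 42 }
  warnings?: string[]
}

// Builds the one-line summary shown next to a completed run
function describeResult(result: JobResult | undefined): string {
  const parts: string[] = []
  for (const [label, n] of Object.entries(result?.counts ?? {})) {
    parts.push(`${n} ${label}`)
  }
  const warnings = result?.warnings?.length ?? 0
  if (warnings > 0) parts.push(`${warnings} warning${warnings === 1 ? '' : 's'}`)
  return parts.length > 0 ? `completed with ${parts.join(', ')}` : 'completed'
}

console.log(describeResult({ counts: { 'emails synced': 42 } })) // completed with 42 emails synced
console.log(describeResult({ warnings: ['a', 'b', 'c'] })) // completed with 3 warnings
console.log(describeResult(undefined)) // completed
```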
Horizontal scaling story. Right now we run a single worker instance. All the concurrency settings are tuned for one process. When we scale to multiple instances, we'll need to think about queue-level rate limiting (BullMQ supports this natively) and partition strategies for jobs like polling sync that shouldn't process the same integration on two instances simultaneously.
The full system is about 1,500 lines of infrastructure code across a handful of files:
- apps/worker/src/server.ts: entry point and lifecycle
- apps/worker/src/workers/index.ts: worker startup and schedule registration
- apps/worker/src/workers/utils/createWorker.ts: worker factory
- apps/worker/src/workers/utils/createJobHandler.ts: job routing and context
- packages/lib/src/jobs/queues/index.ts: queue factory
- packages/lib/src/jobs/queues/flow-producer.ts: flow utilities
- packages/lib/src/jobs/types/job-context.ts: context types
- packages/lib/src/health/queue-metrics.ts: monitoring

The job handlers themselves live across packages/lib; the infrastructure doesn't care what they do. A handler that sends an email and a handler that generates AI embeddings look identical from the queue's perspective: receive context, do work, return.
The patterns are straightforward. Singleton caches for queues and the FlowProducer. Plain objects for handler registries. Wrapper functions for cross-cutting concerns like cloudOnly(). Scheduled scanners for self-healing. Ordered shutdown for data safety.
None of this is novel. The value is in the consistency — every one of our 20+ workers follows the same pattern, uses the same utilities, and gets the same logging and error handling. When something goes wrong at 3 AM, you know exactly where to look.
Auxx.ai is open source. The full worker code is at apps/worker/. PRs welcome.