
Queues, workers, and the handler registry pattern — how we structured our BullMQ setup to process email syncs, webhooks, AI jobs, and more.
Auxx.ai does a lot of work that doesn't belong in a request-response cycle. Syncing emails from Gmail and Outlook. Processing Shopify webhooks. Generating vector embeddings for knowledge base documents. Sending transactional emails. Refreshing OAuth2 tokens. Running AI agent sessions. Cleaning up orphaned files.
Some of these are triggered by user actions. Some run on a schedule. Some spawn child jobs that run in parallel. All of them need retries, logging, and graceful failure handling. Trying to do any of this inside a Next.js API route is a non-starter.
We picked BullMQ because it's Redis-backed, runs natively in Node.js, has first-class TypeScript support, and handles the hard parts — retries with backoff, job deduplication, repeatable schedules, parent-child flows, and graceful shutdown. It's battle-tested and the API is clean.
This is part 1 of a 3-part series. This post covers the foundation — how the worker app boots, how queues are defined, and the handler pattern that wires everything together. Part 2 covers job dispatching patterns, parent-child flows, and scheduled jobs. Part 3 covers error handling, recovery, monitoring, and graceful shutdown.
The worker is a standalone Node.js process, separate from our Next.js app. It connects to the same database and Redis instance, but runs its own lifecycle. Here's the boot sequence:
ConfigService.init() → load credentials
setupSchedules() → register cron/repeatable jobs
startWorkers() → spin up 20+ BullMQ workers
startInboundEmailPoller() → optional email polling
Hono HTTP server → health check endpoint
The actual entry point:
// apps/worker/src/server.ts
async function initializeApp() {
await configService.init()
console.log('Setting up schedules...')
await setupSchedules()
console.log('Starting workers...')
workersInstance = await startWorkers()
inboundEmailPoller = startInboundEmailPoller()
// Health check server
const app = new Hono()
app.use('*', cors())
app.get('/health', (c) => {
return c.json({
status: 'OK',
message: 'Workers are healthy',
inboundEmail: inboundEmailPoller ? 'running' : 'disabled',
})
})
server = serve({ fetch: app.fetch, port, hostname: host })
}
The health check endpoint is important for container orchestration. AWS ECS (or Railway, in our production setup) pings this to know whether the worker is alive. If the health check fails, the container gets replaced.
We have 20+ named queues, each defined in a simple enum:
// packages/lib/src/jobs/queues/types.ts
export enum Queues {
defaultQueue = 'default',
eventHandlersQueue = 'eventHandlers',
eventsQueue = 'events',
maintenanceQueue = 'maintenance',
webhooksQueue = 'webhooks',
shopifyQueue = 'shopify',
embeddingQueue = 'embedding',
messageSyncQueue = 'messageSync',
messageProcessingQueue = 'messageProcessing',
workflowDelayQueue = 'workflowDelay',
scheduledTriggerQueue = 'scheduled-trigger-queue',
documentProcessingQueue = 'document-processing-queue',
datasetMaintenanceQueue = 'dataset-maintenance-queue',
thumbnailQueue = 'thumbnail',
oauth2RefreshQueue = 'oauth2-refresh',
dataImportQueue = 'data-import',
pollingSyncQueue = 'polling-sync',
emailQueue = 'email',
appTriggerQueue = 'app-trigger',
appPollingTriggerQueue = 'app-polling-trigger-queue',
aiAgentQueue = 'ai-agent',
}
Why so many queues instead of one big one? Isolation. If the embedding queue backs up with 10,000 segments, we don't want email delivery to stall. Each queue gets its own concurrency settings, retry policies, and rate limits. A problem in one queue doesn't cascade to others.
Queues are created lazily via a singleton cache. You never construct a Queue directly — you call getQueue():
// packages/lib/src/jobs/queues/index.ts
const DEFAULT_JOB_OPTIONS: JobsOptions = {
attempts: 5,
backoff: { type: 'exponential', delay: 1000 },
removeOnFail: true,
removeOnComplete: true,
}
const queueCache: Record<string, Queue> = {}
export function getQueue(queueName: Queues): Queue {
if (queueCache[queueName]) {
return queueCache[queueName]
}
const options = getBullMQOptions()
const queue = new Queue(queueName, options)
queueCache[queueName] = queue
return queue
}
The defaults are sensible for most jobs — 5 retries with exponential backoff (1s, 2s, 4s, 8s, 16s), and completed/failed jobs get cleaned up automatically. Individual jobs can override these when they need different behavior.
The cache matters because BullMQ queues hold Redis connections. Creating a new Queue object for the same queue name would open duplicate connections. The cache ensures one connection per queue name across the entire process.
This is the core pattern. Each worker definition file is a plain object mapping job names to handler functions:
// apps/worker/src/workers/worker-definitions/events-worker.ts
import {
createEventJob,
createTimelineEvent,
handleEntityTriggers,
handleFieldTriggerJob,
publishEventJob,
publishToAnalyticsJob,
sendInvitationUserJob,
triggerResourceWorkflows,
updateWebhookLastTriggeredAt,
} from '@auxx/lib/events/handlers'
import { Queues } from '@auxx/lib/jobs/queues'
import { createWorker } from '../utils/createWorker'
const eventsJobMappings = {
publishEventJob,
createEventJob,
publishToAnalyticsJob,
}
const eventHandlersJobMappings = {
sendInvitationUserJob,
updateWebhookLastTriggeredAt,
createTimelineEvent,
triggerResourceWorkflows,
handleFieldTriggerJob,
handleEntityTriggers,
}
export function startEventsWorker() {
return createWorker(Queues.eventsQueue, eventsJobMappings)
}
export function startEventHandlersWorker() {
return createWorker(Queues.eventHandlersQueue, eventHandlersJobMappings)
}
That's it. No decorators. No reflection. No dynamic file scanning. Just an object where keys are job names and values are functions. When a job named publishEventJob arrives on the events queue, BullMQ calls our handler, the handler looks up publishEventJob in the mappings object, and calls the function.
This is intentionally boring. The job name string you pass to queue.add('publishEventJob', data) must match a key in the mappings object. If it doesn't, you get a clear error at runtime. It's easy to grep for any job name across the codebase. You can see every job a queue handles by reading one file.
Here's the email worker — even simpler:
// apps/worker/src/workers/worker-definitions/email-worker.ts
const emailJobMappings = {
sendEmailJob,
}
export function startEmailWorker() {
return createWorker(Queues.emailQueue, emailJobMappings, {
concurrency: 5,
})
}
And the polling sync worker, which needs custom lock settings for long-running email sync jobs:
// apps/worker/src/workers/worker-definitions/polling-sync-worker.ts
const pollingSyncJobMappings = {
pollingSyncScannerJob,
messageListFetchJob,
messagesImportJob,
imapImportBatchJob,
pollingStaleCheckJob,
pollingRelaunchFailedJob,
}
export function startPollingSyncWorker() {
return createWorker(Queues.pollingSyncQueue, pollingSyncJobMappings, {
lockDuration: 300000, // 5 minutes
lockRenewTime: 150000, // 2.5 minutes
concurrency: 10,
})
}
The createWorker utility wraps BullMQ's Worker with standardized logging, error handling, and lock recovery:
// apps/worker/src/workers/utils/createWorker.ts
export function createWorker<T extends Record<string, JobHandler | LegacyJobHandler>>(
queue: Queues,
jobMappings: T,
workerOptions?: EnhancedWorkerOptions
): Worker {
const { enableCancellation = true, logLevel = 'info', ...bullmqOptions } = workerOptions || {}
const options: WorkerOptions = {
connection: getConnectionOptions(),
...bullmqOptions,
}
const handler = createJobHandler(jobMappings)
const worker = new Worker(queue, handler, options)
worker.on('error', (error: Error) => {
logger.error(`Worker error on queue ${queue}:`, { error: error.message })
})
worker.on('completed', (job, result) => {
logger.info('Job completed', {
jobId: job.id,
jobName: job.name,
duration: job.finishedOn ? job.finishedOn - (job.processedOn || 0) : null,
})
})
worker.on('failed', (job, error) => {
logger.error('Job failed', {
jobId: job?.id,
jobName: job?.name,
error: error.message,
attemptsMade: job?.attemptsMade,
})
})
return worker
}
Every worker gets completion timing, failure logging, and error handling for free. No boilerplate in individual worker definitions.
The createJobHandler function is the bridge between BullMQ's raw Job object and our handler functions. It routes jobs by name and wraps them in a JobContext:
// apps/worker/src/workers/utils/createJobHandler.ts
export function createJobHandler<T extends Record<string, JobHandler | LegacyJobHandler>>(
jobMappings: T
) {
return async (job: Job, token?: string, signal?: AbortSignal) => {
const jobName = job.name
const jobFunction = jobMappings[jobName as keyof T]
if (!jobFunction) {
throw new Error(`Job function not found: ${jobName}`)
}
try {
const ctx = createJobContext(job, signal)
return await jobFunction(ctx as any)
} catch (error) {
if (error instanceof JobCancelledError) {
logger.info('Job cancelled gracefully', { jobId: job.id, jobName })
throw error
}
throw error
}
}
}
The JobContext gives handlers a clean interface instead of reaching into the raw BullMQ Job:
// packages/lib/src/jobs/types/job-context.ts
export interface JobContext<T = any> {
job: Job<T>
signal?: AbortSignal
data: T
jobId: string
jobName: string
updateProgress: (progress: number) => Promise<void>
log: (message: string) => Promise<void>
isCancelled: () => boolean
throwIfCancelled: () => void
getChildrenValues: () => Promise<Record<string, any>>
hasChildren: () => Promise<boolean>
}
export type JobHandler<T = any, R = any> = (ctx: JobContext<T>) => Promise<R>
A job handler function looks like this:
export async function sendEmailJob(ctx: JobContext<SendEmailJobData>) {
const { emailType, payload, meta } = ctx.data
ctx.throwIfCancelled()
// ... send the email
await ctx.log(`Email sent: ${emailType}`)
}
No BullMQ imports needed in the handler. No awareness of retries or queue configuration. Just receive data, do work, return a result.
The startWorkers function spins up all 20+ workers in parallel:
// apps/worker/src/workers/index.ts
export async function startWorkers() {
const eventsWorker = startEventsWorker()
const eventHandlersWorker = startEventHandlersWorker()
const maintenanceWorker = startMaintenanceWorker()
const webhooksWorker = startWebhooksWorker()
const messageSyncWorker = startMessageSyncWorker()
const shopifyWorker = startShopifyWorker()
const workflowDelayWorker = startWorkflowDelayWorker()
const scheduledTriggerWorker = startScheduledTriggerWorker()
const documentProcessingWorker = startDocumentProcessingWorker()
const datasetMaintenanceWorker = startDatasetMaintenanceWorker()
const datasetEmbeddingWorker = startDatasetEmbeddingWorker()
const thumbnailWorker = startThumbnailWorker()
const oauth2RefreshWorker = startOAuth2RefreshWorker()
const dataImportWorker = startDataImportWorker()
const pollingSyncWorker = startPollingSyncWorker()
const emailWorker = startEmailWorker()
const messageProcessingWorker = startMessageProcessingWorker()
const appTriggerWorker = startAppTriggerWorker()
const pollingTriggerWorker = startPollingTriggerWorker()
const aiAgentWorker = startAiAgentWorker()
return Promise.all([
eventsWorker,
eventHandlersWorker,
maintenanceWorker,
webhooksWorker,
// ... all workers
])
}
Each start*Worker() call creates a BullMQ Worker instance that immediately starts polling its queue. The Promise.all ensures all workers are ready before the health check server starts accepting traffic.
We ship Auxx.ai as both a SaaS product and a self-hosted open-source project. Some jobs only make sense in the SaaS context — billing sync, trial cleanup, lifecycle emails, demo seeding. We handle this with a simple wrapper:
// apps/worker/src/workers/worker-definitions/maintenance-worker.ts
function cloudOnly(handler: JobHandler): JobHandler {
return async (ctx) => {
if (isSelfHosted()) {
logger.info(`Skipping ${ctx.jobName} in self-hosted mode`)
return
}
return handler(ctx)
}
}
const jobMappings = {
// Always runs
orphanedFileCleanupJob,
deletedFileCleanupJob,
oauth2TokenRefreshScannerJob,
// Cloud-only
applyScheduledSubscriptionChangesJob: cloudOnly(applyScheduledSubscriptionChangesJob),
stripeSubscriptionSyncJob: cloudOnly(stripeSubscriptionSyncJob),
demoCleanupJob: cloudOnly(demoCleanupJob),
expiredTrialAccountCleanup: cloudOnly(expiredTrialAccountCleanupJob),
sendGettingStartedEmailsJob: cloudOnly(sendGettingStartedEmailsJob),
}
No if branches scattered across job handlers. No feature flags. The cloudOnly wrapper is defense in depth — the schedules for these jobs also check isSelfHosted() before being registered — but if a job somehow gets enqueued in self-hosted mode, it silently no-ops.
At this point we have a clear separation:
packages/lib/src/jobs/queues/ — shared between the worker and any app that needs to enqueue jobsapps/worker/src/workers/worker-definitions/ — one file per worker, plain object mappingspackages/lib/ — the actual business logic, no BullMQ awarenesscreateWorker, createJobHandler) live in apps/worker/src/workers/utils/ — the glueThe handler code doesn't know it's running inside BullMQ. It receives a JobContext, does its work, and returns. That means we can test handlers by constructing a mock context — no Redis needed.
In Part 2, we'll look at how jobs get dispatched, the factory pattern for scoped enqueuers, parent-child flows for multi-step pipelines, and the 25+ scheduled jobs that keep the system running.