Dispatching Jobs, Flows, and Schedules with BullMQ (Part 2)

Markus Klooth

How we enqueue jobs, build parent-child processing pipelines with FlowProducer, and manage 25+ cron schedules — with real code from our open-source codebase.

In Part 1, we covered how our worker app boots, how queues are defined, and the handler registry pattern. Now let's look at the other side — how jobs get enqueued, how we handle multi-step pipelines, and how scheduled jobs are managed.

Dispatching jobs

The simplest way to enqueue a job is calling queue.add():

import { getQueue, Queues } from '@auxx/lib/jobs/queues'

const queue = getQueue(Queues.thumbnailQueue)
await queue.add('generateThumbnailJob', {
  fileId: 'file_123',
  organizationId: 'org_456',
  width: 200,
  height: 200,
})

The first argument is the job name — it must match a key in the worker definition's mappings object. The second is the payload. You can optionally pass job options as a third argument to override the queue's defaults.

This pattern works fine for one-off dispatches. But when you're enqueuing the same type of job from multiple places with shared context, it gets repetitive.

The enqueuer factory pattern

For email jobs, we use a factory that pre-fills context fields:

// packages/lib/src/jobs/email/enqueue-email-job.ts
import type { JobsOptions } from 'bullmq'

const DEFAULT_EMAIL_JOB_OPTIONS: JobsOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 5000 },
  removeOnComplete: { count: 500 },
  removeOnFail: { count: 2000 },
}

export async function enqueueEmailJob<T extends EmailType>(
  emailType: T,
  data: EmailPayloadByType[T] & EmailEnqueueContext,
  options?: JobsOptions
) {
  const { actorId, source, organizationId, requestId, idempotencyKey, ...payload } = data
  const queue = getQueue(Queues.emailQueue)
  const jobId = idempotencyKey ? `email-${emailType}-${idempotencyKey}` : undefined

  const jobData: SendEmailJobData<T> = {
    emailType,
    payload: payload as EmailPayloadByType[T],
    meta: { actorUserId: actorId, source, organizationId, requestId, idempotencyKey },
  }

  return queue.add('sendEmailJob', jobData, {
    ...DEFAULT_EMAIL_JOB_OPTIONS,
    ...options,
    jobId,
  })
}

A few things worth noting here.

Type-safe payloads. The EmailType generic ensures the payload matches the email type. You can't pass an invitation payload to a password reset email — TypeScript catches it at compile time.
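
To make the compile-time check concrete, here is a minimal sketch with two hypothetical email types. The payload map and the buildEmailJobData helper below are assumptions for illustration; the real EmailPayloadByType in the codebase has more entries and may use different field names.

```typescript
// Hypothetical payload map; the real EmailPayloadByType is larger and may differ.
interface EmailPayloadByType {
  invitation: { inviteeEmail: string; inviterName: string }
  passwordReset: { email: string; resetToken: string }
}

type EmailType = keyof EmailPayloadByType

// The generic T ties the payload type to the emailType argument.
function buildEmailJobData<T extends EmailType>(emailType: T, payload: EmailPayloadByType[T]) {
  return { emailType, payload }
}

const resetJob = buildEmailJobData('passwordReset', {
  email: 'user@example.com',
  resetToken: 'tok_123',
})

// An invitation payload is rejected for a password reset email at compile time.
// @ts-expect-error inviteeEmail is not part of the passwordReset payload
buildEmailJobData('passwordReset', { inviteeEmail: 'a@example.com', inviterName: 'Sam' })
```

The @ts-expect-error line documents the failure: if the types ever stopped rejecting the mismatched payload, the directive itself would become a compile error.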

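Idempotency keys. If you pass an idempotencyKey, it is combined with the email type into the BullMQ job ID (email-<emailType>-<idempotencyKey>). BullMQ ignores an add() call whose job ID already exists in the queue, so the second call is a no-op. This prevents double-sends when a webhook fires twice or a user double-clicks. The protection only lasts while the earlier job is still stored in Redis, so the retention settings described below also determine how long the deduplication window stays open.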
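
The deduplication rule can be sketched without Redis. The InMemoryQueue class below is a stand-in we made up for illustration, not BullMQ's Queue; it only models "an add with an existing job ID is ignored", and the real add() is asynchronous.

```typescript
// In-memory stand-in for a queue; it only models the "same job ID is ignored" rule.
class InMemoryQueue {
  private jobs = new Map<string, unknown>()
  private nextId = 1

  // Returns true when the job was stored, false when an existing ID made it a no-op.
  add(data: unknown, jobId?: string): boolean {
    const id = jobId ?? String(this.nextId++)
    if (this.jobs.has(id)) return false
    this.jobs.set(id, data)
    return true
  }

  get size(): number {
    return this.jobs.size
  }
}

// Mirrors the job ID format used by enqueueEmailJob.
function buildEmailJobId(emailType: string, idempotencyKey?: string): string | undefined {
  return idempotencyKey ? `email-${emailType}-${idempotencyKey}` : undefined
}

const queue = new InMemoryQueue()
const jobId = buildEmailJobId('ticketReply', 'webhook_evt_42')

const firstAdd = queue.add({ ticketId: 't_1' }, jobId) // stored
const secondAdd = queue.add({ ticketId: 't_1' }, jobId) // same ID, ignored
const noKeyAdd = queue.add({ ticketId: 't_1' }) // no key, always stored
```

Without an idempotency key every call gets a fresh ID, so callers that need protection against double-sends have to pass one explicitly.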

Queue-specific retry settings. Email jobs use 3 attempts with 5-second exponential backoff instead of the global 5 attempts with 1-second backoff. Email failures tend to be transient API issues — we want to wait longer between retries but not retry as many times.
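
To see what the two settings mean in wall-clock terms, here is a small calculation. It assumes the exponential strategy waits delay * 2^(attemptsMade - 1) before each retry, and that the global default is also exponential; both are assumptions worth checking against your BullMQ version and queue config.

```typescript
// Assumed formula: delay * 2^(attemptsMade - 1), where attemptsMade counts failed attempts so far.
function exponentialBackoffDelays(attempts: number, baseDelayMs: number): number[] {
  const delays: number[] = []
  // A job with N attempts has at most N - 1 retries, so N - 1 waits.
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(baseDelayMs * 2 ** (attemptsMade - 1))
  }
  return delays
}

const total = (delays: number[]) => delays.reduce((sum, d) => sum + d, 0)

const emailDelays = exponentialBackoffDelays(3, 5000) // [5000, 10000]
const globalDelays = exponentialBackoffDelays(5, 1000) // [1000, 2000, 4000, 8000]

const emailTotalMs = total(emailDelays) // 15000
const globalTotalMs = total(globalDelays) // 15000
```

Under these assumptions both configurations span the same 15 seconds of waiting. The email settings simply spend that time on two longer waits instead of four shorter ones.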

Job retention. Instead of the global removeOnComplete: true, email jobs keep the last 500 completed and 2,000 failed jobs. This gives us a window to inspect recent email delivery without jobs disappearing immediately.

The factory function layers on top of this:

export function createEmailEnqueuer(base: Omit<EmailEnqueueContext, 'idempotencyKey'>) {
  return async function enqueueScopedEmailJob<T extends EmailType>(
    emailType: T,
    payload: EmailPayloadByType[T],
    params?: { options?: JobsOptions; idempotencyKey?: string }
  ) {
    return enqueueEmailJob(emailType, { ...payload, ...base, idempotencyKey: params?.idempotencyKey }, params?.options)
  }
}

Now a tRPC router or service can create a scoped enqueuer once and use it throughout:

const enqueueEmail = createEmailEnqueuer({
  actorId: ctx.session.userId,
  source: 'ticket-reply',
  organizationId: ctx.session.defaultOrganizationId,
})

// Later — no need to pass context again
await enqueueEmail('ticketReply', { ticketId, recipientEmail, body })
await enqueueEmail('ticketReplyConfirmation', { ticketId, agentEmail })
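
If the closure mechanics are unclear, this stripped-down sketch shows what ends up on each job. The types are simplified and hypothetical, and the recorded array stands in for the queue so the example runs without Redis.

```typescript
// Simplified, hypothetical context; the real EmailEnqueueContext has more fields.
interface EnqueueContext {
  actorId: string
  source: string
  organizationId: string
}

interface RecordedJob {
  emailType: string
  payload: object
  meta: EnqueueContext & { idempotencyKey: string | undefined }
}

// Stand-in for the queue: records what would have been enqueued.
const recorded: RecordedJob[] = []

function createScopedEnqueuer(base: EnqueueContext) {
  // The returned function closes over `base`, so callers only pass what changes per email.
  return function enqueue(emailType: string, payload: object, idempotencyKey?: string): void {
    recorded.push({ emailType, payload, meta: { ...base, idempotencyKey } })
  }
}

const enqueueScoped = createScopedEnqueuer({
  actorId: 'user_1',
  source: 'ticket-reply',
  organizationId: 'org_456',
})

enqueueScoped('ticketReply', { ticketId: 't_1' })
enqueueScoped('ticketReplyConfirmation', { ticketId: 't_1' }, 'reply_99')
```

Both recorded jobs carry the same actor, source, and organization, while the idempotency key stays a per-call decision.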

Parent-child flows with FlowProducer

Some operations are inherently multi-step. When a user uploads a document to a knowledge base, we need to:

  1. Parse and chunk the document into segments
  2. Generate vector embeddings for each segment (in parallel batches)
  3. Finalize the document — mark it as ready, update stats, optionally resume a paused workflow

Steps 2 and 3 have a dependency: finalization can't run until all embedding batches complete. This is exactly what BullMQ's FlowProducer is built for.

The FlowProducer singleton

// packages/lib/src/jobs/queues/flow-producer.ts
import { FlowProducer } from 'bullmq'

let flowProducerInstance: FlowProducer | null = null

export function getFlowProducer(): FlowProducer {
  if (!flowProducerInstance) {
    flowProducerInstance = new FlowProducer({
      connection: getConnectionOptions(),
    })

    flowProducerInstance.on('error', (error) => {
      logger.error('FlowProducer error', { error: error.message })
    })
  }
  return flowProducerInstance
}

Same singleton pattern as the queue cache. One connection, shared across the process.

Type-safe flow definitions

We define a FlowJobDefinition interface that maps cleanly to BullMQ's FlowJob but uses our Queues enum:

import type { FlowJob } from 'bullmq'

export interface FlowJobDefinition<T = any> {
  name: string
  queue: Queues
  data: T
  opts?: FlowOpts
  children?: FlowJobDefinition[]
}

function toFlowJob(def: FlowJobDefinition): FlowJob {
  return {
    name: def.name,
    queueName: def.queue,
    data: def.data,
    opts: def.opts,
    children: def.children?.map(toFlowJob),
  }
}

export async function addFlow(flow: FlowJobDefinition) {
  const producer = getFlowProducer()
  const flowJob = toFlowJob(flow)
  return producer.add(flowJob)
}

The addFlow call is atomic — the parent and all children are created in a single Redis transaction. Either the entire flow exists or none of it does.

The document processing flow

Here's how the document processing pipeline uses flows:

// packages/lib/src/jobs/flows/document-processing-flow.ts

export async function createDocumentProcessingFlow(params: {
  documentId: string
  datasetId: string
  organizationId: string
  segments: Array<{ segmentId: string; content: string }>
  batchSize?: number
  workflowResume?: WorkflowResumeInfo
}) {
  const { documentId, datasetId, organizationId, segments, batchSize = 20, workflowResume } = params

  if (segments.length === 0) return null

  // Create embedding batch children
  const embeddingChildren: FlowJobDefinition<FlowEmbeddingGenerationJobData>[] = []

  for (let i = 0; i < segments.length; i += batchSize) {
    const batch = segments.slice(i, i + batchSize)
    const batchIndex = Math.floor(i / batchSize)

    embeddingChildren.push({
      name: DocumentFlowJobs.GENERATE_EMBEDDINGS,
      queue: Queues.embeddingQueue,
      data: {
        organizationId,
        datasetId,
        documentId,
        segmentIds: batch.map((s) => s.segmentId),
        batchIndex,
        totalBatches: Math.ceil(segments.length / batchSize),
        totalSegments: segments.length,
      },
      opts: {
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 },
        removeOnComplete: { age: 300 },
        removeOnFail: false,
      },
    })
  }

  // Parent: finalize after all embeddings complete
  const flow: FlowJobDefinition<FinalizeDocumentJobData> = {
    name: DocumentFlowJobs.FINALIZE_DOCUMENT,
    queue: Queues.documentProcessingQueue,
    data: {
      documentId,
      datasetId,
      organizationId,
      totalSegments: segments.length,
      startedAt: Date.now(),
      workflowResume,
    },
    children: embeddingChildren,
  }

  return addFlow(flow)
}

The flow structure looks like this:

finalize-document (parent, documentProcessingQueue)
  ├── generate-embeddings-batch #0 (embeddingQueue)
  ├── generate-embeddings-batch #1 (embeddingQueue)
  ├── generate-embeddings-batch #2 (embeddingQueue)
  └── generate-embeddings-batch #N (embeddingQueue)

BullMQ guarantees the parent won't run until every child completes. The children run in parallel on the embedding queue (concurrency 3). When they're all done, the finalize job runs on the document processing queue.

A few details:

Cross-queue flows. Children run on embeddingQueue, the parent runs on documentProcessingQueue. Each queue has its own workers with different concurrency. The flow spans both.

Batch metadata. Each child gets batchIndex and totalBatches so the handler can report meaningful progress. "Processing batch 3 of 12" is more useful than "processing."
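
The batching arithmetic is easy to check in isolation. This sketch pulls the loop into a hypothetical planBatches helper (not part of the codebase) and runs it on 45 made-up segment IDs with the default batch size of 20.

```typescript
// Splits items into consecutive batches and attaches the metadata each child job would carry.
function planBatches<T>(items: T[], batchSize: number) {
  const totalBatches = Math.ceil(items.length / batchSize)
  const batches: Array<{ batchIndex: number; totalBatches: number; items: T[] }> = []
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push({
      batchIndex: Math.floor(i / batchSize),
      totalBatches,
      items: items.slice(i, i + batchSize),
    })
  }
  return batches
}

// 45 hypothetical segment IDs with the default batch size of 20.
const segmentIds = Array.from({ length: 45 }, (_, i) => `seg_${i}`)
const plan = planBatches(segmentIds, 20)
const sizes = plan.map((b) => b.items.length) // [20, 20, 5]

// batchIndex is zero-based, so add 1 for a human-readable label.
const label = (b: { batchIndex: number; totalBatches: number }) =>
  `Processing batch ${b.batchIndex + 1} of ${b.totalBatches}`

const lastLabel = label(plan[2]) // 'Processing batch 3 of 3'
```

Note that the last batch is smaller than the rest, so a handler should rely on segmentIds.length rather than assume every batch is full.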

Workflow resume. The workflowResume field is optional. When a workflow node uploads a document and wants to wait for embeddings before continuing, it passes resume info. The finalize job checks this and triggers workflow continuation after embeddings complete.

A failed child blocks the parent. If an embedding batch fails after all retries, the parent stays in the waiting-children state and never runs on its own. We keep failed children (removeOnFail: false) so we can inspect what went wrong. Once the issue is fixed, the failed child can be retried manually, and the parent proceeds when that child completes.
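
The dependency rule can be modeled in a few lines. FlowParentModel below is a toy we wrote for illustration, not a BullMQ class, and it assumes the default behavior where options such as failParentOnFailure are not set.

```typescript
type ChildState = 'pending' | 'completed' | 'failed'

// Toy model of a flow parent: it tracks child states and derives its own state from them.
class FlowParentModel {
  private children = new Map<string, ChildState>()

  constructor(childIds: string[]) {
    for (const id of childIds) this.children.set(id, 'pending')
  }

  settle(childId: string, state: 'completed' | 'failed'): void {
    if (!this.children.has(childId)) throw new Error(`Unknown child: ${childId}`)
    this.children.set(childId, state)
  }

  // The parent becomes runnable only when every child has completed.
  get state(): 'waiting-children' | 'runnable' {
    const allDone = Array.from(this.children.values()).every((s) => s === 'completed')
    return allDone ? 'runnable' : 'waiting-children'
  }
}

const parent = new FlowParentModel(['batch-0', 'batch-1', 'batch-2'])
parent.settle('batch-0', 'completed')
parent.settle('batch-1', 'failed') // exhausted its retries
parent.settle('batch-2', 'completed')
const stateAfterFailure = parent.state // 'waiting-children'

// Retrying the failed child and letting it succeed unblocks the parent.
parent.settle('batch-1', 'completed')
const stateAfterRetry = parent.state // 'runnable'
```

One failed batch is enough to hold back finalization, which is why keeping the failed child around for inspection matters.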

Accessing child results from the parent

The finalize job can read what its children produced:

export async function finalizeDocumentJob(ctx: JobContext<FinalizeDocumentJobData>) {
  const hasChildren = await ctx.hasChildren()
  if (hasChildren) {
    const childResults = await ctx.getChildrenValues()
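    // childResults is a Record<string, any> keyed by child job key
    // (assuming ctx passes BullMQ's result through, that is the queue-qualified key, not the bare job ID)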
  }

  // Mark document as ready, update stats, resume workflow...
}

This is wrapped in the JobContext interface, so handlers don't interact with BullMQ's job API directly.
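
As an example of what the parent might do with those values, here is a sketch that totals the embedded segments. The EmbeddingBatchResult shape is hypothetical; the post does not show what the embedding handlers actually return.

```typescript
// Hypothetical shape of what each embedding batch returns; the real handlers may return something else.
interface EmbeddingBatchResult {
  batchIndex: number
  embeddedSegments: number
}

// Sums the per-batch counts and compares them with the expected total.
function summarizeChildResults(
  childResults: Record<string, EmbeddingBatchResult>,
  totalSegments: number
) {
  const embedded = Object.values(childResults).reduce((sum, r) => sum + r.embeddedSegments, 0)
  return { embedded, missing: totalSegments - embedded, complete: embedded === totalSegments }
}

// Keys are illustrative; only the values matter to this helper.
const summary = summarizeChildResults(
  {
    'child-a': { batchIndex: 0, embeddedSegments: 20 },
    'child-b': { batchIndex: 1, embeddedSegments: 20 },
    'child-c': { batchIndex: 2, embeddedSegments: 5 },
  },
  45
)
```

Because the helper only iterates over values, it does not depend on how the keys are formatted.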

Scheduled jobs

We have 25+ recurring jobs registered via BullMQ's upsertJobScheduler. They're all set up in a single setupSchedules() function that runs during worker boot.

Cron-based schedules

// apps/worker/src/workers/index.ts

export async function setupSchedules() {
  const maintenanceQueue = getQueue(Queues.maintenanceQueue)

  // Every hour — clean up orphaned files
  await maintenanceQueue.upsertJobScheduler(
    'orphanedFileCleanupJob',
    { pattern: '0 * * * *' },
    {
      data: { batchSize: 100, dryRun: false },
      opts: {
        attempts: 3,
        backoff: { type: 'exponential', delay: 60000 },
        priority: 10,
      },
    }
  )

  // Every day at 2 AM — clean up soft-deleted files
  await maintenanceQueue.upsertJobScheduler(
    'deletedFileCleanupJob',
    { pattern: '0 2 * * *' },
    {
      data: { batchSize: 200, dryRun: false },
      opts: { attempts: 3, backoff: { type: 'exponential', delay: 60000 }, priority: 10 },
    }
  )

  // Every 15 minutes — scan for OAuth2 tokens needing refresh
  await maintenanceQueue.upsertJobScheduler(
    'oauth2TokenRefreshScannerJob',
    { pattern: '*/15 * * * *' },
    {
      data: { dryRun: false, batchSize: 50 },
      opts: {
        attempts: 3,
        backoff: { type: 'exponential', delay: 60000 },
        priority: 8,
        removeOnComplete: { count: 10 },
        removeOnFail: { count: 50 },
      },
    }
  )

  // ... 20+ more schedules
}

upsertJobScheduler is idempotent — calling it with the same scheduler name updates the existing schedule instead of creating duplicates. This means we can safely restart the worker without accumulating zombie schedules.
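
The upsert semantics boil down to "insert or replace by name". SchedulerRegistry below is an in-memory stand-in we made up to show that behavior; it is not how BullMQ stores schedulers internally.

```typescript
interface SchedulerEntry {
  pattern: string
  data: Record<string, unknown>
}

// In-memory stand-in for the scheduler store, keyed by scheduler name.
class SchedulerRegistry {
  private entries = new Map<string, SchedulerEntry>()

  // Insert-or-replace: the same name never produces a second entry.
  upsert(name: string, entry: SchedulerEntry): void {
    this.entries.set(name, entry)
  }

  get(name: string): SchedulerEntry | undefined {
    return this.entries.get(name)
  }

  get count(): number {
    return this.entries.size
  }
}

const registry = new SchedulerRegistry()

// First boot registers the schedule.
registry.upsert('orphanedFileCleanupJob', { pattern: '0 * * * *', data: { batchSize: 100 } })
// A restart with a changed batch size updates it in place.
registry.upsert('orphanedFileCleanupJob', { pattern: '0 * * * *', data: { batchSize: 250 } })

const schedulerCount = registry.count // 1
```

The flip side is that an upsert never deletes anything. A schedule that is dropped from setupSchedules() stays registered until it is removed explicitly, for example with the queue's removeJobScheduler method.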

Interval-based schedules

Some jobs use interval timing instead of cron patterns. The polling sync pipeline uses configurable intervals:

const pollingSyncQueue = getQueue(Queues.pollingSyncQueue)

const messageListFetchIntervalMs = Number.parseInt(
  process.env.SYNC_MESSAGE_LIST_FETCH_INTERVAL_MS ??
    String(constants.timing.pollingSync.messageListFetchIntervalMs),
  10
)

// Scan for integrations needing sync (default: every 5 min)
await pollingSyncQueue.upsertJobScheduler(
  'pollingSyncScannerJob',
  { every: messageListFetchIntervalMs },
  {
    data: { dryRun: false },
    opts: {
      attempts: 3,
      backoff: { type: 'exponential', delay: 60000 },
      priority: 5,
      removeOnComplete: { count: 20 },
      removeOnFail: { count: 50 },
    },
  }
)

// Check for stuck jobs (default: every 15 min)
await pollingSyncQueue.upsertJobScheduler(
  'pollingStaleCheckJob',
  { every: staleCheckIntervalMs },
  {
    data: { staleThresholdMs: 900000 },
    opts: { attempts: 1, priority: 10, removeOnComplete: { count: 20 } },
  }
)

// Relaunch failed polling integrations (default: every 30 min)
await pollingSyncQueue.upsertJobScheduler(
  'pollingRelaunchFailedJob',
  { every: relaunchFailedIntervalMs },
  { data: {}, opts: { attempts: 1, priority: 10 } },
)

The intervals are configurable via environment variables, with sensible defaults from the config package. In development you might want faster polling (1 minute). In production with many integrations, 5 minutes prevents overloading email provider APIs.
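
One thing to watch with env-driven intervals: Number.parseInt returns NaN for non-numeric input and silently truncates values like '5min' to 5. A defensive parser could look like the sketch below. The readIntervalMs helper and its one-second floor are assumptions, not code from the repository.

```typescript
// Parses an interval from an env-style string, falling back when it is missing or invalid.
function readIntervalMs(raw: string | undefined, fallbackMs: number, minMs = 1000): number {
  if (raw === undefined || raw.trim() === '') return fallbackMs
  const parsed = Number.parseInt(raw, 10)
  // parseInt yields NaN for non-numeric input; also reject intervals below the floor.
  if (!Number.isFinite(parsed) || parsed < minMs) return fallbackMs
  return parsed
}

const FIVE_MINUTES_MS = 5 * 60 * 1000

const fromEnv = readIntervalMs('60000', FIVE_MINUTES_MS) // 60000, a one-minute dev override
const fromUnset = readIntervalMs(undefined, FIVE_MINUTES_MS) // falls back to 300000
const fromGarbage = readIntervalMs('abc', FIVE_MINUTES_MS) // NaN, falls back
const fromTypo = readIntervalMs('5min', FIVE_MINUTES_MS) // parses as 5, below the floor, falls back
```

The floor matters because a value like 5 would otherwise schedule the scanner every 5 milliseconds.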

Jitter on cron schedules

Thumbnail cleanup schedules include random jitter to prevent thundering herd:

const thumbnailJitter = Math.floor(Math.random() * 5)
await maintenanceQueue.upsertJobScheduler(
  'thumbnailCleanupJob',
  { pattern: `${thumbnailJitter} 3 * * *` },
  { ... }
)

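Instead of firing at exactly 0 3 * * *, the schedule gets a random minute from 0 to 4 each time a worker boots, which spreads runs across a 5-minute window. Keep in mind that a job scheduler is stored in Redis under its name, so worker instances that share a queue also share one schedule and the most recent upsert wins. The jitter therefore separates independent deployments rather than workers on the same Redis. It makes little difference with a single worker instance, but it is cheap to keep in place.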
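
The pattern construction can be wrapped in a small helper with an injectable random source, which makes the bounds easy to verify. jitteredDailyPattern is a hypothetical name, not a function from the codebase.

```typescript
// Builds a daily cron pattern whose minute is offset by 0..maxJitterMinutes-1.
// `random` is injectable so the result can be made deterministic in tests.
function jitteredDailyPattern(
  hour: number,
  maxJitterMinutes: number,
  random: () => number = Math.random
): string {
  const minute = Math.floor(random() * maxJitterMinutes)
  return `${minute} ${hour} * * *`
}

const earliest = jitteredDailyPattern(3, 5, () => 0) // '0 3 * * *'
const latest = jitteredDailyPattern(3, 5, () => 0.999) // '4 3 * * *'
const actual = jitteredDailyPattern(3, 5) // somewhere between the two
```

Since Math.random() never returns 1, the minute stays in the range 0 to 4 and never spills into the next hour.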

Cloud-only schedules

Billing, trial, and lifecycle email jobs are only scheduled in the SaaS deployment:

if (!isSelfHosted()) {
  // Every hour at :15 — apply scheduled subscription changes
  await maintenanceQueue.upsertJobScheduler(
    'applyScheduledSubscriptionChangesJob',
    { pattern: '15 * * * *' },
    { ... }
  )

  // Every day at 3 AM UTC — clean up expired trial accounts
  await maintenanceQueue.upsertJobScheduler(
    'expiredTrialAccountCleanup',
    { pattern: '0 3 * * *', tz: 'UTC' },
    {
      data: { dryRun: false, gracePeriodDays: 14, batchSize: 10, sendNotifications: true },
      ...
    }
  )

  // Lifecycle emails — getting started, mid-trial, conversion
  await maintenanceQueue.upsertJobScheduler('sendGettingStartedEmailsJob', ...)
  await maintenanceQueue.upsertJobScheduler('sendMidTrialEmailsJob', ...)
  await maintenanceQueue.upsertJobScheduler('sendTrialConversionEmailsJob', ...)
}

This is the same isSelfHosted() check used in the cloudOnly() handler wrapper from Part 1. Two layers of protection — the schedule doesn't get registered, and even if a job somehow gets enqueued, the handler no-ops.
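
The two layers can be sketched side by side. Everything here is hypothetical: the real cloudOnly() wrapper from Part 1 may have a different signature, and isSelfHosted is passed in as a parameter only so the example runs on its own.

```typescript
type Handler<T> = (data: T) => string

// Hypothetical version of a cloud-only wrapper; the real cloudOnly() from Part 1 may look different.
function cloudOnly<T>(handler: Handler<T>, isSelfHosted: () => boolean): Handler<T> {
  return (data) => (isSelfHosted() ? 'skipped' : handler(data))
}

// Layer 1: only register cloud schedules when not self-hosted.
function schedulesToRegister(isSelfHosted: () => boolean): string[] {
  const names = ['orphanedFileCleanupJob']
  if (!isSelfHosted()) names.push('applyScheduledSubscriptionChangesJob')
  return names
}

// Layer 2: even if a job is enqueued anyway, the wrapped handler does nothing.
const applyChanges = cloudOnly(
  (data: { dryRun: boolean }) => `applied (dryRun=${data.dryRun})`,
  () => true // pretend this deployment is self-hosted
)

const selfHostedResult = applyChanges({ dryRun: false }) // 'skipped'
const selfHostedSchedules = schedulesToRegister(() => true)
const cloudSchedules = schedulesToRegister(() => false)
```

The second layer is what covers leftovers, such as a schedule that was registered in Redis before a deployment switched to self-hosted mode.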

The full schedule map

Here's every scheduled job in the system:

Job                                   Schedule         Queue                Notes
orphanedFileCleanupJob                hourly           maintenance          batch: 100
deletedFileCleanupJob                 2 AM             maintenance          batch: 200
storageQuotaCheckJob                  4 AM             maintenance
cleanupExpiredMediaAssetsJob          hourly           maintenance          1h expiry
thumbnailCleanupJob                   3 AM ±5min       maintenance          jitter
thumbnailVersionCleanupJob            Sun 4 AM ±5min   maintenance          weekly, jitter
orphanedAppBundleCleanupJob           5 AM             maintenance          S3 + DB
oauth2TokenRefreshScannerJob          every 15 min     maintenance          priority: 8
integrationTokenRefreshScannerJob     every 15 min     maintenance          Gmail/Outlook
quotaResetJob                         1 AM UTC         maintenance          AI quotas
requestDocumentSuggestionsJob         8 AM             maintenance
datasetOrphanedDataCleanup            3 AM             dataset-maintenance
pollingSyncScannerJob                 every 5 min      polling-sync         configurable
messagesImportScannerJob              every 1 min      polling-sync         configurable
pollingStaleCheckJob                  every 15 min     polling-sync         stuck detection
pollingRelaunchFailedJob              every 30 min     polling-sync         self-healing
applyScheduledSubscriptionChangesJob  hourly at :15    maintenance          cloud-only
stripeSubscriptionSyncJob             hourly at :45    maintenance          cloud-only
demoCleanupJob                        every 15 min     maintenance          cloud-only
expiredTrialAccountCleanup            3 AM UTC         maintenance          cloud-only, 14-day grace
sendGettingStartedEmailsJob           every 30 min     maintenance          cloud-only
sendMidTrialEmailsJob                 10 AM UTC        maintenance          cloud-only
sendTrialConversionEmailsJob          10 AM UTC        maintenance          cloud-only

Most heavy cleanup jobs run between 2 and 5 AM to avoid competing with user-triggered jobs during business hours. Token refresh scanners run every 15 minutes because OAuth2 tokens typically expire in 1 hour, so checking every 15 minutes gives us 3-4 chances to refresh before expiry.
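
The "3-4 chances" figure follows from where in the scan cycle a token happens to be issued. This sketch counts the scans that fall strictly inside a token's lifetime, assuming scans fire at exact multiples of the interval; scansBeforeExpiry is a hypothetical helper for the arithmetic only.

```typescript
// Counts scans that fall strictly inside a token's lifetime.
// Scans run at multiples of scanIntervalMin; the token is issued at issuedAtMin.
function scansBeforeExpiry(issuedAtMin: number, lifetimeMin: number, scanIntervalMin: number): number {
  const expiresAt = issuedAtMin + lifetimeMin
  // First scan strictly after the token was issued.
  const firstScan = (Math.floor(issuedAtMin / scanIntervalMin) + 1) * scanIntervalMin
  let count = 0
  for (let t = firstScan; t < expiresAt; t += scanIntervalMin) count++
  return count
}

const issuedOnScan = scansBeforeExpiry(0, 60, 15) // scans at 15, 30, 45 -> 3
const issuedBetween = scansBeforeExpiry(7, 60, 15) // scans at 15, 30, 45, 60 -> 4
```

A token issued exactly on a scan boundary gets three chances, and any other token gets four.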

What we covered

This post walked through three patterns:

  1. Direct dispatch for simple job enqueuing, with factory functions for scoped context
  2. FlowProducer for parent-child dependencies where a finalization step must wait for parallel processing to complete
  3. Scheduled jobs via upsertJobScheduler for cron-based and interval-based recurring work

In Part 3, we'll cover the parts that keep this system running in production — error handling and retry strategies, lock renewal failure recovery, queue monitoring and metrics, and graceful shutdown.