Events

Durably provides an event system for monitoring job execution and extensibility.

Core event categories

Fifteen core event types are exposed as the DurablyEvent discriminated union. For filtering and typing, they are grouped into:

  • Domain (lifecycle facts), typed as DomainEvent / DomainEventType: run:trigger, run:coalesced, run:complete, run:fail, run:cancel, run:delete
  • Operational (execution and diagnostics), typed as OperationalEvent / OperationalEventType: run:leased, run:lease-renewed, run:progress, step:start, step:complete, step:fail, step:cancel, log:write, worker:error

The helper isDomainEvent(event) returns true when event.type is a domain event (no category field is added to emitted payloads).

ts
import { isDomainEvent, type DurablyEvent } from '@coji/durably'

// Filter domain events in a handler that receives mixed event types
function handleEvent(event: DurablyEvent) {
  if (isDomainEvent(event)) {
    // narrowed to DomainEvent — state transition facts only
    console.log(event.type, event.runId)
  }
}
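For intuition, a guard of this kind can be sketched as a lookup against the six domain event types listed above. This is a standalone illustration, not the library's implementation; DOMAIN_EVENT_TYPES, SketchEvent, and isDomainEventSketch are hypothetical names introduced here.

```typescript
// Hypothetical standalone sketch; not the @coji/durably implementation.
const DOMAIN_EVENT_TYPES = [
  'run:trigger',
  'run:coalesced',
  'run:complete',
  'run:fail',
  'run:cancel',
  'run:delete',
] as const

type DomainEventTypeSketch = (typeof DOMAIN_EVENT_TYPES)[number]

// Minimal event shape: only the discriminant matters for the guard.
interface SketchEvent {
  type: string
}

const domainTypes = new Set<string>(DOMAIN_EVENT_TYPES)

// Narrows `type` to the domain subset by inspecting the existing field only;
// nothing is added to the event object.
function isDomainEventSketch<E extends SketchEvent>(
  event: E,
): event is E & { type: DomainEventTypeSketch } {
  return domainTypes.has(event.type)
}
```

Because the check reads only event.type, it matches the note above that no category field is added to emitted payloads.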

Subscribing to Events

ts
durably.on(eventType: string, listener: (event) => void): void

Event Types

Run Events

run:trigger

Fired when a job is triggered (before a worker picks it up).

ts
durably.on('run:trigger', (event) => {
  // event: {
  //   type: 'run:trigger',
  //   runId: string,
  //   jobName: string,
  //   input: unknown,
  //   labels: Record<string, string>,
  //   timestamp: string,
  //   sequence: number
  // }
})

run:coalesced

Fired when a trigger was coalesced onto an existing pending run (same concurrency key, with coalesce: 'skip'). Not fired for normal triggers or idempotent hits.

ts
durably.on('run:coalesced', (event) => {
  // event: {
  //   type: 'run:coalesced',
  //   runId: string,           // ID of the existing pending run
  //   jobName: string,
  //   labels: Record<string, string>,  // existing run's labels
  //   skippedInput: unknown,   // the new input that was NOT used
  //   skippedLabels: Record<string, string>, // the new labels that were NOT used
  //   timestamp: string,
  //   sequence: number
  // }
})

run:leased

Fired when a run acquires a lease and begins execution.

ts
durably.on('run:leased', (event) => {
  // event: {
  //   type: 'run:leased',
  //   runId: string,
  //   jobName: string,
  //   input: unknown,
  //   leaseOwner: string,
  //   leaseExpiresAt: string,
  //   labels: Record<string, string>,
  //   timestamp: string,
  //   sequence: number
  // }
})

run:lease-renewed

Fired when a run's lease is renewed during execution.

ts
durably.on('run:lease-renewed', (event) => {
  // event: {
  //   type: 'run:lease-renewed',
  //   runId: string,
  //   jobName: string,
  //   leaseOwner: string,
  //   leaseExpiresAt: string,
  //   labels: Record<string, string>,
  //   timestamp: string,
  //   sequence: number
  // }
})
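A listener for the lease events might want to know how much time is left before a lease expires. The helper below is a minimal sketch that assumes leaseExpiresAt is an ISO 8601 timestamp string (the page only states that it is a string); msUntilLeaseExpiry is a hypothetical name, not part of the library.

```typescript
// Assumption: leaseExpiresAt is an ISO 8601 string that Date.parse understands.
// Returns the remaining lease time in milliseconds (negative if already expired).
function msUntilLeaseExpiry(
  leaseExpiresAt: string,
  now: Date = new Date(),
): number {
  const expiresAt = Date.parse(leaseExpiresAt)
  if (Number.isNaN(expiresAt)) {
    throw new Error(`Unparseable leaseExpiresAt: ${leaseExpiresAt}`)
  }
  return expiresAt - now.getTime()
}
```

It could be called from a run:leased or run:lease-renewed listener, for example to log a warning when the remaining time drops below a threshold of your choosing.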

run:complete

Fired when a run completes successfully.

ts
durably.on('run:complete', (event) => {
  // event: {
  //   type: 'run:complete',
  //   runId: string,
  //   jobName: string,
  //   output: unknown,
  //   duration: number,
  //   labels: Record<string, string>,
  //   timestamp: string,
  //   sequence: number
  // }
})

run:fail

Fired when a run fails.

ts
durably.on('run:fail', (event) => {
  // event: {
  //   type: 'run:fail',
  //   runId: string,
  //   jobName: string,
  //   error: string,
  //   failedStepName: string,
  //   labels: Record<string, string>,
  //   timestamp: string,
  //   sequence: number
  // }
})

run:progress

Fired when step.progress() is called.

ts
durably.on('run:progress', (event) => {
  // event: {
  //   type: 'run:progress',
  //   runId: string,
  //   jobName: string,
  //   progress: { current: number, total?: number, message?: string },
  //   labels: Record<string, string>,
  //   timestamp: string,
  //   sequence: number
  // }
})
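Since total and message are optional, display code has to handle progress with an unknown total. The following sketch shows one way to format the payload; formatProgress is a hypothetical helper, and the interface is a local copy of the progress shape documented above.

```typescript
// Local copy of the documented progress shape.
interface ProgressLike {
  current: number
  total?: number
  message?: string
}

// Formats progress for display. Falls back to the raw count when the total
// is missing or not positive, so no percentage is ever divided by zero.
function formatProgress(p: ProgressLike): string {
  const suffix = p.message ? ` ${p.message}` : ''
  if (p.total === undefined || p.total <= 0) {
    return `${p.current}${suffix}`
  }
  const percent = Math.round((p.current / p.total) * 100)
  return `${p.current}/${p.total} (${percent}%)${suffix}`
}
```

Inside a run:progress listener this would be called as formatProgress(event.progress).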

run:cancel

Fired when a run is cancelled via the cancel() API.

ts
durably.on('run:cancel', (event) => {
  // event: {
  //   type: 'run:cancel',
  //   runId: string,
  //   jobName: string,
  //   labels: Record<string, string>,
  //   timestamp: string,
  //   sequence: number
  // }
})

run:delete

Fired when a run is deleted via the deleteRun() API.

ts
durably.on('run:delete', (event) => {
  // event: {
  //   type: 'run:delete',
  //   runId: string,
  //   jobName: string,
  //   labels: Record<string, string>,
  //   timestamp: string,
  //   sequence: number
  // }
})

Step Events

step:start

Fired when a step begins execution.

ts
durably.on('step:start', (event) => {
  // event: {
  //   type: 'step:start',
  //   runId: string,
  //   jobName: string,
  //   stepName: string,
  //   stepIndex: number,
  //   labels: Record<string, string>,
  //   timestamp: string,
  //   sequence: number
  // }
})

step:complete

Fired when a step completes successfully.

ts
durably.on('step:complete', (event) => {
  // event: {
  //   type: 'step:complete',
  //   runId: string,
  //   jobName: string,
  //   stepName: string,
  //   stepIndex: number,
  //   output: unknown,
  //   duration: number,
  //   labels: Record<string, string>,
  //   timestamp: string,
  //   sequence: number
  // }
})
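The duration field makes step:complete a natural source for lightweight timing metrics. The sketch below collects durations per run and reports the slowest step. StepCompleteLike, recordStep, and slowestStep are hypothetical names, and the interface is a reduced copy of the payload shown above.

```typescript
// Hypothetical subset of the step:complete payload documented above.
interface StepCompleteLike {
  runId: string
  stepName: string
  duration: number
}

// runId -> (stepName -> duration), filled from step:complete events.
const durationsByRun = new Map<string, Map<string, number>>()

// Cheap enough to call directly from a listener: a few Map operations.
function recordStep(e: StepCompleteLike): void {
  let steps = durationsByRun.get(e.runId)
  if (!steps) {
    steps = new Map<string, number>()
    durationsByRun.set(e.runId, steps)
  }
  steps.set(e.stepName, e.duration)
}

// Returns the name of the slowest recorded step for a run, if any.
function slowestStep(runId: string): string | undefined {
  const steps = durationsByRun.get(runId)
  if (!steps) return undefined
  let slowest: string | undefined
  let max = -Infinity
  steps.forEach((duration, name) => {
    if (duration > max) {
      max = duration
      slowest = name
    }
  })
  return slowest
}
```

With a real instance, recordStep could be registered as the step:complete listener. Entries would need to be removed when a run finishes (for example on run:complete or run:fail) so the map does not grow without bound.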

step:fail

Fired when a step fails.

ts
durably.on('step:fail', (event) => {
  // event: {
  //   type: 'step:fail',
  //   runId: string,
  //   jobName: string,
  //   stepName: string,
  //   stepIndex: number,
  //   error: string,
  //   labels: Record<string, string>,
  //   timestamp: string,
  //   sequence: number
  // }
})

step:cancel

Fired when a step is cancelled (run was cancelled while step was executing).

ts
durably.on('step:cancel', (event) => {
  // event: {
  //   type: 'step:cancel',
  //   runId: string,
  //   jobName: string,
  //   stepName: string,
  //   stepIndex: number,
  //   labels: Record<string, string>,
  //   timestamp: string,
  //   sequence: number
  // }
})

Log Events

log:write

Fired when step.log methods are called.

ts
durably.on('log:write', (event) => {
  // event: {
  //   type: 'log:write',
  //   runId: string,
  //   jobName: string,
  //   labels: Record<string, string>,
  //   stepName: string | null,
  //   level: 'info' | 'warn' | 'error',
  //   message: string,
  //   data: unknown,
  //   timestamp: string,
  //   sequence: number
  // }
})

Worker Events

worker:error

Fired when an internal worker error occurs (e.g., lease renewal failure).

ts
durably.on('worker:error', (event) => {
  // event: {
  //   type: 'worker:error',
  //   error: string,
  //   context: string,  // e.g., 'lease-renewal'
  //   runId?: string,
  //   timestamp: string,
  //   sequence: number
  // }
})

Synchronous Execution

Listeners run synchronously

Event listeners are called synchronously in the worker's hot path. A slow listener will block job execution and lease renewal until it returns. Keep listeners fast and non-blocking.

Do:

ts
// Fast: queue work for later
durably.on('run:complete', (e) => {
  void sendToAnalytics(e) // fire-and-forget async
})

// Fast: simple logging
durably.on('run:fail', (e) => {
  console.error(`[${e.jobName}] Failed: ${e.error}`)
})

Don't:

ts
// Slow: synchronous heavy computation blocks the worker
durably.on('run:complete', (e) => {
  const report = generateExpensiveReport(e) // ❌ blocks polling
  fs.writeFileSync('report.json', JSON.stringify(report))
})
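When the follow-up work is expensive, one option is to have the listener only append to an in-memory buffer and to drain that buffer elsewhere, such as from a timer. This is a minimal sketch under that assumption; enqueue, flush, and the send callback are hypothetical and not part of the library.

```typescript
// Hypothetical subset of the run:complete payload.
interface RunCompleteLike {
  runId: string
  jobName: string
  duration: number
}

// In-memory buffer shared by the listener and the background flusher.
const buffer: RunCompleteLike[] = []

// Listener body: a single array push, so the worker is not held up.
function enqueue(e: RunCompleteLike): void {
  buffer.push(e)
}

// Drains the buffer and hands the batch to `send` (a hypothetical async sink).
// Intended to be called from a timer, not from inside a listener.
async function flush(
  send: (batch: RunCompleteLike[]) => Promise<void>,
): Promise<number> {
  if (buffer.length === 0) return 0
  const batch = buffer.splice(0, buffer.length)
  await send(batch)
  return batch.length
}
```

In practice enqueue would be registered as the run:complete listener and flush scheduled with something like setInterval. Buffered events are lost if the process exits before a flush, and a failed send drops its batch unless the caller retries, so this trade-off suits analytics better than anything that must not be missed.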

Error Handling

Exceptions thrown in event listeners are caught and forwarded to the onError handler. They do not crash the worker, abort the current run, or interrupt subsequent listeners for the same event. If an async listener returns a Promise that rejects, the rejection is also forwarded to onError. Register a handler with onError to observe both:

ts
durably.onError((error, event) => {
  console.error('Listener error:', error, 'during event:', event.type)
})
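To make these semantics concrete, here is a small standalone dispatcher that behaves the way this section describes: synchronous throws and async rejections both reach the error handler, and later listeners still run. It is an illustrative sketch, not Durably's internal code, and createDispatcher is a hypothetical name.

```typescript
type Listener<E> = (event: E) => void | Promise<void>
type ErrorHandler<E> = (error: unknown, event: E) => void

// Hypothetical mini-dispatcher illustrating the documented error semantics.
function createDispatcher<E>() {
  const listeners: Listener<E>[] = []
  let errorHandler: ErrorHandler<E> = () => {}

  return {
    on(listener: Listener<E>): void {
      listeners.push(listener)
    },
    onError(handler: ErrorHandler<E>): void {
      errorHandler = handler
    },
    emit(event: E): void {
      for (const listener of listeners) {
        try {
          const result = listener(event)
          // Async listener: forward a rejection without awaiting the promise.
          if (result instanceof Promise) {
            result.catch((err) => errorHandler(err, event))
          }
        } catch (err) {
          // Sync throw: report it and continue with the next listener.
          errorHandler(err, event)
        }
      }
    },
  }
}
```

Note that emit never awaits a listener's promise, which is why a fire-and-forget async listener does not block the caller while its failures remain observable.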

Type Definitions

All events use a discriminated union pattern:

ts
interface BaseEvent {
  type: string
  timestamp: string
  sequence: number
}

type DurablyEvent =
  | RunTriggerEvent
  | RunCoalescedEvent
  | RunLeasedEvent
  | RunLeaseRenewedEvent
  | RunCompleteEvent
  | RunFailEvent
  | RunCancelEvent
  | RunDeleteEvent
  | RunProgressEvent
  | StepStartEvent
  | StepCompleteEvent
  | StepFailEvent
  | StepCancelEvent
  | LogWriteEvent
  | WorkerErrorEvent

// Shared data types used by events and callbacks
interface ProgressData {
  current: number
  total?: number
  message?: string
}

interface LogData {
  level: 'info' | 'warn' | 'error'
  message: string
  data?: unknown
  stepName?: string | null
}

RunProgressEvent contains a progress: ProgressData field. LogWriteEvent extends LogData with additional event fields (runId, jobName, labels, etc.).

Both ProgressData and LogData are also used as callback parameter types in TriggerAndWaitOptions.
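The practical benefit of a discriminated union is that a switch on type narrows each branch to the matching event shape, and a never check flags any unhandled member at compile time. The sketch below uses a reduced, locally defined subset of three events to stay self-contained. The interface bodies are abbreviated copies of the payloads on this page, and describeEvent is a hypothetical name.

```typescript
// Reduced local subset of the union; fields abbreviated for the sketch.
interface RunCompleteLike {
  type: 'run:complete'
  runId: string
  duration: number
}
interface RunFailLike {
  type: 'run:fail'
  runId: string
  error: string
}
interface StepStartLike {
  type: 'step:start'
  runId: string
  stepName: string
}

type SubsetEvent = RunCompleteLike | RunFailLike | StepStartLike

// Each case sees only the fields of its own event type.
function describeEvent(event: SubsetEvent): string {
  switch (event.type) {
    case 'run:complete':
      return `run ${event.runId} completed in ${event.duration}ms`
    case 'run:fail':
      return `run ${event.runId} failed: ${event.error}`
    case 'step:start':
      return `run ${event.runId} started step "${event.stepName}"`
    default: {
      // Compile-time exhaustiveness check: fails to type-check if a new
      // member is added to SubsetEvent without a matching case.
      const unreachable: never = event
      return unreachable
    }
  }
}
```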

Example

ts
const durably = createDurably({ dialect })

// Log selected run and step events
durably.on('run:leased', (e) => {
  console.log(`[${e.jobName}] Run leased: ${e.runId}`)
})

durably.on('run:complete', (e) => {
  console.log(`[${e.jobName}] Run completed in ${e.duration}ms`)
})

durably.on('run:fail', (e) => {
  console.error(`[${e.jobName}] Run failed: ${e.error}`)
  // Send alert to monitoring service
  alertService.notify({
    title: `Job ${e.jobName} failed`,
    message: e.error,
    runId: e.runId,
  })
})

durably.on('step:complete', (e) => {
  console.log(`  Step "${e.stepName}" completed in ${e.duration}ms`)
})

// Handle listener errors
durably.onError((error, event) => {
  console.error('Event listener threw:', error)
})

Released under the MIT License.