Skip to content

Server Mode

Run batch jobs on Node.js without a frontend. Perfect for cron jobs, data pipelines, and CLI tools.

Example code: server-libsql

When to Use

  • Scheduled batch processing (cron)
  • Data import/export pipelines
  • CLI tools with resumable operations
  • Microservice background workers

Install

bash
pnpm add @coji/durably kysely zod @libsql/client @libsql/kysely-libsql

Project Structure

txt
├── jobs/
│   └── process-image.ts    # Job definition
├── lib/
│   ├── database.ts         # Database dialect
│   └── durably.ts          # Durably instance
└── main.ts                 # Entry point

Setup

Database

ts
// lib/database.ts
import { LibsqlDialect } from '@libsql/kysely-libsql'
import { createClient } from '@libsql/client'

const client = createClient({
  url: process.env.TURSO_DATABASE_URL ?? 'file:local.db',
  authToken: process.env.TURSO_AUTH_TOKEN,
})

export const dialect = new LibsqlDialect({ client })

Job Definition

ts
// jobs/process-image.ts
import { defineJob } from '@coji/durably'
import { z } from 'zod'

const delay = (ms: number) => new Promise((r) => setTimeout(r, ms))

export const processImageJob = defineJob({
  name: 'process-image',
  input: z.object({ filename: z.string() }),
  output: z.object({ url: z.string() }),
  run: async (step, input) => {
    const data = await step.run('download', async () => {
      step.progress(1, 3, 'Downloading...')
      await delay(500)
      return { size: 1024000 }
    })

    await step.run('resize', async () => {
      step.progress(2, 3, 'Resizing...')
      await delay(500)
      return { width: 800, height: 600, size: data.size / 2 }
    })

    const uploaded = await step.run('upload', async () => {
      step.progress(3, 3, 'Uploading...')
      await delay(500)
      return { url: `https://cdn.example.com/${input.filename}` }
    })

    return { url: uploaded.url }
  },
})

Durably Instance

ts
// lib/durably.ts
import { createDurably } from '@coji/durably'
import { processImageJob } from '../jobs/process-image'
import { dialect } from './database'

export const durably = createDurably({
  dialect,
  jobs: { processImage: processImageJob },
})

Basic Usage

triggerAndWait() queues a job and blocks until it finishes:

ts
// main.ts
import { durably } from './lib/durably'

await durably.init()

const { id, output } = await durably.jobs.processImage.triggerAndWait({
  filename: 'photo.jpg',
})
console.log(`Run ${id}: ${output.url}`)

await durably.stop()
await durably.db.destroy()

Waiting for Existing Runs

If you have a run ID from a previous trigger, use waitForRun() to wait for it without creating a new run:

ts
// Trigger first, wait later
const { id } = await durably.jobs.processImage.trigger({
  filename: 'photo.jpg',
})

// Wait for completion (e.g., from a different code path)
const run = await durably.waitForRun(id, { timeout: 60000 })
console.log('Done:', run.output)

When the run reaches a terminal state in this process, the wait settles immediately from the emitted event. If another worker or process completes, fails, or cancels the same run against the shared database, waitForRun() observes that outcome by polling storage; you can pass pollingIntervalMs on the wait options, or omit it to use the instance default from createDurably() (see createDurably).

This is useful when one job triggers another and the caller needs to wait for the child job's completion.

Event Monitoring

Subscribe to events for logging, metrics, or debugging:

ts
durably.on('run:leased', (e) => console.log(`[leased] ${e.jobName}`))
durably.on('step:complete', (e) => console.log(`[step] ${e.stepName}`))
durably.on('run:complete', (e) =>
  console.log(`[done] ${JSON.stringify(e.output)}`),
)
durably.on('run:fail', (e) => console.log(`[fail] ${e.error}`))

Cron Integration

Combine with node-cron for scheduled execution:

ts
import cron from 'node-cron'
import { durably } from './lib/durably'

await durably.init()

// Run every hour
cron.schedule('0 * * * *', async () => {
  await durably.jobs.processImage.trigger({ filename: 'scheduled.jpg' })
})

CLI with Progress

Build CLI tools with real-time progress:

ts
import { program } from 'commander'
import { durably } from './lib/durably'

program.command('process <filename>').action(async (filename) => {
  await durably.init()

  durably.on('run:progress', ({ progress }) => {
    process.stdout.write(
      `\r${progress.current}/${progress.total} - ${progress.message}`,
    )
  })

  const { output } = await durably.jobs.processImage.triggerAndWait({
    filename,
  })
  console.log(`\nDone: ${output.url}`)

  await durably.stop()
})

program.parse()

Idempotency & Concurrency

ts
// Prevent duplicate runs
await durably.jobs.processImage.trigger(
  { filename: 'photo.jpg' },
  { idempotencyKey: `process-${new Date().toISOString().slice(0, 10)}` },
)

// Only one job with this key runs at a time
await durably.jobs.processImage.trigger(
  { filename: 'photo.jpg' },
  { concurrencyKey: 'image-processing' },
)

Error Handling

Durably doesn't auto-retry. Check status and retrigger manually:

ts
const run = await durably.getRun(runId)

if (run?.status === 'failed') {
  const newRun = await durably.retrigger(runId) // Creates a fresh run
  console.log(`New run: ${newRun.id}`)
}

if (run?.status === 'leased') {
  await durably.cancel(runId)
}

See Error Handling & Retrigger for more patterns.

Next Steps

Released under the MIT License.