Skip to main content
Logo for PG Boss

PG Boss

The pg-boss runtime provides PostgreSQL-based job queue processing with robust persistence, transactions, and reliability features.

Live Example

Overview

pg-boss provides:

  • PostgreSQL-backed queues for ACID-compliant job processing
  • Built-in persistence with automatic table creation
  • Job retries with configurable limits
  • Batch processing for high-throughput scenarios
  • Graceful shutdown with proper cleanup
  • Job results tracking with full status monitoring

Quick Start

1. Create a New Project

Create a new Pikku project with pg-boss support:

npm create pikku@latest

Select pg-boss as your runtime option during setup.

Project Structure

After creating your project, you'll have these key files:

Queue Worker Functions

Define your job processing logic (same as other queue systems):

queue.functions.ts
loading...

Queue Worker Registration

Register workers with specific queues:

queue.wiring.ts
loading...

pg-boss Runtime Server

The main server that processes jobs using PostgreSQL:

start.ts
loading...

Setup Guide

Complete setup instructions:

README.md
loading...

How It Works

  1. Database Connection: Connects to PostgreSQL using DATABASE_URL
  2. Auto Schema: Creates necessary tables automatically on startup
  3. Job Processing: Polls PostgreSQL for new jobs and distributes to workers
  4. ACID Compliance: All job operations are transactional

Configuration

Database Connection

Set your PostgreSQL connection string:

export DATABASE_URL="postgres://user:password@localhost:5432/your_database"

Worker Configuration

Configure job processing behavior:

  • batchSize: Number of jobs to process in parallel
  • pollInterval: How often to poll for new jobs (milliseconds)

Unsupported Options

Some configurations are managed by pg-boss internally:

  • Worker names (uses queue names instead)
  • Lock duration (handled by PostgreSQL)
  • Visibility timeout (uses PostgreSQL locks)

Database Requirements

  • PostgreSQL 9.5+ required
  • Automatic setup: pg-boss creates tables on first run
  • No migrations needed: Schema is managed automatically

Job Processing

Queue workers are standard Pikku functions that:

  • Accept typed job data from PostgreSQL
  • Return typed results stored in the database
  • Handle errors with automatic retries
  • Support long-running operations with transaction safety

Monitoring

pg-boss provides comprehensive monitoring:

  • Job status in PostgreSQL tables
  • Built-in job completion tracking
  • Failed job inspection and retry
  • Database-level monitoring with SQL queries

Error Handling

Failed jobs are automatically retried with PostgreSQL-backed persistence:

  • Configurable retry limits
  • Exponential backoff support
  • Dead letter queue functionality
  • Full error logging and tracking

PgBossServiceFactory API

The PgBossServiceFactory from @pikku/queue-pg-boss manages the lifecycle of all pg-boss components:

import { PgBossServiceFactory } from '@pikku/queue-pg-boss'

const pgBossFactory = new PgBossServiceFactory({
connectionString: process.env.DATABASE_URL!,
})
await pgBossFactory.init()

Key Methods

MethodReturnsDescription
getQueueService()QueueServicePublishes jobs to queues
getQueueWorkers()QueueWorkersProcesses jobs from queues
getSchedulerService()SchedulerServiceManages scheduled/recurring tasks
init()Promise<void>Initializes pg-boss and creates tables
close()Promise<void>Shuts down pg-boss gracefully

Worker Registration

import { registerQueueWorkers } from '@pikku/core/queue'

const queueWorkers = pgBossFactory.getQueueWorkers()
registerQueueWorkers(queueWorkers, singletonServices, createWireServices)

Full Setup with Workflows

import { PgBossServiceFactory } from '@pikku/queue-pg-boss'
import { registerQueueWorkers } from '@pikku/core/queue'
import { createSchedulerRuntimeHandlers } from '@pikku/core/scheduler'

const pgBossFactory = new PgBossServiceFactory({
connectionString: process.env.DATABASE_URL!,
})
await pgBossFactory.init()

const singletonServices = await createSingletonServices(config, {
queueService: pgBossFactory.getQueueService(),
})

// Register queue workers
const queueWorkers = pgBossFactory.getQueueWorkers()
registerQueueWorkers(queueWorkers, singletonServices, createWireServices)

// Set up scheduler
const schedulerHandlers = createSchedulerRuntimeHandlers(
singletonServices,
createWireServices
)
const schedulerService = pgBossFactory.getSchedulerService()
schedulerService.setHandlers(schedulerHandlers)
await schedulerService.start()

Graceful Shutdown

process.on('SIGTERM', async () => {
await schedulerService.stop()
await pgBossFactory.close()
process.exit(0)
})

Advantages

  • ACID Compliance: Full transaction support
  • No External Dependencies: Uses existing PostgreSQL infrastructure
  • Reliability: Database-backed persistence
  • Monitoring: Standard SQL queries for job inspection
  • Scalability: Horizontal scaling with multiple workers