PG Boss
The pg-boss runtime provides PostgreSQL-based job queue processing with robust persistence, transactions, and reliability features.
Live Example
Overview
pg-boss provides:
- PostgreSQL-backed queues for ACID-compliant job processing
- Built-in persistence with automatic table creation
- Job retries with configurable limits
- Batch processing for high-throughput scenarios
- Graceful shutdown with proper cleanup
- Job results tracking with full status monitoring
Quick Start
1. Create a New Project
Create a new Pikku project with pg-boss support:
npm create pikku@latest
Select pg-boss as your runtime option during setup.
Project Structure
After creating your project, you'll have these key files:
Queue Worker Functions
Define your job processing logic (same as other queue systems):
loading...
Queue Worker Registration
Register workers with specific queues:
loading...
pg-boss Runtime Server
The main server that processes jobs using PostgreSQL:
loading...
Setup Guide
Complete setup instructions:
loading...
How It Works
- Database Connection: Connects to PostgreSQL using
DATABASE_URL - Auto Schema: Creates necessary tables automatically on startup
- Job Processing: Polls PostgreSQL for new jobs and distributes to workers
- ACID Compliance: All job operations are transactional
Configuration
Database Connection
Set your PostgreSQL connection string:
export DATABASE_URL="postgres://user:password@localhost:5432/your_database"
Worker Configuration
Configure job processing behavior:
- batchSize: Number of jobs to process in parallel
- pollInterval: How often to poll for new jobs (milliseconds)
Unsupported Options
Some configurations are managed by pg-boss internally:
- Worker names (uses queue names instead)
- Lock duration (handled by PostgreSQL)
- Visibility timeout (uses PostgreSQL locks)
Database Requirements
- PostgreSQL 9.5+ required
- Automatic setup: pg-boss creates tables on first run
- No migrations needed: Schema is managed automatically
Job Processing
Queue workers are standard Pikku functions that:
- Accept typed job data from PostgreSQL
- Return typed results stored in the database
- Handle errors with automatic retries
- Support long-running operations with transaction safety
Monitoring
pg-boss provides comprehensive monitoring:
- Job status in PostgreSQL tables
- Built-in job completion tracking
- Failed job inspection and retry
- Database-level monitoring with SQL queries
Error Handling
Failed jobs are automatically retried with PostgreSQL-backed persistence:
- Configurable retry limits
- Exponential backoff support
- Dead letter queue functionality
- Full error logging and tracking
PgBossServiceFactory API
The PgBossServiceFactory from @pikku/queue-pg-boss manages the lifecycle of all pg-boss components:
import { PgBossServiceFactory } from '@pikku/queue-pg-boss'
const pgBossFactory = new PgBossServiceFactory({
connectionString: process.env.DATABASE_URL!,
})
await pgBossFactory.init()
Key Methods
| Method | Returns | Description |
|---|---|---|
getQueueService() | QueueService | Publishes jobs to queues |
getQueueWorkers() | QueueWorkers | Processes jobs from queues |
getSchedulerService() | SchedulerService | Manages scheduled/recurring tasks |
init() | Promise<void> | Initializes pg-boss and creates tables |
close() | Promise<void> | Shuts down pg-boss gracefully |
Worker Registration
import { registerQueueWorkers } from '@pikku/core/queue'
const queueWorkers = pgBossFactory.getQueueWorkers()
registerQueueWorkers(queueWorkers, singletonServices, createWireServices)
Full Setup with Workflows
import { PgBossServiceFactory } from '@pikku/queue-pg-boss'
import { registerQueueWorkers } from '@pikku/core/queue'
import { createSchedulerRuntimeHandlers } from '@pikku/core/scheduler'
const pgBossFactory = new PgBossServiceFactory({
connectionString: process.env.DATABASE_URL!,
})
await pgBossFactory.init()
const singletonServices = await createSingletonServices(config, {
queueService: pgBossFactory.getQueueService(),
})
// Register queue workers
const queueWorkers = pgBossFactory.getQueueWorkers()
registerQueueWorkers(queueWorkers, singletonServices, createWireServices)
// Set up scheduler
const schedulerHandlers = createSchedulerRuntimeHandlers(
singletonServices,
createWireServices
)
const schedulerService = pgBossFactory.getSchedulerService()
schedulerService.setHandlers(schedulerHandlers)
await schedulerService.start()
Graceful Shutdown
process.on('SIGTERM', async () => {
await schedulerService.stop()
await pgBossFactory.close()
process.exit(0)
})
Advantages
- ACID Compliance: Full transaction support
- No External Dependencies: Uses existing PostgreSQL infrastructure
- Reliability: Database-backed persistence
- Monitoring: Standard SQL queries for job inspection
- Scalability: Horizontal scaling with multiple workers