Pikku PG-Boss Queue Runtime
This skill helps you set up background queue workers using PG-Boss and PostgreSQL for reliable job processing.
When to use this skill
- PostgreSQL-based job queue (no Redis required)
- Already using PostgreSQL database
- Need job persistence and durability
- Job result tracking and monitoring
- Job priorities and delays
- Automatic retries with database-backed state
- Simpler infrastructure (one less service)
- ACID transaction guarantees for jobs
- Built-in job archival and cleanup
vs BullMQ: PG-Boss uses PostgreSQL instead of Redis. Choose PG-Boss if you prefer PostgreSQL or want to avoid Redis. Choose BullMQ for higher throughput and push-based delivery.
Quick Setup
Prerequisites: See pikku-project-setup for project structure detection and common setup patterns.
1. Install Packages
npm install @pikku/queue-pg-boss @pikku/core pg-boss
2. Create Worker File
Standalone: Create src/start.ts based on templates/pg-boss/src/start.ts
Workspace: Create a worker file that imports from the functions package
Key imports:
- Import the bootstrap from the queue subdirectory (see pikku-project-setup for queue bootstrap paths)
  - Standalone: `./.pikku/queue/pikku-bootstrap-queue.gen.js`
  - Workspace: `@my-app/functions/.pikku/queue/pikku-bootstrap-queue.gen.js`
- Import `PgBossQueueWorkers` from `@pikku/queue-pg-boss`
- Import config, services, and session factory
3. Configure PostgreSQL Connection
const connectionString =
process.env.DATABASE_URL ||
'postgres://postgres:password@localhost:5432/pikku_queue'
const pgBossQueueWorkers = new PgBossQueueWorkers(
connectionString,
singletonServices,
createSessionServices
)
Critical: Call `await init()` before `registerQueues()` to initialize PG-Boss and create database tables.
4. Set Up Queue Service (for enqueuing)
Add `PgBossQueueService` to singleton services in your HTTP/channel handlers:
import { PgBossQueueService } from '@pikku/queue-pg-boss'
const queue = new PgBossQueueService(process.env.DATABASE_URL)
5. Update Package.json Scripts
See pikku-project-setup for complete script patterns. Queue workers use the same scripts as Express/Fastify.
6. Generate & Verify
# Start PostgreSQL (Docker)
docker run -d -p 5432:5432 -e POSTGRES_PASSWORD=password -e POSTGRES_DB=pikku_queue postgres:16-alpine
# Generate wiring (if applicable to your project type)
npm run pikku
# Start worker
npm run dev
# Verify worker is processing (check logs and database)
Expected outcome: Worker starts, connects to PostgreSQL, creates PG-Boss tables on first run, registers queue processors, and begins processing jobs. Jobs added via `queue.add()` are processed by workers.
Installation
npm install @pikku/queue-pg-boss @pikku/core pg-boss
Setup
Standalone Project
For standalone projects where functions are in the same package.
Example: templates/pg-boss/src/start.ts
Pattern:
import { PgBossQueueWorkers } from '@pikku/queue-pg-boss'
import {
createConfig,
createSingletonServices,
createSessionServices,
} from './services.js'
import './.pikku/queue/pikku-bootstrap-queue.gen.js'
async function main(): Promise<void> {
const config = await createConfig()
const singletonServices = await createSingletonServices(config)
singletonServices.logger.info('Starting PG-Boss queue workers...')
// Use DATABASE_URL environment variable or connection string
const connectionString =
process.env.DATABASE_URL ||
'postgres://postgres:password@localhost:5432/pikku_queue'
const pgBossQueueWorkers = new PgBossQueueWorkers(
connectionString,
singletonServices,
createSessionServices
)
// Initialize pg-boss (creates tables if needed)
await pgBossQueueWorkers.init()
// Register queue processors
await pgBossQueueWorkers.registerQueues()
// Handle graceful shutdown
process.on('SIGTERM', async () => {
singletonServices.logger.info('Shutting down gracefully...')
await pgBossQueueWorkers.close()
process.exit(0)
})
process.on('SIGINT', async () => {
singletonServices.logger.info('Shutting down gracefully...')
await pgBossQueueWorkers.close()
process.exit(0)
})
}
main()
Key points:
- Import the bootstrap from `./.pikku/queue/pikku-bootstrap-queue.gen.js` (note the `/queue/` directory)
- Create `PgBossQueueWorkers` with connection string, services, and session factory
- Call `await init()` to start pg-boss (creates database tables)
- Call `await registerQueues()` to start processing
- Handle SIGTERM/SIGINT for graceful shutdown
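The duplicated SIGTERM/SIGINT handlers in the pattern above can be factored into a small helper. This is an illustrative sketch, not a Pikku API; `onShutdown` and its parameters are hypothetical, and the injectable `exit` callback exists only to make the helper testable.

```typescript
// Hypothetical helper: register one graceful-shutdown routine for both
// SIGTERM and SIGINT. Works with anything exposing close(), such as the
// PgBossQueueWorkers instance above.
function onShutdown(
  closeable: { close: () => Promise<void> },
  exit: (code: number) => void = (code) => process.exit(code)
): void {
  const shutdown = async () => {
    await closeable.close() // stop polling and release DB connections
    exit(0)
  }
  process.on('SIGTERM', shutdown)
  process.on('SIGINT', shutdown)
}
```

With a helper like this, the two `process.on(...)` blocks in `main()` collapse to a single `onShutdown(pgBossQueueWorkers)` call.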
Workspace Project
Backend imports functions from the functions package.
Pattern:
import { PgBossQueueWorkers } from '@pikku/queue-pg-boss'
import {
createConfig,
createSingletonServices,
createSessionServices,
} from '@my-app/functions/src/services'
import '@my-app/functions/.pikku/queue/pikku-bootstrap-queue.gen.js'
async function main(): Promise<void> {
const config = await createConfig()
const singletonServices = await createSingletonServices(config)
const pgBossQueueWorkers = new PgBossQueueWorkers(
process.env.DATABASE_URL!,
singletonServices,
createSessionServices
)
await pgBossQueueWorkers.init()
await pgBossQueueWorkers.registerQueues()
// ... graceful shutdown handlers ...
}
main()
Key differences:
- Import config/services from the functions package
- Import the bootstrap from functions: `@my-app/functions/.pikku/queue/pikku-bootstrap-queue.gen.js`
- No custom filtering support for queue workers
PostgreSQL Configuration
PG-Boss requires PostgreSQL connection configuration.
Connection string:
const connectionString = 'postgres://user:password@host:port/database'
const pgBossQueueWorkers = new PgBossQueueWorkers(
connectionString,
singletonServices,
createSessionServices
)
Connection options object:
import PgBoss from 'pg-boss'
const options: PgBoss.ConstructorOptions = {
connectionString: process.env.DATABASE_URL,
// Connection pool settings
max: 20, // Max connections in pool
// Application name for monitoring
application_name: 'pikku-queue-worker',
// Archival settings (automatic job cleanup)
archiveCompletedAfterSeconds: 60 * 60 * 24, // Archive completed jobs after 1 day
deleteAfterDays: 7, // Delete archived jobs after 7 days
// Maintenance settings
maintenanceIntervalMinutes: 15, // Run maintenance every 15 minutes
}
const pgBossQueueWorkers = new PgBossQueueWorkers(
options,
singletonServices,
createSessionServices
)
Database setup:
PG-Boss automatically creates the required tables on `init()`. No manual schema setup needed.
Production tips:
- Use connection pooling (adjust `max` based on workload)
- Configure archival to prevent database bloat
- Enable SSL for secure connections
- Set appropriate timeout values
- Monitor database size and query performance
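The production tips above can be captured in a single options-builder sketch. The option names (`max`, `application_name`, `archiveCompletedAfterSeconds`, `deleteAfterDays`, `maintenanceIntervalMinutes`) come from the PG-Boss options example earlier; the pool-sizing heuristic and the `ssl` pass-through shape are assumptions to verify against your pg-boss and node-postgres versions.

```typescript
// Sketch: derive pg-boss constructor options for production. The shape
// mirrors the PgBoss.ConstructorOptions example above; the sizing formula
// (workers * batchSize + headroom) is a starting heuristic only.
interface QueueOptionsSketch {
  connectionString: string
  max: number
  application_name: string
  archiveCompletedAfterSeconds: number
  deleteAfterDays: number
  maintenanceIntervalMinutes: number
  ssl?: { rejectUnauthorized: boolean }
}

function buildQueueOptions(
  connectionString: string,
  workers: number,
  batchSize: number,
  production: boolean
): QueueOptionsSketch {
  return {
    connectionString,
    // roughly one connection per concurrently processed job, plus headroom
    max: workers * batchSize + 2,
    application_name: 'pikku-queue-worker',
    archiveCompletedAfterSeconds: 60 * 60 * 24, // archive completed jobs after 1 day
    deleteAfterDays: 7, // delete archived jobs after 7 days
    maintenanceIntervalMinutes: 15,
    // require TLS in production (assumed pass-through to node-postgres)
    ...(production ? { ssl: { rejectUnauthorized: true } } : {}),
  }
}
```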
Queue Service (Enqueuing Jobs)
Use `PgBossQueueService` to add jobs to queues from your HTTP/channel handlers.
Setup in services:
import { PgBossQueueService } from '@pikku/queue-pg-boss'
import type { QueueService } from '@pikku/core/queue'
export const createSingletonServices = async (config: Config) => {
const queue: QueueService = new PgBossQueueService(process.env.DATABASE_URL)
return {
queue,
logger,
// ... other services
}
}
Adding jobs:
// In your Pikku function
await services.queue.add('emailQueue', {
to: 'user@example.com',
subject: 'Welcome!',
body: 'Thanks for signing up',
})
// With options
await services.queue.add('emailQueue', data, {
priority: 1, // Higher priority = processed first
delay: 5000, // Delay 5 seconds before processing
attempts: 3, // Retry up to 3 times
jobId: 'unique-id', // Deduplicate jobs
})
Job options:
- `priority`: Job priority (higher number = higher priority in PG-Boss)
- `delay`: Delay in milliseconds before processing
- `attempts`: Number of retry attempts
- `jobId`: Custom job ID for deduplication
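Since `delay` is expressed in milliseconds, scheduling a job for an absolute time means computing the offset yourself. A tiny hypothetical helper (not part of Pikku):

```typescript
// Hypothetical helper: convert a target run time into the millisecond
// `delay` job option. Returns 0 for times in the past (run immediately).
function delayUntil(runAt: Date, now: Date = new Date()): number {
  return Math.max(0, runAt.getTime() - now.getTime())
}
```

Usage would look like `services.queue.add('emailQueue', data, { delay: delayUntil(sendAt) })`.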
See: pikku-queue skill for queue function definitions and enqueue patterns.
Worker Configuration
Configure worker behavior using `workerConfig` in your queue function definition.
Example:
import { defineQueue } from '@pikku/core/queue'
export const sendEmailQueue = defineQueue({
func: sendEmail,
queueName: 'emailQueue',
workerConfig: {
batchSize: 5, // Process 5 jobs in a batch
pollInterval: 2000, // Poll every 2 seconds
},
})
Worker config options:

| Option | Description | PG-Boss Mapping |
|---|---|---|
| batchSize | Jobs to process in a batch | batchSize |
| pollInterval | Polling interval (ms) | pollingIntervalSeconds (converts ms to seconds) |
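The `pollInterval` row above notes a milliseconds-to-seconds conversion. A sketch of that mapping follows; whether the runtime rounds up, down, or to nearest is an assumption, so check the adapter source if the exact value matters.

```typescript
// Sketch of the workerConfig pollInterval (ms) -> pg-boss
// pollingIntervalSeconds mapping. Clamps to at least 1 second,
// since pg-boss polls in whole seconds.
function toPollingIntervalSeconds(pollIntervalMs: number): number {
  return Math.max(1, Math.round(pollIntervalMs / 1000))
}
```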
Unsupported options (ignored):
- `name`: PG-Boss identifies workers by queue name
- `autorun`: Always enabled in PG-Boss
- `lockDuration`: Managed by job-level expiration
- `drainDelay`: Handled internally
- `maxStalledCount`: Managed by retry mechanism
- `prefetch`: Managed internally
- `visibilityTimeout`: Uses PostgreSQL locks instead
Fallback options (managed by PG-Boss):
- `removeOnComplete`: Managed by the archival system (see `archiveCompletedAfterSeconds`)
- `removeOnFail`: Managed by the archival system
Job Lifecycle
Job states:
- created: Job added to queue
- active: Job being processed
- completed: Job finished successfully
- failed: Job failed after all retries
- retry: Job scheduled for retry
- expired: Job exceeded time limit
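The states above can be read as a simple transition map. This is an illustrative simplification for reasoning about the lifecycle, not pg-boss's internal implementation:

```typescript
// Illustrative lifecycle sketch: which states each job state may move to.
// (Simplified; pg-boss also handles cancellation and archival.)
const jobTransitions: Record<string, string[]> = {
  created: ['active'],
  active: ['completed', 'failed', 'retry', 'expired'],
  retry: ['active'],
  completed: [], // terminal: eventually archived
  failed: [],    // terminal: failed after all retries
  expired: [],   // terminal: exceeded time limit
}

function canTransition(from: string, to: string): boolean {
  return (jobTransitions[from] ?? []).includes(to)
}
```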
Job archival: PG-Boss automatically archives completed and failed jobs based on configuration:
- `archiveCompletedAfterSeconds`: Move completed jobs to the archive table
- `deleteAfterDays`: Delete old archived jobs
Job control:
// Fail job with custom error
throw new QueueJobFailedError('Invalid email format')
// Discard job (don't retry)
throw new QueueJobDiscardedError('Job no longer needed')
Development
Scripts
Standalone:
{
"scripts": {
"pikku": "pikku all",
"prebuild": "npm run pikku",
"dev": "tsx --watch src/start.ts",
"start": "tsx src/start.ts"
}
}
Workspace:
{
"scripts": {
"dev": "tsx --watch src/start.ts",
"start": "tsx src/start.ts"
}
}
Local Development
Run PostgreSQL locally:
# Docker
docker run -d -p 5432:5432 \
-e POSTGRES_PASSWORD=password \
-e POSTGRES_DB=pikku_queue \
postgres:16-alpine
# Or use docker-compose
docker-compose up postgres
Start worker:
npm run dev
Deployment
Docker
PG-Boss workers can run in containers alongside your HTTP servers or as dedicated worker instances.
Example Dockerfile:
FROM node:20-slim
WORKDIR /app
COPY package*.json ./
RUN npm ci --production
COPY dist ./dist
CMD ["node", "dist/start.js"]
Scaling Workers
Run multiple worker instances for horizontal scaling:
# Docker Compose
docker-compose up --scale worker=5
Key points:
- Multiple workers automatically share jobs via PostgreSQL locks
- No coordination service needed
- Scale based on queue depth and job duration
- Monitor PostgreSQL connection pool usage
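"Scale based on queue depth and job duration" can be made concrete with a back-of-envelope calculation. Everything here (the function name, parameters, and formula) is an assumption for illustration, not a Pikku or pg-boss API:

```typescript
// Rough sizing sketch: how many worker instances are needed to drain the
// current backlog within a target window, given measured throughput.
function workersNeeded(
  queueDepth: number, // jobs currently waiting ('created' state)
  jobsPerWorkerPerSecond: number, // measured single-worker throughput
  drainTargetSeconds: number // how quickly the backlog should clear
): number {
  return Math.max(
    1,
    Math.ceil(queueDepth / (jobsPerWorkerPerSecond * drainTargetSeconds))
  )
}
```

For example, 1,000 queued jobs at 2 jobs/sec per worker with a 100-second drain target suggests 5 workers; remember each extra worker consumes PostgreSQL connections.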
Environment Variables
DATABASE_URL=postgres://user:password@host:5432/database
NODE_ENV=production
Monitoring
Database Queries
Monitor queue health with SQL queries:
-- Active jobs
SELECT * FROM pgboss.job WHERE state = 'active';
-- Failed jobs
SELECT * FROM pgboss.job WHERE state = 'failed';
-- Queue depth
SELECT name, COUNT(*) FROM pgboss.job
WHERE state = 'created'
GROUP BY name;
-- Archive size
SELECT COUNT(*) FROM pgboss.archive;
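The queries above can feed a custom dashboard. Below is a minimal sketch that builds a parameterized version of the queue-depth query; actually executing it requires a PostgreSQL client such as node-postgres, which is not shown here.

```typescript
// Sketch: parameterized queue-depth query against the pgboss.job table,
// in the { text, values } shape accepted by node-postgres' query().
function queueDepthQuery(state: string = 'created'): {
  text: string
  values: string[]
} {
  return {
    text: 'SELECT name, COUNT(*) AS depth FROM pgboss.job WHERE state = $1 GROUP BY name',
    values: [state],
  }
}
```

Swapping the parameter (e.g. `queueDepthQuery('failed')`) reuses the same query for failed-job counts.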
Metrics
Monitor key metrics:
- Queue depth per queue
- Active jobs count
- Failed jobs count
- Average processing time
- Database connection pool usage
- Archive table size
Note: Unlike BullMQ, PG-Boss does not have a built-in web UI. Use database monitoring tools or build custom dashboards.
Performance Tips
- Batch size: Use `batchSize` to process multiple jobs efficiently
- Poll interval: Balance between latency and database load
- Connection pool: Set `max` connections based on workers × batchSize
- Archival: Configure aggressive archival to prevent table bloat
- Vacuum: Run VACUUM ANALYZE regularly on job tables
- Indexes: PG-Boss creates appropriate indexes automatically
- Job size: Keep job data small (use references to large data)
- Priorities: Use sparingly (adds query overhead)
Database maintenance:
-- Vacuum job table
VACUUM ANALYZE pgboss.job;
-- Vacuum archive table
VACUUM ANALYZE pgboss.archive;
Comparison: PG-Boss vs BullMQ

| Feature | PG-Boss | BullMQ |
|---|---|---|
| Backing store | PostgreSQL | Redis |
| Delivery | Polling | Push (pub/sub) |
| Throughput | Medium | High |
| Durability | ACID guarantees | Redis persistence |
| Setup complexity | Simpler (one less service) | Requires Redis |
| Job archival | Automatic | Manual |
| Monitoring | SQL queries | Bull Board UI |
Choose PG-Boss if:
- Already using PostgreSQL
- Want simpler infrastructure (no Redis)
- Need ACID guarantees
- Prefer SQL-based monitoring
Choose BullMQ if:
- Need high throughput
- Want push-based delivery
- Prefer Redis
- Want built-in monitoring UI
Examples
Standalone:
- templates/pg-boss - PG-Boss worker
Critical Rules
Standalone Projects
- Import the bootstrap from `./.pikku/queue/pikku-bootstrap-queue.gen.js` (note the `/queue/` directory)
- Import services from local files
- Create `PgBossQueueWorkers` with connection string, singleton services, and session factory
- Call `await init()` before `registerQueues()` to initialize PG-Boss
- Call `await registerQueues()` to start processing
- Handle SIGTERM/SIGINT for graceful shutdown
Workspace Projects
- Import config/services from functions: `@my-app/functions/src/...`
- Import the bootstrap from functions: `@my-app/functions/.pikku/queue/pikku-bootstrap-queue.gen.js`
- Backend package.json has `"@my-app/functions": "workspace:*"`
PostgreSQL Configuration
- Set DATABASE_URL environment variable
- Configure connection pool size appropriately
- Enable SSL for production
- Configure archival settings to prevent bloat
- Set maintenance interval for cleanup
Service Integration
- Add `PgBossQueueService` to singleton services for enqueuing jobs
- Use the same database for both workers and service
- Configure job options (priority, delay, attempts) appropriately
Worker Configuration
- Set an appropriate `batchSize` for throughput
- Configure `pollInterval` to balance latency and database load
- Understand which options are unsupported/fallback
Database Maintenance
- Monitor job and archive table sizes
- Run VACUUM ANALYZE regularly
- Configure aggressive archival for high-volume queues
- Monitor connection pool usage
Development
- Run PostgreSQL locally (Docker recommended)
- Use `tsx --watch` for development
- Monitor queue depth and processing rate
- Test failure scenarios and retries
Deployment
- Use environment variables for database config
- Scale workers horizontally as needed
- Monitor queue metrics via SQL queries
- Configure graceful shutdown
- Monitor database performance
Performance
- Keep job data small (use references)
- Use priorities sparingly
- Optimize `batchSize` and `pollInterval`
- Run database vacuum regularly
- Monitor and optimize database connections
Related Skills
Prerequisites:
- pikku-project-setup - Project structure and common setup patterns
- pikku-functions - Creating Pikku function definitions
Wiring:
- pikku-queue - Queue function definitions and enqueue patterns
Alternative Queue Runtimes:
- pikku-queue-bullmq - Redis-based queue alternative (higher throughput)