# Pikku BullMQ Queue Runtime

This skill helps you set up background queue workers using BullMQ and Redis for reliable job processing.
## When to use this skill
- Reliable background job processing
- Job result tracking and monitoring
- Job priorities and delays
- Automatic retries with exponential backoff
- High-throughput Redis-based queuing
- Job progress tracking
- Job lifecycle events (completed, failed, stalled)
- Distributed worker pools
- Job persistence and durability
Performance: BullMQ uses Redis for near-instant job delivery via pub/sub, eliminating polling overhead.
## Quick Setup
Prerequisites: See pikku-project-setup for project structure detection and common setup patterns.
### 1. Install Packages

```shell
npm install @pikku/queue-bullmq @pikku/core bullmq ioredis
```
### 2. Create Worker File

- Standalone: Create `src/start.ts` based on `templates/bullmq/src/start.ts`
- Workspace: Create a worker file importing from the functions package

Key imports:

- Import the bootstrap from the queue subdirectory (see pikku-project-setup for queue bootstrap paths)
  - Standalone: `./.pikku/queue/pikku-bootstrap-queue.gen.js`
  - Workspace: `@my-app/functions/.pikku/queue/pikku-bootstrap-queue.gen.js`
- Import `BullQueueWorkers` from `@pikku/queue-bullmq`
- Import the config, services, and session factory
### 3. Configure Redis Connection

```typescript
const redisConnectionOptions = {
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379'),
  password: process.env.REDIS_PASSWORD,
}

const bullQueueWorkers = new BullQueueWorkers(
  redisConnectionOptions,
  singletonServices,
  createSessionServices
)
```
### 4. Set Up the Queue Service (for Enqueuing)

Add `BullQueueService` to the singleton services used by your HTTP/channel handlers:

```typescript
import { BullQueueService } from '@pikku/queue-bullmq'

const queue = new BullQueueService(redisConnectionOptions)
```
### 5. Update package.json Scripts

See pikku-project-setup for complete script patterns. Queue workers use the same scripts as Express/Fastify.
### 6. Generate & Verify

```shell
# Generate wiring (if applicable to your project type)
npm run pikku

# Start worker
npm run dev

# Verify the worker is processing (check logs)
```

Expected outcome: the worker starts, connects to Redis, registers queue processors, and begins processing jobs. Jobs added via `queue.add()` are processed by workers.
## Installation

```shell
npm install @pikku/queue-bullmq @pikku/core bullmq ioredis
```
## Setup

### Standalone Project

For standalone projects where functions live in the same package.

Example: `templates/bullmq/src/start.ts`
Pattern:

```typescript
import { BullQueueWorkers } from '@pikku/queue-bullmq'
import {
  createConfig,
  createSingletonServices,
  createSessionServices,
} from './services.js'
import './.pikku/queue/pikku-bootstrap-queue.gen.js'

async function main(): Promise<void> {
  const config = await createConfig()
  const singletonServices = await createSingletonServices(config)
  singletonServices.logger.info('Starting Bull queue workers...')

  const bullQueueWorkers = new BullQueueWorkers(
    {}, // Redis connection options
    singletonServices,
    createSessionServices
  )
  await bullQueueWorkers.registerQueues()
}

main()
```
Key points:

- Import the bootstrap from `./.pikku/queue/pikku-bootstrap-queue.gen.js` (note the `/queue/` directory)
- Create `BullQueueWorkers` with the Redis connection, services, and session factory
- Call `registerQueues()` to start processing
- The worker runs continuously until the process exits
### Workspace Project

The backend imports functions from the functions package.

Pattern:

```typescript
import { BullQueueWorkers } from '@pikku/queue-bullmq'
import {
  createConfig,
  createSingletonServices,
  createSessionServices,
} from '@my-app/functions/src/services'
import '@my-app/functions/.pikku/queue/pikku-bootstrap-queue.gen.js'

async function main(): Promise<void> {
  const config = await createConfig()
  const singletonServices = await createSingletonServices(config)

  const bullQueueWorkers = new BullQueueWorkers(
    {},
    singletonServices,
    createSessionServices
  )
  await bullQueueWorkers.registerQueues()
}

main()
```
Key differences:

- Import config/services from the functions package
- Import the bootstrap from functions: `@my-app/functions/.pikku/queue/pikku-bootstrap-queue.gen.js`
- No custom filtering support for queue workers
## Redis Configuration

BullMQ requires Redis connection configuration.

Pattern:

```typescript
import { ConnectionOptions } from 'bullmq'

const redisConnectionOptions: ConnectionOptions = {
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379'),
  password: process.env.REDIS_PASSWORD,
  db: parseInt(process.env.REDIS_DB || '0'),

  // TLS for production
  tls: process.env.REDIS_TLS === 'true' ? {} : undefined,

  // Connection retry strategy
  maxRetriesPerRequest: 3,
  enableReadyCheck: true,
  enableOfflineQueue: true,
}

const bullQueueWorkers = new BullQueueWorkers(
  redisConnectionOptions,
  singletonServices,
  createSessionServices
)
```

Redis URL: you can also use a connection string:

```typescript
const redisConnectionOptions = {
  connection: 'redis://user:password@host:6379/0',
}
```
Production tips:
- Use Redis Cluster for high availability
- Enable TLS for secure connections
- Use connection pooling for multiple workers
- Set appropriate timeout values
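The retry-strategy tip comes from ioredis: BullMQ's `ConnectionOptions` passes ioredis options through, including `retryStrategy`. A minimal sketch (the backoff numbers are illustrative, not recommendations):

```typescript
import type { ConnectionOptions } from 'bullmq'

const redisConnectionOptions: ConnectionOptions = {
  host: process.env.REDIS_HOST || 'localhost',
  port: parseInt(process.env.REDIS_PORT || '6379', 10),
  // ioredis retry strategy: reconnect with capped linear backoff
  // (50ms, 100ms, ... up to 2s). Returning undefined stops retrying.
  retryStrategy: (attempt: number) => Math.min(attempt * 50, 2000),
}
```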
## Queue Service (Enqueuing Jobs)

Use `BullQueueService` to add jobs to queues from your HTTP/channel handlers.

Setup in services:

```typescript
import { BullQueueService } from '@pikku/queue-bullmq'
import type { QueueService } from '@pikku/core/queue'

export const createSingletonServices = async (config: Config) => {
  const queue: QueueService = new BullQueueService({
    host: config.redis.host,
    port: config.redis.port,
    password: config.redis.password,
  })

  return {
    queue,
    logger,
    // ... other services
  }
}
```
Adding jobs:

```typescript
// In your Pikku function
await services.queue.add('emailQueue', {
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thanks for signing up',
})

// With options
await services.queue.add('emailQueue', data, {
  priority: 1, // 1 = highest priority (lower number = higher priority)
  delay: 5000, // Delay 5 seconds before processing
  attempts: 3, // Retry up to 3 times
  jobId: 'unique-id', // Deduplicate jobs
  removeOnComplete: 10, // Keep last 10 completed jobs
  removeOnFail: 50, // Keep last 50 failed jobs
})
```
Job options:

- `priority`: Job priority (lower number = higher priority)
- `delay`: Delay in milliseconds before processing
- `attempts`: Number of retry attempts
- `jobId`: Custom job ID for deduplication
- `removeOnComplete`: Number of completed jobs to keep
- `removeOnFail`: Number of failed jobs to keep
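The `attempts` option interacts with backoff. BullMQ's built-in exponential backoff doubles the base delay on each attempt; a small helper (hypothetical, mirroring the documented formula `delay * 2^(attemptsMade - 1)`) makes the timing predictable:

```typescript
// Hypothetical helper mirroring BullMQ's built-in exponential backoff:
// the delay before retry N is baseDelay * 2^(N - 1).
function exponentialBackoffDelay(attemptsMade: number, baseDelayMs: number): number {
  return baseDelayMs * Math.pow(2, attemptsMade - 1)
}

// With attempts: 3 and a 1s base delay, retries fire after 1s, 2s, 4s.
```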
See: pikku-queue skill for queue function definitions and enqueue patterns.
## Worker Configuration

Configure worker behavior using `workerConfig` in your queue function definition.

Example:

```typescript
import { defineQueue } from '@pikku/core/queue'

export const sendEmailQueue = defineQueue({
  func: sendEmail,
  queueName: 'emailQueue',
  workerConfig: {
    name: 'email-worker',
    batchSize: 5, // Process 5 jobs concurrently
    autorun: true, // Start processing automatically
    lockDuration: 30000, // Job lock duration (30s)
    drainDelay: 5, // Long-poll delay when the queue is empty (seconds in BullMQ)
    maxStalledCount: 3, // Max recoveries from stalled state
    removeOnComplete: 100, // Keep last 100 completed jobs
    removeOnFail: 500, // Keep last 500 failed jobs
  },
})
```
Worker config options:

| Option | Description | BullMQ Mapping |
|---|---|---|
| `name` | Worker identifier | `name` |
| `batchSize` | Concurrent job processing | `concurrency` |
| `autorun` | Auto-start processing | `autorun` |
| `lockDuration` | Job lock duration (ms) | `lockDuration` |
| `drainDelay` | Empty-queue poll delay (seconds in BullMQ) | `drainDelay` |
| `maxStalledCount` | Max stalled recoveries | `maxStalledCount` |
| `removeOnComplete` | Keep N completed jobs | `removeOnComplete` |
| `removeOnFail` | Keep N failed jobs | `removeOnFail` |
Unsupported options:

- `visibilityTimeout`: BullMQ uses locks instead
- `pollInterval`: BullMQ is push-based (pub/sub)
- `prefetch`: BullMQ manages this automatically
## Job Lifecycle
Job states:
- waiting: Job added to queue
- active: Job being processed
- completed: Job finished successfully
- failed: Job failed after all retries
- delayed: Job scheduled for future processing
- stalled: Job exceeded lock duration
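As an illustration only (not a Pikku or BullMQ API), the states above form a small transition graph:

```typescript
type JobState = 'waiting' | 'delayed' | 'active' | 'completed' | 'failed' | 'stalled'

// Illustrative transition map: which states a job can move to next,
// using the state definitions from this section ('failed' here means
// failed after all retries).
const jobTransitions: Record<JobState, JobState[]> = {
  delayed: ['waiting'], // delay elapsed
  waiting: ['active'], // picked up by a worker
  active: ['completed', 'failed', 'stalled'],
  stalled: ['waiting', 'failed'], // recovered, or maxStalledCount exceeded
  completed: [], // terminal
  failed: [], // terminal
}
```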
Progress tracking:

```typescript
async function processVideo(
  data: VideoData,
  services: Services,
  job: QueueJob
) {
  // Update progress
  await job.updateProgress(25)
  // ... do work ...
  await job.updateProgress(50)
  // ... more work ...
  await job.updateProgress(100)
  return { videoUrl: 'https://...' }
}
```
Job control:

```typescript
// Fail the job with a custom error
throw new QueueJobFailedError('Invalid video format')

// Discard the job (don't retry)
throw new QueueJobDiscardedError('Job no longer needed')
```
## Development

### Scripts

Standalone:

```json
{
  "scripts": {
    "pikku": "pikku all",
    "prebuild": "npm run pikku",
    "dev": "tsx --watch src/start.ts",
    "start": "tsx src/start.ts"
  }
}
```

Workspace:

```json
{
  "scripts": {
    "dev": "tsx --watch src/start.ts",
    "start": "tsx src/start.ts"
  }
}
```
### Local Development

Run Redis locally:

```shell
# Docker
docker run -d -p 6379:6379 redis:7-alpine

# Or use docker-compose
docker-compose up redis
```

Start the worker:

```shell
npm run dev
```
## Deployment

### Docker

BullMQ workers can run in containers alongside your HTTP servers or as dedicated worker instances.

Example Dockerfile:

```dockerfile
FROM node:20-slim
WORKDIR /app
COPY package*.json ./
RUN npm ci --omit=dev
COPY dist ./dist
CMD ["node", "dist/start.js"]
```
### Scaling Workers

Run multiple worker instances for horizontal scaling:

```shell
# Docker Compose
docker-compose up --scale worker=5
```
Key points:
- Multiple workers automatically share jobs
- Use worker names for monitoring
- Scale based on queue depth and job duration
- Monitor Redis memory usage
### Environment Variables

```shell
REDIS_HOST=redis.example.com
REDIS_PORT=6379
REDIS_PASSWORD=your-password
REDIS_DB=0
REDIS_TLS=true
NODE_ENV=production
```
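A hypothetical helper (not part of Pikku) that turns these variables into the connection options used earlier, with the same defaults as the examples above:

```typescript
// Hypothetical helper: build Redis connection options from environment
// variables. Missing values fall back to localhost:6379, db 0, no TLS.
function redisOptionsFromEnv(env: Record<string, string | undefined>) {
  return {
    host: env.REDIS_HOST || 'localhost',
    port: parseInt(env.REDIS_PORT || '6379', 10),
    password: env.REDIS_PASSWORD,
    db: parseInt(env.REDIS_DB || '0', 10),
    tls: env.REDIS_TLS === 'true' ? {} : undefined,
  }
}

// Usage sketch: new BullQueueWorkers(redisOptionsFromEnv(process.env), ...)
```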
## Monitoring

### Bull Board

Use Bull Board for web-based queue monitoring:

```shell
npm install @bull-board/api @bull-board/express
```

Setup:

```typescript
import { createBullBoard } from '@bull-board/api'
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'
import { ExpressAdapter } from '@bull-board/express'

const serverAdapter = new ExpressAdapter()
serverAdapter.setBasePath('/admin/queues')

createBullBoard({
  queues: [new BullMQAdapter(emailQueue)],
  serverAdapter,
})

app.use('/admin/queues', serverAdapter.getRouter())
```

Access at: http://localhost:3000/admin/queues
### Metrics
Monitor key metrics:
- Queue depth (waiting jobs)
- Processing rate (jobs/second)
- Completion rate
- Failure rate
- Stalled jobs
- Average processing time
- Redis memory usage
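Several of these metrics can be read straight from Redis with BullMQ's `Queue#getJobCounts`. A sketch (assumes a local Redis and an existing `emailQueue`):

```typescript
import { Queue } from 'bullmq'

// Connect to the same queue the workers process
// (use the same connection options as your workers).
const queue = new Queue('emailQueue', {
  connection: { host: 'localhost', port: 6379 },
})

// Returns an object such as
// { waiting: 12, active: 3, completed: 100, failed: 2, delayed: 0 }
const counts = await queue.getJobCounts(
  'waiting',
  'active',
  'completed',
  'failed',
  'delayed'
)
console.log('queue depth:', counts.waiting)
```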
## Performance Tips

- Concurrency: Use `batchSize` to process multiple jobs in parallel
- Job retention: Limit `removeOnComplete` and `removeOnFail` to prevent memory bloat
- Lock duration: Set `lockDuration` greater than the job processing time to prevent stalls
- Redis optimization: Use Redis Cluster for high throughput
- Job size: Keep job data small (use references to large data)
- Priorities: Use sparingly (adds overhead)
- Backoff: Configure exponential backoff for retries
Redis memory management:

```typescript
workerConfig: {
  removeOnComplete: 10, // Keep minimal completed jobs
  removeOnFail: 100, // Keep failed jobs for debugging
}
```
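For the job-size tip, a sketch of enqueuing a reference instead of a payload; the `videoId` field and `videoQueue` name are illustrative, not part of any template:

```typescript
// Enqueue a small reference; the worker loads the heavy data itself
// from the database or object store.
await services.queue.add('videoQueue', {
  videoId: 'vid_123', // reference to a DB row or object-store key
})

// Avoid embedding large payloads - they sit in Redis until the job
// is removed:
// await services.queue.add('videoQueue', { videoBuffer: bigBuffer }) // don't
```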
## Examples

Standalone:

- `templates/bullmq` - BullMQ worker

## Critical Rules

### Standalone Projects

- Import the bootstrap from `./.pikku/queue/pikku-bootstrap-queue.gen.js` (note the `/queue/` directory)
- Import services from local files
- Create `BullQueueWorkers` with the Redis connection, singleton services, and session factory
- Call `await registerQueues()` to start processing
- Handle process signals for graceful shutdown
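The graceful-shutdown rule can be sketched as follows. Whether `BullQueueWorkers` exposes a `close()` method is an assumption here; check the Pikku API for the actual teardown call:

```typescript
// Hedged sketch: stop taking new jobs and exit cleanly on SIGTERM/SIGINT.
const shutdown = async (signal: string) => {
  singletonServices.logger.info(`Received ${signal}, shutting down workers...`)
  // Assumption: a close()/stop() method exists on BullQueueWorkers;
  // substitute the real teardown call from the Pikku API.
  await bullQueueWorkers.close()
  process.exit(0)
}

process.on('SIGTERM', () => void shutdown('SIGTERM'))
process.on('SIGINT', () => void shutdown('SIGINT'))
```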
### Workspace Projects

- Import config/services from functions: `@my-app/functions/src/...`
- Import the bootstrap from functions: `@my-app/functions/.pikku/queue/pikku-bootstrap-queue.gen.js`
- The backend package.json has `"@my-app/functions": "workspace:*"`
### Redis Configuration
- Configure Redis connection (host, port, password)
- Enable TLS for production
- Set connection retry strategy
- Use Redis Cluster for high availability
- Monitor Redis memory usage
### Service Integration

- Add `BullQueueService` to singleton services for enqueuing jobs
- Use the same Redis configuration for both workers and the service
- Configure job options (priority, delay, attempts) appropriately
### Worker Configuration

- Set an appropriate `batchSize` for concurrency
- Configure `lockDuration` greater than the job processing time
- Limit `removeOnComplete` and `removeOnFail` to prevent memory bloat
- Use the worker `name` for monitoring and identification
### Development

- Run Redis locally (Docker recommended)
- Use `tsx --watch` for development
- Monitor queue depth and processing rate
- Test failure scenarios and retries
### Deployment
- Use environment variables for Redis config
- Scale workers horizontally as needed
- Monitor queue metrics (depth, rate, failures)
- Set up Bull Board for monitoring
- Configure graceful shutdown
### Performance

- Keep job data small (use references)
- Use priorities sparingly
- Configure exponential backoff for retries
- Optimize `batchSize` based on job duration
- Monitor and optimize Redis memory usage
## Related Skills

Prerequisites:

- pikku-project-setup - Project structure and common setup patterns
- pikku-functions - Creating Pikku function definitions

Wiring:

- pikku-queue - Queue function definitions and enqueue patterns

Alternative Queue Runtimes:

- pikku-queue-pg-boss - PostgreSQL-based queue alternative (no Redis required)