The Basics
Define a worker in two lines
Write your function, wire it to a queue name. Pikku handles deserialization, retries, and error routing.
const processReminder = pikkuSessionlessFunc({
title: 'Process Reminder',
func: async ({ db, emailService }, { todoId, userId }) => {
const todo = await db.getTodo(todoId)
await emailService.sendReminder(userId, todo)
return { sent: true }
}
})
wireQueueWorker({
name: 'todo-reminders',
func: processReminder,
})
Type-safe payloads
Job data is validated and typed — your function receives exactly what you expect
Multi-backend
Same wireQueueWorker works with BullMQ, PG Boss, or AWS SQS — swap the service, not the code
Middleware support
Apply logging, metrics, or auth middleware per-worker or globally
Job Control
Progress, fail, discard
Every queue function gets a wire.queue object to control the current job.
updateProgress()
Report progress (0–100 or custom). Consumers can poll job status and show progress bars.
fail(reason)
Fail the current job. If retries are configured, the job goes back in the queue with backoff.
discard(reason)
Permanently remove the job — no retry, no dead-letter queue. For invalid or irrelevant work.
const processReminder = pikkuSessionlessFunc({
title: 'Process Reminder',
func: async ({ db }, { todoId }, wire) => {
await wire.queue.updateProgress(25)
const todo = await db.getTodo(todoId)
if (!todo) {
// Permanently remove — no retry
await wire.queue.discard('Todo not found')
return
}
if (todo.completed) {
// Fail with reason — will retry
await wire.queue.fail('Todo already completed')
return
}
await wire.queue.updateProgress(100)
return { sent: true }
}
})
Retries & Config
Retry strategies built in
Configure worker-level concurrency and job-level retry, backoff, priority, and delay — based on the underlying queue system.
wireQueueWorker({
name: 'todo-reminders',
func: processReminder,
config: {
batchSize: 5, // Process 5 jobs in parallel
removeOnComplete: 100, // Keep last 100 completed jobs
}
})
// Publishing with job-level options
const jobId = await queue.add('todo-reminders', {
todoId: 'abc-123',
userId: 'user-456'
}, {
priority: 10, // Higher = processed first
delay: 5000, // Wait 5s before processing
attempts: 3, // Retry up to 3 times
backoff: { type: 'exponential', delay: 1000 },
})
Priority
Higher-priority jobs get processed first
Delay
Schedule jobs to run after a delay
Backoff
Linear, exponential, or fixed retry delay
Dead letter
Route failed jobs to a dead-letter queue
Type-Safe Publishing
Typed queue.add()
Pikku generates a typed queue client. Queue names, payloads, and results — all autocompleted.
import { PikkuQueue } from '.pikku/pikku-queue.gen.js'
const queue = new PikkuQueue(queueService)
// Fully typed — queue name and payload are inferred
const jobId = await queue.add('todo-reminders', {
todoId: 'abc-123',
userId: 'user-456'
})
// Get job status and result
const job = await queue.getJob('todo-reminders', jobId)
const status = await job.status() // 'waiting' | 'active' | 'completed' | 'failed'
const result = await job.waitForCompletion(30_000)
Generated from wirings
PikkuQueue is auto-generated with typed overloads for every queue worker you've wired.
Job lifecycle
Poll status, wait for completion with timeout, and retrieve typed results — all from the same job handle.
Backends
One wiring, many backends
Same wireQueueWorker code works across all backends. Swap the queue service, not your functions.
BullMQ / Redis
High-performance, push-based. Job results, progress, priority, delays.
PG Boss / PostgreSQL
Database-backed, persistent. No extra infra if you already use Postgres.
AWS SQS
Serverless, fire-and-forget. Scales to zero, no servers to manage.
Config validation built in. Pikku warns you at startup if you use a config option your backend doesn't support — no silent failures.
Start wiring queues in 5 minutes
One command to scaffold a project with queue wiring already configured.
MIT Licensed · Works with BullMQ, PG Boss & AWS SQS