Workflow Configuration
This guide covers workflow configuration options for state storage and queue settings.
pikku.config.json
Add workflow configuration to your Pikku config:
{
"workflows": {
"singleQueue": true,
"path": "src/workflows/pikku.workflows.gen.ts"
}
}
Options
singleQueue: (boolean, required) Must betrueduring initial testing phasepath: (string) Output path for generated workflow types
Currently, only singleQueue: true is supported during initial testing. This means all workflows share a single orchestrator queue job and a single worker queue job to process them.
In the future, we'll allow individual queue jobs to be created per workflow for more granular control, retries, and scaling.
State Storage
Workflows require a workflowService service in your singleton services. Choose between PostgreSQL or Redis.
Option 1: PostgreSQL + pg-boss
Use PostgreSQL for both state storage and queue:
import { PgWorkflowService } from '@pikku/pg'
import { PgBossQueueService } from '@pikku/queue-pg-boss'
import postgres from 'postgres'
export const createSingletonServices = async () => {
const sql = postgres('postgresql://localhost:5432/myapp')
const queueService = new PgBossQueueService('postgresql://localhost:5432/myapp')
const workflowService = new PgWorkflowService(
sql,
queueService,
'workflows' // schema name
)
return {
queueService,
workflowService,
// ... other services
}
}
The PostgreSQL state service automatically creates:
workflow_runstable - stores workflow run metadataworkflow_steptable - stores step results and status
For inline mode (testing), pass undefined as the queue service:
const workflowService = new PgWorkflowService(
sql,
undefined, // No queue service = inline mode
'workflows'
)
Option 2: Redis + BullMQ
Use Redis for both state storage and queue:
import { RedisWorkflowService } from '@pikku/redis'
import { BullQueueService } from '@pikku/queue-bullmq'
import { Redis } from 'ioredis'
export const createSingletonServices = async () => {
const redis = new Redis('redis://localhost:6379')
const queueService = new BullQueueService('redis://localhost:6379')
const workflowService = new RedisWorkflowService(
redis,
queueService,
'workflows' // key prefix
)
return {
queueService,
workflowService,
// ... other services
}
}
The Redis state service stores data in Redis hashes with the specified prefix.
For inline mode (testing), pass undefined as the queue service:
const workflowService = new RedisWorkflowService(
redis,
undefined, // No queue service = inline mode
'workflows'
)
Execution Mode
The execution mode (inline vs remote) is determined automatically:
- Remote mode: Used when a
queueServiceis configured in singleton services - Inline mode: Used when no
queueServiceis available
You don't need to specify the execution mode in the wiring configuration.
Workflow Service API
The workflowService provides methods to query and interact with workflow runs:
getRun(runId)
Get workflow run details by ID:
export const checkWorkflowStatus = pikkuSessionlessFunc<
{ runId: string },
{ status: string; output?: any }
>(async ({ workflowService }, data) => {
const run = await workflowService!.getRun(data.runId)
if (!run) {
throw new Error(`Workflow not found: ${data.runId}`)
}
return {
status: run.status, // 'running' | 'completed' | 'failed' | 'cancelled'
output: run.output, // Output data if completed
error: run.error, // Error details if failed
createdAt: run.createdAt, // Creation timestamp
}
})
getRunHistory(runId)
Get all step attempts for a workflow run in chronological order:
export const getWorkflowHistory = pikkuSessionlessFunc<
{ runId: string },
Array<any>
>(async ({ workflowService }, data) => {
const history = await workflowService!.getRunHistory(data.runId)
// Each entry includes:
// - stepName: Step description
// - status: 'succeeded' | 'failed'
// - attemptCount: Attempt number (1, 2, 3, ...)
// - result: Step result (if succeeded)
// - error: Error details (if failed)
// - createdAt: Timestamp
return history
})
Polling for Completion
Example of polling for workflow completion:
export const triggerAndWait = pikkuSessionlessFunc<
{ input: any },
any
>(async ({ rpc, workflowService, logger }, data) => {
// Start workflow
const { runId } = await rpc.startWorkflow('myWorkflow', data.input)
// Poll for completion
const maxAttempts = 30
const pollIntervalMs = 1000
for (let attempt = 0; attempt < maxAttempts; attempt++) {
const run = await workflowService!.getRun(runId)
if (!run) {
throw new Error(`Workflow not found: ${runId}`)
}
if (run.status === 'completed') {
return run.output
}
if (run.status === 'failed') {
throw new Error(run.error?.message || 'Workflow failed')
}
if (run.status === 'cancelled') {
throw new Error('Workflow was cancelled')
}
await new Promise(resolve => setTimeout(resolve, pollIntervalMs))
}
throw new Error(`Workflow timeout after ${maxAttempts} attempts`)
})
Running Workflow Workers
Workflows require queue workers to execute orchestrator and step jobs. See the workflows template for a complete example of setting up workers.
Pikku automatically generates workers for:
pikku-workflow-orchestrator: Executes workflow functionspikku-workflow-step-worker: Executes RPC steps
Next Steps
- Main Guide: Back to workflow overview
- Step Types: Learn about RPC, inline, and sleep steps
- Queue Documentation: Configure queue workers