The Basics
Orchestrate with workflow.do()
Each workflow.do() call is a durable step. RPC steps run as queue jobs. Inline steps execute immediately. Both are cached for replay.
const onboardUser = pikkuWorkflowFunc<
{ email: string; userId: string },
{ success: boolean }
>(async ({}, data, { workflow }) => {
// Step 1: RPC call (executed as queue job)
const user = await workflow.do(
'Create profile',
'createUserProfile',
{ email: data.email, userId: data.userId }
)
// Step 2: Inline step (immediate, cached)
const message = await workflow.do(
'Generate welcome',
async () => `Welcome, ${data.email}!`
)
// Step 3: Durable sleep
await workflow.sleep('Wait 5 minutes', '5min')
// Step 4: Another RPC call
await workflow.do('Send email', 'sendEmail', {
to: data.email,
subject: 'Welcome!',
body: message,
})
return { success: true }
})
Deterministic replay
Completed steps are never re-executed. Results are cached and replayed on recovery.
Plain TypeScript
Loops, conditionals, Promise.all — use any TypeScript construct. No YAML, no DSL.
Typed I/O
Workflow input and output are fully typed. Each RPC step infers types from the target function.
Step Types
Four step primitives
Every workflow is built from these four building blocks.
workflow.do(name, rpcName, data, options?)
Execute a Pikku function as a queue job. Supports retries and retry delay.
workflow.do(name, async () => value)
Inline step — runs immediately, result cached for replay. Great for transformations.
workflow.sleep(name, duration)
Durable sleep. Survives restarts — the workflow resumes after the duration.
workflow.suspend(reason)
Pause the workflow until explicitly resumed. For human-in-the-loop approval flows.
Patterns
Fan-out, retry, branch
Use standard TypeScript for control flow. Promise.all for parallelism. if/else for branching. Retries via step options.
// Fan-out: parallel steps with Promise.all
const users = await Promise.all(
data.userIds.map(async (userId) =>
await workflow.do(`Get user ${userId}`, 'userGet', { userId })
)
)
// Retry: automatic retry with backoff
const payment = await workflow.do(
'Process payment',
'processPayment',
{ amount: 100 },
{ retries: 3, retryDelay: '1s' }
)
// Branch: standard TypeScript conditionals
if (user.plan === 'pro') {
await workflow.do('Apply discount', 'applyDiscount', { userId })
}
Graph Workflows
Declarative DAGs
For node-based workflows, use pikkuWorkflowGraph. Define nodes, edges, and input mappings — Pikku handles execution order and parallelism.
const userOnboarding = pikkuWorkflowGraph({
description: 'Onboard a new user',
nodes: {
createProfile: 'createUserProfile',
sendWelcome: 'sendEmail',
setupDefaults: 'createDefaultTodos',
},
config: {
createProfile: {
next: ['sendWelcome', 'setupDefaults'], // Parallel
},
sendWelcome: {
input: (ref) => ({
to: ref('createProfile', 'email'),
subject: 'Welcome!',
}),
},
},
})
Branching
Use graph.branch('key') inside a node to select which edge to follow. Record-based next config maps keys to nodes.
ref() for data flow
Use ref('nodeId', 'path') to reference output from previous nodes — resolved at runtime.
HTTP Wiring
Start, poll, resume
Pikku provides helper functions to wire workflows to HTTP endpoints — start, run to completion, or poll status.
// Start a workflow (returns runId)
wireHTTP({
method: 'post',
route: '/workflow/onboard',
func: workflowStart('userOnboarding'),
})
// Run to completion (synchronous)
wireHTTP({
method: 'post',
route: '/workflow/onboard/run',
func: workflow('userOnboarding'),
})
// Check status
wireHTTP({
method: 'get',
route: '/workflow/status/:runId',
func: workflowStatus('userOnboarding'),
})
Start wiring workflows in 5 minutes
One command to scaffold a project with workflow wiring already configured.
MIT Licensed · DSL & graph workflows