Skip to main content

Workflows

Pikku workflows provide a powerful way to orchestrate multi-step processes with automatic state management, deterministic replay, and step caching. They're perfect for complex business processes that span multiple operations, need to handle failures gracefully, or require time-based delays.

What are Workflows?

Workflows are long-running, multi-step processes that can survive server restarts, handle failures, and resume from where they left off. Unlike simple function calls or RPC chains, workflows maintain their state in a persistent external store and use deterministic replay to ensure correctness.

Key features:

  • Deterministic Replay: Completed steps are never re-executed. When a workflow resumes, it replays from the beginning using cached results for completed steps.
  • State Persistence: Workflow state is stored in an external store (like PostgreSQL or Redis), surviving server restarts and crashes.
  • Step Caching: Each step's result is cached, preventing duplicate work during replay.
  • Multiple Execution Modes: Run workflows inline (synchronous) for testing or remote (asynchronous via queue workers) for production.
  • Type Safety: Full TypeScript support with generated types for workflow inputs and outputs.

When to Use Workflows

Workflows are ideal for scenarios that involve:

  • Multi-step business processes: User onboarding, order fulfillment, approval workflows
  • Operations requiring time delays: Sending reminder emails after X days, trial expiration handling
  • Processes that need to survive failures: Payment processing, data migration, batch operations
  • Long-running tasks: Report generation, video processing, data aggregation

Workflows vs Other Pikku Features

FeatureUse CaseState ManagementExecution
WorkflowsMulti-step processes with delays and failure recoveryPersistent (external store)Asynchronous (queue-based) or synchronous
Queue JobsSingle background tasksNone (fire-and-forget)Asynchronous (queue-based)
Scheduled TasksRecurring cron-like jobsNoneTime-based triggers
RPC CallsDirect function invocationsNoneSynchronous or asynchronous

Core Concepts

Workflow Runs

Each workflow execution is called a run. A run has:

  • A unique runId
  • Current status: running, completed, failed, or cancelled
  • Input data
  • Output data (when completed)
  • Metadata (creation time, last interaction, etc.)

Steps

Workflows are composed of steps. Each step has:

  • A descriptive name (used for logging and debugging)
  • A status: pending, scheduled, succeeded, or failed
  • Cached result data (for replay)
  • Execution metadata (including attemptCount for retries)

There are three types of steps:

  1. RPC Steps: Call other Pikku functions via queue workers (with optional retry configuration)
  2. Inline Steps: Execute code immediately with caching (with optional retry configuration)
  3. Sleep Steps: Time-based delays

Learn more in Step Types.

Execution Modes

Workflows automatically run in one of two modes based on your configuration:

Inline Mode (synchronous):

  • Used when no queue service is configured
  • Executes immediately in the same process
  • Perfect for testing and development
  • No queue workers required
  • Steps execute sequentially without queue jobs

Remote Mode (asynchronous):

  • Used when a queue service is configured
  • Executes via Pikku queue workers
  • Production-ready with horizontal scaling
  • Survives server restarts
  • Steps execute in separate worker processes

The mode is determined automatically based on whether a queueService is available in your singleton services.

How Workflows Work

Remote Mode Flow

1. User calls rpc.startWorkflow('workflow-name', inputData)
2. Workflow run created in state storage
3. Orchestrator job added to queue
4. Orchestrator executes workflow function
5. On RPC step:
a. Step marked as "scheduled"
b. Step worker job added to queue
c. Worker executes RPC call
d. Result stored in state
e. Orchestrator resumed
6. Orchestrator replays from beginning
- Uses cached results for completed steps
- Continues from last completed step
7. Process repeats until workflow completes or fails

Deterministic Replay

When a workflow resumes (after a delay, failure, or RPC step completion), it replays from the beginning. However, completed steps are not re-executed - their cached results are used instead.

This ensures:

  • Workflows can survive failures and resume correctly
  • Step results are consistent across replays
  • Complex branching logic works as expected
  • Time-based operations (sleep) don't reset

Example:

// First execution
await workflow.do('Step 1', async () => createUser()) // Executes
await workflow.do('Step 2', async () => sendEmail()) // Executes
await workflow.sleep('Wait', '5min') // Schedules timer

// After 5 minutes (replay)
await workflow.do('Step 1', async () => createUser()) // SKIPPED (cached)
await workflow.do('Step 2', async () => sendEmail()) // SKIPPED (cached)
await workflow.sleep('Wait', '5min') // SKIPPED (cached)
await workflow.do('Step 3', async () => sendReminder()) // Executes

Quick Example

Here's a simple user onboarding workflow:

import { pikkuWorkflowFunc } from './.pikku/workflow/pikku-workflow-types.gen'

export const onboardingWorkflow = pikkuWorkflowFunc<
{ email: string; userId: string },
{ success: boolean }
>(async ({ workflow, rpc }, data) => {
// Step 1: Create user profile (RPC step - runs in queue worker)
await workflow.do(
'Create user profile in database',
'createUserProfile',
data
)

// Step 2: Generate welcome message (inline step - runs immediately)
const message = await workflow.do(
'Generate welcome message',
async () => `Welcome to our platform, ${data.email}!`
)

// Step 3: Wait 5 minutes before sending email
await workflow.sleep('Wait 5 minutes before welcome email', '5min')

// Step 4: Send welcome email (RPC step - runs in queue worker)
await workflow.do(
'Send welcome email',
'sendEmail',
{ to: data.email, message }
)

return { success: true }
})

Wire it to your application:

import { wireWorkflow } from './.pikku/workflow/pikku-workflow-types.gen'
import { onboardingWorkflow } from './workflows.functions'

wireWorkflow({
name: 'onboarding',
description: 'User onboarding workflow',
func: onboardingWorkflow,
tags: ['onboarding', 'users']
})

Note: Execution mode (inline vs remote) is determined automatically based on whether a queue service is configured.

Start the workflow:

// Via RPC
const runId = await rpc.startWorkflow('onboarding', {
email: 'user@example.com',
userId: 'user-123'
})

Visualizing Workflows in the Console

The Pikku Console renders your workflows as interactive visual graphs, lets you start workflow runs with custom input, and streams execution progress in real time. See Console Features for details.

Next Steps