Graph Workflows

Graph workflows define multi-step processes declaratively as a graph of nodes. Instead of writing imperative code with workflow.do() and workflow.sleep(), you describe the workflow structure as data: nodes, the connections between them, and how data flows from one node to the next.

When to Use Graph Workflows

Use graph workflows when:

  • You want a visual/declarative workflow definition
  • The workflow will be viewed or edited in the Pikku Console
  • You prefer configuration over code
  • The flow is primarily RPC calls with data passing between them

Use DSL workflows when:

  • You need complex conditional logic within steps
  • You prefer imperative programming style
  • You need inline code execution between steps

Basic Example

graph.graph.ts

Structure

A graph workflow has three main parts:

nodes

Maps node IDs to RPC function names:

nodes: {
  createUser: 'userCreate', // node ID -> RPC function name
  sendWelcome: 'emailSend',
  updateCRM: 'crmUserCreate',
}

config

Configures each node's input, flow control, and error handling:

config: {
  createUser: {
    next: 'sendWelcome', // which node runs next
    onError: 'handleError', // error handler node
  },
  sendWelcome: {
    input: (ref) => ({ // map inputs from other nodes
      to: ref('createUser', 'email'),
      name: ref('createUser', 'name'),
    }),
  },
}

description and tags

Optional metadata for documentation and filtering:

description: 'User onboarding workflow',
tags: ['onboarding', 'users'],
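To see how the three parts fit together, the sketch below assembles them into one plain object and checks that every next and onError target names a declared node. The GraphDefinition and NodeConfig types, the findUndeclaredTargets helper, and the 'errorReport' RPC name are assumptions made for illustration. They are not Pikku exports, and Pikku's own types may be shaped differently.

```typescript
// Hypothetical types modelling the structure described above (not Pikku's exported types).
type RefFn = (nodeId: string, field: string) => unknown

interface NodeConfig {
  next?: string | string[] | Record<string, string>
  onError?: string
  input?: (ref: RefFn) => Record<string, unknown>
}

interface GraphDefinition {
  description?: string
  tags?: string[]
  nodes: Record<string, string>
  config: Record<string, NodeConfig>
}

const onboarding: GraphDefinition = {
  description: 'User onboarding workflow',
  tags: ['onboarding', 'users'],
  nodes: {
    createUser: 'userCreate',
    sendWelcome: 'emailSend',
    handleError: 'errorReport', // hypothetical RPC function name
  },
  config: {
    createUser: { next: 'sendWelcome', onError: 'handleError' },
    sendWelcome: {
      input: (ref) => ({
        to: ref('createUser', 'email'),
        name: ref('createUser', 'name'),
      }),
    },
  },
}

// Collect every node ID referenced by `next` or `onError` that `nodes` does not declare.
function findUndeclaredTargets(graph: GraphDefinition): string[] {
  const declared = new Set(Object.keys(graph.nodes))
  const undeclared = new Set<string>()
  for (const id of Object.keys(graph.config)) {
    const cfg = graph.config[id]
    const next = cfg.next
    const targets: string[] = []
    if (typeof next === 'string') targets.push(next)
    else if (Array.isArray(next)) targets.push(...next)
    else if (next !== undefined) targets.push(...Object.keys(next).map((key) => next[key]))
    if (cfg.onError !== undefined) targets.push(cfg.onError)
    for (const target of targets) {
      if (!declared.has(target)) undeclared.add(target)
    }
  }
  return Array.from(undeclared)
}
```

A check like this catches a config entry that points at a node ID missing from nodes before the workflow ever runs.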

References with ref()

The ref() function connects nodes by referencing outputs from previous nodes or the workflow input.

Reference workflow input

input: (ref) => ({
  userId: ref('input', 'userId'), // from workflow input
  email: ref('input', 'email'),
})

Reference node output

input: (ref) => ({
  orderId: ref('validateOrder', 'orderId'), // from validateOrder node output
  total: ref('processPayment', 'chargedAmount'),
})

The CLI generates types so ref() provides autocomplete for both node IDs and output fields.
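As a mental model for how typed references can give autocomplete on both node IDs and output fields, here is a minimal standalone sketch. The makeRef helper and the sample outputs object are assumptions for illustration. This is not how Pikku implements ref(), and the real function may record references for later resolution rather than read values directly.

```typescript
// Hypothetical helper: builds a ref() over a record of already-known outputs.
// The generic parameters restrict `nodeId` to known keys and `field` to that node's fields.
function makeRef<O extends Record<string, Record<string, unknown>>>(outputs: O) {
  return <N extends keyof O, F extends keyof O[N]>(nodeId: N, field: F): O[N][F] =>
    outputs[nodeId][field]
}

// Sample data standing in for the workflow input and earlier node outputs.
const outputs = {
  input: { userId: 'u_1', email: 'ada@example.com' },
  validateOrder: { orderId: 'o_42' },
  processPayment: { chargedAmount: 1999 },
}

const ref = makeRef(outputs)

const paymentInput = {
  orderId: ref('validateOrder', 'orderId'), // typed as string
  total: ref('processPayment', 'chargedAmount'), // typed as number
}
```

With this typing, a call such as ref('validateOrder', 'email') is rejected at compile time because validateOrder has no email field in the sample data.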

Flow Control

Sequential

Nodes run one after another:

config: {
  step1: { next: 'step2' },
  step2: { next: 'step3' },
  step3: { /* no next = workflow ends */ },
}

Parallel

Multiple nodes run concurrently:

config: {
  fetchData: {
    next: ['sendEmail', 'updateCRM', 'logActivity'], // all three run in parallel
  },
}

Branching

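Route to different nodes based on a decision made inside the node's function. Each key under next is a branch name, and each value is the node that runs when that branch is selected: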

config: {
  checkInventory: {
    next: {
      inStock: 'processOrder',
      outOfStock: 'notifyBackorder',
      discontinued: 'refundCustomer',
    },
  },
}

In your RPC function, call graph.branch() to select the path:

export const checkInventory = pikkuSessionlessFunc<
  { productId: string },
  { available: boolean }
>({
  func: async ({ graph }, { productId }) => {
    const stock = await getStock(productId)

    if (stock.discontinued) {
      graph.branch('discontinued')
    } else if (stock.quantity > 0) {
      graph.branch('inStock')
    } else {
      graph.branch('outOfStock')
    }

    return { available: stock.quantity > 0 }
  },
})
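The three shapes of next covered in this section (a single node ID, an array of node IDs, and an object keyed by branch name) can be summarised with a small resolver. This is a sketch of the routing rules as described here, not Pikku's scheduler. The resolveNext name is hypothetical, and throwing when no branch (or an unknown branch) is selected is an assumption about behaviour the documentation does not specify.

```typescript
type Next = string | string[] | Record<string, string> | undefined

// Returns the node IDs that should run after a node completes.
// `branchKey` is the key the node passed to graph.branch(), if any.
function resolveNext(next: Next, branchKey?: string): string[] {
  if (next === undefined) return [] // no next: this path of the workflow ends
  if (typeof next === 'string') return [next] // sequential
  if (Array.isArray(next)) return [...next] // parallel fan-out
  // Branching: exactly one target, chosen by the branch key.
  if (branchKey === undefined) {
    throw new Error('Branching node finished without selecting a branch') // assumed behaviour
  }
  const target = next[branchKey]
  if (target === undefined) {
    throw new Error(`Unknown branch '${branchKey}'`) // assumed behaviour
  }
  return [target]
}

const afterInventoryCheck = resolveNext(
  { inStock: 'processOrder', outOfStock: 'notifyBackorder', discontinued: 'refundCustomer' },
  'outOfStock'
)
```

Here afterInventoryCheck contains only 'notifyBackorder', whereas an array-valued next would return every listed node.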

Error Handling

Route errors to handler nodes with onError:

config: {
  processPayment: {
    next: 'sendConfirmation',
    onError: 'handlePaymentError', // runs if processPayment fails
  },
  handlePaymentError: {
    input: (ref) => ({
      orderId: ref('input', 'orderId'),
    }),
  },
}
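The routing rule (follow next on success, follow onError on failure) can be illustrated with a tiny standalone runner. This is a simplified model and not Pikku's execution engine: runFrom and its handler map are hypothetical, handlers are synchronous to keep the sketch short, and rethrowing when a node has no onError is an assumption.

```typescript
interface NodeConfig {
  next?: string
  onError?: string
}

// Runs nodes starting at `start`, following `next` on success and `onError` on failure.
// Returns the node IDs in the order they ran.
function runFrom(
  start: string,
  config: Record<string, NodeConfig>,
  handlers: Record<string, () => void>
): string[] {
  const visited: string[] = []
  let current: string | undefined = start
  while (current !== undefined) {
    visited.push(current)
    const cfg: NodeConfig = config[current] ?? {}
    try {
      handlers[current]()
      current = cfg.next
    } catch (err) {
      if (cfg.onError === undefined) throw err // no handler node: the failure propagates
      current = cfg.onError
    }
  }
  return visited
}

const config: Record<string, NodeConfig> = {
  processPayment: { next: 'sendConfirmation', onError: 'handlePaymentError' },
}

const ran = runFrom('processPayment', config, {
  processPayment: () => {
    throw new Error('card declined')
  },
  sendConfirmation: () => {},
  handlePaymentError: () => {},
})
```

Because processPayment throws, ran is ['processPayment', 'handlePaymentError'] and sendConfirmation never runs.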

State Management

Store and retrieve state across the workflow using graph.setState() and graph.getState():

export const updateProgress = pikkuSessionlessFunc<
  { step: string },
  { updated: boolean }
>({
  func: async ({ graph }, { step }) => {
    const state = await graph.getState()
    const completedSteps = (state.completedSteps as string[]) || []

    await graph.setState('completedSteps', [...completedSteps, step])
    await graph.setState('lastUpdated', new Date().toISOString())

    return { updated: true }
  },
})

Graph Wire Context

Functions running in a graph workflow receive a graph object in their services:

interface PikkuGraphWire {
  runId: string // workflow run ID
  graphName: string // workflow name
  nodeId: string // current node ID
  branch: (key: string) => void
  setState: (name: string, value: unknown) => Promise<void>
  getState: () => Promise<Record<string, unknown>>
}
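Because the graph object is a plain interface, branching and state logic can be exercised in unit tests with an in-memory stand-in. The sketch below redeclares the interface shown above so that it is self-contained. createGraphStub and routeByStock are hypothetical helpers, not part of Pikku, and the stub makes no claim about how Pikku persists state.

```typescript
interface PikkuGraphWire {
  runId: string
  graphName: string
  nodeId: string
  branch: (key: string) => void
  setState: (name: string, value: unknown) => Promise<void>
  getState: () => Promise<Record<string, unknown>>
}

// Hypothetical test helper: records branch selections and keeps state in a Map.
function createGraphStub(nodeId: string) {
  const state = new Map<string, unknown>()
  const branches: string[] = []
  const graph: PikkuGraphWire = {
    runId: 'test-run',
    graphName: 'test-graph',
    nodeId,
    branch: (key) => {
      branches.push(key)
    },
    setState: async (name, value) => {
      state.set(name, value)
    },
    getState: async () => {
      // Return a snapshot so callers cannot mutate the stub's internal Map.
      const snapshot: Record<string, unknown> = {}
      state.forEach((value, key) => {
        snapshot[key] = value
      })
      return snapshot
    },
  }
  return { graph, state, branches }
}

// Branching logic extracted from the checkInventory example so it can be tested directly.
function routeByStock(
  graph: PikkuGraphWire,
  stock: { discontinued: boolean; quantity: number }
): void {
  if (stock.discontinued) graph.branch('discontinued')
  else if (stock.quantity > 0) graph.branch('inStock')
  else graph.branch('outOfStock')
}

const stub = createGraphStub('checkInventory')
routeByStock(stub.graph, { discontinued: false, quantity: 3 })
```

After this call, stub.branches holds the single entry 'inStock', and a test can assert on it without starting a workflow run.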

Wiring to HTTP

Use graphStart() to wire a graph workflow to an HTTP endpoint:

graph.wiring.ts

graphStart() takes the workflow name and the entry node ID. It returns a function that starts the workflow and responds with { runId }.

Visualizing in the Console

The Pikku Console renders graph workflows as interactive visual graphs, showing nodes, connections, and execution progress in real time. See Console Features for details.

Next Steps

  • DSL Workflows: Imperative workflow style with workflow.do() and workflow.sleep()
  • Steps: Step types and retry configuration
  • Configuration: State storage and execution mode setup