Protocol Unification

One Function, Every Protocol

Pikku's core innovation is protocol unification: write your business logic once as a function, then wire it to any protocol without changing the implementation.

How It Works

All protocols share the same foundation:

  1. Define a function with TypeScript input/output types
  2. Wire to protocols that need it
  3. Same auth, permissions, validation across all protocols

Let's see how each protocol handles data flow.


HTTP: Request In, Response Out

The simplest protocol: synchronous request/response.

How It Works

// 1. Define the function
export const getUser = pikkuFunc<
  { userId: string },
  { id: string; name: string; email: string }
>({
  func: async ({ database }, { userId }) => {
    return await database.users.findById(userId)
  }
})

// 2. Wire to HTTP
wireHTTP({
  method: 'get',
  route: '/users/:userId',
  func: getUser
})

Data Flow

Client Request
↓
GET /users/123
↓
Pikku extracts { userId: '123' } from route params
↓
Calls getUser({ userId: '123' })
↓
Function returns { id: '123', name: 'John', email: 'john@example.com' }
↓
HTTP 200 with JSON response
↓
Client receives response

Key Points:

  • Route params, query params, and body all become function input
  • Return value becomes HTTP response body
  • Errors become HTTP status codes (404, 422, 500, etc.)
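
The key points above can be sketched without any framework. This is a minimal illustration, not Pikku's internal code: matchRoute, buildInput, namedError, and statusFor are hypothetical helpers, and the merge precedence (route params over body over query) and the error names are assumptions.

```typescript
type Dict = Record<string, unknown>

// Match a pattern such as '/users/:userId' against a concrete path.
// Returns the extracted params, or null when the path does not match.
function matchRoute(pattern: string, path: string): Record<string, string> | null {
  const patternParts = pattern.split('/').filter(Boolean)
  const pathParts = path.split('/').filter(Boolean)
  if (patternParts.length !== pathParts.length) return null

  const params: Record<string, string> = {}
  for (let i = 0; i < patternParts.length; i++) {
    const expected = patternParts[i] ?? ''
    const actual = pathParts[i] ?? ''
    if (expected.startsWith(':')) {
      params[expected.slice(1)] = decodeURIComponent(actual)
    } else if (expected !== actual) {
      return null
    }
  }
  return params
}

// Merge the three HTTP input sources into a single function input.
// Assumed precedence: route params win over body, body wins over query.
function buildInput(params: Dict, query: Dict, body: Dict): Dict {
  return { ...query, ...body, ...params }
}

// Create an error that carries a name the HTTP layer can map to a status.
function namedError(name: string, message: string): Error {
  const error = new Error(message)
  error.name = name
  return error
}

// Map a thrown error to an HTTP status code (assumed error names).
function statusFor(error: unknown): number {
  if (error instanceof Error && error.name === 'NotFoundError') return 404
  if (error instanceof Error && error.name === 'ValidationError') return 422
  return 500
}
```

Under these assumptions, matchRoute('/users/:userId', '/users/123') yields { userId: '123' }, which is the input the data flow above passes to getUser.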

Server-Sent Events (SSE): Progressive Enhancement

Add real-time streaming to HTTP endpoints without breaking existing clients.

How It Works

// 1. Define the function (same pattern as before)
export const trackOrder = pikkuFunc<
  { orderId: string },
  { status: string; location: string; eta: string }
>({
  func: async ({ database, channel }, { orderId }) => {
    const order = await database.orders.findById(orderId)

    // Send progress updates if SSE is enabled
    if (channel) {
      await channel.send({ type: 'status', data: 'Processing order...' })
      await channel.send({ type: 'location', data: order.currentLocation })
    }

    return {
      status: order.status,
      location: order.currentLocation,
      eta: order.estimatedDelivery
    }
  }
})

// 2. Wire to HTTP with SSE enabled
wireHTTP({
  method: 'get',
  route: '/orders/:orderId/track',
  func: trackOrder,
  sse: true // Enable Server-Sent Events
})

Data Flow

Regular HTTP Client → GET /orders/123/track
↓
Function executes (channel is undefined)
↓
Returns final result as JSON
↓
HTTP 200 with { status: '...', location: '...', eta: '...' }

SSE Client → GET /orders/123/track (Accept: text/event-stream)
↓
Function executes (channel is available)
↓
Sends progress updates via channel.send()
↓
Client receives streaming events in real-time
↓
Function completes and returns final result
↓
SSE connection closes

Key Points:

  • Same function works for both regular HTTP and SSE
  • Detect SSE with an if (channel) check
  • Regular HTTP clients get final result only
  • SSE clients get progress updates + final result
  • No breaking changes to existing API consumers
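
To make the two client paths concrete, here is a small self-contained simulation. It is a sketch under assumptions: the Channel interface, the handle function, and the choice to deliver the final result as the last event are hypothetical, and only the "data: ...\n\n" framing comes from the SSE wire format itself.

```typescript
// Hypothetical channel shape; Pikku's real channel type may differ.
interface Channel {
  send(message: unknown): Promise<void>
}

interface OrderStatus {
  status: string
  location: string
}

// One function body: it streams progress only when a channel is present.
async function trackOrder(channel: Channel | undefined, orderId: string): Promise<OrderStatus> {
  const order = { status: 'shipped', location: 'Depot 7' } // stand-in for a database lookup
  if (channel) {
    await channel.send({ type: 'status', data: `Processing order ${orderId}...` })
    await channel.send({ type: 'location', data: order.location })
  }
  return order
}

// SSE wire format: a "data:" line terminated by a blank line.
function toSSEFrame(message: unknown): string {
  return `data: ${JSON.stringify(message)}\n\n`
}

// SSE clients announce themselves with this Accept header.
function wantsSSE(acceptHeader: string | undefined): boolean {
  return (acceptHeader ?? '').includes('text/event-stream')
}

// Simulate both kinds of client against the same function.
async function handle(acceptHeader: string | undefined, orderId: string): Promise<string> {
  if (!wantsSSE(acceptHeader)) {
    // Regular client: no channel, final result only.
    return JSON.stringify(await trackOrder(undefined, orderId))
  }
  const frames: string[] = []
  const channel: Channel = {
    send: async (message) => {
      frames.push(toSSEFrame(message))
    },
  }
  const result = await trackOrder(channel, orderId)
  frames.push(toSSEFrame(result)) // assumption: final result arrives as the last event
  return frames.join('')
}
```

A regular client receives a single JSON body, while an SSE client receives two progress frames followed by the result frame.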

WebSocket: Bidirectional with Channels

WebSocket adds real-time, bidirectional communication through channels.

How It Works

// 1. Define the function (same as before, plus an optional channel)
export const getUser = pikkuFunc<
  { userId: string },
  { id: string; name: string; email: string }
>({
  func: async ({ database, channel }, { userId }) => {
    const user = await database.users.findById(userId)

    // Optional: Send updates via channel
    if (channel) {
      await channel.send({ type: 'user-fetched', user })
    }

    return user
  }
})

// 2. Wire to WebSocket channel
wireChannel({
  name: 'users',
  onMessageWiring: {
    getUser: { func: getUser }
  }
})

Data Flow

Client connects to WebSocket
↓
Subscribes to 'users' channel
↓
Sends message: { action: 'getUser', data: { userId: '123' } }
↓
Pikku extracts data and calls getUser({ userId: '123' })
↓
Function sends an update via channel.send()
↓
All subscribers to 'users' channel receive the update
↓
Function returns user object
↓
WebSocket sends response message back to client

Key Points:

  • The channel service is available for sending messages to subscribers
  • Same function works for both request/response and pub/sub patterns
  • Automatic message routing by action name

Queues: Async Job Processing

Queues enable background job processing with optional result waiting.

How It Works

// 1. Define the function (same pattern as before)
export const sendEmail = pikkuFunc<
  { to: string; subject: string; body: string },
  { messageId: string }
>({
  func: async ({ emailService }, { to, subject, body }) => {
    const messageId = await emailService.send({ to, subject, body })
    return { messageId }
  }
})

// 2. Wire to queue
wireQueueWorker({
  queue: 'email-queue',
  func: sendEmail
})

Data Flow (Fire and Forget)

Client adds job to queue
↓
queueClient.add('email-queue', { to: 'user@example.com', ... })
↓
Job stored in queue (Redis, PostgreSQL, SQS, etc.)
↓
Queue worker picks up job
↓
Calls sendEmail({ to: '...', subject: '...', body: '...' })
↓
Function executes (sends email)
↓
Returns result (kept only if the provider supports it: BullMQ and pg-boss do, SQS does not)
↓
Job marked as complete

Data Flow (Wait for Result - BullMQ/pg-boss only)

Client adds job and waits
↓
const jobId = await queueClient.add('email-queue', data)
const job = await queueClient.getJob('email-queue', jobId)
const result = await job.waitForCompletion?.({ timeout: 30000 })
↓
Job processed in background
↓
Client receives result: { messageId: '...' } (if supported by provider)

Key Points:

  • Same function works for both fire-and-forget and wait-for-result patterns
  • Waiting for a result is provider-dependent (BullMQ and pg-boss support it, SQS doesn't)
  • Automatic retry on failures
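
The fire-and-forget, wait-for-result, and retry behaviors can be seen together in a tiny in-memory queue. This is only a sketch: MemoryQueue, drain, and the default of three attempts are hypothetical stand-ins and do not reflect how BullMQ, pg-boss, SQS, or Pikku's queue client work internally.

```typescript
// In-memory stand-in for a queue provider; all names are hypothetical.
interface Job<In, Out> {
  id: number
  data: In
  attempts: number
  result?: Out
  state: 'waiting' | 'completed' | 'failed'
}

class MemoryQueue<In, Out> {
  private jobs: Job<In, Out>[] = []
  private worker: (data: In) => Promise<Out>
  private maxAttempts: number

  constructor(worker: (data: In) => Promise<Out>, maxAttempts = 3) {
    this.worker = worker
    this.maxAttempts = maxAttempts
  }

  // Fire and forget: enqueue the job and return its id immediately.
  add(data: In): number {
    const id = this.jobs.length + 1
    this.jobs.push({ id, data, attempts: 0, state: 'waiting' })
    return id
  }

  // Process every waiting job, retrying failures up to maxAttempts.
  async drain(): Promise<void> {
    for (const job of this.jobs) {
      if (job.state !== 'waiting') continue
      for (let attempt = 1; attempt <= this.maxAttempts; attempt++) {
        job.attempts = attempt
        try {
          job.result = await this.worker(job.data)
          job.state = 'completed'
          break
        } catch {
          if (attempt === this.maxAttempts) job.state = 'failed'
        }
      }
    }
  }

  // Wait-for-result style lookup: inspect the job after processing.
  getJob(id: number): Job<In, Out> | undefined {
    return this.jobs.find((job) => job.id === id)
  }
}
```

The worker passed to the constructor plays the role of sendEmail: it has no knowledge of the queue, and the retry policy lives entirely in the queue layer.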

MCP: Transform for AI Agents

Model Context Protocol (MCP) exposes functions to AI agents like Claude. MCP expects responses in its own format, so domain functions are exposed through thin adapters.

How It Works with Adapters

// 1. Domain function (reusable everywhere)
export const getUser = pikkuFunc<
  { userId: string },
  { id: string; name: string; email: string }
>({
  func: async ({ database }, { userId }) => {
    return await database.users.findById(userId)
  }
})

// 2. MCP adapter (transforms for AI)
export const getUserMCP = pikkuMCPResourceFunc<
  { userId: string }
>({
  func: async ({ rpc }, { userId }) => {
    // Call domain function via RPC
    const user = await rpc.invoke('getUser', { userId })

    // Transform to MCP format
    return [
      {
        uri: `user://${userId}`,
        text: JSON.stringify(user)
      }
    ]
  }
})

// 3. Wire to MCP
wireMCPResource({
  uri: 'user/{userId}',
  title: 'User Information',
  description: 'Retrieve user data by ID',
  func: getUserMCP
})

Data Flow

AI Agent requests resource
↓
MCP protocol: GET resource user/123
↓
Pikku calls getUserMCP({ userId: '123' })
↓
Adapter calls rpc.invoke('getUser', { userId: '123' })
↓
Domain function executes and returns user object
↓
Adapter transforms to MCP format: [{ uri: 'user://123', text: '{"id":"123",...}' }]
↓
AI Agent receives formatted response

Key Points:

  • Keep domain logic in regular pikkuFunc (reusable)
  • Use thin pikkuMCPResourceFunc or pikkuMCPToolFunc adapters
  • Adapters transform data to MCP-required formats
  • MCP tools return [{ type: 'text', text: '...' }]
  • MCP resources return [{ uri: '...', text: '...' }]
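
The two return shapes listed above are simple enough to capture as pure helpers. The following is a minimal sketch: toMCPResource and toMCPTool are hypothetical names, and the types cover only the fields shown in this section rather than the full MCP content model.

```typescript
// Only the fields shown in this section; MCP defines more.
interface MCPResourceContent {
  uri: string
  text: string
}

interface MCPToolContent {
  type: 'text'
  text: string
}

// Wrap any domain value as a single-item MCP resource response.
function toMCPResource(uri: string, value: unknown): MCPResourceContent[] {
  return [{ uri, text: JSON.stringify(value) }]
}

// Wrap any domain value as a single-item MCP tool response.
function toMCPTool(value: unknown): MCPToolContent[] {
  return [{ type: 'text', text: JSON.stringify(value) }]
}
```

Keeping the transformation in helpers like these leaves the adapter body with just two steps: call the domain function, then wrap its result.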

RPC: Internal Function Calls

Call functions from within other functions with full type safety.

How It Works

// 1. Define functions
export const getUser = pikkuFunc<
  { userId: string },
  { id: string; name: string; email: string }
>({
  func: async ({ database }, { userId }) => {
    return await database.users.findById(userId)
  }
})

// User and Order are application types defined elsewhere
export const getUserOrders = pikkuFunc<
  { userId: string },
  { user: User; orders: Order[] }
>({
  func: async ({ database, rpc }, { userId }) => {
    // Call another function internally
    const user = await rpc.invoke('getUser', { userId })
    const orders = await database.orders.findByUserId(userId)

    return { user, orders }
  }
})

Data Flow

getUserOrders called
↓
Needs user data
↓
Calls rpc.invoke('getUser', { userId: '123' })
↓
Pikku executes getUser with same permissions/session context
↓
Returns user object
↓
getUserOrders continues execution
↓
Returns combined result

Key Points:

  • Type-safe internal function calls
  • Shares same session and permissions context
  • Enables composition without code duplication
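
One way to get type-safe calls by name is to derive the input and output types from a single map. The sketch below shows that general TypeScript technique; RPCMap, handlers, and invoke are hypothetical, Pikku generates its own typings, and session or permission propagation is not modeled here.

```typescript
interface User {
  id: string
  name: string
}

interface Order {
  id: string
  userId: string
}

// Hypothetical registry of function names with their input and output types.
type RPCMap = {
  getUser: { input: { userId: string }; output: User }
  getOrders: { input: { userId: string }; output: Order[] }
}

// Every handler must match the types declared for its name.
type Handlers = {
  [K in keyof RPCMap]: (input: RPCMap[K]['input']) => Promise<RPCMap[K]['output']>
}

const handlers: Handlers = {
  getUser: async ({ userId }) => ({ id: userId, name: 'John' }),
  getOrders: async ({ userId }) => [{ id: 'o1', userId }],
}

// The compiler checks both the function name and the input shape.
function invoke<K extends keyof RPCMap>(
  name: K,
  input: RPCMap[K]['input'],
): Promise<RPCMap[K]['output']> {
  return handlers[name](input)
}

// Composition: the result types are inferred from the map, no casts needed.
async function getUserOrders(userId: string): Promise<{ user: User; orders: Order[] }> {
  const user = await invoke('getUser', { userId })
  const orders = await invoke('getOrders', { userId })
  return { user, orders }
}
```

With this shape, a misspelled name such as invoke('getUsr', ...) or a missing userId fails at compile time rather than at runtime.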

Scheduled Tasks: Time-Based Execution

Run functions on a schedule (cron-like).

How It Works

// 1. Define the function
export const sendDailyDigest = pikkuFunc<
  void,
  { emailsSent: number }
>({
  func: async ({ database, emailService }) => {
    const users = await database.users.findActive()

    for (const user of users) {
      await emailService.sendDigest(user)
    }

    return { emailsSent: users.length }
  }
})

// 2. Wire to scheduler
wireScheduler({
  cron: '0 9 * * *', // Every day at 9 AM
  func: sendDailyDigest
})

Data Flow

Scheduler tick (9:00 AM)
↓
Pikku calls sendDailyDigest()
↓
Function executes
↓
Returns result { emailsSent: 42 }
↓
Result logged
↓
Waits for next scheduled time

Key Points:

  • No input parameters (scheduled, not triggered by external event)
  • Runs in background on schedule
  • Same middleware and error handling as other protocols
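
For readers unfamiliar with cron syntax, the expression '0 9 * * *' has five fields: minute, hour, day of month, month, and day of week. The sketch below checks a date against such an expression; cronMatches is a hypothetical helper that supports only '*' and plain numbers, uses local time, and is not how Pikku's scheduler is implemented.

```typescript
// Supports only '*' and plain numbers in each field; real cron syntax
// (ranges, lists, steps) is out of scope for this sketch.
function cronMatches(expression: string, date: Date): boolean {
  const fields = expression.trim().split(/\s+/)
  if (fields.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${fields.length}`)
  }
  const actual = [
    date.getMinutes(),
    date.getHours(),
    date.getDate(),
    date.getMonth() + 1, // cron months are 1-12, JavaScript months are 0-11
    date.getDay(), // 0 = Sunday in both cron and JavaScript
  ]
  // Every field must either be a wildcard or equal the date's value.
  return fields.every((field, i) => field === '*' || Number(field) === actual[i])
}
```

A scheduler built on this idea would evaluate the expression once per minute and call the wired function whenever it matches.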

CLI: Command-Line Interface

Expose functions as CLI commands.

How It Works

// 1. Define the function (same as before)
export const getUser = pikkuFunc<
  { userId: string },
  { id: string; name: string; email: string }
>({
  func: async ({ database }, { userId }) => {
    return await database.users.findById(userId)
  }
})

// 2. Wire to CLI
wireCLI({
  program: 'users',
  commands: {
    get: pikkuCLICommand({
      parameters: '<userId>',
      func: getUser
    })
  }
})

Data Flow

Terminal: users get 123
↓
CLI parser extracts { userId: '123' }
↓
Calls getUser({ userId: '123' })
↓
Function returns user object
↓
CLI formats and prints to stdout
↓
User sees formatted output

Key Points:

  • Same function works in CLI and HTTP/WebSocket/etc.
  • Automatic argument parsing
  • Formatted console output
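
Argument parsing from a parameter spec such as '<userId>' can be sketched in a few lines. parseArgs is a hypothetical helper, and the '[name]' form for optional arguments is an assumption borrowed from common CLI conventions rather than something this page documents.

```typescript
// Turn a spec like '<userId> [format]' plus positional argv into an input object.
// '<name>' marks a required argument; '[name]' (assumed) marks an optional one.
function parseArgs(spec: string, argv: string[]): Record<string, string> {
  const tokens = spec.trim().split(/\s+/).filter(Boolean)
  const input: Record<string, string> = {}
  tokens.forEach((token, i) => {
    const required = token.startsWith('<')
    const name = token.slice(1, -1) // strip the surrounding brackets
    const value = argv[i]
    if (value === undefined) {
      if (required) throw new Error(`Missing required argument: ${name}`)
      return
    }
    input[name] = value
  })
  return input
}
```

For the terminal command in the data flow above, parseArgs('<userId>', ['123']) produces { userId: '123' }, the same input the HTTP route would produce.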

The Unification Pattern

All protocols follow the same pattern:

  1. Input Extraction: Protocol-specific data (route params, WebSocket message, queue job, etc.) → function parameters
  2. Function Execution: Same TypeScript function runs with same services, permissions, validation
  3. Output Transformation: Function return value → protocol-specific response (HTTP JSON, WebSocket message, queue result, MCP format)

This is why you can write logic once and deploy everywhere.
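
The three steps can be expressed as a small generic interface. This is an illustrative sketch only: ProtocolAdapter, run, httpAdapter, and cliAdapter are hypothetical names, and services, permissions, and validation are left out to keep the shape visible.

```typescript
// Hypothetical adapter shape mirroring steps 1 and 3 of the pattern.
interface ProtocolAdapter<Raw, In, Out, Response> {
  extract(raw: Raw): In
  respond(output: Out): Response
}

// Run any function through any adapter.
async function run<Raw, In, Out, Response>(
  adapter: ProtocolAdapter<Raw, In, Out, Response>,
  func: (input: In) => Promise<Out>,
  raw: Raw,
): Promise<Response> {
  const input = adapter.extract(raw) // 1. input extraction
  const output = await func(input) // 2. function execution
  return adapter.respond(output) // 3. output transformation
}

type GetUserInput = { userId: string }
type GetUserOutput = { id: string; name: string }

// The one function shared by both adapters (stand-in for a database lookup).
const getUser = async ({ userId }: GetUserInput): Promise<GetUserOutput> => ({
  id: userId,
  name: 'John',
})

const httpAdapter: ProtocolAdapter<
  { params: GetUserInput },
  GetUserInput,
  GetUserOutput,
  { status: number; body: string }
> = {
  extract: (request) => request.params,
  respond: (output) => ({ status: 200, body: JSON.stringify(output) }),
}

const cliAdapter: ProtocolAdapter<string[], GetUserInput, GetUserOutput, string> = {
  extract: (argv) => ({ userId: argv[0] ?? '' }),
  respond: (output) => `${output.id}\t${output.name}`,
}
```

Adding a protocol in this model means writing one more adapter; getUser itself never changes.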

Key Benefits

✅ Write Once, Wire Anywhere

// One function
export const getUser = pikkuFunc<Input, Output>({ ... })

// Many protocols
wireHTTP({ method: 'get', route: '/users/:userId', func: getUser })
wireChannel({ name: 'users', onMessageWiring: { getUser: { func: getUser } } })
wireQueueWorker({ queue: 'get-user', func: getUser })
wireMCPResource({ uri: 'user/{userId}', func: getUserMCP }) // Thin adapter
wireCLI({ commands: { get: pikkuCLICommand({ func: getUser }) } })

✅ Same Behavior Everywhere

  • Authentication works the same way
  • Permissions are enforced consistently
  • Validation happens automatically
  • Middleware runs for all protocols
  • Error handling is uniform

✅ Easy Testing

Test the function directly, with no need to mock HTTP requests, WebSocket connections, or queue infrastructure:

const result = await getUser.func(services, { userId: '123' })
expect(result.name).toBe('John Doe')
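
The services argument in a test like this can be a plain stub object. The sketch below shows one way to build it: the Services interface, createStubServices, and the local getUser object are hypothetical stand-ins that follow the (services, input) shape used throughout this page, not Pikku's actual types.

```typescript
interface User {
  id: string
  name: string
  email: string
}

// Only the slice of services that getUser needs.
interface Services {
  database: { users: { findById(id: string): Promise<User | undefined> } }
}

// Stand-in with the same (services, input) => output shape as the examples above.
const getUser = {
  func: async ({ database }: Services, { userId }: { userId: string }) =>
    database.users.findById(userId),
}

// An in-memory stub replaces the real database service.
function createStubServices(users: User[]): Services {
  return {
    database: {
      users: {
        findById: async (id) => users.find((user) => user.id === id),
      },
    },
  }
}
```

Because the function only sees the services it destructures, the stub needs to implement a single method and no server has to be started.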

✅ Flexible Deployment

  • Start with HTTP monolith
  • Add WebSocket for real-time features
  • Move expensive operations to queues
  • Expose to AI agents via MCP
  • Provide CLI for admin tasks

Same code. Different protocols. Zero rewrites.

Next Steps


Get Started

Ready to try Pikku? Get up and running in 5 minutes.


Learn More

Dive deeper into why Pikku gives you the flexibility to succeed.

