Protocol Unification
One Function, Every Protocol
Pikku's core innovation is protocol unification: write your business logic once as a function, then wire it to any protocol without changing the implementation.
How It Works
All protocols share the same foundation:
- Define a function with TypeScript input/output types
- Wire it to the protocols that need it
- Get the same auth, permissions, and validation across all protocols
Let's see how each protocol handles data flow.
HTTP: Request In, Response Out
The simplest protocol: synchronous request/response.
How It Works
// 1. Define the function
export const getUser = pikkuFunc<
  { userId: string },
  { id: string; name: string; email: string }
>({
  func: async ({ database }, { userId }) => {
    return await database.users.findById(userId)
  }
})

// 2. Wire to HTTP
wireHTTP({
  method: 'get',
  route: '/users/:userId',
  func: getUser
})
Data Flow
Client Request
  ↓
GET /users/123
  ↓
Pikku extracts { userId: '123' } from route params
  ↓
Calls getUser({ userId: '123' })
  ↓
Function returns { id: '123', name: 'John', email: 'john@example.com' }
  ↓
HTTP 200 with JSON response
  ↓
Client receives response
Key Points:
- Route params, query params, and body all become function input
- Return value becomes HTTP response body
- Errors become HTTP status codes (404, 422, 500, etc.)
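The key points above can be sketched in plain TypeScript. This is only an illustration, not Pikku's implementation: `matchRoute`, `statusOf`, `handleHttp`, the merge order (query, then body, then route params), and the convention that errors carry a numeric `status` are all assumptions made for the example. The function is kept synchronous for brevity.

```typescript
// Hypothetical sketch of an HTTP adapter; none of these names come from Pikku.
type Input = Record<string, unknown>;

// Match '/users/:userId' against '/users/123' and return the route params.
function matchRoute(pattern: string, path: string): Record<string, string> | null {
  const patternParts = pattern.split('/');
  const pathParts = path.split('/');
  if (patternParts.length !== pathParts.length) return null;
  const params: Record<string, string> = {};
  for (let i = 0; i < patternParts.length; i++) {
    const part = patternParts[i];
    if (part.charAt(0) === ':') {
      params[part.slice(1)] = decodeURIComponent(pathParts[i]);
    } else if (part !== pathParts[i]) {
      return null;
    }
  }
  return params;
}

// Assumed convention: errors may carry a numeric `status`; anything else is a 500.
function statusOf(err: unknown): number {
  if (typeof err === 'object' && err !== null) {
    const status = (err as { status?: unknown }).status;
    if (typeof status === 'number') return status;
  }
  return 500;
}

function handleHttp(
  pattern: string,
  func: (input: Input) => unknown,
  request: { path: string; query?: Input; body?: Input }
): { status: number; body: unknown } {
  const params = matchRoute(pattern, request.path);
  if (params === null) return { status: 404, body: { error: 'No such route' } };
  try {
    // Route params, query params, and body all become one function input.
    const input: Input = { ...request.query, ...request.body, ...params };
    // The return value becomes the response body.
    return { status: 200, body: func(input) };
  } catch (err) {
    // A thrown error becomes an HTTP status code.
    return { status: statusOf(err), body: { error: String(err) } };
  }
}
```

Calling `handleHttp('/users/:userId', someFunc, { path: '/users/123' })` passes `{ userId: '123' }` to `someFunc`, mirroring the data flow shown earlier.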
Server-Sent Events (SSE): Progressive Enhancement
Add real-time streaming to HTTP endpoints without breaking existing clients.
How It Works
// 1. Define the function (same pattern as before)
export const trackOrder = pikkuFunc<
  { orderId: string },
  { status: string; location: string; eta: string }
>({
  func: async ({ database, channel }, { orderId }) => {
    const order = await database.orders.findById(orderId)

    // Send progress updates if SSE is enabled
    if (channel) {
      await channel.send({ type: 'status', data: 'Processing order...' })
      await channel.send({ type: 'location', data: order.currentLocation })
    }

    return {
      status: order.status,
      location: order.currentLocation,
      eta: order.estimatedDelivery
    }
  }
})

// 2. Wire to HTTP with SSE enabled
wireHTTP({
  method: 'get',
  route: '/orders/:orderId/track',
  func: trackOrder,
  sse: true // Enable Server-Sent Events
})
Data Flow
Regular HTTP Client → GET /orders/123/track
  ↓
Function executes (channel is undefined)
  ↓
Returns final result as JSON
  ↓
HTTP 200 with { status: '...', location: '...', eta: '...' }

SSE Client → GET /orders/123/track (Accept: text/event-stream)
  ↓
Function executes (channel is available)
  ↓
Sends progress updates via channel.send()
  ↓
Client receives streaming events in real-time
  ↓
Function completes and returns final result
  ↓
SSE connection closes
Key Points:
- Same function works for both regular HTTP and SSE
- Detect SSE with an if (channel) check
- Regular HTTP clients get final result only
- SSE clients get progress updates + final result
- No breaking changes to existing API consumers
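As a rough illustration of the optional-channel idea, the sketch below runs one function in two ways: without a channel (plain JSON) and with a channel that collects SSE frames. The `Channel` interface, `respondJson`, `respondSse`, and the choice to send the final result as the last frame are assumptions for this example, not Pikku's actual behavior.

```typescript
// Hypothetical sketch: the channel shape and both runners are assumptions.
interface Channel {
  send(event: { type: string; data: string }): void;
}

type Tracking = { status: string; location: string };

function trackOrder(channel: Channel | undefined, orderId: string): Tracking {
  // Stand-in for a database lookup.
  const order: Tracking = { status: 'shipped', location: 'Berlin' };
  // Progress updates are only sent when a channel was provided.
  if (channel) {
    channel.send({ type: 'status', data: `Processing order ${orderId}...` });
    channel.send({ type: 'location', data: order.location });
  }
  return order;
}

// Regular HTTP: no channel, only the final result is serialized.
function respondJson(orderId: string): string {
  return JSON.stringify(trackOrder(undefined, orderId));
}

// SSE: each event becomes a "data:" frame; the final result is the last frame.
function respondSse(orderId: string): string[] {
  const frames: string[] = [];
  const channel: Channel = {
    send(event) {
      frames.push(`data: ${JSON.stringify(event)}\n\n`);
    },
  };
  const result = trackOrder(channel, orderId);
  frames.push(`data: ${JSON.stringify(result)}\n\n`);
  return frames;
}
```

Because the function only branches on whether `channel` exists, existing JSON clients keep receiving the same response body.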
WebSocket: Bidirectional with Channels
WebSocket adds real-time, bidirectional communication through channels.
How It Works
// 1. Define the function (same as before)
export const getUser = pikkuFunc<
  { userId: string },
  { id: string; name: string; email: string }
>({
  func: async ({ database, channel }, { userId }) => {
    const user = await database.users.findById(userId)

    // Optional: Send updates via channel
    if (channel) {
      await channel.send({ type: 'user-fetched', user })
    }

    return user
  }
})

// 2. Wire to WebSocket channel
wireChannel({
  name: 'users',
  onMessageWiring: {
    getUser: { func: getUser }
  }
})
Data Flow
Client connects to WebSocket
  ↓
Subscribes to 'users' channel
  ↓
Sends message: { action: 'getUser', data: { userId: '123' } }
  ↓
Pikku extracts data and calls getUser({ userId: '123' })
  ↓
Function returns user object
  ↓
WebSocket sends response message back to client
  ↓
Function also sends additional update via channel.send()
  ↓
All subscribers to 'users' channel receive the update
Key Points:
- channel service available for sending messages to subscribers
- Same function works for both request/response and pub/sub patterns
- Automatic message routing by action name
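Routing by action name can be pictured with a small dispatcher. This is a minimal sketch under assumptions: `createRouter`, the `Handler` type, and the reply envelope are invented for illustration, and only the `{ action, data }` message shape is taken from the data flow above.

```typescript
// Hypothetical sketch of action-name routing; not Pikku's channel runtime.
type Handler = (data: unknown) => unknown;

function createRouter(handlers: Map<string, Handler>) {
  // Takes a raw WebSocket text frame and returns the reply frame.
  return (raw: string): string => {
    const message = JSON.parse(raw) as { action?: unknown; data?: unknown };
    const handler =
      typeof message.action === 'string' ? handlers.get(message.action) : undefined;
    if (!handler) {
      return JSON.stringify({ error: `Unknown action: ${String(message.action)}` });
    }
    // The handler only sees the payload, never the transport envelope.
    return JSON.stringify({ action: message.action, data: handler(message.data) });
  };
}
```

A `Map` is used rather than a plain object so that action names such as `toString` cannot resolve to inherited properties.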
Queues: Async Job Processing
Queues enable background job processing with optional result waiting.
How It Works
// 1. Define the function (same pattern as before)
export const sendEmail = pikkuFunc<
  { to: string; subject: string; body: string },
  { messageId: string }
>({
  func: async ({ emailService }, { to, subject, body }) => {
    const messageId = await emailService.send({ to, subject, body })
    return { messageId }
  }
})

// 2. Wire to queue
wireQueueWorker({
  queue: 'email-queue',
  func: sendEmail
})
Data Flow (Fire and Forget)
Client adds job to queue
  ↓
queueClient.add('email-queue', { to: 'user@example.com', ... })
  ↓
Job stored in queue (Redis, PostgreSQL, SQS, etc.)
  ↓
Queue worker picks up job
  ↓
Calls sendEmail({ to: '...', subject: '...', body: '...' })
  ↓
Function executes (sends email)
  ↓
Returns result (if provider supports it - BullMQ/pg-boss yes, SQS no)
  ↓
Job marked as complete
Data Flow (Wait for Result - BullMQ/pg-boss only)
Client adds job and waits
  ↓
const jobId = await queueClient.add('email-queue', data)
const job = await queueClient.getJob('email-queue', jobId)
const result = await job.waitForCompletion?.({ timeout: 30000 })
  ↓
Job processed in background
  ↓
Client receives result: { messageId: '...' } (if supported by provider)
Key Points:
- Same function works for both fire-and-forget and wait-for-result patterns
- Provider-dependent features (BullMQ and pg-boss support waiting, SQS doesn't)
- Automatic retry on failures
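To show what retry on failure and result retrieval mean in practice, here is a toy in-memory queue. It is a sketch only: `InMemoryQueue`, `drain`, and the `maxAttempts` default are hypothetical, and real providers such as BullMQ, pg-boss, or SQS behave differently (persistence, backoff, concurrency).

```typescript
// Hypothetical in-memory queue used only to illustrate retries and results.
type Job<In> = { id: number; data: In; attempts: number };

class InMemoryQueue<In, Out> {
  private jobs: Job<In>[] = [];
  private nextId = 1;
  private worker: (data: In) => Out;
  private maxAttempts: number;
  readonly results = new Map<number, Out>();
  readonly failed = new Map<number, string>();

  constructor(worker: (data: In) => Out, maxAttempts = 3) {
    this.worker = worker;
    this.maxAttempts = maxAttempts;
  }

  // Fire and forget: the caller only gets a job id back.
  add(data: In): number {
    const id = this.nextId++;
    this.jobs.push({ id, data, attempts: 0 });
    return id;
  }

  // Process jobs until the queue is empty, re-queueing failures.
  drain(): void {
    let job = this.jobs.shift();
    while (job !== undefined) {
      job.attempts++;
      try {
        // Storing the result is what makes "wait for result" possible.
        this.results.set(job.id, this.worker(job.data));
      } catch (err) {
        if (job.attempts < this.maxAttempts) {
          this.jobs.push(job);
        } else {
          this.failed.set(job.id, err instanceof Error ? err.message : String(err));
        }
      }
      job = this.jobs.shift();
    }
  }
}
```

The worker function itself contains no retry logic; the queue decides whether a failed job runs again, which is why the same function can also be wired to other protocols.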
MCP: Transform for AI Agents
Model Context Protocol (MCP) exposes functions to AI agents like Claude. It requires special response formatting.
How It Works with Adapters
// 1. Domain function (reusable everywhere)
export const getUser = pikkuFunc<
  { userId: string },
  { id: string; name: string; email: string }
>({
  func: async ({ database }, { userId }) => {
    return await database.users.findById(userId)
  }
})

// 2. MCP adapter (transforms for AI)
export const getUserMCP = pikkuMCPResourceFunc<
  { userId: string }
>({
  func: async ({ rpc }, { userId }) => {
    // Call domain function via RPC
    const user = await rpc.invoke('getUser', { userId })

    // Transform to MCP format
    return [
      {
        uri: `user://${userId}`,
        text: JSON.stringify(user)
      }
    ]
  }
})

// 3. Wire to MCP
wireMCPResource({
  uri: 'user/{userId}',
  title: 'User Information',
  description: 'Retrieve user data by ID',
  func: getUserMCP
})
Data Flow
AI Agent requests resource
  ↓
MCP protocol: GET resource user/123
  ↓
Pikku calls getUserMCP({ userId: '123' })
  ↓
Adapter calls rpc.invoke('getUser', { userId: '123' })
  ↓
Domain function executes and returns user object
  ↓
Adapter transforms to MCP format: [{ uri: 'user://123', text: '{"id":"123",...}' }]
  ↓
AI Agent receives formatted response
Key Points:
- Keep domain logic in regular pikkuFunc (reusable)
- Use thin pikkuMCPResourceFunc or pikkuMCPToolFunc adapters
- Adapters transform data to MCP-required formats
- MCP tools return [{ type: 'text', text: '...' }]
- MCP resources return [{ uri: '...', text: '...' }]
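The two return shapes listed above can be written as small pure transforms. The type names and the helper functions `toResource` and `toToolResult` are hypothetical and exist only for this sketch; the shapes themselves follow the key points above.

```typescript
// Hypothetical helpers showing the two MCP-facing shapes described above.
type User = { id: string; name: string; email: string };
type McpResourceContent = { uri: string; text: string };
type McpToolContent = { type: 'text'; text: string };

// Resource adapters return a list of { uri, text } entries.
function toResource(user: User): McpResourceContent[] {
  return [{ uri: `user://${user.id}`, text: JSON.stringify(user) }];
}

// Tool adapters return a list of { type: 'text', text } entries.
function toToolResult(user: User): McpToolContent[] {
  return [{ type: 'text', text: `${user.name} <${user.email}>` }];
}
```

Keeping these transforms separate from the domain function means the domain function never needs to know it is being called by an AI agent.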
RPC: Internal Function Calls
Call functions from within other functions with full type safety.
How It Works
// 1. Define functions
export const getUser = pikkuFunc<
  { userId: string },
  { id: string; name: string; email: string }
>({
  func: async ({ database }, { userId }) => {
    return await database.users.findById(userId)
  }
})

// User and Order are assumed to be application types defined elsewhere
export const getUserOrders = pikkuFunc<
  { userId: string },
  { user: User; orders: Order[] }
>({
  func: async ({ database, rpc }, { userId }) => {
    // Call another function internally
    const user = await rpc.invoke('getUser', { userId })
    const orders = await database.orders.findByUserId(userId)
    return { user, orders }
  }
})
Data Flow
getUserOrders called
  ↓
Needs user data
  ↓
Calls rpc.invoke('getUser', { userId: '123' })
  ↓
Pikku executes getUser with same permissions/session context
  ↓
Returns user object
  ↓
getUserOrders continues execution
  ↓
Returns combined result
Key Points:
- Type-safe internal function calls
- Shares same session and permissions context
- Enables composition without code duplication
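One way to picture how a nested call shares the caller's session is a registry-backed `invoke`. This sketch is an assumption-laden simplification: `createRpc`, `Func`, and `Session` are invented here, the lookup is by plain string, and it does not reproduce the name-based type safety that the text describes.

```typescript
// Hypothetical sketch of session sharing across internal calls.
type Session = { userId: string };
type Rpc = { invoke(name: string, input: unknown): unknown };
type Func = (ctx: { session: Session; rpc: Rpc }, input: any) => unknown;

function createRpc(registry: Map<string, Func>, session: Session): Rpc {
  const rpc: Rpc = {
    invoke(name, input) {
      const func = registry.get(name);
      if (!func) throw new Error(`Unknown function: ${name}`);
      // The nested call receives the same session object as its caller.
      return func({ session, rpc }, input);
    },
  };
  return rpc;
}
```

Because the `rpc` object closes over a single `session`, every function reached through `invoke` sees the identity of the original caller.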
Scheduled Tasks: Time-Based Execution
Run functions on a schedule (cron-like).
How It Works
// 1. Define the function
export const sendDailyDigest = pikkuFunc<
  void,
  { emailsSent: number }
>({
  func: async ({ database, emailService }) => {
    const users = await database.users.findActive()

    for (const user of users) {
      await emailService.sendDigest(user)
    }

    return { emailsSent: users.length }
  }
})

// 2. Wire to scheduler
wireScheduler({
  cron: '0 9 * * *', // Every day at 9 AM
  func: sendDailyDigest
})
Data Flow
Scheduler tick (9:00 AM)
  ↓
Pikku calls sendDailyDigest()
  ↓
Function executes
  ↓
Returns result { emailsSent: 42 }
  ↓
Result logged
  ↓
Waits for next scheduled time
Key Points:
- No input parameters (scheduled, not triggered by external event)
- Runs in background on schedule
- Same middleware and error handling as other protocols
CLI: Command-Line Interface
Expose functions as CLI commands.
How It Works
// 1. Define the function (same as before)
export const getUser = pikkuFunc<
  { userId: string },
  { id: string; name: string; email: string }
>({
  func: async ({ database }, { userId }) => {
    return await database.users.findById(userId)
  }
})

// 2. Wire to CLI
wireCLI({
  program: 'users',
  commands: {
    get: pikkuCLICommand({
      parameters: '<userId>',
      func: getUser
    })
  }
})
Data Flow
Terminal: users get 123
  ↓
CLI parser extracts { userId: '123' }
  ↓
Calls getUser({ userId: '123' })
  ↓
Function returns user object
  ↓
CLI formats and prints to stdout
  ↓
User sees formatted output
Key Points:
- Same function works in CLI and HTTP/WebSocket/etc.
- Automatic argument parsing
- Formatted console output
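Argument parsing from a parameter spec such as '<userId>' might look like the sketch below. `parseArgs` is a hypothetical helper that only understands required positional parameters in angle brackets; how Pikku actually parses specs, options, and flags is not covered by this page.

```typescript
// Hypothetical parser: maps positional arguments onto names from a '<name>' spec.
function parseArgs(parameters: string, argv: string[]): Record<string, string> {
  const names = parameters
    .split(/\s+/)
    .filter((token) => token.length > 0)
    .map((token) => {
      const match = /^<(\w+)>$/.exec(token);
      if (!match) throw new Error(`Unsupported parameter token: ${token}`);
      return match[1];
    });
  if (argv.length !== names.length) {
    throw new Error(`Expected ${names.length} argument(s), got ${argv.length}`);
  }
  // Build the same input object the function would receive over HTTP.
  const input: Record<string, string> = {};
  names.forEach((name, i) => {
    input[name] = argv[i];
  });
  return input;
}
```

With the spec '<userId>' and the arguments from `users get 123`, the result is `{ userId: '123' }`, the same input shape used in the HTTP example.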
The Unification Pattern
All protocols follow the same pattern:
- Input Extraction: protocol-specific data (route params, WebSocket message, queue job, etc.) → function parameters
- Function Execution: the same TypeScript function runs with the same services, permissions, and validation
- Output Transformation: function return value → protocol-specific response (HTTP JSON, WebSocket message, queue result, MCP format)
This is why you can write logic once and deploy everywhere.
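The three steps can be condensed into a generic helper. This is a conceptual sketch, not how Pikku is built: `wire`, `Wiring`, `greet`, and the two example wirings are all hypothetical, and services, permissions, and validation are left out.

```typescript
// Hypothetical sketch of extract -> execute -> transform.
type Wiring<Raw, In, Out, Res> = {
  extract: (raw: Raw) => In; // protocol data -> function input
  transform: (out: Out) => Res; // function output -> protocol response
};

function wire<Raw, In, Out, Res>(
  func: (input: In) => Out,
  wiring: Wiring<Raw, In, Out, Res>
): (raw: Raw) => Res {
  return (raw) => wiring.transform(func(wiring.extract(raw)));
}

// One function with no knowledge of any protocol.
const greet = (input: { name: string }) => ({ message: `Hello, ${input.name}` });

// Two wirings that differ only in extraction and transformation.
const viaHttp = wire(greet, {
  extract: (req: { params: { name: string } }) => req.params,
  transform: (out: { message: string }) => ({ status: 200, body: JSON.stringify(out) }),
});

const viaCli = wire(greet, {
  extract: (argv: string[]) => ({ name: argv[0] }),
  transform: (out: { message: string }) => out.message,
});
```

Adding another protocol in this model means writing another `extract`/`transform` pair while `greet` stays untouched.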
Key Benefits
Write Once, Wire Anywhere
// One function
export const getUser = pikkuFunc<Input, Output>({ ... })
// Many protocols
wireHTTP({ route: '/users/:userId', func: getUser })
wireChannel({ onMessageWiring: { getUser: { func: getUser } } })
wireQueueWorker({ queue: 'get-user', func: getUser })
wireMCPResource({ uri: 'user/{userId}', func: getUserMCP }) // Thin adapter
wireCLI({ commands: { get: pikkuCLICommand({ func: getUser }) } })
Same Behavior Everywhere
- Authentication works the same way
- Permissions are enforced consistently
- Validation happens automatically
- Middleware runs for all protocols
- Error handling is uniform
Easy Testing
Test the function directly, with no need to mock HTTP requests, WebSocket connections, or queue infrastructure:
const result = await getUser.func(services, { userId: '123' })
expect(result.name).toBe('John Doe')
Flexible Deployment
- Start with HTTP monolith
- Add WebSocket for real-time features
- Move expensive operations to queues
- Expose to AI agents via MCP
- Provide CLI for admin tasks
Same code. Different protocols. Zero rewrites.
Next Steps
- TypeScript Everywhere: see how type safety flows across all protocols
- Get started: build your first unified backend
- Documentation: deep dive into each protocol