EventHub API
The EventHub service provides topic-based pub/sub messaging for real-time communication across channels, Server-Sent Events, and custom transports.
Overview
EventHub enables:
- Topic-based message routing
- Broadcasting to multiple subscribers
- Selective exclusion (send to all except one)
- Stateless and stateful operation modes
- Cross-transport compatibility (WebSockets, SSE, etc.)
Methods
subscribe()
Subscribe a channel/connection to a topic.
await eventHub.subscribe(topic: string, subscriberId: string)
Parameters:
topic- Topic name (e.g.,'room:lobby','user:123')subscriberId- Unique identifier for the subscriber (typicallychannel.channelId)
Example:
// Subscribe channel to a room topic
await eventHub.subscribe(`room:${roomId}`, channel.channelId)
unsubscribe()
Unsubscribe a channel/connection from a topic.
await eventHub.unsubscribe(topic: string, subscriberId: string)
Parameters:
topic- Topic name to unsubscribe fromsubscriberId- Subscriber identifier to remove
Example:
// Unsubscribe when leaving a room
await eventHub.unsubscribe(`room:${roomId}`, channel.channelId)
Automatic Cleanup
Pikku automatically unsubscribes all topics when a channel disconnects. You only need to manually unsubscribe when changing subscriptions during an active connection.
publish()
Publish a message to all subscribers of a topic.
await eventHub.publish(
topic: string,
excludeId: string | null,
payload: any
)
Parameters:
topic- Topic to publish toexcludeId- Subscriber ID to exclude (set tonullto broadcast to all)payload- Message payload (must be JSON-serializable)
Examples:
Broadcast to all subscribers:
await eventHub.publish(
`room:${roomId}`,
null, // Send to all
{
type: 'message',
text: 'Hello everyone!',
userId: currentUserId
}
)
Broadcast to all except sender:
await eventHub.publish(
`room:${roomId}`,
channel.channelId, // Exclude sender
{
type: 'user_joined',
userId: newUserId,
username: newUserName
}
)
Usage Patterns
Chat Room Example
// Connection - subscribe to room
export const joinRoom = pikkuChannelConnectionFunc<
{ welcome: string },
{ room: string }
>(async ({ eventHub, channel }) => {
const room = channel.openingData.room
await eventHub.subscribe(`room:${room}`, channel.channelId)
return { welcome: `Welcome to ${room}!` }
})
// Message - broadcast to room (excluding sender)
export const sendMessage = pikkuChannelFunc<
{ message: string },
void,
{ room: string }
>(async ({ eventHub, channel, userSession }, data) => {
const room = channel.openingData.room
await eventHub.publish(
`room:${room}`,
channel.channelId, // Don't echo back to sender
{
message: data.message,
userId: userSession.userId,
timestamp: Date.now()
}
)
})
// Disconnect - cleanup (optional, happens automatically)
export const leaveRoom = pikkuChannelDisconnectionFunc<{ room: string }>(
async ({ eventHub }, data) => {
await eventHub.unsubscribe(`room:${data.room}`, data.channelId)
}
)
Presence System
// Publish presence updates
export const updatePresence = pikkuFunc<
{ status: 'online' | 'away' | 'offline' },
void
>(async ({ eventHub, userSession }, data) => {
await eventHub.publish('presence:updates', null, {
userId: userSession.userId,
status: data.status,
timestamp: Date.now()
})
})
// Subscribe to presence on connect
export const watchPresence = pikkuChannelConnectionFunc(
async ({ eventHub, channel }) => {
await eventHub.subscribe('presence:updates', channel.channelId)
}
)
Notification System
// Subscribe user to their personal notification channel
export const connectNotifications = pikkuChannelConnectionFunc<
void,
{ userId: string }
>(async ({ eventHub, channel }) => {
const userId = channel.openingData.userId
await eventHub.subscribe(`notifications:${userId}`, channel.channelId)
})
// Trigger notification from any function
export const sendNotification = pikkuFunc<
{ userId: string; title: string; message: string },
void
>(async ({ eventHub }, data) => {
await eventHub.publish(`notifications:${data.userId}`, null, {
title: data.title,
message: data.message,
timestamp: Date.now()
})
})