Redis (@pikku/redis)
The @pikku/redis package provides Redis implementations for workflow orchestration, channel state, and deployment tracking. It uses the ioredis driver.
note
Redis does not provide AIStorageService or AIRunStateService. For AI Agent persistence, use @pikku/pg or @pikku/kysely.
Installationβ
npm install @pikku/redis ioredis
Servicesβ
RedisWorkflowServiceβ
Workflow orchestration with Redis persistence using hashes and sorted sets.
import { RedisWorkflowService } from '@pikku/redis'
import Redis from 'ioredis'
const redis = new Redis(process.env.REDIS_URL!)
const workflowService = new RedisWorkflowService(redis)
await workflowService.init()
Constructor: new RedisWorkflowService(connectionOrConfig, keyPrefix?)
| Parameter | Type | Default | Description |
|---|---|---|---|
connectionOrConfig | Redis | RedisOptions | string | β | ioredis connection, options, or URL |
keyPrefix | string | 'pikku:' | Key prefix for all Redis keys |
RedisWorkflowRunServiceβ
Read-only workflow run queries for the Console.
import { RedisWorkflowRunService } from '@pikku/redis'
const workflowRunService = new RedisWorkflowRunService(redis)
RedisAgentRunServiceβ
Read-only agent run queries for the Console. Reads from Redis-backed AI state.
import { RedisAgentRunService } from '@pikku/redis'
const agentRunService = new RedisAgentRunService(redis)
RedisChannelStoreβ
WebSocket channel and subscription persistence.
import { RedisChannelStore } from '@pikku/redis'
const channelStore = new RedisChannelStore(redis)
await channelStore.init()
RedisDeploymentServiceβ
Multi-instance deployment tracking with heartbeat.
import { RedisDeploymentService } from '@pikku/redis'
const deploymentService = new RedisDeploymentService(config, redis)
await deploymentService.init()
Key Structureβ
Redis keys use the configured prefix (default pikku:) with the following patterns:
| Pattern | Purpose |
|---|---|
workflows:run:<id> | Workflow run state (hash) |
workflows:step:<runId>:<step> | Workflow step state (hash) |
workflows:step-history:<runId> | Step execution history (sorted set) |
ai:thread:<id> | AI thread data (hash) |
ai:threads | Thread index by update time (sorted set) |
ai:messages:<threadId> | Messages per thread (sorted set) |
ai:run:<id> | Agent run data (hash) |
Full Exampleβ
import {
RedisWorkflowService,
RedisChannelStore,
RedisAgentRunService,
} from '@pikku/redis'
import Redis from 'ioredis'
const redis = new Redis(process.env.REDIS_URL!)
const workflowService = new RedisWorkflowService(redis)
await workflowService.init()
const channelStore = new RedisChannelStore(redis)
await channelStore.init()
const singletonServices = await createSingletonServices(config, {
workflowService,
channelStore,
agentRunService: new RedisAgentRunService(redis),
})