Queue Examples

This page shows practical examples of using Pikku queues for common use cases. Each example includes the function definition, worker registration, and client usage.

Email Processing

Function Definition

// email-worker.functions.ts
import { pikkuSessionlessFunc } from '@pikku/core'

interface EmailJob {
  to: string
  subject: string
  body: string
  template?: string
  data?: Record<string, any>
}

interface EmailResult {
  messageId: string
  sentAt: Date
  provider: string
}

export const sendEmail = pikkuSessionlessFunc<EmailJob, EmailResult>(
  async (context, jobData) => {
    const { logger, services } = context

    logger.info('Sending email', { to: jobData.to, subject: jobData.subject })

    try {
      const result = await services.emailService.send({
        to: jobData.to,
        subject: jobData.subject,
        body: jobData.body,
        template: jobData.template,
        data: jobData.data
      })

      logger.info('Email sent successfully', { messageId: result.messageId })

      return {
        messageId: result.messageId,
        sentAt: new Date(),
        provider: 'sendgrid'
      }
    } catch (error) {
      logger.error('Email sending failed', { error: error.message })
      throw error
    }
  }
)

Worker Registration

// email-worker.routes.ts
import { addQueueWorker } from '@pikku/core'
import { sendEmail } from './email-worker.functions'

addQueueWorker({
  queueName: 'email-queue',
  func: sendEmail,
  config: {
    concurrency: 5,
    retries: 3,
    retryDelay: 10000,
    timeout: 30000
  }
})

Client Usage

// In your application
import { PikkuQueue } from './.pikku/pikku-queue.gen'

// queueService comes from your application's service setup
const queueClient = new PikkuQueue(queueService)

// Send welcome email
async function sendWelcomeEmail(userEmail: string, userName: string) {
  const jobId = await queueClient.add('email-queue', {
    to: userEmail,
    subject: 'Welcome to our platform!',
    body: `Hello ${userName}, welcome to our platform!`,
    template: 'welcome',
    data: { name: userName }
  })

  return jobId
}

// Send password reset email
async function sendPasswordReset(userEmail: string, resetToken: string) {
  return await queueClient.add('email-queue', {
    to: userEmail,
    subject: 'Password Reset Request',
    body: `Click here to reset your password: ${resetToken}`,
    template: 'password-reset',
    data: { resetToken }
  }, {
    priority: 'high', // High priority for security-related emails
    retries: 5 // More retries for important emails
  })
}
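
If you need to inspect a queued email afterwards, the same client exposes job lookup (used again in the Data Export section below). A minimal sketch, assuming the returned job carries status and result fields:

// Check on a previously queued email job (field names assumed)
async function checkEmailJob(jobId: string) {
  const job = await queueClient.getJob('email-queue', jobId)
  return { status: job.status, result: job.result }
}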

Image Processing

Function Definition

// image-worker.functions.ts
import { pikkuSessionlessFunc } from '@pikku/core'

interface ImageJob {
  imageUrl: string
  userId: string
  operations: Array<{
    type: 'resize' | 'crop' | 'filter'
    params: Record<string, any>
  }>
}

interface ImageResult {
  processedUrl: string
  thumbnailUrl: string
  processedAt: Date
  fileSize: number
}

export const processImage = pikkuSessionlessFunc<ImageJob, ImageResult>(
  async (context, jobData) => {
    const { logger, services } = context

    logger.info('Processing image', {
      imageUrl: jobData.imageUrl,
      userId: jobData.userId
    })

    // Download image
    const imageBuffer = await services.imageService.download(jobData.imageUrl)

    // Apply operations
    let processedImage = imageBuffer
    for (const operation of jobData.operations) {
      processedImage = await services.imageService.apply(
        processedImage,
        operation.type,
        operation.params
      )
    }

    // Generate thumbnail
    const thumbnail = await services.imageService.resize(processedImage, {
      width: 200,
      height: 200
    })

    // Upload processed images
    const processedUrl = await services.storageService.upload(
      processedImage,
      `processed/${jobData.userId}/${Date.now()}.jpg`
    )

    const thumbnailUrl = await services.storageService.upload(
      thumbnail,
      `thumbnails/${jobData.userId}/${Date.now()}.jpg`
    )

    return {
      processedUrl,
      thumbnailUrl,
      processedAt: new Date(),
      fileSize: processedImage.length
    }
  }
)

Worker Registration

// image-worker.routes.ts
import { addQueueWorker } from '@pikku/core'
import { processImage } from './image-worker.functions'

addQueueWorker({
  queueName: 'image-processing',
  func: processImage,
  config: {
    concurrency: 2, // CPU-intensive, limit concurrency
    retries: 2, // Fewer retries for expensive operations
    timeout: 300000, // 5 minute timeout for large images
    pollingInterval: 2000
  }
})

Client Usage

// User uploads image
async function processUserImage(userId: string, imageUrl: string) {
  const jobId = await queueClient.add('image-processing', {
    imageUrl,
    userId,
    operations: [
      { type: 'resize', params: { width: 1200, height: 800 } },
      { type: 'filter', params: { type: 'sharpen' } }
    ]
  })

  // Wait for processing to complete
  try {
    const result = await queueClient.waitForCompletion(
      'image-processing',
      jobId,
      { timeout: 300000 }
    )

    return {
      originalUrl: imageUrl,
      processedUrl: result.processedUrl,
      thumbnailUrl: result.thumbnailUrl
    }
  } catch (error) {
    throw new Error(`Image processing failed: ${error.message}`)
  }
}
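
waitForCompletion blocks the caller until the job finishes. For longer-running uploads you may prefer to enqueue and return the job ID immediately, then let the client check back later with getJob (shown in the Data Export section). A minimal non-blocking sketch:

// Non-blocking variant: enqueue and hand the job ID back to the caller
async function enqueueUserImage(userId: string, imageUrl: string) {
  return queueClient.add('image-processing', {
    imageUrl,
    userId,
    operations: [{ type: 'resize', params: { width: 1200, height: 800 } }]
  })
}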

Data Export

Function Definition

// export-worker.functions.ts
import { pikkuSessionlessFunc } from '@pikku/core'

interface ExportJob {
  userId: string
  dataType: 'users' | 'orders' | 'analytics'
  format: 'csv' | 'json' | 'xlsx'
  filters?: Record<string, any>
  dateRange?: {
    start: Date
    end: Date
  }
}

interface ExportResult {
  fileUrl: string
  fileName: string
  recordCount: number
  fileSize: number
  exportedAt: Date
}

export const exportData = pikkuSessionlessFunc<ExportJob, ExportResult>(
  async (context, jobData) => {
    const { logger, services } = context

    logger.info('Starting data export', {
      dataType: jobData.dataType,
      format: jobData.format,
      userId: jobData.userId
    })

    // Fetch data based on type and filters
    const data = await services.dataService.fetch({
      type: jobData.dataType,
      filters: jobData.filters,
      dateRange: jobData.dateRange
    })

    logger.info('Data fetched', { recordCount: data.length })

    // Generate export file
    const exportFile = await services.exportService.generate({
      data,
      format: jobData.format,
      fileName: `${jobData.dataType}_export_${Date.now()}.${jobData.format}`
    })

    // Upload to storage
    const fileUrl = await services.storageService.upload(
      exportFile.buffer,
      `exports/${jobData.userId}/${exportFile.fileName}`
    )

    // Send notification email
    await services.notificationService.send({
      userId: jobData.userId,
      type: 'export_complete',
      data: { fileUrl, fileName: exportFile.fileName }
    })

    return {
      fileUrl,
      fileName: exportFile.fileName,
      recordCount: data.length,
      fileSize: exportFile.buffer.length,
      exportedAt: new Date()
    }
  }
)

Worker Registration

// export-worker.routes.ts
import { addQueueWorker } from '@pikku/core'
import { exportData } from './export-worker.functions'

addQueueWorker({
  queueName: 'data-export',
  func: exportData,
  config: {
    concurrency: 1, // Sequential processing for large exports
    retries: 2, // Limited retries for expensive operations
    timeout: 1800000, // 30 minute timeout
    pollingInterval: 10000
  }
})

Client Usage

// Initiate data export
async function initiateDataExport(
  userId: string,
  dataType: 'users' | 'orders' | 'analytics',
  format: 'csv' | 'json' | 'xlsx'
) {
  const jobId = await queueClient.add('data-export', {
    userId,
    dataType,
    format,
    dateRange: {
      start: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000), // 30 days ago
      end: new Date()
    }
  })

  return jobId
}

// Check export status
async function checkExportStatus(jobId: string) {
  const job = await queueClient.getJob('data-export', jobId)

  return {
    status: job.status,
    progress: job.progress,
    result: job.result,
    error: job.error
  }
}
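
If a caller needs to block until the export finishes, it can either reuse the waitForCompletion helper from the Image Processing example or poll checkExportStatus. A polling sketch, assuming 'completed' and 'failed' are among the reported status values:

// Poll until the export reaches a terminal state (status strings assumed)
async function waitForExport(jobId: string, pollInterval = 10000) {
  while (true) {
    const { status, result, error } = await checkExportStatus(jobId)
    if (status === 'completed') return result
    if (status === 'failed') throw new Error(`Export failed: ${error}`)
    await new Promise(resolve => setTimeout(resolve, pollInterval))
  }
}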

Notification System

Function Definition

// notification-worker.functions.ts
import { pikkuSessionlessFunc } from '@pikku/core'

interface NotificationJob {
  userId: string
  type: 'email' | 'sms' | 'push' | 'webhook'
  template: string
  data: Record<string, any>
  priority: 'low' | 'normal' | 'high' | 'urgent'
  channels?: string[]
}

interface NotificationResult {
  deliveryId: string
  channel: string
  status: 'sent' | 'failed' | 'pending'
  sentAt: Date
  metadata?: Record<string, any>
}

export const sendNotification = pikkuSessionlessFunc<
  NotificationJob,
  NotificationResult[]
>(
  async (context, jobData) => {
    const { logger, services } = context

    logger.info('Processing notification', {
      userId: jobData.userId,
      type: jobData.type,
      priority: jobData.priority
    })

    // Get user notification preferences
    const userPrefs = await services.userService.getNotificationPreferences(
      jobData.userId
    )

    // Determine channels to use
    const channels = jobData.channels || [jobData.type]
    const enabledChannels = channels.filter(channel =>
      userPrefs[channel]?.enabled
    )

    if (enabledChannels.length === 0) {
      logger.info('No enabled channels for user', { userId: jobData.userId })
      return []
    }

    // Send notifications across all enabled channels
    const results: NotificationResult[] = []

    for (const channel of enabledChannels) {
      try {
        const result = await services.notificationService.send({
          userId: jobData.userId,
          channel,
          template: jobData.template,
          data: jobData.data,
          priority: jobData.priority
        })

        results.push({
          deliveryId: result.id,
          channel,
          status: 'sent',
          sentAt: new Date(),
          metadata: result.metadata
        })
      } catch (error) {
        logger.error('Notification sending failed', {
          channel,
          error: error.message
        })

        results.push({
          deliveryId: '',
          channel,
          status: 'failed',
          sentAt: new Date(),
          metadata: { error: error.message }
        })
      }
    }

    return results
  }
)

Worker Registration

// notification-worker.routes.ts
import { addQueueWorker } from '@pikku/core'
import { sendNotification } from './notification-worker.functions'

// High priority notifications
addQueueWorker({
  queueName: 'notifications-urgent',
  func: sendNotification,
  config: {
    concurrency: 10,
    retries: 5,
    timeout: 15000,
    pollingInterval: 1000
  }
})

// Normal priority notifications
addQueueWorker({
  queueName: 'notifications-normal',
  func: sendNotification,
  config: {
    concurrency: 5,
    retries: 3,
    timeout: 30000,
    pollingInterval: 5000
  }
})

// Low priority notifications
addQueueWorker({
  queueName: 'notifications-low',
  func: sendNotification,
  config: {
    concurrency: 2,
    retries: 2,
    timeout: 60000,
    pollingInterval: 30000
  }
})

Client Usage

// Send notification based on priority
async function sendUserNotification(
  userId: string,
  type: 'email' | 'sms' | 'push',
  template: string,
  data: Record<string, any>,
  priority: 'low' | 'normal' | 'high' | 'urgent' = 'normal'
) {
  // Only urgent, normal, and low workers are registered above,
  // so route 'high' priority jobs to the urgent queue
  const queueName = `notifications-${priority === 'high' ? 'urgent' : priority}`

  const jobId = await queueClient.add(queueName, {
    userId,
    type,
    template,
    data,
    priority
  })

  return jobId
}

// Batch notifications
async function sendBatchNotifications(notifications: NotificationJob[]) {
  const groupedByPriority = notifications.reduce((groups, notification) => {
    const priority = notification.priority
    if (!groups[priority]) groups[priority] = []
    groups[priority].push(notification)
    return groups
  }, {} as Record<string, NotificationJob[]>)

  const jobIds: string[] = []

  for (const [priority, batch] of Object.entries(groupedByPriority)) {
    const queueName = `notifications-${priority === 'high' ? 'urgent' : priority}`
    const batchJobIds = await queueClient.addBatch(queueName, batch)
    jobIds.push(...batchJobIds)
  }

  return jobIds
}
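
For example, a nightly digest mixed with an urgent security alert is split across the low and urgent queues automatically:

// Usage sketch (template names and payloads are illustrative)
const jobIds = await sendBatchNotifications([
  {
    userId: 'user-1',
    type: 'email',
    template: 'daily-digest',
    data: { articles: 12 },
    priority: 'low'
  },
  {
    userId: 'user-2',
    type: 'push',
    template: 'suspicious-login',
    data: { ip: '203.0.113.7' },
    priority: 'urgent'
  }
])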

Background Cleanup

Function Definition

// cleanup-worker.functions.ts
import { pikkuSessionlessFunc } from '@pikku/core'

interface CleanupJob {
  type: 'files' | 'logs' | 'cache' | 'database'
  olderThan: number // milliseconds
  dryRun?: boolean
}

interface CleanupResult {
  itemsRemoved: number
  spaceFreed: number // bytes
  errors: string[]
  duration: number // milliseconds
}

export const runCleanup = pikkuSessionlessFunc<CleanupJob, CleanupResult>(
  async (context, jobData) => {
    const { logger, services } = context
    const startTime = Date.now()

    logger.info('Starting cleanup', {
      type: jobData.type,
      olderThan: jobData.olderThan,
      dryRun: jobData.dryRun
    })

    let itemsRemoved = 0
    let spaceFreed = 0
    const errors: string[] = []

    try {
      switch (jobData.type) {
        case 'files': {
          const fileResult = await services.storageService.cleanup({
            olderThan: jobData.olderThan,
            dryRun: jobData.dryRun
          })
          itemsRemoved = fileResult.filesRemoved
          spaceFreed = fileResult.spaceFreed
          break
        }

        case 'logs': {
          const logResult = await services.logService.cleanup({
            olderThan: jobData.olderThan,
            dryRun: jobData.dryRun
          })
          itemsRemoved = logResult.logsRemoved
          spaceFreed = logResult.spaceFreed
          break
        }

        case 'cache': {
          const cacheResult = await services.cacheService.cleanup({
            olderThan: jobData.olderThan,
            dryRun: jobData.dryRun
          })
          itemsRemoved = cacheResult.itemsRemoved
          spaceFreed = cacheResult.spaceFreed
          break
        }

        case 'database': {
          const dbResult = await services.databaseService.cleanup({
            olderThan: jobData.olderThan,
            dryRun: jobData.dryRun
          })
          itemsRemoved = dbResult.recordsRemoved
          spaceFreed = dbResult.spaceFreed
          break
        }
      }
    } catch (error) {
      errors.push(error.message)
      logger.error('Cleanup failed', { error: error.message })
    }

    const duration = Date.now() - startTime

    logger.info('Cleanup completed', {
      itemsRemoved,
      spaceFreed,
      duration,
      errors: errors.length
    })

    return {
      itemsRemoved,
      spaceFreed,
      errors,
      duration
    }
  }
)

Worker Registration

// cleanup-worker.routes.ts
import { addQueueWorker } from '@pikku/core'
import { runCleanup } from './cleanup-worker.functions'

addQueueWorker({
  queueName: 'cleanup-tasks',
  func: runCleanup,
  config: {
    concurrency: 1, // Sequential cleanup to avoid conflicts
    retries: 1, // Limited retries for cleanup tasks
    timeout: 3600000, // 1 hour timeout
    pollingInterval: 60000
  }
})

Client Usage

// Schedule daily cleanup
async function scheduleCleanup() {
  const cleanupJobs = [
    {
      type: 'files' as const,
      olderThan: 30 * 24 * 60 * 60 * 1000, // 30 days
      dryRun: false
    },
    {
      type: 'logs' as const,
      olderThan: 7 * 24 * 60 * 60 * 1000, // 7 days
      dryRun: false
    },
    {
      type: 'cache' as const,
      olderThan: 24 * 60 * 60 * 1000, // 24 hours
      dryRun: false
    }
  ]

  const jobIds = await queueClient.addBatch('cleanup-tasks', cleanupJobs)
  return jobIds
}

// Run cleanup and wait for the result
async function runCleanupWithMonitoring(type: CleanupJob['type']) {
  const jobId = await queueClient.add('cleanup-tasks', {
    type,
    olderThan: 24 * 60 * 60 * 1000,
    dryRun: false
  })

  // Wait for the cleanup to finish, polling periodically
  const result = await queueClient.waitForCompletion('cleanup-tasks', jobId, {
    timeout: 3600000, // 1 hour
    pollInterval: 10000 // Check every 10 seconds
  })

  return result
}
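
Because CleanupJob supports a dryRun flag, it can be worth previewing what a cleanup would remove before committing. A sketch building on the same worker:

// Preview a cleanup with dryRun, then run it for real if there is work to do
async function previewThenRunCleanup(type: CleanupJob['type']) {
  const olderThan = 24 * 60 * 60 * 1000

  const previewJobId = await queueClient.add('cleanup-tasks', {
    type,
    olderThan,
    dryRun: true // report what would be removed without deleting anything
  })

  const preview = await queueClient.waitForCompletion(
    'cleanup-tasks',
    previewJobId,
    { timeout: 3600000 }
  )

  // Only enqueue the real cleanup if the dry run found anything
  if (preview.itemsRemoved > 0) {
    return queueClient.add('cleanup-tasks', { type, olderThan, dryRun: false })
  }
  return null
}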

Best Practices from Examples

1. Error Handling

Always handle errors gracefully and provide meaningful error messages:

try {
  const result = await processData(data)
  return result
} catch (error) {
  logger.error('Processing failed', { error: error.message, data })
  throw new Error(`Processing failed: ${error.message}`)
}

2. Progress Reporting

For long-running tasks, report progress:

for (let i = 0; i < items.length; i++) {
  await processItem(items[i])

  if (i % 100 === 0) {
    logger.info('Progress update', {
      processed: i,
      total: items.length,
      percentage: Math.round((i / items.length) * 100)
    })
  }
}

3. Resource Management

Clean up resources properly:

let connection
try {
  connection = await createConnection()
  const result = await processWithConnection(connection, data)
  return result
} finally {
  if (connection) {
    await connection.close()
  }
}

4. Timeout Handling

Set appropriate timeouts based on job complexity:

const timeout = jobData.complexity === 'high' ? 1800000 : 300000
await queueClient.add('processing-queue', jobData, { timeout })
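
These per-job options combine with the priority and retries overrides shown in the email example. A sketch of a small helper that derives all three from a job's importance (the thresholds here are arbitrary):

// Derive per-job queue options from an importance flag (values are illustrative)
function queueOptionsFor(importance: 'routine' | 'critical') {
  return importance === 'critical'
    ? { priority: 'high', retries: 5, timeout: 1800000 } // 30 minutes
    : { priority: 'low', retries: 2, timeout: 300000 } // 5 minutes
}

await queueClient.add('processing-queue', jobData, queueOptionsFor('critical'))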

These examples show how to build robust, scalable background processing systems with Pikku queues. Each pattern can be adapted to your specific use case while maintaining type safety and reliability.