Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Comprehensive Cloudflare platform skill covering Workers, D1, R2, KV, AI, Durable Objects, and security.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
references/queues/patterns.md
# Queues Patterns & Best Practices

## Async Task Processing

```typescript
// Producer: Accept request, queue work
export default {
  async fetch(request: Request, env: Env): Promise<Response> {
    const { userId, reportType } = await request.json();
    await env.REPORT_QUEUE.send({ userId, reportType, requestedAt: Date.now() });
    return Response.json({ message: 'Report queued', status: 'pending' });
  }
};

// Consumer: Process reports
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      const { userId, reportType } = msg.body;
      const report = await generateReport(userId, reportType, env);
      await env.REPORTS_BUCKET.put(`${userId}/${reportType}.pdf`, report);
      msg.ack();
    }
  }
};
```

## Buffering API Calls

```typescript
// Producer: Queue log entries
ctx.waitUntil(env.LOGS_QUEUE.send({
  method: request.method,
  url: request.url,
  timestamp: Date.now()
}));

// Consumer: Batch write to external API
async queue(batch: MessageBatch, env: Env): Promise<void> {
  const logs = batch.messages.map(m => m.body);
  await fetch(env.LOG_ENDPOINT, { method: 'POST', body: JSON.stringify({ logs }) });
  batch.ackAll();
}
```

## Rate Limiting Upstream

```typescript
async queue(batch: MessageBatch, env: Env): Promise<void> {
  for (const msg of batch.messages) {
    try {
      await callRateLimitedAPI(msg.body);
      msg.ack();
    } catch (error) {
      if (error.status === 429) {
        const retryAfter = parseInt(error.headers.get('Retry-After') || '60');
        msg.retry({ delaySeconds: retryAfter });
      } else throw error;
    }
  }
}
```

## Event-Driven Workflows

```typescript
// R2 event → Queue → Worker
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      const event = msg.body;
      if (event.action === 'PutObject') {
        await processNewFile(event.object.key, env);
      } else if (event.action === 'DeleteObject') {
        await cleanupReferences(event.object.key, env);
      }
      msg.ack();
    }
  }
};
```

## Dead Letter Queue Pattern

```typescript
// Main queue: After max_retries, goes to DLQ automatically
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      try {
        await riskyOperation(msg.body);
        msg.ack();
      } catch (error) {
        console.error(`Failed after ${msg.attempts} attempts:`, error);
      }
    }
  }
};

// DLQ consumer: Log and store failed messages
export default {
  async queue(batch: MessageBatch, env: Env): Promise<void> {
    for (const msg of batch.messages) {
      await env.FAILED_KV.put(msg.id, JSON.stringify(msg.body));
      msg.ack();
    }
  }
};
```

## Priority Queues

High priority: `max_batch_size: 5, max_batch_timeout: 1`. Low priority: `max_batch_size: 100, max_batch_timeout: 30`.

## Delayed Job Processing

```typescript
await env.EMAIL_QUEUE.send({ to, template, userId }, { delaySeconds: 3600 });
```

## Fan-out Pattern

```typescript
async fetch(request: Request, env: Env): Promise<Response> {
  const event = await request.json();

  // Send to multiple queues for parallel processing
  await Promise.all([
    env.ANALYTICS_QUEUE.send(event),
    env.NOTIFICATIONS_QUEUE.send(event),
    env.AUDIT_LOG_QUEUE.send(event)
  ]);

  return Response.json({ status: 'processed' });
}
```

## Idempotency Pattern

```typescript
async queue(batch: MessageBatch, env: Env): Promise<void> {
  for (const msg of batch.messages) {
    // Check if already processed
    const processed = await env.PROCESSED_KV.get(msg.id);
    if (processed) {
      msg.ack();
      continue;
    }

    await processMessage(msg.body);
    await env.PROCESSED_KV.put(msg.id, '1', { expirationTtl: 86400 });
    msg.ack();
  }
}
```

## Integration: D1 Batch Writes

```typescript
async queue(batch: MessageBatch, env: Env): Promise<void> {
  // Collect all inserts for single D1 batch
  const statements = batch.messages.map(msg =>
    env.DB.prepare('INSERT INTO events (id, data, created) VALUES (?, ?, ?)')
      .bind(msg.id, JSON.stringify(msg.body), Date.now())
  );

  try {
    await env.DB.batch(statements);
    batch.ackAll();
  } catch (error) {
    console.error('D1 batch failed:', error);
    batch.retryAll({ delaySeconds: 60 });
  }
}
```

## Integration: Workflows

```typescript
// Queue triggers Workflow for long-running tasks
async queue(batch: MessageBatch, env: Env): Promise<void> {
  for (const msg of batch.messages) {
    try {
      const instance = await env.MY_WORKFLOW.create({
        id: msg.id,
        params: msg.body
      });
      console.log('Workflow started:', instance.id);
      msg.ack();
    } catch (error) {
      msg.retry({ delaySeconds: 30 });
    }
  }
}
```

## Integration: Durable Objects

```typescript
// Queue distributes work to Durable Objects by ID
async queue(batch: MessageBatch, env: Env): Promise<void> {
  for (const msg of batch.messages) {
    const { userId, action } = msg.body;

    // Route to user-specific DO
    const id = env.USER_DO.idFromName(userId);
    const stub = env.USER_DO.get(id);

    try {
      await stub.fetch(new Request('https://do/process', {
        method: 'POST',
        body: JSON.stringify({ action, messageId: msg.id })
      }));
      msg.ack();
    } catch (error) {
      msg.retry({ delaySeconds: 60 });
    }
  }
}
```