Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Comprehensive Cloudflare platform skill covering Workers, D1, R2, KV, AI, Durable Objects, and security.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
references/queues/api.md
1# Queues API Reference23## Producer: Send Messages45```typescript6// Basic send7await env.MY_QUEUE.send({ url: request.url, timestamp: Date.now() });89// Options: delay (max 43200s), contentType (json|text|bytes|v8)10await env.MY_QUEUE.send(message, { delaySeconds: 600 });11await env.MY_QUEUE.send(message, { delaySeconds: 0 }); // Override queue default1213// Batch (up to 100 msgs or 256 KB)14await env.MY_QUEUE.sendBatch([15{ body: 'msg1' },16{ body: 'msg2' },17{ body: 'msg3', options: { delaySeconds: 300 } }18]);1920// Non-blocking with ctx.waitUntil - send continues after response21ctx.waitUntil(env.MY_QUEUE.send({ data: 'async' }));2223// Background tasks in queue consumer24export default {25async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext): Promise<void> {26for (const msg of batch.messages) {27await processMessage(msg.body);2829// Fire-and-forget analytics (doesn't block ack)30ctx.waitUntil(31env.ANALYTICS_QUEUE.send({ messageId: msg.id, processedAt: Date.now() })32);3334msg.ack();35}36}37};38```3940## Consumer: Push-based (Worker)4142```typescript43// Type-safe handler with ExportedHandler44interface Env {45MY_QUEUE: Queue;46DB: D1Database;47}4849export default {50async queue(batch: MessageBatch<MessageBody>, env: Env, ctx: ExecutionContext): Promise<void> {51// batch.queue, batch.messages.length52for (const msg of batch.messages) {53// msg.id, msg.body, msg.timestamp, msg.attempts54try {55await processMessage(msg.body);56msg.ack();57} catch (error) {58msg.retry({ delaySeconds: 600 });59}60}61}62} satisfies ExportedHandler<Env>;63```6465**CRITICAL WARNINGS:**66671. **Messages not explicitly ack'd or retry'd will auto-retry indefinitely** until `max_retries` is reached. Always call `msg.ack()` or `msg.retry()` for each message.68692. **Throwing uncaught errors retries the ENTIRE batch**, not just the failed message. Always wrap individual message processing in try/catch and call `msg.retry()` explicitly per message.7071```typescript72// ❌ BAD: Uncaught error retries entire batch73async queue(batch: MessageBatch): Promise<void> {74for (const msg of batch.messages) {75await riskyOperation(msg.body); // If this throws, entire batch retries76msg.ack();77}78}7980// ✅ GOOD: Catch per message, handle individually81async queue(batch: MessageBatch): Promise<void> {82for (const msg of batch.messages) {83try {84await riskyOperation(msg.body);85msg.ack();86} catch (error) {87msg.retry({ delaySeconds: 60 });88}89}90}91```9293## Ack/Retry Precedence Rules94951. **Per-message calls take precedence**: If you call both `msg.ack()` and `msg.retry()`, last call wins962. **Batch calls don't override**: `batch.ackAll()` only affects messages without explicit ack/retry973. **No action = automatic retry**: Messages with no explicit action retry with configured delay9899```typescript100async queue(batch: MessageBatch): Promise<void> {101for (const msg of batch.messages) {102msg.ack(); // Message marked for ack103msg.retry(); // Overrides ack - message will retry104}105106batch.ackAll(); // Only affects messages not explicitly handled above107}108```109110## Batch Operations111112```typescript113// Acknowledge entire batch114try {115await bulkProcess(batch.messages);116batch.ackAll();117} catch (error) {118batch.retryAll({ delaySeconds: 300 });119}120```121122## Exponential Backoff123124```typescript125async queue(batch: MessageBatch, env: Env): Promise<void> {126for (const msg of batch.messages) {127try {128await processMessage(msg.body);129msg.ack();130} catch (error) {131// 30s, 60s, 120s, 240s, 480s, ... up to 12h max132const delay = Math.min(30 * (2 ** msg.attempts), 43200);133msg.retry({ delaySeconds: delay });134}135}136}137```138139## Multiple Queues, Single Consumer140141```typescript142export default {143async queue(batch: MessageBatch, env: Env): Promise<void> {144switch (batch.queue) {145case 'high-priority': await processUrgent(batch.messages); break;146case 'low-priority': await processDeferred(batch.messages); break;147case 'email': await sendEmails(batch.messages); break;148default: batch.retryAll();149}150}151};152```153154## Consumer: Pull-based (HTTP)155156```typescript157// Pull messages158const response = await fetch(159`https://api.cloudflare.com/client/v4/accounts/${ACCOUNT_ID}/queues/${QUEUE_ID}/messages/pull`,160{161method: 'POST',162headers: { 'authorization': `Bearer ${API_TOKEN}`, 'content-type': 'application/json' },163body: JSON.stringify({ visibility_timeout_ms: 6000, batch_size: 50 })164}165);166167const data = await response.json();168169// Acknowledge170await fetch(171`https://api.cloudflare.com/client/v4/accounts/${ACCOUNT_ID}/queues/${QUEUE_ID}/messages/ack`,172{173method: 'POST',174headers: { 'authorization': `Bearer ${API_TOKEN}`, 'content-type': 'application/json' },175body: JSON.stringify({176acks: [{ lease_id: msg.lease_id }],177retries: [{ lease_id: msg2.lease_id, delay_seconds: 600 }]178})179}180);181```182183## Interfaces184185```typescript186interface MessageBatch<Body = unknown> {187readonly queue: string;188readonly messages: Message<Body>[];189ackAll(): void;190retryAll(options?: QueueRetryOptions): void;191}192193interface Message<Body = unknown> {194readonly id: string;195readonly timestamp: Date;196readonly body: Body;197readonly attempts: number;198ack(): void;199retry(options?: QueueRetryOptions): void;200}201202interface QueueSendOptions {203contentType?: 'text' | 'bytes' | 'json' | 'v8';204delaySeconds?: number; // 0-43200205}206```207