Comprehensive Cloudflare platform skill covering Workers, D1, R2, KV, AI, Durable Objects, and security.
references/workflows/patterns.md
# Workflow Patterns

## Image Processing Pipeline

```typescript
export class ImageProcessingWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event, step) {
    const imageData = await step.do('fetch', async () => (await this.env.BUCKET.get(event.payload.imageKey)).arrayBuffer());
    const description = await step.do('generate description', async () =>
      await this.env.AI.run('@cf/llava-hf/llava-1.5-7b-hf', {image: Array.from(new Uint8Array(imageData)), prompt: 'Describe this image', max_tokens: 50})
    );
    await step.waitForEvent('await approval', { type: 'approved', timeout: '24h' });
    await step.do('publish', async () => await this.env.BUCKET.put(`public/${event.payload.imageKey}`, imageData));
  }
}
```

## User Lifecycle

```typescript
export class UserLifecycleWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event, step) {
    await step.do('welcome email', async () => await sendEmail(event.payload.email, 'Welcome!'));
    await step.sleep('trial period', '7 days');
    const hasConverted = await step.do('check conversion', async () => {
      const user = await this.env.DB.prepare('SELECT subscription_status FROM users WHERE id = ?').bind(event.payload.userId).first();
      return user.subscription_status === 'active';
    });
    if (!hasConverted) await step.do('trial expiration email', async () => await sendEmail(event.payload.email, 'Trial ending'));
  }
}
```

## Data Pipeline

```typescript
export class DataPipelineWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event, step) {
    const rawData = await step.do('extract', {retries: { limit: 10, delay: '30s', backoff: 'exponential' }}, async () => {
      const res = await fetch(event.payload.sourceUrl);
      if (!res.ok) throw new Error('Fetch failed');
      return res.json();
    });
    const transformed = await step.do('transform', async () =>
      rawData.map(item => ({ id: item.id, normalized: normalizeData(item) }))
    );
    const dataRef = await step.do('store', async () => {
      const key = `processed/${Date.now()}.json`;
      await this.env.BUCKET.put(key, JSON.stringify(transformed));
      return { key };
    });
    await step.do('load', async () => {
      const data = await (await this.env.BUCKET.get(dataRef.key)).json();
      for (let i = 0; i < data.length; i += 100) {
        await this.env.DB.batch(data.slice(i, i + 100).map(item =>
          this.env.DB.prepare('INSERT INTO records VALUES (?, ?)').bind(item.id, item.normalized)
        ));
      }
    });
  }
}
```

## Human-in-the-Loop Approval

```typescript
export class ApprovalWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event, step) {
    await step.do('create approval', async () => await this.env.DB.prepare('INSERT INTO approvals (id, user_id, status) VALUES (?, ?, ?)').bind(event.instanceId, event.payload.userId, 'pending').run());
    try {
      const approval = await step.waitForEvent<{ approved: boolean }>('wait for approval', { type: 'approval-response', timeout: '48h' });
      if (approval.approved) { await step.do('process approval', async () => {}); }
      else { await step.do('handle rejection', async () => {}); }
    } catch (e) {
      await step.do('auto reject', async () => await this.env.DB.prepare('UPDATE approvals SET status = ? WHERE id = ?').bind('auto-rejected', event.instanceId).run());
    }
  }
}
```

## Testing Workflows

### Setup

```typescript
// vitest.config.ts
import { defineWorkersConfig } from '@cloudflare/vitest-pool-workers/config';

export default defineWorkersConfig({
  test: {
    poolOptions: {
      workers: {
        wrangler: { configPath: './wrangler.jsonc' }
      }
    }
  }
});
```

### Introspection API

```typescript
import { introspectWorkflowInstance } from 'cloudflare:test';

const instance = await env.MY_WORKFLOW.create({ params: { userId: '123' } });
const introspector = await introspectWorkflowInstance(env.MY_WORKFLOW, instance.id);

// Wait for step completion
const result = await introspector.waitForStepResult({ name: 'fetch user', index: 0 });

// Mock step behavior
await introspector.modify(async (m) => {
  await m.mockStepResult({ name: 'api call' }, { mocked: true });
});
```

## Best Practices

### ✅ DO

1. **Granular steps**: One API call per step (unless proving idempotency)
2. **Idempotency**: Check-then-execute; use idempotency keys
3. **Deterministic names**: Use static or step-output-based names
4. **Return state**: Persist via step returns, not variables
5. **Always await**: `await step.do()`, avoid dangling promises
6. **Deterministic conditionals**: Base on `event.payload` or step outputs
7. **Store large data externally**: R2/KV for data exceeding step return limit, return refs
8. **Batch creation**: `createBatch()` for multiple instances

### ❌ DON'T

1. **One giant step**: Breaks durability & retry control
2. **State outside steps**: Lost on hibernation
3. **Mutate events**: Events immutable, return new state
4. **Non-deterministic logic outside steps**: `Math.random()`, `Date.now()` must be in steps
5. **Side effects outside steps**: May duplicate on restart
6. **Non-deterministic step names**: Prevents caching
7. **Ignore timeouts**: `waitForEvent` throws, use try-catch
8. **Reuse instance IDs**: Must be unique within retention

## Orchestration Patterns

### Fan-Out (Parallel Processing)
```typescript
const files = await step.do('list', async () => this.env.BUCKET.list());
await Promise.all(files.objects.map((file, i) => step.do(`process ${i}`, async () => processFile(await (await this.env.BUCKET.get(file.key)).arrayBuffer()))));
```

### Parent-Child Workflows
```typescript
const child = await step.do('start child', async () => await this.env.CHILD_WORKFLOW.create({id: `child-${event.instanceId}`, params: { data: result.data }}));
await step.do('other work', async () => console.log(`Child started: ${child.id}`));
```

### Race Pattern
```typescript
const winner = await Promise.race([
  step.do('option A', async () => slowOperation()),
  step.do('option B', async () => fastOperation())
]);
```

### Scheduled Workflow Chain
```typescript
export default { async scheduled(event, env) { await env.DAILY_WORKFLOW.create({id: `daily-${event.scheduledTime}`, params: { timestamp: event.scheduledTime }}); }};
export class DailyWorkflow extends WorkflowEntrypoint<Env, Params> {
  async run(event, step) {
    await step.do('daily task', async () => {});
    await step.sleep('wait 7 days', '7 days');
    await step.do('weekly followup', async () => {});
  }
}
```

See: [configuration.md](./configuration.md), [api.md](./api.md), [gotchas.md](./gotchas.md)
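
## Appendix: Why Work Belongs Inside Steps (Toy Model)

The Best Practices rules about returning state from steps and keeping side effects and non-deterministic logic inside steps can be illustrated with a small simulation. This is a minimal sketch and not the Cloudflare runtime: `ReplayStep`, `run`, and `demo` are hypothetical names, and the in-memory cache only approximates how a completed step's result might be reused when `run` executes again after a restart.

```typescript
// Toy model only. `ReplayStep` is a hypothetical stand-in for the real `step`
// object: it remembers each step's result by name and reuses it on replay.
class ReplayStep {
  private cache = new Map<string, unknown>();

  async do<T>(name: string, fn: () => Promise<T>): Promise<T> {
    if (this.cache.has(name)) {
      // Replay: the step already completed, so its stored result is returned
      // and the callback does not run again.
      return this.cache.get(name) as T;
    }
    const result = await fn();
    this.cache.set(name, result);
    return result;
  }
}

let outsideSideEffects = 0; // incremented outside any step
let insideSideEffects = 0;  // incremented inside a step
let counter = 0;            // stand-in for a non-deterministic source

async function run(step: ReplayStep): Promise<{ id: number }> {
  // Outside a step: this line executes on every (re)run of the workflow body.
  outsideSideEffects++;

  // Inside a step: executes once; later runs receive the stored result.
  const id = await step.do('allocate id', async () => {
    insideSideEffects++;
    return ++counter;
  });

  return { id };
}

async function demo() {
  const step = new ReplayStep();
  const first = await run(step);
  const second = await run(step); // simulated restart of the same instance
  return { first, second, outsideSideEffects, insideSideEffects };
}
```

In this model the second run sees the same `id` as the first because the value came from a step return, while the counter incremented outside the step advances on every run. The real platform persists step results durably rather than in a `Map`, so treat this only as an intuition aid.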