skills/project-development/references/pipeline-patterns.md
# Pipeline Patterns for LLM Projects

This reference provides detailed patterns for structuring LLM processing pipelines. These patterns apply to batch processing, data analysis, content generation, and similar workloads.

## The Canonical Pipeline

```
acquire → prepare → process → parse → render
```

### Stage Characteristics

| Stage | Deterministic | Cost | Parallelizable | Idempotent |
|-------|---------------|------|----------------|------------|
| Acquire | Yes | Low | Yes | Yes |
| Prepare | Yes | Low | Yes | Yes |
| Process | No | High | Yes | Yes (with caching) |
| Parse | Yes | Low | Yes | Yes |
| Render | Yes | Low | Partially | Yes |

The key insight: only the Process stage involves LLM calls. All other stages are deterministic transformations that can be debugged, tested, and iterated independently.

## File System State Management

### Directory Structure Pattern

```
project/
├── data/
│   └── {batch_id}/
│       └── {item_id}/
│           ├── raw.json     # Acquire output
│           ├── prompt.md    # Prepare output
│           ├── response.md  # Process output
│           └── parsed.json  # Parse output
├── output/
│   └── {batch_id}/
│       └── index.html       # Render output
└── config/
    └── prompts/
        └── template.md      # Prompt templates
```

### State Checking Pattern

```python
from pathlib import Path

def needs_processing(item_dir: Path, stage: str) -> bool:
    """Check if an item needs processing for a given stage."""
    stage_outputs = {
        "acquire": ["raw.json"],
        "prepare": ["prompt.md"],
        "process": ["response.md"],
        "parse": ["parsed.json"],
    }

    for output_file in stage_outputs[stage]:
        if not (item_dir / output_file).exists():
            return True
    return False
```

### Clean/Retry Pattern

```python
from pathlib import Path

def clean_from_stage(item_dir: Path, stage: str):
    """Remove outputs from stage and all downstream stages."""
    stage_order = ["acquire", "prepare", "process", "parse", "render"]
    stage_outputs = {
        "acquire": ["raw.json"],
        "prepare": ["prompt.md"],
        "process": ["response.md"],
        "parse": ["parsed.json"],
    }

    start_idx = stage_order.index(stage)
    for s in stage_order[start_idx:]:
        for output_file in stage_outputs.get(s, []):
            filepath = item_dir / output_file
            if filepath.exists():
                filepath.unlink()
```

## Parallel Execution Patterns

### ThreadPoolExecutor for LLM Calls

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_batch(items: list, max_workers: int = 10):
    """Process items in parallel with progress tracking."""
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process_item, item): item for item in items}

        for future in as_completed(futures):
            item = futures[future]
            try:
                result = future.result()
                results.append((item, result, None))
            except Exception as e:
                results.append((item, None, str(e)))

    return results
```

### Batch Size Considerations

- **Small batches (1-10)**: Sequential processing is fine; the overhead of parallelization is not worth it
- **Medium batches (10-100)**: Parallelize with 5-15 workers depending on API rate limits
- **Large batches (100+)**: Consider chunking with checkpoints; implement resume capability

### Rate Limiting

```python
import threading
import time
from functools import wraps

def rate_limited(calls_per_second: float):
    """Decorator to rate limit function calls (thread-safe)."""
    min_interval = 1.0 / calls_per_second
    lock = threading.Lock()
    last_call = [0.0]

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Serialize slot reservation so parallel workers share one budget
            with lock:
                elapsed = time.time() - last_call[0]
                if elapsed < min_interval:
                    time.sleep(min_interval - elapsed)
                last_call[0] = time.time()
            return func(*args, **kwargs)
        return wrapper
    return decorator
```

## Structured Output Patterns

### Prompt Template Structure

```markdown
[INSTRUCTION BLOCK]
Analyze the following content and provide your response in exactly this format.

[FORMAT SPECIFICATION]
## Section 1: Summary
[Your summary here - 2-3 sentences]

## Section 2: Analysis
- Point 1
- Point 2
- Point 3

## Section 3: Score
Rating: [1-10]
Confidence: [low/medium/high]

[FORMAT ENFORCEMENT]
Follow this format exactly because I will be parsing it programmatically.

---

[CONTENT BLOCK]
# Title: {title}

## Content
{content}

## Additional Context
{context}
```

### Parsing Patterns

**Section Extraction**

```python
import re

def extract_section(text: str, section_name: str) -> str | None:
    """Extract content between section headers."""
    # Match section header with optional markdown formatting;
    # stop at the next header or at end of text
    pattern = rf'(?:^|\n)(?:#+ *)?{re.escape(section_name)}[:\s]*\n(.*?)(?=\n#+ |\Z)'
    match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
    return match.group(1).strip() if match else None
```

**Structured Field Extraction**

```python
def extract_field(text: str, field_name: str) -> str | None:
    """Extract value after field label."""
    # Handle: "Field: value" or "Field - value" or "**Field**: value"
    pattern = rf'(?:\*\*)?{re.escape(field_name)}(?:\*\*)?[\s:\-]+([^\n]+)'
    match = re.search(pattern, text, re.IGNORECASE)
    return match.group(1).strip() if match else None
```

**List Extraction**

```python
def extract_list_items(text: str, section_name: str) -> list[str]:
    """Extract bullet points from a section."""
    section = extract_section(text, section_name)
    if not section:
        return []

    # Match lines starting with -, *, or a number followed by . or )
    items = re.findall(r'^\s*(?:[-*]|\d+[.)])\s+(.+)$', section, re.MULTILINE)
    return [item.strip() for item in items]
```

**Score Extraction with Validation**

```python
def extract_score(text: str, field_name: str, min_val: int, max_val: int) -> int | None:
    """Extract and validate numeric score."""
    raw = extract_field(text, field_name)
    if not raw:
        return None

    # Extract first number from the value
    match = re.search(r'\d+', raw)
    if not match:
        return None

    score = int(match.group())
    return max(min_val, min(max_val, score))  # Clamp to valid range
```

### Graceful Degradation

```python
from dataclasses import dataclass, field

@dataclass
class ParseResult:
    summary: str = ""
    score: int | None = None
    items: list[str] = field(default_factory=list)
    parse_errors: list[str] = field(default_factory=list)

def parse_response(text: str) -> ParseResult:
    """Parse LLM response with graceful error handling."""
    result = ParseResult()

    # Try each field; record errors but continue
    try:
        summary = extract_section(text, "Summary")
        if summary is None:
            result.parse_errors.append("Summary section missing")
        result.summary = summary or ""
    except Exception as e:
        result.parse_errors.append(f"Summary extraction failed: {e}")

    try:
        result.score = extract_score(text, "Rating", 1, 10)
    except Exception as e:
        result.parse_errors.append(f"Score extraction failed: {e}")

    try:
        result.items = extract_list_items(text, "Analysis")
    except Exception as e:
        result.parse_errors.append(f"Items extraction failed: {e}")

    return result
```

## Error Handling Patterns

### Retry with Exponential Backoff

```python
import time
from functools import wraps

def retry_with_backoff(max_retries: int = 3, base_delay: float = 1.0):
    """Retry decorator with exponential backoff."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_exception = None
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_exception = e
                    if attempt < max_retries - 1:
                        delay = base_delay * (2 ** attempt)
                        time.sleep(delay)
            raise last_exception
        return wrapper
    return decorator
```

### Error Logging Pattern

```python
import json
from datetime import datetime
from pathlib import Path

def log_error(item_dir: Path, stage: str, error: str, context: dict | None = None):
    """Log error to file for later analysis."""
    error_file = item_dir / "errors.jsonl"

    error_record = {
        "timestamp": datetime.now().isoformat(),
        "stage": stage,
        "error": error,
        "context": context or {},
    }

    with open(error_file, "a") as f:
        f.write(json.dumps(error_record) + "\n")
```

### Partial Success Handling

```python
def process_batch_with_partial_success(items: list) -> tuple[list, list]:
    """Process batch, separating successes from failures."""
    successes = []
    failures = []

    for item in items:
        try:
            result = process_item(item)
            successes.append((item, result))
        except Exception as e:
            failures.append((item, str(e)))
            log_error(item.directory, "process", str(e))

    # Report summary
    print(f"Processed {len(items)} items: {len(successes)} succeeded, {len(failures)} failed")

    return successes, failures
```

## Cost Estimation Patterns

### Token Counting

```python
import tiktoken

def count_tokens(text: str, model: str = "gpt-4") -> int:
    """Count tokens for cost estimation."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        encoding = tiktoken.get_encoding("cl100k_base")

    return len(encoding.encode(text))

def estimate_cost(
    input_tokens: int,
    output_tokens: int,
    input_price_per_mtok: float,
    output_price_per_mtok: float,
) -> float:
    """Estimate cost in dollars."""
    input_cost = (input_tokens / 1_000_000) * input_price_per_mtok
    output_cost = (output_tokens / 1_000_000) * output_price_per_mtok
    return input_cost + output_cost
```

### Batch Cost Estimation

```python
def estimate_batch_cost(
    items: list,
    prompt_template: str,
    avg_output_tokens: int = 1000,
    model_pricing: dict | None = None,
) -> dict:
    """Estimate total cost for a batch."""
    model_pricing = model_pricing or {
        "input_price_per_mtok": 3.00,    # Example: GPT-4 Turbo input
        "output_price_per_mtok": 15.00,  # Example: GPT-4 Turbo output
    }

    total_input_tokens = 0
    for item in items:
        prompt = format_prompt(prompt_template, item)
        total_input_tokens += count_tokens(prompt)

    total_output_tokens = len(items) * avg_output_tokens

    estimated_cost = estimate_cost(
        total_input_tokens,
        total_output_tokens,
        **model_pricing,
    )

    return {
        "item_count": len(items),
        "total_input_tokens": total_input_tokens,
        "total_output_tokens": total_output_tokens,
        "estimated_cost_usd": estimated_cost,
        "avg_input_tokens_per_item": total_input_tokens / len(items),
        "cost_per_item_usd": estimated_cost / len(items),
    }
```

## CLI Pattern

### Standard CLI Structure

```python
import argparse
from datetime import date

def main():
    parser = argparse.ArgumentParser(description="LLM Processing Pipeline")

    parser.add_argument(
        "stage",
        choices=["acquire", "prepare", "process", "parse", "render", "all", "clean"],
        help="Pipeline stage to run",
    )
    parser.add_argument(
        "--batch-id",
        default=None,
        help="Batch identifier (default: today's date)",
    )
    parser.add_argument(
        "--limit",
        type=int,
        default=None,
        help="Limit number of items (for testing)",
    )
    parser.add_argument(
        "--workers",
        type=int,
        default=10,
        help="Number of parallel workers for processing",
    )
    parser.add_argument(
        "--model",
        default="gpt-4-turbo",
        help="Model to use for processing",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Estimate costs without processing",
    )
    parser.add_argument(
        "--clean-stage",
        choices=["acquire", "prepare", "process", "parse"],
        help="For clean: only clean this stage and downstream",
    )

    args = parser.parse_args()

    batch_id = args.batch_id or date.today().isoformat()

    if args.stage == "clean":
        stage_clean(batch_id, args.clean_stage)
    elif args.dry_run:
        estimate_costs(batch_id, args.limit)
    else:
        run_pipeline(batch_id, args.stage, args.limit, args.workers, args.model)

if __name__ == "__main__":
    main()
```

## Rendering Patterns

### Static HTML Output

```python
import html
import json
from pathlib import Path

def render_html(data: list[dict], output_path: Path, template: str):
    """Render data to static HTML file."""
    # Escape data for JavaScript embedding
    data_json = json.dumps([
        {k: html.escape(str(v)) if isinstance(v, str) else v
         for k, v in item.items()}
        for item in data
    ])

    html_content = template.replace("{{DATA_JSON}}", data_json)

    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w") as f:
        f.write(html_content)
```

### Incremental Output

```python
from pathlib import Path

def render_incremental(items: list, output_dir: Path):
    """Render each item as it completes, plus index."""
    output_dir.mkdir(parents=True, exist_ok=True)

    # Render individual item pages
    for item in items:
        item_html = render_item(item)
        item_path = output_dir / f"{item.id}.html"
        with open(item_path, "w") as f:
            f.write(item_html)

    # Render index linking to all items
    index_html = render_index(items)
    with open(output_dir / "index.html", "w") as f:
        f.write(index_html)
```

## Checkpoint and Resume Pattern

For long-running pipelines:

```python
import json
from pathlib import Path

class PipelineCheckpoint:
    def __init__(self, checkpoint_file: Path):
        self.checkpoint_file = checkpoint_file
        self.state = self._load()

    def _load(self) -> dict:
        if self.checkpoint_file.exists():
            with open(self.checkpoint_file) as f:
                return json.load(f)
        return {"completed": [], "failed": [], "last_item": None}

    def save(self):
        with open(self.checkpoint_file, "w") as f:
            json.dump(self.state, f, indent=2)

    def mark_complete(self, item_id: str):
        self.state["completed"].append(item_id)
        self.state["last_item"] = item_id
        self.save()

    def mark_failed(self, item_id: str, error: str):
        self.state["failed"].append({"id": item_id, "error": error})
        self.save()

    def get_remaining(self, all_items: list[str]) -> list[str]:
        completed = set(self.state["completed"])
        return [item for item in all_items if item not in completed]
```

## Testing Patterns

### Stage Unit Tests

```python
def test_prepare_stage():
    """Test prompt generation independently."""
    test_item = {"id": "test", "content": "Sample content"}
    prompt = prepare_prompt(test_item)

    assert "Sample content" in prompt
    assert "## Section 1" in prompt  # Format markers present

def test_parse_stage():
    """Test parsing with known good output."""
    test_response = """
## Summary
This is a test summary.

## Score
Rating: 7
"""

    result = parse_response(test_response)
    assert result.summary == "This is a test summary."
    assert result.score == 7

def test_parse_stage_malformed():
    """Test parsing handles malformed output."""
    test_response = "Some random text without sections"

    result = parse_response(test_response)
    assert result.summary == ""
    assert result.score is None
    assert len(result.parse_errors) > 0
```

### Integration Test Pattern

```python
import shutil
from pathlib import Path

def test_pipeline_end_to_end():
    """Test full pipeline with single item."""
    test_dir = Path("test_data")
    test_item = create_test_item()

    try:
        # Run each stage
        acquire_result = stage_acquire(test_dir, [test_item])
        assert (test_dir / test_item.id / "raw.json").exists()

        prepare_result = stage_prepare(test_dir)
        assert (test_dir / test_item.id / "prompt.md").exists()

        # Skip process stage in unit tests (costs money)
        # Create mock response instead
        mock_response(test_dir / test_item.id)

        parse_result = stage_parse(test_dir)
        assert (test_dir / test_item.id / "parsed.json").exists()

    finally:
        # Cleanup
        shutil.rmtree(test_dir, ignore_errors=True)
```
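The integration test above calls a `mock_response` helper without defining it. A minimal sketch, assuming the `response.md` filename from the directory layout earlier in this reference; the response body is an arbitrary placeholder that follows the prompt template's format specification:

```python
from pathlib import Path

def mock_response(item_dir: Path) -> None:
    """Write a canned response.md so parse and render stages run without LLM calls."""
    item_dir.mkdir(parents=True, exist_ok=True)
    # Placeholder body matching the [FORMAT SPECIFICATION] sections
    (item_dir / "response.md").write_text(
        "## Summary\n"
        "This is a mock summary.\n"
        "\n"
        "## Analysis\n"
        "- Mock point 1\n"
        "- Mock point 2\n"
        "\n"
        "## Score\n"
        "Rating: 5\n"
        "Confidence: medium\n"
    )
```

Keeping the mock in the same format the parser expects means the parse-stage assertions exercise the real extraction code, not a special test path.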