Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
A comprehensive collection of Agent Skills for context engineering, multi-agent architectures, and production agent systems.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
skills/project-development/scripts/pipeline_template.py
1"""2LLM Batch Processing Pipeline Template.34A composable, staged pipeline architecture for LLM batch processing.5Each stage is discrete, idempotent, and cacheable. Customize the acquire,6prepare, process, parse, and render functions for your use case.78Use when:9- Building a new batch processing pipeline with structured LLM outputs10- Prototyping an acquire -> prepare -> process -> parse -> render workflow11- Need a file-system-based state machine for pipeline stage tracking1213Usage:14python pipeline_template.py acquire --batch-id 2025-01-1515python pipeline_template.py prepare --batch-id 2025-01-1516python pipeline_template.py process --batch-id 2025-01-15 --workers 1017python pipeline_template.py parse --batch-id 2025-01-1518python pipeline_template.py render --batch-id 2025-01-1519python pipeline_template.py all --batch-id 2025-01-1520python pipeline_template.py clean --batch-id 2025-01-15 --clean-stage process21python pipeline_template.py estimate --batch-id 2025-01-152223Programmatic usage:24from pipeline_template import stage_acquire, stage_prepare, stage_process25stage_acquire("2025-01-15", limit=5)26stage_prepare("2025-01-15")27stage_process("2025-01-15", model="claude-sonnet-4-20250514", max_workers=3)28"""2930import argparse31import json32import re33import time34from concurrent.futures import ThreadPoolExecutor, as_completed35from dataclasses import dataclass, field, asdict36from datetime import date37from pathlib import Path38from typing import Any3940__all__ = [41"Item",42"ParsedResult",43"stage_acquire",44"stage_prepare",45"stage_process",46"stage_parse",47"stage_render",48"stage_clean",49"stage_estimate",50"parse_response",51"get_batch_dir",52"get_item_dir",53"get_output_dir",54]555657# -----------------------------------------------------------------------------58# Configuration - Customize for your use case59# -----------------------------------------------------------------------------6061DATA_DIR = Path("data")62OUTPUT_DIR = Path("output")6364# Prompt template with structured output requirements65PROMPT_TEMPLATE = """Analyze the following content and provide your response in exactly this format.6667## Summary68[2-3 sentence summary of the content]6970## Key Points71- [Point 1]72- [Point 2]73- [Point 3]7475## Score76Rating: [1-10]77Confidence: [low/medium/high]7879## Reasoning80[Explanation of your analysis]8182Follow this format exactly because I will be parsing it programmatically.8384---8586# Content to Analyze8788Title: {title}8990{content}91"""929394# -----------------------------------------------------------------------------95# Data Structures96# -----------------------------------------------------------------------------9798@dataclass99class Item:100"""Represents a single item to process through the pipeline.101102Use when: creating items during the acquire stage or loading raw data103from any source (API, database, file system).104"""105106id: str107title: str108content: str109metadata: dict[str, Any] = field(default_factory=dict)110111112@dataclass113class ParsedResult:114"""Structured result from LLM response parsing.115116Use when: extracting structured data from free-text LLM responses117during the parse stage.118"""119120summary: str = ""121key_points: list[str] = field(default_factory=list)122score: int | None = None123confidence: str = ""124reasoning: str = ""125parse_errors: list[str] = field(default_factory=list)126127128# -----------------------------------------------------------------------------129# Path Utilities130# -----------------------------------------------------------------------------131132def get_batch_dir(batch_id: str) -> Path:133"""Get the data directory for a batch.134135Use when: resolving the root directory for a specific batch run.136"""137return DATA_DIR / batch_id138139140def get_item_dir(batch_id: str, item_id: str) -> Path:141"""Get the directory for a specific item.142143Use when: locating stage output files for a single pipeline item.144"""145return get_batch_dir(batch_id) / item_id146147148def get_output_dir(batch_id: str) -> Path:149"""Get the output directory for a batch.150151Use when: writing final rendered outputs (HTML, reports, etc.).152"""153return OUTPUT_DIR / batch_id154155156# -----------------------------------------------------------------------------157# Stage: Acquire158# -----------------------------------------------------------------------------159160def stage_acquire(batch_id: str, limit: int | None = None) -> list[Path]:161"""Stage 1: Acquire raw data from sources.162163Use when: fetching data from APIs, databases, or file systems164and persisting it as raw.json per item for downstream stages.165166Output: {batch_dir}/{item_id}/raw.json167Returns: List of item directories that were acquired.168"""169batch_dir = get_batch_dir(batch_id)170batch_dir.mkdir(parents=True, exist_ok=True)171172# CUSTOMIZE: Replace with your data acquisition logic173items = fetch_items_from_source(limit)174175acquired_dirs: list[Path] = []176for item in items:177item_dir = get_item_dir(batch_id, item.id)178item_dir.mkdir(exist_ok=True)179180raw_file = item_dir / "raw.json"181if not raw_file.exists():182with open(raw_file, "w") as f:183json.dump(asdict(item), f, indent=2)184print(f"Acquired: {item.id}")185else:186print(f"Cached: {item.id}")187188acquired_dirs.append(item_dir)189190print(f"\nAcquire complete. {len(items)} items in {batch_dir}")191return acquired_dirs192193194def fetch_items_from_source(limit: int | None = None) -> list[Item]:195"""CUSTOMIZE: Implement your data fetching logic here.196197Use when: pulling raw items from your specific data source.198Replace this with actual API calls, database queries, etc.199"""200# Example: Generate sample items201items: list[Item] = []202for i in range(limit or 10):203items.append(Item(204id=f"item-{i:04d}",205title=f"Sample Item {i}",206content=f"This is sample content for item {i}. " * 10,207metadata={"source": "example", "index": i},208))209return items210211212# -----------------------------------------------------------------------------213# Stage: Prepare214# -----------------------------------------------------------------------------215216def stage_prepare(batch_id: str) -> int:217"""Stage 2: Generate prompts from raw data.218219Use when: transforming raw acquired data into LLM-ready prompts220using the configured PROMPT_TEMPLATE.221222Output: {batch_dir}/{item_id}/prompt.md223Returns: Number of items prepared.224"""225batch_dir = get_batch_dir(batch_id)226prepared_count = 0227228for item_dir in sorted(batch_dir.iterdir()):229if not item_dir.is_dir():230continue231232raw_file = item_dir / "raw.json"233prompt_file = item_dir / "prompt.md"234235if not raw_file.exists():236continue237238if prompt_file.exists():239continue240241with open(raw_file) as f:242item_data: dict[str, Any] = json.load(f)243244prompt = generate_prompt(item_data)245246with open(prompt_file, "w") as f:247f.write(prompt)248249prepared_count += 1250print(f"Prepared: {item_dir.name}")251252print(f"\nPrepare complete. {prepared_count} items prepared.")253return prepared_count254255256def generate_prompt(item_data: dict[str, Any]) -> str:257"""Generate prompt from item data using template.258259Use when: converting a raw item dict into a formatted prompt string.260"""261return PROMPT_TEMPLATE.format(262title=item_data.get("title", "Untitled"),263content=item_data.get("content", ""),264)265266267# -----------------------------------------------------------------------------268# Stage: Process269# -----------------------------------------------------------------------------270271def stage_process(272batch_id: str,273model: str = "claude-sonnet-4-20250514",274max_workers: int = 5,275) -> list[tuple[str, int, str | None]]:276"""Stage 3: Execute LLM calls (the expensive, non-deterministic stage).277278Use when: sending prepared prompts to the LLM API and caching279responses. This is the only non-deterministic stage.280281Output: {batch_dir}/{item_id}/response.md282Returns: List of (item_id, char_count, error_or_none) tuples.283"""284batch_dir = get_batch_dir(batch_id)285286# Collect items needing processing287to_process: list[tuple[Path, str]] = []288for item_dir in sorted(batch_dir.iterdir()):289if not item_dir.is_dir():290continue291292prompt_file = item_dir / "prompt.md"293response_file = item_dir / "response.md"294295if prompt_file.exists() and not response_file.exists():296to_process.append((item_dir, prompt_file.read_text()))297298if not to_process:299print("No items to process.")300return []301302print(f"Processing {len(to_process)} items with {max_workers} workers...")303304results: list[tuple[str, int, str | None]] = []305306def process_one(args: tuple[Path, str]) -> tuple[str, int, str | None]:307item_dir, prompt = args308response_file = item_dir / "response.md"309310try:311# CUSTOMIZE: Replace with your LLM API call312response = call_llm(prompt, model)313314with open(response_file, "w") as f:315f.write(response)316317return (item_dir.name, len(response), None)318except Exception as e:319return (item_dir.name, 0, str(e))320321with ThreadPoolExecutor(max_workers=max_workers) as executor:322futures = {executor.submit(process_one, item): item for item in to_process}323324for future in as_completed(futures):325item_id, chars, error = future.result()326results.append((item_id, chars, error))327if error:328print(f" {item_id}: Error - {error}")329else:330print(f" {item_id}: Done ({chars} chars)")331332print(f"\nProcess complete. {len(results)} items processed.")333return results334335336def call_llm(prompt: str, model: str) -> str:337"""CUSTOMIZE: Implement your LLM API call here.338339Use when: sending a single prompt to the LLM and returning the response.340Replace with actual OpenAI, Anthropic, etc. API calls.341"""342# Example mock response - replace with actual API call343#344# import anthropic345# client = anthropic.Anthropic()346# message = client.messages.create(347# model=model,348# max_tokens=1024,349# messages=[{"role": "user", "content": prompt}],350# )351# return message.content[0].text352353# Simulate API delay354time.sleep(0.1)355356# Return mock structured response357return """## Summary358This is a sample summary of the analyzed content.359360## Key Points361- First key observation from the content362- Second important finding363- Third notable aspect364365## Score366Rating: 7367Confidence: medium368369## Reasoning370The content demonstrates several characteristics that merit this rating.371The analysis considered multiple factors including relevance and clarity.372"""373374375# -----------------------------------------------------------------------------376# Stage: Parse377# -----------------------------------------------------------------------------378379def stage_parse(batch_id: str) -> list[dict[str, Any]]:380"""Stage 4: Extract structured data from LLM responses.381382Use when: converting free-text LLM responses into structured383ParsedResult objects for aggregation and rendering.384385Output: {batch_dir}/{item_id}/parsed.json386Returns: List of parsed result dicts with item IDs.387"""388batch_dir = get_batch_dir(batch_id)389all_results: list[dict[str, Any]] = []390391for item_dir in sorted(batch_dir.iterdir()):392if not item_dir.is_dir():393continue394395response_file = item_dir / "response.md"396parsed_file = item_dir / "parsed.json"397398if not response_file.exists():399continue400401response = response_file.read_text()402result = parse_response(response)403404with open(parsed_file, "w") as f:405json.dump(asdict(result), f, indent=2)406407all_results.append({408"id": item_dir.name,409**asdict(result),410})411412error_count = len(result.parse_errors)413print(f"Parsed: {item_dir.name} (score={result.score}, errors={error_count})")414415# Save aggregated results416agg_file = batch_dir / "all_results.json"417with open(agg_file, "w") as f:418json.dump(all_results, f, indent=2)419420print(f"\nParse complete. Results saved to {agg_file}")421return all_results422423424def parse_response(text: str) -> ParsedResult:425"""Parse structured LLM response with graceful error handling.426427Use when: extracting sections, scores, and lists from a formatted428LLM response. Logs parse errors rather than raising exceptions.429"""430result = ParsedResult()431432# Extract summary433try:434result.summary = extract_section(text, "Summary") or ""435except Exception as e:436result.parse_errors.append(f"Summary: {e}")437438# Extract key points439try:440result.key_points = extract_list_items(text, "Key Points")441except Exception as e:442result.parse_errors.append(f"Key Points: {e}")443444# Extract score445try:446result.score = extract_score(text, "Rating", 1, 10)447except Exception as e:448result.parse_errors.append(f"Score: {e}")449450# Extract confidence451try:452result.confidence = extract_field(text, "Confidence") or ""453except Exception as e:454result.parse_errors.append(f"Confidence: {e}")455456# Extract reasoning457try:458result.reasoning = extract_section(text, "Reasoning") or ""459except Exception as e:460result.parse_errors.append(f"Reasoning: {e}")461462return result463464465def extract_section(text: str, section_name: str) -> str | None:466"""Extract content between section headers.467468Use when: pulling a named markdown section from LLM output.469"""470pattern = rf'(?:^|\n)(?:#+ *)?{re.escape(section_name)}[:\s]*\n(.*?)(?=\n#|\Z)'471match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)472return match.group(1).strip() if match else None473474475def extract_field(text: str, field_name: str) -> str | None:476"""Extract value after field label.477478Use when: pulling a single key-value field (e.g., "Confidence: high").479"""480pattern = rf'(?:\*\*)?{re.escape(field_name)}(?:\*\*)?[\s:\-]+([^\n]+)'481match = re.search(pattern, text, re.IGNORECASE)482return match.group(1).strip() if match else None483484485def extract_list_items(text: str, section_name: str) -> list[str]:486"""Extract bullet points from a section.487488Use when: parsing a markdown list under a named section header.489"""490section = extract_section(text, section_name)491if not section:492return []493494items = re.findall(r'^[\-\*]\s*(.+)$', section, re.MULTILINE)495return [item.strip() for item in items]496497498def extract_score(499text: str, field_name: str, min_val: int, max_val: int500) -> int | None:501"""Extract and validate numeric score.502503Use when: pulling a bounded integer score from LLM output.504"""505raw = extract_field(text, field_name)506if not raw:507return None508509match = re.search(r'\d+', raw)510if not match:511return None512513score = int(match.group())514return max(min_val, min(max_val, score))515516517# -----------------------------------------------------------------------------518# Stage: Render519# -----------------------------------------------------------------------------520521def stage_render(batch_id: str) -> Path | None:522"""Stage 5: Generate final outputs from parsed results.523524Use when: producing human-readable output (HTML, reports)525from aggregated parsed results.526527Output: {output_dir}/index.html528Returns: Path to the rendered output file, or None if no results.529"""530batch_dir = get_batch_dir(batch_id)531output_dir = get_output_dir(batch_id)532output_dir.mkdir(parents=True, exist_ok=True)533534# Load aggregated results535results_file = batch_dir / "all_results.json"536if not results_file.exists():537print("No results to render. Run parse stage first.")538return None539540with open(results_file) as f:541results: list[dict[str, Any]] = json.load(f)542543# CUSTOMIZE: Replace with your rendering logic544html = render_html(results, batch_id)545546output_file = output_dir / "index.html"547with open(output_file, "w") as f:548f.write(html)549550print(f"Rendered: {output_file}")551return output_file552553554def render_html(results: list[dict[str, Any]], batch_id: str) -> str:555"""Generate HTML output from results.556557Use when: creating a summary HTML table from parsed pipeline results.558"""559import html as html_lib560561rows = ""562for r in results:563rows += f"""564<tr>565<td>{html_lib.escape(r.get('id', ''))}</td>566<td>{html_lib.escape(r.get('summary', '')[:100])}...</td>567<td>{r.get('score', 'N/A')}</td>568<td>{html_lib.escape(r.get('confidence', ''))}</td>569</tr>"""570571return f"""<!DOCTYPE html>572<html>573<head>574<meta charset="utf-8">575<title>Results - {batch_id}</title>576<style>577body {{ font-family: system-ui, sans-serif; max-width: 1000px; margin: 0 auto; padding: 20px; }}578table {{ width: 100%; border-collapse: collapse; }}579th, td {{ text-align: left; padding: 10px; border-bottom: 1px solid #ddd; }}580th {{ background: #f5f5f5; }}581</style>582</head>583<body>584<h1>Results: {batch_id}</h1>585<p>{len(results)} items processed</p>586<table>587<tr>588<th>ID</th>589<th>Summary</th>590<th>Score</th>591<th>Confidence</th>592</tr>593{rows}594</table>595</body>596</html>"""597598599# -----------------------------------------------------------------------------600# Clean Stage601# -----------------------------------------------------------------------------602603def stage_clean(batch_id: str, from_stage: str | None = None) -> int:604"""Remove stage outputs to enable re-processing.605606Use when: a stage produced bad results and needs to be re-run,607or when clearing all intermediate files for a fresh pipeline run.608609Returns: Number of files deleted.610"""611batch_dir = get_batch_dir(batch_id)612613if not batch_dir.exists():614print(f"No data directory for {batch_id}")615return 0616617stage_outputs: dict[str, list[str]] = {618"acquire": ["raw.json"],619"prepare": ["prompt.md"],620"process": ["response.md"],621"parse": ["parsed.json"],622}623624stage_order = ["acquire", "prepare", "process", "parse", "render"]625626if from_stage:627start_idx = stage_order.index(from_stage)628stages_to_clean = stage_order[start_idx:]629else:630stages_to_clean = stage_order631632files_to_delete: set[str] = set()633for s in stages_to_clean:634files_to_delete.update(stage_outputs.get(s, []))635636deleted_count = 0637for item_dir in batch_dir.iterdir():638if not item_dir.is_dir():639continue640641for filename in files_to_delete:642filepath = item_dir / filename643if filepath.exists():644filepath.unlink()645deleted_count += 1646647# Clean aggregated results648if "parse" in stages_to_clean:649agg_file = batch_dir / "all_results.json"650if agg_file.exists():651agg_file.unlink()652deleted_count += 1653654print(f"Cleaned {deleted_count} files from stage '{from_stage or 'all'}' onwards")655return deleted_count656657658# -----------------------------------------------------------------------------659# Cost Estimation660# -----------------------------------------------------------------------------661662def stage_estimate(batch_id: str) -> dict[str, Any] | None:663"""Estimate processing costs before running the process stage.664665Use when: projecting token costs and budget requirements before666committing to expensive LLM API calls.667668Returns: Dict with item_count, token estimates, and cost projection,669or None if no prompts are available.670"""671batch_dir = get_batch_dir(batch_id)672673if not batch_dir.exists():674print(f"No data directory for {batch_id}. Run acquire first.")675return None676677# Count items and estimate tokens678item_count = 0679total_prompt_chars = 0680681for item_dir in batch_dir.iterdir():682if not item_dir.is_dir():683continue684685prompt_file = item_dir / "prompt.md"686if prompt_file.exists():687total_prompt_chars += len(prompt_file.read_text())688item_count += 1689690if item_count == 0:691print("No prompts found. Run prepare first.")692return None693694# Rough token estimation (1 token ~ 4 chars)695est_input_tokens = total_prompt_chars / 4696est_output_tokens = item_count * 500 # Assume 500 tokens per response697698# Example pricing (adjust for your model)699input_price = 3.0 / 1_000_000 # $3 per MTok700output_price = 15.0 / 1_000_000 # $15 per MTok701702est_cost = (est_input_tokens * input_price) + (est_output_tokens * output_price)703704estimate: dict[str, Any] = {705"batch_id": batch_id,706"item_count": item_count,707"est_input_tokens": int(est_input_tokens),708"est_output_tokens": int(est_output_tokens),709"est_cost_usd": round(est_cost, 2),710}711712print(f"Cost Estimate for {batch_id}")713print(f" Items: {item_count}")714print(f" Estimated input tokens: {int(est_input_tokens):,}")715print(f" Estimated output tokens: {int(est_output_tokens):,}")716print(f" Estimated cost: ${est_cost:.2f}")717print(f"\nNote: Actual costs may vary. Add 20-30% buffer for retries.")718719return estimate720721722# -----------------------------------------------------------------------------723# CLI724# -----------------------------------------------------------------------------725726def main() -> None:727"""Entry point for CLI usage. Parses arguments and dispatches to stages."""728parser = argparse.ArgumentParser(729description="LLM Batch Processing Pipeline",730formatter_class=argparse.RawDescriptionHelpFormatter,731epilog=__doc__,732)733734parser.add_argument(735"stage",736choices=["acquire", "prepare", "process", "parse", "render", "all", "clean", "estimate"],737help="Pipeline stage to run",738)739parser.add_argument(740"--batch-id",741default=None,742help="Batch identifier (default: today's date)",743)744parser.add_argument(745"--limit",746type=int,747default=None,748help="Limit number of items (for testing)",749)750parser.add_argument(751"--workers",752type=int,753default=5,754help="Number of parallel workers for processing",755)756parser.add_argument(757"--model",758default="claude-sonnet-4-20250514",759help="Model to use for processing",760)761parser.add_argument(762"--clean-stage",763choices=["acquire", "prepare", "process", "parse"],764help="For clean: only clean this stage and downstream",765)766767args = parser.parse_args()768769batch_id = args.batch_id or date.today().isoformat()770print(f"Batch ID: {batch_id}\n")771772if args.stage == "clean":773stage_clean(batch_id, args.clean_stage)774elif args.stage == "estimate":775stage_estimate(batch_id)776elif args.stage == "all":777stage_acquire(batch_id, args.limit)778stage_prepare(batch_id)779stage_process(batch_id, args.model, args.workers)780stage_parse(batch_id)781stage_render(batch_id)782else:783if args.stage == "acquire":784stage_acquire(batch_id, args.limit)785elif args.stage == "prepare":786stage_prepare(batch_id)787elif args.stage == "process":788stage_process(batch_id, args.model, args.workers)789elif args.stage == "parse":790stage_parse(batch_id)791elif args.stage == "render":792stage_render(batch_id)793794795if __name__ == "__main__":796main()797