Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
A comprehensive collection of Agent Skills for context engineering, multi-agent architectures, and production agent systems.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
researcher/scripts/loop_step.py
#!/usr/bin/env python3
"""Advance the continuous research loop by one step.

A single invocation either:

1. Pulls the oldest inbox item into a new run (if under budget).
2. Advances the oldest active run by exactly one safe state transition.
3. Parks runs that need human or model judgment.
4. Returns exit code 78 when there is no safe work to do.

The loop is intentionally conservative. It never invokes LLMs and never makes
network calls that are not whitelisted. The default `--allow-fetch` is off, so
the loop scheduler that runs unattended only manages bookkeeping until a human
or another adapter retrieves the source.
"""

from __future__ import annotations

import argparse
import http.client
import json
import re
import subprocess
import sys
import urllib.error
import urllib.request
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from loop_common import (
    QUEUE_DIR,
    REPORTS_DIR,
    RESEARCHER,
    ROOT,
    RUNS_DIR,
    append_jsonl,
    categorize_runs,
    load_config,
    load_run_state,
    queue_lock,
    read_jsonl,
    runs_created_today,
    today_utc,
    utc_now,
    write_jsonl,
)


FAILURE_LOG = REPORTS_DIR / "loop-failures.jsonl"
LOOP_EVENTS = REPORTS_DIR / "loop-events.jsonl"
PARKED_FILE = QUEUE_DIR / "parked.jsonl"
DONE_FILE = QUEUE_DIR / "done.jsonl"
QUARANTINE_FILE = QUEUE_DIR / "quarantine.jsonl"
INBOX_FILE = QUEUE_DIR / "inbox.jsonl"
RESEARCH_LOOP = RESEARCHER / "scripts" / "research_loop.py"
USER_AGENT = "context-engineering-researcher/0.1 (+https://github.com/muratcankoylan/Agent-Skills-for-Context-Engineering)"
# Upper bound on the number of response bytes persisted per fetched source.
MAX_FETCH_BYTES = 1_500_000


def record_event(event: dict[str, Any]) -> None:
    """Append a timestamped copy of *event* to the loop event log.

    The caller's dict is not mutated; any existing ``timestamp`` key in the
    copy is overwritten with ``utc_now()``.
    """
    event = dict(event)
    event["timestamp"] = utc_now()
    append_jsonl(LOOP_EVENTS, event)


def record_failure(event: dict[str, Any]) -> None:
    """Append a timestamped copy of *event* to the failure log.

    Entries written here are what ``failures_today`` counts against the
    daily failure budget.
    """
    event = dict(event)
    event["timestamp"] = utc_now()
    append_jsonl(FAILURE_LOG, event)


def failures_today() -> int:
    """Count failure-log entries whose timestamp starts with today's date.

    Blank lines, lines that are not valid JSON, lines that decode to
    something other than an object, and entries without a string
    ``timestamp`` are ignored rather than aborting the loop step.

    Returns:
        The number of matching entries, or 0 when the log does not exist.
    """
    if not FAILURE_LOG.exists():
        return 0
    # NOTE(review): assumes today_utc() returns a string prefix of the
    # timestamps produced by utc_now() -- confirm in loop_common.
    today = today_utc()
    count = 0
    for line in FAILURE_LOG.read_text(encoding="utf-8").splitlines():
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(record, dict):
            # Valid JSON but not an object (e.g. a bare list or number).
            continue
        timestamp = record.get("timestamp")
        if isinstance(timestamp, str) and timestamp.startswith(today):
            count += 1
    return count


def park_run(run_id: str, reason: str) -> None:
    """Add *run_id* to the parked queue unless it is already listed.

    The read-modify-write of the parked file happens while holding
    ``queue_lock("parked")``.
    """
    with queue_lock("parked"):
        parked = read_jsonl(PARKED_FILE)
        if any(record.get("run_id") == run_id for record in parked):
            return
        parked.append({"run_id": run_id, "reason": reason, "parked_at": utc_now()})
        write_jsonl(PARKED_FILE, parked)


def unpark_run(run_id: str) -> None:
    """Remove every parked-queue entry for *run_id* (no-op if absent)."""
    with queue_lock("parked"):
        parked = [record for record in read_jsonl(PARKED_FILE) if record.get("run_id") != run_id]
        write_jsonl(PARKED_FILE, parked)


def mark_done(run_id: str, status: str, reason: str) -> None:
    """Record *run_id* in the done queue unless it is already listed.

    The read-modify-write of the done file happens while holding
    ``queue_lock("done")``.
    """
    with queue_lock("done"):
        done = read_jsonl(DONE_FILE)
        if any(record.get("run_id") == run_id for record in done):
            return
        done.append({"run_id": run_id, "status": status, "reason": reason, "closed_at": utc_now()})
        write_jsonl(DONE_FILE, done)


def quarantine_source(record: dict[str, Any], reason: str) -> None:
    """Append a copy of *record*, annotated with time and reason, to quarantine.

    The caller's dict is not mutated.
    """
    record = dict(record)
    record["quarantined_at"] = utc_now()
    record["quarantine_reason"] = reason
    with queue_lock("quarantine"):
        append_jsonl(QUARANTINE_FILE, record)


def peek_inbox_item() -> dict[str, Any] | None:
    """Return the inbox record with the smallest ``discovered_at``, if any.

    Records with a missing or null ``discovered_at`` sort first (they are
    treated as the empty string). Ties keep file order. The inbox file is
    not modified.
    """
    inbox = read_jsonl(INBOX_FILE)
    if not inbox:
        return None
    # min() returns the first minimal element, matching a stable sort + [0].
    return min(inbox, key=lambda record: record.get("discovered_at") or "")


def remove_inbox_item(source_id: str) -> None:
    """Rewrite the inbox without any record whose ``source_id`` matches.

    No lock is taken here; ``loop_step`` calls this while already holding
    ``queue_lock("inbox")``.
    """
    inbox = read_jsonl(INBOX_FILE)
    inbox = [record for record in inbox if record.get("source_id") != source_id]
    write_jsonl(INBOX_FILE, inbox)


def _discard_inbox_source(source: dict[str, Any]) -> None:
    """Remove the consumed inbox record *source* from the inbox file.

    Records normally carry a ``source_id`` and are removed through
    ``remove_inbox_item``. When the id is missing, matching on ``None`` would
    delete every id-less record in the inbox, so only the first record equal
    to *source* is dropped instead.
    """
    source_id = source.get("source_id")
    if source_id is not None:
        remove_inbox_item(source_id)
        return
    inbox = read_jsonl(INBOX_FILE)
    try:
        inbox.remove(source)
    except ValueError:
        # The record is no longer present; nothing to rewrite.
        return
    write_jsonl(INBOX_FILE, inbox)


def reap_closed_runs() -> list[dict[str, Any]]:
    """Move runs whose state is ``closed`` into the done queue.

    Every directory under ``RUNS_DIR`` whose run state reports
    ``current_state == "closed"`` and that is not yet listed in the done file
    is unparked and marked done.

    Returns:
        One ``{"action": "reaped", "run_id": ...}`` event per run handled.
    """
    events: list[dict[str, Any]] = []
    if not RUNS_DIR.exists():
        return events
    done_ids = {record.get("run_id") for record in read_jsonl(DONE_FILE)}
    for run_dir in sorted(RUNS_DIR.iterdir()):
        if not run_dir.is_dir():
            continue
        state = load_run_state(run_dir)
        if not state or state.get("current_state") != "closed":
            continue
        if run_dir.name in done_ids:
            continue
        unpark_run(run_dir.name)
        mark_done(
            run_dir.name,
            status=state.get("close_status") or "unknown",
            reason=state.get("close_reason") or "",
        )
        events.append({"action": "reaped", "run_id": run_dir.name})
    return events


def init_run(source: dict[str, Any]) -> dict[str, Any]:
    """Create a new run for *source* by invoking ``research_loop.py init``.

    Missing or null fields are passed to the subprocess as empty strings
    (``subprocess.run`` rejects ``None`` arguments with ``TypeError``).

    Returns:
        ``{"ok": True, "run_relative": <last stdout line or "">}`` on success,
        otherwise ``{"ok": False, "stderr": ..., "stdout": ...}``.
    """
    cmd = [
        sys.executable,
        str(RESEARCH_LOOP),
        "init",
        "--title",
        source.get("title") or source.get("url") or "untitled",
        "--url",
        source.get("url") or "",
        "--author-or-org",
        source.get("author_or_org") or "",
        "--source-type",
        normalize_source_type(source.get("source_type")),
        "--reason",
        source.get("candidate_reason") or "",
    ]
    completed = subprocess.run(cmd, cwd=ROOT, capture_output=True, text=True, check=False)
    if completed.returncode != 0:
        return {"ok": False, "stderr": completed.stderr.strip(), "stdout": completed.stdout.strip()}
    # The run path is taken from the final line of the subprocess's stdout.
    stdout_lines = completed.stdout.strip().splitlines()
    run_relative = stdout_lines[-1] if stdout_lines else ""
    return {"ok": True, "run_relative": run_relative}


def normalize_source_type(value: str | None) -> str:
    """Return *value* if it is a recognised source type, else ``"other"``."""
    allowed = {"paper", "engineering_blog", "documentation", "benchmark", "code", "talk", "other"}
    if isinstance(value, str) and value in allowed:
        return value
    return "other"


def fetch_url(url: str, dest_dir: Path) -> dict[str, Any]:
    """Download *url* with a plain HTTP GET and store it under *dest_dir*.

    Only ``http``/``https`` URLs are fetched, and the final URL after
    redirects must still use one of those schemes. At most
    ``MAX_FETCH_BYTES`` bytes are written; longer bodies are truncated and
    flagged. The file name is derived from the URL and always ends in
    ``.html``.

    Network-level failures never raise: they are reported as
    ``{"ok": False, "error": ..., "url": ...}`` so the caller can park the
    run. Errors creating the directory or writing the file still propagate,
    as does ``ValueError`` from ``relative_to`` if *dest_dir* is not under
    ``ROOT``.

    Returns:
        On success a dict with ``ok``, ``path`` (relative to ``ROOT``),
        ``bytes``, ``content_type``, ``truncated`` and ``final_url``.
    """
    if not url.lower().startswith(("http://", "https://")):
        return {"ok": False, "error": f"unsupported url scheme: {url[:32]}", "url": url}
    dest_dir.mkdir(parents=True, exist_ok=True)
    safe_name = re.sub(r"[^A-Za-z0-9._-]+", "-", url).strip("-")[:120] or "source"
    target = dest_dir / f"{safe_name}.html"
    request = urllib.request.Request(url, headers={"User-Agent": USER_AGENT, "Accept": "text/html,*/*"})
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            final_url = getattr(response, "url", url)
            if not final_url.lower().startswith(("http://", "https://")):
                return {"ok": False, "error": "redirect changed scheme", "url": final_url}
            content_type = response.headers.get("Content-Type", "")
            # Read one byte past the cap so truncation can be detected.
            data = response.read(MAX_FETCH_BYTES + 1)
    except urllib.error.HTTPError as exc:
        return {"ok": False, "error": f"http {exc.code}", "url": url}
    except urllib.error.URLError as exc:
        return {"ok": False, "error": f"network error: {exc.reason}", "url": url}
    except TimeoutError:
        return {"ok": False, "error": "timeout", "url": url}
    except (http.client.HTTPException, OSError) as exc:
        # Covers failures urllib does not wrap in URLError: malformed URLs
        # (InvalidURL), dropped or short responses (RemoteDisconnected,
        # IncompleteRead), connection resets during read(), and
        # socket.timeout on interpreters where it is not TimeoutError.
        return {"ok": False, "error": f"network error: {exc}", "url": url}
    truncated = len(data) > MAX_FETCH_BYTES
    data = data[:MAX_FETCH_BYTES]
    target.write_bytes(data)
    return {
        "ok": True,
        "path": str(target.relative_to(ROOT)),
        "bytes": len(data),
        "content_type": content_type,
        "truncated": truncated,
        "final_url": final_url,
    }


def attempt_retrieval(run_dir: Path, url: str) -> dict[str, Any]:
    """Fetch *url* into the run's ``sources/evidence/raw`` directory."""
    raw_dir = run_dir / "sources" / "evidence" / "raw"
    return fetch_url(url, raw_dir)


def append_evidence_pointer(run_dir: Path, raw_record: dict[str, Any]) -> None:
    """Append a one-line summary of a fetched file to the run's retrieval.md.

    The line records the stored path, byte count, content type and the
    current time; the parent directory is created if needed.
    """
    summary_path = run_dir / "sources" / "evidence" / "retrieval.md"
    summary_path.parent.mkdir(parents=True, exist_ok=True)
    line = (
        f"- {raw_record.get('path', '')} ({raw_record.get('bytes', 0)} bytes, "
        f"{raw_record.get('content_type', '')}) retrieved at {utc_now()}\n"
    )
    with summary_path.open("a", encoding="utf-8") as handle:
        handle.write(line)


def update_run_queue_retrieval(run_dir: Path, status: str, raw_paths: list[str], notes: str) -> None:
    """Stamp retrieval metadata onto the first record of the run's queue.jsonl.

    Does nothing when the file is missing or holds no records. Only the first
    record is updated; the whole file is rewritten with keys sorted.

    Raises:
        json.JSONDecodeError: if a non-blank line of the queue is not JSON.
    """
    queue = run_dir / "sources" / "queue.jsonl"
    if not queue.exists():
        return
    records = [json.loads(line) for line in queue.read_text(encoding="utf-8").splitlines() if line.strip()]
    if not records:
        return
    records[0].update(
        {
            "retrieval_status": status,
            "retrieved_at": utc_now(),
            "raw_evidence": raw_paths,
            "retrieval_notes": notes,
        }
    )
    queue.write_text("\n".join(json.dumps(record, sort_keys=True) for record in records) + "\n", encoding="utf-8")


def advance_initialized(run_dir: Path, state: dict[str, Any], allow_fetch: bool) -> dict[str, Any]:
    """Try to move an ``initialized`` run to ``retrieved``.

    The run is parked (and an event dict with ``action == "parked"`` is
    returned) when it has no ``source_url``, when fetching is disabled, when
    the download fails, or when ``research_loop.py retrieve`` exits non-zero.
    Download and state-transition failures are also written to the failure
    log.

    Returns:
        An event dict: ``action`` is ``"advanced"`` on success, otherwise
        ``"parked"`` with a ``reason``.
    """
    url = state.get("source_url")
    run_id = run_dir.name
    if not url:
        park_run(run_id, "no source URL on run-state.json")
        return {"action": "parked", "run_id": run_id, "reason": "no source URL"}
    if not allow_fetch:
        park_run(run_id, "automatic retrieval disabled; needs manual retrieve")
        return {"action": "parked", "run_id": run_id, "reason": "fetch disabled"}
    fetched = attempt_retrieval(run_dir, url)
    if not fetched.get("ok"):
        park_run(run_id, f"retrieval failed: {fetched.get('error')}")
        record_failure({"phase": "retrieval", "run_id": run_id, "url": url, "error": fetched.get("error")})
        return {"action": "parked", "run_id": run_id, "reason": fetched.get("error")}
    append_evidence_pointer(run_dir, fetched)
    update_run_queue_retrieval(
        run_dir,
        status="retrieved",
        raw_paths=[fetched["path"]],
        notes=f"auto fetch via loop_step; content_type={fetched.get('content_type', '')}",
    )
    # The state transition itself is owned by research_loop.py.
    completed = subprocess.run(
        [
            sys.executable,
            str(RESEARCH_LOOP),
            "retrieve",
            "--run-dir",
            str(run_dir),
            "--notes",
            "auto fetch via loop_step",
        ],
        cwd=ROOT,
        capture_output=True,
        text=True,
        check=False,
    )
    if completed.returncode != 0:
        record_failure(
            {"phase": "retrieve-state", "run_id": run_id, "stderr": completed.stderr.strip()}
        )
        park_run(run_id, "could not record retrieved state")
        return {"action": "parked", "run_id": run_id, "reason": "state transition failed"}
    return {"action": "advanced", "run_id": run_id, "to_state": "retrieved", "bytes": fetched.get("bytes")}


def advance_retrieved(run_dir: Path) -> dict[str, Any]:
    """Park a ``retrieved`` run; evaluating a source is never automated here."""
    run_id = run_dir.name
    park_run(run_id, "needs source evaluation by human or judge agent")
    return {"action": "parked", "run_id": run_id, "reason": "needs evaluation"}


def advance_run(run_dir: Path, allow_fetch: bool) -> dict[str, Any]:
    """Apply the single transition appropriate for the run's current state.

    Only ``initialized`` runs can actually advance (via a fetch). ``closed``
    runs are unparked and marked done. Every other state, a missing state
    file, or an unrecognised state results in the run being parked.

    Returns:
        An event dict with at least ``action`` and ``run_id``.
    """
    state = load_run_state(run_dir)
    if not state:
        park_run(run_dir.name, "missing run-state.json")
        return {"action": "parked", "run_id": run_dir.name, "reason": "missing state"}
    current = state.get("current_state")
    if current == "initialized":
        return advance_initialized(run_dir, state, allow_fetch)
    if current == "retrieved":
        return advance_retrieved(run_dir)
    if current in {"evaluated", "proposed", "novelty_checked", "validated"}:
        park_run(run_dir.name, f"needs human or model action from state {current}")
        return {"action": "parked", "run_id": run_dir.name, "reason": f"needs action from {current}"}
    if current == "pr_ready":
        park_run(run_dir.name, "PR is ready for human merge approval")
        return {"action": "parked", "run_id": run_dir.name, "reason": "needs merge approval"}
    if current == "closed":
        unpark_run(run_dir.name)
        mark_done(
            run_dir.name,
            status=state.get("close_status") or "unknown",
            reason=state.get("close_reason") or "",
        )
        return {"action": "closed", "run_id": run_dir.name}
    park_run(run_dir.name, f"unknown current state {current}")
    return {"action": "parked", "run_id": run_dir.name, "reason": f"unknown state {current}"}


def _run_updated_at(run_dir: Path) -> str:
    """Return the run's ``updated_at`` value, or ``""`` when unavailable.

    Loads the run state once per call; a missing state file or a missing or
    null ``updated_at`` sorts first.
    """
    state = load_run_state(run_dir)
    if not state:
        return ""
    return state.get("updated_at") or ""


def loop_step(config: dict[str, Any], allow_fetch: bool) -> dict[str, Any]:
    """Perform exactly one unit of loop work and report what happened.

    Order of operations:

    1. Reap closed runs into the done queue.
    2. Stop if today's failure budget or the parked-queue capacity is hit.
    3. If under the active-run and runs-per-day budgets, turn the oldest inbox
       item into a new run (quarantining it if initialisation fails).
    4. Otherwise advance the active run with the oldest ``updated_at``.

    Budgets are read from ``config["budgets"]`` with defaults of 3 active
    runs, 6 runs per day, 5 failures per day and 12 parked runs.

    Returns:
        A dict with ``ok`` set to ``True`` plus an ``action`` describing the
        outcome (``stop``, ``init-failed``, ``initialized``, ``no-op``, or
        whatever ``advance_run`` reports).
    """
    budgets = config.get("budgets", {})
    max_active = budgets.get("max_active_runs", 3)
    max_runs_today = budgets.get("max_runs_per_day", 6)
    max_failures = budgets.get("max_failures_per_day", 5)
    max_parked = budgets.get("max_parked", 12)
    # `mode` governs future LLM-judge feeds, not stdlib HTTP retrieval. Retrieval is
    # controlled by --allow-fetch alone so the daemon can stage evidence without
    # incurring paid-API spend.

    reaped = reap_closed_runs()
    for event in reaped:
        record_event({"phase": "reap", **event})

    if failures_today() >= max_failures:
        record_event({"action": "stop", "reason": "failure budget exceeded"})
        return {"ok": True, "action": "stop", "reason": "failure budget exceeded"}

    buckets = categorize_runs()
    active = buckets["active"]
    parked = read_jsonl(PARKED_FILE)
    if len(parked) >= max_parked:
        record_event({"action": "stop", "reason": "parked queue full"})
        return {"ok": True, "action": "stop", "reason": "parked queue full"}

    if len(active) < max_active and runs_created_today() < max_runs_today:
        with queue_lock("inbox"):
            # Re-check budgets inside the lock so concurrent loop_step invocations
            # cannot both pass the pre-check and exceed max_active/max_per_day.
            current_buckets = categorize_runs()
            if (
                len(current_buckets["active"]) < max_active
                and runs_created_today() < max_runs_today
            ):
                source = peek_inbox_item()
                if source:
                    init = init_run(source)
                    if not init.get("ok"):
                        # Take the bad record out of circulation so it is not
                        # retried on every step, but keep it for inspection.
                        _discard_inbox_source(source)
                        quarantine_source(
                            source,
                            reason=f"init failed: {init.get('stderr', '')[:200]}",
                        )
                        record_failure(
                            {
                                "phase": "init",
                                "source": source.get("url"),
                                "stderr": init.get("stderr"),
                            }
                        )
                        record_event({"action": "init-failed", "source": source.get("url")})
                        return {"ok": True, "action": "init-failed", "source": source.get("url")}
                    _discard_inbox_source(source)
                    event = {
                        "action": "initialized",
                        "source": source.get("url"),
                        "run": init.get("run_relative"),
                    }
                    record_event(event)
                    return {"ok": True, **event}

    if not active:
        record_event({"action": "no-op", "reason": "no active runs and inbox empty or budget reached"})
        return {"ok": True, "action": "no-op"}

    # Oldest-updated run first; each run state is loaded once for the sort.
    active.sort(key=_run_updated_at)
    target = active[0]
    result = advance_run(target, allow_fetch)
    record_event({"phase": "advance", **result})
    return {"ok": True, **result}


def main() -> int:
    """Parse CLI flags, run one loop step, print the result, return exit code.

    Returns:
        78 when the step was a ``no-op``, otherwise 0.
    """
    parser = argparse.ArgumentParser(description="Advance the continuous research loop by one step")
    parser.add_argument("--allow-fetch", action="store_true", help="permit HTTP GET retrieval of source URLs")
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()

    config = load_config()
    result = loop_step(config, args.allow_fetch)
    if args.json:
        print(json.dumps(result, indent=2))
    else:
        print(f"loop_step: {result.get('action')} {result.get('reason') or ''}".strip())
    if result.get("action") == "no-op":
        return 78
    return 0


if __name__ == "__main__":
    sys.exit(main())