Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
A comprehensive collection of Agent Skills for context engineering, multi-agent architectures, and production agent systems.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
researcher/scripts/loop_common.py
#!/usr/bin/env python3
"""Shared helpers for the continuous research loop scripts.

All queue mutations go through atomic writes (`write_json`, `write_jsonl`) and
optional file locks (`queue_lock`). The lock uses fcntl exclusive flock so
concurrent loop_step + loop_discover invocations cannot race on the inbox or
parked queue.
"""

from __future__ import annotations

import errno
import fcntl
import hashlib
import json
import os
import tempfile
from contextlib import contextmanager
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable, Iterator


ROOT = Path(__file__).resolve().parents[2]
RESEARCHER = ROOT / "researcher"
QUEUE_DIR = RESEARCHER / "queue"
ORCH_DIR = RESEARCHER / "orchestration"
REPORTS_DIR = RESEARCHER / "reports"
RUNS_DIR = RESEARCHER / "runs"
SNAPSHOTS_DIR = REPORTS_DIR / "snapshots"
LOCK_DIR = QUEUE_DIR / ".locks"
JSONL_QUARANTINE_DIR = REPORTS_DIR / "jsonl-quarantine"


def utc_now() -> str:
    """Return the current UTC time as an ISO-8601 string, truncated to seconds."""
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


def today_utc() -> str:
    """Return today's UTC date formatted as ``YYYY-MM-DD``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%d")


def load_json(path: Path) -> Any:
    """Read *path* as UTF-8 text and return the parsed JSON value."""
    return json.loads(path.read_text(encoding="utf-8"))


def _atomic_write(path: Path, payload: str) -> None:
    """Write *payload* to *path* via a temp file in the same directory.

    The payload is flushed and fsynced before `os.replace` swaps it into
    place, so readers see either the old file or the complete new one.
    The parent directory is created if missing.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_name = tempfile.mkstemp(
        prefix=f".{path.name}.",
        suffix=".tmp",
        dir=str(path.parent),
    )
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as handle:
            handle.write(payload)
            handle.flush()
            os.fsync(handle.fileno())
        os.replace(tmp_name, path)
    except BaseException:
        # BaseException (not just Exception) so that KeyboardInterrupt or
        # SystemExit during the write does not leave a stray temp file.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise


def write_json(path: Path, data: Any) -> None:
    """Atomically write *data* to *path* as indented JSON with a trailing newline."""
    _atomic_write(path, json.dumps(data, indent=2) + "\n")


def _quarantine_bad_line(path: Path, line_number: int, line: str, exc: Exception) -> None:
    """Save an unparseable JSONL line under ``JSONL_QUARANTINE_DIR``.

    The output file is named from the source file name, a UTC timestamp and
    the line number; its first line is ``# <exc>`` followed by the raw line.
    """
    JSONL_QUARANTINE_DIR.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    target = JSONL_QUARANTINE_DIR / f"{path.name}.{timestamp}.{line_number}.txt"
    target.write_text(f"# {exc}\n{line}\n", encoding="utf-8")


def read_jsonl(path: Path, *, tolerant: bool = True) -> list[dict[str, Any]]:
    """Return the JSON objects stored one per line in *path*.

    A missing file yields an empty list and blank lines are skipped. Lines
    that fail to parse, or that parse to something other than an object, are
    quarantined and skipped when *tolerant* is true.

    Raises:
        json.JSONDecodeError: a line is not valid JSON and *tolerant* is false.
        ValueError: a line is valid JSON but not an object and *tolerant* is
            false.
    """
    if not path.exists():
        return []
    records: list[dict[str, Any]] = []
    for line_number, raw in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
        line = raw.strip()
        if not line:
            continue
        try:
            value = json.loads(line)
        except json.JSONDecodeError as exc:
            if not tolerant:
                raise
            _quarantine_bad_line(path, line_number, raw, exc)
            continue
        if not isinstance(value, dict):
            if not tolerant:
                raise ValueError(f"{path}:{line_number} expected object, got {type(value).__name__}")
            _quarantine_bad_line(path, line_number, raw, ValueError("not an object"))
            continue
        records.append(value)
    return records


def write_jsonl(path: Path, records: Iterable[dict[str, Any]]) -> None:
    """Atomically replace *path* with *records*, one key-sorted JSON object per line.

    An empty iterable produces an empty file (no trailing newline).
    """
    lines = [json.dumps(record, sort_keys=True) for record in records]
    _atomic_write(path, ("\n".join(lines) + "\n") if lines else "")


def append_jsonl(path: Path, record: dict[str, Any]) -> None:
    """Append *record* to *path* as a single key-sorted JSON line.

    The write is wrapped in a best-effort exclusive flock on the file itself;
    if the lock cannot be taken the append still proceeds.
    """
    # Serialize first so an unserializable record fails before the file is
    # created/opened and before any lock is held.
    line = json.dumps(record, sort_keys=True) + "\n"
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as handle:
        try:
            fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
        except OSError:
            pass  # locking is best-effort here; fall through and append anyway
        try:
            handle.write(line)
            handle.flush()
        finally:
            try:
                fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
            except OSError:
                pass


@contextmanager
def queue_lock(name: str) -> Iterator[None]:
    """Exclusive lock for queue mutations. Use one lock per queue file family.

    The lock file is ``LOCK_DIR/<name>.lock``. If flock fails with EACCES or
    EAGAIN the body runs WITHOUT the lock (best-effort); any other OSError
    from acquiring the lock is raised.
    """

    LOCK_DIR.mkdir(parents=True, exist_ok=True)
    lock_path = LOCK_DIR / f"{name}.lock"
    with open(lock_path, "a+") as handle:
        try:
            try:
                fcntl.flock(handle.fileno(), fcntl.LOCK_EX)
            except OSError as exc:
                if exc.errno not in {errno.EACCES, errno.EAGAIN}:
                    raise
            yield
        finally:
            try:
                fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
            except OSError:
                pass


def load_config() -> dict[str, Any]:
    """Load ``config.json`` from the orchestration directory."""
    return load_json(ORCH_DIR / "config.json")


def source_id_for(url: str) -> str:
    """Return a 16-hex-character id: SHA-256 of the stripped, lower-cased *url*."""
    digest = hashlib.sha256(url.strip().lower().encode("utf-8")).hexdigest()
    return digest[:16]


def list_run_dirs() -> list[Path]:
    """Return the sub-directories of ``RUNS_DIR`` in sorted order (empty if it is absent)."""
    if not RUNS_DIR.exists():
        return []
    return sorted(path for path in RUNS_DIR.iterdir() if path.is_dir())


def load_run_state(run_dir: Path) -> dict[str, Any] | None:
    """Return the parsed ``run-state.json`` of *run_dir*, or ``None``.

    ``None`` is returned when the file is missing, is not decodable/valid
    JSON, or holds a JSON value that is not an object — callers rely on
    ``.get`` being available on any non-``None`` result.
    """
    state_file = run_dir / "run-state.json"
    if not state_file.exists():
        return None
    try:
        state = load_json(state_file)
    except FileNotFoundError:
        # Removed between the existence check and the read.
        return None
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError.
        return None
    return state if isinstance(state, dict) else None


def _run_urls(*, closed: bool) -> set[str]:
    """Collect non-empty string ``source_url`` values of runs whose closed-ness matches *closed*."""
    urls: set[str] = set()
    for run_dir in list_run_dirs():
        state = load_run_state(run_dir)
        if not state:
            continue
        if (state.get("current_state") == "closed") != closed:
            continue
        url = state.get("source_url")
        if isinstance(url, str) and url:
            urls.add(url)
    return urls


def active_run_urls() -> set[str]:
    """Return source URLs of every run whose ``current_state`` is not ``"closed"``."""
    return _run_urls(closed=False)


def closed_run_urls() -> set[str]:
    """Return source URLs of every run whose ``current_state`` is ``"closed"``."""
    return _run_urls(closed=True)


def categorize_runs() -> dict[str, list[Path]]:
    """Bucket run directories into ``active``, ``parked``, ``closed`` and ``unknown``.

    A run with no usable state is ``unknown``; ``closed`` takes precedence
    over ``parked``; a run is ``parked`` when its directory name appears as a
    ``run_id`` in ``parked.jsonl``; everything else is ``active``.
    """
    buckets: dict[str, list[Path]] = {
        "active": [],
        "parked": [],
        "closed": [],
        "unknown": [],
    }
    parked_ids = {record.get("run_id") for record in read_jsonl(QUEUE_DIR / "parked.jsonl")}
    for run_dir in list_run_dirs():
        state = load_run_state(run_dir)
        if not state:
            buckets["unknown"].append(run_dir)
            continue
        if state.get("current_state") == "closed":
            buckets["closed"].append(run_dir)
        elif run_dir.name in parked_ids:
            buckets["parked"].append(run_dir)
        else:
            buckets["active"].append(run_dir)
    return buckets


def runs_created_today() -> int:
    """Count runs whose ``created_at`` string starts with today's UTC date.

    Runs with a missing or non-string ``created_at`` are not counted.
    """
    today = today_utc()
    count = 0
    for run_dir in list_run_dirs():
        state = load_run_state(run_dir)
        if not state:
            continue
        created_at = state.get("created_at")
        if isinstance(created_at, str) and created_at.startswith(today):
            count += 1
    return count