Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
A comprehensive collection of Agent Skills for context engineering, multi-agent architectures, and production agent systems.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
researcher/scripts/loop_discover.py
#!/usr/bin/env python3
"""Discover candidate sources and append them to the inbox.

By default the discoverer only reads `researcher/discovery/manual-seed.jsonl`.
Paid feeds (Parallel deep research, web search) are off by default and require
explicit config opt-in plus separate adapter scripts.
"""

from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path
from typing import Any

from loop_common import (
    QUEUE_DIR,
    RESEARCHER,
    active_run_urls,
    append_jsonl,
    closed_run_urls,
    load_config,
    queue_lock,
    read_jsonl,
    source_id_for,
    utc_now,
    write_jsonl,
)


def load_manual_seed(path: Path) -> list[dict[str, Any]]:
    """Read the manual seed file as JSON Lines.

    Args:
        path: Location of the seed file. A missing file is not an error.

    Returns:
        One parsed JSON value per non-blank line, in file order. An empty
        list when the file does not exist. Values are not shape-checked here;
        `normalize_candidate` rejects anything that is not a JSON object.

    Raises:
        ValueError: If a non-blank line is not valid JSON. The message names
            the 1-based line number.
    """
    if not path.exists():
        return []
    records: list[dict[str, Any]] = []
    lines = path.read_text(encoding="utf-8").splitlines()
    for line_number, line in enumerate(lines, start=1):
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError as exc:
            raise ValueError(f"manual-seed line {line_number} invalid JSON: {exc}") from exc
    return records


def _normalize_url(url: str) -> str:
    """Return the dedupe key for *url*: surrounding whitespace removed, lowercased."""
    # NOTE(review): this lowercases the whole URL, including the path, so two
    # URLs that differ only by path case share one key. Left unchanged because
    # existing source ids and queued records are derived from this form.
    return url.strip().lower()


def _text_field(record: dict[str, Any], key: str, default: str = "") -> str:
    """Return ``record[key]`` as stripped text, or *default* when absent or null."""
    value = record.get(key)
    if value is None:
        # A JSON null must not become the literal text "None".
        return default
    return str(value).strip()


def existing_urls(inbox: list[dict[str, Any]], quarantine: list[dict[str, Any]]) -> set[str]:
    """Collect the normalized URLs that discovery must not queue again.

    Combines the string ``url`` values of the inbox and quarantine records
    with the URLs reported by `active_run_urls()` and `closed_run_urls()`.
    Records without a string ``url`` are ignored.
    """
    urls: set[str] = set()
    for records in (inbox, quarantine):
        for record in records:
            url = record.get("url")
            if isinstance(url, str):
                urls.add(_normalize_url(url))
    urls.update(_normalize_url(value) for value in active_run_urls())
    urls.update(_normalize_url(value) for value in closed_run_urls())
    return urls


def normalize_candidate(record: dict[str, Any], feed: str) -> dict[str, Any]:
    """Turn a raw feed record into an inbox record.

    Args:
        record: Parsed JSON object describing the candidate. Only ``url`` is
            required; ``title``, ``author_or_org``, ``source_type`` and
            ``candidate_reason`` are optional.
        feed: Name of the feed the record came from; stored as ``feed``.

    Returns:
        A new dict holding the source id, the raw and normalized URL, the
        descriptive fields as stripped text, the feed name, a discovery
        timestamp from `utc_now()`, ``attempts`` set to 0 and ``last_status``
        set to ``"queued"``.

    Raises:
        ValueError: If *record* is not a JSON object, or its ``url`` is
            missing, null or blank.
    """
    if not isinstance(record, dict):
        # Seed lines are arbitrary JSON; a list, string or number would fail
        # with AttributeError on .get(), which callers do not catch.
        raise ValueError(f"candidate must be a JSON object, got {type(record).__name__}")
    raw_url = _text_field(record, "url")
    if not raw_url:
        raise ValueError("candidate is missing url")
    normalized = _normalize_url(raw_url)
    return {
        "source_id": source_id_for(normalized),
        "url": raw_url,
        "url_normalized": normalized,
        "title": _text_field(record, "title"),
        "author_or_org": _text_field(record, "author_or_org"),
        "source_type": _text_field(record, "source_type", "other"),
        "candidate_reason": _text_field(record, "candidate_reason"),
        "feed": feed,
        "discovered_at": utc_now(),
        "attempts": 0,
        "last_status": "queued",
    }


def discover(config: dict[str, Any], dry_run: bool, limit: int | None) -> dict[str, Any]:
    """Gather new candidates from the enabled feeds and add them to the inbox.

    Args:
        config: Loaded configuration. Reads ``feeds.manual_seed``,
            ``feeds.enable_parallel_deep_research``,
            ``feeds.enable_web_search``,
            ``limits.discovery_max_new_per_run`` (default 8) and
            ``budgets.max_inbox_size`` (default 200).
        dry_run: When true, compute the result without writing the inbox.
        limit: Overrides ``limits.discovery_max_new_per_run`` when not None.
            A value of 0 or less adds nothing.

    Returns:
        A summary dict with ``ok``, ``dry_run``, ``new`` (number of records
        accepted), ``inbox_size`` (size after the additions, projected for a
        dry run), ``capacity_remaining`` and ``first_new`` (up to three of
        the new records).

    Raises:
        ValueError: Propagated from `load_manual_seed` when the seed file
            contains a line that is not valid JSON.
    """
    feeds = config.get("feeds", {})
    inbox_path = QUEUE_DIR / "inbox.jsonl"
    quarantine_path = QUEUE_DIR / "quarantine.jsonl"
    if limit is not None:
        discovery_max = limit
    else:
        discovery_max = config.get("limits", {}).get("discovery_max_new_per_run", 8)
    max_inbox_size = config.get("budgets", {}).get("max_inbox_size", 200)

    # The read, dedupe and write all happen inside the "inbox" queue lock.
    with queue_lock("inbox"):
        inbox = read_jsonl(inbox_path)
        quarantine = read_jsonl(quarantine_path)
        seen = existing_urls(inbox, quarantine)
        new_records: list[dict[str, Any]] = []

        manual_seed_rel = feeds.get("manual_seed")
        if manual_seed_rel:
            # The configured seed path is resolved against RESEARCHER's parent.
            manual_path = RESEARCHER.parent / manual_seed_rel
            for record in load_manual_seed(manual_path):
                # Check the cap before accepting a record so that a limit of
                # 0 (or less) really adds nothing.
                if len(new_records) >= discovery_max:
                    break
                try:
                    candidate = normalize_candidate(record, feed="manual-seed")
                except ValueError as exc:
                    print(f"skipping manual-seed candidate: {exc}", file=sys.stderr)
                    continue
                key = candidate.get("url_normalized") or candidate["url"]
                if key in seen:
                    continue
                new_records.append(candidate)
                # Also dedupes repeated URLs within the seed file itself.
                seen.add(key)

        if feeds.get("enable_parallel_deep_research"):
            print(
                "parallel deep research feed is enabled in config but not implemented in this loop. "
                "skipping until adapter is added.",
                file=sys.stderr,
            )
        if feeds.get("enable_web_search"):
            print(
                "web search feed is enabled in config but not implemented in this loop. "
                "skipping until adapter is added.",
                file=sys.stderr,
            )

        # Never grow the inbox past its configured budget.
        capacity_remaining = max(0, max_inbox_size - len(inbox))
        if len(new_records) > capacity_remaining:
            new_records = new_records[:capacity_remaining]

        if not dry_run and new_records:
            inbox.extend(new_records)
            write_jsonl(inbox_path, inbox)

    return {
        "ok": True,
        "dry_run": dry_run,
        "new": len(new_records),
        # On a real run the inbox list already includes the new records.
        "inbox_size": len(inbox) if not dry_run else len(inbox) + len(new_records),
        "capacity_remaining": capacity_remaining - len(new_records),
        "first_new": new_records[:3],
    }


def main() -> int:
    """Parse command-line arguments, run discovery and print a summary.

    Returns:
        Process exit status; always 0 when discovery completes.
    """
    parser = argparse.ArgumentParser(description="Discover candidate sources and append to inbox")
    parser.add_argument("--dry-run", action="store_true", help="show what would be added without writing")
    parser.add_argument("--limit", type=int, default=None, help="override discovery_max_new_per_run")
    parser.add_argument("--json", action="store_true")
    args = parser.parse_args()

    config = load_config()
    result = discover(config, args.dry_run, args.limit)
    if args.json:
        print(json.dumps(result, indent=2))
    else:
        suffix = " (dry-run)" if result["dry_run"] else ""
        print(
            f"Discovery added {result['new']} sources{suffix}; "
            f"inbox now {result['inbox_size']}, capacity remaining {result['capacity_remaining']}"
        )
        for record in result["first_new"]:
            print(f"- {record['source_id']} {record['url']}")
    return 0


if __name__ == "__main__":
    sys.exit(main())