Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from bundle
Preview-first Gmail search and guarded batch archive using existing gog auth.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
scripts/gmail_mail_tool.py
1#!/usr/bin/env python32"""Generic Gmail mailbox tool: search, preview, and batch-archive mail.34Uses existing `gog` OAuth secrets, fetches one mailbox snapshot, filters locally,5archives selected messages with Gmail batchModify, retries on rate limits, and6prints a JSON summary with before/selected/archived/after counts.78Examples:9python3 scripts/gmail_mail_tool.py --account you@example.com --from-email notifications@github.com10python3 scripts/gmail_mail_tool.py --account you@example.com --query 'from:[email protected] newer_than:30d'11python3 scripts/gmail_mail_tool.py --account you@example.com --from-email notifications@github.com --apply12"""1314from __future__ import annotations1516import argparse17import json18import os19import random20import socket21import subprocess22import sys23import tempfile24import time25import urllib.error26import urllib.parse27import urllib.request28from dataclasses import dataclass29from datetime import datetime, timezone30from email.utils import parseaddr, parsedate_to_datetime31from pathlib import Path32from typing import Any3334DEFAULT_ACCOUNT = os.environ.get("GMAIL_TOOL_ACCOUNT", os.environ.get("GOG_ACCOUNT", "")).strip()35HOME = Path.home()36GOG_CREDENTIALS = Path(37os.environ.get(38"GMAIL_TOOL_GOG_CREDENTIALS",39str(HOME / ".config/gogcli/credentials.json"),40)41).expanduser()42GOG_KEYRING_PASSWORD_FILE = Path(43os.environ.get(44"GMAIL_TOOL_GOG_KEYRING_PASSWORD_FILE",45str(HOME / ".config/gogcli/.keyring_password"),46)47).expanduser()48GMAIL_API_ROOT = "https://gmail.googleapis.com/gmail/v1"49DEFAULT_LOG_FILE = Path(50os.environ.get(51"GMAIL_TOOL_LOG_FILE",52str(Path.cwd() / "gmail_mail_tool.last.log"),53)54).expanduser()5556LOG_FILE_PATH: Path | None = None575859class GmailApiError(RuntimeError):60pass616263@dataclass64class MessageMeta:65message_id: str66thread_id: str67internal_date_ms: int68labels: list[str]69from_header: str70from_email: str71subject: str72snippet: str73list_id: str74to_header: str75cc_header: str767778def log_line(message: str) -> None:79timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")80line = f"[{timestamp}] {message}"81print(line, file=sys.stderr, flush=True)82if LOG_FILE_PATH is not None:83with LOG_FILE_PATH.open("a", encoding="utf-8") as handle:84handle.write(line + "\n")858687def configure_logging(log_file: str) -> Path:88path = Path(log_file).expanduser() if log_file else DEFAULT_LOG_FILE89path.parent.mkdir(parents=True, exist_ok=True)90path.write_text("", encoding="utf-8")91global LOG_FILE_PATH92LOG_FILE_PATH = path93return path949596def parse_args() -> argparse.Namespace:97parser = argparse.ArgumentParser(98description="Generic Gmail mailbox tool for local filtering and batch archive."99)100parser.add_argument("--account", default=DEFAULT_ACCOUNT, help="gog account email")101parser.add_argument(102"--query",103default="",104help="Optional Gmail search query; still fetched as one snapshot then filtered locally",105)106parser.add_argument(107"--all-mail",108action="store_true",109help="Search all mail instead of default INBOX scope",110)111parser.add_argument(112"--label",113action="append",114default=[],115help="Additional Gmail label ids/names to require in list request (repeatable)",116)117parser.add_argument(118"--from-email",119action="append",120default=[],121help="Exact sender email match (repeatable; OR within the same flag)",122)123parser.add_argument(124"--from-contains",125action="append",126default=[],127help="Substring match against raw From header (repeatable)",128)129parser.add_argument(130"--subject-contains",131action="append",132default=[],133help="Substring match against subject (repeatable)",134)135parser.add_argument(136"--snippet-contains",137action="append",138default=[],139help="Substring match against Gmail snippet (repeatable)",140)141parser.add_argument(142"--list-id-contains",143action="append",144default=[],145help="Substring match against List-Id header (repeatable)",146)147parser.add_argument(148"--to-contains",149action="append",150default=[],151help="Substring match against To header (repeatable)",152)153parser.add_argument(154"--cc-contains",155action="append",156default=[],157help="Substring match against Cc header (repeatable)",158)159parser.add_argument(160"--has-label",161action="append",162default=[],163help="Keep only messages that already have all these labels (repeatable)",164)165parser.add_argument(166"--missing-label",167action="append",168default=[],169help="Keep only messages that do not have any of these labels (repeatable)",170)171parser.add_argument(172"--older-than-days",173type=int,174default=0,175help="Keep only messages older than N full days",176)177parser.add_argument(178"--newer-than-days",179type=int,180default=0,181help="Keep only messages newer than N full days",182)183parser.add_argument(184"--sample-limit",185type=int,186default=25,187help="How many selected message samples to print in the summary",188)189parser.add_argument(190"--page-size",191type=int,192default=500,193help="Gmail list page size (max 500)",194)195parser.add_argument(196"--max-pages",197type=int,198default=0,199help="Optional mailbox page cap; 0 means fetch all pages",200)201parser.add_argument(202"--max-messages",203type=int,204default=0,205help="Optional message cap after listing; 0 means no cap",206)207parser.add_argument(208"--chunk-size",209type=int,210default=500,211help="batchModify chunk size (Gmail allows up to 1000)",212)213parser.add_argument(214"--metadata-delay-ms",215type=int,216default=0,217help="Optional pause between metadata GETs; 0 disables pacing",218)219parser.add_argument(220"--min-backoff-seconds",221type=int,222default=60,223help="Fallback backoff minimum when Retry-After is absent",224)225parser.add_argument(226"--max-backoff-seconds",227type=int,228default=90,229help="Fallback backoff maximum when Retry-After is absent",230)231parser.add_argument(232"--http-timeout-seconds",233type=int,234default=25,235help="Per-request HTTP timeout; stuck calls fail fast instead of hanging indefinitely",236)237parser.add_argument(238"--progress-every",239type=int,240default=25,241help="Progress log cadence for metadata fetch/archive phases",242)243parser.add_argument(244"--apply",245action="store_true",246help="Actually archive selected messages (remove INBOX label)",247)248parser.add_argument(249"--allow-empty-filter",250action="store_true",251help="Allow running without query/local filters (dangerous; preview only unless --apply)",252)253parser.add_argument(254"--max-apply-messages",255type=int,256default=200,257help="Safety cap: refuse --apply when selected messages exceed this count",258)259parser.add_argument(260"--max-apply-ratio",261type=float,262default=0.35,263help="Safety cap: refuse --apply when selected/scanned ratio exceeds this fraction",264)265parser.add_argument(266"--force-unsafe-apply",267action="store_true",268help="Bypass safety caps for large/query-only apply runs",269)270parser.add_argument(271"--out",272default="",273help="Optional path to write the JSON summary",274)275parser.add_argument(276"--log-file",277default=str(DEFAULT_LOG_FILE),278help="Path to runtime log file on disk",279)280return parser.parse_args()281282283def load_keyring_password() -> str:284env_value = os.environ.get("GOG_KEYRING_PASSWORD", "")285if env_value:286return env_value287if not GOG_KEYRING_PASSWORD_FILE.exists():288raise GmailApiError(289"GOG keyring password file not found. Set GOG_KEYRING_PASSWORD or GMAIL_TOOL_GOG_KEYRING_PASSWORD_FILE."290)291return GOG_KEYRING_PASSWORD_FILE.read_text(encoding="utf-8").strip()292293294def refresh_access_token(account: str, *, http_timeout_seconds: int) -> str:295started = time.monotonic()296log_line(f"TOKEN refresh start account={account}")297keyring_password = load_keyring_password()298with tempfile.NamedTemporaryFile("w+", suffix=".json", delete=True) as tmp:299env = dict(os.environ)300env["GOG_KEYRING_PASSWORD"] = keyring_password301subprocess.run(302[303"gog",304"auth",305"tokens",306"export",307account,308"--out",309tmp.name,310"--overwrite",311],312check=True,313env=env,314stdout=subprocess.DEVNULL,315stderr=subprocess.DEVNULL,316)317tmp.seek(0)318token_data = json.load(tmp)319320if not GOG_CREDENTIALS.exists():321raise GmailApiError(322"gog credentials.json not found. Set GMAIL_TOOL_GOG_CREDENTIALS or run gog auth credentials first."323)324credentials = json.loads(GOG_CREDENTIALS.read_text(encoding="utf-8"))325payload = urllib.parse.urlencode(326{327"client_id": credentials["client_id"],328"client_secret": credentials["client_secret"],329"refresh_token": token_data["refresh_token"],330"grant_type": "refresh_token",331}332).encode("utf-8")333request = urllib.request.Request(334"https://oauth2.googleapis.com/token",335data=payload,336method="POST",337)338with urllib.request.urlopen(request, timeout=http_timeout_seconds) as response:339data = json.loads(response.read().decode("utf-8"))340log_line(f"TOKEN refresh ok elapsed={time.monotonic() - started:.2f}s")341return data["access_token"]342343344def parse_retry_after(value: str | None) -> float | None:345if not value:346return None347value = value.strip()348if not value:349return None350if value.isdigit():351return float(value)352try:353retry_at = parsedate_to_datetime(value)354except (TypeError, ValueError, IndexError):355return None356return max(0.0, retry_at.timestamp() - time.time())357358359def extract_api_reason(raw: str) -> str:360try:361payload = json.loads(raw)362except json.JSONDecodeError:363return ""364error = payload.get("error", {})365errors = error.get("errors") or []366if errors and isinstance(errors[0], dict):367return str(errors[0].get("reason") or "")368return str(error.get("status") or "")369370371def request_json(372access_token: str,373method: str,374path: str,375*,376params: dict[str, Any] | None = None,377body: dict[str, Any] | None = None,378max_attempts: int = 8,379min_backoff_seconds: int = 60,380max_backoff_seconds: int = 90,381http_timeout_seconds: int = 25,382) -> Any:383url = f"{GMAIL_API_ROOT}{path}"384if params:385query = urllib.parse.urlencode(params, doseq=True)386url = f"{url}?{query}"387388headers = {"Authorization": f"Bearer {access_token}"}389data = None390if body is not None:391data = json.dumps(body).encode("utf-8")392headers["Content-Type"] = "application/json"393394for attempt in range(1, max_attempts + 1):395started = time.monotonic()396log_line(f"HTTP start method={method} path={path} attempt={attempt}/{max_attempts}")397request = urllib.request.Request(url, data=data, headers=headers, method=method)398try:399with urllib.request.urlopen(request, timeout=http_timeout_seconds) as response:400raw = response.read()401log_line(402f"HTTP ok method={method} path={path} status={getattr(response, 'status', 'unknown')} elapsed={time.monotonic() - started:.2f}s"403)404if not raw:405return None406return json.loads(raw.decode("utf-8"))407except urllib.error.HTTPError as error:408raw = error.read().decode("utf-8", errors="replace")409reason = extract_api_reason(raw)410retry_after = parse_retry_after(error.headers.get("Retry-After"))411log_line(412f"HTTP error method={method} path={path} code={error.code} reason={reason or 'unknown'} elapsed={time.monotonic() - started:.2f}s"413)414should_retry = (415(error.code == 403 and reason == "rateLimitExceeded")416or error.code in {429, 500, 502, 503, 504}417)418if should_retry and attempt < max_attempts:419delay = retry_after420if delay is None:421delay = random.uniform(min_backoff_seconds, max_backoff_seconds)422print(423f"[retry] {method} {path} failed with {error.code}/{reason or 'unknown'}; "424f"sleeping {delay:.1f}s before retry {attempt + 1}/{max_attempts}",425file=sys.stderr,426)427time.sleep(delay)428continue429raise GmailApiError(430f"{method} {path} failed with HTTP {error.code}: {raw}"431) from error432except (urllib.error.URLError, TimeoutError, socket.timeout) as error:433reason = getattr(error, "reason", str(error))434log_line(435f"HTTP network-error method={method} path={path} reason={reason} elapsed={time.monotonic() - started:.2f}s"436)437if attempt < max_attempts:438delay = random.uniform(5, 10)439print(440f"[retry] {method} {path} network error {reason}; "441f"sleeping {delay:.1f}s before retry {attempt + 1}/{max_attempts}",442file=sys.stderr,443)444time.sleep(delay)445continue446raise GmailApiError(f"{method} {path} failed: {error}") from error447448raise GmailApiError(f"{method} {path} exceeded retry budget")449450451def normalize_many(values: list[str]) -> list[str]:452return [value.strip().lower() for value in values if value and value.strip()]453454455def any_contains(haystack: str, needles: list[str]) -> bool:456if not needles:457return True458haystack = haystack.lower()459return any(needle in haystack for needle in needles)460461462def compute_age_days(internal_date_ms: int) -> float:463if not internal_date_ms:464return 0.0465now = datetime.now(timezone.utc)466created = datetime.fromtimestamp(internal_date_ms / 1000, tz=timezone.utc)467return max(0.0, (now - created).total_seconds() / 86400)468469470def list_message_ids(471access_token: str,472*,473label_ids: list[str],474query: str,475page_size: int,476max_pages: int,477max_messages: int,478min_backoff_seconds: int,479max_backoff_seconds: int,480http_timeout_seconds: int,481) -> list[str]:482page_token = ""483pages_fetched = 0484message_ids: list[str] = []485log_line(486f"LIST start labels={label_ids or ['ALL']} query={query!r} page_size={page_size} max_pages={max_pages or 'all'} max_messages={max_messages or 'all'}"487)488while True:489params: dict[str, Any] = {490"userId": "me",491"maxResults": min(max(page_size, 1), 500),492"fields": "messages/id,nextPageToken,resultSizeEstimate",493}494if label_ids:495params["labelIds"] = label_ids496if query:497params["q"] = query498if page_token