Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from bundle
Persistent local Oura OAuth connection, recent data pull, and markdown analysis workflow.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
scripts/oura_analyzer.py
1#!/usr/bin/env python32from __future__ import annotations34import argparse5import base646import csv7import datetime as dt8import http.server9import json10import math11import os12import secrets as pysecrets13import statistics14import sys15import threading16import time17import urllib.error18import urllib.parse19import urllib.request20import webbrowser21from pathlib import Path22from typing import Any232425SCRIPT_DIR = Path(__file__).resolve().parent26SKILL_DIR = SCRIPT_DIR.parent27STATE_DIR = Path(os.environ.get("OURA_ANALYZER_HOME", "~/.local/share/oura-analyzer")).expanduser()28DEFAULT_TOKEN_FILE = STATE_DIR / "secrets" / "oura_tokens.json"29RAW_DIR = STATE_DIR / "data" / "raw"30IMPORTED_DIR = STATE_DIR / "data" / "imported"31REPORTS_DIR = STATE_DIR / "reports"32API_BASE = "https://api.ouraring.com"33AUTHORIZE_URL = "https://cloud.ouraring.com/oauth/authorize"34TOKEN_URL = f"{API_BASE}/oauth/token"353637def load_dotenv() -> None:38env_path = STATE_DIR / ".env"39if not env_path.exists():40return41for line in env_path.read_text(encoding="utf-8").splitlines():42line = line.strip()43if not line or line.startswith("#") or "=" not in line:44continue45key, value = line.split("=", 1)46os.environ.setdefault(key.strip(), value.strip())474849def ensure_dirs() -> None:50for path in (RAW_DIR, IMPORTED_DIR, REPORTS_DIR, DEFAULT_TOKEN_FILE.parent):51path.mkdir(parents=True, exist_ok=True)525354def iso_now() -> str:55return dt.datetime.now(dt.timezone.utc).replace(microsecond=0).isoformat()565758def date_today() -> dt.date:59return dt.datetime.now(dt.timezone.utc).date()606162def parse_date(value: str) -> dt.date:63return dt.date.fromisoformat(value)646566def read_json(path: Path) -> Any:67return json.loads(path.read_text(encoding="utf-8"))686970def write_json(path: Path, data: Any) -> None:71path.parent.mkdir(parents=True, exist_ok=True)72path.write_text(json.dumps(data, indent=2, ensure_ascii=True) + "\n", encoding="utf-8")737475def resolve_token_file(path_arg: str | None) -> Path:76token_file = path_arg or os.environ.get("OURA_TOKEN_FILE")77if not token_file:78return DEFAULT_TOKEN_FILE79path = Path(token_file)80return path if path.is_absolute() else STATE_DIR / path818283def paths_command(_args: argparse.Namespace) -> None:84paths = {85"state_dir": STATE_DIR,86"env_file": STATE_DIR / ".env",87"token_file": DEFAULT_TOKEN_FILE,88"latest_api_pull": RAW_DIR / "latest_api_pull.json",89"latest_export": IMPORTED_DIR / "latest_export.json",90"latest_report": REPORTS_DIR / "latest_analysis.md",91}92for key, value in paths.items():93print(f"{key}={value}")949596def required_env(name: str) -> str:97value = os.environ.get(name)98if not value:99raise SystemExit(f"Missing {name}. Put it in .env or pass the equivalent CLI option.")100return value101102103def http_json(104method: str,105url: str,106*,107headers: dict[str, str] | None = None,108form: dict[str, str] | None = None,109timeout: int = 30,110) -> Any:111payload = None112request_headers = {"Accept": "application/json"}113if headers:114request_headers.update(headers)115if form is not None:116payload = urllib.parse.urlencode(form).encode("utf-8")117request_headers["Content-Type"] = "application/x-www-form-urlencoded"118request = urllib.request.Request(url, data=payload, headers=request_headers, method=method)119try:120with urllib.request.urlopen(request, timeout=timeout) as response:121return json.loads(response.read().decode("utf-8"))122except urllib.error.HTTPError as exc:123body = exc.read().decode("utf-8", errors="replace")124try:125parsed = json.loads(body)126except json.JSONDecodeError:127parsed = {"status": exc.code, "detail": body}128raise RuntimeError(f"HTTP {exc.code} for {url}: {json.dumps(parsed)}") from exc129130131def build_basic_auth_header(client_id: str, client_secret: str) -> str:132raw = f"{client_id}:{client_secret}".encode("utf-8")133return "Basic " + base64.b64encode(raw).decode("ascii")134135136class OAuthCallbackHandler(http.server.BaseHTTPRequestHandler):137server_version = "OuraAnalyzer/1.0"138139def do_GET(self) -> None: # noqa: N802140parsed = urllib.parse.urlparse(self.path)141query = urllib.parse.parse_qs(parsed.query)142self.server.result = query # type: ignore[attr-defined]143body = (144"<html><body><h1>Oura authorization received</h1>"145"<p>You can close this tab and return to the terminal.</p></body></html>"146).encode("utf-8")147self.send_response(200)148self.send_header("Content-Type", "text/html; charset=utf-8")149self.send_header("Content-Length", str(len(body)))150self.end_headers()151self.wfile.write(body)152153def log_message(self, fmt: str, *args: Any) -> None:154return155156157def wait_for_oauth_callback(redirect_uri: str, timeout_seconds: int = 180) -> dict[str, list[str]]:158parsed = urllib.parse.urlparse(redirect_uri)159if parsed.scheme != "http" or parsed.hostname not in {"127.0.0.1", "localhost"} or not parsed.port:160raise SystemExit("Redirect URI must be a local http://localhost:<port>/... or http://127.0.0.1:<port>/... URI for the built-in callback server.")161162server = http.server.ThreadingHTTPServer((parsed.hostname, parsed.port), OAuthCallbackHandler)163server.result = None # type: ignore[attr-defined]164thread = threading.Thread(target=server.handle_request, daemon=True)165thread.start()166167deadline = time.time() + timeout_seconds168while time.time() < deadline:169result = getattr(server, "result", None)170if result is not None:171server.server_close()172return result173time.sleep(0.2)174175server.server_close()176raise SystemExit("Timed out waiting for the Oura OAuth callback.")177178179def exchange_code_for_tokens(client_id: str, client_secret: str, code: str, redirect_uri: str) -> dict[str, Any]:180return http_json(181"POST",182TOKEN_URL,183headers={"Authorization": build_basic_auth_header(client_id, client_secret)},184form={185"grant_type": "authorization_code",186"code": code,187"redirect_uri": redirect_uri,188},189)190191192def refresh_access_token(193client_id: str,194client_secret: str,195refresh_token: str,196) -> dict[str, Any]:197return http_json(198"POST",199TOKEN_URL,200headers={"Authorization": build_basic_auth_header(client_id, client_secret)},201form={202"grant_type": "refresh_token",203"refresh_token": refresh_token,204},205)206207208def token_expired(token_data: dict[str, Any], skew_seconds: int = 60) -> bool:209obtained_at = token_data.get("obtained_at")210expires_in = token_data.get("expires_in")211if not obtained_at or not expires_in:212return False213obtained = dt.datetime.fromisoformat(obtained_at)214expiry = obtained + dt.timedelta(seconds=int(expires_in))215return dt.datetime.now(dt.timezone.utc) >= (expiry - dt.timedelta(seconds=skew_seconds))216217218def authorize_command(args: argparse.Namespace) -> None:219client_id = args.client_id or required_env("OURA_CLIENT_ID")220client_secret = args.client_secret or required_env("OURA_CLIENT_SECRET")221redirect_uri = args.redirect_uri or os.environ.get("OURA_REDIRECT_URI") or "http://localhost:8765/callback"222token_file = resolve_token_file(args.token_file)223state = pysecrets.token_urlsafe(24)224scopes = args.scopes or "email personal daily heartrate workout tag session spo2Daily"225226query = urllib.parse.urlencode(227{228"response_type": "code",229"client_id": client_id,230"redirect_uri": redirect_uri,231"scope": scopes,232"state": state,233}234)235auth_url = f"{AUTHORIZE_URL}?{query}"236print("Open this URL to authorize Oura:")237print(auth_url)238if not args.no_browser:239webbrowser.open(auth_url)240241callback = wait_for_oauth_callback(redirect_uri, timeout_seconds=args.timeout)242if callback.get("state", [None])[0] != state:243raise SystemExit("OAuth state mismatch.")244if "error" in callback:245raise SystemExit(f"Oura returned an authorization error: {callback['error'][0]}")246247code = callback.get("code", [None])[0]248if not code:249raise SystemExit("No authorization code returned by Oura.")250251tokens = exchange_code_for_tokens(client_id, client_secret, code, redirect_uri)252tokens["obtained_at"] = iso_now()253tokens["redirect_uri"] = redirect_uri254tokens["scopes_requested"] = scopes255write_json(token_file, tokens)256print(f"Saved tokens to {token_file}")257258259def load_token_data(token_file: Path) -> dict[str, Any]:260if not token_file.exists():261raise SystemExit(f"Token file not found: {token_file}. Run authorize first.")262return read_json(token_file)263264265def ensure_live_access_token(args: argparse.Namespace, token_file: Path) -> tuple[str, dict[str, Any]]:266token_data = load_token_data(token_file)267if token_expired(token_data):268client_id = args.client_id or required_env("OURA_CLIENT_ID")269client_secret = args.client_secret or required_env("OURA_CLIENT_SECRET")270refresh_token_value = token_data.get("refresh_token")271if not refresh_token_value:272raise SystemExit("Access token expired and no refresh token is available.")273refreshed = refresh_access_token(client_id, client_secret, refresh_token_value)274refreshed["obtained_at"] = iso_now()275refreshed["redirect_uri"] = token_data.get("redirect_uri")276refreshed["scopes_requested"] = token_data.get("scopes_requested")277write_json(token_file, refreshed)278token_data = refreshed279access_token = token_data.get("access_token")280if not access_token:281raise SystemExit("No access_token in token file.")282return access_token, token_data283284285def api_get(token: str, path: str, params: dict[str, str] | None = None) -> Any:286url = f"{API_BASE}{path}"287if params:288url += "?" + urllib.parse.urlencode(params)289return http_json("GET", url, headers={"Authorization": f"Bearer {token}"})290291292def fetch_paginated_collection(token: str, path: str, params: dict[str, str]) -> list[dict[str, Any]]:293results: list[dict[str, Any]] = []294next_token = None295while True:296page_params = dict(params)297if next_token:298page_params["next_token"] = next_token299payload = api_get(token, path, page_params)300batch = payload.get("data", [])301if not isinstance(batch, list):302raise RuntimeError(f"Unexpected payload for {path}: {payload}")303results.extend(batch)304next_token = payload.get("next_token")305if not next_token:306return results307308309def pull_command(args: argparse.Namespace) -> None:310token_file = resolve_token_file(args.token_file)311access_token, _token_data = ensure_live_access_token(args, token_file)312313end_date = parse_date(args.end_date) if args.end_date else date_today()314start_date = parse_date(args.start_date) if args.start_date else (end_date - dt.timedelta(days=args.days - 1))315if start_date > end_date:316raise SystemExit("start_date must be on or before end_date.")317318base_params = {"start_date": start_date.isoformat(), "end_date": end_date.isoformat()}319pulled_at = iso_now()320dataset = {321"source": "oura_api_v2",322"pulled_at": pulled_at,323"date_range": base_params,324"endpoints": {},325}326327dataset["endpoints"]["personal_info"] = api_get(access_token, "/v2/usercollection/personal_info")328for name in ("daily_activity", "daily_sleep", "daily_readiness", "sleep"):329path = f"/v2/usercollection/{name}"330dataset["endpoints"][name] = fetch_paginated_collection(access_token, path, base_params)331332timestamp = dt.datetime.now().strftime("%Y%m%d-%H%M%S")333output_path = RAW_DIR / f"api-pull-{timestamp}.json"334latest_path = RAW_DIR / "latest_api_pull.json"335write_json(output_path, dataset)336write_json(latest_path, dataset)337print(f"Saved API pull to {output_path}")338print(f"Updated {latest_path}")339340341def normalize_name(value: str) -> str:342lowered = value.lower()343chars = []344for ch in lowered:345chars.append(ch if ch.isalnum() else "_")346return "_".join(filter(None, "".join(chars).split("_")))347348349def infer_dataset_key(path: Path) -> str:350return normalize_name(path.stem)351352353def import_export_command(args: argparse.Namespace) -> None:354source_path = Path(args.path).expanduser().resolve()355if not source_path.exists():356raise SystemExit(f"Path does not exist: {source_path}")357358csv_paths = [source_path] if source_path.is_file() else sorted(source_path.rglob("*.csv"))359if not csv_paths:360raise SystemExit("No CSV files found in the provided export path.")361362imported: dict[str, Any] = {363"source": "oura_export_csv",364"imported_at": iso_now(),365"origin_path": str(source_path),366"files": {},367}368369for csv_path in csv_paths:370key = infer_dataset_key(csv_path)371with csv_path.open("r", encoding="utf-8-sig", newline="") as handle:372reader = csv.DictReader(handle)373rows = list(reader)374imported["files"][key] = {375"file_name": csv_path.name,376"path": str(csv_path),377"rows": rows,378}379380timestamp = dt.datetime.now().strftime("%Y%m%d-%H%M%S")381output_path = IMPORTED_DIR / f"export-import-{timestamp}.json"382latest_path = IMPORTED_DIR / "latest_export.json"383write_json(output_path, imported)384write_json(latest_path, imported)385print(f"Imported {len(csv_paths)} CSV files into {output_path}")386print(f"Updated {latest_path}")387388389def safe_float(value: Any) -> float | None:390if value is None or value == "":391return None392if isinstance(value, (int, float)):393return float(value)394try:395return float(str(value).strip())396except ValueError:397return None398399400def seconds_to_hours(seconds: float | None) -> float | None:401if seconds is None:402return None403return seconds / 3600.0404405406def pick_first(mapping: dict[str, Any], keys: tuple[str, ...]) -> Any:407for key in keys:408if key in mapping and mapping[key] not in (None, ""):409return mapping[key]410return None411412413def mean_or_none(values: list[float]) -> float | None:414return statistics.fmean(values) if values else None415416417def median_or_none(values: list[float]) -> float | None:418return statistics.median(values) if values else None419420421def correlation(xs: list[float], ys: list[float]) -> float | None:422if len(xs) < 3 or len(xs) != len(ys):423return None424mean_x = statistics.fmean(xs)425mean_y = statistics.fmean(ys)426num = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))427den_x = math.sqrt(sum((x - mean_x) ** 2 for x in xs))428den_y = math.sqrt(sum((y - mean_y) ** 2 for y in ys))429if den_x == 0 or den_y == 0:430return None431return num / (den_x * den_y)432433434def ymd(value: Any) -> str | None:435if value is None:436return None437text = str(value)438return text[:10] if len(text) >= 10 else None439440441def prepare_api_days(dataset: dict[str, Any]) -> dict[str, dict[str, Any]]:442endpoints = dataset.get("endpoints", {})443by_day: dict[str, dict[str, Any]] = {}444445def touch(day: str) -> dict[str, Any]:446return by_day.setdefault(day, {"day": day})447448for row in endpoints.get("daily_readiness", []):449day = row.get("day")450if not day:451continue452target = touch(day)453target["readiness_score"] = row.get("score")454for key, value in row.get("contributors", {}).items():455target[f"readiness_{key}"] = value456457for row in endpoints.get("daily_sleep", []):458