Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Reviews, improves, and writes SwiftUI code following state management, view composition, performance, and iOS 26+ Liquid Glass best practices.
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
scripts/instruments_parser/events.py
1"""Discovery helpers for os_log messages and os_signpost intervals.23These let an agent locate a focus window (e.g. "after the log saying X",4"during signpost Y") before running the main lane analysis.5"""6from __future__ import annotations78from pathlib import Path9from typing import Any1011from . import xctrace, xml_utils1213OS_LOG_SCHEMA = "os-log"14OS_SIGNPOST_SCHEMA = "os-signpost"15OS_SIGNPOST_INTERVAL_SCHEMA = "os-signpost-interval"161718def list_logs(19trace_path: Path,20toc_schemas: frozenset[str],21subsystem: str | None = None,22category: str | None = None,23message_contains: str | None = None,24message_type: str | None = None,25limit: int | None = None,26window_ns: tuple[int, int] | None = None,27run: int = 1,28) -> list[dict[str, Any]]:29"""Return os_log entries, optionally filtered. Case-insensitive contains.3031`limit` counts *post-filter* matches — including the window filter — so32the caller gets N matching logs inside the window rather than the first33N matching logs that might all fall outside it.34"""35if OS_LOG_SCHEMA not in toc_schemas:36return []37xml_bytes = xctrace.export_schema(trace_path, OS_LOG_SCHEMA, run=run)38stream = xml_utils.RowStream(xml_bytes)39needle = message_contains.lower() if message_contains else None4041out: list[dict[str, Any]] = []42for row in stream:43time_el = row.get("time")44if time_el is None:45continue46time_ns = xml_utils.int_text(stream.resolve(time_el))47if time_ns is None:48continue49if not xml_utils.in_window(time_ns, window_ns):50continue5152sub = _str_of(row, stream, "subsystem")53cat = _str_of(row, stream, "category")54typ = _str_of(row, stream, "message-type")55fmt = _str_of(row, stream, "format-string")56msg = _str_of(row, stream, "message") or fmt5758if subsystem and (sub or "") != subsystem:59continue60if category and (cat or "") != category:61continue62if message_type and (typ or "") != message_type:63continue64if needle and needle not in (msg or "").lower() and needle not in (fmt or "").lower():65continue6667process_el = row.get("process")68process = (69xml_utils.extract_process(process_el, stream).get("name")70if process_el is not None else None71)7273out.append({74"time_ns": time_ns,75"time_ms": round(time_ns / 1_000_000, 3),76"type": typ,77"subsystem": sub,78"category": cat,79"process": process,80"message": msg,81"format_string": fmt,82})83if limit is not None and len(out) >= limit:84break8586out.sort(key=lambda e: e["time_ns"])87return out888990def list_signposts(91trace_path: Path,92toc_schemas: frozenset[str],93name_contains: str | None = None,94subsystem: str | None = None,95category: str | None = None,96window_ns: tuple[int, int] | None = None,97run: int = 1,98) -> dict[str, list[dict[str, Any]]]:99"""Return signpost intervals (paired begin/end) plus single-point events.100101Shape: { "intervals": [...], "events": [...] }. Intervals have102start_ms/end_ms/duration_ms; events have a single time_ms.103104Reads two complementary schemas:105* `os-signpost-interval`: already-paired intervals (this is where106user-emitted signposts like com.example.MyApp typically land).107* `os-signpost`: raw begin/end/event rows; we pair begins with ends108ourselves and fall back to point events for unpaired rows. Most109Apple-framework signposts (CloudKit, AppKit, …) live here.110111Filters are AND-combined. `name_contains` is a case-insensitive substring112match. `window_ns` keeps intervals that overlap the window (not strict113containment) and point events whose timestamp falls inside it.114"""115# The two signpost schemas overlap: every paired begin/end in `os-signpost`116# also shows up as a row in `os-signpost-interval`. To avoid duplicates we117# prefer the pre-paired schema for intervals and only mine `os-signpost`118# for point events (and for begin/end pairing as a fallback when the119# interval schema is missing — older traces).120intervals: list[dict[str, Any]] = []121events: list[dict[str, Any]] = []122123has_intervals = OS_SIGNPOST_INTERVAL_SCHEMA in toc_schemas124if has_intervals:125intervals.extend(_read_interval_schema(trace_path, run=run))126127if OS_SIGNPOST_SCHEMA in toc_schemas:128more_intervals, more_events = _read_event_schema(trace_path, run=run)129if not has_intervals:130intervals.extend(more_intervals)131events.extend(more_events)132133intervals.sort(key=lambda i: i["start_ns"])134events.sort(key=lambda e: e["time_ns"])135136needle = name_contains.lower() if name_contains else None137138def _matches(entry: dict) -> bool:139if subsystem and (entry.get("subsystem") or "") != subsystem:140return False141if category and (entry.get("category") or "") != category:142return False143if needle and needle not in (entry.get("name") or "").lower():144return False145return True146147if subsystem or category or needle:148intervals = [i for i in intervals if _matches(i)]149events = [e for e in events if _matches(e)]150151if window_ns is not None:152s, e = window_ns153intervals = [154i for i in intervals155if not (i["end_ns"] < s or i["start_ns"] > e)156]157events = [ev for ev in events if s <= ev["time_ns"] <= e]158159return {"intervals": intervals, "events": events}160161162def _read_interval_schema(trace_path: Path, run: int = 1) -> list[dict[str, Any]]:163"""Read the os-signpost-interval schema (pre-paired intervals)."""164xml_bytes = xctrace.export_schema(trace_path, OS_SIGNPOST_INTERVAL_SCHEMA, run=run)165stream = xml_utils.RowStream(xml_bytes)166167out: list[dict[str, Any]] = []168for row in stream:169start_el = xml_utils.first_present(row, "start", "time")170dur_el = row.get("duration")171if start_el is None or dur_el is None:172continue173start_ns = xml_utils.int_text(stream.resolve(start_el))174dur_ns = xml_utils.int_text(stream.resolve(dur_el))175if start_ns is None or dur_ns is None:176continue177end_ns = start_ns + dur_ns178179name = _str_of(row, stream, "name")180sub = _str_of(row, stream, "subsystem")181cat = _str_of(row, stream, "category")182signpost_id = _str_of(row, stream, "identifier") or _str_of(row, stream, "signpost-id")183process_el = row.get("process")184process = (185xml_utils.extract_process(process_el, stream).get("name")186if process_el is not None else None187)188189out.append({190"start_ns": start_ns,191"end_ns": end_ns,192"duration_ns": dur_ns,193"start_ms": round(start_ns / 1_000_000, 3),194"end_ms": round(end_ns / 1_000_000, 3),195"duration_ms": round(dur_ns / 1_000_000, 3),196"name": name,197"subsystem": sub,198"category": cat,199"process": process,200"signpost_id": signpost_id,201})202return out203204205def _read_event_schema(206trace_path: Path,207run: int = 1,208) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:209"""Read the os-signpost schema and pair begin/end rows into intervals."""210xml_bytes = xctrace.export_schema(trace_path, OS_SIGNPOST_SCHEMA, run=run)211stream = xml_utils.RowStream(xml_bytes)212213pending: dict[tuple, dict] = {}214intervals: list[dict[str, Any]] = []215events: list[dict[str, Any]] = []216217for row in stream:218time_el = xml_utils.first_present(row, "time", "start")219if time_el is None:220continue221time_ns = xml_utils.int_text(stream.resolve(time_el))222if time_ns is None:223continue224225name = _str_of(row, stream, "name")226sub = _str_of(row, stream, "subsystem")227cat = _str_of(row, stream, "category")228event_type = _str_of(row, stream, "event-type") or _str_of(row, stream, "message-type")229signpost_id = _str_of(row, stream, "signpost-id") or _str_of(row, stream, "identifier")230process_el = row.get("process")231process = (232xml_utils.extract_process(process_el, stream).get("name")233if process_el is not None else None234)235236key = (process, sub, cat, name, signpost_id)237etype = (event_type or "").lower()238239if etype in ("begin", "interval begin", "start"):240pending[key] = {"start_ns": time_ns, "name": name,241"subsystem": sub, "category": cat,242"process": process, "signpost_id": signpost_id}243elif etype in ("end", "interval end", "stop"):244start = pending.pop(key, None)245if start is not None:246dur_ns = time_ns - start["start_ns"]247intervals.append({248**start,249"end_ns": time_ns,250"duration_ns": dur_ns,251"start_ms": round(start["start_ns"] / 1_000_000, 3),252"end_ms": round(time_ns / 1_000_000, 3),253"duration_ms": round(dur_ns / 1_000_000, 3),254})255else:256events.append(_point_event(time_ns, name, sub, cat,257process, signpost_id, event_type))258else:259events.append(_point_event(time_ns, name, sub, cat,260process, signpost_id, event_type))261262# Unclosed begins are surfaced as point events so nothing is silently dropped.263for info in pending.values():264events.append(_point_event(info["start_ns"], info["name"],265info["subsystem"], info["category"],266info["process"], info["signpost_id"],267"Begin (unclosed)"))268269return intervals, events270271272def _point_event(time_ns, name, subsystem, category, process, signpost_id, event_type):273return {274"time_ns": time_ns,275"time_ms": round(time_ns / 1_000_000, 3),276"name": name,277"subsystem": subsystem,278"category": category,279"process": process,280"signpost_id": signpost_id,281"event_type": event_type,282}283284285def _str_of(row, stream, key):286el = row.get(key)287if el is None:288return None289resolved = stream.resolve(el)290txt = xml_utils.str_text(resolved) or resolved.get("fmt")291return txt292