Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Deploy, evaluate, and manage AI agents end-to-end on Microsoft Azure AI Foundry
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
finetuning/scripts/monitor_training.py
# /// script
# dependencies = [
#   "openai>=1.0",
#   "azure-identity",
#   "azure-ai-projects",
# ]
# ///
"""
monitor_training.py — Monitor a fine-tuning job until completion.

Polls the job status and streams training events (reward, loss, errors)
in real time. Exits when the job reaches a terminal state.

Usage:
    python monitor_training.py --job-id ftjob-abc123
    python monitor_training.py --base-url https://<resource>.services.ai.azure.com/api/projects/<project>/openai/v1/ --api-key KEY --job-id ftjob-abc123
    python monitor_training.py --job-id ftjob-abc123 --poll-interval 30
"""

import argparse
import os
import sys
import time

try:
    sys.stdout.reconfigure(encoding="utf-8")
    sys.stderr.reconfigure(encoding="utf-8")
except (AttributeError, OSError):
    pass  # Stream not reconfigurable (older Python or non-tty); default encoding is fine

# Put this script's own directory on sys.path so the `common` import below
# resolves no matter which working directory the script is launched from.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from common import HelpOnErrorParser, get_clients  # noqa: E402

TERMINAL_STATUSES = {"succeeded", "failed", "cancelled"}

# Cap memory for long-running jobs (RFT can run hours/days, accumulating
# thousands of events).
MAX_SEEN_EVENTS = 5000


def _remember_event(seen_events, event_key, max_size=MAX_SEEN_EVENTS):
    """Record ``event_key`` in the dedup window and report whether it is new.

    Args:
        seen_events: dict used as an insertion-ordered set of event keys
            (values are unused). Mutated in place.
        event_key: hashable identity of the event.
        max_size: size at which the window is trimmed before inserting.

    Returns:
        True if ``event_key`` was not in the window (the caller should print
        the event), False if it had already been seen.
    """
    if event_key in seen_events:
        return False
    if len(seen_events) >= max_size:
        # Keep only the most recently inserted half. A dict preserves
        # insertion order, so the leading keys really are the oldest ones;
        # slicing a plain set here would discard an arbitrary half and could
        # cause recently printed events to be printed again.
        surplus = len(seen_events) - max_size // 2
        for stale_key in list(seen_events)[:surplus]:
            del seen_events[stale_key]
    seen_events[event_key] = None
    return True


def _print_event(event):
    """Print one training event, prefixed with an icon chosen from its text."""
    # Guard against a missing message so the substring checks below cannot
    # raise and abort the rest of the batch.
    message = event.message or ""
    lowered = message.lower()
    level = event.level or "info"
    ts = time.strftime("%H:%M:%S", time.localtime(event.created_at))

    # Highlight step events
    if "Step" in message and "reward" in message:
        print(f" 📈 [{ts}] {message}")
    elif "Step" in message and "loss" in message:
        print(f" 📉 [{ts}] {message}")
    elif "error" in lowered or level == "error":
        print(f" ❌ [{ts}] {message}")
    elif "started" in lowered or "completed" in lowered:
        print(f" 🔔 [{ts}] {message}")
    else:
        print(f" ℹ️ [{ts}] {message}")


def _print_summary(job, status):
    """Print the closing banner for a job that reached a terminal ``status``."""
    print(f"\n{'='*50}")
    if status == "succeeded":
        model = job.fine_tuned_model or "unknown"
        print(" ✅ Job succeeded!")
        print(f" Fine-tuned model: {model}")
        if job.trained_tokens:
            print(f" Trained tokens: {job.trained_tokens:,}")
    elif status == "failed":
        print(" ❌ Job failed.")
        error = getattr(job, "error", None)
        if error:
            print(f" Error: {error}")
    elif status == "cancelled":
        print(" ⚠️ Job was cancelled.")
    print(f"{'='*50}")


def monitor_job(client, job_id, poll_interval=15):
    """Poll a fine-tuning job until it reaches a terminal state.

    Args:
        client: object exposing ``fine_tuning.jobs.retrieve`` and
            ``fine_tuning.jobs.list_events``.
        job_id: fine-tuning job ID (e.g., ``ftjob-abc123``).
        poll_interval: seconds to sleep between status checks.

    Returns:
        The lower-cased terminal status: one of ``TERMINAL_STATUSES``.

    Errors while retrieving the job are printed and retried after
    ``poll_interval`` seconds; errors while fetching or printing events are
    ignored. ``KeyboardInterrupt`` propagates to the caller.
    """
    # Insertion-ordered window of (created_at, message) keys already printed.
    seen_events = {}

    print(f"Monitoring job: {job_id}")
    print(f"Polling every {poll_interval}s. Ctrl+C to stop.\n")

    while True:
        try:
            job = client.fine_tuning.jobs.retrieve(job_id)
        except Exception as e:
            print(f"⚠️ Error retrieving job: {e}")
            time.sleep(poll_interval)
            continue

        status = (job.status or "").lower()

        # Fetch and display new events
        try:
            events = list(client.fine_tuning.jobs.list_events(job_id, limit=20))
            # Reversed so that events print in the opposite order to the one
            # the API returned them in (presumably newest-first — confirm).
            for event in reversed(events):
                if _remember_event(seen_events, (event.created_at, event.message)):
                    _print_event(event)
        except Exception:
            # Deliberate best-effort: the events API may not be available for
            # all job states, and event output must never stop status polling.
            pass

        # Check terminal state
        if status in TERMINAL_STATUSES:
            _print_summary(job, status)
            return status

        time.sleep(poll_interval)


def build_parser():
    """Build the command-line parser; connection options default to env vars."""
    parser = HelpOnErrorParser(
        description="Monitor a fine-tuning job until completion",
        epilog=(
            "Example:\n"
            " python monitor_training.py --job-id ftjob-abc123\n"
            " python monitor_training.py --base-url https://<resource>.services.ai.azure.com/api/projects/<project>/openai/v1/ --api-key KEY --job-id ftjob-abc123"
        ),
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("--base-url", default=os.environ.get("OPENAI_BASE_URL"), help="Project /v1/ endpoint URL")
    parser.add_argument("--endpoint", default=os.environ.get("AZURE_OPENAI_ENDPOINT"),
                        help="Azure OpenAI endpoint (fallback)")
    parser.add_argument("--api-key", default=os.environ.get("AZURE_OPENAI_API_KEY"), help="API key")
    parser.add_argument("--project-endpoint", default=os.environ.get("AZURE_AI_PROJECT_ENDPOINT"),
                        help="Azure AI project endpoint (alternative to --base-url)")
    parser.add_argument("--job-id", required=True, help="Fine-tuning job ID (e.g., ftjob-abc123)")
    parser.add_argument("--poll-interval", type=int, default=15, help="Seconds between status checks (default: 15)")
    return parser


def main():
    """Parse arguments, monitor the job, and return the process exit code.

    Returns 0 when help was shown (no arguments) or the job succeeded, 1 for
    any other terminal status, and 130 when interrupted with Ctrl+C.
    """
    parser = build_parser()
    if len(sys.argv) == 1:
        parser.print_help()
        return 0

    args = parser.parse_args()
    client, _ = get_clients(
        base_url=args.base_url,
        azure_endpoint=args.endpoint,
        project_endpoint=args.project_endpoint,
        api_key=args.api_key,
    )
    try:
        status = monitor_job(client, args.job_id, args.poll_interval)
    except KeyboardInterrupt:
        # The banner advertises Ctrl+C, so exit without a traceback.
        print("\nMonitoring stopped.")
        return 130
    return 0 if status == "succeeded" else 1


if __name__ == "__main__":
    sys.exit(main())