Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Build and deploy AI applications on Azure AI Foundry using Microsoft's model catalog and AI services
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
finetuning/scripts/monitor_training.py
# /// script
# dependencies = [
#   "openai>=1.0",
#   "azure-identity",
#   "azure-ai-projects",
# ]
# ///
"""
monitor_training.py — Monitor a fine-tuning job until completion.

Polls the job status and streams training events (reward, loss, errors)
in real time. Exits when the job reaches a terminal state.

Usage:
    python monitor_training.py --job-id ftjob-abc123
    python monitor_training.py --base-url https://<resource>.services.ai.azure.com/api/projects/<project>/openai/v1/ --api-key KEY --job-id ftjob-abc123
    python monitor_training.py --job-id ftjob-abc123 --poll-interval 30
"""

import argparse
import itertools
import os
import sys
import time

try:
    sys.stdout.reconfigure(encoding="utf-8")
    sys.stderr.reconfigure(encoding="utf-8")
except (AttributeError, OSError):
    pass  # Stream not reconfigurable (older Python or non-tty); default encoding is fine

# The shared helpers live next to this script, so make that directory importable.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from common import HelpOnErrorParser, get_clients

TERMINAL_STATUSES = {"succeeded", "failed", "cancelled"}

# Upper bound on remembered event keys, so that long-running jobs (RFT can run
# for hours or days and emit thousands of events) do not grow memory unbounded.
MAX_SEEN_EVENTS = 5000

# Never look at more events per poll than the dedup window is guaranteed to
# retain after a trim; otherwise trimmed events would be printed again.
MAX_EVENTS_PER_POLL = MAX_SEEN_EVENTS // 2


# NOTE(review): several glyphs in the print strings below look mis-encoded
# (probably emoji decoded with the wrong codec). They are kept exactly as found;
# confirm against the upstream copy of this file before changing them.
def _event_prefix(message, level):
    """Return the glyph printed in front of an event line.

    The checks run in priority order: step/reward, step/loss, errors,
    start/completion notices, and finally a generic informational marker.
    """
    lowered = message.lower()
    if "Step" in message and "reward" in message:
        return "๐"
    if "Step" in message and "loss" in message:
        return "๐"
    if "error" in lowered or level == "error":
        return "โ"
    if "started" in lowered or "completed" in lowered:
        return "๐"
    return "โน๏ธ"


def _format_timestamp(created_at):
    """Format an epoch timestamp as local HH:MM:SS, tolerating bad values."""
    try:
        return time.strftime("%H:%M:%S", time.localtime(created_at))
    except (TypeError, ValueError, OverflowError, OSError):
        return "--:--:--"


def _fetch_recent_events(client, job_id):
    """Return the most recent events for *job_id*, oldest first.

    Fetching is best-effort: the events API may not be available for all job
    states, so any failure yields an empty list instead of aborting the monitor.
    """
    try:
        page = client.fine_tuning.jobs.list_events(job_id, limit=20)
        # Iterating an OpenAI SDK list page transparently requests the
        # following pages, so an unbounded list() would download the job's
        # whole event history on every poll. islice stops after the cap.
        events = list(itertools.islice(page, MAX_EVENTS_PER_POLL))
    except Exception:
        return []
    # The response is treated as newest-first; flip it so output is chronological.
    events.reverse()
    return events


def _remember(seen_events, event_key):
    """Record *event_key* in the dedup window, evicting the oldest keys if full.

    *seen_events* is a dict used as an insertion-ordered set, which is what
    makes "keep the most recent half" well defined (a plain set has no order).
    """
    if len(seen_events) >= MAX_SEEN_EVENTS:
        excess = len(seen_events) - MAX_SEEN_EVENTS // 2
        for stale_key in list(itertools.islice(seen_events, excess)):
            del seen_events[stale_key]
    seen_events[event_key] = None


def _print_final_summary(job, status):
    """Print the closing banner for a job that reached a terminal status."""
    print(f"\n{'='*50}")
    if status == "succeeded":
        model = job.fine_tuned_model or "unknown"
        print(" โ Job succeeded!")
        print(f" Fine-tuned model: {model}")
        if job.trained_tokens:
            print(f" Trained tokens: {job.trained_tokens:,}")
    elif status == "failed":
        print(" โ Job failed.")
        if hasattr(job, "error") and job.error:
            print(f" Error: {job.error}")
    elif status == "cancelled":
        print(" โ ๏ธ Job was cancelled.")
    print(f"{'='*50}")


def monitor_job(client, job_id, poll_interval=15):
    """Poll a fine-tuning job until it reaches a terminal state.

    Args:
        client: Object exposing ``fine_tuning.jobs.retrieve`` and
            ``fine_tuning.jobs.list_events``.
        job_id: Identifier of the fine-tuning job to watch.
        poll_interval: Seconds to sleep between polls.

    Returns:
        The lower-cased terminal status: "succeeded", "failed" or "cancelled".

    Errors while retrieving the job are printed and retried on the next poll;
    errors while fetching events are ignored for that poll.
    """
    seen_events = {}  # insertion-ordered set of (created_at, message) keys

    print(f"Monitoring job: {job_id}")
    print(f"Polling every {poll_interval}s. Ctrl+C to stop.\n")

    while True:
        try:
            job = client.fine_tuning.jobs.retrieve(job_id)
        except Exception as e:
            print(f"โ ๏ธ Error retrieving job: {e}")
            time.sleep(poll_interval)
            continue

        status = (job.status or "").lower()

        # Fetch and display new events
        for event in _fetch_recent_events(client, job_id):
            # A missing message must not abort the rest of the batch.
            message = event.message or ""
            event_key = (event.created_at, message)
            if event_key in seen_events:
                continue
            _remember(seen_events, event_key)

            level = getattr(event, "level", None) or "info"
            ts = _format_timestamp(event.created_at)
            print(f" {_event_prefix(message, level)} [{ts}] {message}")

        # Check terminal state
        if status in TERMINAL_STATUSES:
            _print_final_summary(job, status)
            return status

        time.sleep(poll_interval)


def _positive_int(value):
    """argparse type: parse *value* as an integer that is at least 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"expected an integer, got {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(f"must be at least 1, got {value!r}")
    return number


def build_parser():
    """Build the command-line parser; connection options default to environment variables."""
    parser = HelpOnErrorParser(
        description="Monitor a fine-tuning job until completion",
        epilog=(
            "Example:\n"
            " python monitor_training.py --job-id ftjob-abc123\n"
            " python monitor_training.py --base-url https://<resource>.services.ai.azure.com/api/projects/<project>/openai/v1/ --api-key KEY --job-id ftjob-abc123"
        ),
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument("--base-url", default=os.environ.get("OPENAI_BASE_URL"), help="Project /v1/ endpoint URL")
    parser.add_argument("--endpoint", default=os.environ.get("AZURE_OPENAI_ENDPOINT"),
                        help="Azure OpenAI endpoint (fallback)")
    parser.add_argument("--api-key", default=os.environ.get("AZURE_OPENAI_API_KEY"), help="API key")
    parser.add_argument("--project-endpoint", default=os.environ.get("AZURE_AI_PROJECT_ENDPOINT"),
                        help="Azure AI project endpoint (alternative to --base-url)")
    parser.add_argument("--job-id", required=True, help="Fine-tuning job ID (e.g., ftjob-abc123)")
    # Zero would hammer the API in a tight loop and a negative value makes
    # time.sleep raise, so reject both at parse time.
    parser.add_argument("--poll-interval", type=_positive_int, default=15,
                        help="Seconds between status checks (default: 15)")
    return parser


def main():
    """Parse arguments, monitor the job, and exit 0 only if it succeeded."""
    parser = build_parser()
    if len(sys.argv) == 1:
        parser.print_help()
        sys.exit(0)

    args = parser.parse_args()
    client, _ = get_clients(
        base_url=args.base_url,
        azure_endpoint=args.endpoint,
        project_endpoint=args.project_endpoint,
        api_key=args.api_key,
    )
    try:
        status = monitor_job(client, args.job_id, args.poll_interval)
    except KeyboardInterrupt:
        # The banner invites Ctrl+C; leave quietly instead of with a traceback.
        # Nothing here cancels the job, only the monitoring stops.
        print("\nStopped monitoring.")
        sys.exit(130)
    sys.exit(0 if status == "succeeded" else 1)


if __name__ == "__main__":
    main()