Loading source
Pulling the file list, source metadata, and syntax-aware rendering for this listing.
Source from repo
Prepare Azure environments for new workloads—subscriptions, networking, identity, and landing zones
Files
Skill
Size
Entrypoint
Format
Open file
Syntax-highlighted preview of this file as included in the skill package.
references/services/durable-task-scheduler/python.md
1# Durable Task Scheduler — Python23## Learn More45- [Durable Task Scheduler documentation](https://learn.microsoft.com/azure/durable-task-scheduler/)6- [Durable Functions Python guide](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-overview?tabs=python)78## Durable Functions Setup910### Required Packages1112```txt13# requirements.txt14azure-functions15azure-functions-durable16azure-identity17```1819> **💡 Finding latest versions**: Run `pip index versions azure-functions-durable` or check [pypi.org/project/azure-functions-durable](https://pypi.org/project/azure-functions-durable/) for the latest stable release.2021### host.json2223```json24{25"version": "2.0",26"extensions": {27"durableTask": {28"hubName": "default",29"storageProvider": {30"type": "durabletask-scheduler",31"connectionStringName": "DURABLE_TASK_SCHEDULER_CONNECTION_STRING"32}33}34},35"extensionBundle": {36"id": "Microsoft.Azure.Functions.ExtensionBundle",37"version": "[4.*, 5.0.0)"38}39}40```4142> **💡 NOTE**: Python uses extension bundles, so the storage provider type is `durabletask-scheduler`. .NET isolated uses the NuGet package directly and requires `azureManaged` instead — see [dotnet.md](dotnet.md).4344### local.settings.json4546```json47{48"IsEncrypted": false,49"Values": {50"FUNCTIONS_WORKER_RUNTIME": "python",51"AzureWebJobsStorage": "UseDevelopmentStorage=true",52"DURABLE_TASK_SCHEDULER_CONNECTION_STRING": "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"53}54}55```5657## Minimal Example5859```python60import azure.functions as func61import azure.durable_functions as df6263my_app = df.DFApp(http_auth_level=func.AuthLevel.FUNCTION)6465# HTTP Starter66@my_app.route(route="orchestrators/{function_name}", methods=["POST"])67@my_app.durable_client_input(client_name="client")68async def http_start(req: func.HttpRequest, client):69function_name = req.route_params.get('function_name')70instance_id = await client.start_new(function_name)71return client.create_check_status_response(req, instance_id)7273# Orchestrator74@my_app.orchestration_trigger(context_name="context")75def my_orchestration(context: df.DurableOrchestrationContext):76result1 = yield context.call_activity("say_hello", "Tokyo")77result2 = yield context.call_activity("say_hello", "Seattle")78return f"{result1}, {result2}"7980# Activity81@my_app.activity_trigger(input_name="name")82def say_hello(name: str) -> str:83return f"Hello {name}!"84```8586## Workflow Patterns8788### Fan-Out/Fan-In8990```python91@my_app.orchestration_trigger(context_name="context")92def fan_out_fan_in(context: df.DurableOrchestrationContext):93cities = ["Tokyo", "Seattle", "London", "Paris", "Berlin"]9495# Fan-out: schedule all in parallel96parallel_tasks = []97for city in cities:98task = context.call_activity("say_hello", city)99parallel_tasks.append(task)100101# Fan-in: wait for all102results = yield context.task_all(parallel_tasks)103return results104```105106### Human Interaction107108```python109import datetime110111@my_app.orchestration_trigger(context_name="context")112def approval_workflow(context: df.DurableOrchestrationContext):113yield context.call_activity("send_approval_request", context.get_input())114115# Wait for approval event with timeout116timeout = context.current_utc_datetime + datetime.timedelta(days=3)117approval_task = context.wait_for_external_event("ApprovalEvent")118timeout_task = context.create_timer(timeout)119120winner = yield context.task_any([approval_task, timeout_task])121122if winner == approval_task:123approved = approval_task.result124return "Approved" if approved else "Rejected"125return "Timed out"126```127128## Orchestration Determinism129130| ❌ NEVER | ✅ ALWAYS USE |131|----------|--------------|132| `datetime.now()` | `context.current_utc_datetime` |133| `uuid.uuid4()` | `context.new_uuid()` |134| `random.random()` | Pass random values from activities |135| `time.sleep()` | `context.create_timer()` |136| Direct I/O, HTTP, database | `context.call_activity()` |137138### Replay-Safe Logging139140```python141import logging142143@my_app.orchestration_trigger(context_name="context")144def my_orchestration(context: df.DurableOrchestrationContext):145# Check if replaying to avoid duplicate logs146if not context.is_replaying:147logging.info("Started") # Only logs once, not on replay148result = yield context.call_activity("my_activity", "input")149return result150```151152## Error Handling & Retry153154```python155retry_options = df.RetryOptions(156first_retry_interval_in_milliseconds=5000,157max_number_of_attempts=3,158backoff_coefficient=2.0,159max_retry_interval_in_milliseconds=60000160)161162@my_app.orchestration_trigger(context_name="context")163def workflow_with_retry(context: df.DurableOrchestrationContext):164try:165result = yield context.call_activity_with_retry(166"unreliable_service",167retry_options,168context.get_input()169)170return result171except Exception as ex:172context.set_custom_status({"error": str(ex)})173yield context.call_activity("compensation_activity", context.get_input())174return "Compensated"175```176177## Durable Task SDK (Non-Functions)178179For applications running outside Azure Functions (containers, VMs, Azure Container Apps, Azure Kubernetes Service):180181```python182import asyncio183from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker184185# Activity function186def say_hello(ctx, name: str) -> str:187return f"Hello {name}!"188189# Orchestrator function190def my_orchestration(ctx, name: str) -> str:191result = yield ctx.call_activity('say_hello', input=name)192return result193194async def main():195with DurableTaskSchedulerWorker(196host_address="http://localhost:8080",197secure_channel=False,198taskhub="default"199) as worker:200worker.add_activity(say_hello)201worker.add_orchestrator(my_orchestration)202worker.start()203204# Client205from durabletask.azuremanaged.client import DurableTaskSchedulerClient206client = DurableTaskSchedulerClient(207host_address="http://localhost:8080",208taskhub="default",209token_credential=None,210secure_channel=False211)212instance_id = client.schedule_new_orchestration("my_orchestration", input="World")213result = client.wait_for_orchestration_completion(instance_id, timeout=30)214print(f"Output: {result.serialized_output}")215216if __name__ == "__main__":217asyncio.run(main())218```219220