Dapr Agents, Catalyst and Aspire

What is Dapr Agents, and how does it work with Diagrid Catalyst and .NET Aspire (formerly .NET Aspire) to help developers build production-grade agentic applications?

In earlier blog post Dapr - Agent Integrations & .NET Aspire, I talked about how Agent Integrations augments and enhances other agentic frameworks e.g., CrewAI, LangGraph, Strands Agents, Microsoft Agent Framework, Google ADK, OpenAI Agents, Pydantic AI, Deep Agents by providing them with key capabilities that production systems demand.

In this blog post, I will take things a step further and foucs on Dapr Agents, Dapr's own native agent framework. If Agent Integrations are about bringing production-readiness to your existing framework, Dapr Agents is about building with production-readiness from the very beginning. It is a Python framework that sits directly on top of Dapr's building blocks using Dapr Workflows for durable, long-running execution, the State Management API for portable agent memory, Pub/Sub and Service Invocation for reliable and secure agent-to-agent communication, and secure agent identity as a default, not a configuration.

Aspire (formerly .NET Aspire) and Diagrid Catalyst play important supporting roles in this story. Aspire is a multi-language local dev-time orchestration toolchain that lets you define your entire stack frontends, APIs, containers, databases, and Dapr sidecars in code, with built-in OpenTelemetry delivering logs, traces, and health checks automatically, making it significantly easier to move from a local prototype to a resilient distributed application. Catalyst, on the other hand, is a centralized platform for running and operating the Dapr runtime as-a-service in production handling operational automation, scaling, zero-trust security, governance, and full visibility into every agent execution and workflow trace.

Prerequisites

Demo App

The demo app is based on the Aspire starter template, which includes a frontend (ASP.NET Core Blazor App), a backend (ASP.NET Core Minimal API), Service Defaults, and an AppHost project to demonstrate Aspire's capabilities.

The App acts as a Shipment Control Center - AI-Assisted Logistics Operations, accepts a shipment scenario from a Blazor web frontend and sends it through an API gateway to a Dapr Workflow orchestrator. The orchestrator coordinates five Python-based Dapr Agents to generate a complete shipment execution plan with real-time progress updates.

The Order Intake Agent normalizes the raw shipment request into a canonical shipment profile. The Carrier Capacity Agent evaluates feasible carriers, capacity, equipment, and booking options. The Route Planning Agent creates route legs, milestones, alternative paths, and ETA details. The Risk Compliance Agent validates documents, restrictions, operational risks, and mitigation strategies. Finally, the Customer Update Writer Agent converts the planning brief into an operations-ready execution plan along with a customer-facing update.

The image below depicts the application's Shipment Control Center UI.

NOTE: Considering a .NET stack, the Orchestrator can be written in .NET while keeping all Agent code in Python, as Dapr Agents currently only support Python. Alternatively, all Agents could be rewritten using the Microsoft Agent Framework, leveraging Dapr–Agent integration to make them production-ready. This may be demonstrated in an upcoming blog post.

Dapr-ization of the App

Refer. to the Dapr-ization of the App section in Diagrid Catalyst & .NET Aspire to configure the app to use the Dapr sidecar.

Start with Workflow & Agents

Since the application's basic structure is ready, let’s start creating the Agents folders and subfolders as shown below.

Orchestrator - Workflow

The orchestrator is responsible for managing the complete cross-agent business workflow. It starts each child workflow using the appropriate Dapr app ID and workflow name, waits for each workflow to complete, and reads the resulting output. It also validates whether the output is usable and handles retries or fallback logic when an agent times out, fails, or returns unusable final content. In addition, the orchestrator builds the input for the next agent based on the outputs of previous agents, publishes progress events throughout the execution lifecycle, and ultimately returns the finalized execution plan to the API. The orchestrator is not merely a routing component; it owns and manages the end-to-end cross-agent business process.

# pyproject.toml
[project]
name = "agent-orchestrator"
version = "0.1.0"
requires-python = ">=3.11,<3.14"
dependencies = [
    "dapr-agents==1.0.0",
    "dapr==1.17.3",
    "opentelemetry-api==1.41.1",
    "opentelemetry-sdk==1.41.1",
    "opentelemetry-exporter-otlp-proto-grpc==1.41.1",
    "python-dotenv==1.0.0",
    "uvicorn[standard]==0.31.1",
]
# app.py
from __future__ import annotations
 
import asyncio
import warnings
from datetime import datetime, timezone
from typing import Any
 
import dapr.ext.workflow as wf
from dapr.clients import DaprClient
from fastapi import Body, FastAPI, HTTPException
 
from config import READINESS_APP_IDS, logger
from shared.observability import current_trace_context, instrument_fastapi
from readiness import _check_agent_readiness, _check_chat_component_readiness, _check_workflow_readiness, _readiness_entry
from workflow_helpers import log_stage
 
def _get_task_from_body(body: dict[str, Any] | None) -> str:
    task = body.get("task") if isinstance(body, dict) else None
    if not isinstance(task, str) or not task.strip():
        raise HTTPException(status_code=422, detail="A non-empty 'task' field is required.")
    return task.strip()
 
def _get_requested_instance_id(body: dict[str, Any] | None) -> str | None:
    task_id = body.get("task_id") if isinstance(body, dict) else None
    if not isinstance(task_id, str) or not task_id.strip():
        return None
    return task_id.strip()
 
def _get_trace_context_from_body(body: dict[str, Any] | None) -> dict[str, str]:
    trace_context = body.get("trace_context") if isinstance(body, dict) else None
    if not isinstance(trace_context, dict):
        return {}
    return {str(key): str(value) for key, value in trace_context.items() if value}
 
def _workflow_status_url(instance_id: str) -> str:
    return f"/agent/instances/{instance_id}"
 
def create_app() -> FastAPI:
    app = FastAPI(title="Shipment Execution Workflow Service", version="1.0.0")
    instrument_fastapi(app)
    workflow_client = wf.DaprWorkflowClient()
 
    @app.get("/health")
    async def health() -> dict[str, str]:
        return {"status": "ok"}
 
    @app.get("/agent/readiness")
    async def readiness() -> dict[str, Any]:
        checks = [_readiness_entry("orchestrator", True, "ok")]
        results = await asyncio.gather(
            *[
                asyncio.to_thread(_check_agent_readiness, app_id)
                for app_id in READINESS_APP_IDS
                if app_id != "orchestrator"
            ]
        )
        checks.extend(results)
        checks.append(await asyncio.to_thread(_check_workflow_readiness, workflow_client))
        checks.append(await asyncio.to_thread(_check_chat_component_readiness))
        return {
            "ready": all(check["ready"] for check in checks),
            "checkedAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "dependencies": checks,
        }
 
    @app.post("/agent/run")
    async def start_workflow(body: dict[str, Any] = Body(default_factory=dict)) -> dict[str, str]:
        task = _get_task_from_body(body)
        requested_instance_id = _get_requested_instance_id(body)
        trace_context = _get_trace_context_from_body(body) or current_trace_context()
        log_stage(
            "run request received",
            task_id=requested_instance_id or "auto",
            body_keys=sorted(body.keys()) if isinstance(body, dict) else [],
        )
        if requested_instance_id:
            existing_state = await asyncio.to_thread(
                workflow_client.get_workflow_state,
                requested_instance_id,
                fetch_payloads=False,
            )
            if existing_state is not None:
                logger.info(
                    "Workflow instance %s already exists; returning existing status URL.",
                    requested_instance_id,
                )
                return {
                    "instance_id": requested_instance_id,
                    "status_url": _workflow_status_url(requested_instance_id),
                }
 
        workflow_input = {"task": task, "task_id": requested_instance_id, "trace_context": trace_context}
 
        def start_via_dapr_runtime() -> str:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", UserWarning)
                dapr_client = DaprClient()
                response = dapr_client.start_workflow(
                    workflow_component="dapr",
                    workflow_name="shipment_execution_workflow",
                    input=workflow_input,
                    instance_id=requested_instance_id,
                )
 
            return response.instance_id
 
        instance_id = await asyncio.to_thread(start_via_dapr_runtime)
        return {
            "instance_id": instance_id or "",
            "status_url": _workflow_status_url(instance_id or ""),
        }
 
    @app.get("/agent/instances/{instance_id}")
    async def get_status(instance_id: str) -> dict[str, Any]:
        workflow_state = await asyncio.to_thread(
            workflow_client.get_workflow_state,
            instance_id,
            fetch_payloads=True,
        )
        if workflow_state is None:
            raise HTTPException(status_code=404, detail="Workflow instance not found.")
 
        payload = workflow_state.to_json()
        payload["runtime_status"] = getattr(
            workflow_state.runtime_status,
            "name",
            str(workflow_state.runtime_status),
        )
        for field in ("created_at", "last_updated_at"):
            ts = payload.get(field)
            if ts:
                payload[field] = ts.isoformat()
        return payload
 
    return app
# api.py
from __future__ import annotations
 
import asyncio
import warnings
from datetime import datetime, timezone
from typing import Any
 
import dapr.ext.workflow as wf
from dapr.clients import DaprClient
from fastapi import Body, FastAPI, HTTPException
 
from config import READINESS_APP_IDS, logger
from shared.observability import current_trace_context, instrument_fastapi
from readiness import _check_agent_readiness, _check_chat_component_readiness, _check_workflow_readiness, _readiness_entry
from workflow_helpers import log_stage
 
def _get_task_from_body(body: dict[str, Any] | None) -> str:
    task = body.get("task") if isinstance(body, dict) else None
    if not isinstance(task, str) or not task.strip():
        raise HTTPException(status_code=422, detail="A non-empty 'task' field is required.")
    return task.strip()
 
def _get_requested_instance_id(body: dict[str, Any] | None) -> str | None:
    task_id = body.get("task_id") if isinstance(body, dict) else None
    if not isinstance(task_id, str) or not task_id.strip():
        return None
    return task_id.strip()
 
def _get_trace_context_from_body(body: dict[str, Any] | None) -> dict[str, str]:
    trace_context = body.get("trace_context") if isinstance(body, dict) else None
    if not isinstance(trace_context, dict):
        return {}
    return {str(key): str(value) for key, value in trace_context.items() if value}
 
def _workflow_status_url(instance_id: str) -> str:
    return f"/agent/instances/{instance_id}"
 
def create_app() -> FastAPI:
    app = FastAPI(title="Shipment Execution Workflow Service", version="1.0.0")
    instrument_fastapi(app)
    workflow_client = wf.DaprWorkflowClient()
 
    @app.get("/health")
    async def health() -> dict[str, str]:
        return {"status": "ok"}
 
    @app.get("/agent/readiness")
    async def readiness() -> dict[str, Any]:
        checks = [_readiness_entry("orchestrator", True, "ok")]
        results = await asyncio.gather(
            *[
                asyncio.to_thread(_check_agent_readiness, app_id)
                for app_id in READINESS_APP_IDS
                if app_id != "orchestrator"
            ]
        )
        checks.extend(results)
        checks.append(await asyncio.to_thread(_check_workflow_readiness, workflow_client))
        checks.append(await asyncio.to_thread(_check_chat_component_readiness))
        return {
            "ready": all(check["ready"] for check in checks),
            "checkedAt": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
            "dependencies": checks,
        }
 
    @app.post("/agent/run")
    async def start_workflow(body: dict[str, Any] = Body(default_factory=dict)) -> dict[str, str]:
        task = _get_task_from_body(body)
        requested_instance_id = _get_requested_instance_id(body)
        trace_context = _get_trace_context_from_body(body) or current_trace_context()
        log_stage(
            "run request received",
            task_id=requested_instance_id or "auto",
            body_keys=sorted(body.keys()) if isinstance(body, dict) else [],
        )
        if requested_instance_id:
            existing_state = await asyncio.to_thread(
                workflow_client.get_workflow_state,
                requested_instance_id,
                fetch_payloads=False,
            )
            if existing_state is not None:
                logger.info(
                    "Workflow instance %s already exists; returning existing status URL.",
                    requested_instance_id,
                )
                return {
                    "instance_id": requested_instance_id,
                    "status_url": _workflow_status_url(requested_instance_id),
                }
 
        workflow_input = {"task": task, "task_id": requested_instance_id, "trace_context": trace_context}
 
        def start_via_dapr_runtime() -> str:
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", UserWarning)
                dapr_client = DaprClient()
                response = dapr_client.start_workflow(
                    workflow_component="dapr",
                    workflow_name="shipment_execution_workflow",
                    input=workflow_input,
                    instance_id=requested_instance_id,
                )
 
            return response.instance_id
 
        instance_id = await asyncio.to_thread(start_via_dapr_runtime)
        return {
            "instance_id": instance_id or "",
            "status_url": _workflow_status_url(instance_id or ""),
        }
 
    @app.get("/agent/instances/{instance_id}")
    async def get_status(instance_id: str) -> dict[str, Any]:
        workflow_state = await asyncio.to_thread(
            workflow_client.get_workflow_state,
            instance_id,
            fetch_payloads=True,
        )
        if workflow_state is None:
            raise HTTPException(status_code=404, detail="Workflow instance not found.")
 
        payload = workflow_state.to_json()
        payload["runtime_status"] = getattr(
            workflow_state.runtime_status,
            "name",
            str(workflow_state.runtime_status),
        )
        for field in ("created_at", "last_updated_at"):
            ts = payload.get(field)
            if ts:
                payload[field] = ts.isoformat()
        return payload
 
    return app
# config.py
from __future__ import annotations
 
import copy
import logging
import os
import sys
import warnings
from pathlib import Path
 
sys.path.append(str(Path(__file__).resolve().parents[1]))
 
from dapr.conf import settings as dapr_settings
from dapr_agents.tool.workflow import agent_workflow_id
from dotenv import load_dotenv
from shared.observability import configure_observability
import uvicorn
 
load_dotenv()
 
def normalize_dapr_endpoint_environment() -> None:
    for name in ("DAPR_GRPC_ENDPOINT", "DAPR_HTTP_ENDPOINT"):
        value = os.getenv(name)
        if value:
            normalized_value = value.rstrip("/")
            os.environ[name] = normalized_value
            setattr(dapr_settings, name, normalized_value)
 
normalize_dapr_endpoint_environment()
 
warnings.filterwarnings(
    "ignore",
    message="http and https schemes are deprecated for grpc.*",
    category=UserWarning,
    module="dapr.conf.helpers",
)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
    stream=sys.stdout,
    force=True,
)
logger = logging.getLogger("orchestrator.app")
configure_observability("orchestrator")
 
UVICORN_LOG_CONFIG = copy.deepcopy(uvicorn.config.LOGGING_CONFIG)
UVICORN_LOG_CONFIG["handlers"]["default"]["stream"] = "ext://sys.stdout"
UVICORN_LOG_CONFIG["handlers"]["access"]["stream"] = "ext://sys.stdout"
 
AGENT_PORT = int(os.getenv("AGENT_PORT", "50050"))
CHILD_WORKFLOW_TIMEOUT_SECONDS = int(os.getenv("CHILD_WORKFLOW_TIMEOUT_SECONDS", "120"))
CHILD_WORKFLOW_MAX_ATTEMPTS = int(os.getenv("CHILD_WORKFLOW_MAX_ATTEMPTS", "2"))
CHILD_WORKFLOW_RETRY_FIRST_INTERVAL_SECONDS = int(os.getenv("CHILD_WORKFLOW_RETRY_FIRST_INTERVAL_SECONDS", "5"))
CHILD_WORKFLOW_RETRY_BACKOFF_COEFFICIENT = float(os.getenv("CHILD_WORKFLOW_RETRY_BACKOFF_COEFFICIENT", "2.0"))
ORDER_INTAKE_APP_ID = os.getenv("ORDER_INTAKE_APP_ID", "order-intake")
CARRIER_CAPACITY_APP_ID = os.getenv("CARRIER_CAPACITY_APP_ID", "carrier-capacity")
ROUTE_PLANNING_APP_ID = os.getenv("ROUTE_PLANNING_APP_ID", "route-planning")
RISK_COMPLIANCE_APP_ID = os.getenv("RISK_COMPLIANCE_APP_ID", "risk-compliance")
CUSTOMER_UPDATE_WRITER_APP_ID = os.getenv("CUSTOMER_UPDATE_WRITER_APP_ID", "customer-update-writer")
ORDER_INTAKE_AGENT_NAME = os.getenv("ORDER_INTAKE_AGENT_NAME", "order-intake-agent")
CARRIER_CAPACITY_AGENT_NAME = os.getenv("CARRIER_CAPACITY_AGENT_NAME", "carrier-capacity-agent")
ROUTE_PLANNING_AGENT_NAME = os.getenv("ROUTE_PLANNING_AGENT_NAME", "route-planning-agent")
RISK_COMPLIANCE_AGENT_NAME = os.getenv("RISK_COMPLIANCE_AGENT_NAME", "risk-compliance-agent")
CUSTOMER_UPDATE_WRITER_AGENT_NAME = os.getenv("CUSTOMER_UPDATE_WRITER_AGENT_NAME", "customer-update-writer-agent")
ORDER_INTAKE_WORKFLOW_NAME = os.getenv(
    "ORDER_INTAKE_WORKFLOW_NAME",
    agent_workflow_id(ORDER_INTAKE_AGENT_NAME),
)
CARRIER_CAPACITY_WORKFLOW_NAME = os.getenv(
    "CARRIER_CAPACITY_WORKFLOW_NAME",
    agent_workflow_id(CARRIER_CAPACITY_AGENT_NAME),
)
ROUTE_PLANNING_WORKFLOW_NAME = os.getenv(
    "ROUTE_PLANNING_WORKFLOW_NAME",
    agent_workflow_id(ROUTE_PLANNING_AGENT_NAME),
)
RISK_COMPLIANCE_WORKFLOW_NAME = os.getenv(
    "RISK_COMPLIANCE_WORKFLOW_NAME",
    agent_workflow_id(RISK_COMPLIANCE_AGENT_NAME),
)
CUSTOMER_UPDATE_WRITER_WORKFLOW_NAME = os.getenv(
    "CUSTOMER_UPDATE_WRITER_WORKFLOW_NAME",
    agent_workflow_id(CUSTOMER_UPDATE_WRITER_AGENT_NAME),
)
PROGRESS_PUBSUB_NAME = os.getenv("PROGRESS_PUBSUB_NAME", "pubsubstore")
PROGRESS_TOPIC_NAME = os.getenv("PROGRESS_TOPIC_NAME", "shipment-progress")
SHIPMENT_STATE_STORE_NAME = os.getenv("SHIPMENT_STATE_STORE_NAME", "statestore")
CHAT_COMPONENT = os.getenv("DAPR_CHAT_COMPONENT_NAME", "llama")
CHAT_READINESS_CACHE_SECONDS = int(os.getenv("CHAT_READINESS_CACHE_SECONDS", "20"))
CHAT_READINESS_TIMEOUT_SECONDS = int(os.getenv("CHAT_READINESS_TIMEOUT_SECONDS", "8"))
CHAT_READINESS_FAILURE_CACHE_SECONDS = int(os.getenv("CHAT_READINESS_FAILURE_CACHE_SECONDS", "5"))
CHAT_READINESS_MAX_ATTEMPTS = int(os.getenv("CHAT_READINESS_MAX_ATTEMPTS", "1"))
READINESS_APP_IDS = [
    "orchestrator",
    ORDER_INTAKE_APP_ID,
    CARRIER_CAPACITY_APP_ID,
    ROUTE_PLANNING_APP_ID,
    RISK_COMPLIANCE_APP_ID,
    CUSTOMER_UPDATE_WRITER_APP_ID,
]
# contracts.py
from __future__ import annotations
 
from dataclasses import dataclass
 
@dataclass(frozen=True)
class AgentCallResult:
    content: str
    source: str
    attempts: int
    fallback_reason: str = ""
    fallback_detail: str = ""
# dapr_helper
from __future__ import annotations
 
import os
 
def dapr_http_base_url() -> str | None:
    dapr_http_endpoint = os.getenv("DAPR_HTTP_ENDPOINT")
    dapr_http_port = os.getenv("DAPR_HTTP_PORT")
    if dapr_http_endpoint:
        return dapr_http_endpoint.rstrip("/")
    if dapr_http_port:
        return f"http://localhost:{dapr_http_port}"
    return None
 
def dapr_json_headers() -> dict[str, str]:
    headers = {"Content-Type": "application/json"}
    dapr_api_token = os.getenv("DAPR_API_TOKEN")
    if dapr_api_token:
        headers["dapr-api-token"] = dapr_api_token
    return headers
# progress.py
from __future__ import annotations
 
from datetime import datetime, timezone
import json
import urllib.error
import urllib.request
from typing import Any
 
from dapr.clients import DaprClient
 
from config import PROGRESS_PUBSUB_NAME, PROGRESS_TOPIC_NAME, logger
from dapr_helpers import dapr_http_base_url, dapr_json_headers
from workflow_runtime import runtime
 
def _workflow_progress_event(
    task_id: str,
    workflow_instance_id: str,
    agent: str,
    status: str,
    message: str,
    progress: int,
    source: str = "system",
    fallback_reason: str = "",
    attempts: int = 0,
    fallback_detail: str = "",
) -> dict[str, Any]:
    event = {
        "taskId": task_id,
        "workflowInstanceId": workflow_instance_id,
        "agent": agent,
        "status": status,
        "message": message,
        "progress": progress,
        "source": source,
    }
    if fallback_reason:
        event["fallbackReason"] = fallback_reason
    if attempts:
        event["attempts"] = attempts
    if fallback_detail:
        event["fallbackDetail"] = fallback_detail
    return event
 
@runtime.activity(name="publish_shipment_progress")
def publish_shipment_progress(_ctx: Any, event: dict[str, Any]) -> bool:
    event = dict(event)
    event["timestamp"] = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
 
    dapr_base_url = dapr_http_base_url()
    if dapr_base_url:
        url = f"{dapr_base_url}/v1.0/publish/{PROGRESS_PUBSUB_NAME}/{PROGRESS_TOPIC_NAME}"
        request = urllib.request.Request(
            url,
            data=json.dumps(event).encode("utf-8"),
            headers=dapr_json_headers(),
            method="POST",
        )
 
        try:
            with urllib.request.urlopen(request, timeout=3) as response:
                if response.status >= 300:
                    logger.warning("Dapr publish returned HTTP %s for progress event.", response.status)
                    return False
            return True
        except (urllib.error.URLError, TimeoutError) as exc:
            logger.warning("Failed to publish shipment progress event via HTTP: %s", exc)
            return False
 
    try:
        with DaprClient() as client:
            client.publish_event(
                pubsub_name=PROGRESS_PUBSUB_NAME,
                topic_name=PROGRESS_TOPIC_NAME,
                data=json.dumps(event),
                data_content_type="application/json",
            )
        return True
    except Exception as exc:
        logger.warning("Failed to publish shipment progress event via Dapr gRPC: %s", exc)
        return False
# readiness.py
from __future__ import annotations
 
import json
import threading
import time
import urllib.error
import urllib.request
from typing import Any
 
from dapr_agents.llm.dapr import DaprChatClient
 
from config import (
    CHAT_COMPONENT,
    CHAT_READINESS_CACHE_SECONDS,
    CHAT_READINESS_FAILURE_CACHE_SECONDS,
    CHAT_READINESS_MAX_ATTEMPTS,
    CHAT_READINESS_TIMEOUT_SECONDS,
)
from dapr_helpers import dapr_http_base_url, dapr_json_headers
 
_chat_readiness_cache_lock = threading.Lock()
_chat_readiness_cache: dict[str, Any] = {
    "checked_at": 0.0,
    "result": None,
}
 
def _readiness_entry(name: str, ready: bool, detail: str) -> dict[str, Any]:
    return {"name": name, "ready": ready, "detail": detail}
 
def _check_agent_readiness(app_id: str) -> dict[str, Any]:
    dapr_base_url = dapr_http_base_url()
    if not dapr_base_url:
        return _readiness_entry(
            app_id,
            False,
            "DAPR_HTTP_ENDPOINT or DAPR_HTTP_PORT is not configured.",
        )
 
    url = f"{dapr_base_url}/v1.0/invoke/{app_id}/method/agent/instances/__readiness_probe__"
    request = urllib.request.Request(url, headers=dapr_json_headers(), method="GET")
 
    try:
        with urllib.request.urlopen(request, timeout=3) as response:
            if response.status < 300 or response.status == 404:
                return _readiness_entry(app_id, True, f"HTTP {response.status}")
            body = response.read().decode("utf-8", errors="ignore")
            return _readiness_entry(app_id, False, f"HTTP {response.status}: {body}")
    except urllib.error.HTTPError as exc:
        if exc.code == 404:
            return _readiness_entry(app_id, True, "HTTP 404")
        body = exc.read().decode("utf-8", errors="ignore") if exc.fp else ""
        return _readiness_entry(app_id, False, f"HTTP {exc.code}: {body}".strip())
    except (urllib.error.URLError, TimeoutError) as exc:
        return _readiness_entry(app_id, False, str(exc))
 
def _check_workflow_readiness(workflow_client: Any) -> dict[str, Any]:
    try:
        state = workflow_client.get_workflow_state(
            "__readiness_probe__",
            fetch_payloads=False,
        )
 
        detail = "workflow state API responded"
        if state is not None:
            detail = "workflow state API responded with probe state"
        return _readiness_entry("workflow", True, detail)
    except Exception as exc:
        return _readiness_entry("workflow", False, f"workflow state API failed: {exc}")
 
def _check_chat_component_readiness() -> dict[str, Any]:
    cached = _get_cached_chat_readiness()
    if cached is not None:
        return cached
 
    return _refresh_chat_component_readiness()
 
def _get_cached_chat_readiness() -> dict[str, Any] | None:
    with _chat_readiness_cache_lock:
        checked_at = _chat_readiness_cache["checked_at"]
        result = _chat_readiness_cache["result"]
 
    if result is None:
        return None
 
    cache_seconds = CHAT_READINESS_CACHE_SECONDS if result.get("ready") else CHAT_READINESS_FAILURE_CACHE_SECONDS
    if time.monotonic() - checked_at > cache_seconds:
        return None
 
    return dict(result)
 
def _refresh_chat_component_readiness() -> dict[str, Any]:
    registration = _check_chat_component_registration()
    if not registration["ready"]:
        _store_chat_readiness(registration)
        return registration
 
    last_failure = registration
    for attempt in range(1, CHAT_READINESS_MAX_ATTEMPTS + 1):
        probe = _probe_chat_component()
        if probe["ready"]:
            _store_chat_readiness(probe)
            return probe
 
        last_failure = _readiness_entry(
            CHAT_COMPONENT,
            False,
            f"{probe['detail']} (attempt {attempt}/{CHAT_READINESS_MAX_ATTEMPTS})",
        )
 
        if attempt < CHAT_READINESS_MAX_ATTEMPTS:
            time.sleep(min(2, attempt))
 
    _store_chat_readiness(last_failure)
    return last_failure
 
def _store_chat_readiness(result: dict[str, Any]) -> None:
    with _chat_readiness_cache_lock:
        _chat_readiness_cache["checked_at"] = time.monotonic()
        _chat_readiness_cache["result"] = dict(result)
 
def _check_chat_component_registration() -> dict[str, Any]:
    dapr_base_url = dapr_http_base_url()
    if not dapr_base_url:
        return _readiness_entry(
            CHAT_COMPONENT,
            False,
            "DAPR_HTTP_ENDPOINT or DAPR_HTTP_PORT is not configured.",
        )
 
    url = f"{dapr_base_url}/v1.0/metadata"
    request = urllib.request.Request(url, headers=dapr_json_headers(), method="GET")
 
    try:
        with urllib.request.urlopen(request, timeout=3) as response:
            if response.status >= 300:
                return _readiness_entry(CHAT_COMPONENT, False, f"HTTP {response.status}")
            payload = json.loads(response.read().decode("utf-8"))
            components = payload.get("components") if isinstance(payload, dict) else None
            if not isinstance(components, list):
                return _readiness_entry(CHAT_COMPONENT, False, "Dapr metadata did not include components.")
            for component in components:
                if isinstance(component, dict) and component.get("name") == CHAT_COMPONENT:
                    component_type = component.get("type") or "unknown"
                    return _readiness_entry(CHAT_COMPONENT, True, f"component {component_type}")
            return _readiness_entry(CHAT_COMPONENT, False, "Chat component not registered with Dapr.")
    except urllib.error.HTTPError as exc:
        body = exc.read().decode("utf-8", errors="ignore") if exc.fp else ""
        return _readiness_entry(CHAT_COMPONENT, False, f"HTTP {exc.code}: {body}".strip())
    except (urllib.error.URLError, TimeoutError, json.JSONDecodeError) as exc:
        return _readiness_entry(CHAT_COMPONENT, False, str(exc))
 
def _probe_chat_component() -> dict[str, Any]:
    try:
        response = DaprChatClient(
            component_name=CHAT_COMPONENT,
            timeout=CHAT_READINESS_TIMEOUT_SECONDS,
        ).generate(
            messages=[{"role": "user", "content": "Reply with the single word ok."}],
            temperature=0,
        )
        message = response.get_message() if hasattr(response, "get_message") else None
        content = message.content.strip() if message and isinstance(message.content, str) else ""
        if content:
            return _readiness_entry(CHAT_COMPONENT, True, "Live chat probe succeeded.")
        return _readiness_entry(CHAT_COMPONENT, False, "Live chat probe returned an empty response.")
    except Exception as exc:
        return _readiness_entry(CHAT_COMPONENT, False, f"Live chat probe failed: {exc}")
# state_store.py
from __future__ import annotations
 
from datetime import datetime, timezone
import json
import urllib.error
import urllib.request
from typing import Any
 
from config import SHIPMENT_STATE_STORE_NAME, logger
from dapr_helpers import dapr_http_base_url, dapr_json_headers
from workflow_runtime import runtime
 
def _shipment_execution_state_key(task_id: str) -> str:
    return f"shipment-execution-plan-{task_id}"
 
def _shipment_execution_state_record(payload: dict[str, Any]) -> dict[str, Any]:
    saved_at = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    return {
        "taskId": payload["task_id"],
        "workflowInstanceId": payload["workflow_instance_id"],
        "status": "completed",
        "savedAt": saved_at,
        "scenario": payload["shipment_scenario"],
        "shipmentRequest": payload["shipment_request"],
        "carrierOptions": payload["carrier_options"],
        "routePlan": payload["route_plan"],
        "riskCompliance": payload["risk_compliance"],
        "planningBrief": payload["planning_brief"],
        "executionPlan": payload["execution_plan"],
        "sources": payload["sources"],
    }
 
@runtime.activity(name="save_shipment_execution_state")
def save_shipment_execution_state(_ctx: Any, payload: dict[str, Any]) -> bool:
    dapr_base_url = dapr_http_base_url()
    if not dapr_base_url:
        logger.warning("DAPR_HTTP_ENDPOINT or DAPR_HTTP_PORT is not configured; shipment execution state was not saved.")
        return False
 
    task_id = str(payload.get("task_id") or "").strip()
    if not task_id:
        logger.warning("Shipment execution state was not saved because task_id was empty.")
        return False
 
    state_entry = {
        "key": _shipment_execution_state_key(task_id),
        "value": _shipment_execution_state_record(payload),
    }
    url = f"{dapr_base_url}/v1.0/state/{SHIPMENT_STATE_STORE_NAME}"
    request = urllib.request.Request(
        url,
        data=json.dumps([state_entry]).encode("utf-8"),
        headers=dapr_json_headers(),
        method="POST",
    )
 
    try:
        with urllib.request.urlopen(request, timeout=5) as response:
            if response.status >= 300:
                logger.warning("Dapr state save returned HTTP %s for shipment execution state.", response.status)
                return False
        return True
    except (urllib.error.URLError, TimeoutError) as exc:
        logger.warning("Failed to save shipment execution state: %s", exc)
        return False
# workflow_helpers.py
from __future__ import annotations
 
from datetime import timedelta
import json
import re
from typing import Any, Callable
 
import dapr.ext.workflow as wf
from dapr.ext.workflow import DaprWorkflowContext
 
from config import (
    CHILD_WORKFLOW_MAX_ATTEMPTS,
    CHILD_WORKFLOW_RETRY_BACKOFF_COEFFICIENT,
    CHILD_WORKFLOW_RETRY_FIRST_INTERVAL_SECONDS,
    CHILD_WORKFLOW_TIMEOUT_SECONDS,
    logger,
)
from contracts import AgentCallResult
 
def preview(value: Any, limit: int = 800) -> str:
    text = value if isinstance(value, str) else str(value)
    text = text.replace("\n", "\\n")
    return text if len(text) <= limit else f"{text[:limit]}...<truncated {len(text) - limit} chars>"
 
def log_stage(stage: str, **values: Any) -> None:
    details = " ".join(f"{key}={preview(value)}" for key, value in values.items())
    logger.debug("[Orchestrator] %s %s", stage, details)
 
def _extract_agent_content(agent_result: Any, stage: str) -> str:
    if isinstance(agent_result, dict):
        content = agent_result.get("content")
        if _is_usable_agent_content(content):
            return content
    if _is_usable_agent_content(agent_result):
        return agent_result
    raise ValueError(f"{stage} did not return a usable content payload.")
 
def _is_usable_agent_content(value: Any) -> bool:
    return isinstance(value, str) and value.strip().lower() not in {"", "none", "null"}
 
def _is_partial_tool_call(value: str) -> bool:
    normalized = value.strip()
    if normalized.startswith("<|python_tag|>"):
        normalized = normalized[len("<|python_tag|>") :].strip()
    return '"name"' in normalized and '"parameters"' in normalized
 
def _is_agent_error_content(value: str) -> bool:
    normalized = value.strip().lower()
    return (
        normalized.startswith("error generating response due to previous error")
        or normalized.startswith("error executing tool")
        or '"iserror": true' in normalized
        or "please check and resubmit the logistics planning brief" in normalized
        or normalized.startswith("error in ")
        or "validation error occurred" in normalized
    )
 
def _is_unusable_agent_output(value: str) -> bool:
    return _is_partial_tool_call(value) or _is_agent_error_content(value)
 
def _fallback_reason_description(reason: str) -> str:
    descriptions = {
        "timeout": "the child workflow did not finish within the configured timeout",
        "llm_tool_call_final": "the agent returned a tool call as the final answer instead of final output",
        "tool_error": "the expected tool returned an error payload",
        "empty_output": "the agent returned empty output",
        "invalid_output": "the agent returned a malformed or unusable payload",
        "workflow_error": "the child workflow failed while reading the result",
        "retry_exhausted": "all retry attempts were exhausted",
    }
    return descriptions.get(reason, "usable agent output was not returned")
 
def _classify_unusable_agent_output(value: str) -> str:
    if _is_partial_tool_call(value):
        return "llm_tool_call_final"
    if _is_agent_error_content(value):
        return "tool_error"
    if not _is_usable_agent_content(value):
        return "empty_output"
    return "invalid_output"
 
def _classify_agent_result_issue(agent_result: Any) -> str:
    if isinstance(agent_result, dict):
        if agent_result.get("isError") is True:
            return "tool_error"
        try:
            serialized_result = json.dumps(agent_result)
            if _is_agent_error_content(serialized_result):
                return "tool_error"
        except TypeError:
            pass
        content = agent_result.get("content")
        if not _is_usable_agent_content(content):
            return "empty_output" if content is None or str(content).strip().lower() in {"", "none", "null"} else "invalid_output"
        return _classify_unusable_agent_output(content)
    if not _is_usable_agent_content(agent_result):
        return "empty_output" if agent_result is None or str(agent_result).strip().lower() in {"", "none", "null"} else "invalid_output"
    return _classify_unusable_agent_output(str(agent_result))
 
def _child_workflow_retry_policy() -> wf.RetryPolicy | None:
    if CHILD_WORKFLOW_MAX_ATTEMPTS <= 1:
        return None
    return wf.RetryPolicy(
        first_retry_interval=timedelta(seconds=CHILD_WORKFLOW_RETRY_FIRST_INTERVAL_SECONDS),
        max_number_of_attempts=CHILD_WORKFLOW_MAX_ATTEMPTS,
        backoff_coefficient=CHILD_WORKFLOW_RETRY_BACKOFF_COEFFICIENT,
    )
 
def _child_workflow_attempt_instance_id(parent_instance_id: str, stage: str, attempt: int) -> str | None:
    if attempt == 1:
        return None
    normalized_stage = "".join(character.lower() if character.isalnum() else "-" for character in stage).strip("-")
    return f"{parent_instance_id}:{normalized_stage}:retry-{attempt}"
 
def _task_result(task: Any) -> Any:
    get_result = getattr(task, "get_result", None)
    if callable(get_result):
        return get_result()
    return task.result
 
def _call_child_workflow_or_fallback(
    ctx: DaprWorkflowContext,
    *,
    workflow: str,
    input: dict[str, Any],
    app_id: str,
    stage: str,
    fallback: str,
    timeout_seconds: int = CHILD_WORKFLOW_TIMEOUT_SECONDS,
    validate_content: Callable[[str], str | None] | None = None,
) -> AgentCallResult:
    max_attempts = max(1, CHILD_WORKFLOW_MAX_ATTEMPTS)
    retry_policy = _child_workflow_retry_policy()
    last_reason = "retry_exhausted"
    last_detail = ""
 
    for attempt in range(1, max_attempts + 1):
        agent_result: Any = None
        child_task = ctx.call_child_workflow(
            workflow=workflow,
            input=input,
            instance_id=_child_workflow_attempt_instance_id(ctx.instance_id, stage, attempt),
            retry_policy=retry_policy,
            app_id=app_id,
        )
        timeout_task = ctx.create_timer(timedelta(seconds=timeout_seconds))
        winner = yield wf.when_any([child_task, timeout_task])
 
        if winner is timeout_task:
            last_reason = "timeout"
            last_detail = f"{stage} attempt {attempt}/{max_attempts} timed out after {timeout_seconds}s."
            logger.warning(
                "%s attempt %s/%s timed out after %ss.",
                stage,
                attempt,
                max_attempts,
                timeout_seconds,
            )
            continue
 
        try:
            agent_result = _task_result(child_task)
            content = _extract_agent_content(agent_result, stage)
            if _is_unusable_agent_output(content):
                last_reason = _classify_unusable_agent_output(content)
                last_detail = f"{stage} attempt {attempt}/{max_attempts} returned {_fallback_reason_description(last_reason)}."
                logger.warning(
                    "%s attempt %s/%s returned unusable content. reason=%s",
                    stage,
                    attempt,
                    max_attempts,
                    last_reason,
                )
                continue
            if validate_content is not None:
                validation_error = validate_content(content)
                if validation_error is not None:
                    last_reason = "invalid_output"
                    last_detail = f"{stage} attempt {attempt}/{max_attempts} returned invalid output. {validation_error}"
                    logger.warning(
                        "%s attempt %s/%s returned invalid content. reason=%s detail=%s",
                        stage,
                        attempt,
                        max_attempts,
                        last_reason,
                        validation_error,
                    )
                    continue
            logger.info("%s completed with LLM output on attempt %s/%s.", stage, attempt, max_attempts)
            return AgentCallResult(content=content, source="llm", attempts=attempt)
        except Exception as exc:
            last_reason = _classify_agent_result_issue(agent_result) if agent_result is not None else "workflow_error"
            last_detail = str(exc)
            logger.warning(
                "%s attempt %s/%s returned an unusable payload. reason=%s Error: %s",
                stage,
                attempt,
                max_attempts,
                last_reason,
                exc,
            )
 
    logger.warning(
        "%s exhausted %s attempt(s); using deterministic fallback. reason=%s detail=%s",
        stage,
        max_attempts,
        last_reason,
        last_detail,
    )
    return AgentCallResult(
        content=fallback,
        source="fallback",
        attempts=max_attempts,
        fallback_reason=last_reason,
        fallback_detail=last_detail,
    )
 
def _completion_message(message: str, result: AgentCallResult | str) -> str:
    source = result.source if isinstance(result, AgentCallResult) else result
    if source == "fallback":
        reason = result.fallback_reason if isinstance(result, AgentCallResult) else ""
        attempts = result.attempts if isinstance(result, AgentCallResult) else CHILD_WORKFLOW_MAX_ATTEMPTS
        reason_text = _fallback_reason_description(reason)
        return f"{message} Deterministic rules were used after {attempts} attempt(s) because {reason_text}."
    return f"{message} LLM response was used."
 
def _compact_payload(value: str, limit: int = 700) -> str:
    if _is_unusable_agent_output(value):
        return "Requires manual confirmation from the completed shipment scenario before dispatch."
 
    normalized = " ".join(value.replace("```json", "").replace("```", "").split())
    return normalized if len(normalized) <= limit else f"{normalized[:limit].rstrip()}..."
 
def _fallback_execution_plan(
    shipment_scenario: str,
    shipment_request: str,
    carrier_options: str,
    route_plan: str,
    risk_compliance: str,
) -> str:
    return (
        "# Shipment Execution Plan\n\n"
        "## Automation Source\n\n"
        "Deterministic fallback was used because the final LLM response was unavailable or unusable.\n\n"
        "## Shipment Scenario\n\n"
        f"{shipment_scenario}\n\n"
        "## Normalized Shipment Request\n\n"
        f"{_compact_payload(shipment_request)}\n\n"
        "## Carrier And Capacity\n\n"
        f"{_compact_payload(carrier_options)}\n\n"
        "## Route And Schedule\n\n"
        f"{_compact_payload(route_plan)}\n\n"
        "## Risk And Compliance\n\n"
        f"{_compact_payload(risk_compliance)}\n\n"
        "## Dispatch Controls\n\n"
        "Dispatch should proceed only after required documents, carrier acceptance, pickup "
        "appointment, and risk mitigations are confirmed.\n\n"
        "## Execution Checklist\n\n"
        "- Validate normalized shipment request.\n"
        "- Confirm carrier capacity and service level.\n"
        "- Confirm route milestones and delivery window.\n"
        "- Clear documentation and compliance blockers.\n"
        "- Monitor risk triggers and exception thresholds.\n"
        "- Send proactive customer updates at pickup, hub departure, out-for-delivery, and delivery."
    )
 
def _normalize_for_presence_check(value: str) -> str:
    return " ".join(value.lower().split())
 
def _scenario_key_terms(value: str) -> set[str]:
    ignored_terms = {
        "move",
        "ship",
        "shipment",
        "from",
        "with",
        "using",
        "service",
        "next",
        "week",
        "required",
    }
    return {
        term
        for term in re.findall(r"[a-z0-9]+", value.lower())
        if len(term) >= 4 and term not in ignored_terms
    }
 
def _contains_current_scenario_terms(content: str, shipment_scenario: str) -> bool:
    scenario_terms = _scenario_key_terms(shipment_scenario)
    if not scenario_terms:
        return True
 
    content_terms = _scenario_key_terms(content)
    matched_terms = scenario_terms.intersection(content_terms)
    required_matches = min(2, max(1, len(scenario_terms) // 3))
    return len(matched_terms) >= required_matches
 
def _has_dangling_json_fragment(content: str) -> bool:
    in_fenced_block = False
    open_brace_line = False
 
    for raw_line in content.splitlines():
        line = raw_line.strip()
        if line.startswith("```"):
            in_fenced_block = not in_fenced_block
            continue
 
        if in_fenced_block:
            continue
 
        if open_brace_line and line.startswith("##"):
            return True
 
        if line == "{":
            open_brace_line = True
            continue
 
        if open_brace_line and line == "}":
            open_brace_line = False
 
    return open_brace_line
 
def _validate_final_execution_plan(content: str, shipment_scenario: str) -> str | None:
    normalized_content = _normalize_for_presence_check(content)
 
    if "shipment execution plan" not in normalized_content:
        return "The final plan did not include the expected title."
 
    if not _contains_current_scenario_terms(content, shipment_scenario):
        return "The final plan did not include the current shipment scenario."
 
    if _has_dangling_json_fragment(content):
        return "The final plan contained a dangling JSON fragment."
 
    return None
 
def _get_workflow_input(workflow_input: Any) -> tuple[str, str, dict[str, str]]:
    if isinstance(workflow_input, dict):
        task = workflow_input.get("task")
        task_id = workflow_input.get("task_id")
        trace_context = workflow_input.get("trace_context")
        if isinstance(task, str) and task.strip():
            normalized_task_id = task_id.strip() if isinstance(task_id, str) and task_id.strip() else ""
            return task.strip(), normalized_task_id, trace_context if isinstance(trace_context, dict) else {}
    if isinstance(workflow_input, str) and workflow_input.strip():
        return workflow_input.strip(), "", {}
    raise ValueError("Workflow input did not include a usable task.")
 
def _build_planning_brief(
    shipment_scenario: str,
    shipment_request: str,
    carrier_options: str,
    route_plan: str,
    risk_compliance: str,
    source_notes: dict[str, str] | None = None,
) -> str:
    source_notes = source_notes or {}
    automation_sources = "\n".join(
        f"- {stage}: {source_notes.get(stage, 'llm')}"
        for stage in ("order_intake", "carrier_capacity", "route_planning", "risk_compliance")
    )
    return (
        "## Logistics Planning Brief\n\n"
        f"### Original Scenario\n{shipment_scenario}\n\n"
        f"### Automation Sources\n{automation_sources}\n\n"
        f"### Normalized Shipment Request\n```json\n{shipment_request}\n```\n\n"
        f"### Feasible Carrier Options\n```json\n{carrier_options}\n```\n\n"
        f"### Recommended Route And Schedule\n```json\n{route_plan}\n```\n\n"
        f"### Risk And Compliance Assessment\n```json\n{risk_compliance}\n```\n\n"
        "### Planning Decision\n"
        "Proceed with the top-ranked feasible carrier when documents and pickup readiness are confirmed. "
        "Escalate to the alternate route if risk mitigation cannot be completed before dispatch."
    )
# workflow_runtime.py
from __future__ import annotations
 
import logging
import sys
 
import dapr.ext.workflow as wf
from dapr.ext.workflow.logger import LoggerOptions
 
class DurableTaskNoiseFilter(logging.Filter):
    _ignored_messages = (
        "Ignoring unexpected taskCompleted event with ID",
        "Ignoring unexpected timerFired event with ID",
    )
 
    def filter(self, record: logging.LogRecord) -> bool:
        if record.name != "durabletask-worker":
            return True
 
        message = record.getMessage()
        return not any(ignored in message for ignored in self._ignored_messages)
 
def configure_workflow_loggers() -> None:
    for logger_name in ("WorkflowRuntime", "durabletask-worker"):
        logging.getLogger(logger_name).propagate = False
 
def create_workflow_logger_options() -> LoggerOptions:
    configure_workflow_loggers()
    handler = logging.StreamHandler(sys.stdout)
    handler.addFilter(DurableTaskNoiseFilter())
    return LoggerOptions(log_handler=handler)
 
runtime = wf.WorkflowRuntime(logger_options=create_workflow_logger_options())
# workflow.py
from __future__ import annotations
 
from typing import Any
 
from dapr.ext.workflow import DaprWorkflowContext
from opentelemetry.trace import Status, StatusCode
 
from config import (
    CARRIER_CAPACITY_APP_ID,
    CARRIER_CAPACITY_WORKFLOW_NAME,
    CUSTOMER_UPDATE_WRITER_APP_ID,
    CUSTOMER_UPDATE_WRITER_WORKFLOW_NAME,
    ORDER_INTAKE_APP_ID,
    ORDER_INTAKE_WORKFLOW_NAME,
    RISK_COMPLIANCE_APP_ID,
    RISK_COMPLIANCE_WORKFLOW_NAME,
    ROUTE_PLANNING_APP_ID,
    ROUTE_PLANNING_WORKFLOW_NAME,
)
from shared.observability import record_workflow_event
from progress import _workflow_progress_event, publish_shipment_progress
from state_store import save_shipment_execution_state
from workflow_helpers import (
    _build_planning_brief,
    _call_child_workflow_or_fallback,
    _completion_message,
    _fallback_execution_plan,
    _get_workflow_input,
    _validate_final_execution_plan,
)
from workflow_runtime import runtime
 
@runtime.workflow(name="shipment_execution_workflow")
def shipment_execution_workflow(ctx: DaprWorkflowContext, workflow_input: Any) -> str:
    """Run the five logistics agents as child workflows."""
    shipment_scenario, task_id, trace_context = _get_workflow_input(workflow_input)
    correlation_id = task_id or ctx.instance_id
    yield ctx.call_activity(
        publish_shipment_progress,
        input=_workflow_progress_event(
            correlation_id,
            ctx.instance_id,
            "workflow",
            "started",
            "Shipment execution workflow started.",
            4,
        ),
    )
    record_workflow_event(
        "orchestrator.shipment_execution_workflow.started",
        trace_context,
        workflow_id=ctx.instance_id,
    )
    try:
        yield ctx.call_activity(
            publish_shipment_progress,
            input=_workflow_progress_event(
                correlation_id,
                ctx.instance_id,
                "order-intake",
                "started",
                "Order intake is normalizing the shipment request.",
                10,
            ),
        )
        record_workflow_event(
            "orchestrator.call_order_intake.scheduled",
            trace_context,
            workflow_id=ctx.instance_id,
            target_app_id=ORDER_INTAKE_APP_ID,
        )
        shipment_request_result = yield from _call_child_workflow_or_fallback(
            ctx,
            workflow=ORDER_INTAKE_WORKFLOW_NAME,
            input={"task": shipment_scenario, "_otel_span_context": trace_context},
            app_id=ORDER_INTAKE_APP_ID,
            stage="Order intake",
            fallback=(
                "Use the original shipment scenario as the canonical request. "
                f"Scenario: {shipment_scenario}"
            ),
        )
        shipment_request = shipment_request_result.content
        yield ctx.call_activity(
            publish_shipment_progress,
            input=_workflow_progress_event(
                correlation_id,
                ctx.instance_id,
                "order-intake",
                "completed",
                _completion_message("Order intake produced the canonical shipment request.", shipment_request_result),
                20,
                shipment_request_result.source,
                shipment_request_result.fallback_reason,
                shipment_request_result.attempts,
                shipment_request_result.fallback_detail,
            ),
        )
        record_workflow_event(
            "orchestrator.call_order_intake.completed",
            trace_context,
            workflow_id=ctx.instance_id,
            target_app_id=ORDER_INTAKE_APP_ID,
        )
 
        carrier_task = f"shipment_request:\n{shipment_request}"
        yield ctx.call_activity(
            publish_shipment_progress,
            input=_workflow_progress_event(
                correlation_id,
                ctx.instance_id,
                "carrier-capacity",
                "started",
                "Carrier capacity is ranking feasible options.",
                28,
            ),
        )
        record_workflow_event(
            "orchestrator.call_carrier_capacity.scheduled",
            trace_context,
            workflow_id=ctx.instance_id,
            target_app_id=CARRIER_CAPACITY_APP_ID,
        )
        carrier_options_result = yield from _call_child_workflow_or_fallback(
            ctx,
            workflow=CARRIER_CAPACITY_WORKFLOW_NAME,
            input={"task": carrier_task, "_otel_span_context": trace_context},
            app_id=CARRIER_CAPACITY_APP_ID,
            stage="Carrier capacity",
            fallback=(
                "Carrier capacity requires manual confirmation. Confirm full truckload "
                "availability, equipment fit, pickup appointment, service level, and carrier acceptance."
            ),
        )
        carrier_options = carrier_options_result.content
        yield ctx.call_activity(
            publish_shipment_progress,
            input=_workflow_progress_event(
                correlation_id,
                ctx.instance_id,
                "carrier-capacity",
                "completed",
                _completion_message("Carrier capacity returned ranked options.", carrier_options_result),
                40,
                carrier_options_result.source,
                carrier_options_result.fallback_reason,
                carrier_options_result.attempts,
                carrier_options_result.fallback_detail,
            ),
        )
        record_workflow_event(
            "orchestrator.call_carrier_capacity.completed",
            trace_context,
            workflow_id=ctx.instance_id,
            target_app_id=CARRIER_CAPACITY_APP_ID,
        )
 
        route_task = (
            f"shipment_request:\n{shipment_request}\n\n"
            f"carrier_options:\n{carrier_options}"
        )
        yield ctx.call_activity(
            publish_shipment_progress,
            input=_workflow_progress_event(
                correlation_id,
                ctx.instance_id,
                "route-planning",
                "started",
                "Route planning is building the movement plan.",
                48,
            ),
        )
        record_workflow_event(
            "orchestrator.call_route_planning.scheduled",
            trace_context,
            workflow_id=ctx.instance_id,
            target_app_id=ROUTE_PLANNING_APP_ID,
        )
        route_plan_result = yield from _call_child_workflow_or_fallback(
            ctx,
            workflow=ROUTE_PLANNING_WORKFLOW_NAME,
            input={"task": route_task, "_otel_span_context": trace_context},
            app_id=ROUTE_PLANNING_APP_ID,
            stage="Route planning",
            fallback=(
                "Route planning requires manual confirmation. Confirm pickup, linehaul, "
                "delivery appointment, ETA, route milestones, and fallback routing before dispatch."
            ),
        )
        route_plan = route_plan_result.content
        yield ctx.call_activity(
            publish_shipment_progress,
            input=_workflow_progress_event(
                correlation_id,
                ctx.instance_id,
                "route-planning",
                "completed",
                _completion_message("Route planning returned milestones and ETA.", route_plan_result),
                60,
                route_plan_result.source,
                route_plan_result.fallback_reason,
                route_plan_result.attempts,
                route_plan_result.fallback_detail,
            ),
        )
        record_workflow_event(
            "orchestrator.call_route_planning.completed",
            trace_context,
            workflow_id=ctx.instance_id,
            target_app_id=ROUTE_PLANNING_APP_ID,
        )
 
        risk_task = (
            f"shipment_request:\n{shipment_request}\n\n"
            f"route_plan:\n{route_plan}"
        )
        yield ctx.call_activity(
            publish_shipment_progress,
            input=_workflow_progress_event(
                correlation_id,
                ctx.instance_id,
                "risk-compliance",
                "started",
                "Risk and compliance is checking blockers and mitigations.",
                68,
            ),
        )
        record_workflow_event(
            "orchestrator.call_risk_compliance.scheduled",
            trace_context,
            workflow_id=ctx.instance_id,
            target_app_id=RISK_COMPLIANCE_APP_ID,
        )
        risk_compliance_result = yield from _call_child_workflow_or_fallback(
            ctx,
            workflow=RISK_COMPLIANCE_WORKFLOW_NAME,
            input={"task": risk_task, "_otel_span_context": trace_context},
            app_id=RISK_COMPLIANCE_APP_ID,
            stage="Risk compliance",
            fallback=(
                "Risk and compliance require manual confirmation. Verify bill of lading, "
                "cargo requirements, insurance, restrictions, temperature or handling controls, "
                "and exception triggers before dispatch."
            ),
        )
        risk_compliance = risk_compliance_result.content
        yield ctx.call_activity(
            publish_shipment_progress,
            input=_workflow_progress_event(
                correlation_id,
                ctx.instance_id,
                "risk-compliance",
                "completed",
                _completion_message("Risk and compliance returned documents, restrictions, and mitigations.", risk_compliance_result),
                80,
                risk_compliance_result.source,
                risk_compliance_result.fallback_reason,
                risk_compliance_result.attempts,
                risk_compliance_result.fallback_detail,
            ),
        )
        record_workflow_event(
            "orchestrator.call_risk_compliance.completed",
            trace_context,
            workflow_id=ctx.instance_id,
            target_app_id=RISK_COMPLIANCE_APP_ID,
        )
 
        planning_brief = _build_planning_brief(
            shipment_scenario,
            shipment_request,
            carrier_options,
            route_plan,
            risk_compliance,
            {
                "order_intake": shipment_request_result.source,
                "carrier_capacity": carrier_options_result.source,
                "route_planning": route_plan_result.source,
                "risk_compliance": risk_compliance_result.source,
            },
        )
 
        customer_update_task = (
            "Turn the following logistics planning brief into a shipment execution plan "
            "with a concise customer-facing update.\n\n"
            f"scenario:\n{shipment_scenario}\n\n"
            f"planning_brief:\n{planning_brief}"
        )
        yield ctx.call_activity(
            publish_shipment_progress,
            input=_workflow_progress_event(
                correlation_id,
                ctx.instance_id,
                "customer-update-writer",
                "started",
                "Customer update writer is assembling the execution plan.",
                88,
            ),
        )
        record_workflow_event(
            "orchestrator.call_customer_update_writer.scheduled",
            trace_context,
            workflow_id=ctx.instance_id,
            target_app_id=CUSTOMER_UPDATE_WRITER_APP_ID,
        )
        execution_plan_result = yield from _call_child_workflow_or_fallback(
            ctx,
            workflow=CUSTOMER_UPDATE_WRITER_WORKFLOW_NAME,
            input={"task": customer_update_task, "_otel_span_context": trace_context},
            app_id=CUSTOMER_UPDATE_WRITER_APP_ID,
            stage="Customer update writer",
            fallback=_fallback_execution_plan(
                shipment_scenario,
                shipment_request,
                carrier_options,
                route_plan,
                risk_compliance,
            ),
            validate_content=lambda content: _validate_final_execution_plan(content, shipment_scenario),
        )
        execution_plan = execution_plan_result.content
        yield ctx.call_activity(
            publish_shipment_progress,
            input=_workflow_progress_event(
                correlation_id,
                ctx.instance_id,
                "customer-update-writer",
                "completed",
                _completion_message("Customer update writer returned the final execution plan.", execution_plan_result),
                96,
                execution_plan_result.source,
                execution_plan_result.fallback_reason,
                execution_plan_result.attempts,
                execution_plan_result.fallback_detail,
            ),
        )
        record_workflow_event(
            "orchestrator.call_customer_update_writer.completed",
            trace_context,
            workflow_id=ctx.instance_id,
            target_app_id=CUSTOMER_UPDATE_WRITER_APP_ID,
        )
        yield ctx.call_activity(
            save_shipment_execution_state,
            input={
                "task_id": correlation_id,
                "workflow_instance_id": ctx.instance_id,
                "shipment_scenario": shipment_scenario,
                "shipment_request": shipment_request,
                "carrier_options": carrier_options,
                "route_plan": route_plan,
                "risk_compliance": risk_compliance,
                "planning_brief": planning_brief,
                "execution_plan": execution_plan,
                "sources": {
                    "order_intake": shipment_request_result.source,
                    "carrier_capacity": carrier_options_result.source,
                    "route_planning": route_plan_result.source,
                    "risk_compliance": risk_compliance_result.source,
                    "customer_update_writer": execution_plan_result.source,
                },
            },
        )
        record_workflow_event(
            "orchestrator.shipment_execution_workflow.completed",
            trace_context,
            workflow_id=ctx.instance_id,
        )
        yield ctx.call_activity(
            publish_shipment_progress,
            input=_workflow_progress_event(
                correlation_id,
                ctx.instance_id,
                "workflow",
                "completed",
                "Shipment execution workflow completed.",
                100,
            ),
        )
        return execution_plan
    except Exception as exc:
        yield ctx.call_activity(
            publish_shipment_progress,
            input=_workflow_progress_event(
                correlation_id,
                ctx.instance_id,
                "workflow",
                "failed",
                str(exc),
                100,
            ),
        )
        record_workflow_event(
            "orchestrator.shipment_execution_workflow.failed",
            trace_context,
            workflow_id=ctx.instance_id,
            status=Status(StatusCode.ERROR, str(exc)),
        )
        raise

Order Intake - Agent

It extracts lane details, cargo information, volume, service level, delivery windows, equipment requirements, and handling constraints, enabling downstream agents to operate on structured shipment data rather than raw user input.

# pyproject.toml
[project]
name = "agent-order-intake"
version = "0.1.0"
requires-python = ">=3.11,<3.14"
dependencies = [
    "dapr-agents==1.0.0",
    "dapr==1.17.3",
    "opentelemetry-api==1.41.1",
    "opentelemetry-sdk==1.41.1",
    "opentelemetry-exporter-otlp-proto-grpc==1.41.1",
    "python-dotenv==1.0.0",
    "uvicorn[standard]==0.31.1",
]
# app.py
from __future__ import annotations
 
import json
import re
import sys
from pathlib import Path
from typing import Any
 
sys.path.append(str(Path(__file__).resolve().parents[1]))
 
from pydantic import BaseModel, Field
 
from dapr_agents import DurableAgent
from dapr_agents.agents.configs import AgentExecutionConfig, ToolExecutionMode
from dapr_agents.llm.dapr import DaprChatClient
from dapr_agents.tool import AgentTool
from dapr_agents.workflow.runners import AgentRunner
from shared.observability import (
    reset_tool_trace_context,
    set_tool_trace_context,
    start_span,
)
from shared.agent_config import configure_agent_environment
from shared.agent_host import (
    create_agent_app,
    run_agent_service,
)
from shared.logging_utils import create_stage_logger
from shared.tool_recovery import (
    get_last_tool_content as _get_last_tool_content,
    looks_like_tool_call_content as _looks_like_tool_call_content,
    replace_response_with_tool_output,
    strip_agent_markup as _strip_agent_markup,
    tool_call_content_from_response as _tool_call_content_from_response,
)
from shared.workflow_runtime import create_workflow_runtime
 
runtime_config = configure_agent_environment(
    service_name="order-intake",
    logger_name="order_intake.app",
    default_port=50051,
)
logger = runtime_config.logger
UVICORN_LOG_CONFIG = runtime_config.uvicorn_log_config
AGENT_PORT = runtime_config.agent_port
CHAT_COMPONENT = runtime_config.chat_component
WORKFLOW_TRACE_CONTEXTS: dict[str, dict[str, str]] = {}
log_stage = create_stage_logger(logger, "OrderIntake")
 
class ShipmentScenarioToolArgs(BaseModel):
    scenario: str = Field(..., description="Shipment scenario or customer request to normalize.")
 
class TracedOrderIntakeAgent(DurableAgent):
    def record_initial_entry(self, ctx: Any, payload: dict[str, Any]) -> None:
        trace_context = payload.get("trace_context")
        if isinstance(trace_context, dict):
            WORKFLOW_TRACE_CONTEXTS[ctx.workflow_id] = {
                str(key): str(value) for key, value in trace_context.items()
            }
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "order_intake.workflow.started",
                workflow_id=ctx.workflow_id,
                source=payload.get("source"),
                triggering_workflow_instance_id=payload.get("triggering_workflow_instance_id"),
            ):
                super().record_initial_entry(ctx, payload)
        finally:
            reset_tool_trace_context(token)
 
    def call_llm(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "order_intake.llm.call",
                workflow_id=ctx.workflow_id,
                source=payload.get("source"),
            ):
                response = super().call_llm(ctx, payload)
                tool_output = self._get_last_tool_content(payload.get("instance_id"), "NormalizeShipmentRequest")
                if tool_output is None:
                    tool_output = _normalize_from_tool_call_content(response, ctx, payload.get("task"))
                return replace_response_with_tool_output(
                    response,
                    tool_output,
                    "OrderIntake",
                    logger,
                )
        finally:
            reset_tool_trace_context(token)
 
    def run_tool(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            tool_call = payload.get("tool_call", {})
            function = tool_call.get("function", {}) if isinstance(tool_call, dict) else {}
            with start_span(
                "order_intake.tool.run",
                workflow_id=ctx.workflow_id,
                tool_name=function.get("name"),
                tool_call_id=tool_call.get("id") if isinstance(tool_call, dict) else None,
            ):
                return super().run_tool(ctx, payload)
        finally:
            reset_tool_trace_context(token)
 
    def finalize_workflow(self, ctx: Any, payload: dict[str, Any]) -> None:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "order_intake.workflow.finalized",
                workflow_id=ctx.workflow_id,
                triggering_workflow_instance_id=payload.get("triggering_workflow_instance_id"),
            ):
                super().finalize_workflow(ctx, payload)
        finally:
            reset_tool_trace_context(token)
            WORKFLOW_TRACE_CONTEXTS.pop(ctx.workflow_id, None)
 
    def summarize(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        try:
            return super().summarize(ctx, payload)
        except Exception as exc:
            logger.warning("Skipping non-critical memory summary: %s", exc)
            return {}
 
    def _get_last_tool_content(self, instance_id: Any, tool_name: str) -> str | None:
        if not isinstance(instance_id, str) or not instance_id.strip():
            return None
        try:
            entry = self._infra.get_state(instance_id)
        except Exception as exc:
            logger.warning("Could not load OrderIntake state to recover tool output: %s", exc)
            return None
        return _get_last_tool_content(entry, tool_name)
 
def _normalize_from_tool_call_content(
    response: dict[str, Any],
    ctx: Any | None = None,
    fallback_task: Any | None = None,
) -> str | None:
    tool_call_content = _tool_call_content_from_response(response, "NormalizeShipmentRequest")
    if tool_call_content is None:
        return None
 
    parameters = _tool_call_parameters(tool_call_content, "NormalizeShipmentRequest")
    scenario = None
    if parameters is not None:
        scenario = (
            parameters.get("scenario")
            or parameters.get("task")
            or parameters.get("shipment_scenario")
            or parameters.get("request")
        )
    if (
        isinstance(scenario, str)
        and isinstance(fallback_task, str)
        and _looks_like_malformed_structured_scenario(scenario)
    ):
        scenario = fallback_task
    if not isinstance(scenario, str) or not scenario.strip():
        scenario = fallback_task
    if not isinstance(scenario, str) or not scenario.strip():
        return None
 
    return normalize_shipment_request(scenario.strip(), ctx=ctx)
 
def _looks_like_malformed_structured_scenario(scenario: str) -> bool:
    stripped = scenario.strip()
    if not stripped:
        return False
    if stripped[0] not in "{[":
        return False
    try:
        parsed = json.loads(stripped)
    except json.JSONDecodeError:
        return True
    return isinstance(parsed, (dict, list))
 
def _tool_call_parameters(content: Any, tool_name: str) -> dict[str, Any] | None:
    if not _looks_like_tool_call_content(content, tool_name):
        return None
    stripped = _strip_agent_markup(str(content))
    try:
        payload = json.loads(stripped)
    except json.JSONDecodeError:
        payload = _parse_function_style_tool_call(stripped, tool_name)
    if not isinstance(payload, dict) or payload.get("name") != tool_name:
        return None
    parameters = payload.get("parameters") or payload.get("arguments") or payload.get("args")
    function = payload.get("function")
    if isinstance(function, dict):
        parameters = function.get("arguments") or function.get("parameters") or parameters
        if function.get("name") not in {None, tool_name}:
            return None
    if isinstance(parameters, str) and parameters.strip():
        try:
            parsed_parameters = json.loads(_strip_agent_markup(parameters))
        except json.JSONDecodeError:
            return None
        parameters = parsed_parameters
    return parameters if isinstance(parameters, dict) else None
 
def _parse_function_style_tool_call(content: str, tool_name: str) -> dict[str, Any] | None:
    match = re.fullmatch(rf"{re.escape(tool_name)}\s*\((.*)\)", content, flags=re.DOTALL)
    if not match:
        return None
 
    raw_arguments = match.group(1).strip()
    if not raw_arguments:
        return {"name": tool_name, "parameters": {}}
 
    if raw_arguments.startswith("{"):
        try:
            parsed = json.loads(raw_arguments)
        except json.JSONDecodeError:
            return None
        return {"name": tool_name, "parameters": parsed} if isinstance(parsed, dict) else None
 
    keyword_match = re.fullmatch(r"scenario\s*=\s*(.+)", raw_arguments, flags=re.DOTALL)
    if keyword_match:
        try:
            scenario = json.loads(keyword_match.group(1).strip())
        except json.JSONDecodeError:
            scenario = keyword_match.group(1).strip().strip("\"'")
        return {"name": tool_name, "parameters": {"scenario": scenario}}
 
    try:
        scenario = json.loads(raw_arguments)
    except json.JSONDecodeError:
        scenario = raw_arguments.strip("\"'")
    return {"name": tool_name, "parameters": {"scenario": scenario}}
 
 
def _resolve_task_id(task_id: str | None = None, ctx: Any | None = None) -> str:
    workflow_id = getattr(ctx, "workflow_id", None)
    if isinstance(workflow_id, str) and workflow_id.strip():
        return workflow_id.strip()
    if isinstance(task_id, str) and task_id.strip():
        return task_id.strip()
    return "unknown"
 
def normalize_shipment_request(
    scenario: str,
    task_id: str = "unknown",
    ctx: Any | None = None,
    _source_agent: str | None = None,
) -> str:
    """Parse a shipment scenario into a canonical shipment request."""
    task_id = _resolve_task_id(task_id, ctx)
    with start_span("order_intake.tool.normalize_shipment_request", task_id=task_id):
        log_stage("tool input", tool="normalize_shipment_request", task_id=task_id, scenario=scenario)
        request = _build_shipment_request(scenario)
        result = json.dumps(request, indent=2)
        log_stage("tool output", tool="normalize_shipment_request", task_id=task_id, request=request)
        return result
 
def _build_shipment_request(scenario: str) -> dict[str, Any]:
    text = scenario.strip()
    lower = text.lower()
    origin, destination = _extract_lane(text)
    cargo_type = _classify_cargo(lower)
    weight_lbs = _extract_weight_lbs(lower)
    shipment_type = _classify_shipment_type(lower, weight_lbs)
    return {
        "raw_request": text,
        "origin": origin,
        "destination": destination,
        "cargo_type": cargo_type,
        "weight_lbs": weight_lbs,
        "volume": _extract_volume(lower),
        "service_level": "expedited" if any(word in lower for word in ("expedite", "expedited", "urgent", "same day")) else "standard",
        "shipment_type": shipment_type,
        "cross_border": any(word in lower for word in ("customs", "cross-border", "cross border", "canada", "mexico")),
        "delivery_window": _extract_delivery_window(text),
        "required_fields_valid": bool(origin and destination and cargo_type),
    }
 
def _extract_lane(text: str) -> tuple[str, str]:
    lane_match = re.search(
        r"\bfrom\s+(.+?)\s+to\s+(.+?)(?:\s+by\s+|\s+with\s+|\s+using\s+|\s+next\s+week\b|\s+for\s+|$)",
        text,
        flags=re.IGNORECASE,
    )
    if lane_match:
        return lane_match.group(1).strip(" ,."), lane_match.group(2).strip(" ,.")
    return "unspecified origin", "unspecified destination"
 
def _classify_cargo(lower: str) -> str:
    if "hazmat" in lower or "hazardous" in lower:
        return "hazmat"
    if "reefer" in lower or "refrigerated" in lower or "temperature" in lower:
        return "reefer"
    if "return" in lower:
        return "returns"
    if "parcel" in lower:
        return "parcel"
    return "general freight"
 
def _extract_weight_lbs(lower: str) -> int | None:
    match = re.search(r"(\d[\d,]*(?:\.\d+)?)\s*(lb|lbs|pound|pounds|kg|kgs|kilogram|kilograms)", lower)
    if not match:
        return None
    value = float(match.group(1).replace(",", ""))
    unit = match.group(2)
    return round(value * 2.20462) if unit.startswith("kg") or unit.startswith("kilogram") else round(value)
 
def _extract_volume(lower: str) -> str:
    pallets = re.search(r"(\d+)\s+pallet", lower)
    if pallets:
        return f"{pallets.group(1)} pallets"
    cartons = re.search(r"(\d+)\s+(carton|box|boxes)", lower)
    if cartons:
        return f"{cartons.group(1)} {cartons.group(2)}"
    return "unspecified"
 
def _classify_shipment_type(lower: str, weight_lbs: int | None) -> str:
    if "ftl" in lower or "full truckload" in lower or (weight_lbs is not None and weight_lbs >= 15000):
        return "FTL"
    if "parcel" in lower or (weight_lbs is not None and weight_lbs <= 150):
        return "parcel"
    return "LTL"
 
def _extract_delivery_window(text: str) -> str:
    by_match = re.search(
        r"\bby\s+(.+?)(?:\s+using\s+|\s+with\s+|\s+expedited\b|[,.]|$)",
        text,
        flags=re.IGNORECASE,
    )
    if by_match:
        return by_match.group(1).strip()
    next_week_match = re.search(r"\bnext\s+week\b", text, flags=re.IGNORECASE)
    if next_week_match:
        return "next week"
    return "not specified"
 
NORMALIZE_SHIPMENT_REQUEST_TOOL = AgentTool(
    name="NormalizeShipmentRequest",
    description=normalize_shipment_request.__doc__ or "",
    func=normalize_shipment_request,
    args_model=ShipmentScenarioToolArgs,
)
 
def create_order_intake_agent() -> DurableAgent:
    """Build the order intake durable agent with its normalization tool."""
    return TracedOrderIntakeAgent(
        name="order-intake-agent",
        role="Order Intake Specialist",
        goal="Normalize a shipment scenario into a canonical shipment request.",
        instructions=[
            "Use exactly one tool call per assistant turn until the normalized shipment request is complete.",
            "Follow this state machine strictly:",
            "1. If NormalizeShipmentRequest has not returned, call NormalizeShipmentRequest with the requested scenario.",
            "2. After NormalizeShipmentRequest returns, do not call another tool. Return the normalized shipment request exactly as the final answer.",
            "Do not skip steps, combine steps, reorder steps, or emit conversational commentary between tool calls.",
            "Do not include task_id in tool arguments.",
        ],
        tools=[
            NORMALIZE_SHIPMENT_REQUEST_TOOL,
        ],
        llm=DaprChatClient(component_name=CHAT_COMPONENT),
        runtime=create_workflow_runtime(),
        execution=AgentExecutionConfig(
            max_iterations=4,
            tool_choice="auto",
            tool_execution_mode=ToolExecutionMode.SEQUENTIAL,
        ),
    )
 
def main() -> None:
    runner = AgentRunner()
    order_intake = create_order_intake_agent()
    app = create_agent_app(
        runner=runner,
        agent=order_intake,
        title="Order Intake Agent Service",
    )
    run_agent_service(
        runner=runner,
        agent=order_intake,
        app=app,
        port=AGENT_PORT,
        uvicorn_log_config=UVICORN_LOG_CONFIG,
        log_stage=log_stage,
        chat_component=CHAT_COMPONENT,
    )
 
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass

Carrier Capacity - Agent

It uses the normalized shipment request to evaluate mode, service fit, capacity availability, temperature handling, and operational tradeoffs, then returns ranked carrier choices for route planning.

# pyproject.toml
[project]
name = "agent-carrier-capacity"
version = "0.1.0"
requires-python = ">=3.11,<3.14"
dependencies = [
    "dapr-agents==1.0.0",
    "dapr==1.17.3",
    "opentelemetry-api==1.41.1",
    "opentelemetry-sdk==1.41.1",
    "opentelemetry-exporter-otlp-proto-grpc==1.41.1",
    "python-dotenv==1.0.0",
    "uvicorn[standard]==0.31.1",
]
# app.py
from __future__ import annotations
 
import json
import sys
from pathlib import Path
from typing import Any
 
sys.path.append(str(Path(__file__).resolve().parents[1]))
 
from pydantic import BaseModel, Field
 
from dapr_agents import DurableAgent
from dapr_agents.agents.configs import AgentExecutionConfig, ToolExecutionMode
from dapr_agents.llm.dapr import DaprChatClient
from dapr_agents.tool import AgentTool
from dapr_agents.workflow.runners import AgentRunner
from shared.observability import (
    reset_tool_trace_context,
    set_tool_trace_context,
    start_span,
)
from shared.agent_config import configure_agent_environment
from shared.agent_host import (
    create_agent_app,
    run_agent_service,
)
from shared.json_payloads import (
    loads_object as _loads_object,
)
from shared.logging_utils import create_stage_logger
from shared.tool_recovery import (
    get_last_tool_content as _get_last_tool_content,
    replace_response_with_tool_output,
    tool_call_content_from_response as _tool_call_content_from_response,
)
from shared.workflow_runtime import create_workflow_runtime
 
runtime_config = configure_agent_environment(
    service_name="carrier-capacity",
    logger_name="carrier_capacity.app",
    default_port=50052,
)
logger = runtime_config.logger
UVICORN_LOG_CONFIG = runtime_config.uvicorn_log_config
AGENT_PORT = runtime_config.agent_port
CHAT_COMPONENT = runtime_config.chat_component
WORKFLOW_TRACE_CONTEXTS: dict[str, dict[str, str]] = {}
log_stage = create_stage_logger(logger, "CarrierCapacity")
 
class CarrierCapacityToolArgs(BaseModel):
    shipment_request: str = Field(..., description="Shipment request JSON returned by NormalizeShipmentRequest.")
 
class TracedCarrierCapacityAgent(DurableAgent):
    def record_initial_entry(self, ctx: Any, payload: dict[str, Any]) -> None:
        trace_context = payload.get("trace_context")
        if isinstance(trace_context, dict):
            WORKFLOW_TRACE_CONTEXTS[ctx.workflow_id] = {
                str(key): str(value) for key, value in trace_context.items()
            }
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "carrier_capacity.workflow.started",
                workflow_id=ctx.workflow_id,
                source=payload.get("source"),
                triggering_workflow_instance_id=payload.get("triggering_workflow_instance_id"),
            ):
                super().record_initial_entry(ctx, payload)
        finally:
            reset_tool_trace_context(token)
 
    def call_llm(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "carrier_capacity.llm.call",
                workflow_id=ctx.workflow_id,
                source=payload.get("source"),
            ):
                response = super().call_llm(ctx, payload)
                tool_output = self._get_last_tool_content(payload.get("instance_id"), "EvaluateCarrierCapacity")
                if tool_output is None:
                    tool_call_content = _tool_call_content_from_response(response, "EvaluateCarrierCapacity")
                    if tool_call_content is not None:
                        tool_output = evaluate_carrier_capacity(tool_call_content, ctx=ctx)
                        if _used_default_payload(tool_output) and isinstance(payload.get("task"), str):
                            tool_output = evaluate_carrier_capacity(payload["task"], ctx=ctx)
                return replace_response_with_tool_output(
                    response,
                    tool_output,
                    "CarrierCapacity",
                    logger,
                )
        finally:
            reset_tool_trace_context(token)
 
    def run_tool(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            tool_call = payload.get("tool_call", {})
            function = tool_call.get("function", {}) if isinstance(tool_call, dict) else {}
            with start_span(
                "carrier_capacity.tool.run",
                workflow_id=ctx.workflow_id,
                tool_name=function.get("name"),
                tool_call_id=tool_call.get("id") if isinstance(tool_call, dict) else None,
            ):
                return super().run_tool(ctx, payload)
        finally:
            reset_tool_trace_context(token)
 
    def finalize_workflow(self, ctx: Any, payload: dict[str, Any]) -> None:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "carrier_capacity.workflow.finalized",
                workflow_id=ctx.workflow_id,
                triggering_workflow_instance_id=payload.get("triggering_workflow_instance_id"),
            ):
                super().finalize_workflow(ctx, payload)
        finally:
            reset_tool_trace_context(token)
            WORKFLOW_TRACE_CONTEXTS.pop(ctx.workflow_id, None)
 
    def summarize(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        try:
            return super().summarize(ctx, payload)
        except Exception as exc:
            logger.warning("Skipping non-critical memory summary: %s", exc)
            return {}
 
    def _get_last_tool_content(self, instance_id: Any, tool_name: str) -> str | None:
        if not isinstance(instance_id, str) or not instance_id.strip():
            return None
        try:
            entry = self._infra.get_state(instance_id)
        except Exception as exc:
            logger.warning("Could not load CarrierCapacity state to recover tool output: %s", exc)
            return None
        return _get_last_tool_content(entry, tool_name)
 
def _used_default_payload(tool_output: str) -> bool:
    try:
        payload = json.loads(tool_output)
    except json.JSONDecodeError:
        return False
    ranked_options = payload.get("ranked_options")
    if not isinstance(ranked_options, list) or not ranked_options:
        return False
    first_option = ranked_options[0]
    return (
        isinstance(first_option, dict)
        and first_option.get("estimated_cost_usd") == 545
        and first_option.get("mode") == "less-than-truckload"
        and first_option.get("service_level") == "standard"
    )
 
def _resolve_task_id(task_id: str | None = None, ctx: Any | None = None) -> str:
    workflow_id = getattr(ctx, "workflow_id", None)
    if isinstance(workflow_id, str) and workflow_id.strip():
        return workflow_id.strip()
    if isinstance(task_id, str) and task_id.strip():
        return task_id.strip()
    return "unknown"
 
def evaluate_carrier_capacity(
    shipment_request: str,
    task_id: str = "unknown",
    ctx: Any | None = None,
    _source_agent: str | None = None,
) -> str:
    """Rank feasible carrier capacity options for the shipment request."""
    task_id = _resolve_task_id(task_id, ctx)
    with start_span("carrier_capacity.tool.evaluate_carrier_capacity", task_id=task_id):
        request = _loads_object(shipment_request, "shipment_request")
        log_stage("tool input", tool="evaluate_carrier_capacity", task_id=task_id, request=request)
        cargo_type = str(request.get("cargo_type", "general freight")).lower()
        service_level = str(request.get("service_level", "standard")).lower()
        shipment_type = _shipment_type(request)
        base_cost = 950 if shipment_type == "FTL" else 420
        if "reefer" in cargo_type:
            base_cost += 350
        if "expedited" in service_level:
            base_cost += 500
        options = [
            {
                "carrier": "NorthStar Logistics",
                "mode": "truckload" if shipment_type == "FTL" else "less-than-truckload",
                "available": True,
                "estimated_cost_usd": base_cost + 125,
                "reliability_score": 94,
                "service_level": "expedited" if "expedited" in service_level else "standard",
                "notes": "Best reliability on the requested lane.",
            },
            {
                "carrier": "BlueLine Freight",
                "mode": "less-than-truckload",
                "available": shipment_type != "FTL",
                "estimated_cost_usd": max(base_cost - 80, 250),
                "reliability_score": 87,
                "service_level": "economy",
                "notes": "Lower cost option with an additional hub handoff.",
            },
            {
                "carrier": "ColdChain Express",
                "mode": "reefer truckload",
                "available": "reefer" in cargo_type or "temperature" in cargo_type,
                "estimated_cost_usd": base_cost + 620,
                "reliability_score": 91,
                "service_level": "temperature-controlled",
                "notes": "Use when temperature control is mandatory.",
            },
        ]
        feasible = [option for option in options if option["available"]]
        result = json.dumps({"ranked_options": feasible, "capacity_status": "available" if feasible else "shortage"}, indent=2)
        log_stage("tool output", tool="evaluate_carrier_capacity", task_id=task_id, result=result)
        return result
 
def _shipment_type(request: dict[str, Any]) -> str:
    declared_type = str(request.get("shipment_type") or request.get("type") or "").lower()
    weight = request.get("weight_lbs") or request.get("weight")
    if "ftl" in declared_type or "full-truckload" in declared_type or "full truckload" in declared_type:
        return "FTL"
    if isinstance(weight, int | float) and weight >= 15000:
        return "FTL"
    if "parcel" in declared_type:
        return "parcel"
    return "LTL"
 
EVALUATE_CARRIER_CAPACITY_TOOL = AgentTool(
    name="EvaluateCarrierCapacity",
    description=evaluate_carrier_capacity.__doc__ or "",
    func=evaluate_carrier_capacity,
    args_model=CarrierCapacityToolArgs,
)
 
def create_carrier_capacity_agent() -> DurableAgent:
    """Build the carrier capacity durable agent with its capacity ranking tool."""
    return TracedCarrierCapacityAgent(
        name="carrier-capacity-agent",
        role="Carrier Capacity Specialist",
        goal="Rank feasible carrier and capacity options for a normalized shipment request.",
        instructions=[
            "Use exactly one tool call per assistant turn until the carrier capacity response is complete.",
            "Follow this state machine strictly:",
            "1. If EvaluateCarrierCapacity has not returned, call EvaluateCarrierCapacity with the shipment_request JSON from the user request.",
            "2. After EvaluateCarrierCapacity returns, do not call another tool. Return the carrier options JSON exactly as the final answer.",
            "Do not skip steps, combine steps, reorder steps, or emit conversational commentary between tool calls.",
            "Do not include task_id in tool arguments.",
        ],
        tools=[
            EVALUATE_CARRIER_CAPACITY_TOOL,
        ],
        llm=DaprChatClient(component_name=CHAT_COMPONENT),
        runtime=create_workflow_runtime(),
        execution=AgentExecutionConfig(
            max_iterations=4,
            tool_choice="auto",
            tool_execution_mode=ToolExecutionMode.SEQUENTIAL,
        ),
    )
 
def main() -> None:
    runner = AgentRunner()
    carrier_capacity = create_carrier_capacity_agent()
    app = create_agent_app(
        runner=runner,
        agent=carrier_capacity,
        title="Carrier Capacity Agent Service",
    )
    run_agent_service(
        runner=runner,
        agent=carrier_capacity,
        app=app,
        port=AGENT_PORT,
        uvicorn_log_config=UVICORN_LOG_CONFIG,
        log_stage=log_stage,
        chat_component=CHAT_COMPONENT,
    )
 
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass

Route Planning - Agent

It uses the shipment request and carrier options to propose route legs, hub or cross-dock points, ETA, pickup/delivery milestones, and alternate handling paths when the primary route is risky.

# pyproject.toml
[project]
name = "agent-route-planning"
version = "0.1.0"
requires-python = ">=3.11,<3.14"
dependencies = [
    "dapr-agents==1.0.0",
    "dapr==1.17.3",
    "opentelemetry-api==1.41.1",
    "opentelemetry-sdk==1.41.1",
    "opentelemetry-exporter-otlp-proto-grpc==1.41.1",
    "python-dotenv==1.0.0",
    "uvicorn[standard]==0.31.1",
]
# app.py
from __future__ import annotations
 
import json
import sys
from pathlib import Path
from typing import Any
 
sys.path.append(str(Path(__file__).resolve().parents[1]))
 
from pydantic import BaseModel, Field
 
from dapr_agents import DurableAgent
from dapr_agents.agents.configs import AgentExecutionConfig, ToolExecutionMode
from dapr_agents.llm.dapr import DaprChatClient
from dapr_agents.tool import AgentTool
from dapr_agents.workflow.runners import AgentRunner
from shared.observability import (
    reset_tool_trace_context,
    set_tool_trace_context,
    start_span,
)
from shared.agent_config import configure_agent_environment
from shared.agent_host import (
    create_agent_app,
    run_agent_service,
)
from shared.json_payloads import (
    loads_object as _loads_object,
)
from shared.logging_utils import create_stage_logger
from shared.tool_recovery import (
    get_last_tool_content as _get_last_tool_content,
    replace_response_with_tool_output,
    tool_call_content_from_response as _tool_call_content_from_response,
)
from shared.workflow_runtime import create_workflow_runtime
 
runtime_config = configure_agent_environment(
    service_name="route-planning",
    logger_name="route_planning.app",
    default_port=50053,
)
logger = runtime_config.logger
UVICORN_LOG_CONFIG = runtime_config.uvicorn_log_config
AGENT_PORT = runtime_config.agent_port
CHAT_COMPONENT = runtime_config.chat_component
WORKFLOW_TRACE_CONTEXTS: dict[str, dict[str, str]] = {}
log_stage = create_stage_logger(logger, "RoutePlanning")
 
class RoutePlanningToolArgs(BaseModel):
    shipment_request: str = Field(..., description="Shipment request JSON returned by NormalizeShipmentRequest.")
    carrier_options: str = Field(..., description="Carrier options JSON returned by EvaluateCarrierCapacity.")
 
class TracedRoutePlanningAgent(DurableAgent):
    def record_initial_entry(self, ctx: Any, payload: dict[str, Any]) -> None:
        trace_context = payload.get("trace_context")
        if isinstance(trace_context, dict):
            WORKFLOW_TRACE_CONTEXTS[ctx.workflow_id] = {
                str(key): str(value) for key, value in trace_context.items()
            }
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "route_planning.workflow.started",
                workflow_id=ctx.workflow_id,
                source=payload.get("source"),
                triggering_workflow_instance_id=payload.get("triggering_workflow_instance_id"),
            ):
                super().record_initial_entry(ctx, payload)
        finally:
            reset_tool_trace_context(token)
 
    def call_llm(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "route_planning.llm.call",
                workflow_id=ctx.workflow_id,
                source=payload.get("source"),
            ):
                response = super().call_llm(ctx, payload)
                tool_output = self._get_last_tool_content(payload.get("instance_id"), "PlanShipmentRoute")
                if tool_output is None:
                    tool_call_content = _tool_call_content_from_response(response, "PlanShipmentRoute")
                    if tool_call_content is not None:
                        tool_output = plan_shipment_route(tool_call_content, tool_call_content, ctx=ctx)
                return replace_response_with_tool_output(
                    response,
                    tool_output,
                    "RoutePlanning",
                    logger,
                )
        finally:
            reset_tool_trace_context(token)
 
    def run_tool(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            tool_call = payload.get("tool_call", {})
            function = tool_call.get("function", {}) if isinstance(tool_call, dict) else {}
            with start_span(
                "route_planning.tool.run",
                workflow_id=ctx.workflow_id,
                tool_name=function.get("name"),
                tool_call_id=tool_call.get("id") if isinstance(tool_call, dict) else None,
            ):
                return super().run_tool(ctx, payload)
        finally:
            reset_tool_trace_context(token)
 
    def finalize_workflow(self, ctx: Any, payload: dict[str, Any]) -> None:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "route_planning.workflow.finalized",
                workflow_id=ctx.workflow_id,
                triggering_workflow_instance_id=payload.get("triggering_workflow_instance_id"),
            ):
                super().finalize_workflow(ctx, payload)
        finally:
            reset_tool_trace_context(token)
            WORKFLOW_TRACE_CONTEXTS.pop(ctx.workflow_id, None)
 
    def summarize(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        try:
            return super().summarize(ctx, payload)
        except Exception as exc:
            logger.warning("Skipping non-critical memory summary: %s", exc)
            return {}
 
    def _get_last_tool_content(self, instance_id: Any, tool_name: str) -> str | None:
        if not isinstance(instance_id, str) or not instance_id.strip():
            return None
        try:
            entry = self._infra.get_state(instance_id)
        except Exception as exc:
            logger.warning("Could not load RoutePlanning state to recover tool output: %s", exc)
            return None
        return _get_last_tool_content(entry, tool_name)
 
def _resolve_task_id(task_id: str | None = None, ctx: Any | None = None) -> str:
    workflow_id = getattr(ctx, "workflow_id", None)
    if isinstance(workflow_id, str) and workflow_id.strip():
        return workflow_id.strip()
    if isinstance(task_id, str) and task_id.strip():
        return task_id.strip()
    return "unknown"
 
def plan_shipment_route(
    shipment_request: str,
    carrier_options: str,
    task_id: str = "unknown",
    ctx: Any | None = None,
    _source_agent: str | None = None,
) -> str:
    """Create route legs, milestones, and ETA for a shipment."""
    task_id = _resolve_task_id(task_id, ctx)
    with start_span("route_planning.tool.plan_shipment_route", task_id=task_id):
        request = _loads_object(shipment_request, "shipment_request")
        capacity = _loads_object(carrier_options, "carrier_options")
        primary_carrier = (capacity.get("ranked_options") or [{}])[0]
        origin = request.get("origin", "origin facility")
        destination = request.get("destination", "destination facility")
        expedited = str(request.get("service_level", "")).lower() == "expedited"
        transit_days = 2 if expedited else 4
        route = {
            "recommended_carrier": primary_carrier.get("carrier", "No feasible carrier"),
            "transit_days": transit_days,
            "eta_basis": "working-day estimate from deterministic lane rules",
            "legs": [
                {"sequence": 1, "type": "pickup", "location": origin, "milestone": "Pickup confirmed"},
                {"sequence": 2, "type": "linehaul", "location": "regional hub", "milestone": "Departed origin hub"},
                {"sequence": 3, "type": "last_mile", "location": destination, "milestone": "Out for delivery"},
            ],
            "alternatives": [
                {
                    "name": "Lower-cost deferred LTL",
                    "transit_days": transit_days + 2,
                    "tradeoff": "Lower freight cost with one extra hub handoff.",
                }
            ],
        }
        result = json.dumps(route, indent=2)
        log_stage("tool output", tool="plan_shipment_route", task_id=task_id, route=route)
        return result
 
PLAN_SHIPMENT_ROUTE_TOOL = AgentTool(
    name="PlanShipmentRoute",
    description=plan_shipment_route.__doc__ or "",
    func=plan_shipment_route,
    args_model=RoutePlanningToolArgs,
)
 
def create_route_planning_agent() -> DurableAgent:
    """Build the route planning durable agent with its route planning tool."""
    return TracedRoutePlanningAgent(
        name="route-planning-agent",
        role="Route Planning Specialist",
        goal="Create route legs, milestones, alternatives, and ETA for a shipment.",
        instructions=[
            "Use exactly one tool call per assistant turn until the route plan is complete.",
            "Follow this state machine strictly:",
            "1. If PlanShipmentRoute has not returned, call PlanShipmentRoute with the shipment_request JSON and carrier_options JSON from the user request.",
            "2. After PlanShipmentRoute returns, do not call another tool. Return the route plan JSON exactly as the final answer.",
            "Do not skip steps, combine steps, reorder steps, or emit conversational commentary between tool calls.",
            "Do not include task_id in tool arguments.",
        ],
        tools=[
            PLAN_SHIPMENT_ROUTE_TOOL,
        ],
        llm=DaprChatClient(component_name=CHAT_COMPONENT),
        runtime=create_workflow_runtime(),
        execution=AgentExecutionConfig(
            max_iterations=4,
            tool_choice="auto",
            tool_execution_mode=ToolExecutionMode.SEQUENTIAL,
        ),
    )
 
def main() -> None:
    runner = AgentRunner()
    route_planning = create_route_planning_agent()
    app = create_agent_app(
        runner=runner,
        agent=route_planning,
        title="Route Planning Agent Service",
    )
    run_agent_service(
        runner=runner,
        agent=route_planning,
        app=app,
        port=AGENT_PORT,
        uvicorn_log_config=UVICORN_LOG_CONFIG,
        log_stage=log_stage,
        chat_component=CHAT_COMPONENT,
    )
 
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass

Risk Compliance - Agent

It uses the shipment request and route plan to identify documents, temperature-control risks, cross-border or hazmat concerns when present, mitigations, exception triggers, and go/no-go conditions.

# pyproject.toml
[project]
name = "agent-risk-compliance"
version = "0.1.0"
requires-python = ">=3.11,<3.14"
dependencies = [
    "dapr-agents==1.0.0",
    "dapr==1.17.3",
    "opentelemetry-api==1.41.1",
    "opentelemetry-sdk==1.41.1",
    "opentelemetry-exporter-otlp-proto-grpc==1.41.1",
    "python-dotenv==1.0.0",
    "uvicorn[standard]==0.31.1",
]
# app.py
from __future__ import annotations
 
import json
import sys
from pathlib import Path
from typing import Any
 
sys.path.append(str(Path(__file__).resolve().parents[1]))
 
from pydantic import BaseModel, Field
 
from dapr_agents import DurableAgent
from dapr_agents.agents.configs import AgentExecutionConfig, ToolExecutionMode
from dapr_agents.llm.dapr import DaprChatClient
from dapr_agents.tool import AgentTool
from dapr_agents.workflow.runners import AgentRunner
from shared.observability import (
    reset_tool_trace_context,
    set_tool_trace_context,
    start_span,
)
from shared.agent_config import configure_agent_environment
from shared.agent_host import (
    create_agent_app,
    run_agent_service,
)
from shared.json_payloads import (
    loads_object as _loads_object,
)
from shared.logging_utils import create_stage_logger
from shared.tool_recovery import (
    get_last_tool_content as _get_last_tool_content,
    replace_response_with_tool_output,
    tool_call_content_from_response as _tool_call_content_from_response,
)
from shared.workflow_runtime import create_workflow_runtime
 
runtime_config = configure_agent_environment(
    service_name="risk-compliance",
    logger_name="risk_compliance.app",
    default_port=50054,
)
logger = runtime_config.logger
UVICORN_LOG_CONFIG = runtime_config.uvicorn_log_config
AGENT_PORT = runtime_config.agent_port
CHAT_COMPONENT = runtime_config.chat_component
WORKFLOW_TRACE_CONTEXTS: dict[str, dict[str, str]] = {}
log_stage = create_stage_logger(logger, "RiskCompliance")
 
class RiskComplianceToolArgs(BaseModel):
    shipment_request: str = Field(..., description="Shipment request JSON returned by NormalizeShipmentRequest.")
    route_plan: str = Field(..., description="Route plan JSON returned by PlanShipmentRoute.")
 
class TracedRiskComplianceAgent(DurableAgent):
    def record_initial_entry(self, ctx: Any, payload: dict[str, Any]) -> None:
        trace_context = payload.get("trace_context")
        if isinstance(trace_context, dict):
            WORKFLOW_TRACE_CONTEXTS[ctx.workflow_id] = {
                str(key): str(value) for key, value in trace_context.items()
            }
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "risk_compliance.workflow.started",
                workflow_id=ctx.workflow_id,
                source=payload.get("source"),
                triggering_workflow_instance_id=payload.get("triggering_workflow_instance_id"),
            ):
                super().record_initial_entry(ctx, payload)
        finally:
            reset_tool_trace_context(token)
 
    def call_llm(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "risk_compliance.llm.call",
                workflow_id=ctx.workflow_id,
                source=payload.get("source"),
            ):
                response = super().call_llm(ctx, payload)
                tool_output = self._get_last_tool_content(payload.get("instance_id"), "AssessRiskAndCompliance")
                if tool_output is None:
                    tool_call_content = _tool_call_content_from_response(response, "AssessRiskAndCompliance")
                    if tool_call_content is not None:
                        tool_output = assess_risk_and_compliance(tool_call_content, tool_call_content, ctx=ctx)
                return replace_response_with_tool_output(
                    response,
                    tool_output,
                    "RiskCompliance",
                    logger,
                )
        finally:
            reset_tool_trace_context(token)
 
    def run_tool(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            tool_call = payload.get("tool_call", {})
            function = tool_call.get("function", {}) if isinstance(tool_call, dict) else {}
            with start_span(
                "risk_compliance.tool.run",
                workflow_id=ctx.workflow_id,
                tool_name=function.get("name"),
                tool_call_id=tool_call.get("id") if isinstance(tool_call, dict) else None,
            ):
                return super().run_tool(ctx, payload)
        finally:
            reset_tool_trace_context(token)
 
    def finalize_workflow(self, ctx: Any, payload: dict[str, Any]) -> None:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "risk_compliance.workflow.finalized",
                workflow_id=ctx.workflow_id,
                triggering_workflow_instance_id=payload.get("triggering_workflow_instance_id"),
            ):
                super().finalize_workflow(ctx, payload)
        finally:
            reset_tool_trace_context(token)
            WORKFLOW_TRACE_CONTEXTS.pop(ctx.workflow_id, None)
 
    def summarize(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        try:
            return super().summarize(ctx, payload)
        except Exception as exc:
            logger.warning("Skipping non-critical memory summary: %s", exc)
            return {}
 
    def _get_last_tool_content(self, instance_id: Any, tool_name: str) -> str | None:
        if not isinstance(instance_id, str) or not instance_id.strip():
            return None
        try:
            entry = self._infra.get_state(instance_id)
        except Exception as exc:
            logger.warning("Could not load RiskCompliance state to recover tool output: %s", exc)
            return None
        return _get_last_tool_content(entry, tool_name)
 
def _resolve_task_id(task_id: str | None = None, ctx: Any | None = None) -> str:
    workflow_id = getattr(ctx, "workflow_id", None)
    if isinstance(workflow_id, str) and workflow_id.strip():
        return workflow_id.strip()
    if isinstance(task_id, str) and task_id.strip():
        return task_id.strip()
    return "unknown"
 
def assess_risk_and_compliance(
    shipment_request: str,
    route_plan: str,
    task_id: str = "unknown",
    ctx: Any | None = None,
    _source_agent: str | None = None,
) -> str:
    """Assess compliance documents, operational risks, and mitigation actions."""
    task_id = _resolve_task_id(task_id, ctx)
    with start_span("risk_compliance.tool.assess_risk_and_compliance", task_id=task_id):
        request = _loads_object(shipment_request, "shipment_request")
        route = _loads_object(route_plan, "route_plan")
        cargo_type = str(request.get("cargo_type", "general freight")).lower()
        shipment_type = str(request.get("shipment_type", "LTL")).lower()
        documents = ["bill of lading", "packing list"]
        restrictions: list[str] = []
        risk_drivers = ["hub handoff risk", "pickup appointment adherence"]
        if request.get("cross_border"):
            documents.extend(["commercial invoice", "customs declaration"])
            risk_drivers.append("customs clearance variability")
        if "hazmat" in cargo_type:
            documents.append("hazmat declaration")
            restrictions.append("carrier must accept regulated hazardous material")
            risk_drivers.append("regulated cargo handling")
        if "reefer" in cargo_type or "temperature" in cargo_type:
            documents.append("temperature log")
            restrictions.append("continuous temperature control required")
            risk_drivers.append("temperature excursion risk")
        risk_score = min(95, 30 + len(risk_drivers) * 12 + (10 if shipment_type == "ftl" else 0))
        assessment = {
            "risk_score": risk_score,
            "risk_level": "high" if risk_score >= 70 else "medium" if risk_score >= 45 else "low",
            "documents": documents,
            "restrictions": restrictions,
            "risk_drivers": risk_drivers,
            "mitigations": [
                "Confirm pickup appointment and dock readiness.",
                "Set proactive milestone checks for every route leg.",
                "Pre-clear documents before dispatch." if request.get("cross_border") else "Verify consignee delivery window before dispatch.",
            ],
            "route_reference": route.get("recommended_carrier"),
        }
        result = json.dumps(assessment, indent=2)
        log_stage("tool output", tool="assess_risk_and_compliance", task_id=task_id, assessment=assessment)
        return result
 
ASSESS_RISK_AND_COMPLIANCE_TOOL = AgentTool(
    name="AssessRiskAndCompliance",
    description=assess_risk_and_compliance.__doc__ or "",
    func=assess_risk_and_compliance,
    args_model=RiskComplianceToolArgs,
)
 
def create_risk_compliance_agent() -> DurableAgent:
    """Build the risk and compliance durable agent with its assessment tool."""
    return TracedRiskComplianceAgent(
        name="risk-compliance-agent",
        role="Risk And Compliance Specialist",
        goal="Assess compliance documents, restrictions, operational risk, and mitigations.",
        instructions=[
            "Use exactly one tool call per assistant turn until the risk and compliance assessment is complete.",
            "Follow this state machine strictly:",
            "1. If AssessRiskAndCompliance has not returned, call AssessRiskAndCompliance with the shipment_request JSON and route_plan JSON from the user request.",
            "2. After AssessRiskAndCompliance returns, do not call another tool. Return the risk and compliance JSON exactly as the final answer.",
            "Do not skip steps, combine steps, reorder steps, or emit conversational commentary between tool calls.",
            "Do not include task_id in tool arguments.",
        ],
        tools=[
            ASSESS_RISK_AND_COMPLIANCE_TOOL,
        ],
        llm=DaprChatClient(component_name=CHAT_COMPONENT),
        runtime=create_workflow_runtime(),
        execution=AgentExecutionConfig(
            max_iterations=4,
            tool_choice="auto",
            tool_execution_mode=ToolExecutionMode.SEQUENTIAL,
        ),
    )
 
def main() -> None:
    runner = AgentRunner()
    risk_compliance = create_risk_compliance_agent()
    app = create_agent_app(
        runner=runner,
        agent=risk_compliance,
        title="Risk Compliance Agent Service",
    )
    run_agent_service(
        runner=runner,
        agent=risk_compliance,
        app=app,
        port=AGENT_PORT,
        uvicorn_log_config=UVICORN_LOG_CONFIG,
        log_stage=log_stage,
        chat_component=CHAT_COMPONENT,
    )
 
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass

Customer Update Writer - Agent

It drafts an operations summary, customer-ready update, dispatch checklist, and final Shipment Execution Plan from all prior agent outputs.

# pyproject.toml
[project]
name = "agent-customer-update-writer"
version = "0.1.0"
requires-python = ">=3.11,<3.14"
dependencies = [
    "dapr-agents==1.0.0",
    "dapr==1.17.3",
    "opentelemetry-api==1.41.1",
    "opentelemetry-sdk==1.41.1",
    "opentelemetry-exporter-otlp-proto-grpc==1.41.1",
    "python-dotenv==1.0.0",
    "uvicorn[standard]==0.31.1",
]
# app.py
from __future__ import annotations
 
import json
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
from typing import Any
 
sys.path.append(str(Path(__file__).resolve().parents[1]))
 
from pydantic import BaseModel, Field
 
from dapr_agents import DurableAgent
from dapr_agents.agents.configs import AgentExecutionConfig, ToolExecutionMode
from dapr_agents.llm.dapr import DaprChatClient
from dapr_agents.tool import AgentTool
from dapr_agents.workflow.runners import AgentRunner
from shared.observability import (
    reset_tool_trace_context,
    set_tool_trace_context,
    start_span,
)
from shared.agent_config import configure_agent_environment
from shared.agent_host import (
    create_agent_app,
    run_agent_service,
)
from shared.logging_utils import create_stage_logger
from shared.tool_recovery import (
    get_last_tool_content as _get_last_tool_content_from_entry,
    message_to_dict as _message_to_dict,
    strip_agent_markup as _strip_agent_markup,
    tool_call_content_from_response as _tool_call_content_from_response,
)
from shared.workflow_runtime import create_workflow_runtime
 
runtime_config = configure_agent_environment(
    service_name="customer-update-writer",
    logger_name="customer_update_writer.app",
    default_port=50055,
)
logger = runtime_config.logger
UVICORN_LOG_CONFIG = runtime_config.uvicorn_log_config
AGENT_PORT = runtime_config.agent_port
CHAT_COMPONENT = runtime_config.chat_component
WORKFLOW_TRACE_CONTEXTS: dict[str, dict[str, str]] = {}
log_stage = create_stage_logger(logger, "CustomerUpdateWriter")
 
class OperationsSummaryToolArgs(BaseModel):
    scenario: str = Field(..., description="Original shipment scenario.")
    planning_brief: str = Field(..., description="Logistics planning brief to summarize.")
 
class CustomerUpdateToolArgs(BaseModel):
    scenario: str = Field(..., description="Original shipment scenario.")
    operations_summary: str = Field(..., description="Operations summary returned by DraftOperationsSummary.")
 
class FinalExecutionPlanToolArgs(BaseModel):
    scenario: str = Field(..., description="Original shipment scenario.")
    operations_summary: str = Field(..., description="Operations summary returned by DraftOperationsSummary.")
    customer_update: str = Field(..., description="Customer update returned by DraftCustomerUpdate.")
 
class TracedCustomerUpdateWriterAgent(DurableAgent):
    def record_initial_entry(self, ctx: Any, payload: dict[str, Any]) -> None:
        trace_context = payload.get("trace_context")
        if isinstance(trace_context, dict):
            WORKFLOW_TRACE_CONTEXTS[ctx.workflow_id] = {
                str(key): str(value) for key, value in trace_context.items()
            }
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "customer_update_writer.workflow.started",
                workflow_id=ctx.workflow_id,
                source=payload.get("source"),
                triggering_workflow_instance_id=payload.get("triggering_workflow_instance_id"),
            ):
                super().record_initial_entry(ctx, payload)
        finally:
            reset_tool_trace_context(token)
 
    def call_llm(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "customer_update_writer.llm.call",
                workflow_id=ctx.workflow_id,
                source=payload.get("source"),
            ):
                response = super().call_llm(ctx, payload)
                return self._replace_empty_final_response(payload, response)
        finally:
            reset_tool_trace_context(token)
 
    def run_tool(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            tool_call = payload.get("tool_call", {})
            function = tool_call.get("function", {}) if isinstance(tool_call, dict) else {}
            with start_span(
                "customer_update_writer.tool.run",
                workflow_id=ctx.workflow_id,
                tool_name=function.get("name"),
                tool_call_id=tool_call.get("id") if isinstance(tool_call, dict) else None,
            ):
                return super().run_tool(ctx, payload)
        finally:
            reset_tool_trace_context(token)
 
    def finalize_workflow(self, ctx: Any, payload: dict[str, Any]) -> None:
        token = set_tool_trace_context(WORKFLOW_TRACE_CONTEXTS.get(ctx.workflow_id))
        try:
            with start_span(
                "customer_update_writer.workflow.finalized",
                workflow_id=ctx.workflow_id,
                triggering_workflow_instance_id=payload.get("triggering_workflow_instance_id"),
            ):
                super().finalize_workflow(ctx, payload)
        finally:
            reset_tool_trace_context(token)
            WORKFLOW_TRACE_CONTEXTS.pop(ctx.workflow_id, None)
 
    def summarize(self, ctx: Any, payload: dict[str, Any]) -> dict[str, Any]:
        try:
            return super().summarize(ctx, payload)
        except Exception as exc:
            logger.warning("Skipping non-critical memory summary: %s", exc)
            return {}
 
    def _replace_empty_final_response(
        self,
        payload: dict[str, Any],
        response: dict[str, Any],
    ) -> dict[str, Any]:
        final_report = self._get_last_tool_content(
            payload.get("instance_id"),
            "FinalizeShipmentExecutionPlan",
        )
        if _is_invalid_final_report(final_report):
            final_report = None
        content = response.get("content")
        content_is_final_tool_call = _tool_call_parameters(content, "FinalizeShipmentExecutionPlan") is not None
        if final_report is None and content_is_final_tool_call:
            final_report = _finalize_from_tool_call_content(content)
            if _is_invalid_final_report(final_report):
                final_report = None
        if final_report is None:
            final_report = _finalize_from_response_tool_calls(response)
            if _is_invalid_final_report(final_report):
                final_report = None
        if final_report is None:
            final_report = self._finalize_from_recovered_state(payload, response)
 
        if response.get("tool_calls"):
            if final_report is None:
                return response
            logger.warning("Replacing final CustomerUpdateWriter tool-call response with FinalizeShipmentExecutionPlan tool output.")
            return {
                **response,
                "role": response.get("role", "assistant"),
                "content": final_report,
                "tool_calls": None,
            }
 
        if content_is_final_tool_call and final_report is not None:
            logger.warning("Replacing final CustomerUpdateWriter tool-call content with FinalizeShipmentExecutionPlan tool output.")
            return {
                **response,
                "role": response.get("role", "assistant"),
                "content": final_report,
                "tool_calls": None,
            }
 
        if final_report is not None and content != final_report:
            logger.warning("Replacing non-canonical CustomerUpdateWriter final response with FinalizeShipmentExecutionPlan tool output.")
            return {
                **response,
                "role": response.get("role", "assistant"),
                "content": final_report,
                "tool_calls": None,
            }
 
        if isinstance(content, str) and content.strip().lower() not in {"", "none", "null"}:
            return response
 
        if final_report is None:
            return response
 
        logger.warning("Replacing empty final CustomerUpdateWriter LLM response with FinalizeShipmentExecutionPlan tool output.")
        return {
            **response,
            "role": response.get("role", "assistant"),
            "content": final_report,
        }
 
    def _get_last_tool_content(self, instance_id: Any, tool_name: str) -> str | None:
        if not isinstance(instance_id, str) or not instance_id.strip():
            return None
 
        try:
            entry = self._infra.get_state(instance_id)
        except Exception as exc:
            logger.warning("Could not load CustomerUpdateWriter state to recover final tool output: %s", exc)
            return None
 
        for message in reversed(getattr(entry, "messages", []) or []):
            message_dict = _message_to_dict(message)
            if (
                message_dict.get("role") == "tool"
                and message_dict.get("name") == tool_name
            ):
                tool_content = message_dict.get("content")
                if isinstance(tool_content, str) and tool_content.strip():
                    return tool_content
 
        return None
 
    def _finalize_from_recovered_state(
        self,
        payload: dict[str, Any],
        response: dict[str, Any],
    ) -> str | None:
        instance_id = payload.get("instance_id")
        if not isinstance(instance_id, str) or not instance_id.strip():
            return None
 
        try:
            entry = self._infra.get_state(instance_id)
        except Exception as exc:
            logger.warning("Could not load CustomerUpdateWriter state to recover final report: %s", exc)
            return None
 
        task = payload.get("task")
        scenario = _extract_scenario_from_task(task)
        if not scenario:
            scenario = _extract_scenario_from_tool_call_content(response.get("content"))
        if not scenario:
            return None
 
        operations_summary = _get_last_tool_content_from_entry(entry, "DraftOperationsSummary")
        if not _is_usable_section(operations_summary):
            planning_brief = _extract_planning_brief_from_task(task)
            fallback_brief = planning_brief or (task if isinstance(task, str) else "")
            operations_summary = draft_operations_summary(scenario, fallback_brief)
 
        customer_update = _get_last_tool_content_from_entry(entry, "DraftCustomerUpdate")
        if not _is_usable_section(customer_update):
            customer_update = draft_customer_update(scenario, operations_summary)
 
        final_report = finalize_shipment_execution_plan(scenario, operations_summary, customer_update)
        return None if _is_invalid_final_report(final_report) else final_report
 
def _is_usable_section(content: Any) -> bool:
    if not isinstance(content, str) or not content.strip():
        return False
    stripped = content.strip()
    if stripped in {"{", "}", "None", "null"}:
        return False
    return not _has_dangling_json_fragment(stripped)
 
def _is_invalid_final_report(content: Any) -> bool:
    return isinstance(content, str) and _has_dangling_json_fragment(content)
 
def _has_dangling_json_fragment(content: str) -> bool:
    in_fenced_block = False
    open_brace_line = False
 
    for raw_line in content.splitlines():
        line = raw_line.strip()
        if line.startswith("```"):
            in_fenced_block = not in_fenced_block
            continue
 
        if in_fenced_block:
            continue
 
        if open_brace_line and line.startswith("##"):
            return True
 
        if line == "{":
            open_brace_line = True
            continue
 
        if open_brace_line and line == "}":
            open_brace_line = False
 
    return open_brace_line
 
def _extract_scenario_from_task(task: Any) -> str | None:
    if not isinstance(task, str):
        return None
    match = re.search(r"\bscenario:\s*\n(.+?)(?:\n\nplanning_brief:|\Z)", task, flags=re.DOTALL | re.IGNORECASE)
    if match:
        scenario = match.group(1).strip()
        return scenario or None
    return task.strip() or None
 
def _extract_planning_brief_from_task(task: Any) -> str | None:
    if not isinstance(task, str):
        return None
    match = re.search(r"\bplanning_brief:\s*\n(.+)\Z", task, flags=re.DOTALL | re.IGNORECASE)
    if match:
        planning_brief = match.group(1).strip()
        return planning_brief or None
    return None
 
def _extract_scenario_from_tool_call_content(content: Any) -> str | None:
    if not isinstance(content, str):
        return None
    match = re.search(r'"scenario"\s*:\s*"((?:\\.|[^"\\])*)"', _strip_agent_markup(content), flags=re.DOTALL)
    if not match:
        return None
    try:
        return json.loads(f'"{match.group(1)}"')
    except json.JSONDecodeError:
        return match.group(1)
 
def _tool_call_parameters(content: Any, tool_name: str) -> dict[str, Any] | None:
    if not isinstance(content, str) or tool_name not in content:
        return None
    if not any(marker in content for marker in ('"parameters"', "'parameters'", '"arguments"', "'arguments'")):
        return None
 
    try:
        payload = json.loads(_strip_agent_markup(content))
    except json.JSONDecodeError:
        return None
 
    if not isinstance(payload, dict) or payload.get("name") != tool_name:
        return None
 
    parameters = payload.get("parameters") or payload.get("arguments") or payload.get("args")
    function = payload.get("function")
    if isinstance(function, dict):
        parameters = function.get("arguments") or function.get("parameters") or parameters
        if function.get("name") not in {None, tool_name}:
            return None
    if isinstance(parameters, str) and parameters.strip():
        try:
            parsed_parameters = json.loads(_strip_agent_markup(parameters))
        except json.JSONDecodeError:
            return None
        parameters = parsed_parameters
    return parameters if isinstance(parameters, dict) else None
 
def _finalize_from_tool_call_content(content: Any) -> str | None:
    parameters = _tool_call_parameters(content, "FinalizeShipmentExecutionPlan")
    if parameters is None:
        return None
 
    scenario = parameters.get("scenario")
    operations_summary = parameters.get("operations_summary")
    customer_update = parameters.get("customer_update")
    if not all(isinstance(value, str) and value.strip() for value in (scenario, operations_summary, customer_update)):
        return None
 
    return finalize_shipment_execution_plan(
        scenario.strip(),
        operations_summary.strip(),
        customer_update.strip(),
    )
 
def _finalize_from_response_tool_calls(response: dict[str, Any]) -> str | None:
    tool_call_content = _tool_call_content_from_response(response, "FinalizeShipmentExecutionPlan")
    if tool_call_content is None:
        return None
    return _finalize_from_tool_call_content(tool_call_content)
 
 
def _resolve_task_id(task_id: str | None = None, ctx: Any | None = None) -> str:
    workflow_id = getattr(ctx, "workflow_id", None)
    if isinstance(workflow_id, str) and workflow_id.strip():
        return workflow_id.strip()
    if isinstance(task_id, str) and task_id.strip():
        return task_id.strip()
    return "unknown"
 
def draft_operations_summary(
    scenario: str,
    planning_brief: str,
    task_id: str = "unknown",
    ctx: Any | None = None,
    _source_agent: str | None = None,
) -> str:
    """Draft an internal operations summary from the planning brief."""
    task_id = _resolve_task_id(task_id, ctx)
    with start_span("customer_update_writer.tool.draft_operations_summary", task_id=task_id):
        log_stage("tool input", tool="draft_operations_summary", task_id=task_id, scenario=scenario)
        summary = dedent(
            f"""\
            ## Operations Summary
 
            Shipment scenario:
            {scenario}
 
            Planning basis:
            {planning_brief[:900]}{"..." if len(planning_brief) > 900 else ""}
 
            Dispatch should proceed only after the required documents, carrier acceptance,
            pickup appointment, and risk mitigations are confirmed.
            """
        )
        log_stage("tool output", tool="draft_operations_summary", task_id=task_id, summary=summary)
        return summary
 
def draft_customer_update(
    scenario: str,
    operations_summary: str,
    task_id: str = "unknown",
    ctx: Any | None = None,
    _source_agent: str | None = None,
) -> str:
    """Draft a concise customer-facing shipment update."""
    task_id = _resolve_task_id(task_id, ctx)
    with start_span("customer_update_writer.tool.draft_customer_update", task_id=task_id):
        log_stage("tool input", tool="draft_customer_update", task_id=task_id, scenario=scenario)
        update = dedent(
            f"""\
            ## Customer Update
 
            We have reviewed the shipment request and identified a feasible execution path.
            The operations team is confirming carrier capacity, route milestones, and the
            required documentation before dispatch.
 
            Current status: planning complete; pending final pickup and document confirmation.
 
            Reference scenario:
            {scenario}
            """
        )
        log_stage("tool output", tool="draft_customer_update", task_id=task_id, update=update)
        return update
 
def finalize_shipment_execution_plan(
    scenario: str,
    operations_summary: str,
    customer_update: str,
    task_id: str = "unknown",
    ctx: Any | None = None,
    _source_agent: str | None = None,
) -> str:
    """Assemble the final shipment execution plan."""
    task_id = _resolve_task_id(task_id, ctx)
    with start_span("customer_update_writer.tool.finalize_shipment_execution_plan", task_id=task_id):
        log_stage("tool input", tool="finalize_shipment_execution_plan", task_id=task_id, scenario=scenario)
        final = dedent(
            f"""\
            # Shipment Execution Plan
 
            *Generated {datetime.now(timezone.utc).strftime("%B %d, %Y")}*
 
            ## Shipment Scenario
 
            {scenario}
 
            {operations_summary.strip()}
 
            ## Execution Checklist
 
            - Validate normalized shipment request.
            - Confirm carrier capacity and service level.
            - Confirm route milestones and delivery window.
            - Clear documentation and compliance blockers.
            - Monitor risk triggers and exception thresholds.
            - Send proactive customer updates at pickup, hub departure, out-for-delivery, and delivery.
 
            {customer_update.strip()}
            """
        ).strip()
        log_stage("tool output", tool="finalize_shipment_execution_plan", task_id=task_id, final=final)
        return final
 
DRAFT_OPERATIONS_SUMMARY_TOOL = AgentTool(
    name="DraftOperationsSummary",
    description=draft_operations_summary.__doc__ or "",
    func=draft_operations_summary,
    args_model=OperationsSummaryToolArgs,
)
DRAFT_CUSTOMER_UPDATE_TOOL = AgentTool(
    name="DraftCustomerUpdate",
    description=draft_customer_update.__doc__ or "",
    func=draft_customer_update,
    args_model=CustomerUpdateToolArgs,
)
FINALIZE_SHIPMENT_EXECUTION_PLAN_TOOL = AgentTool(
    name="FinalizeShipmentExecutionPlan",
    description=finalize_shipment_execution_plan.__doc__ or "",
    func=finalize_shipment_execution_plan,
    args_model=FinalExecutionPlanToolArgs,
)
 
def create_customer_update_writer_agent() -> DurableAgent:
    """Build the shipment communication writer agent with its tool chain and execution config."""
    return TracedCustomerUpdateWriterAgent(
        name="customer-update-writer-agent",
        role="Logistics Customer Communication Writer",
        goal="Turn a logistics planning brief into an execution plan and customer update.",
        instructions=[
            "Use exactly one tool call per assistant turn until the shipment execution plan is complete.",
            "Follow this state machine strictly:",
            "1. If DraftOperationsSummary has not returned, call DraftOperationsSummary with the scenario and planning_brief block from the user request.",
            "2. If DraftOperationsSummary has returned and DraftCustomerUpdate has not returned, call DraftCustomerUpdate with the scenario and exact operations summary.",
            "3. If DraftCustomerUpdate has returned and FinalizeShipmentExecutionPlan has not returned, call FinalizeShipmentExecutionPlan with the scenario, operations summary, and customer update.",
            "4. After FinalizeShipmentExecutionPlan returns, do not call another tool. Return the final execution plan exactly as the final answer.",
            "Do not skip steps, combine steps, reorder steps, or emit conversational commentary between tool calls.",
            "Do not include task_id in tool arguments.",
        ],
        tools=[
            DRAFT_OPERATIONS_SUMMARY_TOOL,
            DRAFT_CUSTOMER_UPDATE_TOOL,
            FINALIZE_SHIPMENT_EXECUTION_PLAN_TOOL,
        ],
        llm=DaprChatClient(component_name=CHAT_COMPONENT),
        runtime=create_workflow_runtime(),
        execution=AgentExecutionConfig(
            max_iterations=6,
            tool_choice="auto",
            tool_execution_mode=ToolExecutionMode.SEQUENTIAL,
        ),
    )
 
def main() -> None:
    runner = AgentRunner()
    customer_update_writer = create_customer_update_writer_agent()
 
    def before_run(body: dict[str, Any], requested_instance_id: str | None) -> None:
        log_stage(
            "run request received",
            task_id=requested_instance_id or "auto",
            scenario=body.get("scenario") if isinstance(body, dict) else None,
            body_keys=sorted(body.keys()) if isinstance(body, dict) else [],
        )
 
    def after_run(instance_id: str | None, requested_instance_id: str | None) -> None:
        log_stage("workflow scheduled", instance_id=instance_id or "", task_id=requested_instance_id or "auto")
 
    app = create_agent_app(
        runner=runner,
        agent=customer_update_writer,
        title="Customer Update Writer Agent Service",
        before_run=before_run,
        after_run=after_run,
    )
    run_agent_service(
        runner=runner,
        agent=customer_update_writer,
        app=app,
        port=AGENT_PORT,
        uvicorn_log_config=UVICORN_LOG_CONFIG,
        log_stage=log_stage,
        chat_component=CHAT_COMPONENT,
    )
 
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass

shared

# agent_config.py
from __future__ import annotations
 
import copy
from dataclasses import dataclass
import logging
import os
import sys
import warnings
 
from dapr.conf import settings as dapr_settings
from dotenv import load_dotenv
import uvicorn
 
from shared.observability import configure_observability
 
@dataclass(frozen=True)
class AgentRuntimeConfig:
    agent_port: int
    chat_component: str
    logger: logging.Logger
    uvicorn_log_config: dict
 
def normalize_dapr_endpoint_environment() -> None:
    for name in ("DAPR_GRPC_ENDPOINT", "DAPR_HTTP_ENDPOINT"):
        value = os.getenv(name)
        if value:
            normalized_value = value.rstrip("/")
            os.environ[name] = normalized_value
            setattr(dapr_settings, name, normalized_value)
 
def configure_agent_environment(
    *,
    service_name: str,
    logger_name: str,
    default_port: int,
) -> AgentRuntimeConfig:
    load_dotenv()
    normalize_dapr_endpoint_environment()
    warnings.filterwarnings(
        "ignore",
        message="http and https schemes are deprecated for grpc.*",
        category=UserWarning,
        module="dapr.conf.helpers",
    )
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
        stream=sys.stdout,
        force=True,
    )
    logger = logging.getLogger(logger_name)
    configure_observability(service_name)
 
    uvicorn_log_config = copy.deepcopy(uvicorn.config.LOGGING_CONFIG)
    uvicorn_log_config["handlers"]["default"]["stream"] = "ext://sys.stdout"
    uvicorn_log_config["handlers"]["access"]["stream"] = "ext://sys.stdout"
 
    return AgentRuntimeConfig(
        agent_port=int(os.getenv("AGENT_PORT", str(default_port))),
        chat_component=os.getenv("DAPR_CHAT_COMPONENT_NAME", "llama"),
        logger=logger,
        uvicorn_log_config=uvicorn_log_config,
    )
# agent_host.py
from __future__ import annotations
 
import asyncio
from collections.abc import Callable
from typing import Any
 
from fastapi import Body, FastAPI, HTTPException, Response
import uvicorn
 
from shared.observability import instrument_fastapi
 
def get_task_from_body(body: dict[str, Any] | None) -> str:
    task = body.get("task") if isinstance(body, dict) else None
    if not isinstance(task, str) or not task.strip():
        raise HTTPException(status_code=422, detail="A non-empty 'task' field is required.")
    return task.strip()
 
def get_requested_instance_id(body: dict[str, Any] | None) -> str | None:
    task_id = body.get("task_id") if isinstance(body, dict) else None
    if not isinstance(task_id, str) or not task_id.strip():
        return None
    return task_id.strip()
 
def workflow_status_url(instance_id: str) -> str:
    return f"/agent/instances/{instance_id}"
 
def create_agent_app(
    *,
    runner: Any,
    agent: Any,
    title: str,
    before_run: Callable[[dict[str, Any], str | None], None] | None = None,
    after_run: Callable[[str | None, str | None], None] | None = None,
) -> FastAPI:
    app = FastAPI(title=title, version="1.0.0")
    instrument_fastapi(app)
    runner.serve(agent, app=app, expose_entry=False)
 
    @app.get("/health")
    async def health() -> dict[str, str]:
        return {"status": "ok"}
 
    @app.post("/agent/run")
    async def start_workflow(body: dict[str, Any] = Body(default_factory=dict)) -> dict[str, str]:
        task = get_task_from_body(body)
        requested_instance_id = get_requested_instance_id(body)
        if before_run is not None:
            before_run(body, requested_instance_id)
 
        instance_id = await runner.run(
            agent,
            payload={"task": task},
            instance_id=requested_instance_id,
            wait=False,
            log=True,
        )
        if after_run is not None:
            after_run(instance_id, requested_instance_id)
 
        return {
            "instance_id": instance_id or "",
            "status_url": workflow_status_url(instance_id or ""),
        }
 
    @app.get("/agent/instances/__readiness_probe__", status_code=204)
    async def readiness_probe() -> Response:
        return Response(status_code=204)
 
    @app.get("/agent/instances/{instance_id}")
    async def get_status(instance_id: str) -> dict[str, Any]:
        workflow_state = await asyncio.to_thread(
            runner._wf_client.get_workflow_state,
            instance_id,
            fetch_payloads=True,
        )
        if workflow_state is None:
            raise HTTPException(status_code=404, detail="Workflow instance not found.")
 
        payload = workflow_state.to_json()
        payload["runtime_status"] = getattr(
            workflow_state.runtime_status,
            "name",
            str(workflow_state.runtime_status),
        )
        for field in ("created_at", "last_updated_at"):
            ts = payload.get(field)
            if ts:
                payload[field] = ts.isoformat()
        return payload
 
    return app
 
def run_agent_service(
    *,
    runner: Any,
    agent: Any,
    app: FastAPI,
    port: int,
    uvicorn_log_config: dict,
    log_stage: Callable[..., None],
    chat_component: str,
) -> None:
    try:
        log_stage("starting", port=port, chat_component=chat_component)
        uvicorn.run(
            app,
            host="0.0.0.0",
            port=port,
            log_level="info",
            log_config=uvicorn_log_config,
        )
    finally:
        log_stage("shutting down", agent=agent.name)
        runner.shutdown(agent)
# json_payloads.py
from __future__ import annotations
 
import json
import re
from typing import Any
 
def loads_object(payload: Any, field_name: str) -> dict[str, Any]:
    if isinstance(payload, dict):
        return unwrap_tool_argument(payload, field_name)
 
    for candidate in json_candidates(payload):
        try:
            value = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(value, dict):
            return unwrap_tool_argument(value, field_name)
        raise ValueError(f"{field_name} must be a JSON object.")
 
    return {"raw_payload": payload}
 
def json_candidates(payload: Any) -> list[str]:
    if not isinstance(payload, str):
        payload = json.dumps(payload, default=str)
    normalized = repair_json_text(payload)
    candidates = [normalized]
    extracted = extract_first_json_object(normalized)
    if extracted is not None:
        candidates.append(repair_json_text(extracted))
    return candidates
 
def unwrap_tool_argument(value: dict[str, Any], field_name: str) -> dict[str, Any]:
    direct_value = value.get(field_name)
    if isinstance(direct_value, dict):
        return direct_value
    if isinstance(direct_value, str) and direct_value.strip():
        return loads_object(direct_value, field_name)
 
    parameters = value.get("parameters")
    if isinstance(parameters, dict):
        parameter_value = parameters.get(field_name)
        if isinstance(parameter_value, dict):
            return parameter_value
        if isinstance(parameter_value, str) and parameter_value.strip():
            return loads_object(parameter_value, field_name)
 
    return value
 
def repair_json_text(payload: str) -> str:
    normalized = payload.translate(str.maketrans({"“": '"', "”": '"', "«": '"', "»": '"'}))
    return re.sub(r'""(?=\s*[,}])', '"', normalized)
 
def extract_first_json_object(payload: str) -> str | None:
    start = payload.find("{")
    if start < 0:
        return None
    depth = 0
    in_string = False
    escaped = False
    for index, character in enumerate(payload[start:], start=start):
        if in_string:
            if escaped:
                escaped = False
            elif character == "\\":
                escaped = True
            elif character == '"':
                in_string = False
            continue
        if character == '"':
            in_string = True
        elif character == "{":
            depth += 1
        elif character == "}":
            depth -= 1
            if depth == 0:
                return payload[start:index + 1]
    return None
# logging_utils.py
from __future__ import annotations
 
from collections.abc import Callable
import json
import logging
from typing import Any
 
def preview(value: Any, limit: int = 800) -> str:
    text = value if isinstance(value, str) else json.dumps(value, default=str)
    text = text.replace("\n", "\\n")
    return text if len(text) <= limit else f"{text[:limit]}...<truncated {len(text) - limit} chars>"
 
def create_stage_logger(logger: logging.Logger, prefix: str) -> Callable[..., None]:
    def log_stage(stage: str, **values: Any) -> None:
        details = " ".join(f"{key}={preview(value)}" for key, value in values.items())
        logger.debug("[%s] %s %s", prefix, stage, details)
 
    return log_stage
# observability.py
from __future__ import annotations
 
import logging
import os
from collections.abc import Mapping
from contextvars import ContextVar
from typing import Any
 
from fastapi import FastAPI, Request
from opentelemetry import propagate, trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import SpanKind
from opentelemetry.trace.status import Status, StatusCode
 
_configured = False
_tool_trace_context: ContextVar[dict[str, str]] = ContextVar("tool_trace_context", default={})
 
def configure_observability(service_name: str) -> None:
    global _configured
    if _configured:
        return
 
    provider = TracerProvider(resource=Resource.create({"service.name": service_name}))
    if os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"):
        provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
 
    trace.set_tracer_provider(provider)
    logging.getLogger(__name__).info("OpenTelemetry configured for %s", service_name)
    _configured = True
 
def instrument_fastapi(app: FastAPI) -> None:
    @app.middleware("http")
    async def trace_requests(request: Request, call_next):
        parent = propagate.extract(dict(request.headers))
        route = request.scope.get("route")
        route_path = getattr(route, "path", request.url.path)
        span_name = f"{request.method} {route_path}"
        tracer = trace.get_tracer(__name__)
        with tracer.start_as_current_span(
            span_name,
            context=parent,
            kind=SpanKind.SERVER,
            attributes={
                "http.request.method": request.method,
                "url.path": request.url.path,
                "server.address": request.url.hostname or "",
            },
        ) as span:
            try:
                response = await call_next(request)
            except Exception as exc:
                span.record_exception(exc)
                span.set_status(Status(StatusCode.ERROR, str(exc)))
                raise
            span.set_attribute("http.response.status_code", response.status_code)
            return response
 
def current_trace_context() -> dict[str, str]:
    carrier: dict[str, str] = {}
    propagate.inject(carrier)
    return carrier
 
def set_tool_trace_context(trace_context: Mapping[str, str] | None):
    return _tool_trace_context.set(dict(trace_context or {}))
 
def reset_tool_trace_context(token: object) -> None:
    _tool_trace_context.reset(token)
 
def start_span(
    name: str,
    trace_context: Mapping[str, str] | None = None,
    **attributes: Any,
):
    parent = propagate.extract(dict(trace_context if trace_context is not None else _tool_trace_context.get()))
    tracer = trace.get_tracer(__name__)
    return tracer.start_as_current_span(
        name,
        context=parent,
        kind=SpanKind.INTERNAL,
        attributes={key: value for key, value in attributes.items() if value is not None},
    )
 
def record_workflow_event(
    name: str,
    trace_context: Mapping[str, str] | None = None,
    status: Status | None = None,
    **attributes: Any,
) -> None:
    with start_span(name, trace_context, **attributes) as span:
        if status is not None:
            span.set_status(status)
# tool_recovery.py
from __future__ import annotations
 
import json
import re
from typing import Any
 
def message_to_dict(message: Any) -> dict[str, Any]:
    if hasattr(message, "model_dump"):
        return message.model_dump()
    if isinstance(message, dict):
        return dict(message)
    if hasattr(message, "__dict__"):
        return dict(message.__dict__)
    return {}
 
def get_last_tool_content(entry: Any, tool_name: str) -> str | None:
    for message in reversed(getattr(entry, "messages", []) or []):
        message_dict = message_to_dict(message)
        if message_dict.get("role") == "tool" and message_dict.get("name") == tool_name:
            tool_content = message_dict.get("content")
            if isinstance(tool_content, str) and tool_content.strip():
                return tool_content
    return None
 
def replace_response_with_tool_output(
    response: dict[str, Any],
    tool_output: str | None,
    agent_name: str,
    logger: Any,
) -> dict[str, Any]:
    if tool_output is None:
        return response
    if response.get("tool_calls") or response.get("content") != tool_output:
        logger.warning("Replacing final %s LLM response with deterministic tool output.", agent_name)
        return {
            **response,
            "role": response.get("role", "assistant"),
            "content": tool_output,
            "tool_calls": None,
        }
    return response
 
def strip_agent_markup(raw: str) -> str:
    stripped = raw.strip()
    python_tag = "<|python_tag|>"
    if stripped.startswith(python_tag):
        stripped = stripped[len(python_tag):].strip()
    fenced_match = re.fullmatch(r"```(?:json|python)?\s*(.*?)\s*```", stripped, flags=re.DOTALL)
    if fenced_match:
        stripped = fenced_match.group(1).strip()
    return stripped
 
def looks_like_tool_call_content(content: Any, tool_name: str) -> bool:
    if not isinstance(content, str):
        return False
    return tool_name in content and any(
        marker in content for marker in ('"parameters"', "'parameters'", '"arguments"', "'arguments'", "(")
    )
 
def tool_call_content_from_response(response: dict[str, Any], tool_name: str) -> str | None:
    content = response.get("content")
    if looks_like_tool_call_content(content, tool_name):
        return str(content)
 
    tool_calls = response.get("tool_calls")
    if not isinstance(tool_calls, list):
        return None
 
    for tool_call in tool_calls:
        parameters = tool_call_parameters_from_tool_call(tool_call, tool_name)
        if parameters is not None:
            return json.dumps({"name": tool_name, "parameters": parameters})
 
    return None
 
def tool_call_parameters_from_tool_call(tool_call: Any, tool_name: str) -> dict[str, Any] | None:
    if not isinstance(tool_call, dict):
        return None
 
    name = tool_call.get("name")
    arguments = tool_call.get("arguments") or tool_call.get("args") or tool_call.get("parameters")
    function = tool_call.get("function")
    if isinstance(function, dict):
        name = function.get("name") or name
        arguments = function.get("arguments") or function.get("parameters") or arguments
 
    if name != tool_name:
        return None
 
    if isinstance(arguments, dict):
        return arguments
    if isinstance(arguments, str) and arguments.strip():
        try:
            parsed = json.loads(strip_agent_markup(arguments))
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, dict) else None
    return {}
# workflow_runtime.py
from __future__ import annotations
 
import logging
import sys
 
import dapr.ext.workflow as wf
from dapr.ext.workflow.logger import LoggerOptions
 
class DurableTaskNoiseFilter(logging.Filter):
    _ignored_messages = (
        "Ignoring unexpected taskCompleted event with ID",
        "Ignoring unexpected timerFired event with ID",
    )
 
    def filter(self, record: logging.LogRecord) -> bool:
        if record.name != "durabletask-worker":
            return True
 
        message = record.getMessage()
        return not any(ignored in message for ignored in self._ignored_messages)
 
def configure_workflow_loggers() -> None:
    for logger_name in ("WorkflowRuntime", "durabletask-worker"):
        logging.getLogger(logger_name).propagate = False
 
def create_workflow_runtime() -> wf.WorkflowRuntime:
    configure_workflow_loggers()
    handler = logging.StreamHandler(sys.stdout)
    handler.addFilter(DurableTaskNoiseFilter())
    return wf.WorkflowRuntime(
        logger_options=LoggerOptions(log_handler=handler)
    )

Each agent is implemented as a Dapr Agents DurableAgent. Within each agent, the LLM is accessed using DaprChatClient, where the default chat component is llama. Tool execution is configured with ToolExecutionMode.SEQUENTIAL, and the agent instructions explicitly require the LLM to invoke a deterministic tool and return the tool output as the final response. This architecture introduces two levels of sequencing: parent workflow sequencing, where the orchestrator executes agent services sequentially, and agent-internal sequencing, where each DurableAgent processes its tool calls sequentially.

Api Service (AspireWithDaprAgents.ApiService)

The Api Service is backend gateway between the Blazor UI and the logistics workflow. It exposes POST /agent-tasks and GET /readiness; validates scenarios, waits for orchestrator readiness, invokes /agent/run through Dapr service invocation, polls /agent/instances/{id}, renders usable workflow output, and returns the final execution plan.

// Program.cs
var builder = WebApplication.CreateBuilder(args);
 
builder.AddServiceDefaults();
builder.Services.AddProblemDetails();
builder.Services.AddOpenApi();
builder.Services.AddSingleton<ShipmentTaskCoordinator>();
builder.Services.AddHostedService(sp => sp.GetRequiredService<ShipmentTaskCoordinator>());
#pragma warning disable EXTEXP0001
builder.Services.AddHttpClient<DaprAgentClient>(client =>
    {
        client.Timeout = TimeSpan.FromSeconds(45);
    })
    .RemoveAllResilienceHandlers();
#pragma warning restore EXTEXP0001
 
var app = builder.Build();
 
app.UseExceptionHandler();
 
if (app.Environment.IsDevelopment())
{
    app.MapOpenApi();
}
 
app.MapGet("/", () => "API service is running.");
 
app.MapGet("/readiness", async (
    DaprAgentClient agentClient,
    CancellationToken cancellationToken) =>
{
    var readiness = await agentClient.CheckReadinessAsync(cancellationToken);
    return readiness.Ready
        ? Results.Ok(readiness)
        : Results.Json(readiness, statusCode: StatusCodes.Status503ServiceUnavailable);
})
.WithName("Readiness");
 
app.MapPost("/agent-tasks", async (
    SubmitAgentTaskRequest request,
    DaprAgentClient agentClient,
    ShipmentTaskCoordinator taskCoordinator,
    ILogger<Program> logger,
    CancellationToken cancellationToken) =>
{
    var scenario = request.Scenario?.Trim();
    if (string.IsNullOrWhiteSpace(scenario))
    {
        return Results.ValidationProblem(new Dictionary<string, string[]>
        {
            [nameof(request.Scenario)] = ["A shipment scenario is required."]
        });
    }
 
    var readiness = await agentClient.CheckReadinessAsync(cancellationToken);
    if (!readiness.Ready)
    {
        return Results.Json(readiness, statusCode: StatusCodes.Status503ServiceUnavailable);
    }
 
    var taskId = string.IsNullOrWhiteSpace(request.TaskId)
        ? Guid.NewGuid().ToString("n")
        : request.TaskId.Trim();
 
    logger.LogInformation("Submitting shipment scenario to agent workflow with taskId {TaskId}", taskId);
    taskCoordinator.Enqueue(taskId, scenario);
    return Results.Accepted(
        $"/agent-tasks/{taskId}",
        new SubmitAgentTaskResponse(
            taskId,
            scenario,
            $"/agent-tasks/{taskId}",
            DateTimeOffset.UtcNow));
})
.WithName("SubmitAgentTask");
 
app.MapGet("/agent-tasks/{taskId}", async (
    string taskId,
    DaprAgentClient agentClient,
    ShipmentTaskCoordinator taskCoordinator,
    CancellationToken cancellationToken) =>
{
    if (string.IsNullOrWhiteSpace(taskId))
    {
        return Results.ValidationProblem(new Dictionary<string, string[]>
        {
            [nameof(taskId)] = ["A task ID is required."]
        });
    }
 
    try
    {
        var normalizedTaskId = taskId.Trim();
        if (taskCoordinator.TryGet(normalizedTaskId, out var record) &&
            !string.Equals(record.RuntimeStatus, "RUNNING", StringComparison.OrdinalIgnoreCase))
        {
            return Results.Ok(new ShipmentTaskStatusResponse(
                normalizedTaskId,
                record.RuntimeStatus,
                null,
                record.UpdatedAt,
                record.FailureDetails));
        }
 
        var status = await agentClient.GetShipmentWorkflowStatusAsync(normalizedTaskId, cancellationToken);
        return Results.Ok(new ShipmentTaskStatusResponse(
            normalizedTaskId,
            status.RuntimeStatus,
            status.ExecutionPlan,
            DateTimeOffset.UtcNow,
            status.FailureDetails));
    }
    catch (DaprInvocationUnavailableException ex)
    {
        return Results.Problem(
            title: "Shipment orchestrator status is temporarily unavailable.",
            detail: ex.Message,
            statusCode: StatusCodes.Status503ServiceUnavailable);
    }
})
.WithName("GetAgentTaskStatus");
 
app.MapDefaultEndpoints();
 
app.Run();
// Contracts.cs
using System.Text.Json.Serialization;
 
sealed class DaprInvocationUnavailableException(string message) : InvalidOperationException(message);
 
record SubmitAgentTaskRequest(string Scenario, string? TaskId = null);
 
record AgentRunRequest(
    [property: JsonPropertyName("task_id")] string TaskId,
    string Task,
    [property: JsonPropertyName("trace_context")] Dictionary<string, string> TraceContext);
 
record AgentRunStartResponse(
    [property: JsonPropertyName("instance_id")] string InstanceId,
    [property: JsonPropertyName("status_url")] string StatusUrl);
 
record SubmitAgentTaskResponse(
    string TaskId,
    string Scenario,
    string StatusUrl,
    DateTimeOffset AcceptedAt);
 
record ShipmentTaskStatusResponse(
    string TaskId,
    string RuntimeStatus,
    string? ExecutionPlan,
    DateTimeOffset CheckedAt,
    string? FailureDetails);
 
record ReadinessResponse(
    bool Ready,
    DateTimeOffset CheckedAt,
    IReadOnlyList<DependencyReadiness> Dependencies);
 
record DependencyReadiness(
    string Name,
    bool Ready,
    string Detail);
 
record ShipmentWorkflowStatus(
    string RuntimeStatus,
    string? ExecutionPlan,
    string? FailureDetails);
// DaprAgentClient.cs
using System.Net;
using System.Net.Http.Json;
using System.Text.Json;
 
sealed class DaprAgentClient(HttpClient httpClient, IConfiguration configuration, ILogger<DaprAgentClient> logger)
{
    private const int InvokeAttemptCount = 8;
    private const int ReadinessAttemptCount = 10;
    private const string DefaultOrchestratorAppId = "orchestrator";
    private const string DaprApiTokenHeader = "dapr-api-token";
    private static readonly TimeSpan WorkflowStartTimeout = TimeSpan.FromSeconds(20);
    private static readonly TimeSpan WorkflowCompletionTimeout = TimeSpan.FromMinutes(22);
    private readonly JsonSerializerOptions _serializerOptions = new(JsonSerializerDefaults.Web);
 
    public async Task<ReadinessResponse> CheckReadinessAsync(CancellationToken cancellationToken)
    {
        return await CheckOrchestratorReadinessAsync(GetOrchestratorAppId(), cancellationToken);
    }
 
    public async Task<ReadinessResponse> WaitForReadinessAsync(CancellationToken cancellationToken)
    {
        var appId = GetOrchestratorAppId();
        ReadinessResponse readiness = new(false, DateTimeOffset.UtcNow, []);
 
        for (var attempt = 1; attempt <= ReadinessAttemptCount; attempt++)
        {
            readiness = await CheckOrchestratorReadinessAsync(appId, cancellationToken);
            if (readiness.Ready)
            {
                return readiness;
            }
 
            if (attempt < ReadinessAttemptCount)
            {
                logger.LogWarning(
                    "Readiness check for {AppId} failed on attempt {Attempt}/{TotalAttempts}. Retrying.",
                    appId,
                    attempt,
                    ReadinessAttemptCount);
                await Task.Delay(GetRetryDelay(attempt), cancellationToken);
            }
        }
 
        return readiness;
    }
 
    public async Task<string> RunShipmentWorkflowAsync(
        string taskId,
        string scenario,
        CancellationToken cancellationToken)
    {
        var startResponse = await StartShipmentWorkflowAsync(taskId, scenario, cancellationToken);
        var appId = GetOrchestratorAppId();
 
        logger.LogInformation("Agent workflow started instance {InstanceId}", startResponse.InstanceId);
 
        using var result = await PollForCompletionAsync(appId, startResponse.StatusUrl, cancellationToken);
        return ShipmentWorkflowOutputRenderer.ExtractOutput(result.RootElement);
    }
 
    public async Task<AgentRunStartResponse> StartShipmentWorkflowAsync(
        string taskId,
        string scenario,
        CancellationToken cancellationToken)
    {
        var appId = GetOrchestratorAppId();
        var payload = new AgentRunRequest(taskId, scenario, TraceContext.Current());
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        timeoutCts.CancelAfter(WorkflowStartTimeout);
 
        try
        {
            return await InvokeAgentRunAsync(appId, payload, timeoutCts.Token);
        }
        catch (TaskCanceledException ex) when (!cancellationToken.IsCancellationRequested)
        {
            throw new DaprInvocationUnavailableException(
                $"Dapr invocation to '{appId}' did not start workflow '{taskId}' within {WorkflowStartTimeout.TotalSeconds:N0} seconds.")
            {
                Source = ex.Source,
            };
        }
    }
 
    public async Task<ShipmentWorkflowStatus> GetShipmentWorkflowStatusAsync(
        string taskId,
        CancellationToken cancellationToken)
    {
        var appId = GetOrchestratorAppId();
        var statusUri = BuildInvokeUri(appId, $"/agent/instances/{taskId}");
        ApplyDaprApiToken();
 
        using var response = await httpClient.GetAsync(statusUri, cancellationToken);
        if (response.StatusCode == HttpStatusCode.NotFound)
        {
            return new ShipmentWorkflowStatus("PENDING", null, null);
        }
 
        if (!response.IsSuccessStatusCode)
        {
            throw CreateInvocationUnavailableException(
                appId,
                statusUri,
                response.StatusCode,
                response.ReasonPhrase,
                await ReadResponseBodyAsync(response, cancellationToken));
        }
 
        using var payload = await JsonDocument.ParseAsync(
            await response.Content.ReadAsStreamAsync(cancellationToken),
            cancellationToken: cancellationToken);
        var runtimeStatus = payload.RootElement.TryGetProperty("runtime_status", out var statusValue)
            ? statusValue.GetString() ?? "UNKNOWN"
            : "UNKNOWN";
 
        var executionPlan = string.Equals(runtimeStatus, "COMPLETED", StringComparison.OrdinalIgnoreCase)
            ? ShipmentWorkflowOutputRenderer.ExtractOutput(payload.RootElement)
            : null;
        var failureDetails = string.Equals(runtimeStatus, "FAILED", StringComparison.OrdinalIgnoreCase) ||
            string.Equals(runtimeStatus, "TERMINATED", StringComparison.OrdinalIgnoreCase)
                ? ShipmentWorkflowOutputRenderer.ExtractFailureDetails(payload.RootElement)
                : null;
 
        return new ShipmentWorkflowStatus(runtimeStatus, executionPlan, failureDetails);
    }
 
    private string GetOrchestratorAppId() => configuration["AGENT_APP_ID"] ?? DefaultOrchestratorAppId;
 
    private async Task<AgentRunStartResponse> InvokeAgentRunAsync(
        string appId,
        AgentRunRequest payload,
        CancellationToken cancellationToken)
    {
        var invokeUri = BuildInvokeUri(appId, "/agent/run");
        Exception? lastTransientFailure = null;
        string? lastTransientBody = null;
 
        for (var attempt = 1; attempt <= InvokeAttemptCount; attempt++)
        {
            ApplyDaprApiToken();
 
            try
            {
                using var response = await httpClient.PostAsJsonAsync(invokeUri, payload, cancellationToken);
                if (response.IsSuccessStatusCode)
                {
                    var startResponse = await response.Content.ReadFromJsonAsync<AgentRunStartResponse>(_serializerOptions, cancellationToken);
                    if (startResponse is null || string.IsNullOrWhiteSpace(startResponse.InstanceId) || string.IsNullOrWhiteSpace(startResponse.StatusUrl))
                    {
                        throw new InvalidOperationException("Agent start response was incomplete.");
                    }
 
                    return startResponse;
                }
 
                var body = await ReadResponseBodyAsync(response, cancellationToken);
                if (IsWorkflowStartFailure(body))
                {
                    throw CreateInvocationUnavailableException(appId, invokeUri, response.StatusCode, response.ReasonPhrase, body);
                }
 
                if (IsTransientInvocationFailure(response.StatusCode, body) && attempt < InvokeAttemptCount)
                {
                    lastTransientBody = body;
                    lastTransientFailure = new InvalidOperationException(
                        $"Dapr invocation to '{appId}' returned HTTP {(int)response.StatusCode} ({response.ReasonPhrase ?? "unknown"}).");
 
                    logger.LogWarning(
                        "Dapr invoke to {AppId} returned HTTP {StatusCode} on attempt {Attempt}/{TotalAttempts}. Rechecking readiness before retrying.",
                        appId,
                        (int)response.StatusCode,
                        attempt,
                        InvokeAttemptCount);
 
                    await CheckOrchestratorReadinessAsync(appId, cancellationToken);
                    await Task.Delay(GetRetryDelay(attempt), cancellationToken);
                    continue;
                }
 
                throw CreateInvocationUnavailableException(appId, invokeUri, response.StatusCode, response.ReasonPhrase, body);
            }
            catch (HttpRequestException ex) when (attempt < InvokeAttemptCount)
            {
                lastTransientFailure = ex;
                logger.LogWarning(
                    ex,
                    "Dapr invoke to {AppId} failed with a transport error on attempt {Attempt}/{TotalAttempts}. Rechecking readiness before retrying.",
                    appId,
                    attempt,
                    InvokeAttemptCount);
 
                await CheckOrchestratorReadinessAsync(appId, cancellationToken);
                await Task.Delay(GetRetryDelay(attempt), cancellationToken);
            }
            catch (TaskCanceledException ex) when (!cancellationToken.IsCancellationRequested && attempt < InvokeAttemptCount)
            {
                lastTransientFailure = ex;
                logger.LogWarning(
                    ex,
                    "Dapr invoke to {AppId} timed out on attempt {Attempt}/{TotalAttempts}. Rechecking readiness before retrying.",
                    appId,
                    attempt,
                    InvokeAttemptCount);
 
                await CheckOrchestratorReadinessAsync(appId, cancellationToken);
                await Task.Delay(GetRetryDelay(attempt), cancellationToken);
            }
        }
 
        throw new DaprInvocationUnavailableException(
            CreateUnavailableMessage(appId, invokeUri, lastTransientFailure, lastTransientBody));
    }
 
    private async Task<ReadinessResponse> CheckOrchestratorReadinessAsync(
        string appId,
        CancellationToken cancellationToken)
    {
        var invokeUri = BuildInvokeUri(appId, "/agent/readiness");
 
        try
        {
            ApplyDaprApiToken();
            using var response = await httpClient.GetAsync(invokeUri, cancellationToken);
            if (response.IsSuccessStatusCode)
            {
                var readiness = await response.Content.ReadFromJsonAsync<ReadinessResponse>(_serializerOptions, cancellationToken);
                return readiness ?? new ReadinessResponse(false, DateTimeOffset.UtcNow,
                    [new DependencyReadiness(appId, false, "Readiness response was empty.")]);
            }
 
            var body = await response.Content.ReadAsStringAsync(cancellationToken);
            return new ReadinessResponse(false, DateTimeOffset.UtcNow,
                [new DependencyReadiness(appId, false, $"HTTP {(int)response.StatusCode}: {body}")]);
        }
        catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException)
        {
            logger.LogWarning(ex, "Readiness check failed for Dapr app {AppId}.", appId);
            return new ReadinessResponse(false, DateTimeOffset.UtcNow,
                [new DependencyReadiness(appId, false, ex.Message)]);
        }
    }
 
    private async Task<JsonDocument> PollForCompletionAsync(
        string appId,
        string statusPath,
        CancellationToken cancellationToken)
    {
        using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        timeoutCts.CancelAfter(WorkflowCompletionTimeout);
 
        var maxAttempts = (int)WorkflowCompletionTimeout.TotalSeconds;
        for (var attempt = 1; attempt <= maxAttempts; attempt++)
        {
            await Task.Delay(TimeSpan.FromSeconds(1), timeoutCts.Token);
            ApplyDaprApiToken();
 
            var statusUri = BuildInvokeUri(appId, statusPath);
            using var response = await httpClient.GetAsync(statusUri, timeoutCts.Token);
            if (!response.IsSuccessStatusCode)
            {
                if (IsTransientDaprStatus(response.StatusCode))
                {
                    logger.LogWarning(
                        "Dapr status poll for {AppId} returned HTTP {StatusCode} on attempt {Attempt}/{TotalAttempts}. Retrying.",
                        appId,
                        (int)response.StatusCode,
                        attempt,
                        maxAttempts);
                    continue;
                }
 
                throw CreateInvocationUnavailableException(
                    appId,
                    statusUri,
                    response.StatusCode,
                    response.ReasonPhrase,
                    await ReadResponseBodyAsync(response, timeoutCts.Token));
            }
 
            var payload = await JsonDocument.ParseAsync(
                await response.Content.ReadAsStreamAsync(timeoutCts.Token),
                cancellationToken: timeoutCts.Token);
            var runtimeStatus = payload.RootElement.TryGetProperty("runtime_status", out var statusValue)
                ? statusValue.GetString()
                : null;
 
            logger.LogDebug("Agent {AppId} poll {Attempt} runtime_status={RuntimeStatus}", appId, attempt, runtimeStatus);
 
            if (string.Equals(runtimeStatus, "COMPLETED", StringComparison.OrdinalIgnoreCase))
            {
                return payload;
            }
 
            if (string.Equals(runtimeStatus, "FAILED", StringComparison.OrdinalIgnoreCase) ||
                string.Equals(runtimeStatus, "TERMINATED", StringComparison.OrdinalIgnoreCase))
            {
                throw new InvalidOperationException(
                    $"Agent {appId} ended with status {runtimeStatus}. {ShipmentWorkflowOutputRenderer.ExtractFailureDetails(payload.RootElement)}");
            }
 
            payload.Dispose();
        }
 
        throw new TimeoutException($"Agent {appId} did not complete within the allotted timeout.");
    }
 
    private void ApplyDaprApiToken()
    {
        var daprApiToken = configuration["DAPR_API_TOKEN"];
        if (string.IsNullOrWhiteSpace(daprApiToken))
        {
            return;
        }
 
        httpClient.DefaultRequestHeaders.Remove(DaprApiTokenHeader);
        httpClient.DefaultRequestHeaders.TryAddWithoutValidation(DaprApiTokenHeader, daprApiToken);
    }
 
    private Uri BuildInvokeUri(string appId, string methodPath)
    {
        var normalizedPath = methodPath.StartsWith('/') ? methodPath : $"/{methodPath}";
        var daprHttpEndpoint = configuration["DAPR_HTTP_ENDPOINT"];
        if (!string.IsNullOrWhiteSpace(daprHttpEndpoint))
        {
            return new Uri($"{daprHttpEndpoint.TrimEnd('/')}/v1.0/invoke/{appId}/method{normalizedPath}");
        }
 
        var daprHttpPort = configuration["DAPR_HTTP_PORT"];
        if (string.IsNullOrWhiteSpace(daprHttpPort))
        {
            throw new InvalidOperationException("DAPR_HTTP_ENDPOINT or DAPR_HTTP_PORT is not configured.");
        }
 
        return new Uri($"http://localhost:{daprHttpPort}/v1.0/invoke/{appId}/method{normalizedPath}");
    }
 
    private static bool IsTransientDaprStatus(HttpStatusCode statusCode) =>
        statusCode is HttpStatusCode.BadGateway or
            HttpStatusCode.ServiceUnavailable or
            HttpStatusCode.GatewayTimeout or
            HttpStatusCode.InternalServerError;
 
    private static bool IsTransientInvocationFailure(HttpStatusCode statusCode, string? body) =>
        IsTransientDaprStatus(statusCode) ||
        (body is not null &&
            (body.Contains("EOF", StringComparison.OrdinalIgnoreCase) ||
             body.Contains("connection reset", StringComparison.OrdinalIgnoreCase) ||
             body.Contains("upstream", StringComparison.OrdinalIgnoreCase)));
 
    private static bool IsWorkflowStartFailure(string? body) =>
        body is not null &&
        (body.Contains("ERR_START_WORKFLOW", StringComparison.OrdinalIgnoreCase) ||
         body.Contains("StartWorkflow", StringComparison.OrdinalIgnoreCase) ||
         body.Contains("Deadline Exceeded", StringComparison.OrdinalIgnoreCase) ||
         (body.Contains("workflow", StringComparison.OrdinalIgnoreCase) &&
          body.Contains("context canceled", StringComparison.OrdinalIgnoreCase)));
 
    private static TimeSpan GetRetryDelay(int attempt) =>
        TimeSpan.FromSeconds(Math.Min(5, attempt));
 
    private static async Task<string> ReadResponseBodyAsync(
        HttpResponseMessage response,
        CancellationToken cancellationToken)
    {
        try
        {
            return await response.Content.ReadAsStringAsync(cancellationToken);
        }
        catch
        {
            return string.Empty;
        }
    }
 
    private static DaprInvocationUnavailableException CreateInvocationUnavailableException(
        string appId,
        Uri requestUri,
        HttpStatusCode statusCode,
        string? reasonPhrase,
        string? body)
    {
        var message = $"Dapr invocation to '{appId}' failed with HTTP {(int)statusCode} ({reasonPhrase ?? "unknown"}). Request URI: {requestUri}.";
        if (!string.IsNullOrWhiteSpace(body))
        {
            message += $" Response body: {body.Trim()}";
        }
 
        return new DaprInvocationUnavailableException(message);
    }
 
    private static string CreateUnavailableMessage(
        string appId,
        Uri requestUri,
        Exception? lastTransientFailure,
        string? lastTransientBody)
    {
        var message = $"Dapr invocation to '{appId}' did not become available after {InvokeAttemptCount} attempts. Request URI: {requestUri}.";
        if (lastTransientFailure is not null)
        {
            message += $" Last error: {lastTransientFailure.Message}";
        }
 
        if (!string.IsNullOrWhiteSpace(lastTransientBody))
        {
            message += $" Response body: {lastTransientBody.Trim()}";
        }
 
        return message;
    }
}
// ShipmentTaskCoordinator.cs
using System.Collections.Concurrent;
using System.Threading.Channels;
 
sealed class ShipmentTaskCoordinator(
    IServiceScopeFactory scopeFactory,
    ILogger<ShipmentTaskCoordinator> logger) : BackgroundService
{
    private readonly Channel<ShipmentStartRequest> _queue = Channel.CreateUnbounded<ShipmentStartRequest>(
        new UnboundedChannelOptions { SingleReader = true });
    private readonly ConcurrentDictionary<string, ShipmentTaskRecord> _tasks = [];
 
    public ShipmentTaskRecord Enqueue(string taskId, string scenario)
    {
        var record = new ShipmentTaskRecord(taskId, scenario, "STARTING", null, null, DateTimeOffset.UtcNow);
        _tasks[taskId] = record;
        _queue.Writer.TryWrite(new ShipmentStartRequest(taskId, scenario));
        return record;
    }
 
    public bool TryGet(string taskId, out ShipmentTaskRecord record) =>
        _tasks.TryGetValue(taskId, out record!);
 
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await foreach (var request in _queue.Reader.ReadAllAsync(stoppingToken))
        {
            await StartWorkflowAsync(request, stoppingToken);
        }
    }
 
    private async Task StartWorkflowAsync(ShipmentStartRequest request, CancellationToken stoppingToken)
    {
        try
        {
            using var scope = scopeFactory.CreateScope();
            var agentClient = scope.ServiceProvider.GetRequiredService<DaprAgentClient>();
            var start = await agentClient.StartShipmentWorkflowAsync(request.TaskId, request.Scenario, stoppingToken);
            _tasks[request.TaskId] = new ShipmentTaskRecord(
                request.TaskId,
                request.Scenario,
                "RUNNING",
                start.StatusUrl,
                null,
                DateTimeOffset.UtcNow);
            logger.LogInformation("Shipment workflow start accepted for taskId {TaskId}", request.TaskId);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
        }
        catch (Exception ex)
        {
            _tasks[request.TaskId] = new ShipmentTaskRecord(
                request.TaskId,
                request.Scenario,
                "START_FAILED",
                null,
                ex.Message,
                DateTimeOffset.UtcNow);
            logger.LogWarning(ex, "Shipment workflow start failed for taskId {TaskId}", request.TaskId);
        }
    }
 
    private sealed record ShipmentStartRequest(string TaskId, string Scenario);
}
 
sealed record ShipmentTaskRecord(
    string TaskId,
    string Scenario,
    string RuntimeStatus,
    string? StatusUrl,
    string? FailureDetails,
    DateTimeOffset UpdatedAt);
// ShipmentWorkflowOutputRenderer.cs
using System.Text.Json;
using System.Text.RegularExpressions;
 
static class ShipmentWorkflowOutputRenderer
{
    private const string ExecutionChecklist = """
        ## Execution Checklist
 
        - Validate normalized shipment request.
        - Confirm carrier capacity and service level.
        - Confirm route milestones and delivery window.
        - Clear documentation and compliance blockers.
        - Monitor risk triggers and exception thresholds.
        - Send proactive customer updates at pickup, hub departure, out-for-delivery, and delivery.
        """;
 
    public static string ExtractOutput(JsonElement root)
    {
        if (TryExtractString(root, "serialized_output", out var output) ||
            TryExtractString(root, "output", out output))
        {
            return output;
        }
 
        throw new InvalidOperationException("Agent response did not include a usable output payload.");
    }
 
    public static string ExtractFailureDetails(JsonElement root)
    {
        if (!root.TryGetProperty("failure_details", out var failureDetails) ||
            failureDetails.ValueKind is JsonValueKind.Null or JsonValueKind.Undefined)
        {
            return "No failure details were returned by Dapr.";
        }
 
        if (failureDetails.ValueKind == JsonValueKind.Object)
        {
            var errorType = failureDetails.TryGetProperty("error_type", out var errorTypeValue)
                ? errorTypeValue.GetString()
                : null;
            var message = failureDetails.TryGetProperty("message", out var messageValue)
                ? messageValue.GetString()
                : null;
 
            if (!string.IsNullOrWhiteSpace(errorType) || !string.IsNullOrWhiteSpace(message))
            {
                return $"Failure type: {errorType ?? "unknown"}. Message: {message ?? "No message."}";
            }
        }
 
        return $"Failure details: {failureDetails}";
    }
 
    private static bool TryExtractString(JsonElement root, string propertyName, out string output)
    {
        output = string.Empty;
        if (!root.TryGetProperty(propertyName, out var value))
        {
            return false;
        }
 
        if (value.ValueKind == JsonValueKind.Object &&
            value.TryGetProperty("content", out var contentValue) &&
            contentValue.ValueKind == JsonValueKind.String)
        {
            return TryNormalizeOutputText(contentValue.GetString() ?? string.Empty, out output);
        }
 
        if (value.ValueKind == JsonValueKind.Object &&
            TryRenderToolPayload(value, out output))
        {
            return true;
        }
 
        return value.ValueKind == JsonValueKind.String &&
            TryNormalizeOutputText(value.GetString() ?? string.Empty, out output);
    }
 
    private static bool TryNormalizeOutputText(string raw, out string output)
    {
        output = string.Empty;
        if (!IsUsableOutput(raw))
        {
            return false;
        }
 
        var normalized = StripAgentMarkup(raw);
        try
        {
            using var parsed = JsonDocument.Parse(normalized);
            var parsedRoot = parsed.RootElement;
            if (parsedRoot.ValueKind == JsonValueKind.Object &&
                parsedRoot.TryGetProperty("content", out var parsedContent) &&
                parsedContent.ValueKind == JsonValueKind.String)
            {
                return TryNormalizeOutputText(parsedContent.GetString() ?? string.Empty, out output);
            }
 
            if (parsedRoot.ValueKind == JsonValueKind.Object &&
                TryRenderToolPayload(parsedRoot, out output))
            {
                return true;
            }
 
            if (parsedRoot.ValueKind == JsonValueKind.String)
            {
                var parsedString = parsedRoot.GetString() ?? string.Empty;
                if (LooksLikeToolPayload(parsedString))
                {
                    return TryNormalizeOutputText(parsedString, out output);
                }
 
                output = parsedString;
                return IsUsableOutput(output);
            }
        }
        catch (JsonException)
        {
            if (TryRenderMalformedToolPayload(normalized, out output))
            {
                return true;
            }
 
            output = CleanReturnedText(normalized, string.Empty);
            return IsUsableOutput(output);
        }
 
        return false;
    }
 
    private static string StripAgentMarkup(string raw)
    {
        var trimmed = raw.Trim();
        const string pythonTag = "<|python_tag|>";
        return trimmed.StartsWith(pythonTag, StringComparison.OrdinalIgnoreCase)
            ? trimmed[pythonTag.Length..].Trim()
            : trimmed;
    }
 
    private static bool TryRenderToolPayload(JsonElement value, out string output)
    {
        output = string.Empty;
        if (!value.TryGetProperty("name", out var nameValue) ||
            nameValue.ValueKind != JsonValueKind.String ||
            !value.TryGetProperty("parameters", out var parameters) ||
            parameters.ValueKind != JsonValueKind.Object)
        {
            return false;
        }
 
        output = nameValue.GetString() switch
        {
            "FinalizeShipmentExecutionPlan" => RenderFinalExecutionPlan(parameters),
            "DraftCustomerUpdate" => RenderCustomerUpdate(parameters),
            "DraftOperationsSummary" => RenderOperationsSummary(parameters),
            _ => string.Empty
        };
 
        return IsUsableOutput(output);
    }
 
    private static string RenderFinalExecutionPlan(JsonElement parameters)
    {
        var scenario = GetParameter(parameters, "scenario", "Shipment scenario was not returned.");
        var operationsSummary = GetParameter(parameters, "operations_summary", "Operations summary was not returned.");
        var customerUpdate = GetParameter(parameters, "customer_update", "Customer update was not returned.");
 
        return BuildExecutionPlan(ReadableScenario(scenario), operationsSummary, customerUpdate);
    }
 
    private static string RenderCustomerUpdate(JsonElement parameters)
    {
        var scenario = GetParameter(parameters, "scenario", "Shipment scenario was not returned.");
        var operationsSummary = GetParameter(parameters, "operations_summary", "Operations summary was not returned.");
 
        return BuildExecutionPlan(
            ReadableScenario(scenario),
            $"## Operations Summary\n\n{ExtractDispatchAction(ReadableOperationsSummary(operationsSummary))}");
    }
 
    private static string RenderOperationsSummary(JsonElement parameters)
    {
        var scenario = GetParameter(parameters, "scenario", "Shipment scenario was not returned.");
        var planningBrief = GetParameter(parameters, "planning_brief", "Planning brief was not returned.");
 
        return BuildExecutionPlan(
            scenario,
            $"## Operations Summary\n\n{ExtractDispatchAction(ReadablePlanningBrief(planningBrief))}");
    }
 
    private static string BuildExecutionPlan(string scenario, string operationsSection, string? customerUpdate = null)
    {
        var plan = $"""
            # Shipment Execution Plan
 
            ## Shipment Scenario
 
            {scenario}
 
            {operationsSection}
 
            {ExecutionChecklist}
            """;
 
        return string.IsNullOrWhiteSpace(customerUpdate)
            ? plan
            : $"{plan}\n\n{customerUpdate.Trim()}";
    }
 
    private static bool TryRenderMalformedToolPayload(string raw, out string output)
    {
        output = string.Empty;
        if (!raw.Contains("\"name\"", StringComparison.OrdinalIgnoreCase) ||
            !raw.Contains("\"parameters\"", StringComparison.OrdinalIgnoreCase))
        {
            return false;
        }
 
        output = ExtractJsonStringProperty(raw, "name") switch
        {
            "DraftCustomerUpdate" => RenderMalformedCustomerUpdate(raw),
            "DraftOperationsSummary" => RenderMalformedOperationsSummary(raw),
            _ => string.Empty
        };
 
        return IsUsableOutput(output);
    }
 
    private static bool LooksLikeToolPayload(string raw)
    {
        var normalized = StripAgentMarkup(raw);
        return normalized.Contains("\"name\"", StringComparison.OrdinalIgnoreCase) &&
            normalized.Contains("\"parameters\"", StringComparison.OrdinalIgnoreCase);
    }
 
    private static string RenderMalformedCustomerUpdate(string raw)
    {
        var scenario = ExtractJsonStringProperty(raw, "scenario");
        var operationsSummary = ExtractJsonStringProperty(raw, "operations_summary");
 
        return BuildExecutionPlan(
            ReadableScenario(scenario),
            $"## Operations Summary\n\n{ExtractDispatchAction(ReadableOperationsSummary(operationsSummary))}");
    }
 
    private static string RenderMalformedOperationsSummary(string raw)
    {
        var scenario = ExtractJsonStringProperty(raw, "scenario");
        var planningBrief = ExtractJsonStringProperty(raw, "planning_brief");
 
        return BuildExecutionPlan(
            ReadableScenario(scenario),
            $"## Operations Summary\n\n{ExtractDispatchAction(ReadablePlanningBrief(planningBrief))}");
    }
 
    private static string ExtractJsonStringProperty(string raw, string propertyName)
    {
        var match = Regex.Match(
            raw,
            $"\"{Regex.Escape(propertyName)}\"\\s*:\\s*\"(?<value>(?:\\\\.|[^\"\\\\])*)\"",
            RegexOptions.IgnoreCase);
        if (!match.Success)
        {
            return string.Empty;
        }
 
        var value = match.Groups["value"].Value;
        try
        {
            return JsonSerializer.Deserialize<string>($"\"{value}\"") ?? string.Empty;
        }
        catch (JsonException)
        {
            return value.Replace("\\\"", "\"", StringComparison.Ordinal).Replace("\\n", "\n", StringComparison.Ordinal);
        }
    }
 
    private static string GetParameter(JsonElement parameters, string name, string fallback)
    {
        if (!parameters.TryGetProperty(name, out var value))
        {
            return fallback;
        }
 
        return value.ValueKind switch
        {
            JsonValueKind.String => CleanReturnedText(value.GetString(), fallback),
            JsonValueKind.Object or JsonValueKind.Array => CleanReturnedText(value.GetRawText(), fallback),
            _ => CleanReturnedText(value.ToString(), fallback)
        };
    }
 
    private static string ReadablePlanningBrief(string planningBrief)
    {
        var trimmed = planningBrief.Trim();
        if (trimmed.StartsWith("{\"type\"", StringComparison.OrdinalIgnoreCase) ||
            trimmed.Contains("\"description\":\"Logistics planning brief", StringComparison.OrdinalIgnoreCase))
        {
            return "The workflow returned a partial operations-summary tool call instead of the completed final plan. Re-run the shipment if you need the full carrier, route, risk, and customer-update detail.";
        }
 
        return trimmed;
    }
 
    private static string ReadableScenario(string scenario)
    {
        var trimmed = scenario.Trim();
        if (string.IsNullOrWhiteSpace(trimmed))
        {
            return "Shipment scenario was not returned.";
        }
 
        return TryReadJsonStringProperty(trimmed, "description", out var description)
            ? description
            : trimmed;
    }
 
    private static string ReadableOperationsSummary(string operationsSummary)
    {
        var trimmed = operationsSummary.Trim();
        if (string.IsNullOrWhiteSpace(trimmed))
        {
            return "Operations summary was not returned.";
        }
 
        if (trimmed.StartsWith('{') && trimmed.EndsWith('}'))
        {
            return string.Join(
                " ",
                Regex.Matches(trimmed, "\"(?<sentence>[^\"]+)\"")
                    .Select(match => match.Groups["sentence"].Value)
                    .Where(sentence => !string.IsNullOrWhiteSpace(sentence)));
        }
 
        return trimmed;
    }
 
    private static bool TryReadJsonStringProperty(string raw, string propertyName, out string value)
    {
        value = string.Empty;
        try
        {
            using var document = JsonDocument.Parse(raw);
            if (document.RootElement.ValueKind == JsonValueKind.Object &&
                document.RootElement.TryGetProperty(propertyName, out var property) &&
                property.ValueKind == JsonValueKind.String)
            {
                value = property.GetString() ?? string.Empty;
                return IsUsableOutput(value);
            }
        }
        catch (JsonException)
        {
        }
 
        return false;
    }
 
    private static string ExtractDispatchAction(string text)
    {
        const string actionSentence = "Dispatch should proceed only after the required documents, carrier acceptance, pickup appointment, and risk mitigations are confirmed.";
        var normalized = text.ReplaceLineEndings(" ").Trim();
        return normalized.Contains(actionSentence, StringComparison.OrdinalIgnoreCase)
            ? actionSentence
            : normalized;
    }
 
    private static string CleanReturnedText(string? value, string fallback)
    {
        if (string.IsNullOrWhiteSpace(value))
        {
            return fallback;
        }
 
        var trimmed = value.Trim();
        return string.Equals(trimmed, "null", StringComparison.OrdinalIgnoreCase) ||
            string.Equals(trimmed, "None", StringComparison.OrdinalIgnoreCase)
            ? fallback
            : trimmed;
    }
 
    private static bool IsUsableOutput(string output) =>
        !string.IsNullOrWhiteSpace(output) &&
        !string.Equals(output.Trim(), "None", StringComparison.OrdinalIgnoreCase) &&
        !string.Equals(output.Trim(), "null", StringComparison.OrdinalIgnoreCase);
}
// TraceContext.cs
using System.Diagnostics;
 
static class TraceContext
{
    public static Dictionary<string, string> Current()
    {
        var traceContext = new Dictionary<string, string>();
        var activity = Activity.Current;
        if (activity is null)
        {
            return traceContext;
        }
 
        if (!string.IsNullOrWhiteSpace(activity.Id))
        {
            traceContext["traceparent"] = activity.Id;
        }
 
        if (!string.IsNullOrWhiteSpace(activity.TraceStateString))
        {
            traceContext["tracestate"] = activity.TraceStateString;
        }
 
        return traceContext;
    }
}

AppHost (AspireWithDaprAgents.AppHost)

Aspire orchestration is entry point for both local Dapr and Catalyst-connected runs. It starts the API, web frontend, orchestrator, and five Python agents; centralizes app IDs, app ports, Dapr HTTP ports, local component paths, Catalyst provisioning, Ollama endpoint checks, and local inspection containers e.g., Diagrid Dashboard & Redis Insight.

Refer to the image below for organizing the folder structure.

Add the Diagrid.Aspire.Hosting.Catalyst NuGet package to the AppHost project to enable seamless integration between your locally running Aspire applications and the live Diagrid Catalyst infrastructure.

dotnet add package Diagrid.Aspire.Hosting.Catalyst --version 0.0.5

Catalyst

// CatalystOptions.cs
namespace AspireWithDaprAgents.AppHost.Catalyst;
 
internal static class CatalystProjectState
{
    internal static bool IsActive(string? state) =>
        string.Equals(state, "Active", StringComparison.OrdinalIgnoreCase) ||
        string.Equals(state, "ready", StringComparison.OrdinalIgnoreCase);
 
    internal static bool IsDeleting(string? state) =>
        string.Equals(state, "Deleting", StringComparison.OrdinalIgnoreCase);
}
// CatalystProvisioning.cs
using System.Diagnostics;
 
namespace AspireWithDaprAgents.AppHost.Catalyst;
 
internal static class CatalystProvisioning
{
    private const int TransientCommandAttemptCount = 20;
 
    internal static async Task<bool> EnsureInfrastructureAsync(
        string projectName,
        IReadOnlyList<string> allDaprAppIds,
        IReadOnlyList<string> agentAppIds,
        string ollamaOpenAiEndpoint,
        string ollamaModel,
        string ollamaApiKey)
    {
        var allScopes = string.Join(",", allDaprAppIds);
        var agentScopes = string.Join(",", agentAppIds);
 
        if (!await RunDiagridAsync(["product", "use", "catalyst"]))
        {
            return false;
        }
 
        if (!await WaitForProjectBootstrapAsync(projectName, TimeSpan.FromSeconds(120)))
        {
            return false;
        }
 
        if (!await RunDiagridAsync(["project", "create", projectName, "--enable-managed-workflow", "--wait", "--use", "--ignore-if-exists"]))
        {
            return false;
        }
 
        string[][] commands =
        [
            .. allDaprAppIds.Select(appId => new[]
            {
                "appid", "create", appId, "--project", projectName, "--wait", "--ignore-if-exists",
            }),
            ["pubsub", "create", "shipment-workflow-events", "--project", projectName, "--scopes", allScopes, "--wait", "--ignore-if-exists"],
            ["kv", "create", "shipment-workflow-state", "--project", projectName, "--scopes", allScopes, "--wait", "--ignore-if-exists"],
            [
                "component", "create", "statestore",
                "--type", "state.diagrid",
                "--metadata", "state=shipment-workflow-state",
                "--metadata", "keyPrefix=none",
                "--scopes", allScopes,
                "--project", projectName,
                "--ignore-if-exists",
            ],
            [
                "component", "create", "pubsubstore",
                "--type", "pubsub.diagrid",
                "--metadata", "pubsub=shipment-workflow-events",
                "--scopes", allScopes,
                "--project", projectName,
                "--ignore-if-exists",
            ],
            [
                "subscription", "create", "shipment-progress-subscription",
                "--project", projectName,
                "--component", "pubsubstore",
                "--topic", "shipment-progress",
                "--scopes", "webfrontend",
                "--route", "/dapr/events/shipment-progress",
                "--wait",
            ],
            [
                "component", "create", "llama",
                "--type", "conversation.openai",
                "--metadata", $"key={ollamaApiKey}",
                "--metadata", $"endpoint={ollamaOpenAiEndpoint}",
                "--metadata", $"model={ollamaModel}",
                "--scopes", agentScopes,
                "--project", projectName,
                "--wait",
                "--ignore-if-exists",
            ],
            [
                "component", "update", "llama",
                "--metadata", $"key={ollamaApiKey}",
                "--metadata", $"endpoint={ollamaOpenAiEndpoint}",
                "--metadata", $"model={ollamaModel}",
                "--scopes", agentScopes,
                "--project", projectName,
                "--wait",
            ],
        ];
 
        foreach (var command in commands)
        {
            var isSubscriptionCommand = command is ["subscription", "create", ..];
            if (!await RunDiagridAsync(command, allowIfExistsError: isSubscriptionCommand))
            {
                return false;
            }
        }
 
        return true;
    }
 
    internal static string? ExtractProjectStateFromText(string output, string projectName)
    {
        foreach (var line in output.Split('\n', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries))
        {
            var columns = line.Split(' ', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries);
            if (columns.Length < 2 || !columns.Any(column => column.StartsWith(projectName, StringComparison.OrdinalIgnoreCase)))
            {
                continue;
            }
 
            foreach (var column in columns)
            {
                if (CatalystProjectState.IsActive(column))
                {
                    return "Active";
                }
 
                if (CatalystProjectState.IsDeleting(column) ||
                    string.Equals(column, "Provisioning", StringComparison.OrdinalIgnoreCase))
                {
                    return column;
                }
            }
        }
 
        return null;
    }
 
    private static async Task<bool> RunDiagridAsync(IReadOnlyList<string> arguments, bool allowIfExistsError = false)
    {
        for (var attempt = 1; attempt <= TransientCommandAttemptCount; attempt++)
        {
            var result = await RunDiagridCommandAsync(arguments);
            if (result.StartFailure is not null)
            {
                Console.Error.WriteLine($"Failed to run Diagrid CLI: {result.StartFailure.Message}");
                return false;
            }
 
            if (result.ExitCode == 0)
            {
                return true;
            }
 
            var output = result.Output;
            var error = result.Error;
            if (allowIfExistsError && (error.Contains("already exists", StringComparison.OrdinalIgnoreCase) ||
                                         error.Contains("resource already exists", StringComparison.OrdinalIgnoreCase)))
            {
                Console.WriteLine($"Resource already exists (skipping): {string.Join(' ', arguments.Take(3))}");
                return true;
            }
 
            if (attempt < TransientCommandAttemptCount &&
                IsTransientCatalystFailure(arguments, output, error))
            {
                var delay = GetRetryDelay(attempt);
                Console.WriteLine(
                    $"Transient Diagrid CLI failure on attempt {attempt}/{TransientCommandAttemptCount}: diagrid {string.Join(' ', arguments)}. Retrying in {delay.TotalSeconds:0}s.");
                await Task.Delay(delay);
                continue;
            }
 
            WriteCommandFailure(arguments, output, error);
            return false;
        }
 
        return false;
    }
 
    private static ProcessStartInfo CreateDiagridStartInfo(IReadOnlyList<string> arguments)
    {
        var startInfo = new ProcessStartInfo
        {
            FileName = "diagrid",
            RedirectStandardError = true,
            RedirectStandardOutput = true,
        };
 
        foreach (var arg in arguments)
        {
            startInfo.ArgumentList.Add(arg);
        }
 
        return startInfo;
    }
 
    private static async Task<bool> WaitForProjectBootstrapAsync(string projectName, TimeSpan timeout)
    {
        var expiresAt = DateTimeOffset.UtcNow + timeout;
        while (DateTimeOffset.UtcNow < expiresAt)
        {
            try
            {
                var state = await GetProjectStateAsync(projectName);
                if (CatalystProjectState.IsActive(state) || !CatalystProjectState.IsDeleting(state))
                {
                    return true;
                }
            }
            catch
            {
            }
 
            await Task.Delay(TimeSpan.FromSeconds(3));
        }
 
        Console.Error.WriteLine($"Timed out waiting for Catalyst project '{projectName}' to finish deleting.");
        return false;
    }
 
    private static async Task<bool> WaitForProjectActiveAsync(string projectName, TimeSpan timeout)
    {
        var expiresAt = DateTimeOffset.UtcNow + timeout;
        while (DateTimeOffset.UtcNow < expiresAt)
        {
            try
            {
                var state = await GetProjectStateAsync(projectName);
                if (CatalystProjectState.IsActive(state))
                {
                    return true;
                }
            }
            catch
            {
            }
 
            await Task.Delay(TimeSpan.FromSeconds(3));
        }
 
        Console.Error.WriteLine($"Timed out waiting for Catalyst project '{projectName}' to become active.");
        return false;
    }
 
    private static async Task<DiagridCommandResult> RunDiagridCommandAsync(IReadOnlyList<string> arguments)
    {
        var startInfo = CreateDiagridStartInfo(arguments);
 
        try
        {
            using var process = Process.Start(startInfo);
            if (process is null)
            {
                return new(-1, "", "", new InvalidOperationException("Failed to start the Diagrid CLI."));
            }
 
            var outputTask = process.StandardOutput.ReadToEndAsync();
            var errorTask = process.StandardError.ReadToEndAsync();
            await process.WaitForExitAsync();
 
            return new(
                process.ExitCode,
                await outputTask,
                await errorTask,
                null);
        }
        catch (Exception ex)
        {
            return new(-1, "", "", ex);
        }
    }
 
    private static async Task<string?> GetProjectStateAsync(string projectName)
    {
        var startInfo = CreateDiagridStartInfo(["project", "get", projectName, "--output", "json"]);
 
        using var process = Process.Start(startInfo);
        if (process is null)
        {
            return null;
        }
 
        var outputTask = process.StandardOutput.ReadToEndAsync();
        try
        {
            await process.WaitForExitAsync().WaitAsync(TimeSpan.FromSeconds(15));
        }
        catch (TimeoutException)
        {
            try
            {
                process.Kill(entireProcessTree: true);
            }
            catch
            {
            }
 
            Console.WriteLine($"Timed out checking Catalyst project '{projectName}' state. Retrying...");
            return null;
        }
 
        var output = await outputTask;
 
        if (process.ExitCode != 0)
        {
            return null;
        }
 
        try
        {
            return ExtractProjectStateFromJson(output) ?? ExtractProjectStateFromText(output, projectName);
        }
        catch
        {
        }
 
        return ExtractProjectStateFromText(output, projectName);
    }
 
    internal static string? ExtractProjectStateFromJson(string output)
    {
        using var document = System.Text.Json.JsonDocument.Parse(output);
        if (document.RootElement.TryGetProperty("state", out var stateProperty))
        {
            return stateProperty.GetString();
        }
 
        if (!document.RootElement.TryGetProperty("status", out var statusElement))
        {
            return null;
        }
 
        if (statusElement.ValueKind == System.Text.Json.JsonValueKind.Object &&
            statusElement.TryGetProperty("status", out var nestedStatusProperty))
        {
            return nestedStatusProperty.GetString();
        }
 
        return statusElement.ValueKind == System.Text.Json.JsonValueKind.String
            ? statusElement.GetString()
            : null;
    }
 
    internal static bool IsTransientCatalystFailure(IReadOnlyList<string> arguments, string output, string error)
    {
        var combined = $"{output}\n{error}";
        if (string.IsNullOrWhiteSpace(combined))
        {
            return false;
        }
 
        if (combined.Contains("max number of components reached", StringComparison.OrdinalIgnoreCase))
        {
            return false;
        }
 
        var transientMarkers = new[]
        {
            "provisioning",
            "not ready",
            "temporarily unavailable",
            "try again",
            "please retry",
            "timeout",
            "timed out",
            "connection reset",
            "eof",
            "unavailable",
            "502",
            "503",
            "504",
        };
 
        if (transientMarkers.Any(marker => combined.Contains(marker, StringComparison.OrdinalIgnoreCase)))
        {
            return true;
        }
 
        var isProjectScopedCommand = arguments.Any(argument => string.Equals(argument, "--project", StringComparison.OrdinalIgnoreCase));
        return isProjectScopedCommand &&
            combined.Contains("not found", StringComparison.OrdinalIgnoreCase) &&
            combined.Contains("project", StringComparison.OrdinalIgnoreCase);
    }
 
    private static TimeSpan GetRetryDelay(int attempt) =>
        TimeSpan.FromSeconds(Math.Min(15, attempt * 3));
 
    private static void WriteCommandFailure(IReadOnlyList<string> arguments, string output, string error)
    {
        Console.Error.WriteLine($"Diagrid CLI command failed: diagrid {string.Join(' ', arguments)}");
        if (!string.IsNullOrWhiteSpace(output))
        {
            Console.Error.WriteLine(output.Trim());
        }
 
        if (!string.IsNullOrWhiteSpace(error))
        {
            Console.Error.WriteLine(error.Trim());
            if (error.Contains("max number of components reached", StringComparison.OrdinalIgnoreCase))
            {
                Console.Error.WriteLine("Catalyst Online enforces a 10-component quota per project.");
                Console.Error.WriteLine("If this project already has 10 components, delete unused Catalyst resources or set CATALYST_PROJECT_NAME to a fresh project name.");
            }
        }
    }
 
    private sealed record DiagridCommandResult(int ExitCode, string Output, string Error, Exception? StartFailure);
}
// OllamaStartup.cs
using AspireWithDaprAgents.AppHost.Shared;
using Microsoft.Extensions.Configuration;
 
namespace AspireWithDaprAgents.AppHost.Catalyst;
 
internal sealed record OllamaConnection(string BaseUrl, string Model, string? ApiKey)
{
    internal string OpenAiEndpoint => OllamaEndpoint.BuildOpenAiEndpoint(BaseUrl);
}
 
internal static class OllamaStartup
{
    private const string DefaultLocalBaseUrl = "http://localhost:11434";
    private const string DefaultModel = "llama3.2";
    private const string DefaultCatalystApiKey = "ollama";
 
    internal static async Task<OllamaConnection?> ConfigureAsync(IConfiguration configuration, bool useCatalyst)
    {
        var localBaseUrl = configuration["OLLAMA_BASE_URL"] ?? DefaultLocalBaseUrl;
        var catalystBaseUrl = configuration["CATALYST_OLLAMA_BASE_URL"];
        var baseUrl = useCatalyst && !string.IsNullOrWhiteSpace(catalystBaseUrl)
            ? catalystBaseUrl
            : localBaseUrl;
        var model = useCatalyst
            ? configuration["CATALYST_OLLAMA_MODEL"] ?? configuration["OLLAMA_MODEL"] ?? DefaultModel
            : configuration["OLLAMA_MODEL"] ?? DefaultModel;
        var apiKey = useCatalyst
            ? ResolveCatalystApiKey(configuration["CATALYST_OLLAMA_API_KEY"])
            : null;
 
        if (string.IsNullOrWhiteSpace(baseUrl))
        {
            WriteMissingBaseUrl(useCatalyst);
            return null;
        }
 
        if (!useCatalyst)
        {
            return await OllamaEndpoint.EnsureRunningAsync(baseUrl, model, apiKey)
                ? new OllamaConnection(baseUrl, model, apiKey)
                : null;
        }
 
        if (string.IsNullOrWhiteSpace(catalystBaseUrl))
        {
            WriteMissingBaseUrl(useCatalyst);
            return null;
        }
 
        if (OllamaEndpoint.IsLoopbackUrl(catalystBaseUrl))
        {
            Console.Error.WriteLine("Catalyst mode requires CATALYST_OLLAMA_BASE_URL to be a non-loopback HTTPS endpoint.");
            Console.Error.WriteLine("Manage your tunnel or hosted OpenAI-compatible Ollama endpoint separately, then set CATALYST_OLLAMA_BASE_URL to that public URL.");
            return null;
        }
 
        if (!await OllamaEndpoint.EnsureRunningAsync(catalystBaseUrl, model, apiKey))
        {
            return null;
        }
 
        return new OllamaConnection(catalystBaseUrl, model, apiKey);
    }
 
    internal static string ResolveCatalystApiKey(string? configuredApiKey) =>
        string.IsNullOrWhiteSpace(configuredApiKey)
            ? DefaultCatalystApiKey
            : configuredApiKey;
 
    private static void WriteMissingBaseUrl(bool useCatalyst)
    {
        Console.Error.WriteLine(useCatalyst
            ? "Catalyst mode requires a reachable Ollama base URL."
            : "Local mode requires an Ollama base URL.");
        Console.Error.WriteLine(useCatalyst
            ? "Set CATALYST_OLLAMA_BASE_URL to a separately managed reachable HTTPS endpoint."
            : "Set OLLAMA_BASE_URL or use the default http://localhost:11434.");
    }
}

components

# conversation-ollama.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: llama
spec:
  type: conversation.ollama
  version: v1
  metadata:
    - name: baseUrl
      value: http://localhost:11434 # When you install and run Ollama locally, it starts an HTTP server at http://localhost:11434
    - name: model
      value: llama3.2 # It is model name. 
    - name: cacheTTL # It is used for Prompt Caching
      value: 10m
# observability.yaml
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: observability
spec:
  tracing:
    samplingRate: "0"
# pubsubstore.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsubstore
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: "localhost:6379"
    - name: redisPassword
      value: ""
# resiliency.yaml
apiVersion: dapr.io/v1alpha1
kind: Resiliency
metadata:
  name: llm-resiliency
version: v1alpha1
scopes:
  - order-intake
  - carrier-capacity
  - route-planning
  - risk-compliance
  - customer-update-writer
spec:
  policies:
    timeouts:
      llmResponse: 90s
    retries:
      llmTransient:
        policy: exponential
        maxInterval: 5s
        maxRetries: 2
  targets:
    components:
      llama:
        outbound:
          timeout: llmResponse
          retry: llmTransient
# statestore.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
    - name: redisHost
      value: "localhost:6379"
    - name: redisPassword
      value: ""
    - name: keyPrefix
      value: "none"
    - name: actorStateStore
      value: "true"

dashboard-components

# statestore.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
    - name: redisHost
      value: "host.docker.internal:6379"
    - name: redisPassword
      value: ""
    - name: keyPrefix
      value: "none"
    - name: actorStateStore
      value: "true"

Local

// LocalDiagnosticsExtensions.cs
using Aspire.Hosting;
 
namespace AspireWithDaprAgents.AppHost.Local;
 
public static class LocalDiagnosticsExtensions
{
    public static void AddLocalDiagnostics(
        this IDistributedApplicationBuilder builder,
        string dashboardStateStorePath)
    {
        builder.AddContainer("diagrid-dashboard", "ghcr.io/diagridio/diagrid-dashboard", "latest")
            .WithHttpEndpoint(port: 8080, targetPort: 8080, name: "http")
            .WithBindMount(dashboardStateStorePath, "/app/components/custom_state.yaml", isReadOnly: true)
            .WithEnvironment("COMPONENT_FILE", "/app/components/custom_state.yaml")
            .WithExternalHttpEndpoints();
 
        builder.AddContainer("redisinsight", "redis/redisinsight", "latest")
            .WithHttpEndpoint(port: 5540, targetPort: 5540, name: "http")
            .WithEnvironment("RI_APP_HOST", "0.0.0.0")
            .WithEnvironment("RI_APP_PORT", "5540")
            .WithExternalHttpEndpoints();
    }
}

Shared

// AppTopology.cs
namespace AspireWithDaprAgents.AppHost.Shared;
 
internal sealed record LogisticsAgent(
    string AppId,
    string AgentName,
    string DirectoryName,
    int AppPort,
    int DaprHttpPort)
{
    internal string EnvironmentPrefix => AppId.Replace('-', '_').ToUpperInvariant();
}
 
internal static class AppTopology
{
    internal const string ApiServiceAppId = "apiservice";
    internal const string WebFrontendAppId = "webfrontend";
    internal const int ApiServiceDaprHttpPort = 3512;
    internal const int WebFrontendAppPort = 5275;
    internal const int WebFrontendDaprHttpPort = 3516;
 
    internal static readonly LogisticsAgent Orchestrator = new(
        AppId: "orchestrator",
        AgentName: "orchestrator",
        DirectoryName: "Orchestrator",
        AppPort: 50050,
        DaprHttpPort: 3509);
 
    internal static readonly LogisticsAgent[] LogisticsAgents =
    [
        Orchestrator,
        new("order-intake", "order-intake-agent", "OrderIntakeAgent", 50051, 3510),
        new("carrier-capacity", "carrier-capacity-agent", "CarrierCapacityAgent", 50052, 3511),
        new("route-planning", "route-planning-agent", "RoutePlanningAgent", 50053, 3513),
        new("risk-compliance", "risk-compliance-agent", "RiskComplianceAgent", 50054, 3514),
        new("customer-update-writer", "customer-update-writer-agent", "CustomerUpdateWriterAgent", 50055, 3515),
    ];
 
    internal static IEnumerable<string> AgentAppIds => LogisticsAgents.Select(agent => agent.AppId);
 
    internal static IEnumerable<string> DaprAppIds =>
    [
        .. AgentAppIds,
        ApiServiceAppId,
        WebFrontendAppId,
    ];
 
}
// DaprConfigurationExtensions.cs
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Python;
using CommunityToolkit.Aspire.Hosting.Dapr;
using Diagrid.Aspire.Hosting.Catalyst;
using System.Collections.Immutable;
 
namespace AspireWithDaprAgents.AppHost.Shared;
 
internal static class DaprConfigurationExtensions
{
    public const string ChatComponentName = "llama";
    public const string DaprApiTimeoutSeconds = "300";
 
    public static IResourceBuilder<PythonAppResource> ApplySharedAgentConfig(
        this IResourceBuilder<PythonAppResource> agent)
    {
        return agent
            .WithOtlpExporter()
            .WithHttpHealthCheck("/health")
            .WithEnvironment("DAPR_API_TIMEOUT_SECONDS", DaprApiTimeoutSeconds);
    }
 
    public static IResourceBuilder<PythonAppResource> WithAgentDaprApp(
        this IResourceBuilder<PythonAppResource> agent,
        LogisticsAgent appConfig,
        IResourceBuilder<CatalystProject>? catalystProject,
        bool useCatalyst,
        string daprConfigPath,
        string componentPath)
    {
        if (useCatalyst && catalystProject is not null)
        {
            return agent.WithCatalyst(catalystProject);
        }
 
        return agent
            .WithEnvironment("DAPR_HTTP_PORT", appConfig.DaprHttpPort.ToString())
            .WithDaprSidecar(new DaprSidecarOptions
            {
                AppId = appConfig.AppId,
                AppPort = appConfig.AppPort,
                DaprHttpPort = appConfig.DaprHttpPort,
                EnableApiLogging = true,
                Config = daprConfigPath,
                ResourcesPaths = ImmutableHashSet.Create(componentPath),
            });
    }
 
    public static IResourceBuilder<ProjectResource> ApplySharedServiceConfig(
        this IResourceBuilder<ProjectResource> service)
    {
        return service
            .WithOtlpExporter()
            .WithHttpHealthCheck("/health")
            .WithEnvironment("DAPR_API_TIMEOUT_SECONDS", DaprApiTimeoutSeconds);
    }
 
    public static IResourceBuilder<ProjectResource> WithDaprApp(
        this IResourceBuilder<ProjectResource> service,
        int daprHttpPort,
        IResourceBuilder<CatalystProject>? catalystProject,
        bool useCatalyst,
        string daprConfigPath,
        string componentPath)
    {
        if (useCatalyst && catalystProject is not null)
        {
            return service.WithCatalyst(catalystProject);
        }
 
        return service
            .WithEnvironment("DAPR_HTTP_PORT", daprHttpPort.ToString())
            .WithDaprSidecar(new DaprSidecarOptions
            {
                DaprHttpPort = daprHttpPort,
                EnableApiLogging = true,
                Config = daprConfigPath,
                ResourcesPaths = ImmutableHashSet.Create(componentPath),
            });
    }
 
    public static IResourceBuilder<ProjectResource> WithWebFrontendDaprApp(
        this IResourceBuilder<ProjectResource> webFrontend,
        IResourceBuilder<CatalystProject>? catalystProject,
        bool useCatalyst,
        string daprConfigPath,
        string componentPath)
    {
        if (useCatalyst && catalystProject is not null)
        {
            return webFrontend.WithCatalyst(catalystProject);
        }
 
        return webFrontend
            .WithEnvironment("DAPR_HTTP_PORT", AppTopology.WebFrontendDaprHttpPort.ToString())
            .WithDaprSidecar(new DaprSidecarOptions
            {
                AppId = AppTopology.WebFrontendAppId,
                AppPort = AppTopology.WebFrontendAppPort,
                DaprHttpPort = AppTopology.WebFrontendDaprHttpPort,
                EnableApiLogging = true,
                Config = daprConfigPath,
                ResourcesPaths = ImmutableHashSet.Create(componentPath),
            });
    }
}
// LogisticsAgentResourceExtensions.cs
using Aspire.Hosting.ApplicationModel;
using Aspire.Hosting.Python;
using Diagrid.Aspire.Hosting.Catalyst;
 
namespace AspireWithDaprAgents.AppHost.Shared;
 
internal static class LogisticsAgentResourceExtensions
{
    internal static IResourceBuilder<PythonAppResource> AddLogisticsAgent(
        this IDistributedApplicationBuilder builder,
        LogisticsAgent agent,
        string agentsPath,
        IResourceBuilder<CatalystProject>? catalystProject,
        bool useCatalyst,
        string daprConfigPath,
        string componentsPath)
    {
        var resource = builder.AddPythonApp(agent.AppId, Path.Combine(agentsPath, agent.DirectoryName), "app.py")
            .WithUv(args: ["sync", "--quiet"])
            .WithHttpEndpoint(port: agent.AppPort, targetPort: agent.AppPort, name: "http", isProxied: false)
            .ApplySharedAgentConfig()
            .WithEnvironment("AGENT_PORT", agent.AppPort.ToString());
 
        resource = agent == AppTopology.Orchestrator
            ? resource.WithOrchestratorEnvironment()
            : resource.WithEnvironment("DAPR_CHAT_COMPONENT_NAME", DaprConfigurationExtensions.ChatComponentName);
 
        return resource.WithAgentDaprApp(agent, catalystProject, useCatalyst, daprConfigPath, componentsPath);
    }
 
    private static IResourceBuilder<PythonAppResource> WithOrchestratorEnvironment(
        this IResourceBuilder<PythonAppResource> orchestrator)
    {
        orchestrator = orchestrator
            .WithEnvironment("CHILD_WORKFLOW_TIMEOUT_SECONDS", "120")
            .WithEnvironment("CHILD_WORKFLOW_MAX_ATTEMPTS", "2")
            .WithEnvironment("CHILD_WORKFLOW_RETRY_FIRST_INTERVAL_SECONDS", "5")
            .WithEnvironment("CHILD_WORKFLOW_RETRY_BACKOFF_COEFFICIENT", "2.0");
 
        foreach (var agent in AppTopology.LogisticsAgents.Where(agent => agent != AppTopology.Orchestrator))
        {
            orchestrator = orchestrator
                .WithEnvironment($"{agent.EnvironmentPrefix}_APP_ID", agent.AppId)
                .WithEnvironment($"{agent.EnvironmentPrefix}_AGENT_NAME", agent.AgentName);
        }
 
        return orchestrator;
    }
}
// OllamaEndpoint.cs
using System.Text.Json;
 
namespace AspireWithDaprAgents.AppHost.Shared;
 
internal static class OllamaEndpoint
{
    internal static async Task<bool> EnsureRunningAsync(string baseUrl, string model, string? bearerToken)
    {
        if (!Uri.TryCreate(baseUrl, UriKind.Absolute, out var baseUri))
        {
            Console.Error.WriteLine($"LLM configuration is invalid. OLLAMA_BASE_URL or CATALYST_OLLAMA_BASE_URL must be an absolute URL, but was '{baseUrl}'.");
            Console.Error.WriteLine("The application will stop because the required LLM is not available.");
            return false;
        }
 
        using var httpClient = new HttpClient
        {
            BaseAddress = baseUri,
            Timeout = TimeSpan.FromSeconds(3),
        };
 
        try
        {
            using var request = new HttpRequestMessage(HttpMethod.Get, "/api/tags");
            if (!string.IsNullOrWhiteSpace(bearerToken))
            {
                request.Headers.Authorization = new("Bearer", bearerToken);
            }
 
            using var response = await httpClient.SendAsync(request);
            if (!response.IsSuccessStatusCode)
            {
                WriteUnhealthyEndpointMessage(baseUrl, (int)response.StatusCode);
                return false;
            }
 
            var content = await response.Content.ReadAsStringAsync();
            if (!HasModel(content, model))
            {
                Console.Error.WriteLine($"LLM is running at {baseUrl}, but required model '{model}' is not available.");
                Console.Error.WriteLine($"Run 'ollama pull {model}' and start the application again.");
                Console.Error.WriteLine("The application will stop because the required LLM is not available.");
                return false;
            }
 
            return true;
        }
        catch (Exception ex) when (ex is HttpRequestException or TaskCanceledException)
        {
            Console.Error.WriteLine($"LLM is not running at {baseUrl}.");
            Console.Error.WriteLine($"Start Ollama and ensure model '{model}' is available, for example: 'ollama pull {model}'.");
            Console.Error.WriteLine("The application will stop because the required LLM is not available.");
            return false;
        }
        catch (JsonException)
        {
            Console.Error.WriteLine($"LLM endpoint at {baseUrl} did not return a valid Ollama model list.");
            Console.Error.WriteLine("The application will stop because the required LLM is not available.");
            return false;
        }
    }
 
    internal static string BuildOpenAiEndpoint(string baseUrl)
    {
        var trimmedBaseUrl = baseUrl.TrimEnd('/');
        return trimmedBaseUrl.EndsWith("/v1", StringComparison.OrdinalIgnoreCase)
            ? trimmedBaseUrl
            : $"{trimmedBaseUrl}/v1";
    }
 
    internal static bool IsLoopbackUrl(string baseUrl)
    {
        return Uri.TryCreate(baseUrl, UriKind.Absolute, out var uri) && uri.IsLoopback;
    }
 
    private static void WriteUnhealthyEndpointMessage(string baseUrl, int statusCode)
    {
        if (statusCode == 404)
        {
            Console.Error.WriteLine($"LLM endpoint not found at {baseUrl}.");
            Console.Error.WriteLine(baseUrl.Contains("ngrok", StringComparison.OrdinalIgnoreCase)
                ? "The ngrok URL may be stale, expired, or not ready yet."
                : "Ensure the LLM server is running at this URL.");
        }
        else
        {
            Console.Error.WriteLine($"LLM is not running correctly at {baseUrl}. Ollama returned HTTP {statusCode}.");
        }
 
        Console.Error.WriteLine("The application will stop because the required LLM is not available.");
    }
 
    private static bool HasModel(string tagsJson, string requiredModel)
    {
        using var document = JsonDocument.Parse(tagsJson);
        if (!document.RootElement.TryGetProperty("models", out var models) || models.ValueKind != JsonValueKind.Array)
        {
            return false;
        }
 
        foreach (var model in models.EnumerateArray())
        {
            if (!model.TryGetProperty("name", out var nameProperty))
            {
                continue;
            }
 
            var name = nameProperty.GetString();
            if (string.Equals(name, requiredModel, StringComparison.OrdinalIgnoreCase) ||
                string.Equals(name, $"{requiredModel}:latest", StringComparison.OrdinalIgnoreCase))
            {
                return true;
            }
        }
 
        return false;
    }
}
// StartupDependencyCheck.cs
using System.Diagnostics;
using System.Net.Sockets;
 
namespace AspireWithDaprAgents.AppHost.Shared;
 
internal sealed record StartupDependencyOptions(
    string AppHostDirectory,
    string AgentsPath,
    string ComponentsPath,
    string DashboardStateStorePath,
    bool UseCatalyst,
    string? CatalystOllamaBaseUrl);
 
internal sealed record StartupDependencyCheckResult(string Name, bool Ready, string Detail)
{
    internal static StartupDependencyCheckResult Pass(string name, string detail) => new(name, true, detail);
 
    internal static StartupDependencyCheckResult Fail(string name, string detail) => new(name, false, detail);
}
 
internal sealed record StartupDependencyCheck(string Name, Func<CancellationToken, Task<StartupDependencyCheckResult>> ExecuteAsync);
 
internal static class StartupDependencyChecks
{
    internal static async Task<bool> EnsureAsync(StartupDependencyOptions options, CancellationToken cancellationToken = default)
    {
        Console.WriteLine("Checking startup dependencies...");
 
        var checks = CreateChecks(options);
        var results = new List<StartupDependencyCheckResult>(checks.Count);
        foreach (var check in checks)
        {
            var result = await check.ExecuteAsync(cancellationToken);
            results.Add(result);
 
            var prefix = result.Ready ? "OK" : "MISSING";
            var writer = result.Ready ? Console.Out : Console.Error;
            writer.WriteLine($"[{prefix}] {result.Name}: {result.Detail}");
        }
 
        if (results.All(result => result.Ready))
        {
            return true;
        }
 
        Console.Error.WriteLine("Startup stopped because one or more required dependencies are unavailable.");
        return false;
    }
 
    internal static IReadOnlyList<string> GetCheckNames(StartupDependencyOptions options) =>
        CreateChecks(options).Select(check => check.Name).ToArray();
 
    private static IReadOnlyList<StartupDependencyCheck> CreateChecks(StartupDependencyOptions options)
    {
        var checks = new List<StartupDependencyCheck>
        {
            CommandCheck("uv CLI", "uv", ["--version"], "Install uv so Aspire can create Python agent environments."),
            DirectoryCheck("Python agents folder", options.AgentsPath),
        };
 
        checks.AddRange(AppTopology.LogisticsAgents.Select(agent =>
            FileCheck($"{agent.AppId} pyproject", Path.Combine(options.AgentsPath, agent.DirectoryName, "pyproject.toml"))));
 
        if (options.UseCatalyst)
        {
            checks.Add(CommandCheck("Diagrid CLI", "diagrid", ["version"], "Install and authenticate the Diagrid CLI with 'diagrid login'."));
            checks.Add(CatalystOllamaEndpointCheck(options.CatalystOllamaBaseUrl));
 
            return checks;
        }
 
        checks.Add(CommandCheck("Dapr CLI", "dapr", ["--version"], "Install and initialize Dapr CLI/runtime before local mode."));
        checks.Add(ContainerRuntimeCheck());
        checks.Add(TcpPortCheck("Redis", "localhost", 6379, "Start Redis on localhost:6379 for local state, pub/sub, actors, and workflows."));
        checks.Add(DirectoryCheck("Dapr components folder", options.ComponentsPath));
        checks.Add(FileCheck("Dapr observability config", Path.Combine(options.ComponentsPath, "observability.yaml")));
        checks.Add(FileCheck("Dapr state store component", Path.Combine(options.ComponentsPath, "statestore.yaml")));
        checks.Add(FileCheck("Dapr pub/sub component", Path.Combine(options.ComponentsPath, "pubsubstore.yaml")));
        checks.Add(FileCheck("Dapr chat component", Path.Combine(options.ComponentsPath, "conversation-ollama.yaml")));
        checks.Add(FileCheck("Diagrid Dashboard state component", options.DashboardStateStorePath));
 
        return checks;
    }
 
    private static StartupDependencyCheck CatalystOllamaEndpointCheck(string? catalystOllamaBaseUrl) =>
        new("Catalyst Ollama endpoint", _ =>
        {
            if (string.IsNullOrWhiteSpace(catalystOllamaBaseUrl))
            {
                return Task.FromResult(StartupDependencyCheckResult.Fail(
                    "Catalyst Ollama endpoint",
                    "Set CATALYST_OLLAMA_BASE_URL to a separately managed reachable HTTPS OpenAI-compatible Ollama endpoint."));
            }
 
            if (!Uri.TryCreate(catalystOllamaBaseUrl, UriKind.Absolute, out var uri))
            {
                return Task.FromResult(StartupDependencyCheckResult.Fail(
                    "Catalyst Ollama endpoint",
                    $"CATALYST_OLLAMA_BASE_URL must be an absolute URL, but was '{catalystOllamaBaseUrl}'."));
            }
 
            if (!string.Equals(uri.Scheme, Uri.UriSchemeHttps, StringComparison.OrdinalIgnoreCase))
            {
                return Task.FromResult(StartupDependencyCheckResult.Fail(
                    "Catalyst Ollama endpoint",
                    "CATALYST_OLLAMA_BASE_URL must use HTTPS because Catalyst calls it from Diagrid cloud."));
            }
 
            if (OllamaEndpoint.IsLoopbackUrl(catalystOllamaBaseUrl))
            {
                return Task.FromResult(StartupDependencyCheckResult.Fail(
                    "Catalyst Ollama endpoint",
                    "CATALYST_OLLAMA_BASE_URL must not point at localhost or another loopback address."));
            }
 
            return Task.FromResult(StartupDependencyCheckResult.Pass(
                "Catalyst Ollama endpoint",
                catalystOllamaBaseUrl));
        });
 
    private static StartupDependencyCheck DirectoryCheck(string name, string path) =>
        new(name, _ => Task.FromResult(
            Directory.Exists(path)
                ? StartupDependencyCheckResult.Pass(name, path)
                : StartupDependencyCheckResult.Fail(name, $"Directory not found: {path}")));
 
    private static StartupDependencyCheck FileCheck(string name, string path) =>
        new(name, _ => Task.FromResult(
            File.Exists(path)
                ? StartupDependencyCheckResult.Pass(name, path)
                : StartupDependencyCheckResult.Fail(name, $"File not found: {path}")));
 
    private static StartupDependencyCheck TcpPortCheck(string name, string host, int port, string failureDetail) =>
        new(name, async cancellationToken =>
        {
            try
            {
                using var client = new TcpClient();
                await client.ConnectAsync(host, port, cancellationToken).AsTask().WaitAsync(TimeSpan.FromSeconds(2), cancellationToken);
                return StartupDependencyCheckResult.Pass(name, $"{host}:{port} is reachable");
            }
            catch (Exception ex) when (ex is SocketException or TimeoutException or OperationCanceledException)
            {
                return StartupDependencyCheckResult.Fail(name, failureDetail);
            }
        });
 
    private static StartupDependencyCheck ContainerRuntimeCheck() =>
        new("Container runtime", async cancellationToken =>
        {
            var docker = await RunCommandAsync("docker", ["--version"], cancellationToken);
            if (docker.ExitCode == 0)
            {
                return StartupDependencyCheckResult.Pass("Container runtime", docker.Output);
            }
 
            var podman = await RunCommandAsync("podman", ["--version"], cancellationToken);
            if (podman.ExitCode == 0)
            {
                return StartupDependencyCheckResult.Pass("Container runtime", podman.Output);
            }
 
            return StartupDependencyCheckResult.Fail(
                "Container runtime",
                "Install Docker or Podman so Aspire can start local diagnostics containers.");
        });
 
    private static StartupDependencyCheck CommandCheck(string name, string fileName, string[] arguments, string failureDetail) =>
        new(name, async cancellationToken =>
        {
            var result = await RunCommandAsync(fileName, arguments, cancellationToken);
            return result.ExitCode == 0
                ? StartupDependencyCheckResult.Pass(name, result.Output)
                : StartupDependencyCheckResult.Fail(name, failureDetail);
        });
 
    private static async Task<(int ExitCode, string Output)> RunCommandAsync(
        string fileName,
        IReadOnlyList<string> arguments,
        CancellationToken cancellationToken)
    {
        var startInfo = new ProcessStartInfo
        {
            FileName = fileName,
            RedirectStandardError = true,
            RedirectStandardOutput = true,
        };
 
        foreach (var argument in arguments)
        {
            startInfo.ArgumentList.Add(argument);
        }
 
        try
        {
            using var process = Process.Start(startInfo);
            if (process is null)
            {
                return (-1, "");
            }
 
            var outputTask = process.StandardOutput.ReadToEndAsync(cancellationToken);
            var errorTask = process.StandardError.ReadToEndAsync(cancellationToken);
            await process.WaitForExitAsync(cancellationToken).WaitAsync(TimeSpan.FromSeconds(5), cancellationToken);
 
            var output = (await outputTask).Trim();
            var error = (await errorTask).Trim();
            return (process.ExitCode, FirstNonEmptyLine(output, error));
        }
        catch (Exception ex) when (ex is InvalidOperationException or System.ComponentModel.Win32Exception or TimeoutException or OperationCanceledException)
        {
            return (-1, "");
        }
    }
 
    private static string FirstNonEmptyLine(string output, string error)
    {
        var line = output
            .Split('\n', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
            .Concat(error.Split('\n', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries))
            .FirstOrDefault();
 
        return string.IsNullOrWhiteSpace(line) ? "available" : line;
    }
}
// AppHost.cs
using Diagrid.Aspire.Hosting.Catalyst;
using AspireWithDaprAgents.AppHost.Shared;
using AspireWithDaprAgents.AppHost.Local;
using AspireWithDaprAgents.AppHost.Catalyst;
 
const string LocalRunMode = "local";
const string CatalystRunMode = "catalyst";
 
var builder = DistributedApplication.CreateBuilder(args);
var agentAppIds = AppTopology.AgentAppIds.ToArray();
var allDaprAppIds = AppTopology.DaprAppIds.ToArray();
 
var runMode = builder.Configuration["DAPR_RUN_MODE"] ?? builder.Configuration["RUN_MODE"] ?? LocalRunMode;
var useCatalyst = string.Equals(runMode, CatalystRunMode, StringComparison.OrdinalIgnoreCase) ||
    args.Contains("--catalyst", StringComparer.OrdinalIgnoreCase);
var catalystProjectName = builder.Configuration["CATALYST_PROJECT_NAME"] ?? "aspire-dapr-agents-catalyst";
 
var componentsPath = Path.Combine(builder.AppHostDirectory, "components");
var daprConfigPath = Path.Combine(componentsPath, "observability.yaml");
var dashboardStateStorePath = Path.Combine(builder.AppHostDirectory, "dashboard-components", "statestore.yaml");
var agentsPath = Path.GetFullPath(Path.Combine(builder.AppHostDirectory, "..", "AspireWithDaprAgents.Agents"));
 
if (!await StartupDependencyChecks.EnsureAsync(new(
    AppHostDirectory: builder.AppHostDirectory,
    AgentsPath: agentsPath,
    ComponentsPath: componentsPath,
    DashboardStateStorePath: dashboardStateStorePath,
    UseCatalyst: useCatalyst,
    CatalystOllamaBaseUrl: builder.Configuration["CATALYST_OLLAMA_BASE_URL"])))
{
    Environment.ExitCode = 1;
    return;
}
 
var ollama = await OllamaStartup.ConfigureAsync(builder.Configuration, useCatalyst);
if (ollama is null)
{
    Environment.ExitCode = 1;
    return;
}
 
if (useCatalyst && !await CatalystProvisioning.EnsureInfrastructureAsync(catalystProjectName, allDaprAppIds, agentAppIds, ollama.OpenAiEndpoint, ollama.Model, ollama.ApiKey ?? "ollama"))
{
    Environment.ExitCode = 1;
    return;
}
 
var catalystProject = useCatalyst
    ? builder.AddCatalystProject(
            catalystProjectName,
            descriptor: new()
            {
                EnableManagedWorkflow = true,
            })
    : null;
 
if (!useCatalyst)
{
    builder.AddLocalDiagnostics(dashboardStateStorePath);
}
 
var agentResources = AppTopology.LogisticsAgents.ToDictionary(
    agent => agent.AppId,
    agent => builder.AddLogisticsAgent(agent, agentsPath, catalystProject, useCatalyst, daprConfigPath, componentsPath));
 
var apiService = builder.AddProject<Projects.AspireWithDaprAgents_ApiService>("apiservice")
    .WithExternalHttpEndpoints()
    .WithReference(agentResources[AppTopology.Orchestrator.AppId])
    .WithEnvironment("AGENT_APP_ID", AppTopology.Orchestrator.AppId);
 
apiService = apiService.ApplySharedServiceConfig();
apiService = apiService.WithDaprApp(AppTopology.ApiServiceDaprHttpPort, catalystProject, useCatalyst, daprConfigPath, componentsPath);
 
foreach (var agentResource in agentResources.Values)
{
    apiService = apiService.WaitFor(agentResource);
}
 
var webFrontend = builder.AddProject<Projects.AspireWithDaprAgents_Web>(AppTopology.WebFrontendAppId)
    .WithExternalHttpEndpoints()
    .WithReference(apiService);
 
webFrontend = webFrontend.ApplySharedServiceConfig();
webFrontend = webFrontend.WithWebFrontendDaprApp(catalystProject, useCatalyst, daprConfigPath, componentsPath);
 
webFrontend
    .WaitFor(apiService);
 
builder.Build().Run();

Preparation of ngrok tunnel

AppHost does not manage ngrok tunneling, so ngrok must be configured before launching the application in Catalyst mode. This is required to make the locally running Ollama instance reachable by Catalyst.

You can create an account using ngrok and follow the Setup & Istallation steps. Once the setup is complete, run the following command to establish the tunnel and assign the forwarding URL to CATALYST_OLLAMA_BASE_URL.

ngrok http 11434 --host-header=rewrite
export CATALYST_OLLAMA_BASE_URL=<forwarding URL>

NOTE: This tunnel can be made secure, but has been kept out of scope for simplicity. It can also be managed by AppHost with some additional effort, instead of being run manually.

Preparation of Diagrid Catalyst

Refer to the Diagrid Catalyst & .NET Aspire blog post to understand the steps required for Catalyst integration.

Let's run the App

As mentioned earlier, AppHost supports two application runtime modes e.g., Local and Catalyst, with Local mode being the default. In Local mode, it starts local Dapr sidecars for the API service, web frontend, orchestrator, and logistics agents with component YAML from AspireWithDaprAgents.AppHost/components. It uses the external Redis instance at localhost:6379, verifies local Ollama before startup, and starts the local Diagrid Dashboard and Redis Insight containers.

aspire run

From the Aspire Dashboard, you can launch the Diagrid Dashboard, Redis Insight and Web App.

NOTE: The local Diagrid Dashboard has limited functionality.

DAPR_EXECUTION_MODE=catalyst aspire run

In Catalyst mode the AppHost creates a Catalyst project with managed workflow enabled, provisions managed KV and pub/sub services, and registers Catalyst-backed Dapr components named statestore, pubsubstore, and llama. App IDs, app ports, and local Dapr HTTP ports are centralized in AppHost metadata so Catalyst app connections and local sidecar settings stay aligned. The application code keeps using the same Dapr app IDs and chat component name in both modes.

The web frontend is also Dapr-enabled in both modes. It subscribes to pubsubstore topic shipment-progress through /dapr/subscribe and receives realtime progress deliveries at /dapr/events/shipment-progress.

You can observe and compare Aspire resources across local and Catalyst environments to understand the differences. The following screenshots provide a glimpse of Diagrid Catalyst's key features, including Components, Managed Services, App Graph, and Workflows. There are many more features that are not covered here.

After submitting a scenario or multiple scenarios, you can see valuable details in the Apps Graph and Workflows.

Where is the web app code ?

You might be wondering where the web application code is located, as it hasn’t been discussed or shared yet. It is a simple Blazor application intended to demonstrate the complete flow. If you’re interested in accessing the web application code, feel free to reach out to me.

Conclusion

Through this comprehensive walkthrough, I've demonstrated how Dapr Agents, Aspire, and Diagrid Catalyst work in concert to streamline the development of production-grade agentic application. However, this blog post has only scratched the surface of Dapr Agents' remarkable capabilities.

Dapr Agents is fundamentally a developer framework for building durable and resilient AI agent systems powered by Large Language Models. Dapr Agents supports a rich set of agentic patterns including Augmented LLM, Durable Agent, Prompt Chaining, Evaluator Optimizer, Autonomous Agent, Parallelization, Routing and Orchestrator Worker. The framework provides cryptographic identity for every agent, ensuring secure authentication and authorization by default.

For deeper exploration, see the Dapr Agents documentation to unlock capabilities like data-driven agent systems, memory persistence, structured outputs, event-driven collaboration, and enterprise-grade observability that transform agentic applications from prototypes into scalable, resilient production workloads.

Happy Learning & coding... 📚