This blog is a continuation of my previous post, Dapr - Workflow & .NET Aspire, which introduced the concepts of Augmented LLM and Stateful LLM. Here the focus shifts to more advanced agentic patterns, including Prompt Chaining, Routing, and Parallelization, demonstrating how they can be implemented with Dapr Workflow and the Dapr Conversation API to build scalable, stateful, and modular LLM pipelines in distributed .NET applications.
Prerequisites & Solution structure
- Refer to the earlier blog post, Dapr - Workflow & .NET Aspire
Changes in AppHost
The PromptChaining project (a new Minimal API) has been added, and the web frontend has been updated to reference it.
// AppHost.cs
using CommunityToolkit.Aspire.Hosting.Dapr;
var builder = DistributedApplication.CreateBuilder(args);
// Diagrid Dashboard for Dapr Workflow Visualization
var diagridDashboard = builder.AddContainer("diagrid-dashboard", "ghcr.io/diagridio/diagrid-dashboard", "latest")
.WithHttpEndpoint(8080, 8080, name: "dashboard");
// Add RedisInsight GUI for Redis data visualization
var redisInsight = builder.AddContainer("redisinsight", "redislabs/redisinsight", "latest")
.WithHttpEndpoint(8001, 8001)
.WithEnvironment("RI_APP_PORT", "8001")
.WithEnvironment("RI_HOST", "0.0.0.0")
.WithBindMount(Path.Combine(builder.AppHostDirectory, "redisinsight-data"), "/data");
// Pattern - 1. AugmentedLlm
var augmentedLlmService = builder.AddProject<Projects.AspireWithDapr_AugmentedLlm>("AugmentedLlm")
.WithHttpHealthCheck("/health")
.WithDaprSidecar(new DaprSidecarOptions
{
ResourcesPaths = ["components"],
Config = "components/daprConfig.yaml",
EnableApiLogging = false
})
.WithEnvironment("DAPR_HOST_IP", "127.0.0.1")
.WithEnvironment("ZIPKIN_ENDPOINT", "http://localhost:9411/api/v2/spans");
// Pattern - 1.1 StatefulLlm
var statefulLlmService = builder.AddProject<Projects.AspireWithDapr_StatefulLlm>("StatefulLlm")
.WithExternalHttpEndpoints()
.WithHttpHealthCheck("/health")
.WithDaprSidecar(new DaprSidecarOptions
{
ResourcesPaths = ["components"],
Config = "components/daprConfig.yaml",
EnableApiLogging = false
})
.WithEnvironment("DAPR_HOST_IP", "127.0.0.1")
.WithEnvironment("ZIPKIN_ENDPOINT", "http://localhost:9411/api/v2/spans");
// Pattern - 2. PromptChaining
var promptChainingService = builder.AddProject<Projects.AspireWithDapr_PromptChaining>("PromptChaining")
.WithExternalHttpEndpoints()
.WithHttpHealthCheck("/health")
.WithDaprSidecar(new DaprSidecarOptions
{
ResourcesPaths = ["components"],
Config = "components/daprConfig.yaml",
EnableApiLogging = false
})
.WithEnvironment("DAPR_HOST_IP", "127.0.0.1")
.WithEnvironment("ZIPKIN_ENDPOINT", "http://localhost:9411/api/v2/spans");
// Web Frontend
builder.AddProject<Projects.AspireWithDapr_Web>("webfrontend")
.WithExternalHttpEndpoints()
.WithHttpHealthCheck("/health")
.WithReference(augmentedLlmService)
.WaitFor(augmentedLlmService)
.WithReference(statefulLlmService)
.WaitFor(statefulLlmService)
.WithReference(promptChainingService)
.WaitFor(promptChainingService)
.WithDaprSidecar(new DaprSidecarOptions
{
ResourcesPaths = ["components"],
Config = "components/daprConfig.yaml"
})
.WithEnvironment("DAPR_HOST_IP", "127.0.0.1")
.WithEnvironment("ZIPKIN_ENDPOINT", "http://localhost:9411/api/v2/spans");
builder.Build().Run();
Prompt Chaining
Prompt chaining divides a task into sequential steps, with each LLM invocation building on the output of the previous one. Programmatic checks (often called "gates") can be inserted at intermediate steps to ensure the workflow stays on track before the next, more expensive LLM call is made.
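Before walking through the full implementation, here is a minimal sketch of what such a gate looks like in a Dapr Workflow: an ordinary programmatic check between two chained activities. The activity names here are hypothetical placeholders; the real workflow further down applies the same idea in its early-exit checks.
using Dapr.Workflow;
// Minimal sketch of a "gate" between two chained LLM steps.
// CreateOutlineActivity and ExpandOutlineActivity are hypothetical placeholders.
public class GatedChainWorkflow : Workflow<string, string>
{
    public override async Task<string> RunAsync(WorkflowContext context, string prompt)
    {
        // Step 1: the first LLM call produces an intermediate artifact.
        var outline = await context.CallActivityAsync<string>(
            nameof(CreateOutlineActivity), prompt);
        // Gate: a programmatic check between steps. Exit early instead of
        // spending another LLM call on unusable intermediate output.
        if (string.IsNullOrWhiteSpace(outline) || outline.Length < 20)
        {
            return "Unable to produce a usable outline for this prompt.";
        }
        // Step 2: the next LLM call builds on the validated output of step 1.
        return await context.CallActivityAsync<string>(
            nameof(ExpandOutlineActivity), outline);
    }
}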


dotnet add package Dapr.AI
dotnet add package Dapr.Workflow
// LlmModels.cs
using System.ComponentModel.DataAnnotations;
namespace AspireWithDapr.PromptChaining.Models;
public record LlmRequest(
[Required] [MinLength(1)] string Prompt,
Dictionary<string, object>? Context = null,
string? Model = null,
double? Temperature = null);
public record LlmResponse(
string Response,
Dictionary<string, object>? Metadata = null);
// SearchModels.cs
using System.ComponentModel.DataAnnotations;
namespace AspireWithDapr.PromptChaining.Models;
public record SearchResult(string Title, string Content, string Source);
public record SearchRequest(
[Required] [MinLength(1)] string Query);
// Program.cs
#pragma warning disable DAPR_CONVERSATION
using System.ComponentModel.DataAnnotations;
using System.Diagnostics;
using Dapr.AI.Conversation.Extensions;
using Dapr.Workflow;
using AspireWithDapr.PromptChaining.Models;
using AspireWithDapr.PromptChaining.Workflow;
using Microsoft.Extensions.Hosting;
using OpenTelemetry.Trace;
var builder = WebApplication.CreateBuilder(args);
builder.AddServiceDefaults();
builder.Services.AddProblemDetails();
builder.Services.AddOpenApi();
builder.Services.AddDaprConversationClient();
builder.Services.AddDaprWorkflow(options =>
{
options.RegisterWorkflow<PromptChainingWorkflow>();
options.RegisterActivity<AnalyzeIntentActivity>();
options.RegisterActivity<GenerateQueriesActivity>();
options.RegisterActivity<SearchDataActivity>();
options.RegisterActivity<SynthesizeActivity>();
options.RegisterActivity<CallLlmActivity>();
options.RegisterActivity<LoggingActivity>();
});
var app = builder.Build();
// Initialize workflow logger with the app's logger factory
var loggerFactory = app.Services.GetRequiredService<ILoggerFactory>();
PromptChainingWorkflow.SetLogger(loggerFactory);
app.UseExceptionHandler();
if (app.Environment.IsDevelopment())
{
app.MapOpenApi();
}
app.MapGet("/", () => Results.Ok(new {
service = "Prompt Chaining LLM API",
version = "1.0.0",
endpoints = new[] { "/llm/query", "/llm/query/{instanceId}", "/llm/query/{instanceId}/status", "/health" }
}))
.WithName("GetServiceInfo")
.WithTags("Info");
app.MapPost("/llm/query", async (
[Required] LlmRequest request,
DaprWorkflowClient workflowClient,
ILogger<Program> logger,
CancellationToken cancellationToken) =>
{
using var activity = Extensions.WorkflowActivitySource.StartActivity("PromptChaining.StartQuery", ActivityKind.Server);
activity?.SetTag("llm.prompt_length", request.Prompt?.Length ?? 0);
activity?.SetTag("llm.model", request.Model ?? "llama");
activity?.SetTag("llm.temperature", request.Temperature ?? 0.7);
if (string.IsNullOrWhiteSpace(request.Prompt))
{
activity?.SetStatus(ActivityStatusCode.Error, "Prompt is required");
return Results.BadRequest(new { error = "Prompt is required and cannot be empty." });
}
try
{
logger.LogInformation("Starting Prompt Chaining workflow for prompt: {Prompt} with length {PromptLength} characters", request.Prompt, request.Prompt.Length);
var instanceId = Guid.NewGuid().ToString();
activity?.SetTag("workflow.instance_id", instanceId);
await workflowClient.ScheduleNewWorkflowAsync(
nameof(PromptChainingWorkflow),
instanceId,
request);
logger.LogInformation("Workflow started with instance ID: {InstanceId}", instanceId);
activity?.SetStatus(ActivityStatusCode.Ok);
return Results.Accepted($"/llm/query/{instanceId}", new {
instanceId,
status = "started"
});
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.AddException(ex);
logger.LogError(ex, "Error starting workflow");
return Results.Problem(
detail: "An error occurred while starting the workflow.",
statusCode: StatusCodes.Status500InternalServerError);
}
})
.WithName("StartPromptChainingQuery")
.WithTags("LLM")
.Produces<LlmResponse>(StatusCodes.Status202Accepted)
.Produces(StatusCodes.Status400BadRequest)
.Produces(StatusCodes.Status500InternalServerError);
app.MapGet("/llm/query/{instanceId}", async (
string instanceId,
DaprWorkflowClient workflowClient,
ILogger<Program> logger,
CancellationToken cancellationToken) =>
{
using var activity = Extensions.WorkflowActivitySource.StartActivity("PromptChaining.GetResult", ActivityKind.Server);
activity?.SetTag("workflow.instance_id", instanceId);
try
{
var workflowState = await workflowClient.GetWorkflowStateAsync(instanceId);
if (workflowState is null)
{
activity?.SetTag("workflow.found", false);
activity?.SetStatus(ActivityStatusCode.Error, "Not found");
return Results.NotFound(new { error = $"Workflow instance {instanceId} not found." });
}
activity?.SetTag("workflow.found", true);
activity?.SetTag("workflow.runtime_status", workflowState.RuntimeStatus.ToString());
if (workflowState.RuntimeStatus != WorkflowRuntimeStatus.Completed)
{
activity?.SetStatus(ActivityStatusCode.Ok);
return Results.Accepted($"/llm/query/{instanceId}", new
{
instanceId,
runtimeStatus = workflowState.RuntimeStatus.ToString()
});
}
var output = workflowState.ReadOutputAs<LlmResponse>();
activity?.SetTag("workflow.has_output", output != null);
activity?.SetStatus(ActivityStatusCode.Ok);
return Results.Ok(new
{
instanceId,
runtimeStatus = workflowState.RuntimeStatus.ToString(),
result = output
});
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.AddException(ex);
logger.LogError(ex, "Error retrieving workflow state for instance {InstanceId}", instanceId);
return Results.Problem(
detail: "An error occurred while retrieving the workflow state.",
statusCode: StatusCodes.Status500InternalServerError);
}
})
.WithName("GetPromptChainingQueryResult")
.WithTags("LLM")
.Produces<LlmResponse>()
.Produces(StatusCodes.Status404NotFound)
.Produces(StatusCodes.Status500InternalServerError);
app.MapGet("/llm/query/{instanceId}/status", async (
string instanceId,
DaprWorkflowClient workflowClient,
ILogger<Program> logger,
CancellationToken cancellationToken) =>
{
using var activity = Extensions.WorkflowActivitySource.StartActivity("PromptChaining.GetStatus", ActivityKind.Server);
activity?.SetTag("workflow.instance_id", instanceId);
try
{
var workflowState = await workflowClient.GetWorkflowStateAsync(instanceId);
if (workflowState is null)
{
activity?.SetTag("workflow.found", false);
activity?.SetStatus(ActivityStatusCode.Error, "Not found");
return Results.NotFound(new { error = $"Workflow instance {instanceId} not found." });
}
var status = workflowState.RuntimeStatus;
activity?.SetTag("workflow.found", true);
activity?.SetTag("workflow.runtime_status", status.ToString());
activity?.SetStatus(ActivityStatusCode.Ok);
return Results.Ok(new
{
instanceId,
runtimeStatus = status.ToString(),
isCompleted = status == WorkflowRuntimeStatus.Completed,
isRunning = status == WorkflowRuntimeStatus.Running,
isFailed = status == WorkflowRuntimeStatus.Failed,
isTerminated = status == WorkflowRuntimeStatus.Terminated,
isPending = status == WorkflowRuntimeStatus.Pending
});
}
catch (Exception ex)
{
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.AddException(ex);
logger.LogError(ex, "Error retrieving workflow status for instance {InstanceId}", instanceId);
return Results.Problem(
detail: "An error occurred while retrieving the workflow status.",
statusCode: StatusCodes.Status500InternalServerError);
}
})
.WithName("GetPromptChainingWorkflowStatus")
.WithTags("LLM")
.Produces(StatusCodes.Status200OK)
.Produces(StatusCodes.Status404NotFound)
.Produces(StatusCodes.Status500InternalServerError);
app.MapDefaultEndpoints();
app.Run();
// PromptChainingWorkflow.cs
#pragma warning disable DAPR_CONVERSATION
using Dapr.Workflow;
using Dapr.AI.Conversation;
using Dapr.AI.Conversation.ConversationRoles;
using AspireWithDapr.PromptChaining.Models;
using System.Text.Json;
using System.Diagnostics;
namespace AspireWithDapr.PromptChaining.Workflow;
public class PromptChainingWorkflow : Workflow<LlmRequest, LlmResponse>
{
private static ILogger? _logger;
public static void SetLogger(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<PromptChainingWorkflow>();
}
public override async Task<LlmResponse> RunAsync(
WorkflowContext context,
LlmRequest request)
{
ArgumentNullException.ThrowIfNull(request);
ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
// Create logger if not set, using a default factory as fallback
var logger = _logger ?? LoggerFactory.Create(builder => builder.AddConsole()).CreateLogger<PromptChainingWorkflow>();
var workflowInstanceId = context.InstanceId;
// Log workflow start using activity to ensure it's captured
var promptPreview = request.Prompt.Length > 100 ? request.Prompt[..100] + "..." : request.Prompt;
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Workflow Start] InstanceId: {workflowInstanceId} | Prompt: \"{promptPreview}\" | Prompt Length: {request.Prompt.Length} | Model: {request.Model ?? "llama"} | Temperature: {request.Temperature ?? 0.7}"));
// Log full initial prompt for end-to-end tracking
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Initial Prompt] InstanceId: {workflowInstanceId}\n---\n{request.Prompt}\n---"));
context.SetCustomStatus(new { step = "analyzing_intent" });
// Chain 1: Analyze intent
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 1: AnalyzeIntent] InstanceId: {workflowInstanceId} | Input Prompt: {promptPreview}"));
string? intent = null;
try
{
intent = await context.CallActivityAsync<string>(
nameof(AnalyzeIntentActivity),
new AnalyzeIntentRequest(request.Prompt, request.Model, request.Temperature));
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 1: AnalyzeIntent Complete] InstanceId: {workflowInstanceId} | Intent: {intent}"));
}
catch (Exception ex)
{
logger.LogError(ex,
"[Step 1: AnalyzeIntent Failed] InstanceId: {InstanceId} | Error: {Error}",
workflowInstanceId,
ex.Message);
throw;
}
context.SetCustomStatus(new { step = "generating_queries", intent });
// Chain 2: Generate search queries
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 2: GenerateQueries] InstanceId: {workflowInstanceId} | Prompt: \"{promptPreview}\" | Intent: {intent}"));
List<string>? queries = null;
try
{
queries = await context.CallActivityAsync<List<string>>(
nameof(GenerateQueriesActivity),
new GenerateQueriesRequest(request.Prompt, intent!, request.Model, request.Temperature));
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 2: GenerateQueries Complete] InstanceId: {workflowInstanceId} | Query Count: {queries?.Count ?? 0}"));
}
catch (Exception ex)
{
logger.LogError(ex,
"[Step 2: GenerateQueries Failed] InstanceId: {InstanceId} | Error: {Error}",
workflowInstanceId,
ex.Message);
throw;
}
if (queries is null || queries.Count == 0)
{
logger.LogWarning(
"[Workflow Early Exit] InstanceId: {InstanceId} | Reason: No queries generated",
workflowInstanceId);
context.SetCustomStatus(new { step = "completed", warning = "no_queries_generated" });
return new LlmResponse(
"Unable to generate search queries for your query.",
new Dictionary<string, object>
{
["queries_generated"] = 0,
["intent"] = intent ?? "unknown"
});
}
context.SetCustomStatus(new { step = "searching", query_count = queries.Count });
// Chain 3: Execute searches in parallel
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 3: SearchData] InstanceId: {workflowInstanceId} | Query Count: {queries.Count}"));
List<SearchResult>? flattenedResults = null;
try
{
var searchTasks = queries.Select(query =>
context.CallActivityAsync<List<SearchResult>>(
nameof(SearchDataActivity),
new SearchRequest(query)));
var allResults = await Task.WhenAll(searchTasks);
flattenedResults = allResults.SelectMany(r => r).Distinct().ToList();
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 3: SearchData Complete] InstanceId: {workflowInstanceId} | Total Results: {flattenedResults.Count}"));
}
catch (Exception ex)
{
logger.LogError(ex,
"[Step 3: SearchData Failed] InstanceId: {InstanceId} | Error: {Error}",
workflowInstanceId,
ex.Message);
throw;
}
if (flattenedResults is null || flattenedResults.Count == 0)
{
logger.LogWarning(
"[Workflow Early Exit] InstanceId: {InstanceId} | Reason: No search results found",
workflowInstanceId);
context.SetCustomStatus(new { step = "completed", warning = "no_search_results" });
return new LlmResponse(
"No relevant sources were found for your query.",
new Dictionary<string, object>
{
["queries_generated"] = queries.Count,
["results_synthesized"] = 0,
["intent"] = intent ?? "unknown"
});
}
if (flattenedResults.Count > 0)
{
logger.LogDebug(
"[Step 3: SearchData Output] InstanceId: {InstanceId} | Results: {Results}",
workflowInstanceId,
JsonSerializer.Serialize(flattenedResults.Select(r => new { r.Title, r.Source, ContentLength = r.Content.Length })));
}
context.SetCustomStatus(new { step = "synthesizing" });
// Chain 4: Synthesize information
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 4: Synthesize] InstanceId: {workflowInstanceId} | Results Count: {flattenedResults.Count} | Original Prompt: \"{promptPreview}\""));
string? synthesis = null;
try
{
synthesis = await context.CallActivityAsync<string>(
nameof(SynthesizeActivity),
new SynthesizeRequest(flattenedResults, request.Prompt, request.Model, request.Temperature));
var synthesisPreview = synthesis?.Length > 150 ? synthesis[..150] + "..." : synthesis;
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 4: Synthesize Complete] InstanceId: {workflowInstanceId} | Synthesis: \"{synthesisPreview}\" | Synthesis Length: {synthesis?.Length ?? 0}"));
// Log full synthesis after completion
if (synthesis != null)
{
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Synthesis After Step 4] InstanceId: {workflowInstanceId}\n---\n{synthesis}\n---"));
}
}
catch (Exception ex)
{
logger.LogError(ex,
"[Step 4: Synthesize Failed] InstanceId: {InstanceId} | Error: {Error}",
workflowInstanceId,
ex.Message);
throw;
}
logger.LogDebug(
"[Step 4: Synthesize Output] InstanceId: {InstanceId} | Synthesis Preview: {Preview}",
workflowInstanceId,
synthesis?.Length > 200 ? synthesis[..200] + "..." : synthesis);
context.SetCustomStatus(new { step = "generating_final_answer" });
// Chain 5: Generate final answer
var finalPrompt = $"""
Based on the synthesized information:
{synthesis}
Please answer: {request.Prompt}
""";
var finalPromptPreview = finalPrompt.Length > 150 ? finalPrompt[..150] + "..." : finalPrompt;
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 5: CallLlm] InstanceId: {workflowInstanceId} | Model: {request.Model ?? "llama"} | Temperature: {request.Temperature ?? 0.7} | Prompt: \"{finalPromptPreview}\" | Prompt Length: {finalPrompt.Length}"));
LlmResponse? response = null;
try
{
response = await context.CallActivityAsync<LlmResponse>(
nameof(CallLlmActivity),
new LlmRequest(finalPrompt, request.Context, request.Model, request.Temperature));
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 5: CallLlm Complete] InstanceId: {workflowInstanceId} | Response Length: {response?.Response?.Length ?? 0} | Has Metadata: {response?.Metadata != null}"));
// Log full LLM response after LLM call
if (response?.Response != null)
{
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Response After LLM Call] InstanceId: {workflowInstanceId}\n---\n{response.Response}\n---"));
}
}
catch (Exception ex)
{
logger.LogError(ex,
"[Step 5: CallLlm Failed] InstanceId: {InstanceId} | Error: {Error} | StackTrace: {StackTrace}",
workflowInstanceId,
ex.Message,
ex.StackTrace);
throw;
}
if (response?.Metadata != null)
{
logger.LogDebug(
"[Step 5: CallLlm Metadata] InstanceId: {InstanceId} | Metadata: {Metadata}",
workflowInstanceId,
JsonSerializer.Serialize(response.Metadata));
}
logger.LogDebug(
"[Step 5: CallLlm Output Preview] InstanceId: {InstanceId} | Response Preview: {Preview}",
workflowInstanceId,
response?.Response?.Length > 200 ? response.Response[..200] + "..." : response?.Response);
context.SetCustomStatus(new { step = "completed" });
// Enhance metadata with workflow information
var enhancedMetadata = new Dictionary<string, object>(response?.Metadata ?? new Dictionary<string, object>())
{
["intent"] = intent ?? "unknown",
["queries_generated"] = queries.Count,
["results_synthesized"] = flattenedResults.Count,
["workflow_type"] = "prompt_chaining"
};
var finalResponse = response! with { Metadata = enhancedMetadata };
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Workflow Complete] InstanceId: {workflowInstanceId} | Total Steps: 5 | Final Response Length: {finalResponse.Response?.Length ?? 0} | Intent: {intent} | Queries: {queries.Count} | Results: {flattenedResults.Count}"));
if (finalResponse.Metadata != null)
{
logger.LogDebug(
"[Workflow Final Metadata] InstanceId: {InstanceId} | Metadata: {Metadata}",
workflowInstanceId,
JsonSerializer.Serialize(finalResponse.Metadata));
}
if (finalResponse is null)
{
logger.LogError(
"[Workflow Error] InstanceId: {InstanceId} | Final response is null",
workflowInstanceId);
throw new InvalidOperationException("Workflow completed but final response is null.");
}
return finalResponse;
}
}
// Logging activity to ensure workflow logs are captured
public class LoggingActivity(ILogger<LoggingActivity> logger) : WorkflowActivity<LogMessage, bool>
{
public override Task<bool> RunAsync(WorkflowActivityContext context, LogMessage message)
{
try
{
switch (message.Level.ToLower())
{
case "error":
logger.LogError("[Workflow] {Message}", message.Message);
break;
case "warning":
logger.LogWarning("[Workflow] {Message}", message.Message);
break;
case "debug":
logger.LogDebug("[Workflow] {Message}", message.Message);
break;
default:
logger.LogInformation("[Workflow] {Message}", message.Message);
break;
}
return Task.FromResult(true);
}
catch (Exception ex)
{
logger.LogError(ex, "[LoggingActivity] Error logging message: {Message}", message.Message);
return Task.FromResult(false);
}
}
}
public record LogMessage(string Message, string Level = "Information");
// Activity to analyze the intent of a user query
public class AnalyzeIntentActivity(
DaprConversationClient conversationClient,
ILogger<AnalyzeIntentActivity> logger) : WorkflowActivity<AnalyzeIntentRequest, string>
{
public override async Task<string> RunAsync(
WorkflowActivityContext context,
AnalyzeIntentRequest request)
{
ArgumentNullException.ThrowIfNull(conversationClient);
ArgumentNullException.ThrowIfNull(request);
ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
var activityName = nameof(AnalyzeIntentActivity);
using var span = Extensions.LlmActivitySource.StartActivity("LLM.AnalyzeIntent", ActivityKind.Internal);
span?.SetTag("llm.activity", activityName);
span?.SetTag("llm.model", request.Model ?? "llama");
span?.SetTag("llm.prompt_length", request.Prompt.Length);
var startTime = DateTime.UtcNow;
var componentName = request.Model ?? "llama";
var promptPreview = request.Prompt.Length > 100 ? request.Prompt[..100] + "..." : request.Prompt;
logger.LogInformation(
"[{Activity} Start] Prompt: \"{Prompt}\" | Prompt Length: {PromptLength} | Model: {Model} | Temperature: {Temperature}",
activityName,
promptPreview,
request.Prompt.Length,
componentName,
request.Temperature ?? 0.3);
logger.LogDebug(
"[{Activity} Input] Full Request: {Request}",
activityName,
JsonSerializer.Serialize(new
{
PromptLength = request.Prompt.Length,
PromptPreview = request.Prompt.Length > 200 ? request.Prompt[..200] + "..." : request.Prompt,
request.Model,
request.Temperature
}));
var analysisPrompt = $"""
Analyze the intent of this query:
"{request.Prompt}"
Respond with one word: factual, explanatory, comparative, creative, or analytical.
""";
var conversationInput = new ConversationInput(
[
new UserMessage
{
Content = [new MessageContent(analysisPrompt)]
}
]);
var conversationOptions = new ConversationOptions(componentName)
{
Temperature = request.Temperature ?? 0.3
};
logger.LogDebug(
"[{Activity}] Calling LLM conversation API with component: {Component}",
activityName,
componentName);
var response = await conversationClient.ConverseAsync(
[conversationInput],
conversationOptions,
CancellationToken.None);
if (response?.Outputs is null || response.Outputs.Count == 0)
{
logger.LogError(
"[{Activity} Error] LLM returned no outputs. Response: {Response}",
activityName,
response != null ? JsonSerializer.Serialize(response) : "null");
throw new InvalidOperationException("LLM returned no outputs for intent analysis.");
}
logger.LogDebug(
"[{Activity}] Received {OutputCount} output(s) from LLM",
activityName,
response.Outputs.Count);
var responseText = string.Empty;
var choiceCount = 0;
foreach (var output in response.Outputs)
{
if (output?.Choices is null) continue;
foreach (var choice in output.Choices)
{
var content = choice.Message?.Content ?? string.Empty;
responseText += content;
choiceCount++;
}
}
if (string.IsNullOrWhiteSpace(responseText))
{
logger.LogError(
"[{Activity} Error] LLM returned empty response. Outputs Count: {OutputCount}, Choices Processed: {ChoiceCount}",
activityName,
response.Outputs.Count,
choiceCount);
throw new InvalidOperationException("LLM returned empty response for intent analysis.");
}
var intent = responseText.Trim().ToLower();
// Validate intent is one of the expected values
var validIntents = new[] { "factual", "explanatory", "comparative", "creative", "analytical" };
if (!validIntents.Contains(intent))
{
// Default to analytical if intent doesn't match
intent = "analytical";
logger.LogWarning("[{Activity}] Invalid intent '{Intent}', defaulting to analytical", activityName, responseText);
}
var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
logger.LogInformation(
"[{Activity} Complete] Duration: {Duration}ms | Intent: {Intent} | Choices: {ChoiceCount} | ConversationId: {ConversationId}",
activityName,
duration,
intent,
choiceCount,
response.ConversationId ?? "none");
logger.LogDebug(
"[{Activity} Output] Intent: {Intent}",
activityName,
intent);
span?.SetTag("llm.intent", intent);
span?.SetTag("llm.duration_ms", duration);
span?.SetStatus(ActivityStatusCode.Ok);
return intent;
}
}
// Activity to generate search queries based on intent
public class GenerateQueriesActivity(
DaprConversationClient conversationClient,
ILogger<GenerateQueriesActivity> logger) : WorkflowActivity<GenerateQueriesRequest, List<string>>
{
public override async Task<List<string>> RunAsync(
WorkflowActivityContext context,
GenerateQueriesRequest request)
{
ArgumentNullException.ThrowIfNull(conversationClient);
ArgumentNullException.ThrowIfNull(request);
ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
ArgumentException.ThrowIfNullOrWhiteSpace(request.Intent);
var activityName = nameof(GenerateQueriesActivity);
using var span = Extensions.LlmActivitySource.StartActivity("LLM.GenerateQueries", ActivityKind.Internal);
span?.SetTag("llm.activity", activityName);
span?.SetTag("llm.model", request.Model ?? "llama");
span?.SetTag("llm.intent", request.Intent);
span?.SetTag("llm.prompt_length", request.Prompt.Length);
var startTime = DateTime.UtcNow;
var componentName = request.Model ?? "llama";
var promptPreview = request.Prompt.Length > 100 ? request.Prompt[..100] + "..." : request.Prompt;
logger.LogInformation(
"[{Activity} Start] Prompt: \"{Prompt}\" | Prompt Length: {PromptLength} | Intent: {Intent} | Model: {Model} | Temperature: {Temperature}",
activityName,
promptPreview,
request.Prompt.Length,
request.Intent,
componentName,
request.Temperature ?? 0.7);
logger.LogDebug(
"[{Activity} Input] Full Request: {Request}",
activityName,
JsonSerializer.Serialize(new
{
PromptLength = request.Prompt.Length,
PromptPreview = request.Prompt.Length > 200 ? request.Prompt[..200] + "..." : request.Prompt,
request.Intent,
request.Model,
request.Temperature
}));
var queryPrompt = $"""
For the query: "{request.Prompt}"
With intent: {request.Intent}
Generate 3 search queries to gather comprehensive information.
Return each query on a new line, numbered 1, 2, 3.
""";
var conversationInput = new ConversationInput(
[
new UserMessage
{
Content = [new MessageContent(queryPrompt)]
}
]);
var conversationOptions = new ConversationOptions(componentName)
{
Temperature = request.Temperature ?? 0.7
};
logger.LogDebug(
"[{Activity}] Calling LLM conversation API with component: {Component}",
activityName,
componentName);
var response = await conversationClient.ConverseAsync(
[conversationInput],
conversationOptions,
CancellationToken.None);
if (response?.Outputs is null || response.Outputs.Count == 0)
{
logger.LogError(
"[{Activity} Error] LLM returned no outputs. Response: {Response}",
activityName,
response != null ? JsonSerializer.Serialize(response) : "null");
throw new InvalidOperationException("LLM returned no outputs for query generation.");
}
logger.LogDebug(
"[{Activity}] Received {OutputCount} output(s) from LLM",
activityName,
response.Outputs.Count);
var responseText = string.Empty;
var choiceCount = 0;
foreach (var output in response.Outputs)
{
if (output?.Choices is null) continue;
foreach (var choice in output.Choices)
{
var content = choice.Message?.Content ?? string.Empty;
responseText += content;
choiceCount++;
}
}
if (string.IsNullOrWhiteSpace(responseText))
{
logger.LogError(
"[{Activity} Error] LLM returned empty response. Outputs Count: {OutputCount}, Choices Processed: {ChoiceCount}",
activityName,
response.Outputs.Count,
choiceCount);
throw new InvalidOperationException("LLM returned empty response for query generation.");
}
// Parse queries from response
var queries = responseText
.Split('\n', StringSplitOptions.RemoveEmptyEntries)
.Select(q => q.Trim())
.Where(q => !string.IsNullOrWhiteSpace(q))
// Remove numbering if present (e.g., "1. query" -> "query")
.Select(q => System.Text.RegularExpressions.Regex.Replace(q, @"^\d+[\.\)]\s*", ""))
.Take(3)
.ToList();
// If we didn't get 3 queries, add fallback queries
while (queries.Count < 3)
{
queries.Add($"{request.Prompt} {queries.Count + 1}");
}
var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
logger.LogInformation(
"[{Activity} Complete] Duration: {Duration}ms | Queries Generated: {Count} | Choices: {ChoiceCount} | ConversationId: {ConversationId}",
activityName,
duration,
queries.Count,
choiceCount,
response.ConversationId ?? "none");
logger.LogDebug(
"[{Activity} Output] Queries: {Queries}",
activityName,
JsonSerializer.Serialize(queries));
span?.SetTag("llm.queries_count", queries.Count);
span?.SetTag("llm.duration_ms", duration);
span?.SetStatus(ActivityStatusCode.Ok);
return queries;
}
}
// Activity to search for data (reusing pattern from AugmentedLlm)
public class SearchDataActivity(ILogger<SearchDataActivity> logger) : WorkflowActivity<SearchRequest, List<SearchResult>>
{
public override async Task<List<SearchResult>> RunAsync(
WorkflowActivityContext context,
SearchRequest input)
{
ArgumentNullException.ThrowIfNull(input);
ArgumentException.ThrowIfNullOrWhiteSpace(input.Query);
var activityName = nameof(SearchDataActivity);
using var span = Extensions.WorkflowActivitySource.StartActivity("Workflow.SearchData", ActivityKind.Internal);
span?.SetTag("workflow.activity", activityName);
span?.SetTag("search.query_length", input.Query.Length);
var startTime = DateTime.UtcNow;
logger.LogInformation(
"[{Activity} Start] Query: {Query} | Query Length: {QueryLength}",
activityName,
input.Query,
input.Query.Length);
logger.LogDebug(
"[{Activity} Input] Full Request: {Request}",
activityName,
JsonSerializer.Serialize(input));
// Simulate API call
logger.LogDebug("[{Activity}] Simulating search API call...", activityName);
await Task.Delay(100);
var results = new List<SearchResult>
{
new($"Document about {input.Query}",
$"This document contains comprehensive information about {input.Query}. " +
$"It provides detailed analysis, examples, and best practices.",
"Knowledge Base"),
new($"Technical Guide: {input.Query}",
$"A technical guide covering {input.Query}. Includes implementation details, " +
$"code examples, troubleshooting, and performance considerations.",
"Technical Library"),
new($"Research Paper: {input.Query}",
$"Academic research on {input.Query}. Presents findings, methodologies, " +
$"and future research directions.",
"Research Database")
};
var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
var totalContentLength = results.Sum(r => r.Content.Length);
logger.LogInformation(
"[{Activity} Complete] Duration: {Duration}ms | Results Count: {Count} | Total Content Length: {ContentLength}",
activityName,
duration,
results.Count,
totalContentLength);
logger.LogDebug(
"[{Activity} Output] Results: {Results}",
activityName,
JsonSerializer.Serialize(results.Select(r => new
{
r.Title,
r.Source,
ContentLength = r.Content.Length,
ContentPreview = r.Content.Length > 100 ? r.Content[..100] + "..." : r.Content
})));
span?.SetTag("search.results_count", results.Count);
span?.SetTag("search.total_content_length", totalContentLength);
span?.SetTag("search.duration_ms", duration);
span?.SetStatus(ActivityStatusCode.Ok);
return results;
}
}
// Activity to synthesize information from search results
public class SynthesizeActivity(
DaprConversationClient conversationClient,
ILogger<SynthesizeActivity> logger) : WorkflowActivity<SynthesizeRequest, string>
{
public override async Task<string> RunAsync(
WorkflowActivityContext context,
SynthesizeRequest request)
{
ArgumentNullException.ThrowIfNull(conversationClient);
ArgumentNullException.ThrowIfNull(request);
ArgumentNullException.ThrowIfNull(request.Results);
var activityName = nameof(SynthesizeActivity);
using var span = Extensions.LlmActivitySource.StartActivity("LLM.Synthesize", ActivityKind.Internal);
span?.SetTag("llm.activity", activityName);
span?.SetTag("llm.model", request.Model ?? "llama");
span?.SetTag("llm.results_count", request.Results.Count);
span?.SetTag("llm.prompt_length", request.OriginalPrompt.Length);
var startTime = DateTime.UtcNow;
var componentName = request.Model ?? "llama";
var promptPreview = request.OriginalPrompt.Length > 100 ? request.OriginalPrompt[..100] + "..." : request.OriginalPrompt;
logger.LogInformation(
"[{Activity} Start] Original Prompt: \"{Prompt}\" | Original Prompt Length: {PromptLength} | Results Count: {ResultsCount} | Model: {Model} | Temperature: {Temperature}",
activityName,
promptPreview,
request.OriginalPrompt.Length,
request.Results.Count,
componentName,
request.Temperature ?? 0.5);
logger.LogDebug(
"[{Activity} Input] Original Prompt: {Prompt} | Results: {Results}",
activityName,
request.OriginalPrompt,
JsonSerializer.Serialize(request.Results.Select(r => new { r.Title, r.Source, ContentLength = r.Content.Length })));
var sourcesText = string.Join("\n\n",
request.Results.Select((r, i) =>
$"[Source {i + 1}]\nTitle: {r.Title}\nContent: {r.Content}\nOrigin: {r.Source}"));
var synthesisPrompt = $"""
Original question: {request.OriginalPrompt}
Sources found:
{sourcesText}
Synthesize the key information from these sources into a coherent summary.
""";
var conversationInput = new ConversationInput(
[
new UserMessage
{
Content = [new MessageContent(synthesisPrompt)]
}
]);
var conversationOptions = new ConversationOptions(componentName)
{
Temperature = request.Temperature ?? 0.5
};
logger.LogDebug(
"[{Activity}] Calling LLM conversation API with component: {Component}",
activityName,
componentName);
var response = await conversationClient.ConverseAsync(
[conversationInput],
conversationOptions,
CancellationToken.None);
if (response?.Outputs is null || response.Outputs.Count == 0)
{
logger.LogError(
"[{Activity} Error] LLM returned no outputs. Response: {Response}",
activityName,
response != null ? JsonSerializer.Serialize(response) : "null");
throw new InvalidOperationException("LLM returned no outputs for synthesis.");
}
logger.LogDebug(
"[{Activity}] Received {OutputCount} output(s) from LLM",
activityName,
response.Outputs.Count);
var responseText = string.Empty;
var choiceCount = 0;
foreach (var output in response.Outputs)
{
if (output?.Choices is null) continue;
foreach (var choice in output.Choices)
{
var content = choice.Message?.Content ?? string.Empty;
responseText += content;
choiceCount++;
}
}
if (string.IsNullOrWhiteSpace(responseText))
{
logger.LogError(
"[{Activity} Error] LLM returned empty response. Outputs Count: {OutputCount}, Choices Processed: {ChoiceCount}",
activityName,
response.Outputs.Count,
choiceCount);
throw new InvalidOperationException("LLM returned empty synthesis.");
}
var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
var synthesisPreview = responseText.Length > 150 ? responseText[..150] + "..." : responseText;
logger.LogInformation(
"[{Activity} Complete] Duration: {Duration}ms | Synthesis: \"{Synthesis}\" | Synthesis Length: {SynthesisLength} | Choices: {ChoiceCount} | ConversationId: {ConversationId}",
activityName,
duration,
synthesisPreview,
responseText.Length,
choiceCount,
response.ConversationId ?? "none");
logger.LogDebug(
"[{Activity} Output] Synthesis Preview: {Preview}",
activityName,
responseText.Length > 300 ? responseText[..300] + "..." : responseText);
span?.SetTag("llm.synthesis_length", responseText.Length);
span?.SetTag("llm.duration_ms", duration);
span?.SetStatus(ActivityStatusCode.Ok);
return responseText;
}
}
// Activity to call LLM (reusing pattern from AugmentedLlm)
public class CallLlmActivity(
DaprConversationClient conversationClient,
ILogger<CallLlmActivity> logger) : WorkflowActivity<LlmRequest, LlmResponse>
{
public override async Task<LlmResponse> RunAsync(
WorkflowActivityContext context,
LlmRequest request)
{
ArgumentNullException.ThrowIfNull(conversationClient);
ArgumentNullException.ThrowIfNull(request);
ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
var activityName = nameof(CallLlmActivity);
using var span = Extensions.LlmActivitySource.StartActivity("LLM.CallLlm", ActivityKind.Internal);
span?.SetTag("llm.activity", activityName);
span?.SetTag("llm.model", request.Model ?? "llama");
span?.SetTag("llm.temperature", request.Temperature ?? 0.7);
span?.SetTag("llm.prompt_length", request.Prompt.Length);
var startTime = DateTime.UtcNow;
var componentName = request.Model ?? "llama";
var promptPreview = request.Prompt.Length > 150 ? request.Prompt[..150] + "..." : request.Prompt;
logger.LogInformation(
"[{Activity} Start] Model: {Model} | Temperature: {Temperature} | Prompt: \"{Prompt}\" | Prompt Length: {PromptLength}",
activityName,
componentName,
request.Temperature ?? 0.7,
promptPreview,
request.Prompt.Length);
logger.LogDebug(
"[{Activity} Input] Full Request: {Request}",
activityName,
JsonSerializer.Serialize(new
{
request.Model,
request.Temperature,
PromptLength = request.Prompt.Length,
PromptPreview = request.Prompt.Length > 200 ? request.Prompt[..200] + "..." : request.Prompt,
ContextKeys = request.Context?.Keys.ToArray() ?? Array.Empty<string>()
}));
var conversationInput = new ConversationInput(
[
new UserMessage
{
Content = [new MessageContent(request.Prompt)]
}
]);
var conversationOptions = new ConversationOptions(componentName)
{
Temperature = request.Temperature ?? 0.7
};
logger.LogDebug(
"[{Activity}] Calling LLM conversation API with component: {Component}",
activityName,
componentName);
var response = await conversationClient.ConverseAsync(
[conversationInput],
conversationOptions,
CancellationToken.None);
if (response?.Outputs is null || response.Outputs.Count == 0)
{
logger.LogError(
"[{Activity} Error] LLM returned no outputs. Response: {Response}",
activityName,
response != null ? JsonSerializer.Serialize(response) : "null");
throw new InvalidOperationException("LLM returned no outputs.");
}
logger.LogDebug(
"[{Activity}] Received {OutputCount} output(s) from LLM",
activityName,
response.Outputs.Count);
var responseText = string.Empty;
var choiceCount = 0;
foreach (var output in response.Outputs)
{
if (output?.Choices is null) continue;
foreach (var choice in output.Choices)
{
var content = choice.Message?.Content ?? string.Empty;
responseText += content;
choiceCount++;
}
}
if (string.IsNullOrWhiteSpace(responseText))
{
logger.LogError(
"[{Activity} Error] LLM returned empty response. Outputs Count: {OutputCount}, Choices Processed: {ChoiceCount}",
activityName,
response.Outputs.Count,
choiceCount);
throw new InvalidOperationException("LLM returned empty response.");
}
var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
var result = new LlmResponse(
responseText,
new Dictionary<string, object>
{
["model"] = componentName,
["temperature"] = request.Temperature ?? 0.7,
["timestamp"] = DateTime.UtcNow,
["conversationId"] = response.ConversationId ?? string.Empty
});
logger.LogInformation(
"[{Activity} Complete] Duration: {Duration}ms | Response Length: {ResponseLength} | Choices: {ChoiceCount} | ConversationId: {ConversationId}",
activityName,
duration,
responseText.Length,
choiceCount,
response.ConversationId ?? "none");
logger.LogDebug(
"[{Activity} Output] Response Preview: {Preview} | Metadata: {Metadata}",
activityName,
responseText.Length > 300 ? responseText[..300] + "..." : responseText,
JsonSerializer.Serialize(result.Metadata));
span?.SetTag("llm.response_length", responseText.Length);
span?.SetTag("llm.conversation_id", response.ConversationId ?? "none");
span?.SetTag("llm.duration_ms", duration);
span?.SetStatus(ActivityStatusCode.Ok);
return result;
}
}
// Request/Response models for activities
public record AnalyzeIntentRequest(
string Prompt,
string? Model = null,
double? Temperature = null);
public record GenerateQueriesRequest(
string Prompt,
string Intent,
string? Model = null,
double? Temperature = null);
public record SynthesizeRequest(
List<SearchResult> Results,
string OriginalPrompt,
string? Model = null,
double? Temperature = null);
If you walk through the code, you will see that it implements a Dapr Workflow based on the Prompt Chaining pattern. This pattern represents a multi-step LLM orchestration in which a user query (or prompt) is processed through a sequence of activities, with each step's output feeding into the next.
The workflow consists of five main steps (activities):
- Analyze Intent – Classifies the user’s query into one of five intents (or categories): factual, explanatory, comparative, creative, or analytical. This classification guides how subsequent queries are generated.
- Generate Queries – Uses the LLM to create three diverse search queries based on the original prompt and the detected intent.
- Search Data – Retrieves information for a single search query and runs in parallel for all generated queries. It currently returns mock data simulating results from a Knowledge Base, Technical Library, and Research Database, and is designed to be replaced with real search or retrieval APIs.
- Synthesize – Consolidates all search results into a coherent summary by formatting sources and instructing the LLM to synthesize the key information.
- Call LLM – Generates the final response by combining the synthesized information with the original user query.
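With the workflow wired up, a client interaction with the service is a two-step affair: POST the prompt to start a workflow instance, then poll that instance until it completes. The snippet below is an illustrative console client, not part of the solution; the base address is an assumption and should be replaced with the endpoint Aspire assigns to the PromptChaining service.
// Illustrative client for the Prompt Chaining API (hypothetical base address).
using System.Net.Http.Json;
using System.Text.Json;
var http = new HttpClient { BaseAddress = new Uri("http://localhost:5200") };
// Start the workflow; the endpoint responds with 202 Accepted and the instance ID.
var start = await http.PostAsJsonAsync("/llm/query", new
{
    prompt = "Compare Dapr Workflow and the Dapr Conversation API for LLM orchestration.",
    temperature = 0.7
});
var started = await start.Content.ReadFromJsonAsync<JsonElement>();
var instanceId = started.GetProperty("instanceId").GetString();
// Poll the status endpoint until the workflow finishes.
while (true)
{
    var status = await http.GetFromJsonAsync<JsonElement>($"/llm/query/{instanceId}/status");
    if (status.GetProperty("isCompleted").GetBoolean()) break;
    if (status.GetProperty("isFailed").GetBoolean()) throw new InvalidOperationException("Workflow failed.");
    await Task.Delay(TimeSpan.FromSeconds(2));
}
// Read the final result (the LlmResponse produced by the workflow).
var result = await http.GetFromJsonAsync<JsonElement>($"/llm/query/{instanceId}");
Console.WriteLine(result.GetProperty("result").GetProperty("response").GetString());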
Aspire, Diagrid, and RedisInsight UIs (or dashboards)
To run the application, execute the following command.
aspire run

Let’s run the web application
Run the web application from the Aspire Dashboard. Once it launches, select Prompt Chaining (Multi-Step) as the service type. You can then start a conversation and observe the application’s behavior. Using the Diagrid Dashboard, you can inspect the underlying workflow state and review historical execution data.

Now, let’s start structuring a new Minimal API for the Routing pattern.
Routing
In a routing workflow, inputs are classified and directed to specialized follow-up tasks. This design promotes separation of concerns and allows for more specialized prompt construction.
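Stripped of logging and error handling, the heart of the pattern is just two activities: a classifier that returns a route label, and a switch expression that dispatches the request to the matching handler. This excerpt mirrors the RoutingWorkflow shown in full below.
// Core of the routing workflow (excerpt; logging and error handling omitted).
var route = await context.CallActivityAsync<string>(
    nameof(RouteClassifierActivity),
    new RouteClassifierRequest(request.Prompt, request.Model, request.Temperature));
var response = route switch
{
    "technical" => await context.CallActivityAsync<LlmResponse>(
        nameof(TechnicalHandlerActivity), new HandlerRequest(request, route)),
    "creative" => await context.CallActivityAsync<LlmResponse>(
        nameof(CreativeHandlerActivity), new HandlerRequest(request, route)),
    "analytical" => await context.CallActivityAsync<LlmResponse>(
        nameof(AnalyticalHandlerActivity), new HandlerRequest(request, route)),
    _ => await context.CallActivityAsync<LlmResponse>(
        nameof(GeneralHandlerActivity), new HandlerRequest(request, route ?? "general"))
};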

dotnet add package Dapr.AI
dotnet add package Dapr.Workflow
// Program.cs
#pragma warning disable DAPR_CONVERSATION
using System.ComponentModel.DataAnnotations;
using Dapr.AI.Conversation.Extensions;
using Dapr.Workflow;
using AspireWithDapr.Routing.Models;
using AspireWithDapr.Routing.Workflow;
var builder = WebApplication.CreateBuilder(args);
builder.AddServiceDefaults();
builder.Services.AddProblemDetails();
builder.Services.AddOpenApi();
builder.Services.AddDaprConversationClient();
builder.Services.AddDaprWorkflow(options =>
{
options.RegisterWorkflow<RoutingWorkflow>();
options.RegisterActivity<RouteClassifierActivity>();
options.RegisterActivity<TechnicalHandlerActivity>();
options.RegisterActivity<CreativeHandlerActivity>();
options.RegisterActivity<AnalyticalHandlerActivity>();
options.RegisterActivity<GeneralHandlerActivity>();
options.RegisterActivity<LoggingActivity>();
});
var app = builder.Build();
// Initialize workflow logger with the app's logger factory
var loggerFactory = app.Services.GetRequiredService<ILoggerFactory>();
RoutingWorkflow.SetLogger(loggerFactory);
app.UseExceptionHandler();
if (app.Environment.IsDevelopment())
{
app.MapOpenApi();
}
app.MapGet("/", () => Results.Ok(new {
service = "Routing LLM API",
version = "1.0.0",
endpoints = new[] { "/llm/query", "/llm/query/{instanceId}", "/llm/query/{instanceId}/status", "/health" },
routes = new[] { "technical", "creative", "analytical", "general" }
}))
.WithName("GetServiceInfo")
.WithTags("Info");
app.MapPost("/llm/query", async (
[Required] LlmRequest request,
DaprWorkflowClient workflowClient,
ILogger<Program> logger,
CancellationToken cancellationToken) =>
{
if (string.IsNullOrWhiteSpace(request.Prompt))
{
return Results.BadRequest(new { error = "Prompt is required and cannot be empty." });
}
try
{
logger.LogInformation("Starting Routing workflow for prompt: {Prompt} with length {PromptLength} characters", request.Prompt, request.Prompt.Length);
var instanceId = Guid.NewGuid().ToString();
await workflowClient.ScheduleNewWorkflowAsync(
nameof(RoutingWorkflow),
instanceId,
request);
logger.LogInformation("Workflow started with instance ID: {InstanceId}", instanceId);
return Results.Accepted($"/llm/query/{instanceId}", new {
instanceId,
status = "started"
});
}
catch (Exception ex)
{
logger.LogError(ex, "Error starting workflow");
return Results.Problem(
detail: "An error occurred while starting the workflow.",
statusCode: StatusCodes.Status500InternalServerError);
}
})
.WithName("StartRoutingQuery")
.WithTags("LLM")
.Produces<LlmResponse>(StatusCodes.Status202Accepted)
.Produces(StatusCodes.Status400BadRequest)
.Produces(StatusCodes.Status500InternalServerError);
app.MapGet("/llm/query/{instanceId}", async (
string instanceId,
DaprWorkflowClient workflowClient,
ILogger<Program> logger,
CancellationToken cancellationToken) =>
{
try
{
var workflowState = await workflowClient.GetWorkflowStateAsync(instanceId);
if (workflowState is null)
{
return Results.NotFound(new { error = $"Workflow instance {instanceId} not found." });
}
if (workflowState.RuntimeStatus != WorkflowRuntimeStatus.Completed)
{
return Results.Accepted($"/llm/query/{instanceId}", new
{
instanceId,
runtimeStatus = workflowState.RuntimeStatus.ToString()
});
}
var output = workflowState.ReadOutputAs<LlmResponse>();
return Results.Ok(new
{
instanceId,
runtimeStatus = workflowState.RuntimeStatus.ToString(),
result = output
});
}
catch (Exception ex)
{
logger.LogError(ex, "Error retrieving workflow state for instance {InstanceId}", instanceId);
return Results.Problem(
detail: "An error occurred while retrieving the workflow state.",
statusCode: StatusCodes.Status500InternalServerError);
}
})
.WithName("GetRoutingQueryResult")
.WithTags("LLM")
.Produces<LlmResponse>()
.Produces(StatusCodes.Status404NotFound)
.Produces(StatusCodes.Status500InternalServerError);
app.MapGet("/llm/query/{instanceId}/status", async (
string instanceId,
DaprWorkflowClient workflowClient,
ILogger<Program> logger,
CancellationToken cancellationToken) =>
{
try
{
var workflowState = await workflowClient.GetWorkflowStateAsync(instanceId);
if (workflowState is null)
{
return Results.NotFound(new { error = $"Workflow instance {instanceId} not found." });
}
var status = workflowState.RuntimeStatus;
return Results.Ok(new
{
instanceId,
runtimeStatus = status.ToString(),
isCompleted = status == WorkflowRuntimeStatus.Completed,
isRunning = status == WorkflowRuntimeStatus.Running,
isFailed = status == WorkflowRuntimeStatus.Failed,
isTerminated = status == WorkflowRuntimeStatus.Terminated,
isPending = status == WorkflowRuntimeStatus.Pending
});
}
catch (Exception ex)
{
logger.LogError(ex, "Error retrieving workflow status for instance {InstanceId}", instanceId);
return Results.Problem(
detail: "An error occurred while retrieving the workflow status.",
statusCode: StatusCodes.Status500InternalServerError);
}
})
.WithName("GetRoutingWorkflowStatus")
.WithTags("LLM")
.Produces(StatusCodes.Status200OK)
.Produces(StatusCodes.Status404NotFound)
.Produces(StatusCodes.Status500InternalServerError);
app.MapDefaultEndpoints();
app.Run();
// LlmModels.cs
using System.ComponentModel.DataAnnotations;
namespace AspireWithDapr.Routing.Models;
public record LlmRequest(
[Required] [MinLength(1)] string Prompt,
Dictionary<string, object>? Context = null,
string? Model = null,
double? Temperature = null);
public record LlmResponse(
string Response,
Dictionary<string, object>? Metadata = null);
// RoutingWorkflow.cs
#pragma warning disable DAPR_CONVERSATION
using Dapr.Workflow;
using Dapr.AI.Conversation;
using Dapr.AI.Conversation.ConversationRoles;
using AspireWithDapr.Routing.Models;
using System.Text.Json;
namespace AspireWithDapr.Routing.Workflow;
public class RoutingWorkflow : Workflow<LlmRequest, LlmResponse>
{
private static ILogger? _logger;
public static void SetLogger(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<RoutingWorkflow>();
}
public override async Task<LlmResponse> RunAsync(
WorkflowContext context,
LlmRequest request)
{
ArgumentNullException.ThrowIfNull(request);
ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
// Create logger if not set, using a default factory as fallback
var logger = _logger ?? LoggerFactory.Create(builder => builder.AddConsole()).CreateLogger<RoutingWorkflow>();
var workflowInstanceId = context.InstanceId;
// Log workflow start using activity to ensure it's captured
var promptPreview = request.Prompt.Length > 100 ? request.Prompt[..100] + "..." : request.Prompt;
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Workflow Start] InstanceId: {workflowInstanceId} | Prompt: \"{promptPreview}\" | Prompt Length: {request.Prompt.Length} | Model: {request.Model ?? "llama"} | Temperature: {request.Temperature ?? 0.7}"));
// Log full initial prompt for end-to-end tracking
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Initial Prompt] InstanceId: {workflowInstanceId}\n---\n{request.Prompt}\n---"));
context.SetCustomStatus(new { step = "classifying" });
// Step 1: Classify the query
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 1: RouteClassifier] InstanceId: {workflowInstanceId} | Input Prompt: {promptPreview}"));
string? route = null;
try
{
route = await context.CallActivityAsync<string>(
nameof(RouteClassifierActivity),
new RouteClassifierRequest(request.Prompt, request.Model, request.Temperature));
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 1: RouteClassifier Complete] InstanceId: {workflowInstanceId} | Route: {route}"));
}
catch (Exception ex)
{
logger.LogError(ex,
"[Step 1: RouteClassifier Failed] InstanceId: {InstanceId} | Error: {Error}",
workflowInstanceId,
ex.Message);
throw;
}
context.SetCustomStatus(new { step = "routing", route });
// Step 2: Route to appropriate handler
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 2: Routing] InstanceId: {workflowInstanceId} | Route: {route} | Routing to handler..."));
LlmResponse? response = null;
try
{
response = route switch
{
"technical" => await context.CallActivityAsync<LlmResponse>(
nameof(TechnicalHandlerActivity),
new HandlerRequest(request, route)),
"creative" => await context.CallActivityAsync<LlmResponse>(
nameof(CreativeHandlerActivity),
new HandlerRequest(request, route)),
"analytical" => await context.CallActivityAsync<LlmResponse>(
nameof(AnalyticalHandlerActivity),
new HandlerRequest(request, route)),
_ => await context.CallActivityAsync<LlmResponse>(
nameof(GeneralHandlerActivity),
new HandlerRequest(request, route ?? "general"))
};
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 2: Routing Complete] InstanceId: {workflowInstanceId} | Route: {route} | Response Length: {response?.Response?.Length ?? 0}"));
// Log full response after routing
if (response?.Response != null)
{
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Response After Routing] InstanceId: {workflowInstanceId}\n---\n{response.Response}\n---"));
}
}
catch (Exception ex)
{
logger.LogError(ex,
"[Step 2: Routing Failed] InstanceId: {InstanceId} | Route: {Route} | Error: {Error}",
workflowInstanceId,
route,
ex.Message);
throw;
}
if (response?.Metadata != null)
{
logger.LogDebug(
"[Step 2: Routing Metadata] InstanceId: {InstanceId} | Metadata: {Metadata}",
workflowInstanceId,
JsonSerializer.Serialize(response.Metadata));
}
context.SetCustomStatus(new { step = "completed", route });
// Enhance metadata with routing information
var enhancedMetadata = new Dictionary<string, object>(response?.Metadata ?? new Dictionary<string, object>())
{
["route"] = route ?? "general",
["workflow_type"] = "routing"
};
var finalResponse = response! with { Metadata = enhancedMetadata };
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Workflow Complete] InstanceId: {workflowInstanceId} | Total Steps: 2 | Route: {route} | Final Response Length: {finalResponse.Response?.Length ?? 0}"));
if (finalResponse.Metadata != null)
{
logger.LogDebug(
"[Workflow Final Metadata] InstanceId: {InstanceId} | Metadata: {Metadata}",
workflowInstanceId,
JsonSerializer.Serialize(finalResponse.Metadata));
}
if (finalResponse is null)
{
logger.LogError(
"[Workflow Error] InstanceId: {InstanceId} | Final response is null",
workflowInstanceId);
throw new InvalidOperationException("Workflow completed but final response is null.");
}
return finalResponse;
}
}
// Logging activity to ensure workflow logs are captured
public class LoggingActivity(ILogger<LoggingActivity> logger) : WorkflowActivity<LogMessage, bool>
{
public override Task<bool> RunAsync(WorkflowActivityContext context, LogMessage message)
{
try
{
switch (message.Level.ToLower())
{
case "error":
logger.LogError("[Workflow] {Message}", message.Message);
break;
case "warning":
logger.LogWarning("[Workflow] {Message}", message.Message);
break;
case "debug":
logger.LogDebug("[Workflow] {Message}", message.Message);
break;
default:
logger.LogInformation("[Workflow] {Message}", message.Message);
break;
}
return Task.FromResult(true);
}
catch (Exception ex)
{
logger.LogError(ex, "[LoggingActivity] Error logging message: {Message}", message.Message);
return Task.FromResult(false);
}
}
}
public record LogMessage(string Message, string Level = "Information");
// Activity to classify and route queries
public class RouteClassifierActivity(
DaprConversationClient conversationClient,
ILogger<RouteClassifierActivity> logger) : WorkflowActivity<RouteClassifierRequest, string>
{
public override async Task<string> RunAsync(
WorkflowActivityContext context,
RouteClassifierRequest request)
{
ArgumentNullException.ThrowIfNull(conversationClient);
ArgumentNullException.ThrowIfNull(request);
ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
var activityName = nameof(RouteClassifierActivity);
var startTime = DateTime.UtcNow;
var componentName = request.Model ?? "llama";
var promptPreview = request.Prompt.Length > 100 ? request.Prompt[..100] + "..." : request.Prompt;
logger.LogInformation(
"[{Activity} Start] Prompt: \"{Prompt}\" | Prompt Length: {PromptLength} | Model: {Model} | Temperature: {Temperature}",
activityName,
promptPreview,
request.Prompt.Length,
componentName,
request.Temperature ?? 0.3);
logger.LogDebug(
"[{Activity} Input] Full Request: {Request}",
activityName,
JsonSerializer.Serialize(new
{
PromptLength = request.Prompt.Length,
PromptPreview = request.Prompt.Length > 200 ? request.Prompt[..200] + "..." : request.Prompt,
request.Model,
request.Temperature
}));
var classificationPrompt = $"""
Classify this query as technical, creative, or analytical:
"{request.Prompt}"
Respond with exactly one word: technical, creative, or analytical.
""";
var conversationInput = new ConversationInput(
[
new UserMessage
{
Content = [new MessageContent(classificationPrompt)]
}
]);
var conversationOptions = new ConversationOptions(componentName)
{
Temperature = request.Temperature ?? 0.3
};
logger.LogDebug(
"[{Activity}] Calling LLM conversation API with component: {Component}",
activityName,
componentName);
var response = await conversationClient.ConverseAsync(
[conversationInput],
conversationOptions,
CancellationToken.None);
if (response?.Outputs is null || response.Outputs.Count == 0)
{
logger.LogError(
"[{Activity} Error] LLM returned no outputs. Response: {Response}",
activityName,
response != null ? JsonSerializer.Serialize(response) : "null");
throw new InvalidOperationException("LLM returned no outputs for route classification.");
}
logger.LogDebug(
"[{Activity}] Received {OutputCount} output(s) from LLM",
activityName,
response.Outputs.Count);
var responseText = string.Empty;
var choiceCount = 0;
foreach (var output in response.Outputs)
{
if (output?.Choices is null) continue;
foreach (var choice in output.Choices)
{
var content = choice.Message?.Content ?? string.Empty;
responseText += content;
choiceCount++;
}
}
if (string.IsNullOrWhiteSpace(responseText))
{
logger.LogError(
"[{Activity} Error] LLM returned empty response. Outputs Count: {OutputCount}, Choices Processed: {ChoiceCount}",
activityName,
response.Outputs.Count,
choiceCount);
throw new InvalidOperationException("LLM returned empty response for route classification.");
}
var route = responseText.Trim().ToLower();
// Validate route is one of the expected values
var validRoutes = new[] { "technical", "creative", "analytical" };
if (!validRoutes.Contains(route))
{
// Default to general if route doesn't match
route = "general";
logger.LogWarning("[{Activity}] Invalid route '{Route}', defaulting to general", activityName, responseText);
}
var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
logger.LogInformation(
"[{Activity} Complete] Duration: {Duration}ms | Route: {Route} | Choices: {ChoiceCount} | ConversationId: {ConversationId}",
activityName,
duration,
route,
choiceCount,
response.ConversationId ?? "none");
logger.LogDebug(
"[{Activity} Output] Route: {Route}",
activityName,
route);
return route;
}
}
// Handler Activity for Technical queries
public class TechnicalHandlerActivity(
DaprConversationClient conversationClient,
ILogger<TechnicalHandlerActivity> logger) : WorkflowActivity<HandlerRequest, LlmResponse>
{
public override async Task<LlmResponse> RunAsync(
WorkflowActivityContext context,
HandlerRequest request)
{
ArgumentNullException.ThrowIfNull(conversationClient);
ArgumentNullException.ThrowIfNull(request);
ArgumentNullException.ThrowIfNull(request.Request);
var activityName = nameof(TechnicalHandlerActivity);
var startTime = DateTime.UtcNow;
var componentName = request.Request.Model ?? "llama";
var promptPreview = request.Request.Prompt.Length > 150 ? request.Request.Prompt[..150] + "..." : request.Request.Prompt;
logger.LogInformation(
"[{Activity} Start] Route: {Route} | Model: {Model} | Temperature: {Temperature} | Prompt: \"{Prompt}\" | Prompt Length: {PromptLength}",
activityName,
request.Route,
componentName,
0.3,
promptPreview,
request.Request.Prompt.Length);
var technicalPrompt = $"""
[TECHNICAL MODE]
You are a technical expert. Provide detailed, accurate technical information.
Question: {request.Request.Prompt}
Requirements:
1. Be precise and technical
2. Include code examples if relevant
3. Mention best practices
4. Use technical terminology appropriately
Answer:
""";
var llmRequest = new LlmRequest(
technicalPrompt,
request.Request.Context,
componentName,
0.3); // Lower temperature for technical accuracy
return await LlmHelper.CallLlmAsync(conversationClient, logger, activityName, startTime, llmRequest, request.Route);
}
}
// Handler Activity for Creative queries
public class CreativeHandlerActivity(
DaprConversationClient conversationClient,
ILogger<CreativeHandlerActivity> logger) : WorkflowActivity<HandlerRequest, LlmResponse>
{
public override async Task<LlmResponse> RunAsync(
WorkflowActivityContext context,
HandlerRequest request)
{
ArgumentNullException.ThrowIfNull(conversationClient);
ArgumentNullException.ThrowIfNull(request);
ArgumentNullException.ThrowIfNull(request.Request);
var activityName = nameof(CreativeHandlerActivity);
var startTime = DateTime.UtcNow;
var componentName = request.Request.Model ?? "llama";
var promptPreview = request.Request.Prompt.Length > 150 ? request.Request.Prompt[..150] + "..." : request.Request.Prompt;
logger.LogInformation(
"[{Activity} Start] Route: {Route} | Model: {Model} | Temperature: {Temperature} | Prompt: \"{Prompt}\" | Prompt Length: {PromptLength}",
activityName,
request.Route,
componentName,
0.9,
promptPreview,
request.Request.Prompt.Length);
var creativePrompt = $"""
[CREATIVE MODE]
You are a creative writer. Provide imaginative, engaging responses.
Request: {request.Request.Prompt}
Requirements:
1. Be creative and expressive
2. Use descriptive language
3. Engage the reader emotionally
4. Think outside the box
Response:
""";
var llmRequest = new LlmRequest(
creativePrompt,
request.Request.Context,
componentName,
0.9); // Higher temperature for creativity
return await LlmHelper.CallLlmAsync(conversationClient, logger, activityName, startTime, llmRequest, request.Route);
}
}
// Handler Activity for Analytical queries
public class AnalyticalHandlerActivity(
DaprConversationClient conversationClient,
ILogger<AnalyticalHandlerActivity> logger) : WorkflowActivity<HandlerRequest, LlmResponse>
{
public override async Task<LlmResponse> RunAsync(
WorkflowActivityContext context,
HandlerRequest request)
{
ArgumentNullException.ThrowIfNull(conversationClient);
ArgumentNullException.ThrowIfNull(request);
ArgumentNullException.ThrowIfNull(request.Request);
var activityName = nameof(AnalyticalHandlerActivity);
var startTime = DateTime.UtcNow;
var componentName = request.Request.Model ?? "llama";
var promptPreview = request.Request.Prompt.Length > 150 ? request.Request.Prompt[..150] + "..." : request.Request.Prompt;
logger.LogInformation(
"[{Activity} Start] Route: {Route} | Model: {Model} | Temperature: {Temperature} | Prompt: \"{Prompt}\" | Prompt Length: {PromptLength}",
activityName,
request.Route,
componentName,
0.5,
promptPreview,
request.Request.Prompt.Length);
var analyticalPrompt = $"""
[ANALYTICAL MODE]
You are an analytical thinker. Provide structured, logical analysis.
Question: {request.Request.Prompt}
Requirements:
1. Analyze pros and cons
2. Consider multiple perspectives
3. Provide structured reasoning
4. Support with evidence/logic
Analysis:
""";
var llmRequest = new LlmRequest(
analyticalPrompt,
request.Request.Context,
componentName,
0.5); // Medium temperature for balanced analysis
return await LlmHelper.CallLlmAsync(conversationClient, logger, activityName, startTime, llmRequest, request.Route);
}
}
// Handler Activity for General/Default queries
public class GeneralHandlerActivity(
DaprConversationClient conversationClient,
ILogger<GeneralHandlerActivity> logger) : WorkflowActivity<HandlerRequest, LlmResponse>
{
public override async Task<LlmResponse> RunAsync(
WorkflowActivityContext context,
HandlerRequest request)
{
ArgumentNullException.ThrowIfNull(conversationClient);
ArgumentNullException.ThrowIfNull(request);
ArgumentNullException.ThrowIfNull(request.Request);
var activityName = nameof(GeneralHandlerActivity);
var startTime = DateTime.UtcNow;
var componentName = request.Request.Model ?? "llama";
var temperature = request.Request.Temperature ?? 0.7;
var promptPreview = request.Request.Prompt.Length > 150 ? request.Request.Prompt[..150] + "..." : request.Request.Prompt;
logger.LogInformation(
"[{Activity} Start] Route: {Route} | Model: {Model} | Temperature: {Temperature} | Prompt: \"{Prompt}\" | Prompt Length: {PromptLength}",
activityName,
request.Route,
componentName,
temperature,
promptPreview,
request.Request.Prompt.Length);
// Use original prompt and settings for general handler
var llmRequest = new LlmRequest(
request.Request.Prompt,
request.Request.Context,
componentName,
temperature);
return await LlmHelper.CallLlmAsync(conversationClient, logger, activityName, startTime, llmRequest, request.Route);
}
}
// Shared helper class for calling LLM
internal static class LlmHelper
{
public static async Task<LlmResponse> CallLlmAsync(
DaprConversationClient conversationClient,
ILogger logger,
string activityName,
DateTime startTime,
LlmRequest llmRequest,
string route)
{
var componentName = llmRequest.Model ?? "llama";
var promptPreview = llmRequest.Prompt.Length > 150 ? llmRequest.Prompt[..150] + "..." : llmRequest.Prompt;
logger.LogDebug(
"[{Activity} Input] Full Request: {Request}",
activityName,
JsonSerializer.Serialize(new
{
llmRequest.Model,
llmRequest.Temperature,
PromptLength = llmRequest.Prompt.Length,
PromptPreview = llmRequest.Prompt.Length > 200 ? llmRequest.Prompt[..200] + "..." : llmRequest.Prompt,
ContextKeys = llmRequest.Context?.Keys.ToArray() ?? Array.Empty<string>()
}));
var conversationInput = new ConversationInput(
[
new UserMessage
{
Content = [new MessageContent(llmRequest.Prompt)]
}
]);
var conversationOptions = new ConversationOptions(componentName)
{
Temperature = llmRequest.Temperature ?? 0.7
};
logger.LogDebug(
"[{Activity}] Calling LLM conversation API with component: {Component}",
activityName,
componentName);
var response = await conversationClient.ConverseAsync(
[conversationInput],
conversationOptions,
CancellationToken.None);
if (response?.Outputs is null || response.Outputs.Count == 0)
{
logger.LogError(
"[{Activity} Error] LLM returned no outputs. Response: {Response}",
activityName,
response != null ? JsonSerializer.Serialize(response) : "null");
throw new InvalidOperationException($"LLM returned no outputs for {activityName}.");
}
logger.LogDebug(
"[{Activity}] Received {OutputCount} output(s) from LLM",
activityName,
response.Outputs.Count);
var responseText = string.Empty;
var choiceCount = 0;
foreach (var output in response.Outputs)
{
if (output?.Choices is null) continue;
foreach (var choice in output.Choices)
{
var content = choice.Message?.Content ?? string.Empty;
responseText += content;
choiceCount++;
}
}
if (string.IsNullOrWhiteSpace(responseText))
{
logger.LogError(
"[{Activity} Error] LLM returned empty response. Outputs Count: {OutputCount}, Choices Processed: {ChoiceCount}",
activityName,
response.Outputs.Count,
choiceCount);
throw new InvalidOperationException($"LLM returned empty response for {activityName}.");
}
var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
var result = new LlmResponse(
responseText,
new Dictionary<string, object>
{
["model"] = componentName,
["temperature"] = llmRequest.Temperature ?? 0.7,
["timestamp"] = DateTime.UtcNow,
["conversationId"] = response.ConversationId ?? string.Empty,
["route"] = route
});
logger.LogInformation(
"[{Activity} Complete] Duration: {Duration}ms | Response Length: {ResponseLength} | Choices: {ChoiceCount} | ConversationId: {ConversationId} | Route: {Route}",
activityName,
duration,
responseText.Length,
choiceCount,
response.ConversationId ?? "none",
route);
logger.LogDebug(
"[{Activity} Output] Response Preview: {Preview} | Metadata: {Metadata}",
activityName,
responseText.Length > 300 ? responseText[..300] + "..." : responseText,
JsonSerializer.Serialize(result.Metadata));
return result;
}
}
// Request/Response models for activities
public record RouteClassifierRequest(
string Prompt,
string? Model = null,
double? Temperature = null);
public record HandlerRequest(
LlmRequest Request,
string Route);If you walk through the code, you will see that it implements a Dapr Workflow based on the Routing pattern. This pattern represents a classification-based approach that routes queries to specialized handlers based on the query type.
The workflow consists of five main steps (activities):
- Route Classifier – Asks the LLM to classify the user's query into one of three categories: technical, creative, or analytical. Uses a low temperature (0.3) for consistent classification and falls back to "general" if the classification is invalid.
- Technical Handler – Handles technical queries with a specialized prompt emphasizing precision, code examples, and best practices. Uses a low temperature (0.3) for accurate, deterministic responses.
- Creative Handler – Handles creative queries with a prompt encouraging imagination, descriptive language, and emotional engagement. Uses a high temperature (0.9) for maximum creativity and variety.
- Analytical Handler – Handles analytical queries with a prompt focused on structured reasoning, pros/cons analysis, and multiple perspectives. Uses a medium temperature (0.5) for balanced, logical responses.
- General Handler – Fallback handler for unclassified queries. Uses the original prompt without modification and the user's specified temperature (defaults to 0.7). A minimal dispatch sketch based on these routes follows this list.
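To make the control flow concrete, the fragment below is a minimal sketch (assuming the structure of the RoutingWorkflow shown earlier) of how the classifier output can be mapped to a handler inside the workflow's RunAsync. The activity and record names are the ones defined in this post; the exact code in the repository may differ slightly.
// Dispatch sketch (illustrative fragment from inside the workflow's RunAsync):
// "route" is the RouteClassifierActivity output, "request" the incoming LlmRequest.
var handlerActivity = route switch
{
    "technical" => nameof(TechnicalHandlerActivity),
    "creative" => nameof(CreativeHandlerActivity),
    "analytical" => nameof(AnalyticalHandlerActivity),
    _ => nameof(GeneralHandlerActivity) // covers "general" and anything unexpected
};
var response = await context.CallActivityAsync<LlmResponse>(
    handlerActivity,
    new HandlerRequest(request, route));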
NOTE: Before running the application, ensure that the Routing project is added to the AppHost and that the web application’s reference is updated accordingly. Refer to the code snippet below.
// AppHost.cs
// Pattern - 3. Routing
var routingService = builder.AddProject<Projects.AspireWithDapr_Routing>("Routing")
.WithExternalHttpEndpoints()
.WithHttpHealthCheck("/health")
.WithDaprSidecar(new DaprSidecarOptions
{
ResourcesPaths = ["components"],
Config = "components/daprConfig.yaml"
})
.WithEnvironment("DAPR_HOST_IP", "127.0.0.1")
.WithEnvironment("ZIPKIN_ENDPOINT", "http://localhost:9411/api/v2/spans");
// Web Frontend
builder.AddProject<Projects.AspireWithDapr_Web>("webfrontend")
.WithExternalHttpEndpoints()
.WithHttpHealthCheck("/health")
.WithReference(augmentedLlmService)
.WaitFor(augmentedLlmService)
.WithReference(statefulLlmService)
.WaitFor(statefulLlmService)
.WithReference(promptChainingService)
.WaitFor(promptChainingService)
.WithReference(routingService)
.WaitFor(routingService)
.WithDaprSidecar(new DaprSidecarOptions
{
ResourcesPaths = ["components"],
Config = "components/daprConfig.yaml"
})
.WithEnvironment("DAPR_HOST_IP", "127.0.0.1")
.WithEnvironment("ZIPKIN_ENDPOINT", "http://localhost:9411/api/v2/spans");Aspire, Diagrid, Redis Insight UIs (or dashbaords)
To run the application, execute the following command.
aspire run

Let’s run the web application
Run the web application from the Aspire Dashboard. Once it launches, select Routing (Specialized Handlers) as the service type. You can then start a conversation and observe the application’s behavior. Using the Diagrid Dashboard, you can inspect the underlying workflow state and review historical execution data.
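If you prefer to drive the service without the frontend, you can also call the Routing API directly. The snippet below is a hypothetical client sketch: it assumes the Routing project exposes the same start-and-poll /llm/query endpoints as the other pattern services in this series, so check the Routing project's Program.cs for the actual routes, and the base URL placeholder stands in for the address shown on the Aspire dashboard.
// Hypothetical client sketch; the /llm/query route and response shape are assumptions
// based on the other pattern services in this series.
using System.Net.Http.Json;
using System.Text.Json;

// Placeholder: read the Routing service address from wherever you keep it (e.g. the Aspire dashboard).
var baseUrl = Environment.GetEnvironmentVariable("ROUTING_BASE_URL") ?? "http://localhost:5000/";
using var client = new HttpClient { BaseAddress = new Uri(baseUrl) };
var accepted = await client.PostAsJsonAsync("llm/query", new
{
    prompt = "Explain how Dapr Workflow replays activities after a failure."
});
var body = await accepted.Content.ReadFromJsonAsync<JsonElement>();
Console.WriteLine($"Workflow instance: {body.GetProperty("instanceId").GetString()}");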

Now, let’s start preparing a new Minimal API for the Parallelization pattern.
Parallelization
LLMs may work concurrently on a task, with results aggregated programmatically. Parallelization generally manifests in two patterns:
- Sectioning: Independent subtasks executed in parallel.
- Voting: Multiple executions of the same task to generate diverse outputs (a hedged sketch of this variant follows below).
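The workflow implemented below follows the sectioning variant: each parallel call receives a different perspective prompt. Voting is not implemented in this post, but inside a Dapr workflow a minimal sketch could look like the fragment below, reusing the CallLlmActivity and LlmRequest types defined in the code that follows; aggregating the votes (majority pick or an LLM adjudication step) would be a follow-up activity.
// Hypothetical "voting" fan-out (not part of the implementation below):
// run the identical prompt N times and aggregate the answers afterwards.
var votes = await Task.WhenAll(
    Enumerable.Range(0, request.ParallelTasks).Select(_ =>
        context.CallActivityAsync<LlmResponse>(
            nameof(CallLlmActivity),
            new LlmRequest(request.Prompt, request.Context, request.Model, request.Temperature))));
// A follow-up activity could then pick the most common answer or ask the LLM to adjudicate.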

// LlmModels.cs
using System.ComponentModel.DataAnnotations;
namespace AspireWithDapr.Parallelization.Models;
public record LlmRequest(
[Required] [MinLength(1)] string Prompt,
Dictionary<string, object>? Context = null,
string? Model = null,
double? Temperature = null);
public record LlmResponse(
string Response,
Dictionary<string, object>? Metadata = null);
public record ParallelRequest(
[Required] [MinLength(1)] string Prompt,
int ParallelTasks = 4,
Dictionary<string, object>? Context = null,
string? Model = null,
double? Temperature = null);
// Program.cs
#pragma warning disable DAPR_CONVERSATION
using System.ComponentModel.DataAnnotations;
using Dapr.AI.Conversation.Extensions;
using Dapr.Workflow;
using AspireWithDapr.Parallelization.Models;
using AspireWithDapr.Parallelization.Workflow;
var builder = WebApplication.CreateBuilder(args);
builder.AddServiceDefaults();
builder.Services.AddProblemDetails();
builder.Services.AddOpenApi();
builder.Services.AddDaprConversationClient();
builder.Services.AddDaprWorkflow(options =>
{
options.RegisterWorkflow<ParallelizationWorkflow>();
options.RegisterActivity<GeneratePerspectivesActivity>();
options.RegisterActivity<CallLlmActivity>();
options.RegisterActivity<SynthesizeParallelActivity>();
options.RegisterActivity<LoggingActivity>();
});
var app = builder.Build();
// Initialize workflow logger with the app's logger factory
var loggerFactory = app.Services.GetRequiredService<ILoggerFactory>();
ParallelizationWorkflow.SetLogger(loggerFactory);
app.UseExceptionHandler();
if (app.Environment.IsDevelopment())
{
app.MapOpenApi();
}
app.MapGet("/", () => Results.Ok(new {
service = "Parallelization LLM API",
version = "1.0.0",
endpoints = new[] { "/llm/query", "/llm/query/{instanceId}", "/llm/query/{instanceId}/status", "/health" },
description = "Executes multiple LLM calls in parallel from different perspectives, then synthesizes results"
}))
.WithName("GetServiceInfo")
.WithTags("Info");
app.MapPost("/llm/query", async (
[Required] ParallelRequest request,
DaprWorkflowClient workflowClient,
ILogger<Program> logger,
CancellationToken cancellationToken) =>
{
if (string.IsNullOrWhiteSpace(request.Prompt))
{
return Results.BadRequest(new { error = "Prompt is required and cannot be empty." });
}
if (request.ParallelTasks < 1 || request.ParallelTasks > 8)
{
return Results.BadRequest(new { error = "ParallelTasks must be between 1 and 8." });
}
try
{
logger.LogInformation("Starting Parallelization workflow for prompt: {Prompt} with length {PromptLength} characters | Parallel Tasks: {ParallelTasks}",
request.Prompt, request.Prompt.Length, request.ParallelTasks);
var instanceId = Guid.NewGuid().ToString();
await workflowClient.ScheduleNewWorkflowAsync(
nameof(ParallelizationWorkflow),
instanceId,
request);
logger.LogInformation("Workflow started with instance ID: {InstanceId}", instanceId);
return Results.Accepted($"/llm/query/{instanceId}", new {
instanceId,
status = "started"
});
}
catch (Exception ex)
{
logger.LogError(ex, "Error starting workflow");
return Results.Problem(
detail: "An error occurred while starting the workflow.",
statusCode: StatusCodes.Status500InternalServerError);
}
})
.WithName("StartParallelizationQuery")
.WithTags("LLM")
.Produces<LlmResponse>(StatusCodes.Status202Accepted)
.Produces(StatusCodes.Status400BadRequest)
.Produces(StatusCodes.Status500InternalServerError);
app.MapGet("/llm/query/{instanceId}", async (
string instanceId,
DaprWorkflowClient workflowClient,
ILogger<Program> logger,
CancellationToken cancellationToken) =>
{
try
{
var workflowState = await workflowClient.GetWorkflowStateAsync(instanceId);
if (workflowState is null)
{
return Results.NotFound(new { error = $"Workflow instance {instanceId} not found." });
}
if (workflowState.RuntimeStatus != WorkflowRuntimeStatus.Completed)
{
return Results.Accepted($"/llm/query/{instanceId}", new
{
instanceId,
runtimeStatus = workflowState.RuntimeStatus.ToString()
});
}
var output = workflowState.ReadOutputAs<LlmResponse>();
return Results.Ok(new
{
instanceId,
runtimeStatus = workflowState.RuntimeStatus.ToString(),
result = output
});
}
catch (Exception ex)
{
logger.LogError(ex, "Error retrieving workflow state for instance {InstanceId}", instanceId);
return Results.Problem(
detail: "An error occurred while retrieving the workflow state.",
statusCode: StatusCodes.Status500InternalServerError);
}
})
.WithName("GetParallelizationQueryResult")
.WithTags("LLM")
.Produces<LlmResponse>()
.Produces(StatusCodes.Status404NotFound)
.Produces(StatusCodes.Status500InternalServerError);
app.MapGet("/llm/query/{instanceId}/status", async (
string instanceId,
DaprWorkflowClient workflowClient,
ILogger<Program> logger,
CancellationToken cancellationToken) =>
{
try
{
var workflowState = await workflowClient.GetWorkflowStateAsync(instanceId);
if (workflowState is null)
{
return Results.NotFound(new { error = $"Workflow instance {instanceId} not found." });
}
var status = workflowState.RuntimeStatus;
return Results.Ok(new
{
instanceId,
runtimeStatus = status.ToString(),
isCompleted = status == WorkflowRuntimeStatus.Completed,
isRunning = status == WorkflowRuntimeStatus.Running,
isFailed = status == WorkflowRuntimeStatus.Failed,
isTerminated = status == WorkflowRuntimeStatus.Terminated,
isPending = status == WorkflowRuntimeStatus.Pending
});
}
catch (Exception ex)
{
logger.LogError(ex, "Error retrieving workflow status for instance {InstanceId}", instanceId);
return Results.Problem(
detail: "An error occurred while retrieving the workflow status.",
statusCode: StatusCodes.Status500InternalServerError);
}
})
.WithName("GetParallelizationWorkflowStatus")
.WithTags("LLM")
.Produces(StatusCodes.Status200OK)
.Produces(StatusCodes.Status404NotFound)
.Produces(StatusCodes.Status500InternalServerError);
app.MapDefaultEndpoints();
app.Run();
#pragma warning disable DAPR_CONVERSATION
using Dapr.Workflow;
using Dapr.AI.Conversation;
using Dapr.AI.Conversation.ConversationRoles;
using AspireWithDapr.Parallelization.Models;
using System.Text.Json;
namespace AspireWithDapr.Parallelization.Workflow;
public class ParallelizationWorkflow : Workflow<ParallelRequest, LlmResponse>
{
private static ILogger? _logger;
public static void SetLogger(ILoggerFactory loggerFactory)
{
_logger = loggerFactory.CreateLogger<ParallelizationWorkflow>();
}
public override async Task<LlmResponse> RunAsync(
WorkflowContext context,
ParallelRequest request)
{
ArgumentNullException.ThrowIfNull(request);
ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
// Create logger if not set, using a default factory as fallback
var logger = _logger ?? LoggerFactory.Create(builder => builder.AddConsole()).CreateLogger<ParallelizationWorkflow>();
var workflowInstanceId = context.InstanceId;
// Log workflow start using activity to ensure it's captured
var promptPreview = request.Prompt.Length > 100 ? request.Prompt[..100] + "..." : request.Prompt;
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Workflow Start] InstanceId: {workflowInstanceId} | Prompt: \"{promptPreview}\" | Prompt Length: {request.Prompt.Length} | Parallel Tasks: {request.ParallelTasks} | Model: {request.Model ?? "llama"} | Temperature: {request.Temperature ?? 0.7}"));
// Log full initial prompt for end-to-end tracking
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Initial Prompt] InstanceId: {workflowInstanceId}\n---\n{request.Prompt}\n---"));
context.SetCustomStatus(new { step = "preparing_parallel_tasks" });
// Step 1: Generate different perspectives/prompts
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 1: PrepareParallelTasks] InstanceId: {workflowInstanceId} | Parallel Tasks: {request.ParallelTasks}"));
List<string>? perspectives = null;
try
{
perspectives = await context.CallActivityAsync<List<string>>(
nameof(GeneratePerspectivesActivity),
new GeneratePerspectivesRequest(request.Prompt, request.ParallelTasks));
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 1: PrepareParallelTasks Complete] InstanceId: {workflowInstanceId} | Perspectives Generated: {perspectives?.Count ?? 0}"));
}
catch (Exception ex)
{
logger.LogError(ex,
"[Step 1: PrepareParallelTasks Failed] InstanceId: {InstanceId} | Error: {Error}",
workflowInstanceId,
ex.Message);
throw;
}
if (perspectives is null || perspectives.Count == 0)
{
logger.LogWarning(
"[Workflow Early Exit] InstanceId: {InstanceId} | Reason: No perspectives generated",
workflowInstanceId);
context.SetCustomStatus(new { step = "completed", warning = "no_perspectives_generated" });
return new LlmResponse(
"Unable to generate perspectives for parallel processing.",
new Dictionary<string, object>
{
["parallel_tasks"] = 0,
["workflow_type"] = "parallelization"
});
}
context.SetCustomStatus(new { step = "executing_in_parallel", task_count = perspectives.Count });
// Step 2: Execute all perspectives in parallel
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 2: ExecuteParallel] InstanceId: {workflowInstanceId} | Task Count: {perspectives.Count}"));
List<LlmResponse>? parallelResults = null;
try
{
var parallelTasks = perspectives.Select((perspective, index) =>
context.CallActivityAsync<LlmResponse>(
nameof(CallLlmActivity),
new LlmRequest(
perspective,
new Dictionary<string, object>(request.Context ?? new Dictionary<string, object>())
{
["perspective"] = perspective,
["perspective_index"] = index
},
request.Model,
request.Temperature)));
var results = await Task.WhenAll(parallelTasks);
parallelResults = results.ToList();
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 2: ExecuteParallel Complete] InstanceId: {workflowInstanceId} | Results Count: {parallelResults.Count}"));
}
catch (Exception ex)
{
logger.LogError(ex,
"[Step 2: ExecuteParallel Failed] InstanceId: {InstanceId} | Error: {Error}",
workflowInstanceId,
ex.Message);
throw;
}
if (parallelResults is null || parallelResults.Count == 0)
{
logger.LogWarning(
"[Workflow Early Exit] InstanceId: {InstanceId} | Reason: No parallel results",
workflowInstanceId);
context.SetCustomStatus(new { step = "completed", warning = "no_parallel_results" });
return new LlmResponse(
"Parallel execution completed but no results were generated.",
new Dictionary<string, object>
{
["parallel_tasks"] = perspectives.Count,
["results_count"] = 0,
["workflow_type"] = "parallelization"
});
}
if (parallelResults.Count > 0)
{
logger.LogDebug(
"[Step 2: ExecuteParallel Output] InstanceId: {InstanceId} | Results: {Results}",
workflowInstanceId,
JsonSerializer.Serialize(parallelResults.Select((r, i) => new
{
Index = i,
ResponseLength = r.Response?.Length ?? 0,
HasMetadata = r.Metadata != null
})));
}
context.SetCustomStatus(new { step = "synthesizing_results" });
// Step 3: Synthesize parallel results
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 3: Synthesize] InstanceId: {workflowInstanceId} | Results Count: {parallelResults.Count} | Original Prompt: \"{promptPreview}\""));
LlmResponse? synthesis = null;
try
{
synthesis = await context.CallActivityAsync<LlmResponse>(
nameof(SynthesizeParallelActivity),
new SynthesizeParallelRequest(
request.Prompt,
perspectives,
parallelResults.Select(r => r.Response).ToList(),
request.Model,
request.Temperature));
var synthesisPreview = synthesis?.Response?.Length > 150 ? synthesis.Response[..150] + "..." : synthesis?.Response;
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Step 3: Synthesize Complete] InstanceId: {workflowInstanceId} | Synthesis: \"{synthesisPreview}\" | Synthesis Length: {synthesis?.Response?.Length ?? 0}"));
// Log full synthesis after completion
if (synthesis?.Response != null)
{
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Synthesis After Step 3] InstanceId: {workflowInstanceId}\n---\n{synthesis.Response}\n---"));
}
}
catch (Exception ex)
{
logger.LogError(ex,
"[Step 3: Synthesize Failed] InstanceId: {InstanceId} | Error: {Error}",
workflowInstanceId,
ex.Message);
throw;
}
logger.LogDebug(
"[Step 3: Synthesize Output] InstanceId: {InstanceId} | Synthesis Preview: {Preview}",
workflowInstanceId,
synthesis?.Response?.Length > 200 ? synthesis.Response[..200] + "..." : synthesis?.Response);
context.SetCustomStatus(new { step = "completed" });
// Enhance metadata with parallelization information
var enhancedMetadata = new Dictionary<string, object>(synthesis?.Metadata ?? new Dictionary<string, object>())
{
["parallel_tasks"] = perspectives.Count,
["synthesis_type"] = "parallel_results",
["workflow_type"] = "parallelization"
};
if (synthesis is null)
{
logger.LogError(
"[Workflow Error] InstanceId: {InstanceId} | Final response is null",
workflowInstanceId);
throw new InvalidOperationException("Workflow completed but final response is null.");
}
var finalResponse = synthesis with { Metadata = enhancedMetadata };
await context.CallActivityAsync<bool>(
nameof(LoggingActivity),
new LogMessage($"[Workflow Complete] InstanceId: {workflowInstanceId} | Total Steps: 3 | Parallel Tasks: {perspectives.Count} | Final Response Length: {finalResponse.Response?.Length ?? 0}"));
if (finalResponse.Metadata != null)
{
logger.LogDebug(
"[Workflow Final Metadata] InstanceId: {InstanceId} | Metadata: {Metadata}",
workflowInstanceId,
JsonSerializer.Serialize(finalResponse.Metadata));
}
return finalResponse;
}
}
// Logging activity to ensure workflow logs are captured
public class LoggingActivity(ILogger<LoggingActivity> logger) : WorkflowActivity<LogMessage, bool>
{
public override Task<bool> RunAsync(WorkflowActivityContext context, LogMessage message)
{
try
{
switch (message.Level.ToLower())
{
case "error":
logger.LogError("[Workflow] {Message}", message.Message);
break;
case "warning":
logger.LogWarning("[Workflow] {Message}", message.Message);
break;
case "debug":
logger.LogDebug("[Workflow] {Message}", message.Message);
break;
default:
logger.LogInformation("[Workflow] {Message}", message.Message);
break;
}
return Task.FromResult(true);
}
catch (Exception ex)
{
logger.LogError(ex, "[LoggingActivity] Error logging message: {Message}", message.Message);
return Task.FromResult(false);
}
}
}
public record LogMessage(string Message, string Level = "Information");
// Activity to generate different perspectives for parallel execution
public class GeneratePerspectivesActivity(ILogger<GeneratePerspectivesActivity> logger) : WorkflowActivity<GeneratePerspectivesRequest, List<string>>
{
public override Task<List<string>> RunAsync(
WorkflowActivityContext context,
GeneratePerspectivesRequest request)
{
ArgumentNullException.ThrowIfNull(request);
ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
var activityName = nameof(GeneratePerspectivesActivity);
var startTime = DateTime.UtcNow;
logger.LogInformation(
"[{Activity} Start] Prompt: \"{Prompt}\" | Prompt Length: {PromptLength} | Parallel Tasks: {ParallelTasks}",
activityName,
request.Prompt.Length > 100 ? request.Prompt[..100] + "..." : request.Prompt,
request.Prompt.Length,
request.ParallelTasks);
// Generate different perspectives/prompts
var allPerspectives = new List<string>
{
$"As an expert: {request.Prompt}",
$"As a beginner: {request.Prompt}",
$"From a practical perspective: {request.Prompt}",
$"From a theoretical perspective: {request.Prompt}",
$"From a business perspective: {request.Prompt}",
$"From a technical perspective: {request.Prompt}",
$"From a user experience perspective: {request.Prompt}",
$"From a security perspective: {request.Prompt}"
};
var perspectives = allPerspectives.Take(request.ParallelTasks).ToList();
// If we need more perspectives than available, add variations
while (perspectives.Count < request.ParallelTasks)
{
perspectives.Add($"{request.Prompt} (Viewpoint {perspectives.Count + 1})");
}
var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
logger.LogInformation(
"[{Activity} Complete] Duration: {Duration}ms | Perspectives Generated: {Count}",
activityName,
duration,
perspectives.Count);
logger.LogDebug(
"[{Activity} Output] Perspectives: {Perspectives}",
activityName,
JsonSerializer.Serialize(perspectives));
return Task.FromResult(perspectives);
}
}
// Activity to call LLM (reusing pattern from AugmentedLlm)
public class CallLlmActivity(
DaprConversationClient conversationClient,
ILogger<CallLlmActivity> logger) : WorkflowActivity<LlmRequest, LlmResponse>
{
public override async Task<LlmResponse> RunAsync(
WorkflowActivityContext context,
LlmRequest request)
{
ArgumentNullException.ThrowIfNull(conversationClient);
ArgumentNullException.ThrowIfNull(request);
ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
var activityName = nameof(CallLlmActivity);
var startTime = DateTime.UtcNow;
var componentName = request.Model ?? "llama";
var promptPreview = request.Prompt.Length > 150 ? request.Prompt[..150] + "..." : request.Prompt;
logger.LogInformation(
"[{Activity} Start] Model: {Model} | Temperature: {Temperature} | Prompt: \"{Prompt}\" | Prompt Length: {PromptLength}",
activityName,
componentName,
request.Temperature ?? 0.7,
promptPreview,
request.Prompt.Length);
logger.LogDebug(
"[{Activity} Input] Full Request: {Request}",
activityName,
JsonSerializer.Serialize(new
{
request.Model,
request.Temperature,
PromptLength = request.Prompt.Length,
PromptPreview = request.Prompt.Length > 200 ? request.Prompt[..200] + "..." : request.Prompt,
ContextKeys = request.Context?.Keys.ToArray() ?? Array.Empty<string>()
}));
var conversationInput = new ConversationInput(
[
new UserMessage
{
Content = [new MessageContent(request.Prompt)]
}
]);
var conversationOptions = new ConversationOptions(componentName)
{
Temperature = request.Temperature ?? 0.7
};
logger.LogDebug(
"[{Activity}] Calling LLM conversation API with component: {Component}",
activityName,
componentName);
var response = await conversationClient.ConverseAsync(
[conversationInput],
conversationOptions,
CancellationToken.None);
if (response?.Outputs is null || response.Outputs.Count == 0)
{
logger.LogError(
"[{Activity} Error] LLM returned no outputs. Response: {Response}",
activityName,
response != null ? JsonSerializer.Serialize(response) : "null");
throw new InvalidOperationException("LLM returned no outputs.");
}
logger.LogDebug(
"[{Activity}] Received {OutputCount} output(s) from LLM",
activityName,
response.Outputs.Count);
var responseText = string.Empty;
var choiceCount = 0;
foreach (var output in response.Outputs)
{
if (output?.Choices is null) continue;
foreach (var choice in output.Choices)
{
var content = choice.Message?.Content ?? string.Empty;
responseText += content;
choiceCount++;
}
}
if (string.IsNullOrWhiteSpace(responseText))
{
logger.LogError(
"[{Activity} Error] LLM returned empty response. Outputs Count: {OutputCount}, Choices Processed: {ChoiceCount}",
activityName,
response.Outputs.Count,
choiceCount);
throw new InvalidOperationException("LLM returned empty response.");
}
var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
var result = new LlmResponse(
responseText,
new Dictionary<string, object>
{
["model"] = componentName,
["temperature"] = request.Temperature ?? 0.7,
["timestamp"] = DateTime.UtcNow,
["conversationId"] = response.ConversationId ?? string.Empty
});
logger.LogInformation(
"[{Activity} Complete] Duration: {Duration}ms | Response Length: {ResponseLength} | Choices: {ChoiceCount} | ConversationId: {ConversationId}",
activityName,
duration,
responseText.Length,
choiceCount,
response.ConversationId ?? "none");
logger.LogDebug(
"[{Activity} Output] Response Preview: {Preview} | Metadata: {Metadata}",
activityName,
responseText.Length > 300 ? responseText[..300] + "..." : responseText,
JsonSerializer.Serialize(result.Metadata));
return result;
}
}
// Activity to synthesize parallel results
public class SynthesizeParallelActivity(
DaprConversationClient conversationClient,
ILogger<SynthesizeParallelActivity> logger) : WorkflowActivity<SynthesizeParallelRequest, LlmResponse>
{
public override async Task<LlmResponse> RunAsync(
WorkflowActivityContext context,
SynthesizeParallelRequest request)
{
ArgumentNullException.ThrowIfNull(conversationClient);
ArgumentNullException.ThrowIfNull(request);
ArgumentNullException.ThrowIfNull(request.Perspectives);
ArgumentNullException.ThrowIfNull(request.Results);
var activityName = nameof(SynthesizeParallelActivity);
var startTime = DateTime.UtcNow;
var componentName = request.Model ?? "llama";
var promptPreview = request.OriginalPrompt.Length > 100 ? request.OriginalPrompt[..100] + "..." : request.OriginalPrompt;
logger.LogInformation(
"[{Activity} Start] Original Prompt: \"{Prompt}\" | Original Prompt Length: {PromptLength} | Perspectives Count: {PerspectivesCount} | Results Count: {ResultsCount} | Model: {Model} | Temperature: {Temperature}",
activityName,
promptPreview,
request.OriginalPrompt.Length,
request.Perspectives.Count,
request.Results.Count,
componentName,
request.Temperature ?? 0.6);
logger.LogDebug(
"[{Activity} Input] Original Prompt: {Prompt} | Perspectives: {Perspectives} | Results Count: {ResultsCount}",
activityName,
request.OriginalPrompt,
JsonSerializer.Serialize(request.Perspectives),
request.Results.Count);
var perspectivesText = string.Join("\n\n",
request.Perspectives.Zip(request.Results)
.Select((pair, index) => $"[Perspective {index + 1}]\n{pair.First}\n\n[Analysis]\n{pair.Second}"));
var synthesisPrompt = $"""
Original question: {request.OriginalPrompt}
Multiple parallel analyses were conducted from different perspectives:
{perspectivesText}
Synthesize these analyses into a comprehensive, balanced answer.
Requirements:
1. Highlight agreements and common themes
2. Note any disagreements or different viewpoints
3. Provide a unified, well-structured response
4. Integrate insights from all perspectives
5. Maintain balance and objectivity
Synthesized Answer:
""";
var conversationInput = new ConversationInput(
[
new UserMessage
{
Content = [new MessageContent(synthesisPrompt)]
}
]);
var conversationOptions = new ConversationOptions(componentName)
{
Temperature = request.Temperature ?? 0.6
};
logger.LogDebug(
"[{Activity}] Calling LLM conversation API with component: {Component}",
activityName,
componentName);
var response = await conversationClient.ConverseAsync(
[conversationInput],
conversationOptions,
CancellationToken.None);
if (response?.Outputs is null || response.Outputs.Count == 0)
{
logger.LogError(
"[{Activity} Error] LLM returned no outputs. Response: {Response}",
activityName,
response != null ? JsonSerializer.Serialize(response) : "null");
throw new InvalidOperationException("LLM returned no outputs for synthesis.");
}
logger.LogDebug(
"[{Activity}] Received {OutputCount} output(s) from LLM",
activityName,
response.Outputs.Count);
var responseText = string.Empty;
var choiceCount = 0;
foreach (var output in response.Outputs)
{
if (output?.Choices is null) continue;
foreach (var choice in output.Choices)
{
var content = choice.Message?.Content ?? string.Empty;
responseText += content;
choiceCount++;
}
}
if (string.IsNullOrWhiteSpace(responseText))
{
logger.LogError(
"[{Activity} Error] LLM returned empty response. Outputs Count: {OutputCount}, Choices Processed: {ChoiceCount}",
activityName,
response.Outputs.Count,
choiceCount);
throw new InvalidOperationException("LLM returned empty synthesis.");
}
var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
var metadata = new Dictionary<string, object>
{
["model"] = componentName,
["temperature"] = request.Temperature ?? 0.6,
["timestamp"] = DateTime.UtcNow,
["conversationId"] = response.ConversationId ?? string.Empty,
["synthesized_from"] = request.Perspectives.Count,
["perspectives_used"] = request.Perspectives
};
var result = new LlmResponse(responseText, metadata);
var synthesisPreview = responseText.Length > 150 ? responseText[..150] + "..." : responseText;
logger.LogInformation(
"[{Activity} Complete] Duration: {Duration}ms | Synthesis: \"{Synthesis}\" | Synthesis Length: {SynthesisLength} | Choices: {ChoiceCount} | ConversationId: {ConversationId} | Perspectives: {PerspectivesCount}",
activityName,
duration,
synthesisPreview,
responseText.Length,
choiceCount,
response.ConversationId ?? "none",
request.Perspectives.Count);
logger.LogDebug(
"[{Activity} Output] Synthesis Preview: {Preview} | Metadata: {Metadata}",
activityName,
responseText.Length > 300 ? responseText[..300] + "..." : responseText,
JsonSerializer.Serialize(metadata));
return result;
}
}
// Request/Response models for activities
public record GeneratePerspectivesRequest(
string Prompt,
int ParallelTasks);
public record SynthesizeParallelRequest(
string OriginalPrompt,
List<string> Perspectives,
List<string> Results,
string? Model = null,
double? Temperature = null);If you walk through the code, you will see that it implements a Dapr Workflow based on the Parallelization pattern. This pattern represents a fan-out/fan-in approach that processes a query from multiple perspectives simultaneously and then synthesizes the results into a comprehensive answer.
The workflow consists of three main steps (activities):
- Generate Perspectives – Creates a number of different perspective prompts from a predefined list (expert, beginner, practical, theoretical, business, technical, UX, security). Does not call the LLM; it uses template-based generation for speed and determinism.
- Call Llm – Executes a single LLM call using the Dapr Conversation API. Multiple instances run in parallel (one per perspective) via Task.WhenAll and return individual perspective analysis results.
- Synthesize Parallel – Combines all parallel results into a unified answer using the LLM. Prompts the model to highlight agreements, note disagreements, integrate insights, and maintain balance across all perspectives. A hypothetical client sketch for calling this API follows below.
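Before wiring the project into the AppHost, it may help to see the API from a client's point of view. The snippet below is a hypothetical console client (not part of the repository) that posts to the /llm/query endpoint defined in Program.cs above; the anonymous body binds case-insensitively to the ParallelRequest record, and the endpoint replies with 202 Accepted and an instanceId. The base-URL environment variable is just a placeholder for wherever you read the service address from, such as the Aspire dashboard.
// Hypothetical client sketch for the Parallelization Minimal API shown above.
using System.Net.Http.Json;
using System.Text.Json;

var baseUrl = Environment.GetEnvironmentVariable("PARALLELIZATION_BASE_URL") ?? "http://localhost:5000/";
using var client = new HttpClient { BaseAddress = new Uri(baseUrl) };

// Binds to ParallelRequest(Prompt, ParallelTasks, Context, Model, Temperature).
var accepted = await client.PostAsJsonAsync("llm/query", new
{
    prompt = "What are the trade-offs of caching API responses?",
    parallelTasks = 4,
    temperature = 0.7
});

// The endpoint returns 202 Accepted with { instanceId, status = "started" }.
var started = await accepted.Content.ReadFromJsonAsync<JsonElement>();
Console.WriteLine($"Started {started.GetProperty("instanceId").GetString()} ({started.GetProperty("status").GetString()})");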
NOTE: Before running the application, ensure that the Parallelization project is added to the AppHost and that the web application’s reference is updated accordingly. Refer to the code snippet below.
// AppHost.cs
// Pattern - 4. Parallelization
var parallelizationService = builder.AddProject<Projects.AspireWithDapr_Parallelization>("Parallelization")
.WithExternalHttpEndpoints()
.WithHttpHealthCheck("/health")
.WithDaprSidecar(new DaprSidecarOptions
{
ResourcesPaths = ["components"],
Config = "components/daprConfig.yaml"
})
.WithEnvironment("DAPR_HOST_IP", "127.0.0.1")
.WithEnvironment("ZIPKIN_ENDPOINT", "http://localhost:9411/api/v2/spans");
// Web Frontend
builder.AddProject<Projects.AspireWithDapr_Web>("webfrontend")
.WithExternalHttpEndpoints()
.WithHttpHealthCheck("/health")
.WithReference(augmentedLlmService)
.WaitFor(augmentedLlmService)
.WithReference(statefulLlmService)
.WaitFor(statefulLlmService)
.WithReference(promptChainingService)
.WaitFor(promptChainingService)
.WithReference(routingService)
.WaitFor(routingService)
.WithReference(parallelizationService)
.WaitFor(parallelizationService)
.WithDaprSidecar(new DaprSidecarOptions
{
ResourcesPaths = ["components"],
Config = "components/daprConfig.yaml"
})
.WithEnvironment("DAPR_HOST_IP", "127.0.0.1")
.WithEnvironment("ZIPKIN_ENDPOINT", "http://localhost:9411/api/v2/spans");Aspire, Diagrid, Redis Insight UIs (or dashbaords)
To run the application, execute the following command.
aspire run

Let’s run the web application
Run the web application from the Aspire Dashboard. Once it launches, select Parallelization (Multi-perspective) as the service type. You can then start a conversation and observe the application’s behavior. Using the Diagrid Dashboard, you can inspect the underlying workflow state and review historical execution data.
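Besides watching the UIs, you can poll the workflow from code. The snippet below continues the hypothetical client sketch from earlier: it polls /llm/query/{instanceId}/status until the workflow finishes and then reads the synthesized answer from /llm/query/{instanceId}, following the response shapes produced by Program.cs above.
// Hypothetical polling sketch for the status and result endpoints shown above.
using System.Net.Http.Json;
using System.Text.Json;

var baseUrl = Environment.GetEnvironmentVariable("PARALLELIZATION_BASE_URL") ?? "http://localhost:5000/";
using var client = new HttpClient { BaseAddress = new Uri(baseUrl) };
var instanceId = args[0]; // instance id returned by the start call

while (true)
{
    // Status payload: { instanceId, runtimeStatus, isCompleted, isRunning, isFailed, isTerminated, isPending }
    var status = await client.GetFromJsonAsync<JsonElement>($"llm/query/{instanceId}/status");
    if (status.GetProperty("isCompleted").GetBoolean()) break;
    if (status.GetProperty("isFailed").GetBoolean() || status.GetProperty("isTerminated").GetBoolean())
        throw new InvalidOperationException($"Workflow ended as {status.GetProperty("runtimeStatus")}");
    await Task.Delay(TimeSpan.FromSeconds(2));
}

// Result payload: { instanceId, runtimeStatus, result: { response, metadata } }
var final = await client.GetFromJsonAsync<JsonElement>($"llm/query/{instanceId}");
Console.WriteLine(final.GetProperty("result").GetProperty("response").GetString());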

Conclusion
Together, Anthropic’s research, Dapr Workflow, and the Dapr Conversation API provide a strong foundation for building modern, production-ready LLM systems that are both capable and maintainable.
Ultimately, simplicity is a strategic choice. Starting with well-established patterns and evolving the system only as requirements demand helps ensure long-term reliability and clarity.