Dapr - Workflow & .NET Aspire

How can Dapr Workflow and the Dapr Conversation API help you implement augmented LLM and stateful LLM agentic patterns?

This blog post demonstrates how to use the Dapr Workflow building block and the Conversation API (or building block) from Dapr, together with .NET Aspire, to build an application. Dapr Workflows are stateful and support long-running, fault-tolerant applications, making them ideal for orchestrating microservices. The Dapr Workflow building block also works seamlessly with other Dapr building blocks. Dapr Workflow simplifies complex, stateful coordination requirements in microservice architectures. Application patterns such as task chaining, fan-out/fan-in, asynchronous HTTP APIs, monitoring, external system interaction, and compensation can all benefit from Dapr Workflow.

The demo applications built in this blog post is based on the principles outlined in Building Effective Agents. According to Anthropic, the term agent can be defined in several ways. For example, some describe agents as fully autonomous systems that operate independently for extended periods, using a variety of tools to accomplish complex tasks. Others use the term to refer to more prescriptive implementations that follow predefined workflows. All these variations are categorized as agentic systems, with an important architectural distinction drawn between workflows and agents:

  • Workflows are systems in which LLMs and tools are orchestrated through predefined code paths.

  • Agents are systems in which LLMs dynamically direct their own processes and tool usage, retaining control over how tasks are accomplished.

To cover all common patterns, this series starts with Augmented LLM and Stateful LLM, and the remaining patterns will be covered in future posts.

Prerequisites

  • A dasic understanding of how Dapr and Aspire (formerly .NET Aspire) work. You may also refer to my earlier posts for more information on Dapr and Aspire.
  • Ollama CLI and Llama3.2 & Phi models
  • Docker Desktop
  • Diagrid Dashboard - It helps visualizing workflow state and runs as a container, and it uses the data stored by Dapr Workflow by connecting to your local Dapr state store.

NOTE: To install Ollama CLI and LLama3.2 & Phi, you can refer my blog post Dapr - Coversation API & .NET Aspire.

Solution structure

The solution uses the following structure, which includes an API project for each pattern, Aspire-related projects, and a web application.

Changes in AppHost

It contains the following YAML files within the components folder. These are used as Dapr components to abstract infrastructure, such as various LLMs, state stores, and other Dapr configurations.

# conversation-echo.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: echo
spec:
  type: conversation.echo
  version: v1
# conversation-llama.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: llama
spec:
  type: conversation.ollama
  version: v1
  metadata:
    - name: baseUrl
      value: http://localhost:11434 # When you install and run Ollama locally, it starts an HTTP server at http://localhost:11434
    - name: model
      value: llama3.2 # It is model name. 
    - name: cacheTTL # It is used for Prompt Caching
      value: 10m
# conversation-phi.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: phi
spec:
  type: conversation.ollama
  version: v1
  metadata:
    - name: baseUrl
      value: http://localhost:11434 # When you install and run Ollama locally, it starts an HTTP server at http://localhost:11434
    - name: model
      value: phi # It is model name.
    - name: cacheTTL # It is used for Prompt Caching
      value: 10m
# daprConfig.yaml
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: daprConfig
  namespace: default
spec:
  tracing:
    samplingRate: "1"
    zipkin:
      endpointAddress: "http://localhost:9411/api/v2/spans"
# statestore.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
    - name: redisHost
      value: localhost:6379
    - name: redisPassword
      value: ""
    - name: actorStateStore
      value: "true"

After completing the component files, let’s shift our focus to the actual code.

dotnet add package CommunityToolkit.Aspire.Hosting.Dapr
# AppHost.cs
using CommunityToolkit.Aspire.Hosting.Dapr;
 
var builder = DistributedApplication.CreateBuilder(args);
 
// Diagrid Dashboard for Dapr Workflow Visualization
var diagridDashboard = builder.AddContainer("diagrid-dashboard", "ghcr.io/diagridio/diagrid-dashboard", "latest")
    .WithHttpEndpoint(8080, 8080, name: "dashboard");
 
// Add RedisInsight GUI for Redis data visualization
var redisInsight = builder.AddContainer("redisinsight", "redislabs/redisinsight", "latest")
    .WithHttpEndpoint(8001, 8001)
    .WithEnvironment("RI_APP_PORT", "8001")
    .WithEnvironment("RI_HOST", "0.0.0.0")
    .WithBindMount(Path.Combine(builder.AppHostDirectory, "redisinsight-data"), "/data");
 
// Pattern - 1. AugmentedLlm
var augmentedLlmService = builder.AddProject<Projects.AspireWithDapr_AugmentedLlm>("AugmentedLlm")
    .WithHttpHealthCheck("/health")
    .WithDaprSidecar(new DaprSidecarOptions
    {
        ResourcesPaths = ["components"],
        Config = "components/daprConfig.yaml",
        EnableApiLogging = true
    })
    .WithEnvironment("DAPR_HOST_IP", "127.0.0.1");
 
// Pattern - 1.1 StatefulLlm
var statefulLlmService = builder.AddProject<Projects.AspireWithDapr_StatefulLlm>("StatefulLlm")
    .WithExternalHttpEndpoints()
    .WithHttpHealthCheck("/health")
    .WithDaprSidecar(new DaprSidecarOptions
    {
        ResourcesPaths = ["components"],
        Config = "components/daprConfig.yaml",
        EnableApiLogging = true
    })
    .WithEnvironment("DAPR_HOST_IP", "127.0.0.1");
 
builder.AddProject<Projects.AspireWithDapr_Web>("webfrontend")
    .WithExternalHttpEndpoints()
    .WithHttpHealthCheck("/health")
    .WithReference(augmentedLlmService)
    .WaitFor(augmentedLlmService)
    .WithReference(statefulLlmService)
    .WaitFor(statefulLlmService)
    .WithDaprSidecar(new DaprSidecarOptions
    {
        ResourcesPaths = ["components"],
        Config = "components/daprConfig.yaml"
    })
    .WithEnvironment("DAPR_HOST_IP", "127.0.0.1");
 
builder.Build().Run();

The RedisInsight GUI allows you to explore and understand Dapr Workflow–related data stored in Redis.

Now, let’s start implementing the first pattern, called Augmented LLM, using Dapr Workflow and the Dapr Conversation API.

Augmented LLM (RAG - Retrieval Augmented Generation)

Agentic systems are centered around an LLM enhanced with augmentations such as retrieval, tools, and memory.

To keep the implementation simple and easy to understand, I opted for the following approach:

  • Custom types (for example, LlmRequest and LlmResponse) are placed in the Models folder.
  • Workflow and activities are defined in a single file within the Workflow folder.
  • Workflow and activities are registered in Program.cs.

dotnet add package Dapr.AI
dotnet add package Dapr.Workflow
// LlmModels.cs
using System.ComponentModel.DataAnnotations;
 
namespace AspireWithDapr.AugmentedLlm.Models;
 
public record LlmRequest(
    [Required] [MinLength(1)] string Prompt,
    Dictionary<string, object>? Context = null,
    string? Model = null,
    double? Temperature = null);
 
public record LlmResponse(
    string Response,
    Dictionary<string, object>? Metadata = null);
// SearchModels.cs
using System.ComponentModel.DataAnnotations;
 
namespace AspireWithDapr.AugmentedLlm.Models;
 
public record SearchResult(string Title, string Content, string Source);
 
public record SearchRequest(
    [Required] [MinLength(1)] string Query);
 
public record PromptRequest(
    [Required] [MinLength(1)] string Prompt,
    [Required] List<SearchResult> SearchResults);
 
public record AddCitationsRequest(
    [Required] LlmResponse Response,
    [Required] List<SearchResult> SearchResults);
// Program.cs
#pragma warning disable DAPR_CONVERSATION
 
using System.ComponentModel.DataAnnotations;
using Dapr.AI.Conversation.Extensions;
using Dapr.Workflow;
using AspireWithDapr.AugmentedLlm.Models;
using AspireWithDapr.AugmentedLlm.Workflow;
 
var builder = WebApplication.CreateBuilder(args);
 
builder.AddServiceDefaults();
 
builder.Services.AddProblemDetails();
 
builder.Services.AddOpenApi();
 
builder.Services.AddDaprConversationClient();
 
builder.Services.AddDaprWorkflow(options =>
{
    options.RegisterWorkflow<AugmentedLlmWorkflow>();
    
    options.RegisterActivity<SearchDataActivity>();
    options.RegisterActivity<AugmentPromptActivity>();
    options.RegisterActivity<CallLlmActivity>();
    options.RegisterActivity<AddCitationsActivity>();
    options.RegisterActivity<LoggingActivity>();
});
 
var app = builder.Build();
 
// Initialize workflow logger with the app's logger factory
var loggerFactory = app.Services.GetRequiredService<ILoggerFactory>();
AugmentedLlmWorkflow.SetLogger(loggerFactory);
 
app.UseExceptionHandler();
 
if (app.Environment.IsDevelopment())
{
    app.MapOpenApi();
}
 
app.MapGet("/", () => Results.Ok(new { 
    service = "Augmented LLM API", 
    version = "1.0.0",
    endpoints = new[] { "/llm/query", "/health" }
}))
.WithName("GetServiceInfo")
.WithTags("Info");
 
app.MapPost("/llm/query", async (
    [Required] LlmRequest request,
    DaprWorkflowClient workflowClient,
    ILogger<Program> logger,
    CancellationToken cancellationToken) =>
{
    if (string.IsNullOrWhiteSpace(request.Prompt))
    {
        return Results.BadRequest(new { error = "Prompt is required and cannot be empty." });
    }
 
    try
    {
        logger.LogInformation("Starting LLM workflow for prompt: {Prompt} with length {PromptLength} characters", request.Prompt, request.Prompt.Length);
        
        var instanceId = Guid.NewGuid().ToString();
        
        await workflowClient.ScheduleNewWorkflowAsync(
            nameof(AugmentedLlmWorkflow),
            instanceId,
            request);
 
        logger.LogInformation("Workflow started with instance ID: {InstanceId}", instanceId);
 
        return Results.Accepted($"/llm/query/{instanceId}", new { 
            instanceId,
            status = "started"
        });
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Error starting workflow");
        return Results.Problem(
            detail: "An error occurred while starting the workflow.",
            statusCode: StatusCodes.Status500InternalServerError);
    }
})
.WithName("StartLlmQuery")
.WithTags("LLM")
.Produces<LlmResponse>(StatusCodes.Status202Accepted)
.Produces(StatusCodes.Status400BadRequest)
.Produces(StatusCodes.Status500InternalServerError);
 
app.MapGet("/llm/query/{instanceId}", async (
    string instanceId,
    DaprWorkflowClient workflowClient,
    ILogger<Program> logger,
    CancellationToken cancellationToken) =>
{
    try
    {
        var workflowState = await workflowClient.GetWorkflowStateAsync(instanceId);
 
        if (workflowState is null)
        {
            return Results.NotFound(new { error = $"Workflow instance {instanceId} not found." });
        }
 
        if (workflowState.RuntimeStatus != WorkflowRuntimeStatus.Completed)
        {
            return Results.Accepted($"/llm/query/{instanceId}", new
            {
                instanceId,
                runtimeStatus = workflowState.RuntimeStatus.ToString()
            });
        }
 
        var output = workflowState.ReadOutputAs<LlmResponse>();
        return Results.Ok(new
        {
            instanceId,
            runtimeStatus = workflowState.RuntimeStatus.ToString(),
            result = output
        });
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Error retrieving workflow state for instance {InstanceId}", instanceId);
        return Results.Problem(
            detail: "An error occurred while retrieving the workflow state.",
            statusCode: StatusCodes.Status500InternalServerError);
    }
})
.WithName("GetLlmQueryResult")
.WithTags("LLM")
.Produces<LlmResponse>()
.Produces(StatusCodes.Status404NotFound)
.Produces(StatusCodes.Status500InternalServerError);
 
app.MapGet("/llm/query/{instanceId}/status", async (
    string instanceId,
    DaprWorkflowClient workflowClient,
    ILogger<Program> logger,
    CancellationToken cancellationToken) =>
{
    try
    {
        var workflowState = await workflowClient.GetWorkflowStateAsync(instanceId);
 
        if (workflowState is null)
        {
            return Results.NotFound(new { error = $"Workflow instance {instanceId} not found." });
        }
 
        var status = workflowState.RuntimeStatus;
        return Results.Ok(new
        {
            instanceId,
            runtimeStatus = status.ToString(),
            isCompleted = status == WorkflowRuntimeStatus.Completed,
            isRunning = status == WorkflowRuntimeStatus.Running,
            isFailed = status == WorkflowRuntimeStatus.Failed,
            isTerminated = status == WorkflowRuntimeStatus.Terminated,
            isPending = status == WorkflowRuntimeStatus.Pending
        });
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Error retrieving workflow status for instance {InstanceId}", instanceId);
        return Results.Problem(
            detail: "An error occurred while retrieving the workflow status.",
            statusCode: StatusCodes.Status500InternalServerError);
    }
})
.WithName("GetWorkflowStatus")
.WithTags("LLM")
.Produces(StatusCodes.Status200OK)
.Produces(StatusCodes.Status404NotFound)
.Produces(StatusCodes.Status500InternalServerError);
 
app.MapDefaultEndpoints();
 
app.Run();
// AugmentedLlmWorkflow.cs
#pragma warning disable DAPR_CONVERSATION
using Dapr.Workflow;
using Dapr.AI.Conversation;
using Dapr.AI.Conversation.ConversationRoles;
using AspireWithDapr.AugmentedLlm.Models;
using System.Text.Json;
 
namespace AspireWithDapr.AugmentedLlm.Workflow;
 
public class AugmentedLlmWorkflow : Workflow<LlmRequest, LlmResponse>
{
    private static ILogger? _logger;
 
    public static void SetLogger(ILoggerFactory loggerFactory)
    {
        _logger = loggerFactory.CreateLogger<AugmentedLlmWorkflow>();
    }
 
    public override async Task<LlmResponse> RunAsync(
        WorkflowContext context, 
        LlmRequest request)
    {
        ArgumentNullException.ThrowIfNull(request);
        ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
 
        // Create logger if not set, using a default factory as fallback
        var logger = _logger ?? LoggerFactory.Create(builder => builder.AddConsole()).CreateLogger<AugmentedLlmWorkflow>();
        var workflowInstanceId = context.InstanceId;
 
        // Log workflow start using activity to ensure it's captured
        var promptPreview = request.Prompt.Length > 100 ? request.Prompt[..100] + "..." : request.Prompt;
        await context.CallActivityAsync<bool>(
            nameof(LoggingActivity),
            new LogMessage($"[Workflow Start] InstanceId: {workflowInstanceId} | Prompt: \"{promptPreview}\" | Prompt Length: {request.Prompt.Length} | Model: {request.Model ?? "llama"} | Temperature: {request.Temperature ?? 0.7}"));
 
        // Log full initial prompt for end-to-end tracking
        await context.CallActivityAsync<bool>(
            nameof(LoggingActivity),
            new LogMessage($"[Initial Prompt] InstanceId: {workflowInstanceId}\n---\n{request.Prompt}\n---"));
 
        context.SetCustomStatus(new { step = "searching_data" });
    
        // Step 1: Search for relevant data
        await context.CallActivityAsync<bool>(
            nameof(LoggingActivity),
            new LogMessage($"[Step 1: SearchData] InstanceId: {workflowInstanceId} | Input Query: {request.Prompt}"));
 
        List<SearchResult>? searchResults = null;
        try
        {
            var searchRequest = new SearchRequest(request.Prompt);
            searchResults = await context.CallActivityAsync<List<SearchResult>>(
                nameof(SearchDataActivity), 
                searchRequest);
 
            await context.CallActivityAsync<bool>(
                nameof(LoggingActivity),
                new LogMessage($"[Step 1: SearchData Complete] InstanceId: {workflowInstanceId} | Results Count: {searchResults?.Count ?? 0}"));
 
            // Log prompt after search (same as initial, but logged here for tracking)
            await context.CallActivityAsync<bool>(
                nameof(LoggingActivity),
                new LogMessage($"[Prompt After Search] InstanceId: {workflowInstanceId}\n---\n{request.Prompt}\n---"));
        }
        catch (Exception ex)
        {
            logger.LogError(ex,
                "[Step 1: SearchData Failed] InstanceId: {InstanceId} | Error: {Error}",
                workflowInstanceId,
                ex.Message);
            throw;
        }
 
        if (searchResults != null && searchResults.Count > 0)
        {
            logger.LogDebug(
                "[Step 1: SearchData Output] InstanceId: {InstanceId} | Results: {Results}",
                workflowInstanceId,
                JsonSerializer.Serialize(searchResults.Select(r => new { r.Title, r.Source, ContentLength = r.Content.Length })));
        }
 
        if (searchResults is null || searchResults.Count == 0)
        {
            logger.LogWarning(
                "[Workflow Early Exit] InstanceId: {InstanceId} | Reason: No search results found",
                workflowInstanceId);
 
            context.SetCustomStatus(new { step = "completed", warning = "no_search_results" });
            return new LlmResponse(
                "No relevant sources were found for your query.",
                new Dictionary<string, object>
                {
                    ["sources_count"] = 0,
                    ["has_citations"] = false
                });
        }
 
        context.SetCustomStatus(new { step = "augmenting_prompt", results_count = searchResults.Count });
 
        // Step 2: Augment the prompt with search results
        var originalPromptPreview = request.Prompt.Length > 100 ? request.Prompt[..100] + "..." : request.Prompt;
        await context.CallActivityAsync<bool>(
            nameof(LoggingActivity),
            new LogMessage($"[Step 2: AugmentPrompt] InstanceId: {workflowInstanceId} | Original Prompt: \"{originalPromptPreview}\" | Original Prompt Length: {request.Prompt.Length} | Search Results Count: {searchResults.Count}"));
 
        string? augmentedPrompt = null;
        try
        {
            var promptRequest = new PromptRequest(request.Prompt, searchResults);
            augmentedPrompt = await context.CallActivityAsync<string>(
                nameof(AugmentPromptActivity), 
                promptRequest);
 
            var increase = request.Prompt.Length > 0 ? Math.Round((double)(augmentedPrompt?.Length ?? 0 - request.Prompt.Length) / request.Prompt.Length * 100, 2) : 0;
            var augmentedPromptPreview = augmentedPrompt?.Length > 150 ? augmentedPrompt[..150] + "..." : augmentedPrompt;
            await context.CallActivityAsync<bool>(
                nameof(LoggingActivity),
                new LogMessage($"[Step 2: AugmentPrompt Complete] InstanceId: {workflowInstanceId} | Augmented Prompt: \"{augmentedPromptPreview}\" | Augmented Prompt Length: {augmentedPrompt?.Length ?? 0} | Increase: {increase}%"));
 
            // Log full augmented prompt after augmentation
            await context.CallActivityAsync<bool>(
                nameof(LoggingActivity),
                new LogMessage($"[Prompt After Augmentation] InstanceId: {workflowInstanceId}\n---\n{augmentedPrompt}\n---"));
        }
        catch (Exception ex)
        {
            logger.LogError(ex,
                "[Step 2: AugmentPrompt Failed] InstanceId: {InstanceId} | Error: {Error}",
                workflowInstanceId,
                ex.Message);
            throw;
        }
 
        logger.LogDebug(
            "[Step 2: AugmentPrompt Output] InstanceId: {InstanceId} | Augmented Prompt Preview: {Preview}",
            workflowInstanceId,
            augmentedPrompt?.Length > 200 ? augmentedPrompt[..200] + "..." : augmentedPrompt);
            
        context.SetCustomStatus(new { step = "calling_llm" });
 
        // Step 3: Call the LLM with the augmented prompt
        var llmPromptPreview = augmentedPrompt?.Length > 150 ? augmentedPrompt[..150] + "..." : augmentedPrompt;
        await context.CallActivityAsync<bool>(
            nameof(LoggingActivity),
            new LogMessage($"[Step 3: CallLlm] InstanceId: {workflowInstanceId} | Model: {request.Model ?? "llama"} | Temperature: {request.Temperature ?? 0.7} | Prompt: \"{llmPromptPreview}\" | Prompt Length: {augmentedPrompt?.Length ?? 0}"));
 
        LlmResponse? response = null;
        try
        {
            var llmRequest = new LlmRequest(
                augmentedPrompt!,
                new Dictionary<string, object>
                {
                    ["search_results"] = searchResults,
                    ["original_query"] = request.Prompt,
                    ["instruction"] = "Answer based on provided sources"
                },
                Model: request.Model,
                Temperature: request.Temperature);
 
            response = await context.CallActivityAsync<LlmResponse>(
                nameof(CallLlmActivity), 
                llmRequest);
 
            await context.CallActivityAsync<bool>(
                nameof(LoggingActivity),
                new LogMessage($"[Step 3: CallLlm Complete] InstanceId: {workflowInstanceId} | Response Length: {response?.Response?.Length ?? 0} | Has Metadata: {response?.Metadata != null}"));
 
            // Log full LLM response after LLM call
            if (response?.Response != null)
            {
                await context.CallActivityAsync<bool>(
                    nameof(LoggingActivity),
                    new LogMessage($"[Response After LLM Call] InstanceId: {workflowInstanceId}\n---\n{response.Response}\n---"));
            }
        }
        catch (Exception ex)
        {
            logger.LogError(ex,
                "[Step 3: CallLlm Failed] InstanceId: {InstanceId} | Error: {Error} | StackTrace: {StackTrace}",
                workflowInstanceId,
                ex.Message,
                ex.StackTrace);
            throw;
        }
 
        if (response?.Metadata != null)
        {
            logger.LogDebug(
                "[Step 3: CallLlm Metadata] InstanceId: {InstanceId} | Metadata: {Metadata}",
                workflowInstanceId,
                JsonSerializer.Serialize(response.Metadata));
        }
 
        logger.LogDebug(
            "[Step 3: CallLlm Output Preview] InstanceId: {InstanceId} | Response Preview: {Preview}",
            workflowInstanceId,
            response?.Response?.Length > 200 ? response.Response[..200] + "..." : response?.Response);
 
        context.SetCustomStatus(new { step = "adding_citations" });
                
        // Step 4: Add citations to the LLM response
        await context.CallActivityAsync<bool>(
            nameof(LoggingActivity),
            new LogMessage($"[Step 4: AddCitations] InstanceId: {workflowInstanceId} | Response Length: {response?.Response?.Length ?? 0} | Search Results Count: {searchResults.Count}"));
 
        LlmResponse? finalResponse = null;
        try
        {
            var citationsRequest = new AddCitationsRequest(response!, searchResults);
            finalResponse = await context.CallActivityAsync<LlmResponse>(
                nameof(AddCitationsActivity), 
                citationsRequest);
 
            await context.CallActivityAsync<bool>(
                nameof(LoggingActivity),
                new LogMessage($"[Step 4: AddCitations Complete] InstanceId: {workflowInstanceId} | Final Response Length: {finalResponse?.Response?.Length ?? 0} | Citations Added: {searchResults.Count}"));
 
            // Log full final response after adding citations
            if (finalResponse?.Response != null)
            {
                await context.CallActivityAsync<bool>(
                    nameof(LoggingActivity),
                    new LogMessage($"[Response After Adding Citations] InstanceId: {workflowInstanceId}\n---\n{finalResponse.Response}\n---"));
            }
        }
        catch (Exception ex)
        {
            logger.LogError(ex,
                "[Step 4: AddCitations Failed] InstanceId: {InstanceId} | Error: {Error}",
                workflowInstanceId,
                ex.Message);
            throw;
        }
        
        context.SetCustomStatus(new { step = "completed" });
 
        await context.CallActivityAsync<bool>(
            nameof(LoggingActivity),
            new LogMessage($"[Workflow Complete] InstanceId: {workflowInstanceId} | Total Steps: 4 | Final Response Length: {finalResponse?.Response?.Length ?? 0} | Total Sources: {searchResults.Count}"));
 
        if (finalResponse?.Metadata != null)
        {
            logger.LogDebug(
                "[Workflow Final Metadata] InstanceId: {InstanceId} | Metadata: {Metadata}",
                workflowInstanceId,
                JsonSerializer.Serialize(finalResponse.Metadata));
        }
 
        if (finalResponse is null)
        {
            logger.LogError(
                "[Workflow Error] InstanceId: {InstanceId} | Final response is null",
                workflowInstanceId);
            throw new InvalidOperationException("Workflow completed but final response is null.");
        }
 
        return finalResponse;
    }
}
 
// Logging activity to ensure workflow logs are captured
public class LoggingActivity(ILogger<LoggingActivity> logger) : WorkflowActivity<LogMessage, bool>
{
    public override Task<bool> RunAsync(WorkflowActivityContext context, LogMessage message)
    {
        try
        {
            switch (message.Level.ToLower())
            {
                case "error":
                    logger.LogError("[Workflow] {Message}", message.Message);
                    break;
                case "warning":
                    logger.LogWarning("[Workflow] {Message}", message.Message);
                    break;
                case "debug":
                    logger.LogDebug("[Workflow] {Message}", message.Message);
                    break;
                default:
                    logger.LogInformation("[Workflow] {Message}", message.Message);
                    break;
            }
            return Task.FromResult(true);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "[LoggingActivity] Error logging message: {Message}", message.Message);
            return Task.FromResult(false);
        }
    }
}
 
public record LogMessage(string Message, string Level = "Information");
 
public class SearchDataActivity(ILogger<SearchDataActivity> logger) : WorkflowActivity<SearchRequest, List<SearchResult>>
{
    public override async Task<List<SearchResult>> RunAsync(
        WorkflowActivityContext context,
        SearchRequest input)
    {
        ArgumentNullException.ThrowIfNull(input);
        ArgumentException.ThrowIfNullOrWhiteSpace(input.Query);
 
        var activityName = nameof(SearchDataActivity);
        var startTime = DateTime.UtcNow;
 
        logger.LogInformation(
            "[{Activity} Start] Query: {Query} | Query Length: {QueryLength}",
            activityName,
            input.Query,
            input.Query.Length);
 
        logger.LogDebug(
            "[{Activity} Input] Full Request: {Request}",
            activityName,
            JsonSerializer.Serialize(input));
 
        // Simulate API call
        logger.LogDebug("[{Activity}] Simulating search API call...", activityName);
        await Task.Delay(100);
        
        var results = new List<SearchResult>
        {
            new($"Document about {input.Query}", 
                $"This document contains comprehensive information about {input.Query}. " +
                $"It provides detailed analysis, examples, and best practices.",
                "Knowledge Base"),
            new($"Technical Guide: {input.Query}",
                $"A technical guide covering {input.Query}. Includes implementation details, " +
                $"code examples, troubleshooting, and performance considerations.",
                "Technical Library"),
            new($"Research Paper: {input.Query}",
                $"Academic research on {input.Query}. Presents findings, methodologies, " +
                $"and future research directions.",
                "Research Database")
        };
 
        var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
        var totalContentLength = results.Sum(r => r.Content.Length);
 
        logger.LogInformation(
            "[{Activity} Complete] Duration: {Duration}ms | Results Count: {Count} | Total Content Length: {ContentLength}",
            activityName,
            duration,
            results.Count,
            totalContentLength);
 
        logger.LogDebug(
            "[{Activity} Output] Results: {Results}",
            activityName,
            JsonSerializer.Serialize(results.Select(r => new
            {
                r.Title,
                r.Source,
                ContentLength = r.Content.Length,
                ContentPreview = r.Content.Length > 100 ? r.Content[..100] + "..." : r.Content
            })));
 
        return results;
    }
}
public class AugmentPromptActivity(ILogger<AugmentPromptActivity> logger) : WorkflowActivity<PromptRequest, string>
{
    public override Task<string> RunAsync(
        WorkflowActivityContext context,
        PromptRequest input)
    {
        ArgumentNullException.ThrowIfNull(input);
        ArgumentException.ThrowIfNullOrWhiteSpace(input.Prompt);
        ArgumentNullException.ThrowIfNull(input.SearchResults);
 
        var activityName = nameof(AugmentPromptActivity);
        var startTime = DateTime.UtcNow;
        var promptPreview = input.Prompt.Length > 100 ? input.Prompt[..100] + "..." : input.Prompt;
 
        logger.LogInformation(
            "[{Activity} Start] Original Prompt: \"{Prompt}\" | Original Prompt Length: {PromptLength} | Search Results Count: {ResultsCount}",
            activityName,
            promptPreview,
            input.Prompt.Length,
            input.SearchResults.Count);
 
        logger.LogDebug(
            "[{Activity} Input] Original Prompt: {Prompt} | Results: {Results}",
            activityName,
            input.Prompt,
            JsonSerializer.Serialize(input.SearchResults.Select(r => new { r.Title, r.Source, ContentLength = r.Content.Length })));
 
        var sourcesText = string.Join("\n\n",
            input.SearchResults.Select((r, i) =>
                $"[Source {i + 1}]\nTitle: {r.Title}\nContent: {r.Content}\nOrigin: {r.Source}"));
 
        var augmentedPrompt = $"""
            Based on the following verified sources:
            
            {sourcesText}
            
            Please answer this question: {input.Prompt}
            
            Requirements:
            1. Use information only from provided sources
            2. Cite sources using [Source X] format
            3. If information is not available in sources, state so clearly
            4. Provide a comprehensive, well-structured response
            
            Answer:
            """;
 
        var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
        var promptIncrease = input.Prompt.Length > 0 
            ? Math.Round((double)(augmentedPrompt.Length - input.Prompt.Length) / input.Prompt.Length * 100, 2) 
            : 0;
 
        var augmentedPromptPreview = augmentedPrompt.Length > 150 ? augmentedPrompt[..150] + "..." : augmentedPrompt;
        logger.LogInformation(
            "[{Activity} Complete] Duration: {Duration}ms | Augmented Prompt: \"{AugmentedPrompt}\" | Augmented Prompt Length: {AugmentedLength} | Increase: {Increase}% | Sources Text Length: {SourcesLength}",
            activityName,
            duration,
            augmentedPromptPreview,
            augmentedPrompt.Length,
            promptIncrease,
            sourcesText.Length);
 
        logger.LogDebug(
            "[{Activity} Output] Augmented Prompt Preview: {Preview}",
            activityName,
            augmentedPrompt.Length > 300 ? augmentedPrompt[..300] + "..." : augmentedPrompt);
 
        return Task.FromResult(augmentedPrompt);
    }
}
public class CallLlmActivity(
    DaprConversationClient conversationClient,
    ILogger<CallLlmActivity> logger) : WorkflowActivity<LlmRequest, LlmResponse>
{
    public override async Task<LlmResponse> RunAsync(
        WorkflowActivityContext context,
        LlmRequest request)
    {
        ArgumentNullException.ThrowIfNull(conversationClient);
        ArgumentNullException.ThrowIfNull(request);
        ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
 
        var activityName = nameof(CallLlmActivity);
        var startTime = DateTime.UtcNow;
        var componentName = request.Model ?? "llama";
        var promptPreview = request.Prompt.Length > 150 ? request.Prompt[..150] + "..." : request.Prompt;
 
        logger.LogInformation(
            "[{Activity} Start] Model: {Model} | Temperature: {Temperature} | Prompt: \"{Prompt}\" | Prompt Length: {PromptLength}",
            activityName,
            componentName,
            request.Temperature ?? 0.7,
            promptPreview,
            request.Prompt.Length);
 
        logger.LogDebug(
            "[{Activity} Input] Full Request: {Request}",
            activityName,
            JsonSerializer.Serialize(new
            {
                request.Model,
                request.Temperature,
                PromptLength = request.Prompt.Length,
                PromptPreview = request.Prompt.Length > 200 ? request.Prompt[..200] + "..." : request.Prompt,
                ContextKeys = request.Context?.Keys.ToArray() ?? Array.Empty<string>()
            }));
 
        var conversationInput = new ConversationInput(
        [
            new UserMessage
            {
                Content = [new MessageContent(request.Prompt)]
            }
        ]);
        
        var conversationOptions = new ConversationOptions(componentName)
        {
            Temperature = request.Temperature ?? 0.7
        };
 
        logger.LogDebug(
            "[{Activity}] Calling LLM conversation API with component: {Component}",
            activityName,
            componentName);
 
        var response = await conversationClient.ConverseAsync(
            [conversationInput],
            conversationOptions,
            CancellationToken.None);
        
        if (response?.Outputs is null || response.Outputs.Count == 0)
        {
            logger.LogError(
                "[{Activity} Error] LLM returned no outputs. Response: {Response}",
                activityName,
                response != null ? JsonSerializer.Serialize(response) : "null");
            throw new InvalidOperationException("LLM returned no outputs.");
        }
 
        logger.LogDebug(
            "[{Activity}] Received {OutputCount} output(s) from LLM",
            activityName,
            response.Outputs.Count);
 
        var responseText = string.Empty;
        var choiceCount = 0;
        foreach (var output in response.Outputs)
        {
            if (output?.Choices is null) continue;
            
            foreach (var choice in output.Choices)
            {
                var content = choice.Message?.Content ?? string.Empty;
                responseText += content;
                choiceCount++;
            }
        }
 
        if (string.IsNullOrWhiteSpace(responseText))
        {
            logger.LogError(
                "[{Activity} Error] LLM returned empty response. Outputs Count: {OutputCount}, Choices Processed: {ChoiceCount}",
                activityName,
                response.Outputs.Count,
                choiceCount);
            throw new InvalidOperationException("LLM returned empty response.");
        }
 
        var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
        var result = new LlmResponse(
            responseText,
            new Dictionary<string, object>
            {
                ["model"] = componentName,
                ["temperature"] = request.Temperature ?? 0.7,
                ["timestamp"] = DateTime.UtcNow,
                ["conversationId"] = response.ConversationId ?? string.Empty
            });
 
        logger.LogInformation(
            "[{Activity} Complete] Duration: {Duration}ms | Response Length: {ResponseLength} | Choices: {ChoiceCount} | ConversationId: {ConversationId}",
            activityName,
            duration,
            responseText.Length,
            choiceCount,
            response.ConversationId ?? "none");
 
        logger.LogDebug(
            "[{Activity} Output] Response Preview: {Preview} | Metadata: {Metadata}",
            activityName,
            responseText.Length > 300 ? responseText[..300] + "..." : responseText,
            JsonSerializer.Serialize(result.Metadata));
 
        return result;
    }
}
public class AddCitationsActivity(ILogger<AddCitationsActivity> logger) : WorkflowActivity<AddCitationsRequest, LlmResponse>
{
    public override Task<LlmResponse> RunAsync(
        WorkflowActivityContext context,
        AddCitationsRequest input)
    {
        ArgumentNullException.ThrowIfNull(input);
        ArgumentNullException.ThrowIfNull(input.Response);
        ArgumentNullException.ThrowIfNull(input.SearchResults);
 
        var activityName = nameof(AddCitationsActivity);
        var startTime = DateTime.UtcNow;
 
        logger.LogInformation(
            "[{Activity} Start] Response Length: {ResponseLength} | Search Results Count: {ResultsCount}",
            activityName,
            input.Response.Response?.Length ?? 0,
            input.SearchResults.Count);
 
        logger.LogDebug(
            "[{Activity} Input] Response Preview: {ResponsePreview} | Results: {Results}",
            activityName,
            input.Response.Response?.Length > 200 ? input.Response.Response[..200] + "..." : input.Response.Response,
            JsonSerializer.Serialize(input.SearchResults.Select(r => new { r.Title, r.Source })));
 
        var sourcesList = string.Join("\n",
            input.SearchResults.Select((r, i) =>
                $"{i + 1}. {r.Title} ({r.Source})"));
 
        var citedResponse = $"{input.Response.Response}\n\n" +
                           $"## References\n{sourcesList}";
 
        var metadata = new Dictionary<string, object>(input.Response.Metadata ?? new Dictionary<string, object>())
        {
            ["sources_count"] = input.SearchResults.Count,
            ["has_citations"] = true,
            ["citation_format"] = "numbered"
        };
 
        var duration = (DateTime.UtcNow - startTime).TotalMilliseconds;
        var lengthIncrease = input.Response.Response?.Length > 0
            ? Math.Round((double)(citedResponse.Length - input.Response.Response.Length) / input.Response.Response.Length * 100, 2)
            : 0;
 
        logger.LogInformation(
            "[{Activity} Complete] Duration: {Duration}ms | Final Response Length: {FinalLength} | Length Increase: {Increase}% | Citations Added: {CitationsCount}",
            activityName,
            duration,
            citedResponse.Length,
            lengthIncrease,
            input.SearchResults.Count);
 
        logger.LogDebug(
            "[{Activity} Output] Final Response Preview: {Preview} | Metadata: {Metadata}",
            activityName,
            citedResponse.Length > 300 ? citedResponse[..300] + "..." : citedResponse,
            JsonSerializer.Serialize(metadata));
 
        return Task.FromResult(new LlmResponse(citedResponse, metadata));
    }
}
 

If you review the code, you will see that it implements a Dapr Workflow following the Retrieval-Augmented Generation (RAG) pattern. This approach enhances LLM responses by first retrieving relevant data and then augmenting the prompt with that context before invoking the LLM. It has four main steps (Activities):

  • Search Data - Searches for relevant data/documents based on the user's query. Currently simulates a search API returning 3 mock results (Knowledge Base, Technical Library, Research Database). In production, this would connect to a real vector database or search service.

  • Augment Prompt - Combines the original user prompt with retrieved search results into a structured prompt template with instructions for the LLM.

  • Call LLM - Calls the actual LLM via Dapr AI Conversation API (DaprConversationClient). Sends the augmented prompt and receives the LLM-generated response.

  • Add Citations - Appends a formatted "References" section to the LLM response, listing all sources used. Updates metadata with citation information.

Aspire, Diagrid, Redis Insight UIs (or dashbaords)

To run the application, execute the following command.

aspire run 

Let’s run the web application

Run the web application from the Aspire Dashboard. After it launches, ask the first question: Who is Elon Musk?

You can now analyze the response using the Diagrid Dashboard, which is a valuable tool for understanding the internal flow of the workflow and its activities, including the input and output of each activity.

Now, let’s start working on Stateful LLMs, where all conversations are stored on a session-wise basis.

Stateful LLM (Conversational Memory)

To support Stateful LLMs, the AugmentedLlm API was copied and renamed to StatefulLlm, after which the necessary code changes were made to persist all conversations.

// StatefulModels.cs
using System.ComponentModel.DataAnnotations;
 
namespace AspireWithDapr.StatefulLlm.Models;
 
public record StatefulLlmRequest(
    [Required] [MinLength(1)] string Prompt,
    string? SessionId = null,
    string? Model = null,
    double? Temperature = null,
    Dictionary<string, object>? Context = null);
 
public record StatefulLlmResponse(
    string Response,
    string SessionId,
    Dictionary<string, object>? Metadata = null);
 
public record ConversationState(
    string SessionId,
    List<ConversationTurn> History,
    DateTime CreatedAt,
    DateTime LastUpdatedAt,
    string? Model = null,
    double? Temperature = null)
{
    public int TurnCount => History.Count;
}
 
public record ConversationTurn(
    string Role, // "user" or "assistant"
    string Content,
    DateTime Timestamp);
 
public record SearchResult(string Title, string Content, string Source);
 
public record SearchRequest(
    [Required] [MinLength(1)] string Query);
 
public record PromptRequest(
    [Required] [MinLength(1)] string Prompt,
    [Required] List<SearchResult> SearchResults,
    List<ConversationTurn>? ConversationHistory = null);
 
public record AddCitationsRequest(
    [Required] StatefulLlmResponse Response,
    [Required] List<SearchResult> SearchResults);
 
// Program.cs
#pragma warning disable DAPR_CONVERSATION
 
using System.ComponentModel.DataAnnotations;
using Dapr.AI.Conversation.Extensions;
using Dapr.Workflow;
using AspireWithDapr.StatefulLlm.Models;
using AspireWithDapr.StatefulLlm.Workflow;
using Dapr.Client;
 
var builder = WebApplication.CreateBuilder(args);
 
builder.AddServiceDefaults();
 
builder.Services.AddProblemDetails();
 
builder.Services.AddOpenApi();
 
builder.Services.AddDaprConversationClient();
builder.Services.AddDaprClient();
 
builder.Services.AddDaprWorkflow(options =>
{
    options.RegisterWorkflow<StatefulLlmWorkflow>();
    
    options.RegisterActivity<LoadConversationStateActivity>();
    options.RegisterActivity<SaveConversationStateActivity>();
    options.RegisterActivity<SearchDataActivity>();
    options.RegisterActivity<AugmentPromptActivity>();
    options.RegisterActivity<CallLlmActivity>();
    options.RegisterActivity<AddCitationsActivity>();
});
 
var app = builder.Build();
 
// Initialize workflow logger with the app's logger factory
var loggerFactory = app.Services.GetRequiredService<ILoggerFactory>();
StatefulLlmWorkflow.SetLogger(loggerFactory);
 
app.UseExceptionHandler();
 
if (app.Environment.IsDevelopment())
{
    app.MapOpenApi();
}
 
app.MapGet("/", () => Results.Ok(new { 
    service = "Stateful LLM API", 
    version = "1.0.0",
    endpoints = new[] { "/llm/query", "/llm/session/{sessionId}", "/llm/session/{sessionId}/history", "/health" }
}))
.WithName("GetServiceInfo")
.WithTags("Info");
 
app.MapPost("/llm/query", async (
    [Required] StatefulLlmRequest request,
    DaprWorkflowClient workflowClient,
    ILogger<Program> logger,
    CancellationToken cancellationToken) =>
{
    if (string.IsNullOrWhiteSpace(request.Prompt))
    {
        return Results.BadRequest(new { error = "Prompt is required and cannot be empty." });
    }
 
    try
    {
        logger.LogInformation(
            "Starting stateful LLM workflow | SessionId: {SessionId} | Prompt Length: {PromptLength}",
            request.SessionId ?? "new", request.Prompt.Length);
        
        var instanceId = Guid.NewGuid().ToString();
        
        await workflowClient.ScheduleNewWorkflowAsync(
            nameof(StatefulLlmWorkflow),
            instanceId,
            request);
 
        logger.LogInformation("Workflow started | InstanceId: {InstanceId} | SessionId: {SessionId}", 
            instanceId, request.SessionId ?? "new");
 
        return Results.Accepted($"/llm/query/{instanceId}", new { 
            instanceId,
            sessionId = request.SessionId,
            status = "started"
        });
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Error starting workflow");
        return Results.Problem(
            detail: "An error occurred while starting the workflow.",
            statusCode: StatusCodes.Status500InternalServerError);
    }
})
.WithName("StartStatefulLlmQuery")
.WithTags("LLM")
.Produces<StatefulLlmResponse>(StatusCodes.Status202Accepted)
.Produces(StatusCodes.Status400BadRequest)
.Produces(StatusCodes.Status500InternalServerError);
 
app.MapGet("/llm/query/{instanceId}", async (
    string instanceId,
    DaprWorkflowClient workflowClient,
    ILogger<Program> logger,
    CancellationToken cancellationToken) =>
{
    try
    {
        var workflowState = await workflowClient.GetWorkflowStateAsync(instanceId);
 
        if (workflowState is null)
        {
            return Results.NotFound(new { error = $"Workflow instance {instanceId} not found." });
        }
 
        if (workflowState.RuntimeStatus != WorkflowRuntimeStatus.Completed)
        {
            return Results.Accepted($"/llm/query/{instanceId}", new
            {
                instanceId,
                runtimeStatus = workflowState.RuntimeStatus.ToString()
            });
        }
 
        var output = workflowState.ReadOutputAs<StatefulLlmResponse>();
        return Results.Ok(new
        {
            instanceId,
            runtimeStatus = workflowState.RuntimeStatus.ToString(),
            result = output
        });
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Error retrieving workflow state for instance {InstanceId}", instanceId);
        return Results.Problem(
            detail: "An error occurred while retrieving the workflow state.",
            statusCode: StatusCodes.Status500InternalServerError);
    }
})
.WithName("GetStatefulLlmQueryResult")
.WithTags("LLM")
.Produces<StatefulLlmResponse>()
.Produces(StatusCodes.Status404NotFound)
.Produces(StatusCodes.Status500InternalServerError);
 
app.MapGet("/llm/session/{sessionId}", async (
    string sessionId,
    DaprClient daprClient,
    ILogger<Program> logger,
    CancellationToken cancellationToken) =>
{
    try
    {
        const string StateStoreName = "statestore";
        const string ConversationStatePrefix = "conversation:";
        
        var stateKey = $"{ConversationStatePrefix}{sessionId}";
        var state = await daprClient.GetStateAsync<ConversationState>(
            StateStoreName,
            stateKey,
            cancellationToken: cancellationToken);
 
        if (state == null)
        {
            return Results.NotFound(new { error = $"Session {sessionId} not found." });
        }
 
        return Results.Ok(new
        {
            sessionId = state.SessionId,
            turnCount = state.TurnCount,
            createdAt = state.CreatedAt,
            lastUpdatedAt = state.LastUpdatedAt,
            model = state.Model,
            temperature = state.Temperature,
            history = state.History.Select(t => new
            {
                t.Role,
                t.Content,
                t.Timestamp
            })
        });
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Error retrieving session {SessionId}", sessionId);
        return Results.Problem(
            detail: "An error occurred while retrieving the session.",
            statusCode: StatusCodes.Status500InternalServerError);
    }
})
.WithName("GetSession")
.WithTags("Session")
.Produces(StatusCodes.Status200OK)
.Produces(StatusCodes.Status404NotFound)
.Produces(StatusCodes.Status500InternalServerError);
 
app.MapDelete("/llm/session/{sessionId}", async (
    string sessionId,
    DaprClient daprClient,
    ILogger<Program> logger,
    CancellationToken cancellationToken) =>
{
    try
    {
        const string StateStoreName = "statestore";
        const string ConversationStatePrefix = "conversation:";
        
        var stateKey = $"{ConversationStatePrefix}{sessionId}";
        await daprClient.DeleteStateAsync(
            StateStoreName,
            stateKey,
            cancellationToken: cancellationToken);
 
        logger.LogInformation("Session deleted | SessionId: {SessionId}", sessionId);
        return Results.Ok(new { message = $"Session {sessionId} deleted successfully." });
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Error deleting session {SessionId}", sessionId);
        return Results.Problem(
            detail: "An error occurred while deleting the session.",
            statusCode: StatusCodes.Status500InternalServerError);
    }
})
.WithName("DeleteSession")
.WithTags("Session")
.Produces(StatusCodes.Status200OK)
.Produces(StatusCodes.Status500InternalServerError);
 
app.MapGet("/llm/session/{sessionId}/history", async (
    string sessionId,
    DaprClient daprClient,
    ILogger<Program> logger,
    CancellationToken cancellationToken) =>
{
    try
    {
        const string StateStoreName = "statestore";
        const string ConversationStatePrefix = "conversation:";
        
        var stateKey = $"{ConversationStatePrefix}{sessionId}";
        var state = await daprClient.GetStateAsync<ConversationState>(
            StateStoreName,
            stateKey,
            cancellationToken: cancellationToken);
 
        if (state == null)
        {
            return Results.NotFound(new { error = $"Session {sessionId} not found." });
        }
 
        return Results.Ok(new
        {
            sessionId = state.SessionId,
            turnCount = state.TurnCount,
            history = state.History.Select(t => new
            {
                t.Role,
                t.Content,
                t.Timestamp
            })
        });
    }
    catch (Exception ex)
    {
        logger.LogError(ex, "Error retrieving session history {SessionId}", sessionId);
        return Results.Problem(
            detail: "An error occurred while retrieving the session history.",
            statusCode: StatusCodes.Status500InternalServerError);
    }
})
.WithName("GetSessionHistory")
.WithTags("Session")
.Produces(StatusCodes.Status200OK)
.Produces(StatusCodes.Status404NotFound)
.Produces(StatusCodes.Status500InternalServerError);
 
app.MapDefaultEndpoints();
 
app.Run();
// StatefulLlmWorkflow.cs
#pragma warning disable DAPR_CONVERSATION
using Dapr.Workflow;
using Dapr.AI.Conversation;
using Dapr.AI.Conversation.ConversationRoles;
using AspireWithDapr.StatefulLlm.Models;
using System.Text.Json;
using Dapr.Client;
 
namespace AspireWithDapr.StatefulLlm.Workflow;
 
public class StatefulLlmWorkflow : Workflow<StatefulLlmRequest, StatefulLlmResponse>
{
    private static ILogger? _logger;
    private const string StateStoreName = "statestore";
    private const string ConversationStatePrefix = "conversation:";
 
    public static void SetLogger(ILoggerFactory loggerFactory)
    {
        _logger = loggerFactory.CreateLogger<StatefulLlmWorkflow>();
    }
 
    public override async Task<StatefulLlmResponse> RunAsync(
        WorkflowContext context,
        StatefulLlmRequest request)
    {
        ArgumentNullException.ThrowIfNull(request);
        ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
 
        var logger = _logger ?? LoggerFactory.Create(builder => builder.AddConsole()).CreateLogger<StatefulLlmWorkflow>();
        var workflowInstanceId = context.InstanceId;
 
        // Generate or use provided session ID
        // Use context.NewGuid() instead of Guid.NewGuid() to ensure deterministic ID across workflow replays
        var sessionId = request.SessionId ?? context.NewGuid().ToString();
        
        logger.LogInformation(
            "[Workflow Start] InstanceId: {InstanceId} | SessionId: {SessionId} | Prompt Length: {PromptLength} | SessionIdProvided: {SessionIdProvided}",
            workflowInstanceId, sessionId, request.Prompt.Length, request.SessionId != null);
 
        context.SetCustomStatus(new { step = "loading_state", sessionId });
 
        // Step 1: Load conversation state if session ID exists
        ConversationState? conversationState = null;
        try
        {
            conversationState = await context.CallActivityAsync<ConversationState?>(
                nameof(LoadConversationStateActivity),
                sessionId);
            
            if (conversationState != null)
            {
                logger.LogInformation(
                    "[State Loaded] SessionId: {SessionId} | Turn Count: {TurnCount} | Last Updated: {LastUpdated}",
                    sessionId, conversationState.TurnCount, conversationState.LastUpdatedAt);
            }
            else
            {
                logger.LogInformation("[New Session] SessionId: {SessionId}", sessionId);
            }
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "[State Load Failed] SessionId: {SessionId}", sessionId);
            // Continue with new conversation if state load fails
        }
 
        // Initialize or update conversation state
        if (conversationState == null)
        {
            conversationState = new ConversationState(
                sessionId,
                [],
                DateTime.UtcNow,
                DateTime.UtcNow,
                request.Model,
                request.Temperature);
        }
        else
        {
            // Update model/temperature if provided
            conversationState = conversationState with
            {
                Model = request.Model ?? conversationState.Model,
                Temperature = request.Temperature ?? conversationState.Temperature,
                LastUpdatedAt = DateTime.UtcNow
            };
        }
 
        // Add user message to history
        var userTurn = new ConversationTurn("user", request.Prompt, DateTime.UtcNow);
        var updatedHistory = new List<ConversationTurn>(conversationState.History) { userTurn };
        conversationState = conversationState with { History = updatedHistory };
 
        context.SetCustomStatus(new { step = "searching_data", sessionId, turnCount = conversationState.TurnCount });
 
        // Step 2: Search for relevant data
        List<SearchResult>? searchResults = null;
        try
        {
            var searchRequest = new SearchRequest(request.Prompt);
            searchResults = await context.CallActivityAsync<List<SearchResult>>(
                nameof(SearchDataActivity),
                searchRequest);
 
            logger.LogInformation(
                "[Search Complete] SessionId: {SessionId} | Results Count: {Count}",
                sessionId, searchResults?.Count ?? 0);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "[Search Failed] SessionId: {SessionId}", sessionId);
            // Continue without search results
        }
 
        if (searchResults is null || searchResults.Count == 0)
        {
            logger.LogWarning("[No Search Results] SessionId: {SessionId}", sessionId);
            searchResults = [];
        }
 
        context.SetCustomStatus(new { step = "augmenting_prompt", sessionId });
 
        // Step 3: Augment prompt with search results and conversation history
        string? augmentedPrompt = null;
        try
        {
            var promptRequest = new PromptRequest(
                request.Prompt,
                searchResults,
                conversationState.History.SkipLast(1).ToList()); // Exclude current user message
 
            augmentedPrompt = await context.CallActivityAsync<string>(
                nameof(AugmentPromptActivity),
                promptRequest);
 
            logger.LogInformation(
                "[Prompt Augmented] SessionId: {SessionId} | Length: {Length}",
                sessionId, augmentedPrompt?.Length ?? 0);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "[Augment Prompt Failed] SessionId: {SessionId}", sessionId);
            throw;
        }
 
        context.SetCustomStatus(new { step = "calling_llm", sessionId });
 
        // Step 4: Call LLM with conversation history
        StatefulLlmResponse? response = null;
        try
        {
            var llmRequest = new StatefulLlmRequest(
                augmentedPrompt!,
                sessionId,
                conversationState.Model,
                conversationState.Temperature,
                new Dictionary<string, object>
                {
                    ["search_results"] = searchResults,
                    ["conversation_history"] = conversationState.History.SkipLast(1).ToList(),
                    ["original_query"] = request.Prompt
                });
 
            response = await context.CallActivityAsync<StatefulLlmResponse>(
                nameof(CallLlmActivity),
                llmRequest);
 
            logger.LogInformation(
                "[LLM Response] SessionId: {SessionId} | Response Length: {Length}",
                sessionId, response?.Response?.Length ?? 0);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "[LLM Call Failed] SessionId: {SessionId}", sessionId);
            throw;
        }
 
        if (response == null)
        {
            throw new InvalidOperationException("LLM returned null response");
        }
 
        // Add assistant response to history
        var assistantTurn = new ConversationTurn("assistant", response.Response, DateTime.UtcNow);
        updatedHistory = new List<ConversationTurn>(conversationState.History) { assistantTurn };
        conversationState = conversationState with
        {
            History = updatedHistory,
            LastUpdatedAt = DateTime.UtcNow
        };
 
        context.SetCustomStatus(new { step = "adding_citations", sessionId });
 
        // Step 5: Add citations
        StatefulLlmResponse? finalResponse = null;
        try
        {
            if (searchResults.Count > 0)
            {
                var citationsRequest = new AddCitationsRequest(response, searchResults);
                finalResponse = await context.CallActivityAsync<StatefulLlmResponse>(
                    nameof(AddCitationsActivity),
                    citationsRequest);
            }
            else
            {
                finalResponse = response;
            }
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "[Add Citations Failed] SessionId: {SessionId}", sessionId);
            finalResponse = response; // Return response without citations if citation fails
        }
 
        context.SetCustomStatus(new { step = "saving_state", sessionId });
 
        // Step 6: Save conversation state
        try
        {
            await context.CallActivityAsync<bool>(
                nameof(SaveConversationStateActivity),
                conversationState);
 
            logger.LogInformation(
                "[State Saved] SessionId: {SessionId} | Turn Count: {TurnCount}",
                sessionId, conversationState.TurnCount);
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "[State Save Failed] SessionId: {SessionId}", sessionId);
            // Don't fail the workflow if state save fails
        }
 
        context.SetCustomStatus(new { step = "completed", sessionId });
 
        // Update metadata with session info
        var metadata = new Dictionary<string, object>(finalResponse.Metadata ?? [])
        {
            ["session_id"] = sessionId,
            ["turn_count"] = conversationState.TurnCount,
            ["conversation_length"] = conversationState.History.Sum(t => t.Content.Length)
        };
 
        return finalResponse with { Metadata = metadata };
    }
}
 
// Activity to load conversation state from Dapr state store
public class LoadConversationStateActivity(DaprClient daprClient, ILogger<LoadConversationStateActivity> logger)
    : WorkflowActivity<string, ConversationState?>
{
    private const string StateStoreName = "statestore";
    private const string ConversationStatePrefix = "conversation:";
 
    public override async Task<ConversationState?> RunAsync(
        WorkflowActivityContext context,
        string sessionId)
    {
        ArgumentException.ThrowIfNullOrWhiteSpace(sessionId);
 
        try
        {
            var stateKey = $"{ConversationStatePrefix}{sessionId}";
            var state = await daprClient.GetStateAsync<ConversationState>(
                StateStoreName,
                stateKey);
 
            if (state != null)
            {
                logger.LogInformation(
                    "[LoadState Success] SessionId: {SessionId} | Turn Count: {TurnCount} | History Count: {HistoryCount}",
                    sessionId, state.TurnCount, state.History?.Count ?? 0);
                return state;
            }
 
            logger.LogInformation("[LoadState Not Found] StateKey: {StateKey}", stateKey);
            return null;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "[LoadState Error] SessionId: {SessionId}", sessionId);
            return null; // Return null on error to allow new conversation
        }
    }
}
 
// Activity to save conversation state to Dapr state store
public class SaveConversationStateActivity(DaprClient daprClient, ILogger<SaveConversationStateActivity> logger)
    : WorkflowActivity<ConversationState, bool>
{
    private const string StateStoreName = "statestore";
    private const string ConversationStatePrefix = "conversation:";
 
    public override async Task<bool> RunAsync(
        WorkflowActivityContext context,
        ConversationState state)
    {
        ArgumentNullException.ThrowIfNull(state);
        ArgumentException.ThrowIfNullOrWhiteSpace(state.SessionId);
 
        try
        {
            var stateKey = $"{ConversationStatePrefix}{state.SessionId}";
            await daprClient.SaveStateAsync(
                StateStoreName,
                stateKey,
                state);
 
            logger.LogInformation(
                "[SaveState Success] SessionId: {SessionId} | Turn Count: {TurnCount} | History Count: {HistoryCount}",
                state.SessionId, state.TurnCount, state.History?.Count ?? 0);
            return true;
        }
        catch (Exception ex)
        {
            logger.LogError(ex, "[SaveState Error] SessionId: {SessionId}", state.SessionId);
            return false;
        }
    }
}
 
// Reuse activities from AugmentedLlm with adaptations
public class SearchDataActivity(ILogger<SearchDataActivity> logger) : WorkflowActivity<SearchRequest, List<SearchResult>>
{
    public override async Task<List<SearchResult>> RunAsync(
        WorkflowActivityContext context,
        SearchRequest input)
    {
        ArgumentNullException.ThrowIfNull(input);
        ArgumentException.ThrowIfNullOrWhiteSpace(input.Query);
 
        logger.LogInformation("[SearchData] Query: {Query}", input.Query);
        
        // Simulate search - in production, replace with actual search implementation
        await Task.Delay(100);
        
        return
        [
            new($"Document about {input.Query}",
                $"This document contains comprehensive information about {input.Query}.",
                "Knowledge Base"),
            new($"Technical Guide: {input.Query}",
                $"A technical guide covering {input.Query}.",
                "Technical Library")
        ];
    }
}
 
public class AugmentPromptActivity : WorkflowActivity<PromptRequest, string>
{
    public override Task<string> RunAsync(
        WorkflowActivityContext context,
        PromptRequest input)
    {
        ArgumentNullException.ThrowIfNull(input);
        ArgumentException.ThrowIfNullOrWhiteSpace(input.Prompt);
 
        var sourcesText = string.Join("\n\n",
            input.SearchResults.Select((r, i) =>
                $"[Source {i + 1}]\nTitle: {r.Title}\nContent: {r.Content}\nOrigin: {r.Source}"));
 
        // Include conversation history if available
        var historyContext = string.Empty;
        if (input.ConversationHistory != null && input.ConversationHistory.Count > 0)
        {
            historyContext = "\n\nPrevious conversation:\n" + string.Join("\n",
                input.ConversationHistory.Select((t, i) =>
                    $"{t.Role}: {t.Content}"));
        }
 
        var augmentedPrompt = $"""
            Based on the following verified sources:
            
            {sourcesText}
            {historyContext}
            
            Please answer this question: {input.Prompt}
            
            Requirements:
            1. Use information only from provided sources
            2. Consider the conversation history for context
            3. Cite sources using [Source X] format
            4. If information is not available in sources, state so clearly
            5. Provide a comprehensive, well-structured response
            
            Answer:
            """;
 
        return Task.FromResult(augmentedPrompt);
    }
}
 
public class CallLlmActivity(
    DaprConversationClient conversationClient,
    ILogger<CallLlmActivity> logger) : WorkflowActivity<StatefulLlmRequest, StatefulLlmResponse>
{
    public override async Task<StatefulLlmResponse> RunAsync(
        WorkflowActivityContext context,
        StatefulLlmRequest request)
    {
        ArgumentNullException.ThrowIfNull(conversationClient);
        ArgumentNullException.ThrowIfNull(request);
        ArgumentException.ThrowIfNullOrWhiteSpace(request.Prompt);
 
        var componentName = request.Model ?? "llama";
        var conversationHistory = request.Context?.ContainsKey("conversation_history") == true
            ? request.Context["conversation_history"] as List<ConversationTurn>
            : null;
 
        // Build conversation messages from history + current prompt
        var messages = new List<IConversationMessage>();
        
        if (conversationHistory != null)
        {
            foreach (var turn in conversationHistory)
            {
                if (turn.Role == "user")
                {
                    messages.Add(new UserMessage { Content = [new MessageContent(turn.Content)] });
                }
                else if (turn.Role == "assistant")
                {
                    messages.Add(new AssistantMessage { Content = [new MessageContent(turn.Content)] });
                }
            }
        }
 
        // Add current user message
        messages.Add(new UserMessage { Content = [new MessageContent(request.Prompt)] });
 
        var conversationInput = new ConversationInput(messages);
        var conversationOptions = new ConversationOptions(componentName)
        {
            Temperature = request.Temperature ?? 0.7
        };
 
        logger.LogInformation(
            "[CallLlm] SessionId: {SessionId} | Model: {Model} | History Turns: {HistoryCount}",
            request.SessionId ?? "new", componentName, conversationHistory?.Count ?? 0);
 
        var response = await conversationClient.ConverseAsync(
            [conversationInput],
            conversationOptions,
            CancellationToken.None);
 
        if (response?.Outputs is null || response.Outputs.Count == 0)
        {
            throw new InvalidOperationException("LLM returned no outputs.");
        }
 
        var responseText = string.Empty;
        foreach (var output in response.Outputs)
        {
            if (output?.Choices is null) continue;
            
            foreach (var choice in output.Choices)
            {
                responseText += choice.Message?.Content ?? string.Empty;
            }
        }
 
        if (string.IsNullOrWhiteSpace(responseText))
        {
            throw new InvalidOperationException("LLM returned empty response.");
        }
 
        // SessionId should always be provided by the workflow at this point
        if (string.IsNullOrWhiteSpace(request.SessionId))
        {
            logger.LogWarning("[CallLlm] SessionId was null - this should not happen!");
        }
 
        return new StatefulLlmResponse(
            responseText,
            request.SessionId ?? "unknown-session",
            new Dictionary<string, object>
            {
                ["model"] = componentName,
                ["temperature"] = request.Temperature ?? 0.7,
                ["timestamp"] = DateTime.UtcNow,
                ["conversationId"] = response.ConversationId ?? string.Empty
            });
    }
}
 
public class AddCitationsActivity : WorkflowActivity<AddCitationsRequest, StatefulLlmResponse>
{
    public override Task<StatefulLlmResponse> RunAsync(
        WorkflowActivityContext context,
        AddCitationsRequest input)
    {
        ArgumentNullException.ThrowIfNull(input);
        ArgumentNullException.ThrowIfNull(input.Response);
        ArgumentNullException.ThrowIfNull(input.SearchResults);
 
        var sourcesList = string.Join("\n",
            input.SearchResults.Select((r, i) =>
                $"{i + 1}. {r.Title} ({r.Source})"));
 
        var citedResponse = $"{input.Response.Response}\n\n## References\n{sourcesList}";
 
        var metadata = new Dictionary<string, object>(input.Response.Metadata ?? [])
        {
            ["sources_count"] = input.SearchResults.Count,
            ["has_citations"] = true
        };
 
        return Task.FromResult(new StatefulLlmResponse(
            citedResponse,
            input.Response.SessionId,
            metadata));
    }
}

If you review the code again, you will see that it implements a Dapr Workflow that extends the Retrieval-Augmented Generation (RAG) pattern with state management. Unlike the basic AugmentedLlmWorkflow, this workflow maintains a persistent conversation history across multiple interactions. It has six main steps (Activities):

  • Load Conversation - Loads existing conversation history from Dapr State Store.
  • Search Data - Searches for relevant documents based on the current user prompt. Same as in AugmentedLlmWorkflow - simulates a search returning mock results.
  • Augment Prompt - Builds an augmented prompt that includes search results AND previous conversation history for context-aware responses.
  • Call LLM - Calls the LLM via Dapr Conversation API with the full conversation history.
  • Add Citations - Appends a "References" section to the response.
  • Save Conversation - Persists the updated conversation state to Dapr State Store using DaprClient.SaveStateAsync(). Includes both user and assistant messages.

Let's run the web application once again

Run the web application from the Aspire Dashboard. After it launches, select the service type as StatefulLlm (Conversational). You may notice a Previous Sessions option available for selection. After selecting a session, you can view the complete conversation history for that session. You can now start a conversation and observe the application’s behavior.

Where is the web application code ?

You might be wondering where the web application code is located, as it hasn’t been discussed or shared yet. It is a simple Blazor application intended to demonstrate the complete flow. If you’re interested in accessing the web application code, feel free to reach out to me.

Conclusion

Leveraging Anthropic’s research insights alongside Dapr Workflow and the Dapr Conversation API enables developers to build powerful, production-ready LLM-based systems that balance capability, maintainability, and simplicity.

The simplest solution is often the most effective. By starting with foundational patterns and adding complexity only when it delivers clear benefits, you can build systems that are both powerful and maintainable.

Happy Learning & coding... 📚