This blog post demonstrates how to use the Dapr Actors building block, in conjunction with .NET Aspire to develop a distributed application.
The actor pattern defines actors as the fundamental unit of computation. In this model, application logic is encapsulated within self-contained components known as actors. Each actor processes incoming messages sequentially—one at a time without requiring explicit concurrency control or thread management, thereby simplifying complex distributed system design.
For additional perspective on modern distributed application development practices, you may refer to my previous blog post on Microsoft Orleans, which explores common architectural challenges and explains how the actor programming model effectively addresses them.
In this article, we illustrates how Dapr Actors can be leveraged to build a highly scalable and resilient IoT monitoring system. The demo application simulates thousands of smart sensors continuously streaming telemetry data, including temperature, humidity, and AQI metrics. Each sensor is modeled as a dedicated stateful actor, ensuring single-threaded execution per device and eliminating shared-state contention.
These Actors persist device-specific state, process telemetry events deterministically, evaluate configurable alert thresholds, and issue control commands back to the devices. The result is a horizontally scalable distributed system that maintains consistency, simplifies concurrency management, and delivers real-time operational visibility through an interactive dashboard. A preview of the dashboard is shown below.

Prerequisites
To begin creating the DEMO application, please refer to my earlier blog posts on Dapr & .NET Aspire and ensure you have completed all steps up to the Dapr-ization of DEMO App section. Once completed, your solution structure should look similar to the one shown below.

Dapr Actors in Action
Start by creating the following new projects, either through the CLI (Terminal) or via the Solution Explorer.
- AspireWithDapr.Interfaces (Class Library) - defines the shared actor contracts, DTOs, enums, and state models used across services.
- AspireWithDapr.GenerateDevice (Console Application) - seeds the fleet by creating any missing devices through the public API. It coordinates bulk initialization, retry behavior, and the generation-complete marker that the simulator uses before it starts sending telemetry.
- AspireWithDapr.Simulator (Console Application) - generates telemetry for the registered fleet after initialization is complete.
AspireWithDapr.Interfaces
The first step is to add the Dapr.Actors package to the AspireWithDapr.Interfaces project and then create the structure as shown below:
dotnet add package Dapr.Actors
Enums
namespace AspireWithDapr.Interfaces.Enums;
public enum AlertSeverity
{
Warning,
Critical
}namespace AspireWithDapr.Interfaces.Enums;
public enum AlertType
{
HighTemperature,
LowTemperature,
PoorAirQuality
}namespace AspireWithDapr.Interfaces.Enums;
public enum DeviceStatus
{
Online,
Offline,
Degraded,
Maintenance
}namespace AspireWithDapr.Interfaces.Enums;
public enum DeviceType
{
MultiSensor
}Models
using AspireWithDapr.Interfaces.Enums;
namespace AspireWithDapr.Interfaces.Models;
public class Alert
{
public string Id { get; set; } = Guid.NewGuid().ToString();
public AlertType Type { get; set; }
public AlertSeverity Severity { get; set; }
public string Message { get; set; } = string.Empty;
public bool Acknowledged { get; set; }
public DateTimeOffset CreatedAt { get; set; }
}namespace AspireWithDapr.Interfaces.Models;
public class AlertSettings
{
public bool AlertsEnabled { get; set; } = true;
public double TargetTemperature { get; set; } = 22.0;
public double TemperatureThreshold { get; set; } = 3.0;
}namespace AspireWithDapr.Interfaces.Models;
public class CommandResponse
{
public bool Success { get; set; }
public string Message { get; set; } = string.Empty;
public static CommandResponse Ok(string message) => new() { Success = true, Message = message };
public static CommandResponse Fail(string message) => new() { Success = false, Message = message };
}namespace AspireWithDapr.Interfaces.Models;
public class DeviceGenerationCompleteRequest
{
public int DeviceCount { get; set; }
}namespace AspireWithDapr.Interfaces.Models;
public class DeviceGenerationStatus
{
public bool Completed { get; set; }
public int DeviceCount { get; set; }
public DateTimeOffset? CompletedAt { get; set; }
}using AspireWithDapr.Interfaces.Enums;
namespace AspireWithDapr.Interfaces.Models;
public class DeviceInfo
{
public string DeviceId { get; set; } = string.Empty;
public string Location { get; set; } = string.Empty;
public DeviceType DeviceType { get; set; }
public DeviceStatus Status { get; set; }
public DeviceSettings Settings { get; set; } = new();
public DateTimeOffset CreatedAt { get; set; }
}using AspireWithDapr.Interfaces.Enums;
namespace AspireWithDapr.Interfaces.Models;
public class DeviceListItem
{
public string DeviceId { get; set; } = string.Empty;
public string Location { get; set; } = string.Empty;
public DeviceType DeviceType { get; set; }
}namespace AspireWithDapr.Interfaces.Models;
public class DeviceListSnapshot
{
public long SnapshotVersion { get; set; }
public List<DeviceListItem> Devices { get; set; } = [];
}namespace AspireWithDapr.Interfaces.Models;
public class DeviceSettings
{
public AlertSettings AlertSettings { get; set; } = new();
}namespace AspireWithDapr.Interfaces.Models;
public class DeviceSnapshot
{
public DeviceInfo Info { get; set; } = new();
public DeviceTelemetry? LatestTelemetry { get; set; }
public DateTimeOffset LastTelemetryTime { get; set; }
public DeviceStatistics Statistics { get; set; } = new();
}using AspireWithDapr.Interfaces.Enums;
namespace AspireWithDapr.Interfaces.Models;
public class DeviceState
{
public string DeviceId { get; set; } = string.Empty;
public string Location { get; set; } = string.Empty;
public DeviceType DeviceType { get; set; }
public DeviceStatus Status { get; set; }
public DeviceTelemetry? LastTelemetry { get; set; }
public DateTimeOffset LastTelemetryTime { get; set; }
public DeviceStatistics Statistics { get; set; } = new();
public List<Alert> ActiveAlerts { get; set; } = [];
public DeviceSettings Settings { get; set; } = new();
}namespace AspireWithDapr.Interfaces.Models;
public class DeviceStatistics
{
public long TotalTelemetryReceived { get; set; }
public long TotalAlertsGenerated { get; set; }
public DateTimeOffset? FirstSeenTime { get; set; }
public DateTimeOffset? LastSeenTime { get; set; }
}using AspireWithDapr.Interfaces.Enums;
namespace AspireWithDapr.Interfaces.Models;
public class DeviceSummary
{
public string DeviceId { get; set; } = string.Empty;
public string Location { get; set; } = string.Empty;
public DeviceType DeviceType { get; set; }
public DeviceStatus Status { get; set; }
public DeviceTelemetry? LastTelemetry { get; set; }
public DateTimeOffset LastTelemetryTime { get; set; }
public int ActiveAlertCount { get; set; }
}namespace AspireWithDapr.Interfaces.Models;
public class DeviceSummarySnapshot
{
public long SnapshotVersion { get; set; }
public List<DeviceSummary> Devices { get; set; } = [];
}namespace AspireWithDapr.Interfaces.Models;
public class DeviceSummaryUpdate
{
public long SnapshotVersion { get; set; }
public DeviceSummary Summary { get; set; } = new();
}namespace AspireWithDapr.Interfaces.Models;
public class DeviceTelemetry
{
public double Temperature { get; set; }
public double Humidity { get; set; }
public int AirQualityIndex { get; set; }
public DateTimeOffset RecordedAt { get; set; }
}using AspireWithDapr.Interfaces.Enums;
namespace AspireWithDapr.Interfaces.Models;
public class InitializeDeviceRequest
{
public string Location { get; set; } = string.Empty;
public DeviceType DeviceType { get; set; }
}Interfaces
using AspireWithDapr.Interfaces.Models;
using Dapr.Actors;
namespace AspireWithDapr.Interfaces;
public interface IAlertActor : IActor
{
Task AddAlertsAsync(List<Alert> alerts);
Task<List<Alert>> GetActiveAlertsAsync();
Task<CommandResponse> AcknowledgeAlertAsync(string alertId);
Task<CommandResponse> ClearAllAlertsAsync();
}using AspireWithDapr.Interfaces.Enums;
using AspireWithDapr.Interfaces.Models;
using Dapr.Actors;
namespace AspireWithDapr.Interfaces;
public interface IDeviceActor : IActor
{
Task InitializeAsync(InitializeDeviceRequest request);
Task<DeviceInfo> GetDeviceInfoAsync();
Task<CommandResponse> SetStatusAsync(DeviceStatus status);
Task<CommandResponse> UpdateSettingsAsync(DeviceSettings settings);
Task<CommandResponse> RebootAsync();
Task ProcessTelemetryAsync(DeviceTelemetry telemetry);
Task ResetStatisticsAsync();
Task<DeviceSnapshot> GetSnapshotAsync();
}using Dapr.Actors;
namespace AspireWithDapr.Interfaces;
public interface IDeviceRegistryActor : IActor
{
Task AddDeviceAsync(string deviceId);
Task<List<string>> GetAllDeviceIdsAsync();
}AspireWithDapr.GenerateDevice & AspireWithDapr.Simulator
First, add the Microsoft.Extensions.Hosting and Microsoft.Extensions.Http NuGet packages to the AspireWithDapr.GenerateDevice and AspireWithDapr.Simulator projects. Then, add project references to AspireWithDapr.Interfaces and AspireWithDapr.ServiceDefaults.
dotnet add package Microsoft.Extensions.Hosting
dotnet add package Microsoft.Extensions.HttpOnce all the steps are completed, the solution structure should appear as shown below.

Next, let’s focus on the ApiService, as it contains the core business logic. Once we complete the ApiService, we will return to AspireWithDapr.GenerateDevice and AspireWithDapr.Simulator.
AspireWithDapr.ApiService
The ApiService acts as the backend entry point for device lifecycle operations, telemetry ingestion, fleet queries, alerts, and generation status. It hosts the Dapr actors, SignalR hub, and read-model services that keep the fleet dashboard synchronized with durable device state.
Begin with adding the Dapr.Actors.AspNetCore, Dapr.AspNetCore, Microsoft.AspNetCore.OpenApi and Scalar.AspNetCore NuGet packages to the AspireWithDapr.ApiService project. Then, add project references to AspireWithDapr.Interfaces and AspireWithDapr.ServiceDefaults.
dotnet add package Dapr.Actors.AspNetCore
dotnet add package Dapr.AspNetCore
dotnet add package Microsoft.AspNetCore.OpenApi
dotnet add package Scalar.AspNetCoreNext, create the Actors, DTOs, EndPoints, Hubs, ReadModel, and Services folders in the project to organize related files, as shown below.

// Program.cs
using System.Text.Json;
using System.Text.Json.Serialization;
using AspireWithDapr.ApiService.Actors;
using AspireWithDapr.ApiService.Endpoints;
using AspireWithDapr.ApiService.Hubs;
using AspireWithDapr.ApiService.ReadModel;
using AspireWithDapr.ApiService.Services;
using Dapr.Actors;
using Scalar.AspNetCore;
var builder = WebApplication.CreateBuilder(args);
builder.AddServiceDefaults();
builder.Services.AddProblemDetails();
builder.Services.AddOpenApi();
builder.Services.AddSingleton<DeviceRegistryService>();
builder.Services.AddSingleton<DeviceInitializationService>();
builder.Services.AddSingleton<DeviceFleetReadModelService>();
builder.Services.AddSingleton<DeviceSummaryStore>();
builder.Services.AddSingleton<DeviceSummaryVersionStore>();
builder.Services.AddSingleton<DeviceSummaryNotifier>();
builder.Services.AddSingleton<DeviceSummaryWriteThroughService>();
builder.Services.AddSingleton<DeviceGenerationStore>();
builder.Services.AddSignalR();
builder.Services.ConfigureHttpJsonOptions(options =>
{
options.SerializerOptions.Converters.Add(new JsonStringEnumConverter());
});
builder.Services.AddActors(options =>
{
var reentrantConfig = new ActorReentrancyConfig { Enabled = true, MaxStackDepth = 32 };
options.Actors.RegisterActor<DeviceActor>(typeOptions: new() { ReentrancyConfig = reentrantConfig });
options.Actors.RegisterActor<AlertActor>();
options.Actors.RegisterActor<DeviceRegistryActor>();
options.ActorIdleTimeout = TimeSpan.FromMinutes(10);
options.ActorScanInterval = TimeSpan.FromSeconds(30);
options.DrainOngoingCallTimeout = TimeSpan.FromSeconds(60);
options.DrainRebalancedActors = true;
options.RemindersStoragePartitions = 10;
options.JsonSerializerOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
Converters = { new JsonStringEnumConverter(JsonNamingPolicy.CamelCase) },
WriteIndented = false
};
});
builder.Services.AddDaprClient();
var app = builder.Build();
app.UseExceptionHandler();
if (app.Environment.IsDevelopment())
{
app.MapOpenApi();
app.MapScalarApiReference();
}
app.MapDeviceListEndpoints();
app.MapDeviceGenerationEndpoints();
app.MapDeviceEndpoints();
app.MapAlertEndpoints();
app.MapActorsHandlers();
app.MapHub<DeviceUpdatesHub>("/hubs/deviceUpdates");
app.MapDefaultEndpoints();
app.Run();Actors
// AlertActor.cs
using AspireWithDapr.Interfaces;
using AspireWithDapr.Interfaces.Enums;
using AspireWithDapr.Interfaces.Models;
using AspireWithDapr.ApiService.ReadModel;
using Dapr.Actors.Runtime;
namespace AspireWithDapr.ApiService.Actors;
[Actor(TypeName = "AlertActor")]
public class AlertActor(
ActorHost host,
DeviceSummaryWriteThroughService summaryWriteThrough) : Actor(host), IAlertActor
{
private const string AlertsKey = "activeAlerts";
private const int MaxAlerts = 100;
protected override Task OnActivateAsync()
{
Logger.LogInformation("[AlertActor:{Id}] Activating", Id);
return Task.CompletedTask;
}
public async Task AddAlertsAsync(List<Alert> alerts)
{
if (alerts.Count == 0) return;
var existing = await LoadAlertsAsync();
var added = 0;
foreach (var alert in alerts)
{
if (HasActiveAlert(existing, alert.Type)) continue;
AddWithCap(existing, alert);
added++;
Logger.LogWarning("[AlertActor:{Id}] New alert: {Type} — {Message}", Id, alert.Type, alert.Message);
}
if (added > 0)
{
await StateManager.SetStateAsync(AlertsKey, existing);
await StateManager.SaveStateAsync();
await PublishAlertUpdatedAsync(existing);
}
}
public async Task<List<Alert>> GetActiveAlertsAsync()
{
var alerts = await LoadAlertsAsync();
return alerts.Where(a => !a.Acknowledged).ToList();
}
public async Task<CommandResponse> AcknowledgeAlertAsync(string alertId)
{
var alerts = await LoadAlertsAsync();
var alert = alerts.FirstOrDefault(a => a.Id == alertId);
if (alert is null)
return CommandResponse.Fail($"Alert '{alertId}' not found");
if (alert.Acknowledged)
return CommandResponse.Ok($"Alert '{alertId}' was already acknowledged");
alert.Acknowledged = true;
await StateManager.SetStateAsync(AlertsKey, alerts);
await StateManager.SaveStateAsync();
await PublishAlertUpdatedAsync(alerts);
Logger.LogInformation("[AlertActor:{Id}] Alert {AlertId} acknowledged", Id, alertId);
return CommandResponse.Ok($"Alert '{alertId}' acknowledged");
}
public async Task<CommandResponse> ClearAllAlertsAsync()
{
var cleared = new List<Alert>();
await StateManager.SetStateAsync(AlertsKey, cleared);
await StateManager.SaveStateAsync();
await PublishAlertUpdatedAsync(cleared);
Logger.LogInformation("[AlertActor:{Id}] All alerts cleared", Id);
return CommandResponse.Ok("All alerts cleared");
}
private async Task<List<Alert>> LoadAlertsAsync()
{
var result = await StateManager.TryGetStateAsync<List<Alert>>(AlertsKey);
return result.HasValue ? result.Value : [];
}
private static bool HasActiveAlert(List<Alert> alerts, AlertType type) =>
alerts.Any(a => a.Type == type && !a.Acknowledged);
private static void AddWithCap(List<Alert> alerts, Alert alert)
{
if (alerts.Count >= MaxAlerts) alerts.RemoveAt(0);
alerts.Add(alert);
}
private async Task PublishAlertUpdatedAsync(List<Alert> alerts)
{
await summaryWriteThrough.SyncAlertCountAsync(
Id.GetId(),
alerts.Count(a => !a.Acknowledged));
}
}// DeviceActor.cs
using Dapr.Actors.Runtime;
using AspireWithDapr.Interfaces;
using AspireWithDapr.Interfaces.Models;
using AspireWithDapr.Interfaces.Enums;
using AspireWithDapr.ApiService.ReadModel;
using AspireWithDapr.ApiService.Services;
namespace AspireWithDapr.ApiService.Actors;
[Actor(TypeName = "DeviceActor")]
public class DeviceActor(
ActorHost host,
DeviceRegistryService registry,
DeviceSummaryWriteThroughService summaryWriteThrough) : Actor(host), IDeviceActor, IRemindable
{
private const string DeviceInfoKey = "deviceInfo";
private const string TelemetryKey = "latestTelemetry";
private const string LastTelemetryTimeKey = "lastTelemetryTime";
private const string StatsKey = "statistics";
private const string RebootReminderName = "rebootComplete";
private const string LegacyStaleReminderName = "staleDataCheck";
private const int AqiWarningThreshold = 150;
private const int AqiCriticalThreshold = 200;
protected override async Task OnActivateAsync()
{
Logger.LogInformation("[DeviceActor:{Id}] Activating", Id);
await TryUnregisterLegacyReminderAsync();
var result = await StateManager.TryGetStateAsync<DeviceInfo>(DeviceInfoKey);
if (result.HasValue)
{
Logger.LogInformation("[DeviceActor:{Id}] Activated — Status: {Status}", Id, result.Value.Status);
}
else
{
Logger.LogInformation("[DeviceActor:{Id}] Activated — not yet initialized", Id);
}
}
public async Task InitializeAsync(InitializeDeviceRequest request)
{
ArgumentException.ThrowIfNullOrWhiteSpace(request.Location);
await TryUnregisterLegacyReminderAsync();
var existing = await StateManager.TryGetStateAsync<DeviceInfo>(DeviceInfoKey);
if (existing.HasValue)
{
throw new InvalidOperationException($"Device {Id} is already initialized.");
}
var info = new DeviceInfo
{
DeviceId = Id.GetId(),
Location = request.Location,
DeviceType = request.DeviceType,
Status = DeviceStatus.Online,
Settings = new DeviceSettings(),
CreatedAt = DateTimeOffset.UtcNow
};
await StateManager.SetStateAsync(DeviceInfoKey, info);
await StateManager.SetStateAsync(StatsKey, new DeviceStatistics { FirstSeenTime = DateTimeOffset.UtcNow });
await registry.RegisterAsync(info.DeviceId);
await summaryWriteThrough.SyncAsync(info, null, DateTimeOffset.MinValue, activeAlertCount: 0);
Logger.LogInformation("[DeviceActor:{Id}] Initialized — Location: {Location}, Type: {Type}",
Id, request.Location, request.DeviceType);
}
public async Task<DeviceInfo> GetDeviceInfoAsync()
{
var result = await StateManager.TryGetStateAsync<DeviceInfo>(DeviceInfoKey);
if (!result.HasValue)
throw new InvalidOperationException($"Device {Id} not initialized. Call /devices/{{id}}/initialize first.");
return result.Value;
}
public async Task<CommandResponse> SetStatusAsync(DeviceStatus status)
{
if (!Enum.IsDefined(status))
return CommandResponse.Fail($"Invalid status: {status}");
var info = await GetDeviceInfoAsync();
if (info.Status == status)
return CommandResponse.Ok($"Status already {status}");
var previous = info.Status;
info.Status = status;
await StateManager.SetStateAsync(DeviceInfoKey, info);
var lastTelemetry = await LoadLatestTelemetryAsync();
var lastTelemetryTime = await LoadLastTelemetryTimeAsync();
var activeAlertCount = await GetActiveAlertCountAsync();
await summaryWriteThrough.SyncAsync(info, lastTelemetry, lastTelemetryTime, activeAlertCount);
Logger.LogInformation("[DeviceActor:{Id}] Status changed {Previous} → {New}", Id, previous, status);
return CommandResponse.Ok($"Status changed from {previous} to {status}");
}
public async Task<CommandResponse> UpdateSettingsAsync(DeviceSettings settings)
{
var info = await GetDeviceInfoAsync();
info.Settings = settings;
await StateManager.SetStateAsync(DeviceInfoKey, info);
Logger.LogInformation("[DeviceActor:{Id}] Settings updated", Id);
return CommandResponse.Ok("Settings updated");
}
public async Task<CommandResponse> RebootAsync()
{
await SetStatusAsync(DeviceStatus.Maintenance);
await RegisterReminderAsync(
RebootReminderName, null,
dueTime: TimeSpan.FromSeconds(5),
period: TimeSpan.Zero);
Logger.LogInformation("[DeviceActor:{Id}] Reboot initiated — will complete in 5s via reminder", Id);
return CommandResponse.Ok("Reboot initiated. Device will return Online in ~5 seconds.");
}
public async Task ProcessTelemetryAsync(DeviceTelemetry telemetry)
{
var now = DateTimeOffset.UtcNow;
var info = await GetDeviceInfoAsync();
await StateManager.SetStateAsync(TelemetryKey, telemetry);
await StateManager.SetStateAsync(LastTelemetryTimeKey, now);
if (info.Status != DeviceStatus.Online)
{
info.Status = DeviceStatus.Online;
await StateManager.SetStateAsync(DeviceInfoKey, info);
}
var stats = await LoadStatsAsync();
stats.TotalTelemetryReceived++;
stats.LastSeenTime = now;
stats.FirstSeenTime ??= now;
await StateManager.SetStateAsync(StatsKey, stats);
await StateManager.SaveStateAsync();
var alerts = EvaluateThresholds(telemetry, info.Settings);
if (alerts.Count > 0)
{
stats.TotalAlertsGenerated += alerts.Count;
await StateManager.SetStateAsync(StatsKey, stats);
await StateManager.SaveStateAsync();
await GetAlertActor().AddAlertsAsync(alerts);
Logger.LogInformation("[DeviceActor:{Id}] Raised {Count} alert(s) → AlertActor", Id, alerts.Count);
}
var activeAlertCount = await GetActiveAlertCountAsync();
await summaryWriteThrough.SyncAsync(info, telemetry, now, activeAlertCount);
Logger.LogDebug(
"[DeviceActor:{Id}] Telemetry processed — Temp: {Temp}°C, Humidity: {Humidity}%, AQI: {Aqi}, Alerts: {ActiveAlertCount}",
Id, telemetry.Temperature, telemetry.Humidity, telemetry.AirQualityIndex, activeAlertCount);
}
private async Task<DeviceTelemetry?> LoadLatestTelemetryAsync()
{
var result = await StateManager.TryGetStateAsync<DeviceTelemetry>(TelemetryKey);
return result.HasValue ? result.Value : null;
}
private async Task<DateTimeOffset> LoadLastTelemetryTimeAsync()
{
var result = await StateManager.TryGetStateAsync<DateTimeOffset>(LastTelemetryTimeKey);
return result.HasValue ? result.Value : DateTimeOffset.MinValue;
}
public async Task<DeviceSnapshot> GetSnapshotAsync()
{
var info = await GetDeviceInfoAsync();
return new DeviceSnapshot
{
Info = info,
LatestTelemetry = await LoadLatestTelemetryAsync(),
LastTelemetryTime = await LoadLastTelemetryTimeAsync(),
Statistics = await LoadStatsAsync()
};
}
public async Task ResetStatisticsAsync()
{
await StateManager.SetStateAsync(StatsKey, new DeviceStatistics());
Logger.LogInformation("[DeviceActor:{Id}] Statistics reset", Id);
}
public async Task ReceiveReminderAsync(string reminderName, byte[] state,
TimeSpan dueTime, TimeSpan period)
{
switch (reminderName)
{
case LegacyStaleReminderName:
await TryUnregisterLegacyReminderAsync();
Logger.LogInformation("[DeviceActor:{Id}] Removed legacy reminder {ReminderName}", Id, reminderName);
break;
case RebootReminderName:
await UnregisterReminderAsync(RebootReminderName);
await SetStatusAsync(DeviceStatus.Online);
Logger.LogInformation("[DeviceActor:{Id}] Reboot completed via reminder — now Online", Id);
break;
}
}
internal static List<Alert> EvaluateThresholds(DeviceTelemetry telemetry, DeviceSettings settings)
{
if (!settings.AlertSettings.AlertsEnabled) return [];
var alerts = new List<Alert>();
var now = DateTimeOffset.UtcNow;
var tempDiff = Math.Abs(telemetry.Temperature - settings.AlertSettings.TargetTemperature);
if (tempDiff > settings.AlertSettings.TemperatureThreshold)
{
var alertType = telemetry.Temperature > settings.AlertSettings.TargetTemperature
? AlertType.HighTemperature
: AlertType.LowTemperature;
alerts.Add(new Alert
{
Type = alertType,
Severity = tempDiff > settings.AlertSettings.TemperatureThreshold * 2
? AlertSeverity.Critical : AlertSeverity.Warning,
Message = $"Temperature {telemetry.Temperature:F1}°C deviates from " +
$"target {settings.AlertSettings.TargetTemperature:F1}°C " +
$"(threshold: ±{settings.AlertSettings.TemperatureThreshold:F1}°C)",
CreatedAt = now
});
}
if (telemetry.AirQualityIndex > AqiWarningThreshold)
{
alerts.Add(new Alert
{
Type = AlertType.PoorAirQuality,
Severity = telemetry.AirQualityIndex > AqiCriticalThreshold
? AlertSeverity.Critical : AlertSeverity.Warning,
Message = $"Poor air quality: AQI {telemetry.AirQualityIndex} " +
$"(warning >{AqiWarningThreshold}, critical >{AqiCriticalThreshold})",
CreatedAt = now
});
}
return alerts;
}
private async Task<DeviceStatistics> LoadStatsAsync()
{
var result = await StateManager.TryGetStateAsync<DeviceStatistics>(StatsKey);
return result.HasValue ? result.Value : new DeviceStatistics();
}
private IAlertActor GetAlertActor() =>
this.ProxyFactory.CreateActorProxy<IAlertActor>(Id, "AlertActor");
private async Task<int> GetActiveAlertCountAsync()
{
var alerts = await GetAlertActor().GetActiveAlertsAsync();
return alerts.Count;
}
private async Task TryUnregisterLegacyReminderAsync()
{
try
{
await UnregisterReminderAsync(LegacyStaleReminderName);
}
catch (Exception ex)
{
Logger.LogDebug(ex, "[DeviceActor:{Id}] Legacy reminder {ReminderName} is not registered.", Id, LegacyStaleReminderName);
}
}
}// DeviceRegistryActor.cs
using AspireWithDapr.Interfaces;
using Dapr.Actors.Runtime;
namespace AspireWithDapr.ApiService.Actors;
[Actor(TypeName = "DeviceRegistryActor")]
public class DeviceRegistryActor(ActorHost host) : Actor(host), IDeviceRegistryActor
{
private const string RegistryKey = "deviceRegistry";
public async Task AddDeviceAsync(string deviceId)
{
if (string.IsNullOrWhiteSpace(deviceId)) return;
var list = await LoadAsync();
if (!list.Contains(deviceId, StringComparer.Ordinal))
{
list.Add(deviceId);
await StateManager.SetStateAsync(RegistryKey, list);
}
}
public async Task<List<string>> GetAllDeviceIdsAsync()
{
var list = await LoadAsync();
list.Sort(StringComparer.Ordinal);
return list;
}
private async Task<List<string>> LoadAsync()
{
var result = await StateManager.TryGetStateAsync<List<string>>(RegistryKey);
return result.HasValue ? result.Value : [];
}
}DTOs
// SendTelemetryRequest.cs
namespace AspireWithDapr.ApiService.DTOs;
public record SendTelemetryRequest(double Temperature, double Humidity, int AirQualityIndex);// UpdateSettingsRequest.cs
namespace AspireWithDapr.ApiService.DTOs;
public record UpdateSettingsRequest(
bool? AlertsEnabled,
double? TargetTemperature,
double? TemperatureThreshold);EndPoints
// AlertEndpoints.cs
using AspireWithDapr.Interfaces;
using AspireWithDapr.Interfaces.Models;
using Dapr.Actors;
using Dapr.Actors.Client;
namespace AspireWithDapr.ApiService.Endpoints;
public static class AlertEndpoints
{
public static IEndpointRouteBuilder MapAlertEndpoints(this IEndpointRouteBuilder app)
{
var group = app.MapGroup("/devices/{deviceId}/alerts")
.WithTags("🚨 AlertActor — Alert Lifecycle");
group.MapPost("/{alertId}/acknowledge", async (
string deviceId,
string alertId,
IActorProxyFactory factory) =>
{
var actor = GetAlertActor(factory, deviceId);
var response = await actor.AcknowledgeAlertAsync(alertId);
return response.Success ? Results.Ok(response) : Results.NotFound(response);
})
.WithName("AcknowledgeAlert")
.WithSummary("Acknowledge a specific alert by ID");
group.MapDelete("/", async (
string deviceId,
IActorProxyFactory factory) =>
{
var actor = GetAlertActor(factory, deviceId);
var response = await actor.ClearAllAlertsAsync();
return Results.Ok(response);
})
.WithName("ClearAlerts")
.WithSummary("Clear all alerts for this device");
return app;
}
private static IAlertActor GetAlertActor(IActorProxyFactory f, string id) =>
f.CreateActorProxy<IAlertActor>(new ActorId(id), "AlertActor");
}// DeviceEndpoints.cs
using AspireWithDapr.ApiService.DTOs;
using AspireWithDapr.ApiService.Services;
using AspireWithDapr.Interfaces;
using AspireWithDapr.Interfaces.Enums;
using AspireWithDapr.Interfaces.Models;
using Dapr.Actors;
using Dapr.Actors.Client;
using Microsoft.AspNetCore.Mvc;
namespace AspireWithDapr.ApiService.Endpoints;
public static class DeviceEndpoints
{
public static IEndpointRouteBuilder MapDeviceEndpoints(this IEndpointRouteBuilder app)
{
var group = app.MapGroup("/devices/{deviceId}");
MapLifecycleEndpoints(group);
MapTelemetryEndpoints(group);
MapStatsEndpoints(group);
return app;
}
private static void MapLifecycleEndpoints(RouteGroupBuilder group)
{
group.MapPost("/initialize", async (
string deviceId,
[FromBody] InitializeDeviceRequest request,
IActorProxyFactory factory,
DeviceInitializationService initializationService,
CancellationToken ct) =>
{
var actor = GetDeviceActor(factory, deviceId);
try
{
await initializationService.ExecuteAsync(
deviceId,
() => actor.InitializeAsync(request),
ct);
}
catch (InvalidOperationException ex)
{
return Results.Conflict(new { error = ex.Message });
}
return Results.Ok(new
{
message = $"Device '{deviceId}' initialized",
location = request.Location,
type = request.DeviceType.ToString(),
hint = "Next: POST /devices/{deviceId}/telemetry to send sensor data"
});
})
.WithName("InitializeDevice")
.WithTags("🔌 DeviceActor — Lifecycle & Settings")
.WithSummary("Initialize a device — creates DeviceActor state");
group.MapPost("/reboot", async (string deviceId, IActorProxyFactory factory) =>
{
var actor = GetDeviceActor(factory, deviceId);
var response = await actor.RebootAsync();
return response.Success
? Results.Ok(new
{
response.Message,
actorCommunication = new
{
step1 = "DeviceActor.SetStatusAsync(Maintenance) — immediate",
step2 = "Dapr registers a one-shot reminder in the state store",
step3 = "~5s later: reminder fires → DeviceActor.SetStatusAsync(Online)",
why = "Avoids blocking the actor turn with Task.Delay"
}
})
: Results.BadRequest(response);
})
.WithName("RebootDevice")
.WithTags("🔌 DeviceActor — Lifecycle & Settings")
.WithSummary("Reboot device — demonstrates one-shot reminder pattern");
group.MapPut("/settings", async (
string deviceId,
[FromBody] UpdateSettingsRequest request,
IActorProxyFactory factory) =>
{
var actor = GetDeviceActor(factory, deviceId);
var info = await actor.GetDeviceInfoAsync();
var current = info.Settings;
var updated = new DeviceSettings
{
AlertSettings = new AlertSettings
{
AlertsEnabled = request.AlertsEnabled ?? current.AlertSettings.AlertsEnabled,
TargetTemperature = request.TargetTemperature ?? current.AlertSettings.TargetTemperature,
TemperatureThreshold = request.TemperatureThreshold ?? current.AlertSettings.TemperatureThreshold,
}
};
var response = await actor.UpdateSettingsAsync(updated);
return response.Success ? Results.Ok(new { response.Message, settings = updated }) : Results.BadRequest(response);
})
.WithName("UpdateSettings")
.WithTags("🔌 DeviceActor — Lifecycle & Settings")
.WithSummary("Update device settings — DeviceActor uses these for threshold evaluation");
group.MapPut("/status/{status}", async (
string deviceId, string status, IActorProxyFactory factory) =>
{
if (!Enum.TryParse<DeviceStatus>(status, ignoreCase: true, out var parsed))
return Results.BadRequest(new { error = $"Invalid status '{status}'", valid = Enum.GetNames<DeviceStatus>() });
var actor = GetDeviceActor(factory, deviceId);
var response = await actor.SetStatusAsync(parsed);
return response.Success ? Results.Ok(response) : Results.BadRequest(response);
})
.WithName("SetDeviceStatus")
.WithTags("🔌 DeviceActor — Lifecycle & Settings")
.WithSummary("Manually override device status");
}
private static void MapTelemetryEndpoints(RouteGroupBuilder group)
{
group.MapPost("/telemetry", async (
string deviceId,
[FromBody] SendTelemetryRequest request,
IActorProxyFactory factory) =>
{
var actor = GetDeviceActor(factory, deviceId);
var telemetry = new DeviceTelemetry
{
Temperature = request.Temperature,
Humidity = request.Humidity,
AirQualityIndex = request.AirQualityIndex,
RecordedAt = DateTimeOffset.UtcNow
};
await actor.ProcessTelemetryAsync(telemetry);
return Results.Ok(new
{
message = "Telemetry accepted",
telemetry,
actorCommunicationChain = new[]
{
"1. DeviceActor stores reading, updates status & stats",
"2. DeviceActor evaluates thresholds locally",
"3. DeviceActor ──► AlertActor.AddAlertsAsync() (only if thresholds breached)"
}
});
})
.WithName("SendTelemetry")
.WithTags("📡 DeviceActor — Telemetry")
.WithSummary("Send sensor data — processed entirely by DeviceActor");
}
private static void MapStatsEndpoints(RouteGroupBuilder group)
{
group.MapDelete("/stats", async (string deviceId, IActorProxyFactory factory) =>
{
var actor = GetDeviceActor(factory, deviceId);
await actor.ResetStatisticsAsync();
return Results.Ok(new { message = "Statistics reset" });
})
.WithName("ResetStatistics")
.WithTags("📊 DeviceActor — Statistics")
.WithSummary("Reset all counters to zero");
}
private static IDeviceActor GetDeviceActor(IActorProxyFactory f, string id) =>
f.CreateActorProxy<IDeviceActor>(new ActorId(id), "DeviceActor");
}// DeviceGenerationEndpoints.cs
using AspireWithDapr.ApiService.Services;
using AspireWithDapr.Interfaces.Models;
namespace AspireWithDapr.ApiService.Endpoints;
public static class DeviceGenerationEndpoints
{
public static IEndpointRouteBuilder MapDeviceGenerationEndpoints(this IEndpointRouteBuilder app)
{
app.MapPost("/devices/generation/complete", async (
DeviceGenerationCompleteRequest request,
DeviceGenerationStore store,
CancellationToken ct) =>
{
var status = new DeviceGenerationStatus
{
Completed = true,
DeviceCount = request.DeviceCount,
CompletedAt = DateTimeOffset.UtcNow
};
await store.SaveAsync(status, ct);
return Results.Ok(status);
})
.WithName("MarkDeviceGenerationComplete")
.WithTags("Fleet — Read Model")
.WithSummary("Mark device generation complete");
app.MapGet("/devices/generation/status", async (
DeviceGenerationStore store,
CancellationToken ct) =>
{
var status = await store.GetAsync(ct);
return Results.Ok(status ?? new DeviceGenerationStatus { Completed = false });
})
.WithName("GetDeviceGenerationStatus")
.WithTags("Fleet — Read Model")
.WithSummary("Get device generation completion status");
return app;
}
}// DeviceListEndpoints.cs
using AspireWithDapr.ApiService.ReadModel;
using AspireWithDapr.ApiService.Services;
using AspireWithDapr.Interfaces;
using AspireWithDapr.Interfaces.Enums;
using AspireWithDapr.Interfaces.Models;
using Dapr.Actors;
using Dapr.Actors.Client;
namespace AspireWithDapr.ApiService.Endpoints;
public static class DeviceListEndpoints
{
public static IEndpointRouteBuilder MapDeviceListEndpoints(this IEndpointRouteBuilder app)
{
app.MapGet("/devices/list", async (
DeviceFleetReadModelService fleetReadModel,
DeviceSummaryVersionStore versionStore,
CancellationToken ct) =>
{
var summaries = await fleetReadModel.GetFleetSummariesAsync(ct);
var version = await versionStore.GetCurrentAsync(ct);
var items = summaries.Select(summary => new DeviceListItem
{
DeviceId = summary.DeviceId,
Location = summary.Location,
DeviceType = summary.DeviceType
}).ToList();
return Results.Ok(new DeviceListSnapshot
{
SnapshotVersion = version,
Devices = items
});
})
.WithName("GetDeviceList")
.WithTags("Fleet — Read Model")
.WithSummary("Get lightweight device list (id, location, type)");
app.MapGet("/devices", async (
DeviceFleetReadModelService fleetReadModel,
DeviceSummaryVersionStore versionStore,
CancellationToken ct) =>
{
var summaries = await fleetReadModel.GetFleetSummariesAsync(ct);
var version = await versionStore.GetCurrentAsync(ct);
var devicesWithDerivedStatus = summaries
.Select(s => WithDerivedStatus(s))
.ToList();
return Results.Ok(new DeviceSummarySnapshot
{
SnapshotVersion = version,
Devices = devicesWithDerivedStatus
});
})
.WithName("GetAllDeviceSummaries")
.WithTags("Fleet — Read Model")
.WithSummary("Get summary state for every registered device");
app.MapGet("/devices/{deviceId}/state", async (
string deviceId,
IActorProxyFactory factory) =>
{
try
{
var state = await BuildDeviceStateAsync(factory, deviceId);
return Results.Ok(state);
}
catch (InvalidOperationException ex)
{
return Results.NotFound(new { error = ex.Message });
}
})
.WithName("GetDeviceState")
.WithTags("Fleet — Composite Views")
.WithSummary("Get composite state for a single device (info + telemetry + alerts + stats)");
return app;
}
private static DeviceSummary WithDerivedStatus(DeviceSummary summary)
{
var derived = DeriveStatus(summary.LastTelemetryTime, summary.Status);
if (derived == summary.Status) return summary;
return new DeviceSummary
{
DeviceId = summary.DeviceId,
Location = summary.Location,
DeviceType = summary.DeviceType,
Status = derived,
LastTelemetry = summary.LastTelemetry,
LastTelemetryTime = summary.LastTelemetryTime,
ActiveAlertCount = summary.ActiveAlertCount
};
}
private static DeviceStatus DeriveStatus(DateTimeOffset lastTelemetryTime, DeviceStatus storedStatus)
=> DeviceStatusResolver.Resolve(lastTelemetryTime, storedStatus);
private static async Task<DeviceState> BuildDeviceStateAsync(
IActorProxyFactory factory, string deviceId)
{
var device = factory.CreateActorProxy<IDeviceActor>(new ActorId(deviceId), "DeviceActor");
var alert = factory.CreateActorProxy<IAlertActor>(new ActorId(deviceId), "AlertActor");
var snapshotTask = device.GetSnapshotAsync();
var alertsTask = alert.GetActiveAlertsAsync();
await Task.WhenAll(snapshotTask, alertsTask);
var snapshot = await snapshotTask;
var status = DeriveStatus(snapshot.LastTelemetryTime, snapshot.Info.Status);
return new DeviceState
{
DeviceId = snapshot.Info.DeviceId,
Location = snapshot.Info.Location,
DeviceType = snapshot.Info.DeviceType,
Status = status,
LastTelemetry = snapshot.LatestTelemetry,
LastTelemetryTime = snapshot.LastTelemetryTime,
Statistics = snapshot.Statistics,
ActiveAlerts = await alertsTask,
Settings = snapshot.Info.Settings
};
}
}Hubs
// DeviceUpdatesHub.cs
using Microsoft.AspNetCore.SignalR;
namespace AspireWithDapr.ApiService.Hubs;
public class DeviceUpdatesHub : Hub
{
}ReadModel
// DeviceStatusResolver.cs
using AspireWithDapr.Interfaces.Enums;
namespace AspireWithDapr.ApiService.ReadModel;
internal static class DeviceStatusResolver
{
private static readonly TimeSpan StaleDataThreshold = TimeSpan.FromMinutes(5);
public static DeviceStatus Resolve(DateTimeOffset lastTelemetryTime, DeviceStatus storedStatus)
{
if (lastTelemetryTime == DateTimeOffset.MinValue ||
DateTimeOffset.UtcNow - lastTelemetryTime > StaleDataThreshold)
{
return DeviceStatus.Offline;
}
return storedStatus == DeviceStatus.Offline
? DeviceStatus.Online
: storedStatus;
}
}// DeviceSummaryFactory.cs
using AspireWithDapr.Interfaces.Models;
namespace AspireWithDapr.ApiService.ReadModel;
internal static class DeviceSummaryFactory
{
public static DeviceSummary FromDeviceState(DeviceState state)
{
ArgumentNullException.ThrowIfNull(state);
return new DeviceSummary
{
DeviceId = state.DeviceId,
Location = state.Location,
DeviceType = state.DeviceType,
Status = DeviceStatusResolver.Resolve(state.LastTelemetryTime, state.Status),
LastTelemetry = state.LastTelemetry,
LastTelemetryTime = state.LastTelemetryTime,
ActiveAlertCount = state.ActiveAlerts.Count(alert => !alert.Acknowledged)
};
}
public static DeviceSummary FromDeviceInfo(
DeviceInfo info,
DeviceTelemetry? telemetry,
DateTimeOffset lastTelemetryTime,
int activeAlertCount)
{
ArgumentNullException.ThrowIfNull(info);
return new DeviceSummary
{
DeviceId = info.DeviceId,
Location = info.Location,
DeviceType = info.DeviceType,
Status = DeviceStatusResolver.Resolve(lastTelemetryTime, info.Status),
LastTelemetry = telemetry,
LastTelemetryTime = lastTelemetryTime,
ActiveAlertCount = activeAlertCount
};
}
}// DeviceSummaryNotifier.cs
using AspireWithDapr.ApiService.Hubs;
using AspireWithDapr.Interfaces.Models;
using Microsoft.AspNetCore.SignalR;
namespace AspireWithDapr.ApiService.ReadModel;
public class DeviceSummaryNotifier(IHubContext<DeviceUpdatesHub> hubContext)
{
public Task NotifyUpdatedAsync(DeviceSummary summary, long snapshotVersion, CancellationToken ct = default)
{
var update = new DeviceSummaryUpdate
{
SnapshotVersion = snapshotVersion,
Summary = summary
};
return hubContext.Clients.All.SendAsync("DeviceSummaryUpdated", update, ct);
}
}// DeviceSummaryStore.cs
using AspireWithDapr.Interfaces.Enums;
using AspireWithDapr.Interfaces.Models;
using Dapr;
using Dapr.Client;
namespace AspireWithDapr.ApiService.ReadModel;
public class DeviceSummaryStore(DaprClient daprClient, ILogger<DeviceSummaryStore> logger)
{
public const string StoreName = "readstore";
private const string SummaryPrefix = "device-summary:";
private const int BulkParallelism = 100;
private const int MaxRetries = 8;
public async Task<IReadOnlyList<DeviceSummary>> GetAllAsync(
IReadOnlyList<string> deviceIds, CancellationToken ct = default)
{
if (deviceIds.Count == 0) return Array.Empty<DeviceSummary>();
var keys = deviceIds.Select(SummaryKey).ToArray();
var results = await daprClient.GetBulkStateAsync<DeviceSummary>(
StoreName,
keys,
parallelism: BulkParallelism,
cancellationToken: ct);
return results
.Where(item => item.Value is not null)
.Select(item => item.Value!)
.OrderBy(summary => summary.DeviceId, StringComparer.Ordinal)
.ToList();
}
public async Task<(DeviceSummary Summary, bool Created, bool Changed)> UpsertCanonicalAsync(
DeviceSummary summary,
CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(summary);
ArgumentException.ThrowIfNullOrWhiteSpace(summary.DeviceId);
for (var attempt = 0; attempt < MaxRetries; attempt++)
{
var state = await daprClient.GetStateAndETagAsync<DeviceSummary>(
StoreName,
SummaryKey(summary.DeviceId),
consistencyMode: ConsistencyMode.Strong,
cancellationToken: ct);
var created = state.value is null;
if (!created && SummaryEquals(state.value!, summary))
{
return (state.value!, false, false);
}
try
{
if (created || string.IsNullOrEmpty(state.etag))
{
await daprClient.SaveStateAsync(
StoreName,
SummaryKey(summary.DeviceId),
summary,
ReadModelStateRetry.StrongWriteOptions,
metadata: null,
cancellationToken: ct);
return (summary, created, true);
}
var saved = await daprClient.TrySaveStateAsync(
StoreName,
SummaryKey(summary.DeviceId),
summary,
state.etag,
ReadModelStateRetry.StrongFirstWriteOptions,
metadata: null,
cancellationToken: ct);
if (saved)
{
return (summary, false, true);
}
logger.LogDebug(
"Canonical summary upsert conflict for device {DeviceId} on attempt {Attempt} of {MaxRetries}. Retrying.",
summary.DeviceId,
attempt + 1,
MaxRetries);
}
catch (DaprApiException ex) when (attempt < MaxRetries - 1)
{
logger.LogDebug(
ex,
"Canonical summary upsert conflict for device {DeviceId} on attempt {Attempt} of {MaxRetries}. Retrying.",
summary.DeviceId,
attempt + 1,
MaxRetries);
}
if (attempt < MaxRetries - 1)
{
await ReadModelStateRetry.DelayAsync(attempt, ct);
}
}
throw new InvalidOperationException($"Failed to upsert canonical summary for device '{summary.DeviceId}'.");
}
public async Task<(DeviceSummary? Summary, bool Changed)> UpdateAlertCountAsync(
string deviceId,
int activeAlertCount,
CancellationToken ct = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(deviceId);
for (var attempt = 0; attempt < MaxRetries; attempt++)
{
var state = await daprClient.GetStateAndETagAsync<DeviceSummary>(
StoreName,
SummaryKey(deviceId),
consistencyMode: ConsistencyMode.Strong,
cancellationToken: ct);
if (state.value is null)
{
return (null, false);
}
if (state.value.ActiveAlertCount == activeAlertCount)
{
return (state.value, false);
}
var summary = state.value;
summary.ActiveAlertCount = activeAlertCount;
summary.Status = DeviceStatusResolver.Resolve(summary.LastTelemetryTime, summary.Status);
try
{
if (string.IsNullOrEmpty(state.etag))
{
await daprClient.SaveStateAsync(
StoreName,
SummaryKey(deviceId),
summary,
ReadModelStateRetry.StrongWriteOptions,
metadata: null,
cancellationToken: ct);
return (summary, true);
}
var saved = await daprClient.TrySaveStateAsync(
StoreName,
SummaryKey(deviceId),
summary,
state.etag,
ReadModelStateRetry.StrongFirstWriteOptions,
metadata: null,
cancellationToken: ct);
if (saved)
{
return (summary, true);
}
logger.LogDebug(
"Alert count update conflict for device {DeviceId} on attempt {Attempt} of {MaxRetries}. Retrying.",
deviceId,
attempt + 1,
MaxRetries);
}
catch (DaprApiException ex) when (attempt < MaxRetries - 1)
{
logger.LogDebug(
ex,
"Alert count update conflict for device {DeviceId} on attempt {Attempt} of {MaxRetries}. Retrying.",
deviceId,
attempt + 1,
MaxRetries);
}
if (attempt < MaxRetries - 1)
{
await ReadModelStateRetry.DelayAsync(attempt, ct);
}
}
throw new InvalidOperationException($"Failed to update alert count for device '{deviceId}'.");
}
private static bool SummaryEquals(DeviceSummary left, DeviceSummary right)
{
return left.DeviceId == right.DeviceId &&
left.Location == right.Location &&
left.DeviceType == right.DeviceType &&
left.Status == right.Status &&
left.LastTelemetryTime == right.LastTelemetryTime &&
left.ActiveAlertCount == right.ActiveAlertCount &&
TelemetryEquals(left.LastTelemetry, right.LastTelemetry);
}
private static bool TelemetryEquals(DeviceTelemetry? left, DeviceTelemetry? right)
{
if (left is null || right is null)
{
return left is null && right is null;
}
return left.Temperature == right.Temperature &&
left.Humidity == right.Humidity &&
left.AirQualityIndex == right.AirQualityIndex &&
left.RecordedAt == right.RecordedAt;
}
private static string SummaryKey(string deviceId) => $"{SummaryPrefix}{deviceId}";
}// DeviceSummaryWriteThroughService.cs
using AspireWithDapr.ApiService.Services;
using AspireWithDapr.Interfaces.Models;
namespace AspireWithDapr.ApiService.ReadModel;
public class DeviceSummaryWriteThroughService(
DeviceSummaryStore summaryStore,
DeviceSummaryVersionStore versionStore,
DeviceSummaryNotifier notifier,
ILogger<DeviceSummaryWriteThroughService> logger)
{
public async Task SyncAsync(
DeviceInfo info,
DeviceTelemetry? telemetry,
DateTimeOffset lastTelemetryTime,
int activeAlertCount,
CancellationToken ct = default)
{
ArgumentNullException.ThrowIfNull(info);
try
{
var summary = DeviceSummaryFactory.FromDeviceInfo(
info,
telemetry,
lastTelemetryTime,
activeAlertCount);
var (savedSummary, _, changed) = await summaryStore.UpsertCanonicalAsync(summary, ct);
if (!changed)
{
return;
}
var version = await versionStore.NextAsync(ct);
await notifier.NotifyUpdatedAsync(savedSummary, version, ct);
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to write through summary for {DeviceId}", info.DeviceId);
}
}
public async Task SyncAlertCountAsync(
string deviceId,
int activeAlertCount,
CancellationToken ct = default)
{
ArgumentException.ThrowIfNullOrWhiteSpace(deviceId);
try
{
var (summary, changed) = await summaryStore.UpdateAlertCountAsync(deviceId, activeAlertCount, ct);
if (summary is null || !changed)
{
return;
}
var version = await versionStore.NextAsync(ct);
await notifier.NotifyUpdatedAsync(summary, version, ct);
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to write through alert count for {DeviceId}", deviceId);
}
}
}// ReadModelStateRetry.cs
using Dapr;
using Dapr.Client;
namespace AspireWithDapr.ApiService.ReadModel;
internal static class ReadModelStateRetry
{
private const int InitialDelayMs = 10;
private const int MaxDelayMs = 250;
public static readonly StateOptions StrongWriteOptions = new()
{
Consistency = ConsistencyMode.Strong
};
public static readonly StateOptions StrongFirstWriteOptions = new()
{
Concurrency = ConcurrencyMode.FirstWrite,
Consistency = ConsistencyMode.Strong
};
public static Task DelayAsync(int attempt, CancellationToken ct)
{
var exponentialDelayMs = Math.Min(InitialDelayMs * (1 << attempt), MaxDelayMs);
var jitterMs = Random.Shared.Next(5, 30);
return Task.Delay(TimeSpan.FromMilliseconds(exponentialDelayMs + jitterMs), ct);
}
}Services
// DeviceFleetReadModelService.cs
using System.Collections.Concurrent;
using AspireWithDapr.ApiService.ReadModel;
using AspireWithDapr.Interfaces;
using AspireWithDapr.Interfaces.Models;
using Dapr.Actors;
using Dapr.Actors.Client;
namespace AspireWithDapr.ApiService.Services;
public class DeviceFleetReadModelService(
DeviceGenerationStore generationStore,
DeviceRegistryService registry,
DeviceSummaryStore summaryStore,
IActorProxyFactory actorProxyFactory,
IConfiguration configuration,
ILogger<DeviceFleetReadModelService> logger)
{
private readonly int repairParallelism = Math.Max(1,
configuration.GetValue("DeviceReadModelRepair:MaxParallelism", 8));
public async Task<IReadOnlyList<string>> GetKnownDeviceIdsAsync(CancellationToken ct = default)
{
var generationStatus = await generationStore.GetAsync(ct);
return MergeKnownDeviceIds(
await registry.GetAllDeviceIdsAsync(ct),
generationStatus);
}
public async Task<IReadOnlyList<DeviceSummary>> GetFleetSummariesAsync(CancellationToken ct = default)
{
var deviceIds = await GetKnownDeviceIdsAsync(ct);
if (deviceIds.Count == 0)
{
return Array.Empty<DeviceSummary>();
}
var summaries = await summaryStore.GetAllAsync(deviceIds, ct);
if (summaries.Count == deviceIds.Count)
{
return summaries;
}
var knownSummaryIds = summaries
.Select(summary => summary.DeviceId)
.ToHashSet(StringComparer.Ordinal);
var missingIds = deviceIds
.Where(deviceId => !knownSummaryIds.Contains(deviceId))
.ToList();
if (missingIds.Count == 0)
{
return summaries;
}
logger.LogInformation(
"Repairing {Count} missing device summaries out of {TotalDeviceCount} known device ids.",
missingIds.Count,
deviceIds.Count);
var repairedSummaries = await RepairMissingSummariesAsync(missingIds, ct);
return MergeFleetSummaries(summaries, repairedSummaries);
}
private async Task<IReadOnlyList<DeviceSummary>> RepairMissingSummariesAsync(
IReadOnlyList<string> missingIds,
CancellationToken ct)
{
var repaired = new ConcurrentBag<DeviceSummary>();
var options = new ParallelOptions
{
MaxDegreeOfParallelism = repairParallelism,
CancellationToken = ct
};
await Parallel.ForEachAsync(missingIds, options, async (deviceId, token) =>
{
var actor = actorProxyFactory.CreateActorProxy<IDeviceActor>(new ActorId(deviceId), "DeviceActor");
try
{
var snapshot = await actor.GetSnapshotAsync();
var summary = DeviceSummaryFactory.FromDeviceInfo(
snapshot.Info,
snapshot.LatestTelemetry,
snapshot.LastTelemetryTime,
activeAlertCount: 0);
await registry.RegisterAsync(deviceId, token);
await summaryStore.UpsertCanonicalAsync(summary, token);
repaired.Add(summary);
}
catch (InvalidOperationException)
{
// Known generated ids can still be absent if initialization never completed.
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogWarning(ex, "Failed to repair device summary for {DeviceId}", deviceId);
}
});
return repaired
.OrderBy(summary => summary.DeviceId, StringComparer.Ordinal)
.ToList();
}
internal static IReadOnlyList<string> MergeKnownDeviceIds(
IEnumerable<string> registryIds,
DeviceGenerationStatus? generationStatus)
{
var ids = registryIds
.ToHashSet(StringComparer.Ordinal);
if (generationStatus is { Completed: true, DeviceCount: > 0 })
{
foreach (var generatedId in Enumerable.Range(1, generationStatus.DeviceCount).Select(FormatGeneratedDeviceId))
{
ids.Add(generatedId);
}
}
return ids
.OrderBy(deviceId => deviceId, StringComparer.Ordinal)
.ToList();
}
internal static IReadOnlyList<DeviceSummary> MergeFleetSummaries(
IEnumerable<DeviceSummary> summaries,
IEnumerable<DeviceSummary> repairedSummaries)
{
return summaries
.Concat(repairedSummaries)
.GroupBy(summary => summary.DeviceId, StringComparer.Ordinal)
.Select(group => group.OrderByDescending(summary => summary.LastTelemetryTime).First())
.OrderBy(summary => summary.DeviceId, StringComparer.Ordinal)
.ToList();
}
internal static string FormatGeneratedDeviceId(int index) => $"DEVICE-{index:D4}";
}// DeviceGenerationStore.cs
using AspireWithDapr.ApiService.ReadModel;
using AspireWithDapr.Interfaces.Models;
using Dapr.Client;
namespace AspireWithDapr.ApiService.Services;
public class DeviceGenerationStore(DaprClient daprClient)
{
private const string StoreName = DeviceSummaryStore.StoreName;
private const string StatusKey = "device-generation-status";
public Task SaveAsync(DeviceGenerationStatus status, CancellationToken ct = default) =>
daprClient.SaveStateAsync(StoreName, StatusKey, status, cancellationToken: ct);
public Task<DeviceGenerationStatus?> GetAsync(CancellationToken ct = default) =>
daprClient.GetStateAsync<DeviceGenerationStatus?>(StoreName, StatusKey, cancellationToken: ct);
}// DeviceInitializationService.cs
using System.Net.Sockets;
using System.Runtime.ExceptionServices;
using Dapr;
namespace AspireWithDapr.ApiService.Services;
public class DeviceInitializationService(IConfiguration configuration, ILogger<DeviceInitializationService> logger)
{
private readonly SemaphoreSlim gate = new(Math.Max(1,
configuration.GetValue("DeviceInitialization:MaxConcurrentActorCalls", 4)));
private readonly int maxAttempts = Math.Max(1,
configuration.GetValue("DeviceInitialization:MaxAttempts", 4));
private readonly TimeSpan retryDelay = TimeSpan.FromMilliseconds(Math.Max(100,
configuration.GetValue("DeviceInitialization:RetryDelayMilliseconds", 250)));
public async Task ExecuteAsync(string deviceId, Func<Task> action, CancellationToken ct)
{
await gate.WaitAsync(ct);
try
{
Exception? lastException = null;
for (var attempt = 1; attempt <= maxAttempts; attempt++)
{
try
{
await action();
return;
}
catch (Exception ex) when (ex is not OperationCanceledException && IsTransient(ex) && attempt < maxAttempts)
{
lastException = ex;
var delay = TimeSpan.FromMilliseconds(Math.Min(
5_000,
retryDelay.TotalMilliseconds * Math.Pow(2, attempt - 1)));
logger.LogWarning(
ex,
"Transient actor initialization failure for {DeviceId}. Retrying in {DelayMs}ms (attempt {Attempt} of {MaxAttempts}).",
deviceId,
delay.TotalMilliseconds,
attempt + 1,
maxAttempts);
await Task.Delay(delay, ct);
}
}
if (lastException is not null)
{
ExceptionDispatchInfo.Capture(lastException).Throw();
}
}
finally
{
gate.Release();
}
}
private static bool IsTransient(Exception ex)
{
if (ex is DaprApiException daprEx)
{
if (daprEx.InnerException is HttpRequestException or SocketException)
{
return true;
}
var message = daprEx.Message;
return message.Contains("resource temporarily unavailable", StringComparison.OrdinalIgnoreCase)
|| message.Contains("dial tcp", StringComparison.OrdinalIgnoreCase)
|| message.Contains("connect:", StringComparison.OrdinalIgnoreCase)
|| message.Contains("connection refused", StringComparison.OrdinalIgnoreCase);
}
return ex is HttpRequestException or SocketException;
}
}// DeviceRegistryService.cs
using AspireWithDapr.Interfaces;
using AspireWithDapr.ServiceDefaults;
using Dapr.Actors;
using Dapr.Actors.Client;
namespace AspireWithDapr.ApiService.Services;
public class DeviceRegistryService(IActorProxyFactory actorProxyFactory, ILogger<DeviceRegistryService> logger)
{
private const string RegistryActorType = "DeviceRegistryActor";
private const int RegistryShardCount = 32;
private static readonly ActorId LegacyRegistryActorId = new("registry");
private static readonly ActorId[] RegistryShardActorIds = Enumerable.Range(0, RegistryShardCount)
.Select(index => new ActorId($"registry-{index:D2}"))
.ToArray();
public async Task RegisterAsync(string deviceId, CancellationToken ct = default)
{
try
{
var actor = CreateRegistryActor(GetShardActorId(deviceId));
await actor.AddDeviceAsync(deviceId);
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to register device {DeviceId} in registry store", deviceId);
}
}
public async Task<IReadOnlyList<string>> GetAllDeviceIdsAsync(CancellationToken ct = default)
{
var registryTasks = RegistryShardActorIds
.Select(LoadFromRegistryActorAsync)
.Append(LoadFromRegistryActorAsync(LegacyRegistryActorId))
.ToArray();
var registryResults = await Task.WhenAll(registryTasks);
return registryResults
.SelectMany(ids => ids)
.Distinct(StringComparer.Ordinal)
.OrderBy(deviceId => deviceId, StringComparer.Ordinal)
.ToList();
}
private async Task<List<string>> LoadFromRegistryActorAsync(ActorId actorId)
{
try
{
return await CreateRegistryActor(actorId).GetAllDeviceIdsAsync();
}
catch (Exception ex)
{
logger.LogWarning(ex, "Failed to load device ids from registry actor {ActorId}", actorId.ToString());
return [];
}
}
private IDeviceRegistryActor CreateRegistryActor(ActorId actorId) =>
actorProxyFactory.CreateActorProxy<IDeviceRegistryActor>(actorId, RegistryActorType);
private static ActorId GetShardActorId(string deviceId)
{
var shardIndex = RuntimeHelpers.StableHash(deviceId) % RegistryShardCount;
return RegistryShardActorIds[shardIndex];
}
}The diagram below illustrates the internal architecture of the demo application and the relationships between its components.

The diagram below illustrates how telemetry data flows through the system and is ultimately reflected on the Dashboard.

AspireWithDapr.GenerateDevice
The GenerateDevice application uses the appsettings.json file to configure and control its behavior. The configuration content is shown below:
{
"DeviceGenerator": {
"DeviceCount": 5000,
"MaxConcurrentInitializations": 4,
"InitializationDelayMilliseconds": 0,
"InitializationMaxAttempts": 5,
"InitializationRetryDelayMilliseconds": 500,
"StartupDelaySeconds": 0,
"WaitForApiReady": true,
"ApiReadyTimeoutSeconds": 60,
"ApiReadyPollSeconds": 2
}
}// Program.cs
using System.Net;
using System.Net.Http.Json;
using System.Text.Json;
using System.Text.Json.Serialization;
using AspireWithDapr.ServiceDefaults;
using AspireWithDapr.Interfaces.Enums;
using AspireWithDapr.Interfaces.Models;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Polly.CircuitBreaker;
using Polly.Timeout;
var builder = Host.CreateApplicationBuilder(args);
builder.ConfigureOpenTelemetry();
builder.AddDefaultHealthChecks();
builder.Services.AddServiceDiscovery();
builder.Services.AddHttpClient("ApiService", client =>
{
client.BaseAddress = new Uri("https+http://apiservice");
})
.AddServiceDiscovery()
.AddStandardResilienceHandler(options =>
{
options.AttemptTimeout.Timeout = TimeSpan.FromSeconds(45);
options.TotalRequestTimeout.Timeout = TimeSpan.FromMinutes(4);
options.CircuitBreaker.SamplingDuration = TimeSpan.FromSeconds(90);
options.CircuitBreaker.MinimumThroughput = 100;
options.CircuitBreaker.FailureRatio = 1.0;
options.CircuitBreaker.BreakDuration = TimeSpan.FromSeconds(5);
options.Retry.MaxRetryAttempts = 1;
});
builder.Services.AddHostedService<DeviceGeneratorService>();
var host = builder.Build();
await host.RunAsync();
public class DeviceGeneratorService(
IHttpClientFactory httpClientFactory,
IConfiguration configuration,
IHostApplicationLifetime applicationLifetime,
ILogger<DeviceGeneratorService> logger) : BackgroundService
{
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web)
{
Converters = { new JsonStringEnumConverter() }
};
private static readonly string[] Locations =
[
"Building A - Lobby",
"Building A - Floor 1",
"Building A - Floor 2",
"Building A - Floor 3",
"Building B - Lobby",
"Building B - Floor 1",
"Building B - Floor 2",
"Building B - Floor 3",
"Building C - Lobby",
"Building C - Floor 1",
"Building C - Floor 2",
"Warehouse - Section A",
"Warehouse - Section B",
"Warehouse - Section C",
"Lab - Room 101",
"Lab - Room 102",
"Lab - Room 103",
"Lab - Clean Room",
"Cafeteria - Main Hall",
"Cafeteria - Kitchen",
"Cafeteria - Storage",
"Parking - Level 1",
"Parking - Level 2",
"Parking - Level 3",
"Server Room - Primary",
"Server Room - Backup",
"Conference Room A",
"Conference Room B",
"Conference Room C"
];
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation("Device Generator starting...");
var deviceCount = configuration.GetValue("DeviceGenerator:DeviceCount", 1000);
var maxConcurrency = Math.Max(1, configuration.GetValue("DeviceGenerator:MaxConcurrentInitializations", 20));
var initializationDelay = TimeSpan.FromMilliseconds(Math.Max(0,
configuration.GetValue("DeviceGenerator:InitializationDelayMilliseconds", 0)));
var startupDelay = TimeSpan.FromSeconds(Math.Max(0,
configuration.GetValue("DeviceGenerator:StartupDelaySeconds", 0)));
var initializationMaxAttempts = Math.Max(1,
configuration.GetValue("DeviceGenerator:InitializationMaxAttempts", 5));
var initializationRetryDelay = TimeSpan.FromMilliseconds(Math.Max(100,
configuration.GetValue("DeviceGenerator:InitializationRetryDelayMilliseconds", 500)));
var waitForApiReady = configuration.GetValue("DeviceGenerator:WaitForApiReady", true);
var apiReadyTimeout = TimeSpan.FromSeconds(Math.Max(1,
configuration.GetValue("DeviceGenerator:ApiReadyTimeoutSeconds", 60)));
var apiReadyPollInterval = TimeSpan.FromSeconds(Math.Max(1,
configuration.GetValue("DeviceGenerator:ApiReadyPollSeconds", 2)));
if (startupDelay > TimeSpan.Zero)
{
await Task.Delay(startupDelay, stoppingToken);
}
var client = httpClientFactory.CreateClient("ApiService");
if (waitForApiReady)
{
var ready = await RuntimeHelpers.WaitForApiReadyAsync(
client,
apiReadyTimeout,
apiReadyPollInterval,
logger,
stoppingToken);
if (!ready)
{
logger.LogError("ApiService readiness check timed out. Shutting down.");
applicationLifetime.StopApplication();
return;
}
}
var existingDeviceIds = await LoadExistingDeviceIdsAsync(client, stoppingToken);
if (existingDeviceIds.Count >= deviceCount)
{
logger.LogInformation("All {Count} devices already exist. Marking generation complete.", deviceCount);
await MarkGenerationCompleteAsync(client, deviceCount, stoppingToken);
applicationLifetime.StopApplication();
return;
}
var initialized = 0;
var skipped = 0;
var failed = 0;
logger.LogInformation(
"Initializing up to {Count} devices with max concurrency {Concurrency}, {Attempts} max attempts, and base retry delay {RetryDelayMs}ms...",
deviceCount,
maxConcurrency,
initializationMaxAttempts,
initializationRetryDelay.TotalMilliseconds);
var options = new ParallelOptions
{
MaxDegreeOfParallelism = maxConcurrency,
CancellationToken = stoppingToken
};
await Parallel.ForEachAsync(Enumerable.Range(1, deviceCount), options, async (index, ct) =>
{
var deviceId = $"DEVICE-{index:D4}";
if (existingDeviceIds.Contains(deviceId))
{
Interlocked.Increment(ref skipped);
return;
}
var device = CreateDevice(deviceId);
var result = await InitializeDeviceAsync(
client,
device,
initializationMaxAttempts,
initializationRetryDelay,
ct);
switch (result)
{
case InitializeResult.Success:
Interlocked.Increment(ref initialized);
break;
case InitializeResult.AlreadyExists:
Interlocked.Increment(ref skipped);
break;
case InitializeResult.Failed:
Interlocked.Increment(ref failed);
break;
}
if (initializationDelay > TimeSpan.Zero)
{
await Task.Delay(initializationDelay, ct);
}
});
logger.LogInformation(
"Device generation complete. Initialized {Initialized}. Skipped {Skipped}. Failed {Failed}.",
initialized,
skipped,
failed);
await MarkGenerationCompleteAsync(client, deviceCount, stoppingToken);
applicationLifetime.StopApplication();
}
private static GeneratedDevice CreateDevice(string deviceId)
{
var locationIndex = Math.Abs(RuntimeHelpers.StableHash(deviceId)) % Locations.Length;
return new GeneratedDevice
{
DeviceId = deviceId,
Location = Locations[locationIndex],
DeviceType = DeviceType.MultiSensor
};
}
private async Task<HashSet<string>> LoadExistingDeviceIdsAsync(HttpClient client, CancellationToken ct)
{
try
{
var snapshot = await client.GetFromJsonAsync<DeviceListSnapshot>("/devices/list", JsonOptions, ct);
if (snapshot?.Devices is { Count: > 0 })
{
logger.LogInformation("Found {Count} existing devices", snapshot.Devices.Count);
return snapshot.Devices.Select(device => device.DeviceId).ToHashSet(StringComparer.Ordinal);
}
logger.LogInformation("No existing devices registered yet.");
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogWarning(ex, "Failed to load existing device list; initializing full set.");
}
return new HashSet<string>(StringComparer.Ordinal);
}
private async Task MarkGenerationCompleteAsync(HttpClient client, int deviceCount, CancellationToken ct)
{
try
{
var request = new DeviceGenerationCompleteRequest { DeviceCount = deviceCount };
var response = await client.PostAsJsonAsync("/devices/generation/complete", request, JsonOptions, ct);
if (!response.IsSuccessStatusCode)
{
logger.LogWarning("Failed to mark generation complete: {Status}", response.StatusCode);
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogWarning(ex, "Failed to mark device generation complete.");
}
}
private async Task<InitializeResult> InitializeDeviceAsync(
HttpClient client,
GeneratedDevice device,
int maxAttempts,
TimeSpan baseRetryDelay,
CancellationToken ct)
{
var request = new { device.Location, device.DeviceType };
for (var attempt = 1; attempt <= maxAttempts; attempt++)
{
try
{
using var response = await client.PostAsJsonAsync(
$"/devices/{device.DeviceId}/initialize",
request,
ct);
if (response.IsSuccessStatusCode)
{
logger.LogDebug("Initialized device {DeviceId} at {Location}", device.DeviceId, device.Location);
return InitializeResult.Success;
}
if ((int)response.StatusCode == 409)
{
logger.LogInformation("Device {DeviceId} already exists. Skipping creation.", device.DeviceId);
return InitializeResult.AlreadyExists;
}
if (!IsTransientStatusCode(response.StatusCode) || attempt == maxAttempts)
{
logger.LogWarning(
"Failed to initialize device {DeviceId}: {Status} after {Attempt} attempt(s)",
device.DeviceId,
response.StatusCode,
attempt);
return InitializeResult.Failed;
}
await DelayBeforeRetryAsync(device.DeviceId, response.StatusCode.ToString(), attempt, baseRetryDelay, ct);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
if (!IsTransientInitializationException(ex) || attempt == maxAttempts)
{
logger.LogError(ex, "Error initializing device {DeviceId} after {Attempt} attempt(s)", device.DeviceId, attempt);
return InitializeResult.Failed;
}
await DelayBeforeRetryAsync(device.DeviceId, ex.GetType().Name, attempt, baseRetryDelay, ct);
}
}
return InitializeResult.Failed;
}
private async Task DelayBeforeRetryAsync(
string deviceId,
string failure,
int attempt,
TimeSpan baseRetryDelay,
CancellationToken ct)
{
var delay = TimeSpan.FromMilliseconds(Math.Min(
10_000,
baseRetryDelay.TotalMilliseconds * Math.Pow(2, attempt - 1)));
logger.LogWarning(
"Transient initialize failure for {DeviceId}: {Failure}. Retrying in {DelayMs}ms (attempt {Attempt}).",
deviceId,
failure,
delay.TotalMilliseconds,
attempt + 1);
await Task.Delay(delay, ct);
}
private static bool IsTransientStatusCode(HttpStatusCode statusCode) =>
statusCode is HttpStatusCode.RequestTimeout
or HttpStatusCode.TooManyRequests
or HttpStatusCode.BadGateway
or HttpStatusCode.ServiceUnavailable
or HttpStatusCode.GatewayTimeout
or HttpStatusCode.InternalServerError;
private static bool IsTransientInitializationException(Exception ex) =>
ex is HttpRequestException
or BrokenCircuitException
or TimeoutRejectedException;
}
public enum InitializeResult
{
Success,
AlreadyExists,
Failed
}
public class GeneratedDevice
{
public required string DeviceId { get; init; }
public required string Location { get; init; }
public required DeviceType DeviceType { get; init; }
}AspireWithDapr.SimulatorDevice
The SimulatorDevice application uses the appsettings.json file to configure and control its behavior. The configuration content is shown below:
{
"Simulator": {
"MaxConcurrentTelemetry": 50,
"DeviceRefreshSeconds": 60,
"StartupDelaySeconds": 0,
"WaitForApiReady": true,
"ApiReadyTimeoutSeconds": 60,
"ApiReadyPollSeconds": 2,
"WaitForGenerationComplete": true,
"GenerationPollSeconds": 2,
"GenerationTimeoutSeconds": 0
}
}// Program.cs
using System.Net.Http.Json;
using System.Text.Json;
using System.Text.Json.Serialization;
using AspireWithDapr.ServiceDefaults;
using AspireWithDapr.Interfaces.Enums;
using AspireWithDapr.Interfaces.Models;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
var builder = Host.CreateApplicationBuilder(args);
builder.ConfigureOpenTelemetry();
builder.AddDefaultHealthChecks();
builder.Services.AddServiceDiscovery();
builder.Services.AddHttpClient("ApiService", client =>
{
client.BaseAddress = new Uri("https+http://apiservice");
})
.AddServiceDiscovery()
.AddStandardResilienceHandler(options =>
{
options.AttemptTimeout.Timeout = TimeSpan.FromSeconds(30);
options.TotalRequestTimeout.Timeout = TimeSpan.FromSeconds(120);
options.CircuitBreaker.SamplingDuration = TimeSpan.FromSeconds(60);
options.Retry.MaxRetryAttempts = 2;
});
builder.Services.AddHostedService<TelemetrySimulatorService>();
var host = builder.Build();
await host.RunAsync();
public class TelemetrySimulatorService(
IHttpClientFactory httpClientFactory,
IConfiguration configuration,
ILogger<TelemetrySimulatorService> logger) : BackgroundService
{
private static readonly JsonSerializerOptions JsonOptions = new(JsonSerializerDefaults.Web)
{
Converters = { new JsonStringEnumConverter() }
};
private readonly int _maxConcurrentTelemetry = Math.Max(
1,
configuration.GetValue("Simulator:MaxConcurrentTelemetry", 50));
private readonly TimeSpan _deviceRefreshInterval = TimeSpan.FromSeconds(Math.Max(
5,
configuration.GetValue("Simulator:DeviceRefreshSeconds", 60)));
private int? _generatedDeviceLimit;
private SimulatedDevice[] _deviceSnapshot = Array.Empty<SimulatedDevice>();
private DateTimeOffset _nextDeviceRefresh = DateTimeOffset.MinValue;
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
logger.LogInformation("Telemetry Simulator starting...");
var startupDelaySeconds = Math.Max(0, configuration.GetValue("Simulator:StartupDelaySeconds", 0));
if (startupDelaySeconds > 0)
{
await Task.Delay(TimeSpan.FromSeconds(startupDelaySeconds), stoppingToken);
}
var waitForApiReady = configuration.GetValue("Simulator:WaitForApiReady", true);
var apiReadyTimeout = TimeSpan.FromSeconds(Math.Max(
1,
configuration.GetValue("Simulator:ApiReadyTimeoutSeconds", 60)));
var apiReadyPollInterval = TimeSpan.FromSeconds(Math.Max(
1,
configuration.GetValue("Simulator:ApiReadyPollSeconds", 2)));
var waitForGenerationComplete = configuration.GetValue("Simulator:WaitForGenerationComplete", true);
var generationPollInterval = TimeSpan.FromSeconds(Math.Max(
1,
configuration.GetValue("Simulator:GenerationPollSeconds", 2)));
var generationTimeoutSeconds = configuration.GetValue("Simulator:GenerationTimeoutSeconds", 0);
var generationTimeout = generationTimeoutSeconds <= 0
? (TimeSpan?)null
: TimeSpan.FromSeconds(generationTimeoutSeconds);
var client = httpClientFactory.CreateClient("ApiService");
if (waitForApiReady)
{
var ready = await RuntimeHelpers.WaitForApiReadyAsync(
client,
apiReadyTimeout,
apiReadyPollInterval,
logger,
stoppingToken);
if (!ready)
{
logger.LogWarning("ApiService readiness check timed out. Continuing anyway.");
}
}
if (waitForGenerationComplete)
{
var completed = await WaitForGenerationCompleteAsync(client, generationPollInterval, generationTimeout, stoppingToken);
if (!completed)
{
logger.LogWarning("Device generation completion wait timed out. Continuing anyway.");
}
}
await EnsureDevicesLoadedAsync(client, stoppingToken);
logger.LogInformation("Starting telemetry simulation...");
Console.WriteLine("\n========================================");
Console.WriteLine(" IoT Telemetry Simulator Running");
Console.WriteLine($" Simulating {_deviceSnapshot.Length} devices");
Console.WriteLine(" Press Ctrl+C to stop");
Console.WriteLine("========================================\n");
while (!stoppingToken.IsCancellationRequested)
{
await RefreshDevicesIfNeededAsync(client, stoppingToken);
if (_deviceSnapshot.Length == 0)
{
logger.LogWarning("No devices registered. Waiting before retry...");
await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
continue;
}
var options = new ParallelOptions
{
MaxDegreeOfParallelism = _maxConcurrentTelemetry,
CancellationToken = stoppingToken
};
await Parallel.ForEachAsync(_deviceSnapshot, options, async (device, ct) =>
{
await SendTelemetryAsync(client, device, ct);
});
logger.LogInformation("Sent telemetry for {Count} devices", _deviceSnapshot.Length);
var delay = TimeSpan.FromSeconds(3 + Random.Shared.NextDouble() * 4);
await Task.Delay(delay, stoppingToken);
}
logger.LogInformation("Telemetry Simulator stopped.");
}
private async Task EnsureDevicesLoadedAsync(HttpClient client, CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
if (await TryRefreshDevicesAsync(client, ct))
{
_nextDeviceRefresh = DateTimeOffset.UtcNow + _deviceRefreshInterval;
return;
}
logger.LogInformation("No devices registered yet. Waiting before retry...");
await Task.Delay(TimeSpan.FromSeconds(5), ct);
}
}
private async Task RefreshDevicesIfNeededAsync(HttpClient client, CancellationToken ct)
{
if (DateTimeOffset.UtcNow < _nextDeviceRefresh)
{
return;
}
if (await TryRefreshDevicesAsync(client, ct))
{
_nextDeviceRefresh = DateTimeOffset.UtcNow + _deviceRefreshInterval;
}
else
{
_nextDeviceRefresh = DateTimeOffset.UtcNow + TimeSpan.FromSeconds(10);
}
}
private async Task<bool> TryRefreshDevicesAsync(HttpClient client, CancellationToken ct)
{
try
{
await TryRefreshGenerationStatusAsync(client, ct);
var snapshot = await client.GetFromJsonAsync<DeviceListSnapshot>("/devices/list", JsonOptions, ct);
if (snapshot?.Devices is { Count: > 0 })
{
var telemetryTargets = SelectTelemetryTargets(snapshot.Devices);
var nextSnapshot = telemetryTargets
.Select(CreateSimulatedDevice)
.ToArray();
if (!HasSameSnapshot(_deviceSnapshot, nextSnapshot))
{
_deviceSnapshot = nextSnapshot;
if (_generatedDeviceLimit is int limit)
{
logger.LogInformation(
"Loaded {Count} telemetry targets from {RegisteredCount} registered devices using the first {Limit} sorted device IDs.",
_deviceSnapshot.Length,
snapshot.Devices.Count,
limit);
}
else
{
logger.LogInformation("Loaded {Count} devices", _deviceSnapshot.Length);
}
}
return _deviceSnapshot.Length > 0;
}
if (snapshot?.Devices is { Count: 0 })
{
if (_deviceSnapshot.Length > 0)
{
_deviceSnapshot = Array.Empty<SimulatedDevice>();
logger.LogInformation("Loaded 0 devices");
}
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogWarning(ex, "Failed to load device list");
}
return false;
}
private List<DeviceListItem> SelectTelemetryTargets(IEnumerable<DeviceListItem> devices)
{
var orderedDevices = devices
.OrderBy(device => device.DeviceId, StringComparer.Ordinal);
if (_generatedDeviceLimit is int limit)
{
return orderedDevices.Take(limit).ToList();
}
return orderedDevices.ToList();
}
private static bool HasSameSnapshot(SimulatedDevice[] current, SimulatedDevice[] next)
{
if (current.Length != next.Length)
{
return false;
}
for (var index = 0; index < current.Length; index++)
{
if (!string.Equals(current[index].DeviceId, next[index].DeviceId, StringComparison.Ordinal) ||
!string.Equals(current[index].Location, next[index].Location, StringComparison.Ordinal) ||
current[index].DeviceType != next[index].DeviceType)
{
return false;
}
}
return true;
}
private async Task TryRefreshGenerationStatusAsync(HttpClient client, CancellationToken ct)
{
try
{
var status = await client.GetFromJsonAsync<DeviceGenerationStatus>(
"/devices/generation/status",
JsonOptions,
ct);
UpdateGeneratedDeviceLimit(status);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogDebug(ex, "Failed to load generation status.");
}
}
private void UpdateGeneratedDeviceLimit(DeviceGenerationStatus? status)
{
if (status?.Completed != true)
{
return;
}
var nextLimit = Math.Max(0, status.DeviceCount);
if (_generatedDeviceLimit == nextLimit)
{
return;
}
_generatedDeviceLimit = nextLimit;
logger.LogInformation(
"Telemetry simulator will target the first {Count} devices in sorted device ID order.",
_generatedDeviceLimit);
}
private SimulatedDevice CreateSimulatedDevice(DeviceListItem device)
{
var seed = RuntimeHelpers.StableHash(device.DeviceId);
var seeded = new Random(seed);
return new SimulatedDevice
{
DeviceId = device.DeviceId,
Location = device.Location,
DeviceType = device.DeviceType,
BaseTemperature = 18 + seeded.NextDouble() * 8,
BaseHumidity = 35 + seeded.NextDouble() * 25,
BaseAqi = 30 + seeded.Next(100)
};
}
private async Task<bool> WaitForGenerationCompleteAsync(
HttpClient client,
TimeSpan pollInterval,
TimeSpan? timeout,
CancellationToken ct)
{
var deadline = timeout is null ? DateTimeOffset.MaxValue : DateTimeOffset.UtcNow + timeout.Value;
while (!ct.IsCancellationRequested && DateTimeOffset.UtcNow < deadline)
{
try
{
var status = await client.GetFromJsonAsync<DeviceGenerationStatus>(
"/devices/generation/status",
JsonOptions,
ct);
if (status?.Completed == true)
{
UpdateGeneratedDeviceLimit(status);
logger.LogInformation(
"Device generation complete (count: {Count}, at: {CompletedAt}).",
status.DeviceCount,
status.CompletedAt);
return true;
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogDebug(ex, "Generation status check failed.");
}
await Task.Delay(pollInterval, ct);
}
return false;
}
private async Task SendTelemetryAsync(HttpClient client, SimulatedDevice device, CancellationToken ct)
{
try
{
var telemetry = GenerateTelemetry(device);
var response = await client.PostAsJsonAsync($"/devices/{device.DeviceId}/telemetry", telemetry, ct);
if (!response.IsSuccessStatusCode)
{
logger.LogWarning("Failed to send telemetry for {DeviceId}: {Status}", device.DeviceId, response.StatusCode);
}
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogError(ex, "Error sending telemetry for device {DeviceId}", device.DeviceId);
}
}
private object GenerateTelemetry(SimulatedDevice device)
{
var tempVariation = (Random.Shared.NextDouble() - 0.5) * 4;
var humidityVariation = (Random.Shared.NextDouble() - 0.5) * 10;
var aqiVariation = Random.Shared.Next(-20, 30);
return new
{
Temperature = Math.Round(device.BaseTemperature + tempVariation, 1),
Humidity = Math.Clamp(Math.Round(device.BaseHumidity + humidityVariation, 1), 0, 100),
AirQualityIndex = Math.Clamp(device.BaseAqi + aqiVariation, 0, 500)
};
}
}
public class SimulatedDevice
{
public required string DeviceId { get; init; }
public required string Location { get; init; }
public required DeviceType DeviceType { get; init; }
public double BaseTemperature { get; init; }
public double BaseHumidity { get; init; }
public int BaseAqi { get; init; }
}AspireWithDapr.AppHost
The AppHost owns the local Aspire composition for the full sample. It wires project references together, starts the ApiService Dapr sidecar, and launches supporting tooling such as RedisInsight.
// AppHost.cs
using Aspire.Hosting;
using CommunityToolkit.Aspire.Hosting.Dapr;
var builder = DistributedApplication.CreateBuilder(args);
var componentsPath = Path.Combine(builder.AppHostDirectory, "Components");
var daprConfigPath = Path.Combine(componentsPath, "dapr-config.yaml");
var redisInsight = builder.AddContainer("redisinsight", "redislabs/redisinsight:latest")
.WithEndpoint(port: 8001, 8001)
.WithEnvironment("RI_APP_PORT", "8001")
.WithEnvironment("RI_HOST", "0.0.0.0");
var apiService = builder.AddProject<Projects.AspireWithDapr_ApiService>("apiservice")
.WithDaprSidecar(new DaprSidecarOptions
{
AppId = "apiservice",
Config = daprConfigPath,
ResourcesPaths = [componentsPath]
})
.WithHttpHealthCheck("/health");
builder.AddProject<Projects.AspireWithDapr_Web>("webfrontend")
.WithExternalHttpEndpoints()
.WithReference(apiService)
.WaitFor(apiService);
var deviceGenerator = builder.AddProject<Projects.AspireWithDapr_GenerateDevice>("devicegenerator")
.WithReference(apiService)
.WaitFor(apiService);
builder.AddProject<Projects.AspireWithDapr_Simulator>("simulator")
.WithReference(apiService)
.WaitFor(apiService)
.WaitFor(deviceGenerator)
.WithEnvironment("Simulator__WaitForGenerationComplete", "true")
.WithEnvironment("Simulator__GenerationTimeoutSeconds", "0");
builder.Build().Run();Dapr Components
# dapr-config.yaml
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: aspirewithdapr
spec:
tracing:
samplingRate: "0"# readstore.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: readstore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379# statestore.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: actorStateStore
value: "true"AspireWithDapr.ServiceDefaults
The ServiceDefaults centralizes shared Aspire service wiring such as service discovery, health endpoints, resilience policies, and OpenTelemetry. It reduces repeated infrastructure setup across the app projects while keeping runtime conventions consistent.
// Extensions.cs
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.ServiceDiscovery;
using OpenTelemetry;
using OpenTelemetry.Metrics;
using OpenTelemetry.Trace;
namespace Microsoft.Extensions.Hosting;
public static class Extensions
{
private const string HealthEndpointPath = "/health";
private const string AlivenessEndpointPath = "/alive";
public static TBuilder AddServiceDefaults<TBuilder>(this TBuilder builder) where TBuilder : IHostApplicationBuilder
{
builder.ConfigureOpenTelemetry();
builder.AddDefaultHealthChecks();
builder.Services.AddServiceDiscovery();
builder.Services.ConfigureHttpClientDefaults(http =>
{
// Turn on resilience by default
http.AddStandardResilienceHandler();
// Turn on service discovery by default
http.AddServiceDiscovery();
});
// Uncomment the following to restrict the allowed schemes for service discovery.
// builder.Services.Configure<ServiceDiscoveryOptions>(options =>
// {
// options.AllowedSchemes = ["https"];
// });
return builder;
}
public static TBuilder ConfigureOpenTelemetry<TBuilder>(this TBuilder builder) where TBuilder : IHostApplicationBuilder
{
builder.Logging.AddOpenTelemetry(logging =>
{
logging.IncludeFormattedMessage = true;
logging.IncludeScopes = true;
});
builder.Services.AddOpenTelemetry()
.WithMetrics(metrics =>
{
metrics.AddAspNetCoreInstrumentation()
.AddHttpClientInstrumentation()
.AddRuntimeInstrumentation();
})
.WithTracing(tracing =>
{
tracing.AddSource(builder.Environment.ApplicationName)
.AddAspNetCoreInstrumentation(tracing =>
// Exclude health check requests from tracing
tracing.Filter = context =>
!context.Request.Path.StartsWithSegments(HealthEndpointPath)
&& !context.Request.Path.StartsWithSegments(AlivenessEndpointPath)
)
// Uncomment the following line to enable gRPC instrumentation (requires the OpenTelemetry.Instrumentation.GrpcNetClient package)
//.AddGrpcClientInstrumentation()
.AddHttpClientInstrumentation();
});
builder.AddOpenTelemetryExporters();
return builder;
}
private static TBuilder AddOpenTelemetryExporters<TBuilder>(this TBuilder builder) where TBuilder : IHostApplicationBuilder
{
var useOtlpExporter = !string.IsNullOrWhiteSpace(builder.Configuration["OTEL_EXPORTER_OTLP_ENDPOINT"]);
if (useOtlpExporter)
{
builder.Services.AddOpenTelemetry().UseOtlpExporter();
}
// Uncomment the following lines to enable the Azure Monitor exporter (requires the Azure.Monitor.OpenTelemetry.AspNetCore package)
//if (!string.IsNullOrEmpty(builder.Configuration["APPLICATIONINSIGHTS_CONNECTION_STRING"]))
//{
// builder.Services.AddOpenTelemetry()
// .UseAzureMonitor();
//}
return builder;
}
public static TBuilder AddDefaultHealthChecks<TBuilder>(this TBuilder builder) where TBuilder : IHostApplicationBuilder
{
builder.Services.AddHealthChecks()
// Add a default liveness check to ensure app is responsive
.AddCheck("self", () => HealthCheckResult.Healthy(), ["live"]);
return builder;
}
public static WebApplication MapDefaultEndpoints(this WebApplication app)
{
// Adding health checks endpoints to applications in non-development environments has security implications.
// See https://aka.ms/dotnet/aspire/healthchecks for details before enabling these endpoints in non-development environments.
if (app.Environment.IsDevelopment())
{
// All health checks must pass for app to be considered ready to accept traffic after starting
app.MapHealthChecks(HealthEndpointPath);
// Only health checks tagged with the "live" tag must pass for app to be considered alive
app.MapHealthChecks(AlivenessEndpointPath, new HealthCheckOptions
{
Predicate = r => r.Tags.Contains("live")
});
}
return app;
}
}// RuntimeHelpers.cs
using Microsoft.Extensions.Logging;
namespace AspireWithDapr.ServiceDefaults;
public static class RuntimeHelpers
{
public static int StableHash(string value)
{
unchecked
{
var hash = 23;
foreach (var c in value)
{
hash = (hash * 31) + c;
}
if (hash == int.MinValue)
{
return 0;
}
return hash < 0 ? -hash : hash;
}
}
public static async Task<bool> WaitForApiReadyAsync(
HttpClient client,
TimeSpan timeout,
TimeSpan pollInterval,
ILogger logger,
CancellationToken ct)
{
var deadline = DateTimeOffset.UtcNow + timeout;
while (!ct.IsCancellationRequested && DateTimeOffset.UtcNow < deadline)
{
try
{
using var response = await client.GetAsync("/health", ct);
if (response.IsSuccessStatusCode)
{
logger.LogInformation("ApiService is ready.");
return true;
}
logger.LogDebug("ApiService not ready yet: {Status}", response.StatusCode);
}
catch (Exception ex) when (ex is not OperationCanceledException)
{
logger.LogDebug(ex, "ApiService readiness check failed.");
}
await Task.Delay(pollInterval, ct);
}
return false;
}
}Let’s run the application
To run the application, execute the following command.
aspire run

Run the web application from the Aspire Dashboard. Once it launches, select Dashboard to view real-time data for the total number of devices, online devices, individual device telemetry, and their associated alerts.
While the application is running, you can launch the Dapr Dashboard using the command shown below. The Dapr Dashboard provides insights into Dapr applications, components, configurations, and control plane services (Kubernetes only). It also enables you to view metadata, manifests, deployment files, actors, logs, and other related information.
dapr dashboard
NOTE: Since the primary focus of this blog is Dapr Actors, the web application code has not been included here. If you are interested in accessing it, please feel free to contact me via email.
Conclusion
This application demonstrates how Dapr Actors can transform a complex distributed system into something much easier to design, reason about, and implement. By assigning each device its own isolated unit of state and behavior, the architecture avoids much of the locking, coordination, and consistency logic that typically makes these systems difficult to build. Actor placement, activation, persistence, and reminders are managed by the platform, allowing the application code to remain focused on business functionality such as telemetry processing, alerts, and fleet management. When combined with Aspire, service discovery, and a separate read model for dashboard queries, the overall solution remains scalable, observable, and maintainable without requiring heavy distributed-systems plumbing in every project.