持久性存储
- 1: 持久性存储概述
- 2: 核心编程模型
- 2.1: Orchestration
- 2.1.1: TaskOrchestration
- 2.1.2: OrchestrationInstance
- 2.1.3: OrchestrationState
- 2.2: Activity
- 2.2.1: TaskActivity
- 2.2.2: TaskContext
- 2.3: Entity
- 2.3.1: TaskEntity
- 2.3.2: EntityId
- 2.4: History
- 2.4.1: History概述
- 2.4.2: HistoryEvent事件
- 2.4.3: ExecutionStartedEvent事件
- 2.4.4: OrchestratorStartedEvent事件
- 3: Azure Storage
1 - 持久性存储概述
介绍
https://github.com/Azure/durabletask#supported-persistance-stores
支持的持久性存储
从 v2.x 版开始,持久任务框架支持一组可扩展的后端持久性存储。每个存储都可以使用不同的 NuGet 软件包启用。所有软件包的最新版本都已签名,可在 nuget.org 上下载。
Package | 详细信息 | 发展状况 |
---|---|---|
DurableTask.ServiceBus | 协调消息和运行时状态存储在 Service Bus 队列中,而跟踪状态存储在 Azure Storage 中。该提供商的优势在于其成熟性和事务一致性。不过,微软已不再对其进行积极开发。 | 生产准备就绪,但未积极维护 |
DurableTask.AzureStorage | 所有协调状态都存储在 Azure Storage queues, tables, 和 blobs 中。该提供商的优势在于服务依赖性最小、效率高、功能丰富。这是唯一可用于 Durable Functions 的后端。 | 生产准备就绪并积极维护 |
DurableTask.AzureServiceFabric | 所有协调状态都存储在 Azure Service Fabric Reliable Collections 中。如果您在 Azure Service Fabric 中托管应用程序,并且不想在存储状态时依赖外部资源,那么这是一个理想的选择。 | 生产准备就绪并积极维护 |
DurableTask.Netherite | 由微软研究院开发的超高性能后端,使用微软研究院的 FASTER 数据库技术将状态存储在 Azure Event Hubs 和 Azure Page Blobs 中。GitHub Repo | 生产准备就绪并积极维护 |
DurableTask.SqlServer | 所有协调状态都存储在 Microsoft SQL Server 或 Azure SQL 数据库中,并带有索引表和存储过程,以便直接交互。 👉 GitHub Repo | 生产准备就绪并积极维护 |
DurableTask.Emulator | 这是一个内存存储,仅供测试之用。不建议用于任何生产工作负载。 | 未进行积极维护 |
持久任务框架的核心编程模型包含在 DurableTask.Core 软件包中,该软件包也在积极开发中。
代码实现
Package | 对应代码实现 |
---|---|
DurableTask.ServiceBus | src\DurableTask.ServiceBus |
DurableTask.AzureStorage | src\DurableTask.AzureStorage |
DurableTask.AzureServiceFabric | src\DurableTask.AzureServiceFabric |
DurableTask.Netherite | https://github.com/microsoft/durabletask-netherite |
DurableTask.SqlServer | src\DurableTask.SqlServer https://github.com/microsoft/durabletask-mssql |
DurableTask.Emulator | src\DurableTask.Emulator |
2 - 核心编程模型
2.1 - Orchestration
2.1.1 - TaskOrchestration
src\DurableTask.Core\TaskOrchestration.cs
TaskOrchestration.cs 中定义了三个方法:
- Execute()
- RaiseEvent()
- GetStatus()
Execute() 方法
public abstract Task<string> Execute(OrchestrationContext context, string input);
方法实现为:
public override async Task<string> Execute(OrchestrationContext context, string input)
{
var parameter = DataConverter.Deserialize<TInput>(input);
TResult result;
try
{
result = await RunTask(context, parameter);
}
catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
{
string details = null;
FailureDetails failureDetails = null;
if (context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
{
details = Utils.SerializeCause(e, DataConverter);
}
else
{
failureDetails = new FailureDetails(e);
}
throw new OrchestrationFailureException(e.Message, details)
{
FailureDetails = failureDetails,
};
}
return DataConverter.Serialize(result);
}
RunTask() 方法是个抽象方法。
public abstract Task<TResult> RunTask(OrchestrationContext context, TInput input);
默认的 DataConverter 是 json:
public abstract class TaskOrchestration<TResult, TInput, TEvent, TStatus> : TaskOrchestration
{
/// <summary>
/// Creates a new TaskOrchestration with the default DataConverter
/// </summary>
protected TaskOrchestration()
{
DataConverter = JsonDataConverter.Default;
}
/// <summary>
/// The DataConverter to use for input and output serialization/deserialization
/// </summary>
public DataConverter DataConverter { get; protected set; }
RaiseEvent() 方法
public abstract void RaiseEvent(OrchestrationContext context, string name, string input);
方法实现为:
public override void RaiseEvent(OrchestrationContext context, string name, string input)
{
var parameter = DataConverter.Deserialize<TEvent>(input);
OnEvent(context, name, parameter);
}
OnEvent() 是一个空实现。
public virtual void OnEvent(OrchestrationContext context, string name, TEvent input)
{
// do nothing
}
GetStatus() 方法
public abstract string GetStatus();
2.1.2 - OrchestrationInstance
src\DurableTask.Core\OrchestrationInstance.cs
OrchestrationInstance 中定义了几个属性:
- InstanceId
- ExecutionId()
[DataContract]
public class OrchestrationInstance : IExtensibleDataObject
{
/// <summary>
/// The instance id, assigned as unique to the orchestration
/// </summary>
[DataMember]
public string InstanceId { get; set; }
/// <summary>
/// The execution id, unique to the execution of this instance
/// </summary>
[DataMember]
public string ExecutionId { get; set; }
2.1.3 - OrchestrationState
src\DurableTask.Core\OrchestrationState.cs
OrchestrationState 中定义了几个属性:
- CompletedTime
- CompressedSize
- CreatedTime
- Input
- LastUpdatedTime
- Name
- OrchestrationInstance: 包含 InstanceId 和 ExecutionId
- Output
- ParentInstance
- Size
- Status
- Tags
- Version: string 格式,看能否复用。
- Generation
- ScheduledStartTime
- FailureDetails
2.2 - Activity
2.2.1 - TaskActivity
src\DurableTask.Core\TaskActivity.cs
TaskActivity 中定义了三个方法:
- Run()
- RunAsync()
Run() 方法
public abstract string Run(TaskContext context, string input);
blocked for AsyncTaskActivity:
/// <summary>
/// Synchronous execute method, blocked for AsyncTaskActivity
/// </summary>
/// <returns>string.Empty</returns>
public override string Run(TaskContext context, string input)
{
// will never run
return string.Empty;
}
RunAsync() 方法
public virtual Task<string> RunAsync(TaskContext context, string input)
{
return Task.FromResult(Run(context, input));
}
会被覆盖为:
public override async Task<string> RunAsync(TaskContext context, string input)
{
TInput parameter = default(TInput);
var jArray = Utils.ConvertToJArray(input);
int parameterCount = jArray.Count;
if (parameterCount > 1)
{
throw new TaskFailureException(
"TaskActivity implementation cannot be invoked due to more than expected input parameters. Signature mismatch.");
}
if (parameterCount == 1)
{
JToken jToken = jArray[0];
if (jToken is JValue jValue)
{
parameter = jValue.ToObject<TInput>();
}
else
{
string serializedValue = jToken.ToString();
parameter = DataConverter.Deserialize<TInput>(serializedValue);
}
}
TResult result;
try
{
result = await ExecuteAsync(context, parameter);
}
catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
{
string details = null;
FailureDetails failureDetails = null;
if (context != null && context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
{
details = Utils.SerializeCause(e, DataConverter);
}
else
{
failureDetails = new FailureDetails(e);
}
throw new TaskFailureException(e.Message, e, details)
.WithFailureDetails(failureDetails);
}
string serializedResult = DataConverter.Serialize(result);
return serializedResult;
}
}
ExecuteAsync() 是一个abstract 方法:
protected abstract Task<TResult> ExecuteAsync(TaskContext context, TInput input);
GetStatus() 方法
public abstract string GetStatus();
2.2.2 - TaskContext
src\DurableTask.Core\TaskContext.cs
TaskActivity 中定义了以下属性
- OrchestrationInstance: 包含 InstanceId 和 InstanceId
- ErrorPropagationMode
2.3 - Entity
2.3.1 - TaskEntity
Abstract base class for entities
src\DurableTask.Core\Entities\TaskEntity.cs
TaskActivity 中定义了三个方法:
- ExecuteOperationBatchAsync()
ExecuteOperationBatchAsync() 方法
public abstract Task<EntityBatchResult> ExecuteOperationBatchAsync(EntityBatchRequest operations);
EnztityBatchRequest 类
A request for execution of a batch of operations on an entity.
- string InstanceId
- string EntityState
List<OperationRequest> Operations
OperationRequest 类
包含属性:
- string Operation
- Guid Id
- string Input
2.3.2 - EntityId
A unique identifier for an entity, consisting of entity name and entity key.
src\DurableTask.Core\Entities\EntityId.cs
EntityId 中定义以下属性:
- string Name
- string Key
2.4 - History
2.4.1 - History概述
介绍
以下介绍来自 README.md
Durable Task Framework History Events
以下是构成协调状态的一些常见历史事件。您可以在 DTFx 的 Azure Storage 和 MSSQL 存储后端的历史记录表中轻松查看这些事件。在使用 DTFx 代码、调试问题或创建直接读取历史记录的诊断工具(如 Durable Functions Monitor 项目)时,了解这些事件非常有用。
Event Type | Description |
---|---|
OrchestratorStarted |
协调器函数正在开始新的_执行/execution_。您将在历史记录中看到许多此类事件–每次协调器从 “等待 “状态恢复时都会出现一个。请注意,这并不意味着协调器首次启动–首次执行由 “ExecutionStarted “历史事件表示(见下文)。该事件的 timestamp 时间戳用于填充 CurrentDateTimeUtc 属性。 |
ExecutionStarted |
协调已开始首次执行。该事件包含协调器名称、输入内容和协调器的_scheduled_时间(可能早于历史记录中前面的 OrchestratorStarted 事件)。这总是协调历史中的第二个事件。 |
TaskScheduled |
协调器调度了一项活动任务。该事件包括活动名称、输入和一个连续的 “EventId”,可用于将 “TaskScheduled " 事件与相应的 “TaskCompleted “或 “TaskFailed “事件关联起来。请注意,如果一个活动任务被重试,可能会生成多个 Task*** 事件。 |
TaskCompleted |
调度的任务活动已成功完成。TaskScheduledId 字段将与相应 TaskScheduled 事件的 “EventId” 字段匹配。 |
TaskFailed |
计划的任务活动以失败告终。TaskScheduledId 字段将与相应 “TaskScheduled” 事件的 “EventId” 字段匹配。 |
SubOrchestrationInstanceCreated |
协调器已调度子协调器。该事件包含已调度协调器的名称、实例 ID、输入和有序事件 ID,可用于将 SubOrchestrationInstanceCreated 事件与后续的 SubOrchestrationInstanceCompleted 或 SubOrchestrationInstanceFailed 历史事件关联起来。时间戳指的是调度子协调器的时间,它将早于开始执行的时间。请注意,如果一个活动任务被重试,可能会产生多个 SubOrchestrationInstance*** 事件。 |
SubOrchestrationInstanceCompleted |
调度的子协调器已成功完成。TaskScheduledId “字段将与相应 “SubOrchestrationInstanceCreated “事件的 “EventId “字段匹配。 |
SubOrchestrationInstanceFailed |
计划的子协调器已完成,但出现故障。TaskScheduledId 字段将与相应 SubOrchestrationInstanceCreated 事件的 EventId 字段匹配。 |
TimerCreated |
协调器安排了一个持久定时器。FireAt “属性包含定时器启动的日期。 |
TimerFired |
先前安排的持久定时器已启动。TimerId 字段将与相应 TimeCreated 事件的 EventId 字段匹配。 |
EventRaised |
协调(或持久实体中的实体)收到外部事件。该记录包含事件名称、有效载荷和事件_发送_的时间戳(应与历史事件实际被持久化的时间相同或更早)。 |
EventSent |
协调(或entity)向另一个协调(或entity)发送了单向消息。 |
ExecutionCompleted |
协调已完成。该事件包括协调的输出,不区分成功或失败。 |
ExecutionTerminated |
协调被 API 调用强制终止。该事件的时间戳表示计划终止的时间,而不一定是实际终止的时间。 |
OrchestratorCompleted |
协调器函数已等待并提交了任何副作用。您将在历史记录中看到许多此类事件–协调器每次等待时都会出现一个。请注意,这并不意味着协调器已经完成(完成由 ExecutionCompleted 或 ExecutionTerminated 表示)。 |
GenericEvent |
通用历史事件,有一个 Data 字段,但没有特定含义。这种历史事件并不常用。在某些情况下,该事件用于触发空闲协调的全新重放,例如在协调重绕之后。 |
HistoryStateEvent |
包含协调历史快照的历史事件。大多数现代后端类型都不使用这种事件类型。 |
2.4.2 - HistoryEvent事件
包含属性:
- int EventId
- EventType EventType
- bool IsPlayed
- DateTime Timestamp
- ExtensionDataObject ExtensionData
这个类也是其他 event 的父类。
2.4.3 - ExecutionStartedEvent事件
包含属性:
- string EventId
- string Input
- EventType EventType
- ParentInstance ParentInstance
- string Name
- string Version:可以复用
- IDictionary<string, string> Tags
- string Correlation
- DistributedTraceContext ParentTraceContext
- DateTime ScheduledStartTime
- int Generation
2.4.4 - OrchestratorStartedEvent事件
包含属性:
- string EventId
- EventType EventType