这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

持久性存储

Durable Task 的后端持久性存储

1 - 持久性存储概述

Durable Task 的后端持久性存储概述

介绍

https://github.com/Azure/durabletask#supported-persistance-stores

支持的持久性存储

从 v2.x 版开始,持久任务框架支持一组可扩展的后端持久性存储。每个存储都可以使用不同的 NuGet 软件包启用。所有软件包的最新版本都已签名,可在 nuget.org 上下载。

Package 详细信息 发展状况
DurableTask.ServiceBus 协调消息和运行时状态存储在 Service Bus 队列中,而跟踪状态存储在 Azure Storage 中。该提供商的优势在于其成熟性和事务一致性。不过,微软已不再对其进行积极开发。 生产准备就绪,但未积极维护
DurableTask.AzureStorage 所有协调状态都存储在 Azure Storage queues, tables, 和 blobs 中。该提供商的优势在于服务依赖性最小、效率高、功能丰富。这是唯一可用于 Durable Functions 的后端。 生产准备就绪并积极维护
DurableTask.AzureServiceFabric 所有协调状态都存储在 Azure Service Fabric Reliable Collections 中。如果您在 Azure Service Fabric 中托管应用程序,并且不想在存储状态时依赖外部资源,那么这是一个理想的选择。 生产准备就绪并积极维护
DurableTask.Netherite 由微软研究院开发的超高性能后端,使用微软研究院的 FASTER 数据库技术将状态存储在 Azure Event Hubs 和 Azure Page Blobs 中。GitHub Repo 生产准备就绪并积极维护
DurableTask.SqlServer 所有协调状态都存储在 Microsoft SQL Server 或 Azure SQL 数据库中,并带有索引表和存储过程,以便直接交互。 👉 GitHub Repo 生产准备就绪并积极维护
DurableTask.Emulator 这是一个内存存储,仅供测试之用。不建议用于任何生产工作负载。 未进行积极维护

持久任务框架的核心编程模型包含在 DurableTask.Core 软件包中,该软件包也在积极开发中。

代码实现

Package 对应代码实现
DurableTask.ServiceBus src\DurableTask.ServiceBus
DurableTask.AzureStorage src\DurableTask.AzureStorage
DurableTask.AzureServiceFabric src\DurableTask.AzureServiceFabric
DurableTask.Netherite https://github.com/microsoft/durabletask-netherite
DurableTask.SqlServer src\DurableTask.SqlServer
https://github.com/microsoft/durabletask-mssql
DurableTask.Emulator src\DurableTask.Emulator

2 - 核心编程模型

Durable Task 后端持久性存储的核心编程模型

2.1 - Orchestration

核心编程模型的 Orchestration 定义

2.1.1 - TaskOrchestration

核心编程模型之 TaskOrchestration

src\DurableTask.Core\TaskOrchestration.cs

TaskOrchestration.cs 中定义了三个方法:

  • Execute()
  • RaiseEvent()
  • GetStatus()

Execute() 方法

public abstract Task<string> Execute(OrchestrationContext context, string input);

方法实现为:

public override async Task<string> Execute(OrchestrationContext context, string input)
        {
            var parameter = DataConverter.Deserialize<TInput>(input);
            TResult result;
            try
            {
                result = await RunTask(context, parameter);
            }
            catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
            {
                string details = null;
                FailureDetails failureDetails = null;
                if (context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
                {
                    details = Utils.SerializeCause(e, DataConverter);
                }
                else
                {
                    failureDetails = new FailureDetails(e);
                }

                throw new OrchestrationFailureException(e.Message, details)
                {
                    FailureDetails = failureDetails,
                };
            }

            return DataConverter.Serialize(result);
        }

RunTask() 方法是个抽象方法。

public abstract Task<TResult> RunTask(OrchestrationContext context, TInput input);

默认的 DataConverter 是 json:

    public abstract class TaskOrchestration<TResult, TInput, TEvent, TStatus> : TaskOrchestration
    {
        /// <summary>
        /// Creates a new TaskOrchestration with the default DataConverter
        /// </summary>
        protected TaskOrchestration()
        {
            DataConverter = JsonDataConverter.Default;
        }

        /// <summary>
        /// The DataConverter to use for input and output serialization/deserialization
        /// </summary>
        public DataConverter DataConverter { get; protected set; }

RaiseEvent() 方法

public abstract void RaiseEvent(OrchestrationContext context, string name, string input);

方法实现为:

public override void RaiseEvent(OrchestrationContext context, string name, string input)
{
  var parameter = DataConverter.Deserialize<TEvent>(input);
  OnEvent(context, name, parameter);
}

OnEvent() 是一个空实现。

        public virtual void OnEvent(OrchestrationContext context, string name, TEvent input)
        {
            // do nothing
        }

GetStatus() 方法

public abstract string GetStatus();

2.1.2 - OrchestrationInstance

核心编程模型之 OrchestrationInstance

src\DurableTask.Core\OrchestrationInstance.cs

OrchestrationInstance 中定义了几个属性:

  • InstanceId
  • ExecutionId()
    [DataContract]
    public class OrchestrationInstance : IExtensibleDataObject
    {
        /// <summary>
        /// The instance id, assigned as unique to the orchestration
        /// </summary>
        [DataMember]
        public string InstanceId { get; set; }

        /// <summary>
        /// The execution id, unique to the execution of this instance
        /// </summary>
        [DataMember]
        public string ExecutionId { get; set; }

2.1.3 - OrchestrationState

核心编程模型之 OrchestrationState

src\DurableTask.Core\OrchestrationState.cs

OrchestrationState 中定义了几个属性:

  • CompletedTime
  • CompressedSize
  • CreatedTime
  • Input
  • LastUpdatedTime
  • Name
  • OrchestrationInstance: 包含 InstanceId 和 ExecutionId
  • Output
  • ParentInstance
  • Size
  • Status
  • Tags
  • Version: string  格式,看能否复用。
  • Generation
  • ScheduledStartTime
  • FailureDetails

2.2 - Activity

核心编程模型的 Activity 定义

2.2.1 - TaskActivity

核心编程模型之 TaskActivity

src\DurableTask.Core\TaskActivity.cs

TaskActivity 中定义了三个方法:

  • Run()
  • RunAsync()

Run() 方法

public abstract string Run(TaskContext context, string input);

blocked for AsyncTaskActivity:

        /// <summary>
        /// Synchronous execute method, blocked for AsyncTaskActivity
        /// </summary>
        /// <returns>string.Empty</returns>
        public override string Run(TaskContext context, string input)
        {
            // will never run
            return string.Empty;
        }

RunAsync() 方法

        public virtual Task<string> RunAsync(TaskContext context, string input)
        {
            return Task.FromResult(Run(context, input));
        }

会被覆盖为:

public override async Task<string> RunAsync(TaskContext context, string input)
        {
            TInput parameter = default(TInput);

            var jArray = Utils.ConvertToJArray(input);

            int parameterCount = jArray.Count;
            if (parameterCount > 1)
            {
                throw new TaskFailureException(
                    "TaskActivity implementation cannot be invoked due to more than expected input parameters.  Signature mismatch.");
            }
            
            if (parameterCount == 1)
            {
                JToken jToken = jArray[0];
                if (jToken is JValue jValue)
                {
                    parameter = jValue.ToObject<TInput>();
                }
                else
                {
                    string serializedValue = jToken.ToString();
                    parameter = DataConverter.Deserialize<TInput>(serializedValue);
                }
            }

            TResult result;
            try
            {
                result = await ExecuteAsync(context, parameter);
            }
            catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
            {
                string details = null;
                FailureDetails failureDetails = null;
                if (context != null && context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
                {
                    details = Utils.SerializeCause(e, DataConverter);
                }
                else
                {
                    failureDetails = new FailureDetails(e);
                }

                throw new TaskFailureException(e.Message, e, details)
                    .WithFailureDetails(failureDetails);
            }

            string serializedResult = DataConverter.Serialize(result);
            return serializedResult;
        }
    }

ExecuteAsync() 是一个abstract 方法:

        protected abstract Task<TResult> ExecuteAsync(TaskContext context, TInput input);

GetStatus() 方法

public abstract string GetStatus();

2.2.2 - TaskContext

核心编程模型之 TaskContext

src\DurableTask.Core\TaskContext.cs

TaskActivity 中定义了以下属性

  • OrchestrationInstance: 包含 InstanceId 和 InstanceId
  • ErrorPropagationMode

2.3 - Entity

核心编程模型的 Entity 定义

2.3.1 - TaskEntity

核心编程模型之 TaskEntity

Abstract base class for entities

src\DurableTask.Core\Entities\TaskEntity.cs

TaskActivity 中定义了三个方法:

  • ExecuteOperationBatchAsync()

ExecuteOperationBatchAsync() 方法

public abstract Task<EntityBatchResult> ExecuteOperationBatchAsync(EntityBatchRequest operations);

EnztityBatchRequest 类

A request for execution of a batch of operations on an entity.

  • string InstanceId
  • string EntityState
  • List<OperationRequest> Operations

OperationRequest 类

包含属性:

  • string Operation
  • Guid Id
  • string Input

2.3.2 - EntityId

核心编程模型之 EntityId

A unique identifier for an entity, consisting of entity name and entity key.

src\DurableTask.Core\Entities\EntityId.cs

EntityId 中定义以下属性:

  • string Name
  • string Key

2.4 - History

核心编程模型的 History event 定义

2.4.1 - History概述

核心编程模型的 History event 概述

介绍

以下介绍来自 README.md

Durable Task Framework History Events

以下是构成协调状态的一些常见历史事件。您可以在 DTFx 的 Azure StorageMSSQL 存储后端的历史记录表中轻松查看这些事件。在使用 DTFx 代码、调试问题或创建直接读取历史记录的诊断工具(如 Durable Functions Monitor 项目)时,了解这些事件非常有用。

Event Type Description
OrchestratorStarted 协调器函数正在开始新的_执行/execution_。您将在历史记录中看到许多此类事件–每次协调器从 “等待 “状态恢复时都会出现一个。请注意,这并不意味着协调器首次启动–首次执行由 “ExecutionStarted “历史事件表示(见下文)。该事件的 timestamp 时间戳用于填充 CurrentDateTimeUtc 属性。
ExecutionStarted 协调已开始首次执行。该事件包含协调器名称、输入内容和协调器的_scheduled_时间(可能早于历史记录中前面的 OrchestratorStarted事件)。这总是协调历史中的第二个事件。
TaskScheduled 协调器调度了一项活动任务。该事件包括活动名称、输入和一个连续的 “EventId”,可用于将 “TaskScheduled " 事件与相应的 “TaskCompleted “或 “TaskFailed “事件关联起来。请注意,如果一个活动任务被重试,可能会生成多个 Task*** 事件。
TaskCompleted 调度的任务活动已成功完成。TaskScheduledId 字段将与相应 TaskScheduled 事件的 “EventId” 字段匹配。
TaskFailed 计划的任务活动以失败告终。TaskScheduledId 字段将与相应 “TaskScheduled” 事件的 “EventId” 字段匹配。
SubOrchestrationInstanceCreated 协调器已调度子协调器。该事件包含已调度协调器的名称、实例 ID、输入和有序事件 ID,可用于将 SubOrchestrationInstanceCreated 事件与后续的 SubOrchestrationInstanceCompleted 或 SubOrchestrationInstanceFailed 历史事件关联起来。时间戳指的是调度子协调器的时间,它将早于开始执行的时间。请注意,如果一个活动任务被重试,可能会产生多个 SubOrchestrationInstance*** 事件。
SubOrchestrationInstanceCompleted 调度的子协调器已成功完成。TaskScheduledId “字段将与相应 “SubOrchestrationInstanceCreated “事件的 “EventId “字段匹配。
SubOrchestrationInstanceFailed 计划的子协调器已完成,但出现故障。TaskScheduledId 字段将与相应 SubOrchestrationInstanceCreated 事件的 EventId 字段匹配。
TimerCreated 协调器安排了一个持久定时器。FireAt “属性包含定时器启动的日期。
TimerFired 先前安排的持久定时器已启动。TimerId 字段将与相应 TimeCreated 事件的 EventId 字段匹配。
EventRaised 协调(或持久实体中的实体)收到外部事件。该记录包含事件名称、有效载荷和事件_发送_的时间戳(应与历史事件实际被持久化的时间相同或更早)。
EventSent 协调(或entity)向另一个协调(或entity)发送了单向消息。
ExecutionCompleted 协调已完成。该事件包括协调的输出,不区分成功或失败。
ExecutionTerminated 协调被 API 调用强制终止。该事件的时间戳表示计划终止的时间,而不一定是实际终止的时间。
OrchestratorCompleted 协调器函数已等待并提交了任何副作用。您将在历史记录中看到许多此类事件–协调器每次等待时都会出现一个。请注意,这并不意味着协调器已经完成(完成由 ExecutionCompletedExecutionTerminated 表示)。
GenericEvent 通用历史事件,有一个 Data 字段,但没有特定含义。这种历史事件并不常用。在某些情况下,该事件用于触发空闲协调的全新重放,例如在协调重绕之后。
HistoryStateEvent 包含协调历史快照的历史事件。大多数现代后端类型都不使用这种事件类型。

2.4.2 - HistoryEvent事件

核心编程模型的 HistoryEvent 事件

包含属性:

  • int EventId
  • EventType EventType
  • bool IsPlayed
  • DateTime Timestamp
  • ExtensionDataObject ExtensionData

这个类也是其他 event 的父类。

2.4.3 - ExecutionStartedEvent事件

核心编程模型的 ExecutionStartedEvent 事件

包含属性:

  • string EventId
  • string Input
  • EventType EventType
  • ParentInstance ParentInstance
  • string Name
  • string Version:可以复用
  • IDictionary<string, string> Tags
  • string Correlation
  • DistributedTraceContext ParentTraceContext
  • DateTime ScheduledStartTime
  • int Generation

2.4.4 - OrchestratorStartedEvent事件

核心编程模型的 OrchestratorStartedEvent 事件

包含属性:

  • string EventId
  • EventType EventType

3 - Azure Storage

Azure Storage 后端持久性存储实现