1 - wiki Home

Durable Task Framework的 WIKI 首页

https://github.com/Azure/durabletask/wiki/

概述

持久任务框架为开发人员提供了一种使用 .NET 任务框架和 .NET 4.5 中添加的 async/await 关键字在 C# 中编写代码协调的方法。

以下是持久任务框架的主要功能:

  • 在简单的 C# 代码中定义代码协调
  • 程序状态的自动持久化和检查点化
  • 协调和活动的版本化
  • 异步计时器、协调组合、用户辅助检查点

该框架本身非常轻量级,只需要一个 Azure 服务总线命名空间和可选的 Azure 存储账户。协调和工作节点的运行实例完全由用户托管。用户代码不会在服务总线 “内部” 执行。

问题陈述

许多场景都涉及以事务方式在多个地方更新状态或执行操作。例如,从数据库 A 中的某个账户借入一笔钱,并将其贷记到数据库 B 中的另一个账户中,这些操作都需要以原子方式完成。这种一致性可以通过使用分布式事务来实现,该事务将分别跨越数据库 A 和 B 的借记(debit)和贷记(credit)操作。

但是,为了实现严格的一致性,事务意味着锁,而锁不利于扩展,因为需要相同锁的后续操作会被阻塞,直到锁被释放。对于旨在实现高可用性和一致性的云服务来说,这将成为一个巨大的规模瓶颈。此外,即使我们认为可以承受分布式事务的冲击,我们也会发现几乎没有任何云服务真正支持分布式事务(甚至是简单的锁定)。

实现一致性的另一种模式是在持久工作流中执行借记和贷记的业务逻辑。在这种情况下,工作流的伪代码如下:

  1. 从数据库 A 中的某个账户借记
  2. 如果借记成功,则
  3. 贷记到 DB B 中的某个账户
  4. 如果上述操作失败,则继续重试,直到达到某个阈值
  5. 如果贷记仍然失败,则撤销数据库 A 中的借记,并发送通知电子邮件

在理想情况下,这将给我们带来 “最终” 的一致性。也就是说,在(1)之后,整个系统的状态会变得不一致,但在工作流完成后最终会变得一致。然而,在 “不理想” 路径中,有很多事情都可能出错;执行伪代码的节点可能在任意点崩溃,从数据库 A 的借记可能失败,或从数据库 B 的贷记可能失败。在这种情况下,为了保持一致性,我们必须确保以下几点:

  1. 借记和贷记操作是幂等的,即重新执行相同的借记或贷记操作将变成无操作。
  2. 如果执行节点崩溃,它将从我们上次成功执行持久操作的地方重新开始(如上文 #1 或 #2a)

从这两项来看,(1) 项只能由借记/贷记活动实现提供。第(2)项也可以通过代码完成,即在某个数据库中跟踪当前位置。但这种状态管理会变得很麻烦,尤其是当持久操作的数量增加时。这时,如果有一个能自动进行状态管理的框架,就能大大简化基于代码构建工作流的过程。

2 - 核心概念

Durable Task Framework的 核心概念

https://github.com/Azure/durabletask/wiki/Core-Concepts

Task Hub

Task Hub 是命名空间内服务总线实体的逻辑容器。Task Hub worker 使用这些实体在代码协调和它们协调的活动之间可靠地传递消息。

任务活动

任务活动是执行特定协调步骤的代码片段。任务活动可以从任务协调代码中 “调度”。这种调度产生了一个普通的 .NET 任务,该任务可以(异步)等待,并与其他类似任务组成,以构建复杂的协调。

任务编排

任务协调安排任务活动,并围绕代表活动的任务构建代码协调。

Task Hub Worker

Worker 是任务协调和活动的主机。它还包含对 Task Hub 本身执行 CRUD 操作的 API。

Task Hub Client

Task Hub Client提供

  • 用于创建和管理任务协调实例的 API
  • 用于从 Azure 表查询任务协调实例状态的 API

Task Hub Worker 和 Task Hub client 都配置了服务总线的连接字符串,还可选择配置存储帐户的连接字符串。服务总线用于存储任务协调实例和任务活动之间执行和消息传递的控制流状态。但是,服务总线并不是一个数据库,因此当代码协调完成后,状态就会从服务总线中移除。如果配置了 Azure Table storage,那么只要用户将状态保存在该账户中,就可以对其进行查询。

该框架提供了任务协调(TaskOrchestration)和任务活动(TaskActivity)基类,用户可以从这些基类派生指定自己的协调和活动。然后,他们可以使用 TaskHub API 将这些协调和活动加载到进程中,然后启动 Worker 开始处理创建新协调实例的请求。

TaskHubClient API 可用于创建新的协调实例、查询现有实例并在需要时终止这些实例。

3 - 视频编码的示例

Durable Task Framework 视频编码的示例

https://github.com/Azure/durabletask/wiki/Example---Video-Encoding

假设用户希望建立一个代码协调,对视频进行编码,然后在编码完成后向用户发送电子邮件。

为了使用服务总线持久任务框架实现这一目标,用户将编写两个任务活动,分别用于编码视频和发送电子邮件,以及一个任务协调,在这两个活动之间进行协调。

public class EncodeVideoOrchestration : TaskOrchestration<string, string>
{
    public override async Task<string> RunTask(OrchestrationContext context, 
                                                 string input)
    {
        string encodedUrl = 
              await context.ScheduleTask<string>(typeof (EncodeActivity), input);
        await context.ScheduleTask<object>(typeof (EmailActivity), input);
        return encodedUrl;
    }
}

在此协调中,用户调度编码视频活动,等待响应,然后调度发送电子邮件活动。框架将确保持久保存执行状态。例如,如果托管上述任务协调的节点在调度编码视频活动之前崩溃,那么重启时它就会知道要调度该活动。如果节点在调度完活动后但在收到响应前崩溃,重启时它就会聪明地知道该活动已被调度,并直接开始等待 EncodeVideo 活动的响应。

public class EncodeActivity : TaskActivity<string, string>
{
    protected override string Execute(TaskContext context, string input)
    {
        Console.WriteLine("Encoding video " + input);
        // TODO : actually encode the video to a destination
        return "http://<azurebloblocation>/encoded_video.avi";
    }
}

public class EmailActivity : TaskActivity<string, object>
{
    protected override object Execute(TaskContext context, string input)
    {
        // TODO : actually send email to user
        return null;
    }
}

上述用户代码(EncodeVideoOrchestration、EncodeActivity 和 EmailActivity)需要在某个地方托管和可用才能发挥作用。

这样,用户才能在 Worker 中加载这些协调和活动类,并开始处理请求以创建新的协调实例。

string serviceBusConnString = "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedSecretIssuer=[issuer];SharedSecretValue=[value]";

TaskHubWorker hubWorker = new TaskHubWorker("myvideohub", serviceBusConnString)
    .AddTaskOrchestrations(typeof (EncodeVideoOrchestration))
    .AddTaskActivities(typeof (EncodeActivity), typeof (EmailActivity))
    .Start();

这些 Worker 的多个实例可针对同一个任务中心同时运行,以根据需要提供负载平衡。该框架保证特定的协调实例代码在同一时间只能在单个 Worker 上执行。

TaskHubWorker 还提供了停止 Worker 实例的方法。

最后剩下的部分是创建和管理协调实例,即如何实际触发用户加载的代码协调,以及如何监控或终止它们。

string serviceBusConnString = "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedSecretIssuer=[issuer];SharedSecretValue=[value]";

TaskHubClient client = new TaskHubClient("myvideohub", serviceBusConnString);
client.CreateOrchestrationInstance(typeof (EncodeVideoOrchestration), "http://<azurebloblocation>/MyVideo.mpg");

4 - 编写任务编排

在 Durable Task Framework 中编写任务编排

https://github.com/Azure/durabletask/wiki/Writing-Task-Orchestrations

任务协调基本上是调用任务活动,并定义控制如何从一个活动流向另一个活动。可以在协调中编写的代码是普通的 C#,但有一些限制。之所以存在这些限制,是因为框架是如何重播协调代码的。下面将对此进行简要说明。

每次协调需要处理新工作时(如任务活动完成或定时器启动),框架都会从头开始重播用户的任务协调代码。每当该用户代码尝试调度任务活动时,框架都会拦截该调用并查询协调的 “执行历史”。如果框架发现特定的任务活动已被执行并产生了一些结果,它就会立即重放该活动的结果,然后继续任务协调。这种情况会一直持续,直到用户代码执行到结束或计划了新的 Activity。如果是后一种情况,那么框架将实际安排并执行指定的 Activity。该活动完成后,其结果也将成为执行历史的一部分,其值将用于后续重放。

考虑到这一点,以下是我们可以在任务协调中编写的代码类型的限制:

  • 代码必须具有确定性,因为它将被多次重放,每次都必须产生相同的结果。例如,不能直接调用获取当前日期/时间、随机数、Guids 或远程服务调用等。
  • 传递给 TaskOrchestration.RunTask() 方法的 OrchestrationContext 对象上有一个辅助 API,它提供了一种获取当前日期/时间的确定性方法。应使用该对象代替 System.DateTime
  • 用户可以将非确定性操作封装在 TaskActivities 中,使其成为确定性操作。例如,GenerateGuidActivity、GenerateRandomNumberActivity 等。由于框架会重放任务活动的结果,因此非确定值将在首次执行时生成一次,然后在后续执行时重放相同的值。
  • 未来,“OrchestrationContext “还将添加其他辅助 API。
  • 代码应该是非阻塞的,即没有线程睡眠或 Task.WaitXXX() 方法。框架提供了设置异步计时器的辅助方法,应使用这些方法。
  • 协调的执行历史记录包含所有已安排的任务活动及其结果。该历史记录还受到服务总线大小的限制(在使用服务总线提供程序时),因此,如果没有用户辅助检查点(在 [Eternal Orchestrations](https://github.com/Azure/durabletask/wiki/Feature---Eternal-Orchestrations-(aka-infinite-loops) 中描述),就无法实现无限循环。)

5 - 编写任务活动

在 Durable Task Framework 中编写任务活动

https://github.com/Azure/durabletask/wiki/Writing-Task-Activities

任务活动是协调的 “叶” 节点。这是在协调中实际执行单元操作的代码。这是没有约束的普通 C# 代码。

任务活动代码保证至少被调用一次(at least once)。但在错误情况下,它可能会被调用多次,因此最好是幂等调用。

注:在框架的未来版本中,用户可以将保证切换为最多调用一次,而不是最少调用一次,从而在协调代码中获得更多控制权。

备注

这里提到了 at least once 和 at most once 的问题。活动的执行保证还是很麻烦的。

6 - 编排实例管理

在 Durable Task Framework 中编排实例管理

https://github.com/Azure/durabletask/wiki/Orchestration-Instance-Management

TaskHubClient API 允许用户创建新的协调实例、查询已创建协调实例的状态并终止这些实例。

创建协调实例的 API 将返回实例信息。该信息可在后续 API 中用于查询实例的状态。

OrchestrationInstance instance = client.CreateOrchestrationInstance(typeof (EncodeVideoOrchestration), "http://<azurebloblocation>/MyVideo.mpg");

OrchestrationState state = client.GetOrchestrationState(instance);
Console.WriteLine(state.Name + " " + state.OrchestrationStatus + " " + state.Output);

返回的实例还可用于终止协调:

OrchestrationInstance instance = client.CreateOrchestrationInstance(typeof (EncodeVideoOrchestration),"http://<azurebloblocation>/MyVideo.mpg");
// 不好的事情发生了
client.TerminateInstance(instance);

请注意,实例查询方法要求使用 Azure 存储连接字符串创建 task hub。如果未提供连接字符串,则所有实例查询方法都将抛出 InvalidOperationException 异常。

7 - 错误处理和补偿

在 Durable Task Framework 中进行错误处理和补偿

https://github.com/Azure/durabletask/wiki/Error-Handling-&-Compensation

在 TaskActivity 代码中抛出的任何异常都会在 TaskOrchestration 代码中以 TaskFailedException 的形式回调和抛出。用户可以根据自己的需要编写适当的错误处理和补偿代码。

public class DebitCreditOrchestration : 
    TaskOrchestration<object, DebitCreditOperation>
{
    public override async Task<object> RunTask(OrchestrationContext context, 
        DebitCreditOperation operation)
    {
        bool failed = false;
        bool debited = false;
        try
        {
            await context.ScheduleTask<object>(typeof (DebitAccount),
                            new Tuple<string, float>(operation.SourceAccount, operation.Amount));
            debited = true;

            await context.ScheduleTask<object>(typeof(CreditAccount),
                            new Tuple<string, float>(operation.TargetAccount, operation.Amount));
        }
        catch (TaskFailedException exception)
        {
            if (debited)
            {
                // can build a try-catch around this as well, in which case the 
                // orchestration may either retry a few times or log the inconsistency for review
                await context.ScheduleTask<object>(typeof(CreditAccount),
                            new Tuple<string, float>(operation.SourceAccount, operation.Amount));
            }
        }

        return null;
    }
}

请注意,在 dotnet 6.0 之前,由于 CLR 的限制,不能在 catch 块中使用 await 关键字。在这种情况下,我们可以修改上面的代码,设置一个失败标志,并在返回结果前执行补偿:

# ... orchestration code here
        catch (TaskFailedException exception)
        {
            failed = true;
        }

        if (failed)
        {
            if (debited)
            {
                // can build a try-catch around this as well, in which case the 
                // orchestration may either retry a few times or log the inconsistency for review
                await context.ScheduleTask<object>(typeof(CreditAccount),
                            new Tuple<string, float>(operation.SourceAccount, operation.Amount));
            }
        }

        return null;
    }
}

8 - Task Hub 管理

在 Durable Task Framework 中对 Task Hub 进行管理

https://github.com/Azure/durabletask/wiki/Task-Hub-Management

TaskHubWorker 的 API 可用于对 TaskHub 本身执行 CRUD 操作。

string serviceBusConnString = "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedSecretIssuer=[issuer];SharedSecretValue=[value]";

string tableConnectionString = "UseDevelopmentStorage=true;DevelopmentStorageProxyUri=http://127.0.0.1:10002/";

TaskHubWorker hubWorker = new TaskHubWorker("mytesthub", serviceBusConnString, tableConnectionString);

// creates the required underlying entities in Service Bus and Azure Storage for the task hub
hubWorker.CreateHub();

// creates the required underlying entities in Service Bus and Azure Storage for the task hub
// only if they don't already exist
hubWorker.CreateHubIfNotExists();

// deletes the underlying entities in Service Bus and Azure Storage for the task hub
hubWorker.DeleteHub();

// existence check
bool hubExists = hubWorker.HubExists();

Azure 存储连接字符串是可选的。如果不提供该字符串,将无法创建实例存储,因此也就无法查询实例数据。

9 - 编排版本控制

在 Durable Task Framework 中对编排进行版本控制

https://github.com/Azure/durabletask/wiki/Orchestration-Versioning

本节将概述升级长期运行的协调的过程,并提供示例代码。

使用协调进行调度和工作流管理时,不可避免地会遇到以下挑战:

  1. 需要对协调和活动进行更新、版本控制或删除。
  2. 新版本的协调可能无法向后兼容。事实上,我们严格假定它们永远不会向后兼容。
  3. 在升级时,协调可能正在进行某些工作。我们必须允许它运行到完成,以避免系统处于潜在的不一致状态。另一种方法是以这样一种方式设计协调:删除hub(因此,在中途终止协调)不会使系统处于不一致状态。

处理这些挑战有一套流程,以下是其大纲。

过程概要

  1. 在调度协调时,我们不使用类名,而是使用字符串名和版本名。这样我们就可以将两个不同的协调类视为同一协调的两个不同版本。

  2. 更新协调和/或活动时,我们会将新旧代码都部署到 Worker 上。这需要让之前启动的协调运行完成,然后在运行时优雅地切换到新版本。

  3. 我们在 Worker 中以相同的名称注册新旧协调类,但版本不同。我将在示例中介绍这一点,但我们的想法是使用一个类似于 ObjectCreator 工厂的类。

  4. 对旧的协调代码稍作修改:新版本中提供了 ContinueAsNew 方法(用于启动新的生成)。例如,当您第一次使用版本 “1” 的协调时,您可以在代码中使用如下内容:

    Context.ContinueAsNew("1", input);

    现在,我们将其改为

    Context.ContinueAsNew("2", input);

    这样,当旧的协调完成时,它就会使用新版本重新进行调度。在实践中,我不建议直接修改代码;相反,我们应该建立一个包含常量的文件,然后在代码中使用这些常量。当然,我们必须重新构建协调代码,以便获得常量的新值,或者使用属性代替。

  5. 应更新客户端以调度新版本的协调(为了保持一致性)

  6. 停止和删除协调的过程与更新过程相同,只是新版本是一个立即退出的无操作协调。在下一次部署时,两个版本都可以安全删除。

代码

在本节中,我将向您展示我们需要使用哪些方法重载来实现上一小节中概述的过程。

假设我们有两个类,都已部署:InfiniteOrchestrationV1InfiniteOrchestrationV2

同时,让我们给它们起一个字符串名称 “Infinite”,并分别给它们起一个版本 “1” 和 “2”。版本应为字符串,但可解析为 int。

首先,我们在 Worker 中调度两个版本。

我们首先创建两个 ObjectCreator 子类型"TaskHubObjectCreator<TaskOrchestration>",它们接受三个参数:

  • 编排逻辑名称(在我们的例子中,两个编排都被命名为 “Infinite”)
  • 版本(分别为 “1” 和 “2”)
  • 为(名称、版本)对实际创建正确协调实例的 Lambda:
var InfiniteV1 = new TaskHubObjectCreator<TaskOrchestration>("Infinite", "1", () => { return new InfiniteDemoOrchestrationV1(); });

var InfiniteV2 = new TaskHubObjectCreator<TaskOrchestration>("Infinite", "2", () => { return new InfiniteDemoOrchestrationV2(); });

下面我将提供 TaskHubObjectCreator 类的定义。然后,我们将实例放入一个数组中:

var InfiniteOrchestrations = new[] {InfiniteV1, InfiniteV2};

最后,我们使用 TaskHubWorker AddTaskOrchestrations 重载,它接受一个 ObjectCreator 实例列表:

var taskWorkerHub2 = new TaskHubWorker(Constants.HubName, Constants.ServiceBusConnString)
    .AddTaskOrchestrations(InfiniteOrchestrations)
    .AddTaskActivities(typeof(DemoActivityV1), typeof(DemoActivityV2));

这样,worker 现在就能知道协调的两个版本,并将它们关联为具有相同名称但不同版本的协调。

以下是 TaskHubObjectCreator 类的定义:使用 DurableTask.TaskHubObjectCreator 类;

using System;
using DurableTask;

/// <summary>
/// A factory class which allows creation an orchestration instance based on the string ID and version
/// </summary>
public class TaskHubObjectCreator : ObjectCreator<TaskOrchestration>
{
    private readonly Func<TaskOrchestration> objectCreatorFunc;

    /// <summary>
    /// Constructs an instance of TaskHubObjectCreator
    /// </summary>
    /// <param name="name">The string name of the orchestration which this factory creates. Several different classes which are conceptually related can have the same string ID but differnet version.</param>
    /// <param name="version">The version of the orchestration that this object creates</param>
    /// <param name="objectCreatorFunc">Creator function. This function must create the correct object for the (name, version) pair provided</param>
    public TaskHubObjectCreator(string name, string version, Func<TaskOrchestration> objectCreatorFunc)
    {
        if (string.IsNullOrEmpty(name))
        {
            throw new ArgumentNullException("name");
        }

        if (string.IsNullOrEmpty(version))
        {
            throw new ArgumentNullException("version");
        }

        if (objectCreatorFunc == null)
        {
            throw new ArgumentNullException("objectCreatorFunc");
        }

        this.Name = name;
        this.Version = version;
        this.objectCreatorFunc = objectCreatorFunc;
    }

    /// <summary>
    /// Invokes the creator function, thus creating the correct instance for (name, version) provided.
    /// </summary>
    /// <returns>An instance of an orchestration</returns>
    public override TaskOrchestration Create()
    {
        return this.objectCreatorFunc();
    }
}

完成 Worker 的配置后,我们需要编写与默认不同的客户端,以使该进程正常运行: orchestrationInstance = client.CreateOrchestrationInstance("Infinite", "1", "1", input);

第一个参数是协调的名称。通常,这将是类型或类型的 .ToString 值。但由于我们用逻辑名称 “Infinite “注册了协调,因此将使用该名称。

第二个参数是版本,等于 “1”。这将是 Worker 可以理解的版本之一。 第三个参数(也是 “1”)是 ID。出于幂等性考虑,ID 在不同调用中应保持一致。 最后一个参数是协调的输入,其类型由泛型决定。

最后,在部署过程中不要忘记用版本号更新常量:一旦部署完成,旧的协调必须继续使用新的版本。

10 - 特性-自动重试

在 Durable Task Framework 中实现自动重试的特性

https://github.com/Azure/durabletask/wiki/Feature---Automatic-Retries

任何使用云服务的应用程序都应在一定程度上对故障具有弹性,因此客户端重试成为实施的重要部分。

该框架提供了替代调度方法,可根据提供的策略在任务活动失败时执行重试。如果您需要自动重试,例如从网络服务读取数据或向数据库执行空闲写入的任务活动,这将非常有用。

public class GetQuoteOrchestration : TaskOrchestration<string, string>
{
    public override async Task<string> RunTask(OrchestrationContext context, string input)
    {
        // retry every 10 seconds upto 5 times before giving up and bubbling up the exception
        RetryOptions retryOptions = new RetryOptions(TimeSpan.FromSeconds(10), 5);
        await context.ScheduleWithRetry<object>(typeof (GetQuote), retryOptions, null);
        return null;
    }
}

11 - 特性-持久计时器

在 Durable Task Framework 中实现持久计时器的特性

https://github.com/Azure/durabletask/wiki/Feature---Durable-Timers

用户可以在协调代码中等待异步定时器事件。

public class EncodeVideoOrchestration : TaskOrchestration<string, string>
{
    public override async Task<string> RunTask(OrchestrationContext context, string input)
    {
        string encodedUrl = await context.ScheduleTask<string>(typeof (EncodeActivity), input);
        await context.CreateTimer(context.CurrentUtcDateTime.Add(TimeSpan.FromDays(1)), "timer1");
        await context.ScheduleTask<object>(typeof (EmailActivity), input);
                
        return encodedUrl;
    }
}

等待 CreateTimer 任务的行将导致协调在编码视频和电子邮件活动之间休眠一天。

定时器可用于定期工作和超时。

public class BillingOrchestration : TaskOrchestration<string, string>
{
    public override async Task<string> RunTask(OrchestrationContext context, string input)
    {
        for (int i = 0; i < 10; i++)
        {
            await context.CreateTimer(context.CurrentUtcDateTime.Add(TimeSpan.FromDays(1)), "timer1");
            await context.ScheduleTask<object>(typeof (BillingActivity));
        }
        return null;
    }
}

在上面的片段中,计费协调将每天发出信号,并在唤醒时调用一些计费活动。

public class GetQuoteOrchestration : TaskOrchestration<string, string>
{
    public override async Task<string> RunTask(OrchestrationContext context, string input)
    {
        CancellationTokenSource cancellationTokenForTimer = new CancellationTokenSource();
        Task timer = context.CreateTimer(
            context.CurrentUtcDateTime.Add(TimeSpan.FromSeconds(5)), "timer1", cancellationTokenForTimer.Token);
        Task getQuote = context.ScheduleTask<object>(typeof(GetQuote));
        Task winner = await Task.WhenAny(timer, getQuote);
        if (timer.IsCompleted)
        {
            // request timed out, do some compensating action
        }
        else
        {
            // without this, timer will still block
            // orchestration completion
            cancellationTokenForTimer.Cancel();

            // use getQuote task result
        }
        return null;
    }
}

在此代码段中,我们安排了 GetQuote 活动,并创建了一个 5 秒后触发的计时器。如果定时器在活动返回前触发,我们就运行一些补偿,否则就使用返回的报价。

12 - 特性-等待外部事件

在 Durable Task Framework 中实现等待外部事件的特性

https://github.com/Azure/durabletask/wiki/Feature---Waiting-on-External-Events-%28Human-Interaction%29

通常情况下,协调需要等待外部事件,如人类输入某些输入或其他外部触发。该框架为协调提供了一种异步等待外部事件的机制。

public class GetQuoteOrchestration : TaskOrchestration<string, string>
{
    TaskCompletionSource<object> getPermission = new TaskCompletionSource<object>(); 

    public override async Task<string> RunTask(OrchestrationContext context, string input)
    {
        await getPermission.Task;
        await context.ScheduleTask<object>(typeof (GetQuote), null);
        return null;
    }

    public override void OnEvent(OrchestrationContext context, string name, string input)
    {
        getPermission.SetResult(null);
    }
}

要从外部触发事件,用户可以调用 TaskHubClient.RaiseEvent 方法。

TaskHubClient client = new TaskHubClient("test", serviceBusConnString);
OrchestrationInstance instance = client.CreateOrchestrationInstance(typeof (GetQuoteOrchestration),  "quote")
client.RaiseEvent(instance, "dummyEvent", "dummyData");

13 - 特性-永恒编排

在 Durable Task Framework 中实现永恒编排(又名无限循环)的特性

https://github.com/Azure/durabletask/wiki/Feature---Eternal-Orchestrations-%28aka-infinite-loops%29

如《编写任务编排》一文所述,框架会重播执行历史,为用户的任务编排实例重新创建程序状态。历史记录的大小是有限制的,因此不可能出现无限循环的任务编排类。

利用生成功能,用户可以 “检查点” 协调实例并创建新的实例。

public class CronOrchestration : TaskOrchestration<string, int>
{
    public override async Task<string> RunTask(OrchestrationContext context, int intervalHours)
    {
        // bounded loop
        for (int i = 0; i < 10; i++)
        {
            await context.CreateTimer<object>(
                context.CurrentUtcDateTime.Add(TimeSpan.FromHours(intervalHours)), null);
            // TODO : do something interesting 
        }

        // create a new instance of self with the same input (or different if needed)
        context.ContinueAsNew(intervalHours);
        return null;
    }
}

在这个片段中,用户告诉框架为自己创建一个全新的实例(即新一代或执行),并将收到的输入作为输入转发给新实例。这个协调过程可以无限期运行,而不会受到历史记录大小的限制。

14 - 特性-子编排

在 Durable Task Framework 中实现子编排(又名内嵌编排)的特性

https://github.com/Azure/durabletask/wiki/Feature---Sub-orchestrations-%28aka-nested-orchestrations%29

协调还可以使用子协调功能启动和等待其他协调。如果您有一个协调库,并希望围绕这些协调库构建一个更大的协调,这将非常有用。

public class PeriodicBillingJob : TaskOrchestration<string, int>
{
    // hardcoded list of apps to run billing orchestrations on
    static readonly string[] ApplicationList = new string[] { "app1", "app2" };

    public override async Task<string> RunTask(OrchestrationContext context, int intervalHours)
    {
        // bounded loop
        for (int i = 0; i < 10; i++)
        {
            await context.CreateTimer<object>(
                context.CurrentUtcDateTime.Add(TimeSpan.FromHours(intervalHours)), null);

            List<Task> billingTasks = new List<Task>();

            foreach (string appName in PeriodicBillingJob.ApplicationList)
            {
                billingTasks.Add(
                    context.CreateSubOrchestrationInstance<bool>(typeof (BillingOrchestration), appName));
            }
            await Task.WhenAll(billingTasks);
        }

        // create a new instance of self with the same input (or different if needed)
        context.ContinueAsNew(intervalHours);
        return null;
    }
}

// a reusable orchestration which can either be triggered directly by the admin or via 
// some master recurring periodic billing orchestration
public class BillingOrchestration : TaskOrchestration<bool, string>
{
    public override async Task<bool> RunTask(OrchestrationContext context, string applicationName)
    {
        // TODO : process billing information for 'applicationName'
        return true;
    }
}

15 - 特性-框架诊断

在 Durable Task Framework 中实现框架诊断的特性

https://github.com/Azure/durabletask/wiki/Framework-Diagnostics

重要: 此信息已过时,仅适用于已废弃的 Durable Task Framework v1 版本。关于所有最新版本,请参阅 此处 信息。

框架的所有组件都会记录到跟踪源 “DurableTask”。可以将监听器连接到该跟踪源,以获取框架跟踪。

框架随附的 TraceListener 可将日志记录到控制台和调试流。监听器的类名为 DurableTask.Tracing.OrchestrationConsoleTraceListener。

下面是一个 app.config 文件片段,显示了如何加载控制台跟踪监听器:

  <system.diagnostics>
    <trace autoflush="true"/>
    <sources>
      <source name="DurableTask"
              switchName="traceSwitch"
              switchType="System.Diagnostics.SourceSwitch" >
        <listeners>
          <clear/>
          <add name="configConsoleListener" 
            type=" DurableTask.Tracing.OrchestrationConsoleTraceListener, DurableTask"
               traceOutputOptions="DateTime" />
        </listeners>
      </source>
    </sources>
    <switches>
      <add name="traceSwitch" value="Verbose" />
    </switches>
  </system.diagnostics>