1 - workflow定义

workflow定义

workflow

Workflow 定义定义很简单:

public abstract class Workflow {
  // 默认构造函数应该可以不用写的
  public Workflow(){
  }

  public abstract WorkflowStub create();

  public void run(WorkflowContext ctx) {
    this.create().run(ctx);
  }
}

create() 方法定义创建 WorkflowStub 的模板方法,然后在 run() 方法通过执行 create() 方法创建 WorkflowStub ,在执行 WorkflowStub 的 run() 方法。

WorkflowStub

WorkflowStub 是一个单方法的接口定义,用于实现函数编程,标注有 java.lang.@FunctionalInterface 注解。

@FunctionalInterface
public interface WorkflowStub {
  void run(WorkflowContext ctx);
}

@FunctionalInterface 的 javadoc 描述如下:

一种信息性注解类型,用于表明接口类型声明是 Java 语言规范所定义的函数接口。从概念上讲,一个函数接口只有一个抽象方法。由于默认方法有一个实现,所以它们不是抽象方法。如果一个接口声明了一个覆盖 java.lang.Object 公共方法之一的抽象方法,该方法也不计入接口的抽象方法数,因为接口的任何实现都将有一个来自 java.lang.Object 或其他地方的实现。

请注意,函数接口的实例可以通过 lambda 表达式、方法引用或构造器引用来创建。

如果一个类型被注释为该注释类型,编译器必须生成一条错误信息,除非:

  • 该类型是接口类型,而不是注解类型、枚举或类。
  • 注解的类型满足函数接口的要求。

然而,无论接口声明中是否有 FunctionalInterface 注解,编译器都会将任何符合函数接口定义的接口视为函数接口。

WorkflowContext

出乎意外的是 WorkflowContext 的定义超级复杂,远远不是一个 上下文 那么简单。

WorkflowContext的基本方法

WorkflowContext 接口上定义了大量的方法,其中部分基本方法

public interface WorkflowContext {
  // 通过这个方法传递 logger 对象以供在后续执行时打印日志
  Logger getLogger();

  // 获取 workflow 的 name
  String getName();

  // 获取 workflow instance 的 id
  String getInstanceId();

  //获取当前协调时间(UTC)
  Instant getCurrentInstant();

  // 完成当前 wofklow,输出是完成的workflow的序列化输出
  void complete(Object output);
  ......
}

waitForExternalEvent()方法

WorkflowContext 接口上定义了三个 waitForExternalEvent() 接口方法和一个默认实现:

public interface WorkflowContext {
  ......
  <V> Task<V> waitForExternalEvent(String name, Duration timeout, Class<V> dataType) throws TaskCanceledException;

  <V> Task<Void> waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException;

  <V> Task<Void> waitForExternalEvent(String name) throws TaskCanceledException;

  default <V> Task<V> waitForExternalEvent(String name, Class<V> dataType) {
    try {
      return this.waitForExternalEvent(name, null, dataType);
    } catch (TaskCanceledException e) {
      // This should never happen because of the max duration
      throw new RuntimeException("An unexpected exception was throw while waiting for an external event.", e);
    }
  }
  ......
}

waitForExternalEvent 的 javadoc 描述如下:

等待名为 name 的事件发生,并返回一个 Task,该任务在收到事件时完成,或在超时时取消。

如果当前协调器尚未等待名为 name 的事件,那么事件将保存在协调器实例状态中,并在调用此方法时立即派发。即使当前协调器在收到事件前取消了等待操作,事件保存也会发生。

协调器可以多次等待同一事件名,因此允许等待多个同名事件。协调器收到的每个外部事件将只完成本方法返回的一个任务。

特别注意: 这个 Task 的类型是 com.microsoft.durabletask.Task ,直接用在 dapr workflow 的接口定义上,意味着 dapr workflow 彻底和 durabletask 绑定。

callActivity()方法

WorkflowContext 接口上定义了 callActivity() 接口方法和多个默认方法来重写不同参数的 callActivity() 方法

public interface WorkflowContext {
  ......
  <V> Task<V> callActivity(String name, Object input, TaskOptions options, Class<V> returnType);

  default Task<Void> callActivity(String name) {
    return this.callActivity(name, null, null, Void.class);
  }

  default Task<Void> callActivity(String name, Object input) {
    return this.callActivity(name, input, null, Void.class);
  }

  default <V> Task<V> callActivity(String name, Class<V> returnType) {
    return this.callActivity(name, null, null, returnType);
  }

  default <V> Task<V> callActivity(String name, Object input, Class<V> returnType) {
    return this.callActivity(name, input, null, returnType);
  }

  default Task<Void> callActivity(String name, Object input, TaskOptions options) {
    return this.callActivity(name, input, options, Void.class);
  }
  ......
}

waitForExternalEvent 的 javadoc 描述如下:

使用指定的 input 异步调用一个 activity,并在 activity 完成时返回一个新的 task。如果 activity 成功完成,返回的 task 值将是 task 的输出。如果 activity 失败,返回的 task 将以 TaskFailedException 异常完成。

isReplaying() 方法

isReplaying() 用来判断当前工作流当前是否正在重放之前的执行:

public interface WorkflowContext {
  ......
  boolean isReplaying();
}

waitForExternalEvent 的 javadoc 描述如下:

获取一个值,指示工作流当前是否正在重放之前的执行。

工作流函数从内存中卸载后会进行 “重放”,以重建本地变量状态。在重放过程中,先前执行的任务将自动使用存储在工作流历史记录中的先前查看值完成。一旦工作流达到不再重放现有历史记录的程度,此方法将返回 false。

如果您的逻辑只需要在不重放时运行,则可以使用此方法。例如,某些类型的应用程序日志在作为重放的一部分进行复制时可能会变得过于嘈杂。应用程序代码可以检查函数是否正在重放,然后在该值为 false 时发出日志语句。

allOf()和 anyOf()方法

  <V> Task<List<V>> allOf(List<Task<V>> tasks) throws CompositeTaskFailedException;

  Task<Task<?>> anyOf(List<Task<?>> tasks);

  default Task<Task<?>> anyOf(Task<?>... tasks) {
    return this.anyOf(Arrays.asList(tasks));
  }

allOf 的 javadoc 描述如下:

返回一个新任务,该任务在所有给定任务完成后完成。如果任何给定任务在完成时出现异常,返回的任务也会在完成时出现 CompositeTaskFailedException,其中包含第一次遇到的故障的详细信息。返回的任务值是给定任务返回值的有序列表。如果没有提供任务,则返回值为空的已完成任务。

该方法适用于在继续协调的下一步之前等待一组独立任务的完成,如下面的示例:

Task t1 = ctx.callActivity(“MyActivity”, String.class); Task t2 = ctx.callActivity(“MyActivity”, String.class); Task t3 = ctx.callActivity(“MyActivity”, String.class);

List orderedResults = ctx.allOf(List.of(t1, t2, t3)).await();

任何给定任务出现异常都会导致非受查的 CompositeTaskFailedException 异常。可以通过检查该异常来获取单个任务的失败详情。

try { List orderedResults = ctx.allOf(List.of(t1, t2, t3)).await(); } catch (CompositeTaskFailedException e) { List exceptions = e.getExceptions() } }

特别注意: 这个 CompositeTaskFailedException 的类型是 com.microsoft.durabletask.CompositeTaskFailedException ,直接用在 dapr workflow 的接口定义上,意味着 dapr workflow 彻底和 durabletask 绑定。

anyOf 的 javadoc 描述如下:

当任何给定任务完成时,返回一个已完成的新任务。新任务的值是已完成任务对象的引用。如果没有提供任务,则返回一个永不完成的任务。

该方法适用于等待多个并发任务,并在第一个任务完成时执行特定于任务的操作,如下面的示例:

Task event1 = ctx.waitForExternalEvent(“Event1”); Task event2 = ctx.waitForExternalEvent(“Event2”); Task event3 = ctx.waitForExternalEvent(“Event3”);

Task winner = ctx.anyOf(event1、event2、event3).await(); 如果(winner == event1){ // … } else if (winner == event2) { // … // … } else if (winner == event3) { // … // … }

anyOf 方法还可用于实现长时间超时,如下面的示例:

Task activityTask = ctx.callActivity(“SlowActivity”); Task timeoutTask = ctx.createTimer(Duration.ofMinutes(30));

Task winner = ctx.anyOf(activityTask, timeoutTask).await(); 如果(winner == activityTask){ // 完成情况 } else { // 超时情况 }

createTimer()方法

创建一个在指定延迟后过期的 durable timer。

指定较长的延迟(例如,几天或更长时间的延迟)可能会导致创建多个内部管理的 durable timer。协调器代码不需要意识到这种行为。不过,框架日志和存储的历史状态中可能会显示这种行为。

  Task<Void> createTimer(Duration duration);

  default Task<Void> createTimer(ZonedDateTime zonedDateTime) {
    throw new UnsupportedOperationException("This method is not implemented.");
  }

getInput()方法

getInput() 方法获取当前任务协调器的反序列化输入。

<V> V getInput(Class<V> targetType);

callSubWorkflow()

callSubWorkflow() 方法异步调用另一个工作流作为子工作流:

  default Task<Void> callSubWorkflow(String name) {
    return this.callSubWorkflow(name, null);
  }

  default Task<Void> callSubWorkflow(String name, Object input) {
    return this.callSubWorkflow(name, input, null);
  }

  default <V> Task<V> callSubWorkflow(String name, Object input, Class<V> returnType) {
    return this.callSubWorkflow(name, input, null, returnType);
  }

  default <V> Task<V> callSubWorkflow(String name, Object input, String instanceID, Class<V> returnType) {
    return this.callSubWorkflow(name, input, instanceID, null, returnType);
  }

  default Task<Void> callSubWorkflow(String name, Object input, String instanceID, TaskOptions options) {
    return this.callSubWorkflow(name, input, instanceID, options, Void.class);
  }

  <V> Task<V> callSubWorkflow(String name,
                              @Nullable Object input,
                              @Nullable String instanceID,
                              @Nullable TaskOptions options,
                              Class<V> returnType);

callSubWorkflow() 的 javadoc 描述如下:

异步调用另一个工作流作为子工作流,并在子工作流完成时返回一个任务。如果子工作流成功完成,返回的任务值将是 activity 的输出。如果子工作流失败,返回的任务将以 TaskFailedException 异常完成。

子工作流有自己的 instance ID、历史和状态,与启动它的父工作流无关。将大型协调分解为子工作流有很多好处:

  • 将大型协调拆分成一系列较小的子工作流可以使代码更易于维护。
  • 如果协调逻辑需要协调大量任务,那么在多个计算节点上并发分布协调逻辑就非常有用。
  • 通过保持较小的父协调历史记录,可以减少内存使用和 CPU 开销。

缺点是启动子工作流和处理其输出会产生开销。这通常只适用于非常小的协调。

由于子工作流独立于父工作流,因此终止父协调不会影响任何子工作流。

continueAsNew()

callSubWorkflow() 方法使用新输入重启协调并清除其历史记录:

  default void continueAsNew(Object input) {
    this.continueAsNew(input, true);
  }

  void continueAsNew(Object input, boolean preserveUnprocessedEvents);
}

continueAsNew() 的 javadoc 描述如下:

使用新输入重启协调并清除其历史记录。

该方法主要针对永恒协调(eternal orchestrations),即可能永远无法完成的协调。它的工作原理是重新启动协调,为其提供新的输入,并截断现有的协调历史。它允许协调无限期地继续运行,而不会让其历史记录无限制地增长。定期截断历史记录的好处包括降低内存使用率、减少存储容量,以及在重建状态时缩短协调器重播时间。

当协调器调用 continueAsNew 时,任何未完成任务的结果都将被丢弃。例如,如果计划了一个定时器,但在定时器启动前调用了 continueAsNew,那么定时器事件将被丢弃。唯一的例外是外部事件。默认情况下,如果协调收到外部事件但尚未处理,则会通过调用 waitForExternalEvent 将该事件保存在协调状态单元中。即使协调器使用 continueAsNew 重新启动,这些事件也会保留在内存中。可以通过为 preserveUnprocessedEvents 参数值指定 false 来禁用此行为。

协调器实现应在调用 continueAsNew 方法后立即完成。

2 - DaprWorkflowContextImpl实现

DaprWorkflowContextImpl实现

类定义

DaprWorkflowContextImpl 类实现了 WorkflowContext 接口,实现上采用代理给内部字段 innerContext,这是一个 com.microsoft.durabletask.TaskOrchestrationContext

import com.microsoft.durabletask.TaskOrchestrationContext;

public class DaprWorkflowContextImpl implements WorkflowContext {
  private final TaskOrchestrationContext innerContext;
  private final Logger logger;
  ......
}

构造函数只是简单赋值,加了一些必要的 null 检测:

public DaprWorkflowContextImpl(TaskOrchestrationContext context) throws IllegalArgumentException {
    this(context, LoggerFactory.getLogger(WorkflowContext.class));
  }

  public DaprWorkflowContextImpl(TaskOrchestrationContext context, Logger logger) throws IllegalArgumentException {
    if (context == null) {
      throw new IllegalArgumentException("Context cannot be null");
    }
    if (logger == null) {
      throw new IllegalArgumentException("Logger cannot be null");
    }

    this.innerContext = context;
    this.logger = logger;
  }

方法实现

除 getLogger() 外的所有方法的实现都是简单的代理给 innerContext 的同名方法:

  public Logger getLogger() {
    if (this.innerContext.getIsReplaying()) {
      return NOPLogger.NOP_LOGGER;
    }
    return this.logger;
  }

  public String getName() {
    return this.innerContext.getName();
  }

  public String getInstanceId() {
    return this.innerContext.getInstanceId();
  }

  public Instant getCurrentInstant() {
    return this.innerContext.getCurrentInstant();
  }

  public boolean isReplaying() {
    return this.innerContext.getIsReplaying();
  }

  public <V> Task<V> callSubWorkflow(String name, @Nullable Object input, @Nullable String instanceID,
                                     @Nullable TaskOptions options, Class<V> returnType) {

    return this.innerContext.callSubOrchestrator(name, input, instanceID, options, returnType);
  }

  public void continueAsNew(Object input) {
    this.innerContext.continueAsNew(input);
  }

小结

这个类基本就是 com.microsoft.durabletask.TaskOrchestrationContext 的简单包裹,所有功能都代理给 com.microsoft.durabletask.TaskOrchestrationContext, 包括设计甚至方法名。

dapr 的 workflow 实现基本是完全绑定在 durabletask 上的。

3 - runtime package

runtime package中的代码

3.1 - WorkflowRuntime实现

WorkflowRuntime的代码实现

WorkflowRuntime 简单封装了 durabletask 的 DurableTaskGrpcWorker:

import com.microsoft.durabletask.DurableTaskGrpcWorker;

public class WorkflowRuntime implements AutoCloseable {

  private DurableTaskGrpcWorker worker;

  public WorkflowRuntime(DurableTaskGrpcWorker worker) {
    this.worker = worker;
  }
  ......   
}

然后将 start() 和 close() 方法简单的代理给 durabletask 的 DurableTaskGrpcWorker:

  public void start() {
    this.start(true);
  }

  public void start(boolean block) {
    if (block) {
      this.worker.startAndBlock();
    } else {
      this.worker.start();
    }
  }

  public void close() {
    if (this.worker != null) {
      this.worker.close();
      this.worker = null;
    }
  }

3.2 - WorkflowRuntimeBuilder实现

WorkflowRuntime的代码实现

类定义

WorkflowRuntimeBuilder 用来构建 WorkflowRuntime,类似 WorkflowRuntime 只是简单封装了 durabletask 的 DurableTaskGrpcWorker, WorkflowRuntimeBuilder 的实现也是简单封装了 durabletask 的 DurableTaskGrpcWorkerBuilder:

import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;

public class WorkflowRuntimeBuilder {
  private static volatile WorkflowRuntime instance;
  private DurableTaskGrpcWorkerBuilder builder;

  public WorkflowRuntimeBuilder() {
    this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(NetworkUtils.buildGrpcManagedChannel());
  }
  ......
}

grpcChannel()的细节后面细看。

registerWorkflow()方法

registerWorkflow() 方法注册 workflow 对象,实际代理给 DurableTaskGrpcWorkerBuilder 的 addOrchestration() 方法:

  public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
    this.builder = this.builder.addOrchestration(
        new OrchestratorWrapper<>(clazz)
    );

    return this;
  }

registerActivity() 方法

registerActivity() 方法注册 activity 对象,实际代理给 DurableTaskGrpcWorkerBuilder 的 addActivity() 方法:

  public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
    this.builder = this.builder.addActivity(
        new ActivityWrapper<>(clazz)
    );
  }

build() 方法

build() 方法实现了一个简单的单例,只容许构建一个 WorkflowRuntime 的 instance:

private static volatile WorkflowRuntime instance;  

public WorkflowRuntime build() {
    if (instance == null) {
      synchronized (WorkflowRuntime.class) {
        if (instance == null) {
          instance = new WorkflowRuntime(this.builder.build());
        }
      }
    }
    return instance;
  }

grpcChannel 的构建细节

DurableTaskGrpcWorkerBuilder() 在构建时,需要设置 grpcChannel,而这个 grpcChannel 是通过 NetworkUtils.buildGrpcManagedChannel() 方法来实现的。

NetworkUtils.buildGrpcManagedChannel() 在 sdk/src/main/java/io/dapr/utils/NetworkUtils.java 文件中,是一个通用的网络工具类。buildGrpcManagedChannel() 方法的实现如下:

  
private static final String DEFAULT_SIDECAR_IP = "127.0.0.1";
private static final Integer DEFAULT_GRPC_PORT = 50001;

public static final Property<String> SIDECAR_IP = new StringProperty(
      "dapr.sidecar.ip",
      "DAPR_SIDECAR_IP",
      DEFAULT_SIDECAR_IP);

  public static final Property<Integer> GRPC_PORT = new IntegerProperty(
      "dapr.grpc.port",
      "DAPR_GRPC_PORT",
      DEFAULT_GRPC_PORT);

  public static final Property<String> GRPC_ENDPOINT = new StringProperty(
      "dapr.grpc.endpoint",
      "DAPR_GRPC_ENDPOINT",
      null);

public static ManagedChannel buildGrpcManagedChannel() {
    // 从系统属性或者环境变量中读取 dapr sidecar 的IP
    String address = Properties.SIDECAR_IP.get();
    // 从系统属性或者环境变量中读取 dapr grpc 端口
    int port = Properties.GRPC_PORT.get();
    // 默认不用https
    boolean insecure = true;
    // 从系统属性或者环境变量中读取 dapr grpc 端点信息
    String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
    if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
      // 如果 dapr grpc 端点不为空,则用 grpc 端点的内容覆盖 
      URI uri = URI.create(grpcEndpoint);
      // 通过 schema 是不是 http 来判断是 http 还是 https
      insecure = uri.getScheme().equalsIgnoreCase("http");
      // grpcEndpoint 如果设置有端口则采用,没有设置则根据是 http 还是 https 来选择 80 或者 443 端口
      port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443);
      // 覆盖 dapr sidecar 的地址
      address = uri.getHost();
      if ((uri.getPath() != null) && !uri.getPath().isEmpty()) {
        address += uri.getPath();
      }
    }
    
    // 构建连接到指定地址的 grpc channel
    ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(address, port)
        .userAgent(Version.getSdkVersion());
    if (insecure) {
      builder = builder.usePlaintext();
    }
    return builder.build();
  }

从部署来看,runtime 运行在 client 一侧的 app 应用程序内部,然后通过 durabletask 的 sdk 连接到 dapr sidecar 了,走 grpc 协议。

这个设计有点奇怪,dapr sdk 和 dapr sidecar 之间没有走标准的 dapr API,而是通过 durabletask 的 sdk 。

3.3 - OrchestratorWrapper实现

OrchestratorWrapper的代码实现

背景

WorkflowRuntimeBuilder 的 registerWorkflow() 方法在注册 workflow 对象时,实际代理给 DurableTaskGrpcWorkerBuilder 的 addOrchestration() 方法:

import com.microsoft.durabletask.TaskOrchestrationFactory;  

public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
    this.builder = this.builder.addOrchestration(
        new OrchestratorWrapper<>(clazz)
    );

    return this;
  }

而 addOrchestration() 方法的输入参数为 com.microsoft.durabletask.TaskOrchestrationFactory

public interface TaskOrchestrationFactory {
    String getName();
    TaskOrchestration create();
}

因此需要提供一个 TaskOrchestrationFactory 的实现。

类定义

OrchestratorWrapper 类实现了 com.microsoft.durabletask.TaskOrchestrationFactory 接口:

class OrchestratorWrapper<T extends Workflow> implements TaskOrchestrationFactory {
  private final Constructor<T> workflowConstructor;
  private final String name;
  ......  
}

构造函数:

  public OrchestratorWrapper(Class<T> clazz) {
    // 获取并设置 name
    this.name = clazz.getCanonicalName();
    try {
      // 获取 Constructor
      this.workflowConstructor = clazz.getDeclaredConstructor();
    } catch (NoSuchMethodException e) {
      throw new RuntimeException(
          String.format("No constructor found for workflow class '%s'.", this.name), e
      );
    }
  }

接口实现

TaskOrchestrationFactory 接口要求的 getName() 方法,直接返回前面获取的 name:

  @Override
  public String getName() {
    return name;
  }

TaskOrchestrationFactory 接口要求的 create() 方法,要返回一个 durabletask 的 TaskOrchestration ,而 TaskOrchestration 是一个 @FunctionalInterface,仅有一个 run() 方法:

@FunctionalInterface
public interface TaskOrchestration {
    void run(TaskOrchestrationContext ctx);
}

因此构建 TaskOrchestration 实例的方式被简写为:

import com.microsoft.durabletask.TaskOrchestration;

  @Override
  public TaskOrchestration create() {
    return ctx -> {
      T workflow;
      try {
        // 通过 workflow 的构造器生成一个 workflow 实例
        workflow = this.workflowConstructor.newInstance();
      } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
        throw new RuntimeException(
            String.format("Unable to instantiate instance of workflow class '%s'", this.name), e
        );
      }
      // 将 durable task 的 context 包装为 dapr 的 workflow context DaprWorkflowContextImpl
      // 然后执行 workflow.run()
      workflow.run(new DaprWorkflowContextImpl(ctx));
    };

  }

3.4 - ActivityWrapper实现

ActivityWrapper的代码实现

背景

WorkflowRuntimeBuilder 的 registerActivity() 方法在注册 activity 对象时,实际代理给 DurableTaskGrpcWorkerBuilder 的 addActivity() 方法:

import com.microsoft.durabletask.TaskOrchestrationFactory;  

  public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
    this.builder = this.builder.addActivity(
        new ActivityWrapper<>(clazz)
    );
  }

而 addActivity() 方法的输入参数为 com.microsoft.durabletask.TaskActivityFactory

public interface TaskActivityFactory {
    String getName();
    TaskActivity create();
}

因此需要提供一个 TaskActivityFactory 的实现。

类定义

ActivityWrapper 类实现了 com.microsoft.durabletask.TaskActivityFactory 接口:

public class ActivityWrapper<T extends WorkflowActivity> implements TaskActivityFactory {
  private final Constructor<T> activityConstructor;
  private final String name;
  ......  
}

构造函数:

  public ActivityWrapper(Class<T> clazz) {
    this.name = clazz.getCanonicalName();
    try {
      this.activityConstructor = clazz.getDeclaredConstructor();
    } catch (NoSuchMethodException e) {
      throw new RuntimeException(
          String.format("No constructor found for activity class '%s'.", this.name), e
      );
    }
  }

接口实现

TaskActivityFactory 接口要求的 getName() 方法,直接返回前面获取的 name:

  @Override
  public String getName() {
    return name;
  }

TaskActivityFactory 接口要求的 create() 方法,要返回一个 durabletask 的 TaskActivity ,而 TaskActivity 是一个 @FunctionalInterface,仅有一个 run() 方法:

@FunctionalInterface
public interface TaskActivity {
    Object run(TaskActivityContext ctx);
}

因此构建 TaskActivity 实例的方式被简写为:

import com.microsoft.durabletask.TaskActivity;

  @Override
  public TaskActivity create() {
    return ctx -> {
      Object result;
      T activity;
      
      try {
        activity = this.activityConstructor.newInstance();
      } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
        throw new RuntimeException(
            String.format("Unable to instantiate instance of activity class '%s'", this.name), e
        );
      }

      result = activity.run(new WorkflowActivityContext(ctx));
      return result;
    };
  }
}

3.5 - WorkflowActivity实现

WorkflowActivity的代码实现

WorkflowActivity接口定义

WorkflowActivity接口定义了 Activity

public interface WorkflowActivity {
  /**
   * 执行活动逻辑并返回一个值,该值将被序列化并返回给调用的协调器。
   *
   * @param ctx 提供有关当前活动执行的信息,如活动名称和协调程序提供给它的输入数据。
   * @return 要返回给调用协调器的任何可序列化值。
   */
  Object run(WorkflowActivityContext ctx);
}

WorkflowActivity 的 javadoc 描述如下:

任务活动实现的通用接口。

活动(Activity)是 durable task 协调的基本工作单位。活动(Activity)是在业务流程中进行协调的任务。例如,您可以创建一个协调器来处理订单。这些任务包括检查库存、向客户收费和创建装运。每个任务都是一个单独的活动(Activity)。这些活动(Activity)可以串行执行、并行执行或两者结合执行。

与任务协调器不同的是,活动(Activity)在工作类型上不受限制。活动(Activity)函数经常用于进行网络调用或运行 CPU 密集型操作。活动(Activity)还可以将数据返回给协调器函数。 durable task 运行时保证每个被调用的活动(Activity)函数在协调执行期间至少被执行一次。

由于活动(Activity)只能保证至少执行一次,因此建议尽可能将活动(Activity)逻辑作为幂等逻辑来实现。

协调器使用 io.dapr.workflows.WorkflowContext.callActivity 方法重载之一来调度活动。

WorkflowActivityContext

WorkflowActivityContext 简单包装了 durabletask 的 TaskActivityContext :

import com.microsoft.durabletask.TaskActivityContext;

public class WorkflowActivityContext implements TaskActivityContext {
  private final TaskActivityContext innerContext;

  public WorkflowActivityContext(TaskActivityContext context) throws IllegalArgumentException {
    if (context == null) {
      throw new IllegalArgumentException("Context cannot be null");
    }
    this.innerContext = context;
  }
  ......
}

TaskActivityContext 接口要求的 getName() 和 getInput() 方法都简单代理给了内部的 durabletask 的 TaskActivityContext :

  public String getName() {
    return this.innerContext.getName();
  }

  public <T> T getInput(Class<T> targetType) {
    return this.innerContext.getInput(targetType);
  }

备注:这样的包装并没有起到隔离 dapr sdk 和 durabletask sdk 的目的,还是紧密的耦合在一起,包装的意义何在?

4 - client package

client package中的代码

4.1 - DaprWorkflowClient代码实现

DaprWorkflowClient 的代码实现

定义和创建

类定义

DaprWorkflowClient 定义管理 Dapr 工作流实例的客户端操作。

注意这里是 “管理” !

import com.microsoft.durabletask.DurableTaskClient;

public class DaprWorkflowClient implements AutoCloseable {

  DurableTaskClient innerClient;
  ManagedChannel grpcChannel;
    
  public DaprWorkflowClient() {
    this(NetworkUtils.buildGrpcManagedChannel());
  }
    
  private DaprWorkflowClient(ManagedChannel grpcChannel) {
    this(createDurableTaskClient(grpcChannel), grpcChannel);
  }
    
  private DaprWorkflowClient(DurableTaskClient innerClient, ManagedChannel grpcChannel) {
    this.innerClient = innerClient;
    this.grpcChannel = grpcChannel;
  }

实现上依然是包装 durabletask 的 DurableTaskClient , 而 durabletask 的 DurableTaskClient 在创建时需要传入一个 grpcChannel。

关键点在于这个 grpcChannel 的创建,可以从外部传入,如果没有传入则可以通过 NetworkUtils.buildGrpcManagedChannel() 方法进行创建。

grpcChannel 的创建

实现和之前 WorkflowRuntimeBuilder 中的一致,都是调用 NetworkUtils.buildGrpcManagedChannel() 方法。

NetworkUtils.buildGrpcManagedChannel() 方法在 dapr java sdk 中一共有3处调用:

  1. WorkflowRuntimeBuilder:

      public WorkflowRuntimeBuilder() {
        this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(NetworkUtils.buildGrpcManagedChannel());
      }
    
  2. DaprWorkflowClient:

      public DaprWorkflowClient() {
        this(NetworkUtils.buildGrpcManagedChannel());
      }
    
  3. DaprClientBuilder

    final ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel();
    

DurableTaskClient 的创建

DurableTaskClient 的创建是简单的调用 durabletask 的 DurableTaskGrpcClientBuilder 来实现的:

import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;

private static DurableTaskClient createDurableTaskClient(ManagedChannel grpcChannel) {
    return new DurableTaskGrpcClientBuilder()
        .grpcChannel(grpcChannel)
        .build();
  }

close() 方法

close() 方法用于关闭 DaprWorkflowClient,内部实现为关闭包装的 durabletask 的 DurableTaskClient 以及创建时传入的 grpcChannel:

  public void close() throws InterruptedException {
    try {
      if (this.innerClient != null) {
        this.innerClient.close();
        this.innerClient = null;
      }
    } finally {
      if (this.grpcChannel != null && !this.grpcChannel.isShutdown()) {
        this.grpcChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
        this.grpcChannel = null;
      }
    }
  }
}

操作 workflow instance

scheduleNewWorkflow() 方法

scheduleNewWorkflow() 方法调度一个新的 workflow ,即创建并开始一个新的 workflow instance,这个方法返回 workflow instance id:

package io.dapr.workflows.client;  

public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz) {
    return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName());
  }

  public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, Object input) {
    return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input);
  }

  public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, Object input, String instanceId) {
    return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input, instanceId);
  }

实现完全代理给 durabletask 的 DurableTaskClient 。

terminateWorkflow() 方法

terminateWorkflow() 方法终止一个 workflow instance 的执行,需要传入之前从 scheduleNewWorkflow() 方法中得到的 workflow instance id。

  public void terminateWorkflow(String workflowInstanceId, @Nullable Object output) {
    this.innerClient.terminate(workflowInstanceId, output);
  }

output 参数是可选的,用来传递被终止的 workflow instance 的输出。

getInstanceState() 方法

getInstanceState() 方法获取 workflow instance 的状态,同样需要传入之前从 scheduleNewWorkflow() 方法中得到的 workflow instance id:

  @Nullable
  public WorkflowInstanceStatus getInstanceState(String instanceId, boolean getInputsAndOutputs) {
    OrchestrationMetadata metadata = this.innerClient.getInstanceMetadata(instanceId, getInputsAndOutputs);
    if (metadata == null) {
      return null;
    }
    return new WorkflowInstanceStatus(metadata);
  }

实现为调用 durabletask 的 DurableTaskClient 的 getInstanceMetadata() 方法来获取 OrchestrationMetadata,然后转换为 dapr 定义的 WorkflowInstanceStatus()。

这里的细节在 WorkflowInstanceStatus 类实现中展开。

waitForInstanceStart() 方法

waitForInstanceStart() 方法等待 workflow instance 执行的开始:

  @Nullable
  public WorkflowInstanceStatus waitForInstanceStart(String instanceId, Duration timeout, boolean getInputsAndOutputs)
      throws TimeoutException {

    OrchestrationMetadata metadata = this.innerClient.waitForInstanceStart(instanceId, timeout, getInputsAndOutputs);
    return metadata == null ? null : new WorkflowInstanceStatus(metadata);
  }

waitForInstanceStart() 方法的 javadoc 描述为:

等待工作流开始运行,并返回一个 WorkflowInstanceStatus 对象,该对象包含已启动实例的元数据,以及可选的输入、输出和自定义状态有效载荷。

“已启动” 的工作流实例是指未处于 “Pending” 状态的任何实例。

如果调用该方法时工作流实例已在运行,该方法将立即返回。

waitForInstanceCompletion() 方法

waitForInstanceCompletion() 方法等待 workflow instance 执行的完成:

  @Nullable
  public WorkflowInstanceStatus waitForInstanceCompletion(String instanceId, Duration timeout,
                                                          boolean getInputsAndOutputs) throws TimeoutException {

    OrchestrationMetadata metadata =
        this.innerClient.waitForInstanceCompletion(instanceId, timeout, getInputsAndOutputs);
    return metadata == null ? null : new WorkflowInstanceStatus(metadata);
  }

waitForInstanceStart() 方法的 javadoc 描述为:

等待工作流完成,并返回一个包含已完成实例元数据的 WorkflowInstanceStatus 对象。

“已完成” 的工作流实例是指处于终止状态之一的任何实例。例如,“Completed”、“Failed” 或 “Terminated” 状态。

工作流是长期运行的,可能需要数小时、数天或数月才能完成。工作流也可能是长久的,在这种情况下,除非终止,否则永远不会完成。在这种情况下,该调用可能会无限期阻塞,因此必须注意确保使用适当的超时。如果调用该方法时工作流实例已经完成,该方法将立即返回。

purgeInstance() 方法

purgeInstance() 方法从工作流状态存储中清除工作流实例的状态:

  public boolean purgeInstance(String workflowInstanceId) {
    PurgeResult result = this.innerClient.purgeInstance(workflowInstanceId);
    if (result != null) {
      return result.getDeletedInstanceCount() > 0;
    }
    return false;
  }

如果找到工作流状态并成功清除,则返回 true,否则返回 false。

raiseEvent() 方法

raiseEvent() 方法向等待中的工作流实例发送事件通知消息:

  public void raiseEvent(String workflowInstanceId, String eventName, Object eventPayload) {
    this.innerClient.raiseEvent(workflowInstanceId, eventName, eventPayload);
  }

TaskHub的方法

这两个方法暂时还知道什么情况下用,暂时忽略。

  public void createTaskHub(boolean recreateIfExists) {
    this.innerClient.createTaskHub(recreateIfExists);
  }

  public void deleteTaskHub() {
    this.innerClient.deleteTaskHub();
  }

4.2 - WorkflowInstanceStatus代码实现

WorkflowInstanceStatus 的代码实现

类定义和构造函数

WorkflowInstanceStatus 代表工作流实例当前状态的快照,包括元数据。

WorkflowInstanceStatus 的实现依然是包装 durabletask,内部是一个 durabletask 的 OrchestrationMetadata,以及 OrchestrationMetadata 携带的 FailureDetails:

import com.microsoft.durabletask.FailureDetails;
import com.microsoft.durabletask.OrchestrationMetadata;

public class WorkflowInstanceStatus {

  private final OrchestrationMetadata orchestrationMetadata;

  @Nullable
  private final WorkflowFailureDetails failureDetails;
    
  public WorkflowInstanceStatus(OrchestrationMetadata orchestrationMetadata) {
    if (orchestrationMetadata == null) {
      throw new IllegalArgumentException("OrchestrationMetadata cannot be null");
    }
    this.orchestrationMetadata = orchestrationMetadata;
    FailureDetails details = orchestrationMetadata.getFailureDetails();
    if (details != null) {
      this.failureDetails = new WorkflowFailureDetails(details);
    } else {
      this.failureDetails = null;
    }
  }

获取 FailureDetails 之后将转为 dapr 的 WorkflowFailureDetails, 这里的细节在 WorkflowFailureDetails 类实现中展开。

各种代理方法

4.3 - WorkflowFailureDetails代码实现

WorkflowFailureDetails 的代码实现

WorkflowFailureDetails 只是非常简单的包装了 durabletask 的 FailureDetails

public class WorkflowFailureDetails {

  FailureDetails workflowFailureDetails;

  /**
   * Class constructor.
   * @param failureDetails failure Details
   */
  public WorkflowFailureDetails(FailureDetails failureDetails) {
    this.workflowFailureDetails = failureDetails;
  }

然后代理各种方法:

  public String getErrorType() {
    return workflowFailureDetails.getErrorType();
  }

  public String getErrorMessage() {
    return workflowFailureDetails.getErrorMessage();
  }

  public String getStackTrace() {
    return workflowFailureDetails.getStackTrace();
  }