WorkflowActivity实现

WorkflowActivity的代码实现

WorkflowActivity接口定义

WorkflowActivity接口定义了 Activity

public interface WorkflowActivity {
  /**
   * 执行活动逻辑并返回一个值,该值将被序列化并返回给调用的协调器。
   *
   * @param ctx 提供有关当前活动执行的信息,如活动名称和协调程序提供给它的输入数据。
   * @return 要返回给调用协调器的任何可序列化值。
   */
  Object run(WorkflowActivityContext ctx);
}

WorkflowActivity 的 javadoc 描述如下:

任务活动实现的通用接口。

活动(Activity)是 durable task 协调的基本工作单位。活动(Activity)是在业务流程中进行协调的任务。例如,您可以创建一个协调器来处理订单。这些任务包括检查库存、向客户收费和创建装运。每个任务都是一个单独的活动(Activity)。这些活动(Activity)可以串行执行、并行执行或两者结合执行。

与任务协调器不同的是,活动(Activity)在工作类型上不受限制。活动(Activity)函数经常用于进行网络调用或运行 CPU 密集型操作。活动(Activity)还可以将数据返回给协调器函数。 durable task 运行时保证每个被调用的活动(Activity)函数在协调执行期间至少被执行一次。

由于活动(Activity)只能保证至少执行一次,因此建议尽可能将活动(Activity)逻辑作为幂等逻辑来实现。

协调器使用 io.dapr.workflows.WorkflowContext.callActivity 方法重载之一来调度活动。

WorkflowActivityContext

WorkflowActivityContext 简单包装了 durabletask 的 TaskActivityContext :

import com.microsoft.durabletask.TaskActivityContext;

public class WorkflowActivityContext implements TaskActivityContext {
  private final TaskActivityContext innerContext;

  public WorkflowActivityContext(TaskActivityContext context) throws IllegalArgumentException {
    if (context == null) {
      throw new IllegalArgumentException("Context cannot be null");
    }
    this.innerContext = context;
  }
  ......
}

TaskActivityContext 接口要求的 getName() 和 getInput() 方法都简单代理给了内部的 durabletask 的 TaskActivityContext :

  public String getName() {
    return this.innerContext.getName();
  }

  public <T> T getInput(Class<T> targetType) {
    return this.innerContext.getInput(targetType);
  }

备注:这样的包装并没有起到隔离 dapr sdk 和 durabletask sdk 的目的,还是紧密的耦合在一起,包装的意义何在?