这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

runtime package

runtime package中的代码

1 - WorkflowRuntime实现

WorkflowRuntime的代码实现

WorkflowRuntime 简单封装了 durabletask 的 DurableTaskGrpcWorker:

import com.microsoft.durabletask.DurableTaskGrpcWorker;

public class WorkflowRuntime implements AutoCloseable {

  private DurableTaskGrpcWorker worker;

  public WorkflowRuntime(DurableTaskGrpcWorker worker) {
    this.worker = worker;
  }
  ......   
}

然后将 start() 和 close() 方法简单的代理给 durabletask 的 DurableTaskGrpcWorker:

  public void start() {
    this.start(true);
  }

  public void start(boolean block) {
    if (block) {
      this.worker.startAndBlock();
    } else {
      this.worker.start();
    }
  }

  public void close() {
    if (this.worker != null) {
      this.worker.close();
      this.worker = null;
    }
  }

2 - WorkflowRuntimeBuilder实现

WorkflowRuntime的代码实现

类定义

WorkflowRuntimeBuilder 用来构建 WorkflowRuntime,类似 WorkflowRuntime 只是简单封装了 durabletask 的 DurableTaskGrpcWorker, WorkflowRuntimeBuilder 的实现也是简单封装了 durabletask 的 DurableTaskGrpcWorkerBuilder:

import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;

public class WorkflowRuntimeBuilder {
  private static volatile WorkflowRuntime instance;
  private DurableTaskGrpcWorkerBuilder builder;

  public WorkflowRuntimeBuilder() {
    this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(NetworkUtils.buildGrpcManagedChannel());
  }
  ......
}

grpcChannel()的细节后面细看。

registerWorkflow()方法

registerWorkflow() 方法注册 workflow 对象,实际代理给 DurableTaskGrpcWorkerBuilder 的 addOrchestration() 方法:

  public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
    this.builder = this.builder.addOrchestration(
        new OrchestratorWrapper<>(clazz)
    );

    return this;
  }

registerActivity() 方法

registerActivity() 方法注册 activity 对象,实际代理给 DurableTaskGrpcWorkerBuilder 的 addActivity() 方法:

  public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
    this.builder = this.builder.addActivity(
        new ActivityWrapper<>(clazz)
    );
  }

build() 方法

build() 方法实现了一个简单的单例,只容许构建一个 WorkflowRuntime 的 instance:

private static volatile WorkflowRuntime instance;  

public WorkflowRuntime build() {
    if (instance == null) {
      synchronized (WorkflowRuntime.class) {
        if (instance == null) {
          instance = new WorkflowRuntime(this.builder.build());
        }
      }
    }
    return instance;
  }

grpcChannel 的构建细节

DurableTaskGrpcWorkerBuilder() 在构建时,需要设置 grpcChannel,而这个 grpcChannel 是通过 NetworkUtils.buildGrpcManagedChannel() 方法来实现的。

NetworkUtils.buildGrpcManagedChannel() 在 sdk/src/main/java/io/dapr/utils/NetworkUtils.java 文件中,是一个通用的网络工具类。buildGrpcManagedChannel() 方法的实现如下:

  
private static final String DEFAULT_SIDECAR_IP = "127.0.0.1";
private static final Integer DEFAULT_GRPC_PORT = 50001;

public static final Property<String> SIDECAR_IP = new StringProperty(
      "dapr.sidecar.ip",
      "DAPR_SIDECAR_IP",
      DEFAULT_SIDECAR_IP);

  public static final Property<Integer> GRPC_PORT = new IntegerProperty(
      "dapr.grpc.port",
      "DAPR_GRPC_PORT",
      DEFAULT_GRPC_PORT);

  public static final Property<String> GRPC_ENDPOINT = new StringProperty(
      "dapr.grpc.endpoint",
      "DAPR_GRPC_ENDPOINT",
      null);

public static ManagedChannel buildGrpcManagedChannel() {
    // 从系统属性或者环境变量中读取 dapr sidecar 的IP
    String address = Properties.SIDECAR_IP.get();
    // 从系统属性或者环境变量中读取 dapr grpc 端口
    int port = Properties.GRPC_PORT.get();
    // 默认不用https
    boolean insecure = true;
    // 从系统属性或者环境变量中读取 dapr grpc 端点信息
    String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
    if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
      // 如果 dapr grpc 端点不为空,则用 grpc 端点的内容覆盖 
      URI uri = URI.create(grpcEndpoint);
      // 通过 schema 是不是 http 来判断是 http 还是 https
      insecure = uri.getScheme().equalsIgnoreCase("http");
      // grpcEndpoint 如果设置有端口则采用,没有设置则根据是 http 还是 https 来选择 80 或者 443 端口
      port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443);
      // 覆盖 dapr sidecar 的地址
      address = uri.getHost();
      if ((uri.getPath() != null) && !uri.getPath().isEmpty()) {
        address += uri.getPath();
      }
    }
    
    // 构建连接到指定地址的 grpc channel
    ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(address, port)
        .userAgent(Version.getSdkVersion());
    if (insecure) {
      builder = builder.usePlaintext();
    }
    return builder.build();
  }

从部署来看,runtime 运行在 client 一侧的 app 应用程序内部,然后通过 durabletask 的 sdk 连接到 dapr sidecar 了,走 grpc 协议。

这个设计有点奇怪,dapr sdk 和 dapr sidecar 之间没有走标准的 dapr API,而是通过 durabletask 的 sdk 。

3 - OrchestratorWrapper实现

OrchestratorWrapper的代码实现

背景

WorkflowRuntimeBuilder 的 registerWorkflow() 方法在注册 workflow 对象时,实际代理给 DurableTaskGrpcWorkerBuilder 的 addOrchestration() 方法:

import com.microsoft.durabletask.TaskOrchestrationFactory;  

public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
    this.builder = this.builder.addOrchestration(
        new OrchestratorWrapper<>(clazz)
    );

    return this;
  }

而 addOrchestration() 方法的输入参数为 com.microsoft.durabletask.TaskOrchestrationFactory

public interface TaskOrchestrationFactory {
    String getName();
    TaskOrchestration create();
}

因此需要提供一个 TaskOrchestrationFactory 的实现。

类定义

OrchestratorWrapper 类实现了 com.microsoft.durabletask.TaskOrchestrationFactory 接口:

class OrchestratorWrapper<T extends Workflow> implements TaskOrchestrationFactory {
  private final Constructor<T> workflowConstructor;
  private final String name;
  ......  
}

构造函数:

  public OrchestratorWrapper(Class<T> clazz) {
    // 获取并设置 name
    this.name = clazz.getCanonicalName();
    try {
      // 获取 Constructor
      this.workflowConstructor = clazz.getDeclaredConstructor();
    } catch (NoSuchMethodException e) {
      throw new RuntimeException(
          String.format("No constructor found for workflow class '%s'.", this.name), e
      );
    }
  }

接口实现

TaskOrchestrationFactory 接口要求的 getName() 方法,直接返回前面获取的 name:

  @Override
  public String getName() {
    return name;
  }

TaskOrchestrationFactory 接口要求的 create() 方法,要返回一个 durabletask 的 TaskOrchestration ,而 TaskOrchestration 是一个 @FunctionalInterface,仅有一个 run() 方法:

@FunctionalInterface
public interface TaskOrchestration {
    void run(TaskOrchestrationContext ctx);
}

因此构建 TaskOrchestration 实例的方式被简写为:

import com.microsoft.durabletask.TaskOrchestration;

  @Override
  public TaskOrchestration create() {
    return ctx -> {
      T workflow;
      try {
        // 通过 workflow 的构造器生成一个 workflow 实例
        workflow = this.workflowConstructor.newInstance();
      } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
        throw new RuntimeException(
            String.format("Unable to instantiate instance of workflow class '%s'", this.name), e
        );
      }
      // 将 durable task 的 context 包装为 dapr 的 workflow context DaprWorkflowContextImpl
      // 然后执行 workflow.run()
      workflow.run(new DaprWorkflowContextImpl(ctx));
    };

  }

4 - ActivityWrapper实现

ActivityWrapper的代码实现

背景

WorkflowRuntimeBuilder 的 registerActivity() 方法在注册 activity 对象时,实际代理给 DurableTaskGrpcWorkerBuilder 的 addActivity() 方法:

import com.microsoft.durabletask.TaskOrchestrationFactory;  

  public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
    this.builder = this.builder.addActivity(
        new ActivityWrapper<>(clazz)
    );
  }

而 addActivity() 方法的输入参数为 com.microsoft.durabletask.TaskActivityFactory

public interface TaskActivityFactory {
    String getName();
    TaskActivity create();
}

因此需要提供一个 TaskActivityFactory 的实现。

类定义

ActivityWrapper 类实现了 com.microsoft.durabletask.TaskActivityFactory 接口:

public class ActivityWrapper<T extends WorkflowActivity> implements TaskActivityFactory {
  private final Constructor<T> activityConstructor;
  private final String name;
  ......  
}

构造函数:

  public ActivityWrapper(Class<T> clazz) {
    this.name = clazz.getCanonicalName();
    try {
      this.activityConstructor = clazz.getDeclaredConstructor();
    } catch (NoSuchMethodException e) {
      throw new RuntimeException(
          String.format("No constructor found for activity class '%s'.", this.name), e
      );
    }
  }

接口实现

TaskActivityFactory 接口要求的 getName() 方法,直接返回前面获取的 name:

  @Override
  public String getName() {
    return name;
  }

TaskActivityFactory 接口要求的 create() 方法,要返回一个 durabletask 的 TaskActivity ,而 TaskActivity 是一个 @FunctionalInterface,仅有一个 run() 方法:

@FunctionalInterface
public interface TaskActivity {
    Object run(TaskActivityContext ctx);
}

因此构建 TaskActivity 实例的方式被简写为:

import com.microsoft.durabletask.TaskActivity;

  @Override
  public TaskActivity create() {
    return ctx -> {
      Object result;
      T activity;
      
      try {
        activity = this.activityConstructor.newInstance();
      } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
        throw new RuntimeException(
            String.format("Unable to instantiate instance of activity class '%s'", this.name), e
        );
      }

      result = activity.run(new WorkflowActivityContext(ctx));
      return result;
    };
  }
}

5 - WorkflowActivity实现

WorkflowActivity的代码实现

WorkflowActivity接口定义

WorkflowActivity接口定义了 Activity

public interface WorkflowActivity {
  /**
   * 执行活动逻辑并返回一个值,该值将被序列化并返回给调用的协调器。
   *
   * @param ctx 提供有关当前活动执行的信息,如活动名称和协调程序提供给它的输入数据。
   * @return 要返回给调用协调器的任何可序列化值。
   */
  Object run(WorkflowActivityContext ctx);
}

WorkflowActivity 的 javadoc 描述如下:

任务活动实现的通用接口。

活动(Activity)是 durable task 协调的基本工作单位。活动(Activity)是在业务流程中进行协调的任务。例如,您可以创建一个协调器来处理订单。这些任务包括检查库存、向客户收费和创建装运。每个任务都是一个单独的活动(Activity)。这些活动(Activity)可以串行执行、并行执行或两者结合执行。

与任务协调器不同的是,活动(Activity)在工作类型上不受限制。活动(Activity)函数经常用于进行网络调用或运行 CPU 密集型操作。活动(Activity)还可以将数据返回给协调器函数。 durable task 运行时保证每个被调用的活动(Activity)函数在协调执行期间至少被执行一次。

由于活动(Activity)只能保证至少执行一次,因此建议尽可能将活动(Activity)逻辑作为幂等逻辑来实现。

协调器使用 io.dapr.workflows.WorkflowContext.callActivity 方法重载之一来调度活动。

WorkflowActivityContext

WorkflowActivityContext 简单包装了 durabletask 的 TaskActivityContext :

import com.microsoft.durabletask.TaskActivityContext;

public class WorkflowActivityContext implements TaskActivityContext {
  private final TaskActivityContext innerContext;

  public WorkflowActivityContext(TaskActivityContext context) throws IllegalArgumentException {
    if (context == null) {
      throw new IllegalArgumentException("Context cannot be null");
    }
    this.innerContext = context;
  }
  ......
}

TaskActivityContext 接口要求的 getName() 和 getInput() 方法都简单代理给了内部的 durabletask 的 TaskActivityContext :

  public String getName() {
    return this.innerContext.getName();
  }

  public <T> T getInput(Class<T> targetType) {
    return this.innerContext.getInput(targetType);
  }

备注:这样的包装并没有起到隔离 dapr sdk 和 durabletask sdk 的目的,还是紧密的耦合在一起,包装的意义何在?