Workflow app start flow

Source code analysis of the workflow app start flow

1 - Flow overview

Overview of the workflow app start flow

Overall flow

When a workflow app starts, the typical code looks like this:

    // Register the OrderProcessingWorkflow and its activities with the builder.
    WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class);
    builder.registerActivity(NotifyActivity.class);
    builder.registerActivity(ProcessPaymentActivity.class);
    builder.registerActivity(RequestApprovalActivity.class);
    builder.registerActivity(ReserveInventoryActivity.class);
    builder.registerActivity(UpdateInventoryActivity.class);

    // Build and then start the workflow runtime pulling and executing tasks
    try (WorkflowRuntime runtime = builder.build()) {
      System.out.println("Start workflow runtime");
      runtime.start(false);
    }

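This code registers the workflow and its activities, then starts the workflow runtime. The workflow runtime starts a worker that continuously pulls work items from the Dapr sidecar, both workflow tasks and activity tasks, executes them, and returns each result to the Dapr sidecar.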

@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Sidecar" as DaprSidecar

WorkflowApp -> WorkflowApp: registerWorkflow()

WorkflowApp -> WorkflowApp: registerActivity()

WorkflowApp -[#red]> WorkflowApp: WorkflowRuntime.start()


WorkflowApp -> DaprSidecar: WorkflowRuntime.getWorkItems()
DaprSidecar --> WorkflowApp: 

loop has next task

alt is orchestration task

WorkflowApp -> WorkflowApp: execute orchestration task
WorkflowApp -> DaprSidecar: completeOrchestratorTask()
DaprSidecar --> WorkflowApp: 

else is activity task

WorkflowApp -> WorkflowApp: execute activity task
WorkflowApp -> DaprSidecar: completeActivityTask()
DaprSidecar --> WorkflowApp: 

end

end

@enduml
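
The loop in the diagram above can be sketched in plain Java. This is a minimal model under stated assumptions, not the SDK's implementation: `WorkerLoopSketch`, `WorkItem`, `Kind`, and the in-memory queue standing in for the sidecar are all hypothetical names made up for illustration, and a `SHUTDOWN` item is added only so that the example terminates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Simplified model of the worker loop. None of these types come from Dapr or DurableTask.
class WorkerLoopSketch {
    enum Kind { ORCHESTRATION, ACTIVITY, SHUTDOWN }

    // Hypothetical work item: a kind, the registered task name, and an input payload.
    static final class WorkItem {
        final Kind kind;
        final String name;
        final String input;

        WorkItem(Kind kind, String name, String input) {
            this.kind = kind;
            this.name = name;
            this.input = input;
        }
    }

    private final Map<String, Function<String, String>> orchestrations = new HashMap<>();
    private final Map<String, Function<String, String>> activities = new HashMap<>();
    // Stands in for the results reported back to the sidecar.
    final List<String> completed = new ArrayList<>();

    void registerWorkflow(String name, Function<String, String> body) {
        orchestrations.put(name, body);
    }

    void registerActivity(String name, Function<String, String> body) {
        activities.put(name, body);
    }

    // Pulls work items until a SHUTDOWN item arrives and dispatches on the item kind.
    void run(BlockingQueue<WorkItem> sidecar) {
        try {
            while (true) {
                WorkItem item = sidecar.take();
                if (item.kind == Kind.SHUTDOWN) {
                    return;
                }
                Map<String, Function<String, String>> registry =
                        item.kind == Kind.ORCHESTRATION ? orchestrations : activities;
                Function<String, String> handler = registry.get(item.name);
                String result = handler == null
                        ? "no handler registered for " + item.name
                        : handler.apply(item.input);
                completed.add(item.kind + ":" + result);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    public static void main(String[] args) {
        WorkerLoopSketch worker = new WorkerLoopSketch();
        worker.registerWorkflow("OrderProcessingWorkflow", input -> "scheduled " + input);
        worker.registerActivity("NotifyActivity", input -> "notified " + input);

        BlockingQueue<WorkItem> sidecar = new LinkedBlockingQueue<>();
        sidecar.add(new WorkItem(Kind.ORCHESTRATION, "OrderProcessingWorkflow", "order-1"));
        sidecar.add(new WorkItem(Kind.ACTIVITY, "NotifyActivity", "order-1"));
        sidecar.add(new WorkItem(Kind.SHUTDOWN, "", ""));

        worker.run(sidecar);
        worker.completed.forEach(System.out::println);
    }
}
```

The point of the sketch is the shape of the loop: registration fills name-keyed maps up front, and the loop only looks handlers up by name when a work item arrives.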

Detailed flow

register workflow

@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Java SDK" as DaprJavaSDK
participant "DurableTask Java SDK" as DurableTaskJavaSDK

WorkflowApp -> DaprJavaSDK: registerWorkflow()
DaprJavaSDK -> DurableTaskJavaSDK: addOrchestration()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp: 

@enduml

register activity

@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Java SDK" as DaprJavaSDK
participant "DurableTask Java SDK" as DurableTaskJavaSDK

WorkflowApp -> DaprJavaSDK: registerActivity()
DaprJavaSDK -> DurableTaskJavaSDK: addActivity()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp: 

@enduml

start workflow runtime

@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Java SDK" as DaprJavaSDK
participant "DurableTask Java SDK" as DurableTaskJavaSDK

WorkflowApp -> DaprJavaSDK: WorkflowRuntime.start()
DaprJavaSDK -> DurableTaskJavaSDK: worker.start()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp: 
@enduml

worker execute tasks

@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Java SDK" as DaprJavaSDK
participant "DurableTask Java SDK" as DurableTaskJavaSDK

WorkflowApp -> DaprJavaSDK: registerWorkflow()
DaprJavaSDK -> DurableTaskJavaSDK: addOrchestration()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp: 

WorkflowApp -> DaprJavaSDK: registerActivity()
DaprJavaSDK -> DurableTaskJavaSDK: addActivity()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp: 

WorkflowApp -> DaprJavaSDK: WorkflowRuntime.start()
DaprJavaSDK -> DurableTaskJavaSDK: worker.start()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp: 
@enduml

2 - Source code for building the workflow runtime

Source code for building the workflow runtime in the workflow app start flow

Calling code

Typical code for building a WorkflowRuntime in a workflow app:

    // Register the OrderProcessingWorkflow and its activities with the builder.
    WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class);
    builder.registerActivity(NotifyActivity.class);
    builder.registerActivity(ProcessPaymentActivity.class);
    builder.registerActivity(RequestApprovalActivity.class);
    builder.registerActivity(ReserveInventoryActivity.class);
    builder.registerActivity(UpdateInventoryActivity.class);

    // Build and then start the workflow runtime pulling and executing tasks
    try (WorkflowRuntime runtime = builder.build()) {
      System.out.println("Start workflow runtime");
      runtime.start(false);
    }

Implementation

WorkflowRuntimeBuilder

This class is in the Dapr Java SDK.

WorkflowRuntimeBuilder keeps its own record of the registered workflows and activities, and also creates an instance of DurableTaskGrpcWorkerBuilder from the DurableTask Java SDK.

import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;

public class WorkflowRuntimeBuilder {
  private static volatile WorkflowRuntime instance;
  private DurableTaskGrpcWorkerBuilder builder;
  private Logger logger;
  private Set<String> workflows = new HashSet<String>();
  private Set<String> activities = new HashSet<String>();

  /**
   * Constructs the WorkflowRuntimeBuilder.
   */
  public WorkflowRuntimeBuilder() {
    this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(
                          NetworkUtils.buildGrpcManagedChannel(WORKFLOW_INTERCEPTOR));
    this.logger = Logger.getLogger(WorkflowRuntimeBuilder.class.getName());
  }

registerWorkflow() delegates the request to DurableTaskGrpcWorkerBuilder and also records the workflow's simple class name in its own workflows set:

  public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
    this.builder = this.builder.addOrchestration(
        new OrchestratorWrapper<>(clazz)
    );
    this.logger.log(Level.INFO, "Registered Workflow: " +  clazz.getSimpleName());
    this.workflows.add(clazz.getSimpleName());
    return this;
  }

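registerActivity() is implemented similarly: it delegates to DurableTaskGrpcWorkerBuilder and also records the activity's simple class name in its own activities set. Unlike registerWorkflow(), it returns void, so it cannot be chained: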

  public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
    this.builder = this.builder.addActivity(
        new ActivityWrapper<>(clazz)
    );
    this.logger.log(Level.INFO, "Registered Activity: " +  clazz.getSimpleName());
    this.activities.add(clazz.getSimpleName());
  }

OrchestratorWrapper and ActivityWrapper wrap the class as a TaskOrchestrationFactory and a TaskActivityFactory, respectively.
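
To make the wrapping idea concrete, here is a minimal sketch of turning a class into a named factory. It is an illustration under assumptions, not the code of OrchestratorWrapper or ActivityWrapper: `Task`, `TaskFactory`, `ClassWrapper`, and `Greeter` are hypothetical names, and using the class name as the key and a no-argument constructor for instantiation are choices made for this example only.

```java
// Simplified model of wrapping a class as a named factory.
// Task, TaskFactory and ClassWrapper are hypothetical names, not SDK types.
class WrapperSketch {
    interface Task {
        String run(String input);
    }

    interface TaskFactory {
        String getName();

        Task create();
    }

    static final class ClassWrapper<T extends Task> implements TaskFactory {
        private final Class<T> clazz;

        ClassWrapper(Class<T> clazz) {
            this.clazz = clazz;
        }

        @Override
        public String getName() {
            // Assumption for this sketch: the registration key is the class name.
            return clazz.getName();
        }

        @Override
        public Task create() {
            try {
                // A fresh instance per call, created through the no-argument constructor.
                return clazz.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
            }
        }
    }

    static final class Greeter implements Task {
        @Override
        public String run(String input) {
            return "hello " + input;
        }
    }

    public static void main(String[] args) {
        TaskFactory factory = new ClassWrapper<>(Greeter.class);
        System.out.println(factory.getName());
        System.out.println(factory.create().run("workflow"));
    }
}
```

Registering a factory rather than an instance lets the worker create a new object for each task it executes, so no state leaks between executions in this model.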

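build() calls DurableTaskGrpcWorkerBuilder.build() to construct a DurableTaskGrpcWorker and passes it to a new WorkflowRuntime instance. The instance is held in a static field and created with double-checked locking, so later build() calls return the same WorkflowRuntime.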

  public WorkflowRuntime build() {
    if (instance == null) {
      synchronized (WorkflowRuntime.class) {
        if (instance == null) {
          instance = new WorkflowRuntime(this.builder.build());
        }
      }
    }
    this.logger.log(Level.INFO, "Successfully built dapr workflow runtime");
    return instance;
  }
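
The build() method above follows the double-checked locking pattern. The sketch below reproduces only that pattern with hypothetical names (`SingletonBuildSketch`, `FakeRuntime`, `Builder`, and the `label` field are invented for illustration) to show one consequence worth keeping in mind: once the first instance exists, whatever a second builder was configured with is ignored.

```java
// Simplified model of a builder that hands out a process-wide singleton.
// FakeRuntime and Builder are hypothetical names, not SDK types.
class SingletonBuildSketch {
    static final class FakeRuntime {
        final String label;

        FakeRuntime(String label) {
            this.label = label;
        }
    }

    static final class Builder {
        // volatile ensures other threads never observe a partially constructed instance.
        private static volatile FakeRuntime instance;
        private final String label;

        Builder(String label) {
            this.label = label;
        }

        FakeRuntime build() {
            if (instance == null) {                 // first check, without locking
                synchronized (Builder.class) {
                    if (instance == null) {         // second check, under the lock
                        instance = new FakeRuntime(label);
                    }
                }
            }
            return instance;
        }
    }

    public static void main(String[] args) {
        FakeRuntime first = new Builder("first").build();
        FakeRuntime second = new Builder("second").build();
        System.out.println(first == second);
        System.out.println(second.label);
    }
}
```

In this model the second builder's configuration never reaches the returned object. The excerpt above has the same structure, which suggests that a workflow app should do all of its registration on a single builder before calling build().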

DurableTaskGrpcWorkerBuilder

This class is in the DurableTask Java SDK.

DurableTaskGrpcWorkerBuilder stores orchestrationFactories and activityFactories, along with settings for connecting to the sidecar, such as the port and the gRPC channel:

public final class DurableTaskGrpcWorkerBuilder {
    final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
    final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();
    int port;
    Channel channel;
    DataConverter dataConverter;
    Duration maximumTimerInterval;
......
}

addOrchestration() stores the TaskOrchestrationFactory in orchestrationFactories, keyed by its name:

    public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) {
        String key = factory.getName();
        ......
        this.orchestrationFactories.put(key, factory);
        return this;
    }

Similarly, addActivity() stores the TaskActivityFactory in activityFactories, keyed by its name:

    public DurableTaskGrpcWorkerBuilder addActivity(TaskActivityFactory factory) {
        String key = factory.getName();
        ......
        this.activityFactories.put(key, factory);
        return this;
    }

build() constructs a DurableTaskGrpcWorker object:

    public DurableTaskGrpcWorker build() {
        return new DurableTaskGrpcWorker(this);
    }

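The DurableTaskGrpcWorker constructor copies the registered orchestrationFactories and activityFactories, then creates a TaskHubSidecarServiceBlockingStub through TaskHubSidecarServiceGrpc.newBlockingStub() and keeps it as sidecarClient for later communication with the Dapr sidecar: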

public final class DurableTaskGrpcWorker implements AutoCloseable {
    private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
    private final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();

    private final TaskHubSidecarServiceBlockingStub sidecarClient;

    DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
        this.orchestrationFactories.putAll(builder.orchestrationFactories);
        this.activityFactories.putAll(builder.activityFactories);

        Channel sidecarGrpcChannel;
        if (builder.channel != null) {
            // The caller is responsible for managing the channel lifetime
            this.managedSidecarChannel = null;
            sidecarGrpcChannel = builder.channel;
        } else {
            // Construct our own channel using localhost + a port number
            int port = DEFAULT_PORT;
            if (builder.port > 0) {
                port = builder.port;
            }

            // Need to keep track of this channel so we can dispose it on close()
            this.managedSidecarChannel = ManagedChannelBuilder
                    .forAddress("localhost", port)
                    .usePlaintext()
                    .build();
            sidecarGrpcChannel = this.managedSidecarChannel;
        }

        this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
        this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
        this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
    }
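
The channel handling in the constructor above is an ownership rule: use the caller's channel if one was supplied, otherwise create one and remember it so it can be disposed on close(). The sketch below models just that rule. `FakeChannel`, `Worker`, and the `DEFAULT_PORT` value are placeholders for illustration; the real constant's value is not shown in the excerpt, and the real close() is not shown either.

```java
// Simplified model of "close only what you created".
// FakeChannel and Worker are hypothetical names, not SDK or gRPC types.
class ChannelOwnershipSketch {
    static final class FakeChannel implements AutoCloseable {
        final String target;
        boolean closed;

        FakeChannel(String target) {
            this.target = target;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static final class Worker implements AutoCloseable {
        // Placeholder value for this sketch only.
        static final int DEFAULT_PORT = 4001;

        // Non-null only when this worker created the channel itself.
        private final FakeChannel ownedChannel;
        final FakeChannel channel;

        Worker(FakeChannel callerChannel, int port) {
            if (callerChannel != null) {
                // The caller is responsible for the lifetime of its own channel.
                this.ownedChannel = null;
                this.channel = callerChannel;
            } else {
                int effectivePort = port > 0 ? port : DEFAULT_PORT;
                this.ownedChannel = new FakeChannel("localhost:" + effectivePort);
                this.channel = this.ownedChannel;
            }
        }

        @Override
        public void close() {
            if (ownedChannel != null) {
                ownedChannel.close();
            }
        }
    }

    public static void main(String[] args) {
        FakeChannel shared = new FakeChannel("example:1234");
        try (Worker borrowing = new Worker(shared, 0)) {
            System.out.println("using " + borrowing.channel.target);
        }
        System.out.println("shared channel closed: " + shared.closed);

        Worker owning = new Worker(null, 0);
        owning.close();
        System.out.println("owned channel closed: " + owning.channel.closed);
    }
}
```

Since WorkflowRuntimeBuilder passes its own channel through grpcChannel(), the first branch is the one that applies in the Dapr Java SDK path described earlier.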

Conclusion

WorkflowRuntimeBuilder in the Dapr Java SDK and DurableTaskGrpcWorkerBuilder in the DurableTask Java SDK both exist to help build the WorkflowRuntime and DurableTaskGrpcWorker that are ultimately used.

3 - Source code for starting the workflow runtime

Source code for starting the workflow runtime in the workflow app start flow

Calling code

Typical code for starting a WorkflowRuntime in a workflow app:

    // Build and then start the workflow runtime pulling and executing tasks
    try (WorkflowRuntime runtime = builder.build()) {
      System.out.println("Start workflow runtime");
      // block is hard-coded to false here, so this call does not block
      runtime.start(false);
    }

Implementation

WorkflowRuntime

This class is in the Dapr Java SDK.

WorkflowRuntime is only a thin wrapper around DurableTaskGrpcWorker:

public class WorkflowRuntime implements AutoCloseable {

  private DurableTaskGrpcWorker worker;

  public WorkflowRuntime(DurableTaskGrpcWorker worker) {
    this.worker = worker;
  }
  ......

  public void start(boolean block) {
    if (block) {
      this.worker.startAndBlock();
    } else {
      this.worker.start();
    }
  }
}

DurableTaskGrpcWorker

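This class is in the DurableTask Java SDK.

The real work happens in DurableTaskGrpcWorker. For context, the first method below repeats WorkflowRuntime.start(boolean) from the Dapr Java SDK; the second is DurableTaskGrpcWorker.start().

  public void start(boolean block) {
    if (block) {
      this.worker.startAndBlock();
    } else {
      // 1. The sample passes block=false, so only this branch is taken
      this.worker.start();
    }
  }

  public void start() {
    // 2. Runs startAndBlock() on a new thread, so this call does not block
    new Thread(this::startAndBlock).start();
  }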
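
The difference between start() and startAndBlock() can be demonstrated with a small stand-in worker. This is a sketch under assumptions: `NonBlockingStartSketch.Worker`, its latches, `stop()`, and `awaitStarted()` are hypothetical helpers added so the example can observe what happens and terminate; only the one-line body of start() mirrors the excerpt above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Simplified model of a blocking loop and a non-blocking start on top of it.
// Worker, stop() and awaitStarted() are hypothetical, not SDK members.
class NonBlockingStartSketch {
    static final class Worker {
        private final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch stopRequested = new CountDownLatch(1);
        volatile Thread loopThread;

        // Occupies the calling thread until stop() is called.
        void startAndBlock() {
            loopThread = Thread.currentThread();
            started.countDown();
            try {
                stopRequested.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // Returns immediately; the blocking loop runs on a new thread.
        void start() {
            new Thread(this::startAndBlock).start();
        }

        void stop() {
            stopRequested.countDown();
        }

        // Waits until the loop thread is running, or the timeout elapses.
        boolean awaitStarted(long timeoutMillis) {
            try {
                return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        worker.start();
        System.out.println("start() returned on " + Thread.currentThread().getName());
        if (worker.awaitStarted(2000)) {
            System.out.println("loop is running on " + worker.loopThread.getName());
        }
        worker.stop();
    }
}
```

Because start() returns right away, the caller decides how long the worker keeps running. In the calling code shown earlier, the try-with-resources block ends as soon as start(false) returns, and Java then invokes close() on the runtime; what close() does is not shown in this excerpt.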

The startAndBlock() method

This is the most critical code.

It is not covered here; see the next chapter on how the workflow runtime runs.