Workflow app start flow
1 - Flow overview
Overall flow
When a workflow app starts, the typical code looks like this:
// Register the OrderProcessingWorkflow and its activities with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class);
builder.registerActivity(NotifyActivity.class);
builder.registerActivity(ProcessPaymentActivity.class);
builder.registerActivity(RequestApprovalActivity.class);
builder.registerActivity(ReserveInventoryActivity.class);
builder.registerActivity(UpdateInventoryActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
System.out.println("Start workflow runtime");
runtime.start(false);
}
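This code registers the workflow and its activities, then starts the workflow runtime. The workflow runtime starts a worker that continuously fetches work items from the Dapr sidecar (both workflow tasks and activity tasks), executes them, and returns the results to the Dapr sidecar.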
@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Sidecar" as DaprSidecar
WorkflowApp -> WorkflowApp: registerWorkflow()
WorkflowApp -> WorkflowApp: registerActivity()
WorkflowApp -[#red]> WorkflowApp: WorkflowRuntime.start()
WorkflowApp -> DaprSidecar: WorkflowRuntime.getWorkItems()
DaprSidecar --> WorkflowApp:
loop has next task
alt is orchestration task
WorkflowApp -> WorkflowApp: execute orchestration task
WorkflowApp -> DaprSidecar: completeOrchestratorTask()
DaprSidecar --> WorkflowApp:
else is activity task
WorkflowApp -> WorkflowApp: execute activity task
WorkflowApp -> DaprSidecar: completeActivityTask()
DaprSidecar --> WorkflowApp:
end
end
@enduml
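The loop in the diagram above can be sketched in plain Java. This is only an in-memory illustration: `WorkItemLoopSketch`, `WorkItem`, `Kind`, and `drain` are hypothetical names, the queue stands in for the stream of work items from the sidecar, and the returned strings stand in for the completion calls.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, in-memory stand-in for the loop in the diagram.
// None of these types come from the Dapr or DurableTask SDKs.
final class WorkItemLoopSketch {

    enum Kind { ORCHESTRATION, ACTIVITY }

    // A work item as the sidecar might hand it out: a kind plus a task name.
    static final class WorkItem {
        final Kind kind;
        final String name;

        WorkItem(Kind kind, String name) {
            this.kind = kind;
            this.name = name;
        }
    }

    // Drains the queue, dispatching on the work item kind, and returns the
    // completion messages that would be sent back to the sidecar.
    static List<String> drain(Queue<WorkItem> workItems) {
        List<String> completions = new ArrayList<>();
        WorkItem item;
        while ((item = workItems.poll()) != null) {
            if (item.kind == Kind.ORCHESTRATION) {
                completions.add("completeOrchestratorTask:" + item.name);
            } else {
                completions.add("completeActivityTask:" + item.name);
            }
        }
        return completions;
    }

    public static void main(String[] args) {
        Queue<WorkItem> queue = new ArrayDeque<>();
        queue.add(new WorkItem(Kind.ORCHESTRATION, "OrderProcessingWorkflow"));
        queue.add(new WorkItem(Kind.ACTIVITY, "NotifyActivity"));
        drain(queue).forEach(System.out::println);
    }
}
```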
Detailed flow
register workflow
@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Java SDK" as DaprJavaSDK
participant "DurableTask Java SDK" as DurableTaskJavaSDK
WorkflowApp -> DaprJavaSDK: registerWorkflow()
DaprJavaSDK -> DurableTaskJavaSDK: addOrchestration()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp:
@enduml
register activity
@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Java SDK" as DaprJavaSDK
participant "DurableTask Java SDK" as DurableTaskJavaSDK
WorkflowApp -> DaprJavaSDK: registerActivity()
DaprJavaSDK -> DurableTaskJavaSDK: addActivity()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp:
@enduml
start workflow runtime
@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Java SDK" as DaprJavaSDK
participant "DurableTask Java SDK" as DurableTaskJavaSDK
WorkflowApp -> DaprJavaSDK: WorkflowRuntime.start()
DaprJavaSDK -> DurableTaskJavaSDK: worker.start()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp:
@enduml
worker execute tasks
@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Java SDK" as DaprJavaSDK
participant "DurableTask Java SDK" as DurableTaskJavaSDK
WorkflowApp -> DaprJavaSDK: registerWorkflow()
DaprJavaSDK -> DurableTaskJavaSDK: addOrchestration()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp:
WorkflowApp -> DaprJavaSDK: registerActivity()
DaprJavaSDK -> DurableTaskJavaSDK: addActivity()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp:
WorkflowApp -> DaprJavaSDK: WorkflowRuntime.start()
DaprJavaSDK -> DurableTaskJavaSDK: worker.start()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp:
@enduml
2 - Source code for building the WorkflowRuntime
Calling code
Typical code for building a WorkflowRuntime in a workflow app:
// Register the OrderProcessingWorkflow and its activities with the builder.
WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class);
builder.registerActivity(NotifyActivity.class);
builder.registerActivity(ProcessPaymentActivity.class);
builder.registerActivity(RequestApprovalActivity.class);
builder.registerActivity(ReserveInventoryActivity.class);
builder.registerActivity(UpdateInventoryActivity.class);
// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
System.out.println("Start workflow runtime");
runtime.start(false);
}
Implementation
WorkflowRuntimeBuilder
This class is in the Dapr Java SDK.
WorkflowRuntimeBuilder keeps its own record of the registered workflows and activities, and also creates an instance of DurableTaskGrpcWorkerBuilder from the DurableTask Java SDK.
import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;
public class WorkflowRuntimeBuilder {
private static volatile WorkflowRuntime instance;
private DurableTaskGrpcWorkerBuilder builder;
private Logger logger;
private Set<String> workflows = new HashSet<String>();
private Set<String> activities = new HashSet<String>();
/**
* Constructs the WorkflowRuntimeBuilder.
*/
public WorkflowRuntimeBuilder() {
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(
NetworkUtils.buildGrpcManagedChannel(WORKFLOW_INTERCEPTOR));
this.logger = Logger.getLogger(WorkflowRuntimeBuilder.class.getName());
}
registerWorkflow() delegates the request to DurableTaskGrpcWorkerBuilder and also records the workflow's simple class name in its own workflows set:
public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
this.builder = this.builder.addOrchestration(
new OrchestratorWrapper<>(clazz)
);
this.logger.log(Level.INFO, "Registered Workflow: " + clazz.getSimpleName());
this.workflows.add(clazz.getSimpleName());
return this;
}
registerActivity() works the same way: it delegates the request to DurableTaskGrpcWorkerBuilder and also records the activity's simple class name in its own activities set:
public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
this.builder = this.builder.addActivity(
new ActivityWrapper<>(clazz)
);
this.logger.log(Level.INFO, "Registered Activity: " + clazz.getSimpleName());
this.activities.add(clazz.getSimpleName());
}
OrchestratorWrapper and ActivityWrapper wrap the class as a TaskOrchestrationFactory and a TaskActivityFactory, respectively.
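To illustrate the idea of wrapping a class as a named factory, here is a minimal sketch. `NamedFactory` and `ClassWrapper` are hypothetical names, not SDK types, and using the fully qualified class name as the registration key is an assumption made for the example. It also assumes the wrapped class has an accessible no-argument constructor.

```java
// Hypothetical sketch: NamedFactory and ClassWrapper are illustrative names,
// not types from the Dapr or DurableTask SDKs.
final class ClassWrapperSketch {

    // Minimal factory contract: a registration name plus a way to create instances.
    interface NamedFactory<T> {
        String getName();

        T create();
    }

    // Wraps a class so it can be registered by name and instantiated on demand.
    static final class ClassWrapper<T> implements NamedFactory<T> {
        private final Class<T> clazz;

        ClassWrapper(Class<T> clazz) {
            this.clazz = clazz;
        }

        @Override
        public String getName() {
            // Assumption: the fully qualified class name is used as the key.
            return clazz.getName();
        }

        @Override
        public T create() {
            try {
                // Requires an accessible no-argument constructor.
                return clazz.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
            }
        }
    }

    public static void main(String[] args) {
        NamedFactory<StringBuilder> factory = new ClassWrapper<>(StringBuilder.class);
        System.out.println(factory.getName());
        System.out.println(factory.create().append("created"));
    }
}
```

Creating a fresh instance per call, rather than caching one, keeps each task execution free of state left over from a previous one; whether the SDK wrappers do exactly this is not shown in the excerpts here.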
build() calls DurableTaskGrpcWorkerBuilder.build() to create a DurableTaskGrpcWorker, then passes it to a new WorkflowRuntime instance. The instance is kept in a static field, so it is created only once:
public WorkflowRuntime build() {
if (instance == null) {
synchronized (WorkflowRuntime.class) {
if (instance == null) {
instance = new WorkflowRuntime(this.builder.build());
}
}
}
this.logger.log(Level.INFO, "Successfully built dapr workflow runtime");
return instance;
}
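The build() method above uses double-checked locking on a static volatile field. The sketch below isolates that pattern with placeholder types (`FakeRuntime` and `Builder` are hypothetical names). It also shows one consequence that follows from the excerpt as written: once an instance exists, a later builder gets the same instance back, whatever was configured on it.

```java
// Hypothetical sketch of double-checked locking; FakeRuntime is a
// placeholder type, not the Dapr WorkflowRuntime.
final class SingletonBuildSketch {

    static final class FakeRuntime {
        final String label;

        FakeRuntime(String label) {
            this.label = label;
        }
    }

    static final class Builder {
        // volatile so a fully constructed instance is visible to all threads.
        private static volatile FakeRuntime instance;
        private final String label;

        Builder(String label) {
            this.label = label;
        }

        FakeRuntime build() {
            if (instance == null) {                 // first check, without the lock
                synchronized (Builder.class) {
                    if (instance == null) {         // second check, under the lock
                        instance = new FakeRuntime(label);
                    }
                }
            }
            return instance;
        }
    }

    public static void main(String[] args) {
        FakeRuntime first = new Builder("first").build();
        FakeRuntime second = new Builder("second").build();
        System.out.println(first == second);  // true: the second builder's settings are ignored
        System.out.println(second.label);     // first
    }
}
```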
DurableTaskGrpcWorkerBuilder
This class is in the DurableTask Java SDK.
DurableTaskGrpcWorkerBuilder stores orchestrationFactories and activityFactories, along with connection settings for the sidecar such as the port and the gRPC channel:
public final class DurableTaskGrpcWorkerBuilder {
final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();
int port;
Channel channel;
DataConverter dataConverter;
Duration maximumTimerInterval;
......
}
addOrchestration() stores the TaskOrchestrationFactory in orchestrationFactories, keyed by its name:
public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) {
String key = factory.getName();
......
this.orchestrationFactories.put(key, factory);
return this;
}
Similarly, addActivity() stores the TaskActivityFactory in activityFactories, keyed by its name:
public DurableTaskGrpcWorkerBuilder addActivity(TaskActivityFactory factory) {
String key = factory.getName();
......
this.activityFactories.put(key, factory);
return this;
}
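The name-keyed registration with a fluent return value can be sketched as follows. `RegistryBuilder` is a hypothetical name, a `Supplier` stands in for the factory types, and the checks elided in the excerpts above are deliberately not reproduced.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch: RegistryBuilder is an illustrative name, and the
// checks elided in the SDK excerpts are not reproduced here.
final class FactoryRegistrySketch {

    static final class RegistryBuilder {
        final Map<String, Supplier<Object>> factories = new HashMap<>();

        // Stores the factory under its name and returns this for chaining.
        RegistryBuilder add(String name, Supplier<Object> factory) {
            factories.put(name, factory);
            return this;
        }
    }

    public static void main(String[] args) {
        RegistryBuilder builder = new RegistryBuilder()
                .add("NotifyActivity", () -> "notify")
                .add("ProcessPaymentActivity", () -> "pay");

        // Lookup by name is what lets a worker find the code for an incoming task.
        System.out.println(builder.factories.get("NotifyActivity").get());
        System.out.println(builder.factories.size());
    }
}
```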
build() creates the DurableTaskGrpcWorker object:
public DurableTaskGrpcWorker build() {
return new DurableTaskGrpcWorker(this);
}
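The DurableTaskGrpcWorker constructor copies the registered orchestrationFactories and activityFactories, then uses TaskHubSidecarServiceGrpc.newBlockingStub() to create a TaskHubSidecarServiceBlockingStub as sidecarClient, which is used later to talk to the Dapr sidecar: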
public final class DurableTaskGrpcWorker implements AutoCloseable {
private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
private final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();
private final TaskHubSidecarServiceBlockingStub sidecarClient;
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
this.orchestrationFactories.putAll(builder.orchestrationFactories);
this.activityFactories.putAll(builder.activityFactories);
Channel sidecarGrpcChannel;
if (builder.channel != null) {
// The caller is responsible for managing the channel lifetime
this.managedSidecarChannel = null;
sidecarGrpcChannel = builder.channel;
} else {
// Construct our own channel using localhost + a port number
int port = DEFAULT_PORT;
if (builder.port > 0) {
port = builder.port;
}
// Need to keep track of this channel so we can dispose it on close()
this.managedSidecarChannel = ManagedChannelBuilder
.forAddress("localhost", port)
.usePlaintext()
.build();
sidecarGrpcChannel = this.managedSidecarChannel;
}
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
}
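Two details of this constructor are worth isolating: the registrations are copied rather than shared with the builder, and the worker only tracks the channel for disposal when it created the channel itself. The sketch below shows both with placeholder types; `Connection` and `WorkerSketch` are hypothetical names, and `Connection` merely stands in for a gRPC channel.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: Connection stands in for a gRPC channel and
// WorkerSketch for the worker; neither is an SDK type.
final class OwnedResourceSketch {

    static final class Connection implements AutoCloseable {
        final String target;
        boolean closed;

        Connection(String target) {
            this.target = target;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static final class WorkerSketch implements AutoCloseable {
        private final Map<String, String> factories = new HashMap<>();
        private final Connection owned;   // non-null only if created here
        final Connection connection;      // the connection actually used

        WorkerSketch(Map<String, String> registered, Connection provided, int port) {
            // Copy the registrations so later builder changes do not leak in.
            this.factories.putAll(registered);
            if (provided != null) {
                this.owned = null;        // the caller manages its lifetime
                this.connection = provided;
            } else {
                this.owned = new Connection("localhost:" + port);
                this.connection = this.owned;
            }
        }

        int registeredCount() {
            return factories.size();
        }

        @Override
        public void close() {
            if (owned != null) {
                owned.close();            // only close what this worker created
            }
        }
    }

    public static void main(String[] args) {
        Connection external = new Connection("example:1234");
        try (WorkerSketch worker = new WorkerSketch(new HashMap<>(), external, 0)) {
            System.out.println(worker.connection.target);
        }
        System.out.println(external.closed);  // false: the caller still owns it
    }
}
```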
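Conclusion
WorkflowRuntimeBuilder in the Dapr Java SDK and DurableTaskGrpcWorkerBuilder in the DurableTask Java SDK both exist to help build the objects that are ultimately used: WorkflowRuntime and DurableTaskGrpcWorker.
3 - Source code for starting the workflow runtime
Calling code
Typical code for starting the WorkflowRuntime in a workflow app: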
// Build and then start the workflow runtime pulling and executing tasks
try (WorkflowRuntime runtime = builder.build()) {
System.out.println("Start workflow runtime");
// block is hard-coded to false here, so this call does not block
runtime.start(false);
}
Implementation
WorkflowRuntime
This class is in the Dapr Java SDK.
WorkflowRuntime is just a thin wrapper around DurableTaskGrpcWorker:
public class WorkflowRuntime implements AutoCloseable {
private DurableTaskGrpcWorker worker;
public WorkflowRuntime(DurableTaskGrpcWorker worker) {
this.worker = worker;
}
......
public void start(boolean block) {
if (block) {
this.worker.startAndBlock();
} else {
this.worker.start();
}
}
}
DurableTaskGrpcWorker
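This class is in the DurableTask Java SDK.
The real implementation is in DurableTaskGrpcWorker. The call chain is shown below: WorkflowRuntime.start(boolean), repeated from above, calls DurableTaskGrpcWorker.start():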
public void start(boolean block) {
if (block) {
this.worker.startAndBlock();
} else {
// 1. block is hard-coded to false, so only this branch is taken
this.worker.start();
}
}
public void start() {
// 2. Runs startAndBlock on a new thread, so this call does not block
new Thread(this::startAndBlock).start();
}
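The relationship between start() and startAndBlock() can be sketched without any SDK dependency. `LoopWorker` is a hypothetical stand-in whose "loop" simply waits for a stop signal; the helper methods `awaitStarted`, `isRunning`, and `stop` exist only to make the behavior observable. Note that a thread created this way from the main thread is a non-daemon thread by default, so it keeps the JVM alive until the loop ends.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: LoopWorker is an illustrative stand-in, not the
// DurableTaskGrpcWorker implementation.
final class NonBlockingStartSketch {

    static final class LoopWorker {
        private final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch stopRequested = new CountDownLatch(1);
        private volatile Thread loopThread;

        // Blocks the calling thread until stop() is called.
        void startAndBlock() {
            started.countDown();
            try {
                stopRequested.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // Returns immediately; the blocking loop runs on a new thread.
        void start() {
            loopThread = new Thread(this::startAndBlock);
            loopThread.start();
        }

        // Waits up to timeoutMillis for the loop thread to begin running.
        boolean awaitStarted(long timeoutMillis) {
            try {
                return started.await(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }

        boolean isRunning() {
            Thread t = loopThread;
            return t != null && t.isAlive();
        }

        // Signals the loop to exit and waits for its thread to finish.
        void stop() {
            stopRequested.countDown();
            Thread t = loopThread;
            if (t != null) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    public static void main(String[] args) {
        LoopWorker worker = new LoopWorker();
        worker.start();
        System.out.println("loop started: " + worker.awaitStarted(1000));
        worker.stop();
        System.out.println("loop running after stop: " + worker.isRunning());
    }
}
```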
The startAndBlock() method
This is the most important code.
It is not expanded here; see the next chapter on how the workflow runtime runs.