workflow app run流程概述
workflow app中workflow runtime运行的源码概述
上一章看到 workflow runtime start 之后,就会启动任务处理的流程。
代码实现在 durabletask java sdk 中的 DurableTaskGrpcWorker 类的 startAndBlock()方法中。
这是最关键的代码。
先构建两个 executor,负责执行 Orchestration task 和 activity task:
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
this.orchestrationFactories,
this.dataConverter,
this.maximumTimerInterval,
logger);
TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor(
this.activityFactories,
this.dataConverter,
logger);
传入的参数有 orchestrationFactories 和 taskActivityExecutor,之前构建时注册的信息都保存在这里面。
获取工作任务
然后就是一个无限循环,在循环中调用 sidecarClient.getWorkItems(), 针对返回的 workitem stream,还有一个无限循环。而且如果遇到 StatusRuntimeException ,还会sleep之后继续。
while (true) {
try {
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
......
}
} catch(StatusRuntimeException e){
......
// Retry after 5 seconds
try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
break;
}
}
}
work items 的类型只有两种 orchestrator 和 activity:
while (workItemStream.hasNext()) {
WorkItem workItem = workItemStream.next();
RequestCase requestType = workItem.getRequestCase();
if (requestType == RequestCase.ORCHESTRATORREQUEST) {
......
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
......
} else {
logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType);
}
}
执行 orchestrator task
通过 taskOrchestrationExecutor 执行 orchestrator task,然后将结果返回给到 dapr sidecar。
OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());
OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.addAllActions(taskOrchestratorResult.getActions())
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
.build();
this.sidecarClient.completeOrchestratorTask(response);
备注:比较奇怪的是这里为什么不用 grpc 双向 stream 来获取任务和返回任务执行结果,而是通过另外一个 completeOrchestratorTask() 方法来发起请求。
执行 avtivity task
类似的,通过 taskActivityExecutor 执行 avtivity task,然后将结果返回给到 dapr sidecar。
ActivityRequest activityRequest = workItem.getActivityRequest();
String output = null;
TaskFailureDetails failureDetails = null;
try {
output = taskActivityExecutor.execute(
activityRequest.getName(),
activityRequest.getInput().getValue(),
activityRequest.getTaskId());
} catch (Throwable e) {
failureDetails = TaskFailureDetails.newBuilder()
.setErrorType(e.getClass().getName())
.setErrorMessage(e.getMessage())
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
.build();
}
ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
.setTaskId(activityRequest.getTaskId());
if (output != null) {
responseBuilder.setResult(StringValue.of(output));
}
if (failureDetails != null) {
responseBuilder.setFailureDetails(failureDetails);
}
this.sidecarClient.completeActivityTask(responseBuilder.build());