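workflow app run flow
1 - Overview of the workflow app run flow
As seen in the previous chapter, once the workflow runtime starts, it kicks off the task-processing flow.
The implementation is in the startAndBlock() method of the DurableTaskGrpcWorker class in the durabletask java sdk.
This is the most important piece of code.
It first builds two executors, one for orchestration tasks and one for activity tasks: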
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
this.orchestrationFactories,
this.dataConverter,
this.maximumTimerInterval,
logger);
TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor(
this.activityFactories,
this.dataConverter,
logger);
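The constructor arguments include orchestrationFactories and activityFactories, which hold everything that was registered earlier when the worker was built.
Fetching work items
Next comes an infinite loop. Each iteration calls sidecarClient.getWorkItems() and then drains the returned work-item stream in an inner loop that keeps going while hasNext() returns true (hasNext() blocks until the next item arrives). If a StatusRuntimeException is thrown, the worker sleeps for 5 seconds and then reconnects; if that sleep is interrupted, the loop exits.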
while (true) {
try {
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
......
}
} catch(StatusRuntimeException e){
......
// Retry after 5 seconds
try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
break;
}
}
}
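The retry structure above can be sketched in isolation. This is a minimal sketch, not the SDK's code: RetryingWorkLoop, Sleeper, StreamUnavailableException and maxConnects are hypothetical names. StreamUnavailableException stands in for io.grpc.StatusRuntimeException, and a Supplier<Iterator<String>> stands in for the gRPC stub. maxConnects exists only so the sketch terminates; the real worker loops forever.

```java
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Stand-in for io.grpc.StatusRuntimeException. */
final class StreamUnavailableException extends RuntimeException {
    StreamUnavailableException(String message) { super(message); }
}

// Hypothetical sketch of the reconnect-and-retry loop in startAndBlock().
final class RetryingWorkLoop {
    /** Injected so the sketch can be exercised without really sleeping. */
    interface Sleeper {
        void sleep(long millis) throws InterruptedException;
    }

    /** Returns the number of connection attempts made. */
    static int run(Supplier<Iterator<String>> openStream,
                   Consumer<String> handler,
                   Sleeper sleeper,
                   long retryDelayMillis,
                   int maxConnects) {
        int attempts = 0;
        while (attempts < maxConnects) {
            attempts++;
            try {
                Iterator<String> workItemStream = openStream.get();
                // Drain the stream; a normal end of stream falls through and reconnects at once.
                while (workItemStream.hasNext()) {
                    handler.accept(workItemStream.next());
                }
            } catch (StreamUnavailableException e) {
                try {
                    sleeper.sleep(retryDelayMillis); // "Retry after 5 seconds"
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break; // like the original, stop when interrupted
                }
            }
        }
        return attempts;
    }
}
```

Only the connection failure is caught, so an exception thrown by the handler itself still propagates, as it would in the original loop.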
There are only two kinds of work item, orchestrator and activity:
while (workItemStream.hasNext()) {
WorkItem workItem = workItemStream.next();
RequestCase requestType = workItem.getRequestCase();
if (requestType == RequestCase.ORCHESTRATORREQUEST) {
......
} else if (requestType == RequestCase.ACTIVITYREQUEST) {
......
} else {
logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType);
}
}
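Protobuf's Java code generator exposes each oneof as an enum (here RequestCase). That enum also has a REQUEST_NOT_SET constant for an empty oneof, and an empty oneof is what reaches the final else branch. Below is a minimal sketch of the same routing. It assumes hand-written stand-ins for the generated RequestCase and WorkItem types. dispatch() is a hypothetical method that returns a label instead of executing anything, so the routing can be checked.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Stand-in for the generated WorkItem.RequestCase enum.
enum RequestCase { ORCHESTRATORREQUEST, ACTIVITYREQUEST, REQUEST_NOT_SET }

// Stand-in for the generated WorkItem message: which oneof field is set, plus a payload.
record WorkItem(RequestCase requestCase, String payload) {}

final class WorkItemDispatcher {
    private static final Logger logger = Logger.getLogger(WorkItemDispatcher.class.getName());

    static String dispatch(WorkItem workItem) {
        switch (workItem.requestCase()) {
            case ORCHESTRATORREQUEST:
                return "orchestrator:" + workItem.payload();
            case ACTIVITYREQUEST:
                return "activity:" + workItem.payload();
            default:
                // Mirrors the SDK: unknown or empty work items are logged and dropped.
                logger.log(Level.WARNING,
                        "Received and dropped an unknown '{0}' work-item from the sidecar.",
                        workItem.requestCase());
                return "dropped";
        }
    }
}
```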
Executing an orchestrator task
The worker runs the orchestrator task through taskOrchestrationExecutor and then sends the result back to the dapr sidecar.
OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());
OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.addAllActions(taskOrchestratorResult.getActions())
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
.build();
this.sidecarClient.completeOrchestratorTask(response);
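Note: it is somewhat surprising that this does not use a bidirectional gRPC stream to both receive work items and return results. Instead, results go back through a separate completeOrchestratorTask() call.
Executing an activity task
Similarly, the worker runs the activity task through taskActivityExecutor and then sends the result back to the dapr sidecar.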
ActivityRequest activityRequest = workItem.getActivityRequest();
String output = null;
TaskFailureDetails failureDetails = null;
try {
output = taskActivityExecutor.execute(
activityRequest.getName(),
activityRequest.getInput().getValue(),
activityRequest.getTaskId());
} catch (Throwable e) {
failureDetails = TaskFailureDetails.newBuilder()
.setErrorType(e.getClass().getName())
.setErrorMessage(e.getMessage())
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
.build();
}
ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
.setTaskId(activityRequest.getTaskId());
if (output != null) {
responseBuilder.setResult(StringValue.of(output));
}
if (failureDetails != null) {
responseBuilder.setFailureDetails(failureDetails);
}
this.sidecarClient.completeActivityTask(responseBuilder.build());
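The activity path follows a simple convention. It runs the user code, captures any Throwable as failure details, and sets only the response fields that are non-null. Below is a minimal sketch of that convention. It assumes hypothetical records FailureInfo and ActivityResult in place of the generated TaskFailureDetails and ActivityResponse, and a Function<String, String> in place of taskActivityExecutor.execute(). The stack trace is rendered with printStackTrace rather than the SDK's FailureDetails.getFullStackTrace().

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.function.Function;

// Hypothetical stand-ins for TaskFailureDetails and ActivityResponse; not the generated classes.
record FailureInfo(String errorType, String errorMessage, String stackTrace) {}

record ActivityResult(String instanceId, int taskId, String result, FailureInfo failure) {}

final class ActivityRunner {
    /** Runs the activity and packs either its output or its failure into the result. */
    static ActivityResult run(String instanceId, int taskId, String input,
                              Function<String, String> activity) {
        String output = null;
        FailureInfo failure = null;
        try {
            output = activity.apply(input);
        } catch (Throwable e) {
            // Capture the full stack trace as text.
            StringWriter trace = new StringWriter();
            e.printStackTrace(new PrintWriter(trace));
            failure = new FailureInfo(e.getClass().getName(), e.getMessage(), trace.toString());
        }
        // The caller sets a result only when output is non-null, and failure details only on error.
        return new ActivityResult(instanceId, taskId, output, failure);
    }
}
```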
2 - Fetching work items
The calling code
DurableTaskGrpcWorker calls sidecarClient.getWorkItems() to fetch work items.
private final TaskHubSidecarServiceBlockingStub sidecarClient;
while (true) {
try {
GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
......
}
} catch (StatusRuntimeException e) {
......
}
}
Implementation
proto definition
TaskHubSidecarServiceBlockingStub is gRPC code generated from a protobuf file. Its protobuf definition is in submodules/durabletask-protobuf/protos/orchestrator_service.proto.
service TaskHubSidecarService {
......
rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem);
......
}
The GetWorkItemsRequest and WorkItem messages are defined as:
message GetWorkItemsRequest {
// No parameters currently
}
message WorkItem {
oneof request {
OrchestratorRequest orchestratorRequest = 1;
ActivityRequest activityRequest = 2;
}
}
A WorkItem carries either an OrchestratorRequest or an ActivityRequest.
OrchestratorRequest
message OrchestratorRequest {
string instanceId = 1;
google.protobuf.StringValue executionId = 2;
repeated HistoryEvent pastEvents = 3;
repeated HistoryEvent newEvents = 4;
}
ActivityRequest
message ActivityRequest {
string name = 1;
google.protobuf.StringValue version = 2;
google.protobuf.StringValue input = 3;
OrchestrationInstance orchestrationInstance = 4;
int32 taskId = 5;
}
HistoryEvent
message HistoryEvent {
int32 eventId = 1;
google.protobuf.Timestamp timestamp = 2;
oneof eventType {
ExecutionStartedEvent executionStarted = 3;
ExecutionCompletedEvent executionCompleted = 4;
ExecutionTerminatedEvent executionTerminated = 5;
TaskScheduledEvent taskScheduled = 6;
TaskCompletedEvent taskCompleted = 7;
TaskFailedEvent taskFailed = 8;
SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreated = 9;
SubOrchestrationInstanceCompletedEvent subOrchestrationInstanceCompleted = 10;
SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailed = 11;
TimerCreatedEvent timerCreated = 12;
TimerFiredEvent timerFired = 13;
OrchestratorStartedEvent orchestratorStarted = 14;
OrchestratorCompletedEvent orchestratorCompleted = 15;
EventSentEvent eventSent = 16;
EventRaisedEvent eventRaised = 17;
GenericEvent genericEvent = 18;
HistoryStateEvent historyState = 19;
ContinueAsNewEvent continueAsNew = 20;
ExecutionSuspendedEvent executionSuspended = 21;
ExecutionResumedEvent executionResumed = 22;
}
}
Worker-side call
The workflow app fetches work items by calling sidecarClient.getWorkItems().
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
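Behind this call is generated gRPC stub code, so we won't look at it in detail.
TaskHubSidecarService server implementation
The server side of the TaskHubSidecarService gRPC service defined in protobuf is implemented in the durabletask-go repository.
The generated gRPC stub code is here: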
- internal/protos/orchestrator_service_grpc.pb.go
- internal/protos/orchestrator_service.pb.go
The server-side implementation is in backend/executor.go:
// GetWorkItems implements protos.TaskHubSidecarServiceServer
func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream protos.TaskHubSidecarService_GetWorkItemsServer) error {
......
// The worker client invokes this method, which streams back work-items as they arrive.
for {
select {
case <-stream.Context().Done():
g.logger.Infof("work item stream closed")
return nil
case wi := <-g.workItemQueue:
if err := stream.Send(wi); err != nil {
return err
}
case <-g.streamShutdownChan:
return errShuttingDown
}
}
}
So the work-item stream returned to the calling client is fed from g.workItemQueue:
type grpcExecutor struct {
......
workItemQueue chan *protos.WorkItem
}
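The Go loop above uses select to multiplex three sources: the client's stream context, the work-item channel, and a shutdown channel. Java has no select, so this hedged sketch approximates it by polling a BlockingQueue with a short timeout and checking two flags between polls. WorkItemStreamer, Exit, streamClosed and shuttingDown are hypothetical names. Go's select picks at random among ready cases, but this sketch always checks the flags first.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical Java approximation of grpcExecutor.GetWorkItems(); not durabletask-go code.
final class WorkItemStreamer {
    enum Exit { STREAM_CLOSED, SHUTTING_DOWN, INTERRUPTED }

    static Exit stream(BlockingQueue<String> workItemQueue,
                       Consumer<String> send,          // stands in for stream.Send(wi)
                       AtomicBoolean streamClosed,     // stands in for <-stream.Context().Done()
                       AtomicBoolean shuttingDown) {   // stands in for <-g.streamShutdownChan
        while (true) {
            if (streamClosed.get()) {
                return Exit.STREAM_CLOSED;   // Go: log "work item stream closed", return nil
            }
            if (shuttingDown.get()) {
                return Exit.SHUTTING_DOWN;   // Go: return errShuttingDown
            }
            try {
                String wi = workItemQueue.poll(50, TimeUnit.MILLISECONDS);
                if (wi != null) {
                    send.accept(wi);         // a failing send propagates, like `return err`
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Exit.INTERRUPTED;
            }
        }
    }
}
```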
How workItemQueue works
workItemQueue is defined in grpcExecutor:
type grpcExecutor struct {
workItemQueue chan *protos.WorkItem
......
}
grpcExecutor is constructed in NewGrpcExecutor():
// NewGrpcExecutor returns the Executor object and a method to invoke to register the gRPC server in the executor.
func NewGrpcExecutor(be Backend, logger Logger, opts ...grpcExecutorOptions) (executor Executor, registerServerFn func(grpcServer grpc.ServiceRegistrar)) {
grpcExecutor := &grpcExecutor{
workItemQueue: make(chan *protos.WorkItem),
backend: be,
logger: logger,
pendingOrchestrators: &sync.Map{},
pendingActivities: &sync.Map{},
}
......
}
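Note that make(chan *protos.WorkItem) creates an unbuffered channel. A send on it blocks until a receiver takes the item, and here the receiver is the GetWorkItems loop. The closest standard Java analogue is java.util.concurrent.SynchronousQueue, which has no capacity. Below is a small sketch of that hand-off behaviour; UnbufferedHandoff is a hypothetical name.

```java
import java.util.concurrent.SynchronousQueue;

// Hypothetical sketch: SynchronousQueue behaves like an unbuffered Go channel.
final class UnbufferedHandoff {
    /** A non-blocking offer fails when no consumer is waiting: there is no buffer to hold the item. */
    static boolean offerWithoutReceiver(String workItem) {
        SynchronousQueue<String> workItemQueue = new SynchronousQueue<>();
        return workItemQueue.offer(workItem);
    }

    /** put() on the producer thread blocks until take() on this thread receives the item. */
    static String handOff(String workItem) {
        SynchronousQueue<String> workItemQueue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                workItemQueue.put(workItem); // blocks until the consumer takes it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            String received = workItemQueue.take(); // the consumer side, like GetWorkItems
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

One consequence of this choice is that a writer into workItemQueue cannot get ahead of the worker. If no worker is connected and reading the stream, the send simply waits.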
There are two places that write into workItemQueue:
- ExecuteOrchestrator()
func (executor *grpcExecutor) ExecuteOrchestrator(......) {
	......
	workItem := &protos.WorkItem{
		Request: &protos.WorkItem_OrchestratorRequest{
			OrchestratorRequest: &protos.OrchestratorRequest{
				InstanceId:  string(iid),
				ExecutionId: nil,
				PastEvents:  oldEvents,
				NewEvents:   newEvents,
			},
		},
	}
	select {
	......
	case executor.workItemQueue <- workItem:
	}
}
- ExecuteActivity()
func (executor *grpcExecutor) ExecuteActivity(......) {
	workItem := &protos.WorkItem{
		Request: &protos.WorkItem_ActivityRequest{
			ActivityRequest: &protos.ActivityRequest{
				Name:                  task.Name,
				Version:               task.Version,
				Input:                 task.Input,
				OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)},
				TaskId:                e.EventId,
			},
		},
	}
	select {
	......
	case executor.workItemQueue <- workItem:
	}
}
Who calls ExecuteOrchestrator() and ExecuteActivity() is covered in the next section.
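NewGrpcExecutor also creates pendingOrchestrators and pendingActivities maps. Meanwhile, the worker reports results through separate completeOrchestratorTask()/completeActivityTask() calls. One plausible way these pieces fit together is a correlation map, but this is an assumption, since the excerpt elides the relevant code. Under that reading, the executor registers a pending entry keyed by instance ID, enqueues the work item, and waits, and the completion call looks the entry up and resolves it. Below is a hedged Java sketch of that idea. PendingCorrelator is a hypothetical name, and it uses CompletableFuture in place of Go channels.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: correlating a unary "complete" call with a waiting executor.
final class PendingCorrelator {
    private final ConcurrentHashMap<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();
    final BlockingQueue<String> workItemQueue = new LinkedBlockingQueue<>();

    /** Executor side: register a pending entry, enqueue the work item, return a future for the result. */
    CompletableFuture<String> execute(String instanceId) {
        CompletableFuture<String> result = new CompletableFuture<>();
        pending.put(instanceId, result);
        workItemQueue.add(instanceId);
        return result;
    }

    /** Worker side (the completeOrchestratorTask analogue): find and resolve the pending entry. */
    boolean complete(String instanceId, String response) {
        CompletableFuture<String> waiting = pending.remove(instanceId);
        if (waiting == null) {
            return false; // unknown instance, or already completed
        }
        return waiting.complete(response);
    }
}
```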
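Summary
Work items originate in the dapr sidecar. The code that serves them is in backend/executor.go in the durabletask-go project.
3 - Executing an orchestrator task
Recap
As seen earlier, the code that executes orchestrator tasks is in client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java in the durabletask-java repository.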
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
this.orchestrationFactories,
this.dataConverter,
this.maximumTimerInterval,
logger);
......
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
WorkItem workItem = workItemStream.next();
RequestCase requestType = workItem.getRequestCase();
if (requestType == RequestCase.ORCHESTRATORREQUEST) {
OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();
TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
orchestratorRequest.getPastEventsList(),
orchestratorRequest.getNewEventsList());
OrchestratorResponse response = OrchestratorResponse.newBuilder()
.setInstanceId(orchestratorRequest.getInstanceId())
.addAllActions(taskOrchestratorResult.getActions())
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
.build();
this.sidecarClient.completeOrchestratorTask(response);
}
......
Implementation details
TaskOrchestrationExecutor
The TaskOrchestrationExecutor class definition and constructor:
final class TaskOrchestrationExecutor {
private static final String EMPTY_STRING = "";
private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories;
private final DataConverter dataConverter;
private final Logger logger;
private final Duration maximumTimerInterval;
public TaskOrchestrationExecutor(
HashMap<String, TaskOrchestrationFactory> orchestrationFactories,
DataConverter dataConverter,
Duration maximumTimerInterval,
Logger logger) {
this.orchestrationFactories = orchestrationFactories;
this.dataConverter = dataConverter;
this.maximumTimerInterval = maximumTimerInterval;
this.logger = logger;
}
Here orchestrationFactories holds the workflows registered earlier through registerWorkflow().
The execute() method:
public TaskOrchestratorResult execute(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
ContextImplTask context = new ContextImplTask(pastEvents, newEvents);
boolean completed = false;
try {
// Play through the history events until either we've played through everything
// or we receive a yield signal
while (context.processNextEvent()) { /* no method body */ }
completed = true;
} catch (OrchestratorBlockedException orchestratorBlockedException) {
logger.fine("The orchestrator has yielded and will await for new events.");
} catch (ContinueAsNewInterruption continueAsNewInterruption) {
logger.fine("The orchestrator has continued as new.");
context.complete(null);
} catch (Exception e) {
// The orchestrator threw an unhandled exception - fail it
// TODO: What's the right way to log this?
logger.warning("The orchestrator failed with an unhandled exception: " + e.toString());
context.fail(new FailureDetails(e));
}
if ((context.continuedAsNew && !context.isComplete) || (completed && context.pendingActions.isEmpty() && !context.waitingForEvents())) {
// There are no further actions for the orchestrator to take so auto-complete the orchestration.
context.complete(null);
}
return new TaskOrchestratorResult(context.pendingActions.values(), context.getCustomStatus());
}
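execute() uses exceptions for control flow. Judging by the log messages, OrchestratorBlockedException unwinds the replay when the orchestrator yields to wait for new events, and ContinueAsNewInterruption unwinds it on continue-as-new. Below is a minimal sketch of that outcome mapping and of the auto-complete condition. It assumes hypothetical stand-in exception classes and uses a Runnable in place of the event-replay loop.

```java
// Hypothetical sketch of execute()'s outcome handling; not the SDK's classes.
final class ReplayOutcome {
    static final class OrchestratorBlocked extends RuntimeException {}  // stand-in for OrchestratorBlockedException
    static final class ContinueAsNew extends RuntimeException {}        // stand-in for ContinueAsNewInterruption

    enum Outcome { COMPLETED, YIELDED, CONTINUED_AS_NEW, FAILED }

    static Outcome run(Runnable replayAllEvents) {
        try {
            replayAllEvents.run();           // while (context.processNextEvent()) {}
            return Outcome.COMPLETED;
        } catch (OrchestratorBlocked e) {
            return Outcome.YIELDED;          // wait for new events; not a failure
        } catch (ContinueAsNew e) {
            return Outcome.CONTINUED_AS_NEW;
        } catch (Exception e) {
            return Outcome.FAILED;           // unhandled orchestrator exception
        }
    }

    /** Same boolean structure as the if-condition after the try/catch in execute(). */
    static boolean shouldAutoComplete(boolean continuedAsNew, boolean isComplete, boolean completed,
                                      boolean noPendingActions, boolean waitingForEvents) {
        return (continuedAsNew && !isComplete) || (completed && noPendingActions && !waitingForEvents);
    }
}
```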
This is only the main flow; the details are implemented in the private inner class ContextImplTask.
ContextImplTask
The ContextImplTask definition and constructor, which rely on OrchestrationHistoryIterator:
private class ContextImplTask implements TaskOrchestrationContext {
private final OrchestrationHistoryIterator historyEventPlayer;
......
public ContextImplTask(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
this.historyEventPlayer = new OrchestrationHistoryIterator(pastEvents, newEvents);
}
......
private boolean processNextEvent() {
return this.historyEventPlayer.moveNext();
}
}
OrchestrationHistoryIterator
The OrchestrationHistoryIterator class definition and constructor. pastEvents and newEvents are the data carried in the orchestratorRequest that the daprd sidecar returns from getWorkItems().
private class OrchestrationHistoryIterator {
private final List<HistoryEvent> pastEvents;
private final List<HistoryEvent> newEvents;
private List<HistoryEvent> currentHistoryList;
private int currentHistoryIndex;
public OrchestrationHistoryIterator(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
this.pastEvents = pastEvents;
this.newEvents = newEvents;
this.currentHistoryList = pastEvents;
}
currentHistoryList initially points to pastEvents, and currentHistoryIndex starts at 0.
Next, the moveNext() method:
public boolean moveNext() {
if (this.currentHistoryList == pastEvents && this.currentHistoryIndex >= pastEvents.size()) {
// If currentHistoryList points to pastEvents and we have moved past its last element,
// switch to this.newEvents and reset currentHistoryIndex to 0 (its first element).
// Move forward to the next list
this.currentHistoryList = this.newEvents;
this.currentHistoryIndex = 0;
// This means traversal of pastEvents is finished, i.e. replay is complete.
ContextImplTask.this.setDoneReplaying();
}
if (this.currentHistoryList == this.newEvents && this.currentHistoryIndex >= this.newEvents.size()) {
// If currentHistoryList points to newEvents and we have moved past its last element,
// traversal is complete: there are no more events, so return false to signal the end.
// We're done enumerating the history
return false;
}
// Process the next event in the history
// Fetch the current element, then advance currentHistoryIndex to the next one
HistoryEvent next = this.currentHistoryList.get(this.currentHistoryIndex++);
// Process the event
ContextImplTask.this.processEvent(next);
return true;
}
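Stripped of the SDK types, moveNext() is a cursor over two concatenated lists. It flips an "is replaying" flag exactly when it crosses from pastEvents into newEvents. Below is a generic sketch under that reading. HistoryReplayer and its (event, isReplaying) callback are hypothetical; they stand in for OrchestrationHistoryIterator, setDoneReplaying() and processEvent().

```java
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical, simplified mirror of OrchestrationHistoryIterator; not the SDK class.
final class HistoryReplayer<E> {
    private final List<E> pastEvents;
    private final List<E> newEvents;
    private final BiConsumer<E, Boolean> processEvent; // receives (event, isReplaying)
    private List<E> currentHistoryList;
    private int currentHistoryIndex;
    private boolean replaying = true;

    HistoryReplayer(List<E> pastEvents, List<E> newEvents, BiConsumer<E, Boolean> processEvent) {
        this.pastEvents = pastEvents;
        this.newEvents = newEvents;
        this.processEvent = processEvent;
        this.currentHistoryList = pastEvents; // start with the already-recorded history
    }

    /** Processes one event; returns false once both lists are exhausted. */
    boolean moveNext() {
        // Like the original, lists are compared by reference (==).
        if (currentHistoryList == pastEvents && currentHistoryIndex >= pastEvents.size()) {
            // Past events exhausted: switch to the new events, so replay is over.
            currentHistoryList = newEvents;
            currentHistoryIndex = 0;
            replaying = false; // the SDK calls setDoneReplaying() here
        }
        if (currentHistoryList == newEvents && currentHistoryIndex >= newEvents.size()) {
            return false; // nothing left to process
        }
        E next = currentHistoryList.get(currentHistoryIndex++);
        processEvent.accept(next, replaying);
        return true;
    }
}
```

Driving it with `while (replayer.moveNext()) { }` mirrors the empty-bodied loop in execute().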
Event handling is implemented in ContextImplTask's processEvent() method:
private void processEvent(HistoryEvent e) {
boolean overrideSuspension = e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONRESUMED || e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONTERMINATED;
if (this.isSuspended && !overrideSuspension) {
this.handleEventWhileSuspended(e);
} else {
switch (e.getEventTypeCase()) {
case ORCHESTRATORSTARTED:
Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
this.setCurrentInstant(instant);
break;
case ORCHESTRATORCOMPLETED:
// No action
break;
case EXECUTIONSTARTED:
ExecutionStartedEvent startedEvent = e.getExecutionStarted();
String name = startedEvent.getName();
this.setName(name);
String instanceId = startedEvent.getOrchestrationInstance().getInstanceId();
this.setInstanceId(instanceId);
String input = startedEvent.getInput().getValue();
this.setInput(input);
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
if (factory == null) {
// Try getting the default orchestrator
factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
}
// TODO: Throw if the factory is null (orchestration by that name doesn't exist)
TaskOrchestration orchestrator = factory.create();
orchestrator.run(this);
break;
// case EXECUTIONCOMPLETED:
// break;
// case EXECUTIONFAILED:
// break;
case EXECUTIONTERMINATED:
this.handleExecutionTerminated(e);
break;
case TASKSCHEDULED:
this.handleTaskScheduled(e);
break;
case TASKCOMPLETED:
this.handleTaskCompleted(e);
break;
case TASKFAILED:
this.handleTaskFailed(e);
break;
case TIMERCREATED:
this.handleTimerCreated(e);
break;
case TIMERFIRED:
this.handleTimerFired(e);
break;
case SUBORCHESTRATIONINSTANCECREATED:
this.handleSubOrchestrationCreated(e);
break;
case SUBORCHESTRATIONINSTANCECOMPLETED:
this.handleSubOrchestrationCompleted(e);
break;
case SUBORCHESTRATIONINSTANCEFAILED:
this.handleSubOrchestrationFailed(e);
break;
// case EVENTSENT:
// break;
case EVENTRAISED:
this.handleEventRaised(e);
break;
// case GENERICEVENT:
// break;
// case HISTORYSTATE:
// break;
// case EVENTTYPE_NOT_SET:
// break;
case EXECUTIONSUSPENDED:
this.handleExecutionSuspended(e);
break;
case EXECUTIONRESUMED:
this.handleExecutionResumed(e);
break;
default:
throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase());
}
}
}
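The suspension guard at the top of processEvent() is worth isolating. While the orchestration is suspended, every event except EXECUTIONRESUMED and EXECUTIONTERMINATED goes to handleEventWhileSuspended(). The sketch below assumes that this handler buffers events, ignoring a repeated suspend, and that handleExecutionResumed() replays the buffer. That detail is not shown above. EventType and SuspendAwareProcessor are hypothetical stand-ins.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Stand-in for a few HistoryEvent.EventTypeCase values.
enum EventType { EXECUTIONSTARTED, TASKCOMPLETED, EXECUTIONSUSPENDED, EXECUTIONRESUMED, EXECUTIONTERMINATED }

// Hypothetical sketch of suspend/resume handling; not the SDK's code.
final class SuspendAwareProcessor {
    private boolean isSuspended;
    private final Queue<EventType> bufferedWhileSuspended = new ArrayDeque<>();
    final List<EventType> handled = new ArrayList<>();

    void processEvent(EventType e) {
        boolean overrideSuspension = e == EventType.EXECUTIONRESUMED || e == EventType.EXECUTIONTERMINATED;
        if (isSuspended && !overrideSuspension) {
            // Assumption: buffer events for later; a second suspend while suspended is ignored.
            if (e != EventType.EXECUTIONSUSPENDED) {
                bufferedWhileSuspended.add(e);
            }
            return;
        }
        switch (e) {
            case EXECUTIONSUSPENDED -> {
                isSuspended = true;
                handled.add(e);
            }
            case EXECUTIONRESUMED -> {
                isSuspended = false;
                handled.add(e);
                // Assumption: replay everything that arrived while suspended, in order.
                while (!bufferedWhileSuspended.isEmpty()) {
                    processEvent(bufferedWhileSuspended.poll());
                }
            }
            default -> handled.add(e);
        }
    }
}
```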
Which code actually runs depends on the type of event delivered.
Handling the EXECUTIONSTARTED event
This is the proto definition of ExecutionStartedEvent:
message ExecutionStartedEvent {
string name = 1;
google.protobuf.StringValue version = 2;
google.protobuf.StringValue input = 3;
OrchestrationInstance orchestrationInstance = 4;
ParentInstanceInfo parentInstance = 5;
google.protobuf.Timestamp scheduledStartTimestamp = 6;
TraceContext parentTraceContext = 7;
google.protobuf.StringValue orchestrationSpanID = 8;
}
How the EXECUTIONSTARTED event is handled:
case EXECUTIONSTARTED:
ExecutionStartedEvent startedEvent = e.getExecutionStarted();
String name = startedEvent.getName();
this.setName(name);
String instanceId = startedEvent.getOrchestrationInstance().getInstanceId();
this.setInstanceId(instanceId);
String input = startedEvent.getInput().getValue();
this.setInput(input);
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
if (factory == null) {
// Try getting the default orchestrator
factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
}
// TODO: Throw if the factory is null (orchestration by that name doesn't exist)
TaskOrchestration orchestrator = factory.create();
orchestrator.run(this);
break;
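Basic information such as name, instanceId and input is set directly on the ContextImplTask.
The factory is looked up in orchestrationFactories by name. If none is found, the default factory registered under "*" is used. If neither exists, factory is null and factory.create() throws a NullPointerException, which is what the TODO refers to.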
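The lookup with the "*" fallback, plus the explicit error the TODO asks for, can be sketched with a plain map. FactoryRegistry is a hypothetical name, and Supplier<T> stands in for TaskOrchestrationFactory.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sketch of the name -> factory lookup with a "*" default; not the SDK class.
final class FactoryRegistry<T> {
    private final Map<String, Supplier<T>> factories = new HashMap<>();

    void register(String name, Supplier<T> factory) {
        factories.put(name, factory);
    }

    T create(String name) {
        Supplier<T> factory = factories.get(name);
        if (factory == null) {
            factory = factories.get("*"); // fall back to the default orchestrator, if registered
        }
        if (factory == null) {
            // What the TODO asks for: a clear error instead of a NullPointerException.
            throw new IllegalArgumentException("No orchestration registered with name '" + name + "'");
        }
        return factory.get();
    }
}
```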
A TaskOrchestration is created from the factory, and then orchestrator.run() is called:
TaskOrchestration orchestrator = factory.create();
orchestrator.run(this);
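This brings us back to the TaskOrchestration implementation.
OrchestratorWrapper
In the Dapr java sdk, OrchestratorWrapper implements the TaskOrchestrationFactory interface. Its create() method returns the TaskOrchestration as a lambda: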
class OrchestratorWrapper<T extends Workflow> implements TaskOrchestrationFactory {
@Override
public TaskOrchestration create() {
return ctx -> {
T workflow;
try {
workflow = this.workflowConstructor.newInstance();
} ......
};
}
}
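The excerpt elides what happens after the workflow instance is created. The sketch below only illustrates the pattern visible so far: a factory that instantiates a fresh workflow object for every run through a cached no-arg Constructor. Workflow, TaskOrchestration and TaskOrchestrationFactory here are hypothetical stand-ins with signatures simplified to plain strings, and WrapperSketch and GreetingWorkflow are illustrative names.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

// Simplified stand-ins; the real SDK interfaces take context objects, not strings.
interface Workflow { String run(String input); }
interface TaskOrchestration { String run(String input); }
interface TaskOrchestrationFactory { TaskOrchestration create(); }

// Hypothetical sketch of the OrchestratorWrapper pattern.
final class WrapperSketch<T extends Workflow> implements TaskOrchestrationFactory {
    private final Constructor<T> workflowConstructor;

    WrapperSketch(Class<T> clazz) {
        try {
            this.workflowConstructor = clazz.getDeclaredConstructor(); // resolved once, up front
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("Workflow needs a no-arg constructor: " + clazz.getName(), e);
        }
    }

    @Override
    public TaskOrchestration create() {
        return input -> {
            T workflow;
            try {
                workflow = workflowConstructor.newInstance(); // a fresh instance for every run
            } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException("Could not instantiate workflow", e);
            }
            return workflow.run(input);
        };
    }
}

// Example workflow used to exercise the sketch.
final class GreetingWorkflow implements Workflow {
    static int instancesCreated = 0;

    public GreetingWorkflow() { instancesCreated++; }

    @Override
    public String run(String input) { return "hello " + input; }
}
```

Creating a new instance per run keeps workflow objects free of state left over from earlier replays, which matters because the same orchestration is re-executed from history many times.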