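workflow app run flow

Source code analysis of the workflow app run flow

1 - Overview of the workflow app run flow

Source code overview of how the workflow runtime runs inside a workflow app

As the previous chapter showed, once the workflow runtime has started, it begins the task-processing flow.

The implementation is in the startAndBlock() method of the DurableTaskGrpcWorker class in the durabletask java sdk.

This is the most critical code.

It first builds two executors, which run orchestration tasks and activity tasks respectively: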

        TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
                this.orchestrationFactories,
                this.dataConverter,
                this.maximumTimerInterval,
                logger);
        TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor(
                this.activityFactories,
                this.dataConverter,
                logger);

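The arguments passed in include orchestrationFactories and activityFactories, which hold everything that was registered when the worker was built.

Getting work items

Next comes an infinite loop that calls sidecarClient.getWorkItems(); an inner loop then iterates over the returned work item stream. If a StatusRuntimeException is thrown, the worker sleeps for 5 seconds and then retries.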

while (true) {
  try {
      GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
      Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
      while (workItemStream.hasNext()) {
        ......
      }
  } catch(StatusRuntimeException e){
    ......
    // Retry after 5 seconds
    try {
        Thread.sleep(5000);
    } catch (InterruptedException ex) {
        break;
    }
  }
}
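The reconnect-and-retry shape of this loop can be sketched without gRPC. In the sketch below, WorkSource, RetryingPoller and the maxAttempts bound are hypothetical names introduced only for illustration: the real loop runs forever and catches StatusRuntimeException, while this version catches RuntimeException and takes the delay as a parameter so that it finishes quickly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for sidecarClient.getWorkItems(); not part of the SDK.
interface WorkSource {
    Iterator<String> open();
}

final class RetryingPoller {
    // Drains the stream; if opening or reading it fails, waits and reconnects.
    // maxAttempts exists only so the sketch terminates; the real worker loops forever.
    static List<String> poll(WorkSource source, int maxAttempts, long retryDelayMillis) {
        List<String> processed = new ArrayList<>();
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                Iterator<String> stream = source.open();
                while (stream.hasNext()) {
                    processed.add(stream.next());
                }
                // The stream ended normally: loop around and reconnect.
            } catch (RuntimeException e) {
                try {
                    Thread.sleep(retryDelayMillis);
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails on the first connection attempt, succeeds on the second.
        WorkSource flaky = () -> {
            if (calls[0]++ == 0) {
                throw new IllegalStateException("sidecar unavailable");
            }
            return Arrays.asList("orchestrator-1", "activity-1").iterator();
        };
        System.out.println(poll(flaky, 2, 10));
    }
}
```

The point of the design is that a failed connection never terminates the worker: only an interrupt during the sleep breaks out of the loop.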

There are only two kinds of work items, orchestrator and activity:

while (workItemStream.hasNext()) {
    WorkItem workItem = workItemStream.next();
    RequestCase requestType = workItem.getRequestCase();
    if (requestType == RequestCase.ORCHESTRATORREQUEST) {
        ......
    } else if (requestType == RequestCase.ACTIVITYREQUEST) {
        ......
    } else {
        logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType);
    }
}

Executing an orchestrator task

The orchestrator task is executed by taskOrchestrationExecutor, and the result is then returned to the dapr sidecar.

OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();

TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
        orchestratorRequest.getPastEventsList(),
        orchestratorRequest.getNewEventsList());

OrchestratorResponse response = OrchestratorResponse.newBuilder()
        .setInstanceId(orchestratorRequest.getInstanceId())
        .addAllActions(taskOrchestratorResult.getActions())
        .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
        .build();

this.sidecarClient.completeOrchestratorTask(response);

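Note: it is a little odd that a gRPC bidirectional stream is not used here to both fetch tasks and return their results; instead, the result is sent back through a separate completeOrchestratorTask() call.

Executing an activity task

Similarly, the activity task is executed by taskActivityExecutor, and the result is then returned to the dapr sidecar.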

ActivityRequest activityRequest = workItem.getActivityRequest();

String output = null;
TaskFailureDetails failureDetails = null;
try {
    output = taskActivityExecutor.execute(
        activityRequest.getName(),
        activityRequest.getInput().getValue(),
        activityRequest.getTaskId());
} catch (Throwable e) {
    failureDetails = TaskFailureDetails.newBuilder()
        .setErrorType(e.getClass().getName())
        .setErrorMessage(e.getMessage())
        .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
        .build();
}

ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
        .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
        .setTaskId(activityRequest.getTaskId());

if (output != null) {
    responseBuilder.setResult(StringValue.of(output));
}

if (failureDetails != null) {
    responseBuilder.setFailureDetails(failureDetails);
}

this.sidecarClient.completeActivityTask(responseBuilder.build());
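The "either an output or failure details" pattern above can be sketched in isolation. ActivityOutcome and runActivity are hypothetical names; the real code uses the protobuf-generated ActivityResponse and TaskFailureDetails builders, and the activity is modelled here as a plain Function<String, String>.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.function.Function;

// Hypothetical, simplified stand-in for ActivityResponse plus TaskFailureDetails.
final class ActivityOutcome {
    final String output;       // null when the activity failed
    final String errorType;    // null when the activity succeeded
    final String errorMessage;
    final String stackTrace;

    private ActivityOutcome(String output, String errorType, String errorMessage, String stackTrace) {
        this.output = output;
        this.errorType = errorType;
        this.errorMessage = errorMessage;
        this.stackTrace = stackTrace;
    }

    // Runs the activity and converts any Throwable into failure fields instead of rethrowing.
    static ActivityOutcome runActivity(Function<String, String> activity, String input) {
        try {
            return new ActivityOutcome(activity.apply(input), null, null, null);
        } catch (Throwable t) {
            StringWriter trace = new StringWriter();
            t.printStackTrace(new PrintWriter(trace, true));
            return new ActivityOutcome(null, t.getClass().getName(), t.getMessage(), trace.toString());
        }
    }

    boolean failed() {
        return errorType != null;
    }

    public static void main(String[] args) {
        ActivityOutcome ok = runActivity(s -> s.toUpperCase(), "hello");
        ActivityOutcome bad = runActivity(s -> {
            throw new IllegalArgumentException("bad input");
        }, "x");
        System.out.println(ok.output);
        System.out.println(bad.errorType + ": " + bad.errorMessage);
    }
}
```

Catching Throwable means a failing activity is reported to the sidecar as data rather than crashing the worker loop, which matches what the excerpt above does.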

2 - Getting work items

Source code overview of how the workflow runtime gets work items

The calling code for getting work items

DurableTaskGrpcWorker calls sidecarClient.getWorkItems() to get work items.

private final TaskHubSidecarServiceBlockingStub sidecarClient;

while (true) {
  try {
      GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
      Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
      while (workItemStream.hasNext()) {
        ......
      }
  } catch (StatusRuntimeException e) {
    ......
  }
}

Implementation

proto definition

TaskHubSidecarServiceBlockingStub is gRPC code generated from a protobuf file; its protobuf definition is in submodules/durabletask-protobuf/protos/orchestrator_service.proto.

service TaskHubSidecarService {
    ......
    rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem);
    ......
}

The GetWorkItemsRequest and WorkItem messages are defined as:

message GetWorkItemsRequest {
    // No parameters currently
}

message WorkItem {
    oneof request {
        OrchestratorRequest orchestratorRequest = 1;
        ActivityRequest activityRequest = 2;
    }
}

A WorkItem is either an OrchestratorRequest or an ActivityRequest.

OrchestratorRequest

message OrchestratorRequest {
    string instanceId = 1;
    google.protobuf.StringValue executionId = 2;
    repeated HistoryEvent pastEvents = 3;
    repeated HistoryEvent newEvents = 4;
}

ActivityRequest

message ActivityRequest {
    string name = 1;
    google.protobuf.StringValue version = 2;
    google.protobuf.StringValue input = 3;
    OrchestrationInstance orchestrationInstance = 4;
    int32 taskId = 5;
}

HistoryEvent

message HistoryEvent {
    int32 eventId = 1;
    google.protobuf.Timestamp timestamp = 2;
    oneof eventType {
        ExecutionStartedEvent executionStarted = 3;
        ExecutionCompletedEvent executionCompleted = 4;
        ExecutionTerminatedEvent executionTerminated = 5;
        TaskScheduledEvent taskScheduled = 6;
        TaskCompletedEvent taskCompleted = 7;
        TaskFailedEvent taskFailed = 8;
        SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreated = 9;
        SubOrchestrationInstanceCompletedEvent subOrchestrationInstanceCompleted = 10;
        SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailed = 11;
        TimerCreatedEvent timerCreated = 12;
        TimerFiredEvent timerFired = 13;
        OrchestratorStartedEvent orchestratorStarted = 14;
        OrchestratorCompletedEvent orchestratorCompleted = 15;
        EventSentEvent eventSent = 16;
        EventRaisedEvent eventRaised = 17;
        GenericEvent genericEvent = 18;
        HistoryStateEvent historyState = 19;
        ContinueAsNewEvent continueAsNew = 20;
        ExecutionSuspendedEvent executionSuspended = 21;
        ExecutionResumedEvent executionResumed = 22;
    }
}

Worker call

The workflow app gets work items by calling the sidecarClient.getWorkItems() method.

Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);

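This is generated gRPC stub code, so we will not look at it in detail.

TaskHubSidecarService server implementation

The server side of TaskHubSidecarService, the gRPC service defined in protobuf, is implemented in the durabletask-go repository.

The gRPC stub code generated from protobuf is here: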

  • internal/protos/orchestrator_service_grpc.pb.go
  • internal/protos/orchestrator_service.pb.go

The server-side code is implemented in backend/executor.go:

// GetWorkItems implements protos.TaskHubSidecarServiceServer
func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream protos.TaskHubSidecarService_GetWorkItemsServer) error {
    ......

	// The worker client invokes this method, which streams back work-items as they arrive.
	for {
		select {
		case <-stream.Context().Done():
			g.logger.Infof("work item stream closed")
			return nil
		case wi := <-g.workItemQueue:
			if err := stream.Send(wi); err != nil {
				return err
			}
		case <-g.streamShutdownChan:
			return errShuttingDown
		}
	}
}

So the data in the work item stream returned to the client comes from g.workItemQueue:

type grpcExecutor struct {
    ......
	workItemQueue        chan *protos.WorkItem
}

How workItemQueue works

workItemQueue is defined in grpcExecutor:

type grpcExecutor struct {
	workItemQueue        chan *protos.WorkItem
    ......
}

grpcExecutor is constructed in the NewGrpcExecutor() function:

// NewGrpcExecutor returns the Executor object and a method to invoke to register the gRPC server in the executor.
func NewGrpcExecutor(be Backend, logger Logger, opts ...grpcExecutorOptions) (executor Executor, registerServerFn func(grpcServer grpc.ServiceRegistrar)) {
	grpcExecutor := &grpcExecutor{
		workItemQueue:        make(chan *protos.WorkItem),
		backend:              be,
		logger:               logger,
		pendingOrchestrators: &sync.Map{},
		pendingActivities:    &sync.Map{},
	}

    ......
}
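Because the channel is created with make(chan *protos.WorkItem) and no capacity, it is unbuffered: a send completes only when a receiver (here, the GetWorkItems stream loop) is ready to take the item. Staying with Java, the language of the worker side, the closest standard-library analogue is java.util.concurrent.SynchronousQueue. The sketch below is only an analogy for that hand-off behaviour, not a translation of the Go code, and HandoffDemo is a hypothetical name.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

final class HandoffDemo {
    // Non-blocking attempt: succeeds only if a consumer is already waiting in take().
    static boolean tryHandOff(SynchronousQueue<String> queue, String item) {
        return queue.offer(item);
    }

    // Starts a consumer, hands one item over, and returns what the consumer received
    // (null if the hand-off did not happen within the timeout).
    static String handOffToConsumer(String item) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        try {
            // Waits until the consumer takes the item, or gives up after 5 seconds.
            boolean delivered = queue.offer(item, 5, TimeUnit.SECONDS);
            consumer.join(5000);
            return delivered ? received[0] : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        // No consumer is waiting, so the non-blocking offer is rejected.
        System.out.println(tryHandOff(new SynchronousQueue<>(), "wi-0"));
        System.out.println(handOffToConsumer("wi-1"));
    }
}
```

The practical consequence for the sidecar is that a work item is never parked in a buffer: if no worker is connected and reading the stream, whoever produces the item waits.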

There are two places that write to workItemQueue:

  1. ExecuteOrchestrator()

    func (executor *grpcExecutor) ExecuteOrchestrator(......) {
        ......
        workItem := &protos.WorkItem{
            Request: &protos.WorkItem_OrchestratorRequest{
                OrchestratorRequest: &protos.OrchestratorRequest{
                    InstanceId:  string(iid),
                    ExecutionId: nil,
                    PastEvents:  oldEvents,
                    NewEvents:   newEvents,
                },
            },
        }

        // Simplified excerpt: hand the work item to the GetWorkItems stream loop.
        executor.workItemQueue <- workItem
    }
    
  2. ExecuteActivity()

    func (executor *grpcExecutor) ExecuteActivity(......) {
        ......
        workItem := &protos.WorkItem{
            Request: &protos.WorkItem_ActivityRequest{
                ActivityRequest: &protos.ActivityRequest{
                    Name:                  task.Name,
                    Version:               task.Version,
                    Input:                 task.Input,
                    OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)},
                    TaskId:                e.EventId,
                },
            },
        }

        // Simplified excerpt: hand the work item to the GetWorkItems stream loop.
        executor.workItemQueue <- workItem
    }
    

Who calls ExecuteOrchestrator() and ExecuteActivity() is traced in the next section.

Summary

Work items originate in the dapr sidecar; the code is implemented in backend/executor.go of the durabletask-go project.

3 - Executing an orchestrator task

Source code for how the workflow runtime executes an orchestrator task

Recap

As seen earlier, the code that executes an orchestrator task is in client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java of the durabletask-java repository.

TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
        this.orchestrationFactories,
        this.dataConverter,
        this.maximumTimerInterval,
        logger);
......
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
    WorkItem workItem = workItemStream.next();
    RequestCase requestType = workItem.getRequestCase();
    if (requestType == RequestCase.ORCHESTRATORREQUEST) {
        OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();

        TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
                orchestratorRequest.getPastEventsList(),
                orchestratorRequest.getNewEventsList());

        OrchestratorResponse response = OrchestratorResponse.newBuilder()
                .setInstanceId(orchestratorRequest.getInstanceId())
                .addAllActions(taskOrchestratorResult.getActions())
                .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
                .build();

        this.sidecarClient.completeOrchestratorTask(response);
    }
    ......

Implementation details

TaskOrchestrationExecutor

The definition and constructor of the TaskOrchestrationExecutor class:

 final class TaskOrchestrationExecutor {

    private static final String EMPTY_STRING = "";
    private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories;
    private final DataConverter dataConverter;
    private final Logger logger;
    private final Duration maximumTimerInterval;

    public TaskOrchestrationExecutor(
            HashMap<String, TaskOrchestrationFactory> orchestrationFactories,
            DataConverter dataConverter,
            Duration maximumTimerInterval,
            Logger logger) {
        this.orchestrationFactories = orchestrationFactories;
        this.dataConverter = dataConverter;
        this.maximumTimerInterval = maximumTimerInterval;
        this.logger = logger;
    }

Here orchestrationFactories holds the workflows that were registered earlier through registerWorkflow().

The execute() method:

public TaskOrchestratorResult execute(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
    ContextImplTask context = new ContextImplTask(pastEvents, newEvents);

    boolean completed = false;
    try {
        // Play through the history events until either we've played through everything
        // or we receive a yield signal
        while (context.processNextEvent()) { /* no method body */ }
        completed = true;
    } catch (OrchestratorBlockedException orchestratorBlockedException) {
        logger.fine("The orchestrator has yielded and will await for new events.");
    } catch (ContinueAsNewInterruption continueAsNewInterruption) {
        logger.fine("The orchestrator has continued as new.");
        context.complete(null);
    } catch (Exception e) {
        // The orchestrator threw an unhandled exception - fail it
        // TODO: What's the right way to log this?
        logger.warning("The orchestrator failed with an unhandled exception: " + e.toString());
        context.fail(new FailureDetails(e));
    }

    if ((context.continuedAsNew && !context.isComplete) || (completed && context.pendingActions.isEmpty() && !context.waitingForEvents())) {
        // There are no further actions for the orchestrator to take so auto-complete the orchestration.
        context.complete(null);
    }

    return new TaskOrchestratorResult(context.pendingActions.values(), context.getCustomStatus());
}

This is only the main flow; the details are implemented in the private inner class ContextImplTask.
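The control flow of execute() relies on exceptions as signals: playing events either runs to the end, is cut short because the orchestrator has to wait, or fails. A minimal sketch of that three-way outcome follows. ReplayLoopSketch, BlockedSignal and Outcome are hypothetical names, events are modelled as plain Runnable handlers, and the pending-action bookkeeping and continue-as-new case of the real method are left out.

```java
import java.util.Arrays;
import java.util.List;

final class ReplayLoopSketch {
    // Hypothetical signal meaning "the orchestrator is waiting on something unfinished".
    static final class BlockedSignal extends RuntimeException {
    }

    enum Outcome { COMPLETED, YIELDED, FAILED }

    // Plays the handlers in order; the first one that throws decides the outcome.
    static Outcome play(List<Runnable> eventHandlers) {
        try {
            for (Runnable handler : eventHandlers) {
                handler.run();
            }
            return Outcome.COMPLETED;
        } catch (BlockedSignal blocked) {
            // Not an error: the orchestrator yields and waits for new events.
            return Outcome.YIELDED;
        } catch (RuntimeException e) {
            // Unhandled exception from orchestrator code: the orchestration fails.
            return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        Runnable ok = () -> { };
        Runnable waiting = () -> {
            throw new BlockedSignal();
        };
        System.out.println(play(Arrays.asList(ok, ok)));
        System.out.println(play(Arrays.asList(ok, waiting)));
    }
}
```

The order of the catch clauses matters: the yield signal has to be caught before the general failure handler, just as OrchestratorBlockedException is caught before Exception in the method above.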

ContextImplTask

The definition and constructor of ContextImplTask, which uses OrchestrationHistoryIterator:

private class ContextImplTask implements TaskOrchestrationContext {
    private final OrchestrationHistoryIterator historyEventPlayer;
    ......

    public ContextImplTask(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
        this.historyEventPlayer = new OrchestrationHistoryIterator(pastEvents, newEvents);
    }
    ......

    private boolean processNextEvent() {
        return this.historyEventPlayer.moveNext();
    }
}

OrchestrationHistoryIterator

The class definition and constructor of OrchestrationHistoryIterator. Here pastEvents and newEvents are the data carried in the orchestratorRequest that the daprd sidecar returns from getWorkItems().

private class OrchestrationHistoryIterator {
    private final List<HistoryEvent> pastEvents;
    private final List<HistoryEvent> newEvents;

    private List<HistoryEvent> currentHistoryList;
    private int currentHistoryIndex;

    public OrchestrationHistoryIterator(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
        this.pastEvents = pastEvents;
        this.newEvents = newEvents;
        this.currentHistoryList = pastEvents;
    }

currentHistoryList initially points to pastEvents, and currentHistoryIndex is 0.

Next, look at the moveNext() method:

    public boolean moveNext() {
        if (this.currentHistoryList == pastEvents && this.currentHistoryIndex >= pastEvents.size()) {
            // If currentHistoryList points to pastEvents and every element has been consumed,
            // switch to this.newEvents and reset currentHistoryIndex to 0 (the first element).
            // Move forward to the next list
            this.currentHistoryList = this.newEvents;
            this.currentHistoryIndex = 0;

            // This means the traversal of pastEvents has ended, i.e. replay is complete.
            ContextImplTask.this.setDoneReplaying();
        }

        if (this.currentHistoryList == this.newEvents && this.currentHistoryIndex >= this.newEvents.size()) {
            // If currentHistoryList points to newEvents and every element has been consumed,
            // the traversal is finished; return false to signal that processing can stop.
            // We're done enumerating the history
            return false;
        }

        // Process the next event in the history
        // Get the current element, then increment currentHistoryIndex to point to the next one
        HistoryEvent next = this.currentHistoryList.get(this.currentHistoryIndex++);
        // Handle the event
        ContextImplTask.this.processEvent(next);
        return true;
    }
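The two-list traversal is easier to follow with a stripped-down model. In the sketch below, TwoPhaseHistory is a hypothetical simplification of OrchestrationHistoryIterator: events are plain strings, and instead of calling processEvent() it records whether each event was seen during replay or as a new event.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of OrchestrationHistoryIterator.
final class TwoPhaseHistory {
    private final List<String> pastEvents;
    private final List<String> newEvents;
    private List<String> current;
    private int index;
    private boolean replaying = true;
    private final List<String> log = new ArrayList<>();

    TwoPhaseHistory(List<String> pastEvents, List<String> newEvents) {
        // Copy the lists so the identity checks below never confuse the two phases.
        this.pastEvents = new ArrayList<>(pastEvents);
        this.newEvents = new ArrayList<>(newEvents);
        this.current = this.pastEvents;
    }

    boolean moveNext() {
        if (current == pastEvents && index >= pastEvents.size()) {
            // Past events are exhausted: switch lists and leave replay mode.
            current = newEvents;
            index = 0;
            replaying = false;
        }
        if (current == newEvents && index >= newEvents.size()) {
            return false; // nothing left in either list
        }
        String next = current.get(index++);
        log.add((replaying ? "replay:" : "new:") + next);
        return true;
    }

    boolean isReplaying() {
        return replaying;
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        TwoPhaseHistory history = new TwoPhaseHistory(Arrays.asList("A", "B"), Arrays.asList("C"));
        while (history.moveNext()) { /* events are handled inside moveNext() */ }
        System.out.println(history.log()); // prints [replay:A, replay:B, new:C]
    }
}
```

Note that the switch out of replay mode happens lazily, on the first moveNext() call after the last past event has been handed out, which is the same point at which the real iterator calls setDoneReplaying().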

Event handling is implemented in the processEvent() method of ContextImplTask:

        private void processEvent(HistoryEvent e) {
            boolean overrideSuspension = e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONRESUMED || e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONTERMINATED;
            if (this.isSuspended && !overrideSuspension) {
                this.handleEventWhileSuspended(e);
            } else {
                switch (e.getEventTypeCase()) {
                    case ORCHESTRATORSTARTED:
                        Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
                        this.setCurrentInstant(instant);
                        break;
                    case ORCHESTRATORCOMPLETED:
                        // No action
                        break;
                    case EXECUTIONSTARTED:
                        ExecutionStartedEvent startedEvent = e.getExecutionStarted();
                        String name = startedEvent.getName();
                        this.setName(name);
                        String instanceId = startedEvent.getOrchestrationInstance().getInstanceId();
                        this.setInstanceId(instanceId);
                        String input = startedEvent.getInput().getValue();
                        this.setInput(input);
                        TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
                        if (factory == null) {
                            // Try getting the default orchestrator
                            factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
                        }
                        // TODO: Throw if the factory is null (orchestration by that name doesn't exist)
                        TaskOrchestration orchestrator = factory.create();
                        orchestrator.run(this);
                        break;
//                case EXECUTIONCOMPLETED:
//                    break;
//                case EXECUTIONFAILED:
//                    break;
                    case EXECUTIONTERMINATED:
                        this.handleExecutionTerminated(e);
                        break;
                    case TASKSCHEDULED:
                        this.handleTaskScheduled(e);
                        break;
                    case TASKCOMPLETED:
                        this.handleTaskCompleted(e);
                        break;
                    case TASKFAILED:
                        this.handleTaskFailed(e);
                        break;
                    case TIMERCREATED:
                        this.handleTimerCreated(e);
                        break;
                    case TIMERFIRED:
                        this.handleTimerFired(e);
                        break;
                    case SUBORCHESTRATIONINSTANCECREATED:
                        this.handleSubOrchestrationCreated(e);
                        break;
                    case SUBORCHESTRATIONINSTANCECOMPLETED:
                        this.handleSubOrchestrationCompleted(e);
                        break;
                    case SUBORCHESTRATIONINSTANCEFAILED:
                        this.handleSubOrchestrationFailed(e);
                        break;
//                case EVENTSENT:
//                    break;
                    case EVENTRAISED:
                        this.handleEventRaised(e);
                        break;
//                case GENERICEVENT:
//                    break;
//                case HISTORYSTATE:
//                    break;
//                case EVENTTYPE_NOT_SET:
//                    break;
                    case EXECUTIONSUSPENDED:
                        this.handleExecutionSuspended(e);
                        break;
                    case EXECUTIONRESUMED:
                        this.handleExecutionResumed(e);
                        break;
                    default:
                        throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase());
                }
            }
        }

Exactly which code runs here depends on the event that arrives.

Handling the EXECUTIONSTARTED event

This is the proto definition of ExecutionStartedEvent:

message ExecutionStartedEvent {
    string name = 1;
    google.protobuf.StringValue version = 2;
    google.protobuf.StringValue input = 3;
    OrchestrationInstance orchestrationInstance = 4;
    ParentInstanceInfo parentInstance = 5;
    google.protobuf.Timestamp scheduledStartTimestamp = 6;
    TraceContext parentTraceContext = 7;
    google.protobuf.StringValue orchestrationSpanID = 8;
}

Handling of the EXECUTIONSTARTED event:

case EXECUTIONSTARTED:
    ExecutionStartedEvent startedEvent = e.getExecutionStarted();
    String name = startedEvent.getName();
    this.setName(name);
    String instanceId = startedEvent.getOrchestrationInstance().getInstanceId();
    this.setInstanceId(instanceId);
    String input = startedEvent.getInput().getValue();
    this.setInput(input);
    TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
    if (factory == null) {
        // Try getting the default orchestrator
        factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
    }
    // TODO: Throw if the factory is null (orchestration by that name doesn't exist)
    TaskOrchestration orchestrator = factory.create();
    orchestrator.run(this);
    break;

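Basic information such as name / instanceId / input is set directly on ContextImplTask.

The factory is looked up in orchestrationFactories by name; if none is found, the default factory registered under "*" is used.

A TaskOrchestration is created from the factory, and then orchestrator.run() is called: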
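The lookup-with-fallback step can be sketched on its own. FactoryLookup and resolve are hypothetical names, and factories are modelled as java.util.function.Supplier rather than TaskOrchestrationFactory. The explicit exception for the "not found" case is an addition of this sketch: the original code only carries a TODO at that point and would hit a NullPointerException instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

final class FactoryLookup {
    // Looks up by exact name first, then falls back to the "*" wildcard entry.
    static <T> Supplier<T> resolve(Map<String, Supplier<T>> factories, String name) {
        Supplier<T> factory = factories.get(name);
        if (factory == null) {
            factory = factories.get("*");
        }
        if (factory == null) {
            // Assumption of this sketch: fail loudly instead of dereferencing null.
            throw new IllegalStateException("No orchestration registered with name '" + name + "'");
        }
        return factory;
    }

    public static void main(String[] args) {
        Map<String, Supplier<String>> factories = new HashMap<>();
        factories.put("OrderWorkflow", () -> "order workflow instance");
        factories.put("*", () -> "default workflow instance");
        System.out.println(resolve(factories, "OrderWorkflow").get());
        System.out.println(resolve(factories, "Unknown").get());
    }
}
```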

    TaskOrchestration orchestrator = factory.create();
    orchestrator.run(this);

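This brings us back to the implementation of TaskOrchestration.

OrchestratorWrapper

OrchestratorWrapper in the Dapr java sdk implements the TaskOrchestrationFactory interface, and its create() method returns the TaskOrchestration: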

class OrchestratorWrapper<T extends Workflow> implements TaskOrchestrationFactory {
  @Override
  public TaskOrchestration create() {
    return ctx -> {
      T workflow;
      try {
        workflow = this.workflowConstructor.newInstance();
      } ......
    };
  }
}
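The shape of this wrapper, a factory whose product is a lambda that reflectively builds a fresh workflow object each time it runs, can be shown with simplified types. SimpleWorkflow, SimpleWrapper and GreetingWorkflow are hypothetical names, the context object is reduced to a String input, and obtaining the constructor through getDeclaredConstructor() is an assumption of this sketch rather than something shown in the excerpt above.

```java
import java.lang.reflect.Constructor;
import java.util.function.Function;

// Hypothetical simplification of the Workflow type: one method, String in and out.
interface SimpleWorkflow {
    String run(String input);
}

final class SimpleWrapper<T extends SimpleWorkflow> {
    private final Constructor<T> workflowConstructor;

    SimpleWrapper(Class<T> clazz) {
        try {
            // Assumption: the workflow class has a no-argument constructor.
            this.workflowConstructor = clazz.getDeclaredConstructor();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No no-arg constructor on " + clazz.getName(), e);
        }
    }

    // Plays the role of TaskOrchestrationFactory.create(): returns the code to run later.
    Function<String, String> create() {
        return input -> {
            T workflow;
            try {
                // A new workflow object is built on every invocation.
                workflow = workflowConstructor.newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate workflow", e);
            }
            return workflow.run(input);
        };
    }
}

final class GreetingWorkflow implements SimpleWorkflow {
    @Override
    public String run(String input) {
        return "hello " + input;
    }
}

final class SimpleWrapperDemo {
    public static void main(String[] args) {
        SimpleWrapper<GreetingWorkflow> wrapper = new SimpleWrapper<>(GreetingWorkflow.class);
        System.out.println(wrapper.create().apply("dapr"));
    }
}
```

Building the workflow object inside the lambda rather than in the wrapper's constructor means every execution, including every replay, starts from a clean instance with no state left over from an earlier run.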