1 - workflow主流程

Dapr workflow 主流程的源码分析

1.1 - workflow app start 流程

workflow app start流程的源码分析

1.1.1 - 流程概述

workflow app start流程概述

流程整体

workflow app 启动时,典型代码如下:

    // Register the OrderProcessingWorkflow and its activities with the builder.
    WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class);
    builder.registerActivity(NotifyActivity.class);
    builder.registerActivity(ProcessPaymentActivity.class);
    builder.registerActivity(RequestApprovalActivity.class);
    builder.registerActivity(ReserveInventoryActivity.class);
    builder.registerActivity(UpdateInventoryActivity.class);

    // Build and then start the workflow runtime pulling and executing tasks
    try (WorkflowRuntime runtime = builder.build()) {
      System.out.println("Start workflow runtime");
      runtime.start(false);
    }

这个过程中,注册了 workflow 和 activity,然后 start workflow runtime。workflow runtime 会启动 worker,从 dapr sidecar 持续获取工作任务,包括 workflow task 和 activity task,然后执行这些任务并把任务结果返回给到 dapr sidecar。

@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Sidecar" as DaprSidecar

WorkflowApp -> WorkflowApp: registerWorkflow()

WorkflowApp -> WorkflowApp: registerActivity()

WorkflowApp -[#red]> WorkflowApp: WorkflowRuntime.start()


WorkflowApp -> DaprSidecar: WorkflowRuntime.getWorkItems()
DaprSidecar --> WorkflowApp: 

loop has next task

alt is orchestration task

WorkflowApp -> WorkflowApp: execute orchestration task
WorkflowApp -> DaprSidecar: completeOrchestratorTask()
DaprSidecar --> WorkflowApp: 

else is activity task

WorkflowApp -> WorkflowApp: execute activity task
WorkflowApp -> DaprSidecar: completeActivityTask()
DaprSidecar --> WorkflowApp: 

end

end

@enduml

详细流程

register workflow

@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Java SDK" as DaprJavaSDK
participant "DurableTask Java SDK" as DurableTaskJavaSDK

WorkflowApp -> DaprJavaSDK: registerWorkflow()
DaprJavaSDK -> DurableTaskJavaSDK: addOrchestration()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp: 

@enduml

register activity

@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Java SDK" as DaprJavaSDK
participant "DurableTask Java SDK" as DurableTaskJavaSDK

WorkflowApp -> DaprJavaSDK: registerActivity()
DaprJavaSDK -> DurableTaskJavaSDK: registerActivity()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp: 

@enduml

start workflow runtime

@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Java SDK" as DaprJavaSDK
participant "DurableTask Java SDK" as DurableTaskJavaSDK

WorkflowApp -> DaprJavaSDK: WorkflowRuntime.start()
DaprJavaSDK -> DurableTaskJavaSDK: worker.start()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp: 
@enduml

worker execute tasks

@startuml
participant "Workflow App" as WorkflowApp
participant "Dapr Java SDK" as DaprJavaSDK
participant "DurableTask Java SDK" as DurableTaskJavaSDK

WorkflowApp -> DaprJavaSDK: registerWorkflow()
DaprJavaSDK -> DurableTaskJavaSDK: addOrchestration()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp: 

WorkflowApp -> DaprJavaSDK: registerActivity()
DaprJavaSDK -> DurableTaskJavaSDK: registerActivity()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp: 

WorkflowApp -> DaprJavaSDK: WorkflowRuntime.start()
DaprJavaSDK -> DurableTaskJavaSDK: worker.start()
DurableTaskJavaSDK --> DaprJavaSDK
DaprJavaSDK --> WorkflowApp: 
@enduml

1.1.2 - 构建workflowruntime的源码

workflow app start流程中构建workflow runtime的源码

调用代码

workflow app 中构建 WorkflowRuntime 的典型使用代码如下:

    // Register the OrderProcessingWorkflow and its activities with the builder.
    WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder().registerWorkflow(OrderProcessingWorkflow.class);
    builder.registerActivity(NotifyActivity.class);
    builder.registerActivity(ProcessPaymentActivity.class);
    builder.registerActivity(RequestApprovalActivity.class);
    builder.registerActivity(ReserveInventoryActivity.class);
    builder.registerActivity(UpdateInventoryActivity.class);

    // Build and then start the workflow runtime pulling and executing tasks
    try (WorkflowRuntime runtime = builder.build()) {
      System.out.println("Start workflow runtime");
      runtime.start(false);
    }

代码实现

WorkflowRuntimeBuilder

这个类在 dapr java sdk。

WorkflowRuntimeBuilder 的实现中,自己会保存 workflows 和 activities 信息,也会构建一个来自 DurableTask java sdk 的 DurableTaskGrpcWorkerBuilder 的实例。

import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;

public class WorkflowRuntimeBuilder {
  private static volatile WorkflowRuntime instance;
  private DurableTaskGrpcWorkerBuilder builder;
  private Logger logger;
  private Set<String> workflows = new HashSet<String>();
  private Set<String> activities = new HashSet<String>();

  /**
   * Constructs the WorkflowRuntimeBuilder.
   */
  public WorkflowRuntimeBuilder() {
    this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(
                          NetworkUtils.buildGrpcManagedChannel(WORKFLOW_INTERCEPTOR));
    this.logger = Logger.getLogger(WorkflowRuntimeBuilder.class.getName());
  }

registerWorkflow() 方法的实现,除了将请求代理给 DurableTaskGrpcWorkerBuilder 之外,还自己保存到 workflows 集合中:

  public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
    this.builder = this.builder.addOrchestration(
        new OrchestratorWrapper<>(clazz)
    );
    this.logger.log(Level.INFO, "Registered Workflow: " +  clazz.getSimpleName());
    this.workflows.add(clazz.getSimpleName());
    return this;
  }

registerActivity() 方法的实现类似,除了将请求代理给 DurableTaskGrpcWorkerBuilder 之外,还自己保存到 activities 集合中:

  public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
    this.builder = this.builder.addActivity(
        new ActivityWrapper<>(clazz)
    );
    this.logger.log(Level.INFO, "Registered Activity: " +  clazz.getSimpleName());
    this.activities.add(clazz.getSimpleName());
  }

OrchestratorWrapper 和 ActivityWrapper 负责将 class 包装为 TaskOrchestrationFactory 和 TaskActivityFactory。

build() 方法调用 DurableTaskGrpcWorkerBuilder 的 build() 方法构建出一个 DurableTaskGrpcWorker ,然后传递给 WorkflowRuntime 的新实例。

  public WorkflowRuntime build() {
    if (instance == null) {
      synchronized (WorkflowRuntime.class) {
        if (instance == null) {
          instance = new WorkflowRuntime(this.builder.build());
        }
      }
    }
    this.logger.log(Level.INFO, "Successfully built dapr workflow runtime");
    return instance;
  }

DurableTaskGrpcWorkerBuilder

这个类在durabletask java sdk中。

DurableTaskGrpcWorkerBuilder 保存 orchestrationFactories 和 activityFactories,还有和 sidecar 连接的一些信息如端口,grpc channel:

public final class DurableTaskGrpcWorkerBuilder {
    final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
    final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();
    int port;
    Channel channel;
    DataConverter dataConverter;
    Duration maximumTimerInterval;
......
}

addOrchestration() 将 TaskOrchestrationFactory 保存到 orchestrationFactories 中,key为 name:

    public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) {
        String key = factory.getName();
        ......
        this.orchestrationFactories.put(key, factory);
        return this;
    }

类似的, addActivity() 将 TaskActivityFactory 保存到 activityFactories 中,key为 name:

    public DurableTaskGrpcWorkerBuilder addActivity(TaskActivityFactory factory) {
        String key = factory.getName();
        ......
        this.activityFactories.put(key, factory);
        return this;
    }

build() 方法构建出 DurableTaskGrpcWorker() 对象:

    public DurableTaskGrpcWorker build() {
        return new DurableTaskGrpcWorker(this);
    }

DurableTaskGrpcWorker 的构造函数中会保存注册好的 orchestrationFactories 和 activityFactories,然后构建 TaskHubSidecarServiceGrpc 对象作为 sidecarClient,用于后续和 dapr sidecar 交互:

public final class DurableTaskGrpcWorker implements AutoCloseable {
    private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
    private final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();

    private final TaskHubSidecarServiceBlockingStub sidecarClient;

    DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
        this.orchestrationFactories.putAll(builder.orchestrationFactories);
        this.activityFactories.putAll(builder.activityFactories);

        Channel sidecarGrpcChannel;
        if (builder.channel != null) {
            // The caller is responsible for managing the channel lifetime
            this.managedSidecarChannel = null;
            sidecarGrpcChannel = builder.channel;
        } else {
            // Construct our own channel using localhost + a port number
            int port = DEFAULT_PORT;
            if (builder.port > 0) {
                port = builder.port;
            }

            // Need to keep track of this channel so we can dispose it on close()
            this.managedSidecarChannel = ManagedChannelBuilder
                    .forAddress("localhost", port)
                    .usePlaintext()
                    .build();
            sidecarGrpcChannel = this.managedSidecarChannel;
        }

        this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
        this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
        this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
    }

结论

dapr java sdk 中的 WorkflowRuntimeBuilder 和 durabletask java sdk 中的 DurableTaskGrpcWorkerBuilder,都是用来保住构建最终要使用的 WorkflowRuntime 和 DurableTaskGrpcWorker。

1.1.3 - 启动workflow runtime的源码

workflow app start流程中启动workflow runtime的源码

调用代码

workflow app 中启动 WorkflowRuntime 的典型使用代码如下:

    // Build and then start the workflow runtime pulling and executing tasks
    try (WorkflowRuntime runtime = builder.build()) {
      System.out.println("Start workflow runtime");
      //这里写死了 block=false,不会 block
      runtime.start(false);
    }

代码实现

WorkflowRuntime

这个类在 dapr java sdk。

WorkflowRuntime 只是对 DurableTaskGrpcWorker 的一个简单包装:

public class WorkflowRuntime implements AutoCloseable {

  private DurableTaskGrpcWorker worker;

  public WorkflowRuntime(DurableTaskGrpcWorker worker) {
    this.worker = worker;
  }
  ......

  public void start(boolean block) {
    if (block) {
      this.worker.startAndBlock();
    } else {
      this.worker.start();
    }
  }
}

DurableTaskGrpcWorker

这个类在durabletask java sdk中。

真实的实现代码在 DurableTaskGrpcWorker 中。


  public void start(boolean block) {
    if (block) {
      this.worker.startAndBlock();
    } else {
      // 1. block写死false了,所以只会进入到这里
      this.worker.start();
    }
  }

  public void start() {
    // 2. 启动线程来执行 startAndBlock,所以是不阻塞的
    new Thread(this::startAndBlock).start();
  }

startAndBlock()方法

这是最关键的代码。

这里不展开,看下一章 workflow runtime 的运行。

1.2 - workflow app run 流程

workflow app run流程的源码分析

1.2.1 - workflow app run流程概述

workflow app中workflow runtime运行的源码概述

上一章看到 workflow runtime start 之后,就会启动任务处理的流程。

代码实现在 durabletask java sdk 中的 DurableTaskGrpcWorker 类的 startAndBlock()方法中。

这是最关键的代码。

先构建两个 executor,负责执行 Orchestration task 和 activity task:

        TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
                this.orchestrationFactories,
                this.dataConverter,
                this.maximumTimerInterval,
                logger);
        TaskActivityExecutor taskActivityExecutor = new TaskActivityExecutor(
                this.activityFactories,
                this.dataConverter,
                logger);

传入的参数有 orchestrationFactories 和 taskActivityExecutor,之前构建时注册的信息都保存在这里面。

获取工作任务

然后就是一个无限循环,在循环中调用 sidecarClient.getWorkItems(), 针对返回的 workitem stream,还有一个无限循环。而且如果遇到 StatusRuntimeException ,还会sleep之后继续。

while (true) {
  try {
      GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
      Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
      while (workItemStream.hasNext()) {
        ......
      }
  } catch(StatusRuntimeException e){
    ......
    // Retry after 5 seconds
    try {
        Thread.sleep(5000);
    } catch (InterruptedException ex) {
        break;
    }
    }
}

work items 的类型只有两种 orchestrator 和 activity:

while (workItemStream.hasNext()) {
    WorkItem workItem = workItemStream.next();
    RequestCase requestType = workItem.getRequestCase();
    if (requestType == RequestCase.ORCHESTRATORREQUEST) {
        ......
    } else if (requestType == RequestCase.ACTIVITYREQUEST) {
        ......
    } else {
        logger.log(Level.WARNING, "Received and dropped an unknown '{0}' work-item from the sidecar.", requestType);
    }
}

执行 orchestrator task

通过 taskOrchestrationExecutor 执行 orchestrator task,然后将结果返回给到 dapr sidecar。

OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();

TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
        orchestratorRequest.getPastEventsList(),
        orchestratorRequest.getNewEventsList());

OrchestratorResponse response = OrchestratorResponse.newBuilder()
        .setInstanceId(orchestratorRequest.getInstanceId())
        .addAllActions(taskOrchestratorResult.getActions())
        .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
        .build();

this.sidecarClient.completeOrchestratorTask(response);

备注:比较奇怪的是这里为什么不用 grpc 双向 stream 来获取任务和返回任务执行结果,而是通过另外一个 completeOrchestratorTask() 方法来发起请求。

执行 avtivity task

类似的,通过 taskActivityExecutor 执行 avtivity task,然后将结果返回给到 dapr sidecar。

ActivityRequest activityRequest = workItem.getActivityRequest();

String output = null;
TaskFailureDetails failureDetails = null;
try {
    output = taskActivityExecutor.execute(
        activityRequest.getName(),
        activityRequest.getInput().getValue(),
        activityRequest.getTaskId());
} catch (Throwable e) {
    failureDetails = TaskFailureDetails.newBuilder()
        .setErrorType(e.getClass().getName())
        .setErrorMessage(e.getMessage())
        .setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
        .build();
}

ActivityResponse.Builder responseBuilder = ActivityResponse.newBuilder()
        .setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
        .setTaskId(activityRequest.getTaskId());

if (output != null) {
    responseBuilder.setResult(StringValue.of(output));
}

if (failureDetails != null) {
    responseBuilder.setFailureDetails(failureDetails);
}

this.sidecarClient.completeActivityTask(responseBuilder.build());

1.2.2 - 获取工作任务

workflow runtime运行时获取工作任务的源码概述

获取工作任务的调用代码

DurableTaskGrpcWorker 会调用 sidecarClient.getWorkItems() 来获取工作任务。

private final TaskHubSidecarServiceBlockingStub sidecarClient;

while (true) {
  try {
      GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build();
      Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
      while (workItemStream.hasNext()) {
        ......
      }
  } catch{}
}

代码实现

proto 定义

TaskHubSidecarServiceBlockingStub 是根据 protobuf 文件生成的 grpc 代码,其 protobuf 定义在submodules/durabletask-protobuf/protos/orchestrator_service.proto 文件中。

service TaskHubSidecarService {
    ......
    rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem);
    ......
}

GetWorkItemsRequest 和 WorkItem 的消息定义为:

message GetWorkItemsRequest {
    // No parameters currently
}

message WorkItem {
    oneof request {
        OrchestratorRequest orchestratorRequest = 1;
        ActivityRequest activityRequest = 2;
    }
}

WorkItem 可能是 OrchestratorRequest 或者 ActivityRequest 。

OrchestratorRequest

message OrchestratorRequest {
    string instanceId = 1;
    google.protobuf.StringValue executionId = 2;
    repeated HistoryEvent pastEvents = 3;
    repeated HistoryEvent newEvents = 4;
}

ActivityRequest

message ActivityRequest {
    string name = 1;
    google.protobuf.StringValue version = 2;
    google.protobuf.StringValue input = 3;
    OrchestrationInstance orchestrationInstance = 4;
    int32 taskId = 5;
}

HistoryEvent

message HistoryEvent {
    int32 eventId = 1;
    google.protobuf.Timestamp timestamp = 2;
    oneof eventType {
        ExecutionStartedEvent executionStarted = 3;
        ExecutionCompletedEvent executionCompleted = 4;
        ExecutionTerminatedEvent executionTerminated = 5;
        TaskScheduledEvent taskScheduled = 6;
        TaskCompletedEvent taskCompleted = 7;
        TaskFailedEvent taskFailed = 8;
        SubOrchestrationInstanceCreatedEvent subOrchestrationInstanceCreated = 9;
        SubOrchestrationInstanceCompletedEvent subOrchestrationInstanceCompleted = 10;
        SubOrchestrationInstanceFailedEvent subOrchestrationInstanceFailed = 11;
        TimerCreatedEvent timerCreated = 12;
        TimerFiredEvent timerFired = 13;
        OrchestratorStartedEvent orchestratorStarted = 14;
        OrchestratorCompletedEvent orchestratorCompleted = 15;
        EventSentEvent eventSent = 16;
        EventRaisedEvent eventRaised = 17;
        GenericEvent genericEvent = 18;
        HistoryStateEvent historyState = 19;
        ContinueAsNewEvent continueAsNew = 20;
        ExecutionSuspendedEvent executionSuspended = 21;
        ExecutionResumedEvent executionResumed = 22;
    }
}

worker 调用

workflow app 中通过调用 sidecarClient.getWorkItems() 方法来获取 work items。

Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);

这里面就是 grpc stub 的生成代码,不细看

TaskHubSidecarService 服务器实现

TaskHubSidecarService 这个 protobuf 定义的 grpc service 的服务器端,代码实现在 durabletask-go 仓库中。

protobuf 生成的 grpc stub 的类在这里:

  • internal/protos/orchestrator_service_grpc.pb.go
  • internal/protos/orchestrator_service.pb.go

服务器端代码实现在 backend/executor.go 中:

// GetWorkItems implements protos.TaskHubSidecarServiceServer
func (g *grpcExecutor) GetWorkItems(req *protos.GetWorkItemsRequest, stream protos.TaskHubSidecarService_GetWorkItemsServer) error {
    ......

	// The worker client invokes this method, which streams back work-items as they arrive.
	for {
		select {
		case <-stream.Context().Done():
			g.logger.Infof("work item stream closed")
			return nil
		case wi := <-g.workItemQueue:
			if err := stream.Send(wi); err != nil {
				return err
			}
		case <-g.streamShutdownChan:
			return errShuttingDown
		}
	}
}

所以返回给客户端调用的 work item stream 的数据来自 g.workItemQueue

type grpcExecutor struct {
    ......
	workItemQueue        chan *protos.WorkItem
}

workItemQueue 的实现逻辑

workItemQueue 在 grpcExecutor 中定义:

type grpcExecutor struct {
	workItemQueue        chan *protos.WorkItem
    ......
}

grpcExecutor 在 NewGrpcExecutor() 方法中构建:

// NewGrpcExecutor returns the Executor object and a method to invoke to register the gRPC server in the executor.
func NewGrpcExecutor(be Backend, logger Logger, opts ...grpcExecutorOptions) (executor Executor, registerServerFn func(grpcServer grpc.ServiceRegistrar)) {
	grpcExecutor := &grpcExecutor{
		workItemQueue:        make(chan *protos.WorkItem),
		backend:              be,
		logger:               logger,
		pendingOrchestrators: &sync.Map{},
		pendingActivities:    &sync.Map{},
	}

    ......
}

将数据写入 workItemQueue 的地方有两个:

  1. ExecuteOrchestrator()

    func (executor *grpcExecutor) ExecuteOrchestrator(......) {
        ......
            workItem := &protos.WorkItem{
            Request: &protos.WorkItem_OrchestratorRequest{
                OrchestratorRequest: &protos.OrchestratorRequest{
                    InstanceId:  string(iid),
                    ExecutionId: nil,
                    PastEvents:  oldEvents,
                    NewEvents:   newEvents,
                },
            },
        }
    
        executor.workItemQueue <- workItem:
    }
    
  2. ExecuteActivity()

    func (executor *grpcExecutor) ExecuteActivity(......) {
        workItem := &protos.WorkItem{
    	Request: &protos.WorkItem_ActivityRequest{
    		ActivityRequest: &protos.ActivityRequest{
    			Name:                  task.Name,
    			Version:               task.Version,
    			Input:                 task.Input,
    			OrchestrationInstance: &protos.OrchestrationInstance{InstanceId: string(iid)},
    			TaskId:                e.EventId,
    		},
    	},
    
        executor.workItemQueue <- workItem:
    }
    

继续跟踪看 ExecuteOrchestrator() 和 ExecuteActivity() 方法是被谁调用的,这个细节在下一节中。

小结

获取工作任务的任务源头在 dapr sidecar,代码实现在 durabletask-go 项目的 backend/executor.go 中。

1.2.3 - 执行orchestrator task

workflow runtime运行时执行orchestrator task的源码

回顾

前面看到执行orchestrator task的代码实现在 durabletask-go 仓库的 client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java 中。

TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(
        this.orchestrationFactories,
        this.dataConverter,
        this.maximumTimerInterval,
        logger);
......
Iterator<WorkItem> workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest);
while (workItemStream.hasNext()) {
    WorkItem workItem = workItemStream.next();
    RequestCase requestType = workItem.getRequestCase();
    if (requestType == RequestCase.ORCHESTRATORREQUEST) {
        OrchestratorRequest orchestratorRequest = workItem.getOrchestratorRequest();

        TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute(
                orchestratorRequest.getPastEventsList(),
                orchestratorRequest.getNewEventsList());

        OrchestratorResponse response = OrchestratorResponse.newBuilder()
                .setInstanceId(orchestratorRequest.getInstanceId())
                .addAllActions(taskOrchestratorResult.getActions())
                .setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
                .build();

        this.sidecarClient.completeOrchestratorTask(response);
    }
    ......

实现细节

TaskOrchestrationExecutor

TaskOrchestrationExecutor 类的定义和构造函数:

 final class TaskOrchestrationExecutor {

    private static final String EMPTY_STRING = "";
    private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories;
    private final DataConverter dataConverter;
    private final Logger logger;
    private final Duration maximumTimerInterval;

    public TaskOrchestrationExecutor(
            HashMap<String, TaskOrchestrationFactory> orchestrationFactories,
            DataConverter dataConverter,
            Duration maximumTimerInterval,
            Logger logger) {
        this.orchestrationFactories = orchestrationFactories;
        this.dataConverter = dataConverter;
        this.maximumTimerInterval = maximumTimerInterval;
        this.logger = logger;
    }

其中 orchestrationFactories 是从前面 registerWorkflow()时保存的已经注册的工作流信息。

execute() 方法:

public TaskOrchestratorResult execute(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
    ContextImplTask context = new ContextImplTask(pastEvents, newEvents);

    boolean completed = false;
    try {
        // Play through the history events until either we've played through everything
        // or we receive a yield signal
        while (context.processNextEvent()) { /* no method body */ }
        completed = true;
    } catch (OrchestratorBlockedException orchestratorBlockedException) {
        logger.fine("The orchestrator has yielded and will await for new events.");
    } catch (ContinueAsNewInterruption continueAsNewInterruption) {
        logger.fine("The orchestrator has continued as new.");
        context.complete(null);
    } catch (Exception e) {
        // The orchestrator threw an unhandled exception - fail it
        // TODO: What's the right way to log this?
        logger.warning("The orchestrator failed with an unhandled exception: " + e.toString());
        context.fail(new FailureDetails(e));
    }

    if ((context.continuedAsNew && !context.isComplete) || (completed && context.pendingActions.isEmpty() && !context.waitingForEvents())) {
        // There are no further actions for the orchestrator to take so auto-complete the orchestration.
        context.complete(null);
    }

    return new TaskOrchestratorResult(context.pendingActions.values(), context.getCustomStatus());
}

这里只是主要流程,细节实现在内部私有类 ContextImplTask 中。

ContextImplTask

ContextImplTask 的定义和构造函数,使用到 OrchestrationHistoryIterator。

private class ContextImplTask implements TaskOrchestrationContext {
    private final OrchestrationHistoryIterator historyEventPlayer;
    ......

    public ContextImplTask(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
        this.historyEventPlayer = new OrchestrationHistoryIterator(pastEvents, newEvents);
    }
    ......

    private boolean processNextEvent() {
        return this.historyEventPlayer.moveNext();
    }
}

OrchestrationHistoryIterator

OrchestrationHistoryIterator 的类定义和构造函数,其中 pastEvents 和 newEvents 是 daprd sidecar 那边在 getWorkItem() 返回的 orchestratorRequest 中携带的数据。

private class OrchestrationHistoryIterator {
    private final List<HistoryEvent> pastEvents;
    private final List<HistoryEvent> newEvents;

    private List<HistoryEvent> currentHistoryList;
    private int currentHistoryIndex;

    public OrchestrationHistoryIterator(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
        this.pastEvents = pastEvents;
        this.newEvents = newEvents;
        this.currentHistoryList = pastEvents;
    }

currentHistoryList 初始化指向 pastEvents,currentHistoryIndex 为0。

然后继续看 moveNext() 方法:

    public boolean moveNext() {
        if (this.currentHistoryList == pastEvents && this.currentHistoryIndex >= pastEvents.size()) {
            // 如果当前 currentHistoryList 指向的是 pastEvents,并且已经指到最后一个元素了。
            // 那么 moveNext 就应该指向 this.newEvents,然后将 currentHistoryIndex 设置为0 (即指向第一个元素)
            // Move forward to the next list
            this.currentHistoryList = this.newEvents;
            this.currentHistoryIndex = 0;

            // 这意味着 pastEvents 的游历接触,即 replay 完成。
            ContextImplTask.this.setDoneReplaying();
        }

        if (this.currentHistoryList == this.newEvents && this.currentHistoryIndex >= this.newEvents.size()) {
            // 如果当前 currentHistoryList 指向的是 newEvents,并且已经指到最后一个元素了。
            // 此时已经完成游历,没有更多元素,返回 false 表示可以结束了。
            // We're done enumerating the history
            return false;
        }

        // Process the next event in the history
        // 获取当前元素,然后 currentHistoryIndex +1 指向下一个元素
        HistoryEvent next = this.currentHistoryList.get(this.currentHistoryIndex++);
        // 处理事件
        ContextImplTask.this.processEvent(next);
        return true;
    }

处理事件的代码实现在 ContextImplTask 的 processEvent() 方法中:

        private void processEvent(HistoryEvent e) {
            boolean overrideSuspension = e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONRESUMED || e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONTERMINATED;
            if (this.isSuspended && !overrideSuspension) {
                this.handleEventWhileSuspended(e);
            } else {
                switch (e.getEventTypeCase()) {
                    case ORCHESTRATORSTARTED:
                        Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
                        this.setCurrentInstant(instant);
                        break;
                    case ORCHESTRATORCOMPLETED:
                        // No action
                        break;
                    case EXECUTIONSTARTED:
                        ExecutionStartedEvent startedEvent = e.getExecutionStarted();
                        String name = startedEvent.getName();
                        this.setName(name);
                        String instanceId = startedEvent.getOrchestrationInstance().getInstanceId();
                        this.setInstanceId(instanceId);
                        String input = startedEvent.getInput().getValue();
                        this.setInput(input);
                        TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
                        if (factory == null) {
                            // Try getting the default orchestrator
                            factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
                        }
                        // TODO: Throw if the factory is null (orchestration by that name doesn't exist)
                        TaskOrchestration orchestrator = factory.create();
                        orchestrator.run(this);
                        break;
//                case EXECUTIONCOMPLETED:
//                    break;
//                case EXECUTIONFAILED:
//                    break;
                    case EXECUTIONTERMINATED:
                        this.handleExecutionTerminated(e);
                        break;
                    case TASKSCHEDULED:
                        this.handleTaskScheduled(e);
                        break;
                    case TASKCOMPLETED:
                        this.handleTaskCompleted(e);
                        break;
                    case TASKFAILED:
                        this.handleTaskFailed(e);
                        break;
                    case TIMERCREATED:
                        this.handleTimerCreated(e);
                        break;
                    case TIMERFIRED:
                        this.handleTimerFired(e);
                        break;
                    case SUBORCHESTRATIONINSTANCECREATED:
                        this.handleSubOrchestrationCreated(e);
                        break;
                    case SUBORCHESTRATIONINSTANCECOMPLETED:
                        this.handleSubOrchestrationCompleted(e);
                        break;
                    case SUBORCHESTRATIONINSTANCEFAILED:
                        this.handleSubOrchestrationFailed(e);
                        break;
//                case EVENTSENT:
//                    break;
                    case EVENTRAISED:
                        this.handleEventRaised(e);
                        break;
//                case GENERICEVENT:
//                    break;
//                case HISTORYSTATE:
//                    break;
//                case EVENTTYPE_NOT_SET:
//                    break;
                    case EXECUTIONSUSPENDED:
                        this.handleExecutionSuspended(e);
                        break;
                    case EXECUTIONRESUMED:
                        this.handleExecutionResumed(e);
                        break;
                    default:
                        throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase());
                }
            }
        }

这里具体会执行什么代码,就要看给过来的 event 是什么了。

EXECUTIONSTARTED 事件的执行

这是 ExecutionStartedEvent 的 proto 定义:

message ExecutionStartedEvent {
    string name = 1;
    google.protobuf.StringValue version = 2;
    google.protobuf.StringValue input = 3;
    OrchestrationInstance orchestrationInstance = 4;
    ParentInstanceInfo parentInstance = 5;
    google.protobuf.Timestamp scheduledStartTimestamp = 6;
    TraceContext parentTraceContext = 7;
    google.protobuf.StringValue orchestrationSpanID = 8;
}

EXECUTIONSTARTED 事件的处理:

case EXECUTIONSTARTED:
    ExecutionStartedEvent startedEvent = e.getExecutionStarted();
    String name = startedEvent.getName();
    this.setName(name);
    String instanceId = startedEvent.getOrchestrationInstance().getInstanceId();
    this.setInstanceId(instanceId);
    String input = startedEvent.getInput().getValue();
    this.setInput(input);
    TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
    if (factory == null) {
        // Try getting the default orchestrator
        factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
    }
    // TODO: Throw if the factory is null (orchestration by that name doesn't exist)
    TaskOrchestration orchestrator = factory.create();
    orchestrator.run(this);
    break;

name / instanceId / input 等基本信息直接设置在 ContextImplTask 上。

factory 要从 orchestrationFactories 里面根据 name 查找,如果没有找到,则获取默认。

从 factory 创建 TaskOrchestration,再运行 orchestrator.run():

    TaskOrchestration orchestrator = factory.create();
    orchestrator.run(this);

这就回到 TaskOrchestration 的实现了。

OrchestratorWrapper

Dapr java sdk 中的 OrchestratorWrapper 实现了 TaskOrchestration 接口

class OrchestratorWrapper<T extends Workflow> implements TaskOrchestrationFactory {
  @Override
  public TaskOrchestration create() {
    return ctx -> {
      T workflow;
      try {
        workflow = this.workflowConstructor.newInstance();
      } ......
    };
  }
}

1.3 - client app start 流程

client app start流程的源码分析

1.3.1 - 流程概述

client app start流程概述

流程整体

client app 启动时,典型代码如下(忽略细节和异常处理):

DaprWorkflowClient workflowClient = new DaprWorkflowClient();
String instanceId = workflowClient.scheduleNewWorkflow(OrderProcessingWorkflow.class, order);
workflowClient.waitForInstanceStart(instanceId, Duration.ofSeconds(10), false);
WorkflowInstanceStatus workflowStatus = workflowClient.waitForInstanceCompletion(instanceId,
          Duration.ofSeconds(30),

这个过程中,初始化 workflowClient,然后通过 workflowClient 调度执行了一个 workflow 实例:包括等待实例启动,等待实例完成。

@startuml
participant "Client App" as ClientApp
participant "Dapr Sidecar" as DaprSidecar

ClientApp -> ClientApp: create workflow client

ClientApp -[#red]> DaprSidecar: scheduleNewWorkflow()
DaprSidecar --> ClientApp: instanceId

ClientApp -> DaprSidecar: waitForInstanceStart(instanceId)
DaprSidecar --> ClientApp: 

ClientApp -> DaprSidecar: waitForInstanceCompletion(instanceId)
DaprSidecar --> ClientApp: 

@enduml

1.3.2 - client app start流程

client app start流程源码分析

DaprWorkflowClient

Dapr java SDK 中的 DaprWorkflowClient,包裹了 durabletask java sdk 的 DurableTaskClient:

public class DaprWorkflowClient implements AutoCloseable {

  private DurableTaskClient innerClient;
  private ManagedChannel grpcChannel;

  private DaprWorkflowClient(ManagedChannel grpcChannel) {
    this(createDurableTaskClient(grpcChannel), grpcChannel);
  }

  private DaprWorkflowClient(DurableTaskClient innerClient, ManagedChannel grpcChannel) {
    this.innerClient = innerClient;
    this.grpcChannel = grpcChannel;
  }

  private static DurableTaskClient createDurableTaskClient(ManagedChannel grpcChannel) {
    return new DurableTaskGrpcClientBuilder()
        .grpcChannel(grpcChannel)
        .build();
  }
  ......
}

scheduleNewWorkflow()方法代理给了 DurableTaskClient 的 scheduleNewOrchestrationInstance() 方法:

  public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, Object input, String instanceId) {
    return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input, instanceId);
  }

DurableTaskClient 和 DurableTaskGrpcClient

这两个类在 durabletask java sdk 中。

DurableTaskGrpcClient 的 scheduleNewOrchestrationInstance() 方法的实现:

@Override
public String scheduleNewOrchestrationInstance(
        String orchestratorName,
        NewOrchestrationInstanceOptions options) {
    if (orchestratorName == null || orchestratorName.length() == 0) {
        throw new IllegalArgumentException("A non-empty orchestrator name must be specified.");
    }

    Helpers.throwIfArgumentNull(options, "options");

    CreateInstanceRequest.Builder builder = CreateInstanceRequest.newBuilder();
    builder.setName(orchestratorName);

    String instanceId = options.getInstanceId();
    if (instanceId == null) {
        instanceId = UUID.randomUUID().toString();
    }
    builder.setInstanceId(instanceId);

    String version = options.getVersion();
    if (version != null) {
        builder.setVersion(StringValue.of(version));
    }

    Object input = options.getInput();
    if (input != null) {
        String serializedInput = this.dataConverter.serialize(input);
        builder.setInput(StringValue.of(serializedInput));
    }

    Instant startTime = options.getStartTime();
    if (startTime != null) {
        Timestamp ts = DataConverter.getTimestampFromInstant(startTime);
        builder.setScheduledStartTimestamp(ts);
    }

    CreateInstanceRequest request = builder.build();
    CreateInstanceResponse response = this.sidecarClient.startInstance(request);
    return response.getInstanceId();
}

前面一大段都是为了构建 CreateInstanceRequest,然后最后调用 sidecarClient.startInstance() 方法去访问 sidecar 。

proto 定义

TaskHubSidecarServiceBlockingStub 是根据 protobuf 文件生成的 grpc 代码,其 protobuf 定义在submodules/durabletask-protobuf/protos/orchestrator_service.proto 文件中。

service TaskHubSidecarService {
    ......
    // Starts a new orchestration instance.
    rpc StartInstance(CreateInstanceRequest) returns (CreateInstanceResponse);
    ......
}

CreateInstanceRequest 消息的定义为:

message CreateInstanceRequest {
    string instanceId = 1;
    string name = 2;
    google.protobuf.StringValue version = 3;
    google.protobuf.StringValue input = 4;
    google.protobuf.Timestamp scheduledStartTimestamp = 5;
    OrchestrationIdReusePolicy orchestrationIdReusePolicy = 6;
}

备注:这个version字段不知道是做什么的?后面注意看看细节。

CreateInstanceResponse 信息的定义,很简单,只有一个 instanceId 字段。

message CreateInstanceResponse {
    string instanceId = 1;
}

代码实现

StartInstance 的代码实现在 backend/executor.go 中:

func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInstanceRequest) (*protos.CreateInstanceResponse, error) {
	instanceID := req.InstanceId
	ctx, span := helpers.StartNewCreateOrchestrationSpan(ctx, req.Name, req.Version.GetValue(), instanceID)
	defer span.End()

	e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span))
	if err := g.backend.CreateOrchestrationInstance(ctx, e, WithOrchestrationIdReusePolicy(req.OrchestrationIdReusePolicy)); err != nil {
		return nil, err
	}

	return &protos.CreateInstanceResponse{InstanceId: instanceID}, nil
}

StartNewCreateOrchestrationSpan() 方法

helpers.StartNewCreateOrchestrationSpan() 方法的实现:

func StartNewCreateOrchestrationSpan(
	ctx context.Context, name string, version string, instanceID string,
) (context.Context, trace.Span) {
	attributes := []attribute.KeyValue{
		{Key: "durabletask.type", Value: attribute.StringValue("orchestration")},
		{Key: "durabletask.task.name", Value: attribute.StringValue(name)},
		{Key: "durabletask.task.instance_id", Value: attribute.StringValue(instanceID)},
	}
	return startNewSpan(ctx, "create_orchestration", name, version, attributes, trace.SpanKindClient, time.Now().UTC())
}

startNewSpan()的实现:

func startNewSpan(
	ctx context.Context,
	taskType string,
	taskName string,
	taskVersion string,
	attributes []attribute.KeyValue,
	kind trace.SpanKind,
	timestamp time.Time,
) (context.Context, trace.Span) {
	var spanName string
	if taskVersion != "" {
		spanName = taskType + "||" + taskName + "||" + taskVersion
		attributes = append(attributes, attribute.KeyValue{
			Key:   "durabletask.task.version",
			Value: attribute.StringValue(taskVersion),
		})
	} else if taskName != "" {
		spanName = taskType + "||" + taskName
	} else {
		spanName = taskType
	}

	var span trace.Span
	ctx, span = tracer.Start(
		ctx,
		spanName,
		trace.WithSpanKind(kind),
		trace.WithTimestamp(timestamp),
		trace.WithAttributes(attributes...),
	)
	return ctx, span
}

构建 spanName 的逻辑比较复杂,因为 taskVersion 和 taskName 可能为空(按说 taskName 不能为空)

  • spanName = taskType + "||" + taskName + "||" + taskVersion
  • spanName = taskType + "||" + taskName
  • spanName = taskType

NewExecutionStartedEvent() 方法

这行代码的作用是构建一个 ExecutionStartedEvent 事件:

e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span))

具体实现为:

func NewExecutionStartedEvent(
	name string,
	instanceId string,
	input *wrapperspb.StringValue,
	parent *protos.ParentInstanceInfo,
	parentTraceContext *protos.TraceContext,
) *protos.HistoryEvent {
	return &protos.HistoryEvent{
		EventId:   -1,
		Timestamp: timestamppb.New(time.Now()),
		EventType: &protos.HistoryEvent_ExecutionStarted{
			ExecutionStarted: &protos.ExecutionStartedEvent{
				Name:           name,
				ParentInstance: parent,
				Input:          input,
				OrchestrationInstance: &protos.OrchestrationInstance{
					InstanceId:  instanceId,
					ExecutionId: wrapperspb.String(uuid.New().String()),
				},
				ParentTraceContext: parentTraceContext,
			},
		},
	}
}

备注:这里没有用到 version 字段

CreateOrchestrationInstance() 方法

最关键的代码:

if err := g.backend.CreateOrchestrationInstance(ctx, e, WithOrchestrationIdReusePolicy(req.OrchestrationIdReusePolicy)); err != nil {
  return nil, err
}

Backend 是一个 interface,CreateOrchestrationInstance() 方法定义如下:

type Backend interface {
  // CreateOrchestrationInstance creates a new orchestration instance with a history event that
	// wraps a ExecutionStarted event.
	CreateOrchestrationInstance(context.Context, *HistoryEvent, ...OrchestrationIdReusePolicyOptions) error
  ......
}

daprd 的实现

在 daprd sidecar 的代码实现中,这个 backend 是这样构建的,代码在 dapr/dapr 仓库的 pkg/runtime/wfengine/wfengine.go :

func (wfe *WorkflowEngine) ConfigureGrpcExecutor() {
	// Enable lazy auto-starting the worker only when a workflow app connects to fetch work items.
	autoStartCallback := backend.WithOnGetWorkItemsConnectionCallback(func(ctx context.Context) error {
		// NOTE: We don't propagate the context here because that would cause the engine to shut
		//       down when the client disconnects and cancels the passed-in context. Once it starts
		//       up, we want to keep the engine running until the runtime shuts down.
		if err := wfe.Start(context.Background()); err != nil {
			// This can happen if the workflow app connects before the sidecar has finished initializing.
			// The client app is expected to continuously retry until successful.
			return fmt.Errorf("failed to auto-start the workflow engine: %w", err)
		}
		return nil
	})

	// Create a channel that can be used to disconnect the remote client during shutdown.
	wfe.disconnectChan = make(chan any, 1)
	disconnectHelper := backend.WithStreamShutdownChannel(wfe.disconnectChan)

	wfe.executor, wfe.registerGrpcServerFn = backend.NewGrpcExecutor(wfe.Backend, wfLogger, autoStartCallback, disconnectHelper)
}

WorkflowEngine 的初始化代码在 pkg/runtime/runtime.go 中:

	// Creating workflow engine after components are loaded
	wfe := wfengine.NewWorkflowEngine(a.runtimeConfig.id, a.globalConfig.GetWorkflowSpec(), a.processor.WorkflowBackend())
	wfe.ConfigureGrpcExecutor()
	a.workflowEngine = wfe
	processor := processor.New(processor.Options{
		ID:             runtimeConfig.id,
		Namespace:      namespace,
		IsHTTP:         runtimeConfig.appConnectionConfig.Protocol.IsHTTP(),
		ActorsEnabled:  len(runtimeConfig.actorsService) > 0,
		Registry:       runtimeConfig.registry,
		ComponentStore: compStore,
		Meta:           meta,
		GlobalConfig:   globalConfig,
		Resiliency:     resiliencyProvider,
		Mode:           runtimeConfig.mode,
		PodName:        podName,
		Standalone:     runtimeConfig.standalone,
		OperatorClient: operatorClient,
		GRPC:           grpc,
		Channels:       channels,
	})

ActorBackend

ActorBackend 实现了 durabletask-go 定义的 Backend 接口:

type ActorBackend struct {
	orchestrationWorkItemChan chan *backend.OrchestrationWorkItem
	activityWorkItemChan      chan *backend.ActivityWorkItem
	startedOnce               sync.Once
	config                    actorsBackendConfig
	activityActorOpts         activityActorOpts
	workflowActorOpts         workflowActorOpts

	actorRuntime  actors.ActorRuntime
	actorsReady   atomic.Bool
	actorsReadyCh chan struct{}
}

CreateOrchestrationInstance() 方法的实现:

func (abe *ActorBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent, opts ...backend.OrchestrationIdReusePolicyOptions) error {
	if err := abe.validateConfiguration(); err != nil {
		return err
	}

  // 对输入做必要的检查
	var workflowInstanceID string
	if es := e.GetExecutionStarted(); es == nil {
		return errors.New("the history event must be an ExecutionStartedEvent")
	} else if oi := es.GetOrchestrationInstance(); oi == nil {
		return errors.New("the ExecutionStartedEvent did not contain orchestration instance information")
	} else {
		workflowInstanceID = oi.GetInstanceId()
	}

	policy := &api.OrchestrationIdReusePolicy{}
	for _, opt := range opts {
		opt(policy)
	}

	eventData, err := backend.MarshalHistoryEvent(e)
	if err != nil {
		return err
	}

	requestBytes, err := json.Marshal(CreateWorkflowInstanceRequest{
		Policy:          policy,
		StartEventBytes: eventData,
	})
	if err != nil {
		return fmt.Errorf("failed to marshal CreateWorkflowInstanceRequest: %w", err)
	}

	// Invoke the well-known workflow actor directly, which will be created by this invocation request.
	// Note that this request goes directly to the actor runtime, bypassing the API layer.
	req := internalsv1pb.NewInternalInvokeRequest(CreateWorkflowInstanceMethod).
		WithActor(abe.config.workflowActorType, workflowInstanceID).
		WithData(requestBytes).
		WithContentType(invokev1.JSONContentType)
	start := time.Now()
	_, err = abe.actorRuntime.Call(ctx, req)
	elapsed := diag.ElapsedSince(start)
	if err != nil {
		// failed request to CREATE workflow, record count and latency metrics.
		diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.CreateWorkflow, diag.StatusFailed, elapsed)
		return err
	}
	// successful request to CREATE workflow, record count and latency metrics.
	diag.DefaultWorkflowMonitoring.WorkflowOperationEvent(ctx, diag.CreateWorkflow, diag.StatusSuccess, elapsed)
	return nil
}

关键代码在:

_, err = abe.actorRuntime.Call(ctx, req)

这是通过 actor 来进行调用。

其中 ActorRuntime 是这样设置进来的:

func (abe *ActorBackend) SetActorRuntime(ctx context.Context, actorRuntime actors.ActorRuntime) {
	abe.actorRuntime = actorRuntime
	if abe.actorsReady.CompareAndSwap(false, true) {
		close(abe.actorsReadyCh)
	}
}

调用的地方在 pkg/runtime/runtime.go 的 initWorkflowEngine() 方法中:

func (a *DaprRuntime) initWorkflowEngine(ctx context.Context) error {
	wfComponentFactory := wfengine.BuiltinWorkflowFactory(a.workflowEngine)

	// If actors are not enabled, still invoke SetActorRuntime on the workflow engine with `nil` to unblock startup
	if abe, ok := a.workflowEngine.Backend.(interface {
		SetActorRuntime(ctx context.Context, actorRuntime actors.ActorRuntime)
	}); ok {
		log.Info("Configuring workflow engine with actors backend")
		var actorRuntime actors.ActorRuntime
		if a.runtimeConfig.ActorsEnabled() {
			actorRuntime = a.actor
		}
		abe.SetActorRuntime(ctx, actorRuntime)
	}
  ......

actorRuntime的实现

ActorRuntime 的 interface 定义:

// ActorRuntime is the main runtime for the actors subsystem.
type ActorRuntime interface {
	Actors
	io.Closer
	Init(context.Context) error
	IsActorHosted(ctx context.Context, req *ActorHostedRequest) bool
	GetRuntimeStatus(ctx context.Context) *runtimev1pb.ActorRuntime
	RegisterInternalActor(ctx context.Context, actorType string, actor InternalActorFactory, actorIdleTimeout time.Duration) error
}

ActorRuntime 继承了 Actors interface,call()方法在这里定义:

// Actors allow calling into virtual actors as well as actor state management.
type Actors interface {
	// Call an actor.
	Call(ctx context.Context, req *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error)
  ......
}

Call()方法的代码实现:

func (a *actorsRuntime) Call(ctx context.Context, req *internalv1pb.InternalInvokeRequest) (res *internalv1pb.InternalInvokeResponse, err error) {
	err = a.placement.WaitUntilReady(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to wait for placement readiness: %w", err)
	}

	// Do a lookup to check if the actor is local
	actor := req.GetActor()
	actorType := actor.GetActorType()
	lar, err := a.placement.LookupActor(ctx, internal.LookupActorRequest{
		ActorType: actorType,
		ActorID:   actor.GetActorId(),
	})
	if err != nil {
		return nil, err
	}

	if a.isActorLocal(lar.Address, a.actorsConfig.Config.HostAddress, a.actorsConfig.Config.Port) {
		// If this is an internal actor, we call it using a separate path
		internalAct, ok := a.getInternalActor(actorType, actor.GetActorId())
		if ok {
			res, err = a.callInternalActor(ctx, req, internalAct)
		} else {
			res, err = a.callLocalActor(ctx, req)
		}
	} else {
		res, err = a.callRemoteActorWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, a.callRemoteActor, lar.Address, lar.AppID, req)
	}

	if err != nil {
		if res != nil && actorerrors.Is(err) {
			return res, err
		}
		return nil, err
	}
	return res, nil
}

关键代码在这里,调用 placement.LookupActor() 方法来查找要调用的目标actor的地址:

	lar, err := a.placement.LookupActor(ctx, internal.LookupActorRequest{
		ActorType: actorType,
		ActorID:   actor.GetActorId(),
	})

placement 的实现

PlacementService 的接口定义:

type PlacementService interface {
	io.Closer

	Start(context.Context) error
	WaitUntilReady(ctx context.Context) error
	LookupActor(ctx context.Context, req LookupActorRequest) (LookupActorResponse, error)
	AddHostedActorType(actorType string, idleTimeout time.Duration) error
	ReportActorDeactivation(ctx context.Context, actorType, actorID string) error

	SetHaltActorFns(haltFn HaltActorFn, haltAllFn HaltAllActorsFn)
	SetOnAPILevelUpdate(fn func(apiLevel uint32))
	SetOnTableUpdateFn(fn func())

	// PlacementHealthy returns true if the placement service is healthy.
	PlacementHealthy() bool
	// StatusMessage returns a custom status message.
	StatusMessage() string
}

代码实现在 pkg/actors/placement/placement.go 中:

// LookupActor resolves to actor service instance address using consistent hashing table.
func (p *actorPlacement) LookupActor(ctx context.Context, req internal.LookupActorRequest) (internal.LookupActorResponse, error) {
	// Retry here to allow placement table dissemination/rebalancing to happen.
	policyDef := p.resiliency.BuiltInPolicy(resiliency.BuiltInActorNotFoundRetries)
	policyRunner := resiliency.NewRunner[internal.LookupActorResponse](ctx, policyDef)
	return policyRunner(func(ctx context.Context) (res internal.LookupActorResponse, rErr error) {
		rAddr, rAppID, rErr := p.doLookupActor(ctx, req.ActorType, req.ActorID)
		if rErr != nil {
			return res, fmt.Errorf("error finding address for actor %s/%s: %w", req.ActorType, req.ActorID, rErr)
		} else if rAddr == "" {
			return res, fmt.Errorf("did not find address for actor %s/%s", req.ActorType, req.ActorID)
		}
		res.Address = rAddr
		res.AppID = rAppID
		return res, nil
	})
}

doLookupActor():

func (p *actorPlacement) doLookupActor(ctx context.Context, actorType, actorID string) (string, string, error) {
  // 加读锁
	p.placementTableLock.RLock()
	defer p.placementTableLock.RUnlock()

	if p.placementTables == nil {
		return "", "", errors.New("placement tables are not set")
	}

  // 先根据 actorType 找到符合要求的 Entries
	t := p.placementTables.Entries[actorType]
	if t == nil {
		return "", "", nil
	}
	host, err := t.GetHost(actorID)
	if err != nil || host == nil {
		return "", "", nil //nolint:nilerr
	}
	return host.Name, host.AppID, nil
}

p.placementTables 的结构体定义如下:

type ConsistentHashTables struct {
	Version string
	Entries map[string]*Consistent
}

Consistent 的结构体定义如下:

// Consistent represents a data structure for consistent hashing.
type Consistent struct {
	hosts             map[uint64]string
	sortedSet         []uint64
	loadMap           map[string]*Host
	totalLoad         int64
	replicationFactor int

	sync.RWMutex
}

host, err := t.GetHost(actorID) 代码对应的 GetHost() 方法:

func (c *Consistent) GetHost(key string) (*Host, error) {
	h, err := c.Get(key)
	if err != nil {
		return nil, err
	}

	return c.loadMap[h], nil
}