worker

Source-code analysis of the DurableTask worker

1 - worker initial

Source-code analysis of DurableTask worker initialization

1.1 - Obsolete initialization

The DurableTask worker's obsolete initialization process

GrpcDurableTaskWorker

File src\Worker\Grpc\GrpcDurableTaskWorker.Processor.cs in the durabletask-dotnet repository:

 public async Task ExecuteAsync(CancellationToken cancellation)
        {
            while (!cancellation.IsCancellationRequested)
            {
                try
                {
                    AsyncServerStreamingCall<P.WorkItem> stream = await this.ConnectAsync(cancellation);
                    await this.ProcessWorkItemsAsync(stream, cancellation);
                }
                ......
            }
        }

ConnectAsync() calls the GetWorkItems() method defined in the gRPC protobuf file:

        async Task<AsyncServerStreamingCall<P.WorkItem>> ConnectAsync(CancellationToken cancellation)
        {
            await this.sidecar!.HelloAsync(EmptyMessage, cancellationToken: cancellation);
            this.Logger.EstablishedWorkItemConnection();

            Console.WriteLine("********GrpcDurableTaskWorker call GetWorkItems()********");

            // Get the stream for receiving work-items
            return this.sidecar!.GetWorkItems(new P.GetWorkItemsRequest(), cancellationToken: cancellation);
        }

This would normally be the worker's initialization flow, but it is now obsolete.

Note: I am recording this to avoid misunderstanding. I actually lost a lot of time here.
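
The connect-then-process loop in ExecuteAsync() above can be sketched in isolation. This is only a minimal sketch, not the durabletask-dotnet code: ReconnectLoop, connect, process and retryDelay are hypothetical names standing in for ConnectAsync() and ProcessWorkItemsAsync(), and the back-off in the catch block is an assumption, because the original catch block is elided.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ReconnectLoop
{
    // Repeatedly connects and drains the resulting stream until cancellation
    // is requested. Returns the number of connection attempts that were made.
    public static async Task<int> RunAsync<T>(
        Func<CancellationToken, Task<IAsyncEnumerable<T>>> connect,
        Func<T, Task> process,
        TimeSpan retryDelay,
        CancellationToken cancellation)
    {
        int attempts = 0;
        while (!cancellation.IsCancellationRequested)
        {
            attempts++;
            try
            {
                IAsyncEnumerable<T> stream = await connect(cancellation);
                await foreach (T item in stream.WithCancellation(cancellation))
                {
                    await process(item);
                }
            }
            catch (OperationCanceledException) when (cancellation.IsCancellationRequested)
            {
                break; // shutdown was requested
            }
            catch (Exception)
            {
                // Assumption: wait briefly, then reconnect on the next iteration.
                try
                {
                    await Task.Delay(retryDelay, cancellation);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }

        return attempts;
    }
}
```

A caller would pass a connect delegate that opens the stream and a process delegate that handles one item. If the stream ends without an error, the loop simply reconnects.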

1.2 - Call stack

Call stack when the DurableTask worker is initialized at startup

Call stack overview

Add logging to src\DotNetWorker.Grpc\GrpcWorker.cs in the azure-functions-dotnet-worker repository:

        public Task StartAsync(CancellationToken token)
        {
            Console.WriteLine(new System.Diagnostics.StackTrace(true));

            _workerClient = _workerClientFactory.CreateClient(this);

            Console.WriteLine("_workerClient is " + _workerClient.GetType().Name);

            return _workerClient.StartAsync(token);
        }
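
The stack-trace logging trick used above also works outside the worker. A minimal sketch follows; StackTraceProbe and CaptureMethodNames are hypothetical names for illustration. Passing true to the StackTrace constructor asks for file names and line numbers, which only show up when debug symbols are available.

```csharp
using System.Diagnostics;
using System.Linq;
using System.Runtime.CompilerServices;

public static class StackTraceProbe
{
    // Returns the method names on the current call stack, innermost first.
    // NoInlining keeps this method visible as its own frame.
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static string[] CaptureMethodNames()
    {
        // true = also collect file name and line number when symbols exist.
        var trace = new StackTrace(true);
        return trace.GetFrames()
            .Select(frame => frame.GetMethod()?.Name ?? "<unknown>")
            .ToArray();
    }
}
```

Printing the StackTrace object directly, as the modified StartAsync() does, gives the full text form. Walking the frames is useful when only the caller names are needed.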

This yields the call stack for worker initialization at startup:

[2024-04-08T09:07:04.591Z]    at Microsoft.Azure.Functions.Worker.GrpcWorker.StartAsync(CancellationToken token) in C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Grpc\GrpcWorker.cs:line 58
[2024-04-08T09:07:04.592Z]    at Microsoft.Azure.Functions.Worker.WorkerHostedService.StartAsync(CancellationToken cancellationToken) in C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Core\WorkerHostedService.cs:line 25
[2024-04-08T09:07:04.592Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.593Z]    at Microsoft.Azure.Functions.Worker.WorkerHostedService.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.593Z]    at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>b__15_1(IHostedService service, CancellationToken token)
[2024-04-08T09:07:04.593Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.593Z]    at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>b__15_1(IHostedService service, CancellationToken token)
[2024-04-08T09:07:04.594Z]    at Microsoft.Extensions.Hosting.Internal.Host.ForeachService[T](IEnumerable`1 services, CancellationToken token, Boolean concurrent, Boolean abortOnFirstException, List`1 exceptions, Func`3 operation)
[2024-04-08T09:07:04.594Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.594Z]    at Microsoft.Extensions.Hosting.Internal.Host.ForeachService[T](IEnumerable`1 services, CancellationToken token, Boolean concurrent, Boolean abortOnFirstException, List`1 exceptions, Func`3 operation)
[2024-04-08T09:07:04.594Z]    at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.595Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.595Z]    at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.595Z]    at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
[2024-04-08T09:07:04.596Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.596Z]    at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
[2024-04-08T09:07:04.596Z]    at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.Run(IHost host)
[2024-04-08T09:07:04.596Z]    at Program.<Main>$(String[] args) in C:\Users\sky\work\code\durabletask\MyDurableFunction1\Program.cs:line 13
    
......
[2024-04-08T09:07:04.598Z] _workerClient is GrpcWorkerClient
[2024-04-08T09:07:04.629Z] Worker process started and initialized.

Call stack

GrpcWorker

        public Task StartAsync(CancellationToken token)
        {
          _workerClient = _workerClientFactory.CreateClient(this);
          return _workerClient.StartAsync(token);
        }

GrpcWorkerClient

The GrpcWorkerClient class in src\DotNetWorker.Grpc\GrpcWorkerClientFactory.cs:


            public async Task StartAsync(CancellationToken token)
            {
                if (_running)
                {
                    throw new InvalidOperationException($"The client is already running. Multiple calls to {nameof(StartAsync)} are not supported.");
                }

                _running = true;

                var eventStream = _grpcClient.EventStream(cancellationToken: token);

                await SendStartStreamMessageAsync(eventStream.RequestStream);

                _ = StartWriterAsync(eventStream.RequestStream);
                _ = StartReaderAsync(eventStream.ResponseStream);
            }

1.3 - GrpcWorkerClient

DurableTask GrpcWorkerClient

The GrpcWorkerClient class in src\DotNetWorker.Grpc\GrpcWorkerClientFactory.cs:

Class definition

         private class GrpcWorkerClient : IWorkerClient
        {
            private readonly FunctionRpcClient _grpcClient;
            private readonly GrpcWorkerStartupOptions _startupOptions;
            private readonly ChannelReader<StreamingMessage> _outputReader;
            private readonly ChannelWriter<StreamingMessage> _outputWriter;
            private bool _running;
            private IMessageProcessor? _processor;
           
         }

Constructor

            public GrpcWorkerClient(GrpcHostChannel outputChannel, GrpcWorkerStartupOptions startupOptions, IMessageProcessor processor)
            {
                _startupOptions = startupOptions ?? throw new ArgumentNullException(nameof(startupOptions));
                _processor = processor ?? throw new ArgumentNullException(nameof(processor));

                // Initialize the reader and writer; both come from outputChannel
                _outputReader = outputChannel.Channel.Reader;
                _outputWriter = outputChannel.Channel.Writer;

                // Create _grpcClient, which is of type FunctionRpcClient
                _grpcClient = CreateClient();
            }

The CreateClient() method:

private FunctionRpcClient CreateClient()
            {
#if NET5_0_OR_GREATER
                GrpcChannel grpcChannel = GrpcChannel.ForAddress(_startupOptions.HostEndpoint!.AbsoluteUri, new GrpcChannelOptions()
                {
                    MaxReceiveMessageSize = _startupOptions.GrpcMaxMessageLength,
                    MaxSendMessageSize = _startupOptions.GrpcMaxMessageLength,
                    Credentials = ChannelCredentials.Insecure
                });
#else

                var options = new ChannelOption[]
                {
                    new ChannelOption(GrpcCore.ChannelOptions.MaxReceiveMessageLength, _startupOptions.GrpcMaxMessageLength),
                    new ChannelOption(GrpcCore.ChannelOptions.MaxSendMessageLength, _startupOptions.GrpcMaxMessageLength)
                };

                GrpcCore.Channel grpcChannel = new GrpcCore.Channel(_startupOptions.HostEndpoint!.Host, _startupOptions.HostEndpoint.Port, ChannelCredentials.Insecure, options);

#endif
                return new FunctionRpcClient(grpcChannel);
            }
        }

Startup process

StartAsync


            public async Task StartAsync(CancellationToken token)
            {
                if (_running)
                {
                    throw new InvalidOperationException($"The client is already running. Multiple calls to {nameof(StartAsync)} are not supported.");
                }

                _running = true;

                var eventStream = _grpcClient.EventStream(cancellationToken: token);

                await SendStartStreamMessageAsync(eventStream.RequestStream);

                _ = StartWriterAsync(eventStream.RequestStream);
                _ = StartReaderAsync(eventStream.ResponseStream);
            }
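
The bodies of StartWriterAsync() and StartReaderAsync() are not shown in this section. Given that the constructor takes a ChannelReader and a ChannelWriter from the same channel, the writer side plausibly drains that channel into the gRPC request stream. The sketch below illustrates that pattern under this assumption. OutboundPump, SendAsync, Complete and sink are hypothetical names, and string stands in for StreamingMessage.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class OutboundPump
{
    private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
    private bool _running;

    // Producers enqueue messages here (the role played by _outputWriter).
    public ValueTask SendAsync(string message) => _channel.Writer.WriteAsync(message);

    // Signals that no more messages will be written, so the drain loop can end.
    public void Complete() => _channel.Writer.Complete();

    // Starts the single drain loop; a second call is rejected, like StartAsync() above.
    public Task Start(Func<string, Task> sink)
    {
        if (_running)
        {
            throw new InvalidOperationException("The pump is already running.");
        }

        _running = true;
        return DrainAsync(sink);
    }

    // Reads from the channel (the role played by _outputReader) and forwards to the sink.
    private async Task DrainAsync(Func<string, Task> sink)
    {
        await foreach (string message in _channel.Reader.ReadAllAsync())
        {
            await sink(message);
        }
    }
}
```

Unlike StartAsync() above, which discards the reader and writer tasks, this sketch returns the drain task so that a caller can await it or observe failures.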

1.4 - GrpcWorker

DurableTask GrpcWorker

The GrpcWorker class in src\DotNetWorker.Grpc\GrpcWorker.cs. The GrpcWorkerClient it works with (class definition, constructor and CreateClient() method) is covered in section 1.3.

Message processing

ProcessMessageAsync

        Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
        {
            // Dispatch and return.
            Task.Run(() => ProcessRequestCoreAsync(message));

            return Task.CompletedTask;
        }
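
The "dispatch and return" shape above means the caller gets a completed task back immediately, while the real work continues on the thread pool. One consequence is that nobody awaits the task returned by Task.Run, so an exception thrown inside it is not observed by the caller. The sketch below shows the same shape with an explicit error callback. FireAndForgetDispatcher, handler and onError are hypothetical names, and the error callback is an addition for illustration, not part of GrpcWorker.

```csharp
using System;
using System.Threading.Tasks;

public static class FireAndForgetDispatcher
{
    // Queues the handler on the thread pool and returns right away,
    // so the caller (for example a stream reader loop) is never blocked by it.
    public static Task DispatchAsync(Func<Task> handler, Action<Exception> onError)
    {
        _ = Task.Run(async () =>
        {
            try
            {
                await handler();
            }
            catch (Exception ex)
            {
                // Without this catch the failure would go unobserved,
                // because the Task returned by Task.Run is discarded.
                onError(ex);
            }
        });

        return Task.CompletedTask;
    }
}
```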

ProcessRequestCoreAsync

private async Task ProcessRequestCoreAsync(StreamingMessage request)
        {
            StreamingMessage responseMessage = new StreamingMessage
            {
                RequestId = request.RequestId
            };

            switch (request.ContentCase)
            {
                case MsgType.InvocationRequest:
                    responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
                    break;

                case MsgType.WorkerInitRequest:
                    Console.WriteLine("GrpcWorker received WorkerInitRequest");
                    responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
                    break;

                case MsgType.WorkerStatusRequest:
                    responseMessage.WorkerStatusResponse = new WorkerStatusResponse();
                    break;

                case MsgType.FunctionsMetadataRequest:
                    responseMessage.FunctionMetadataResponse = await GetFunctionMetadataAsync(request.FunctionsMetadataRequest.FunctionAppDirectory);
                    break;

                case MsgType.WorkerTerminate:
                    WorkerTerminateRequestHandler(request.WorkerTerminate);
                    break;

                case MsgType.FunctionLoadRequest:
                    responseMessage.FunctionLoadResponse = FunctionLoadRequestHandler(request.FunctionLoadRequest, _application, _methodInfoLocator);
                    break;

                case MsgType.FunctionEnvironmentReloadRequest:
                    responseMessage.FunctionEnvironmentReloadResponse = EnvironmentReloadRequestHandler(_workerOptions);
                    break;

                case MsgType.InvocationCancel:
                    InvocationCancelRequestHandler(request.InvocationCancel);
                    break;

                default:
                    // TODO: Trace failure here.
                    return;
            }

            await _workerClient!.SendMessageAsync(responseMessage);
        }
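
The routing pattern in ProcessRequestCoreAsync() can be reduced to a small self-contained sketch: build a reply that echoes the request id, switch on the message kind, and send the reply unless the kind is unknown. Everything below (MessageKind, Envelope, RequestRouter, invoke, send) is a hypothetical stand-in for the generated gRPC types and handlers, not the worker's actual code.

```csharp
using System;
using System.Threading.Tasks;

public enum MessageKind
{
    None,
    InvocationRequest,
    InvocationResponse,
    WorkerStatusRequest,
    WorkerStatusResponse,
    WorkerTerminate,
}

public sealed class Envelope
{
    public string RequestId { get; set; } = "";
    public MessageKind Kind { get; set; } = MessageKind.None;
    public string Payload { get; set; } = "";
}

public static class RequestRouter
{
    // Routes one request and sends the reply through `send`.
    // Returns false when the message kind is not handled; nothing is sent then.
    public static async Task<bool> RouteAsync(
        Envelope request,
        Func<string, Task<string>> invoke,
        Func<Envelope, Task> send)
    {
        // The reply always carries the request id of the message it answers.
        var response = new Envelope { RequestId = request.RequestId };

        switch (request.Kind)
        {
            case MessageKind.InvocationRequest:
                response.Kind = MessageKind.InvocationResponse;
                response.Payload = await invoke(request.Payload);
                break;

            case MessageKind.WorkerStatusRequest:
                response.Kind = MessageKind.WorkerStatusResponse;
                break;

            case MessageKind.WorkerTerminate:
                // No content is set, but the envelope is still sent,
                // as in the method above where this case just breaks.
                break;

            default:
                return false;
        }

        await send(response);
        return true;
    }
}
```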

WorkerInitRequest

                case MsgType.WorkerInitRequest:
                    responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
                    break;

Implementation of the WorkerInitRequestHandler() method:

        internal static WorkerInitResponse WorkerInitRequestHandler(WorkerInitRequest request, WorkerOptions workerOptions)
        {
            var response = new WorkerInitResponse
            {
                Result = new StatusResult { Status = StatusResult.Types.Status.Success },
                WorkerVersion = WorkerInformation.Instance.WorkerVersion,
                WorkerMetadata = GetWorkerMetadata()
            };

            response.Capabilities.Add(GetWorkerCapabilities(workerOptions));

            return response;
        }

TBD: where does WorkerVersion come from?

InvocationRequest

                case MsgType.InvocationRequest:
                    responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
                    break;

Implementation of the InvocationRequestHandlerAsync method:

        internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
        {
            return _invocationHandler.InvokeAsync(request);
        }

1.5 - FunctionRpcClient

FunctionRpcClient

The protobuf definition behind FunctionRpcClient in the Azure/azure-functions-dotnet-worker repository.

protobuf

Location of the proto file:

protos\azure-functions-language-worker-protobuf\src\proto\FunctionRpc.proto

FunctionRpc service

FunctionRpc.proto defines the FunctionRpc gRPC service:

option java_multiple_files = true;
option java_package = "com.microsoft.azure.functions.rpc.messages";
option java_outer_classname = "FunctionProto";
option csharp_namespace = "Microsoft.Azure.Functions.Worker.Grpc.Messages";
option go_package ="github.com/Azure/azure-functions-go-worker/internal/rpc";

package AzureFunctionsRpcMessages;

import "google/protobuf/duration.proto";
import "identity/ClaimsIdentityRpc.proto";
import "shared/NullableTypes.proto";

// Interface exported by the server.
service FunctionRpc {
}

The EventStream method

The service defines a single method, EventStream:


 rpc EventStream (stream StreamingMessage) returns (stream StreamingMessage) {}

Both the request and the response are streams, and both carry StreamingMessage.

StreamingMessage

Apart from a request_id that identifies a message between host and worker, StreamingMessage has only one field, the oneof content:

message StreamingMessage {
  // Used to identify message between host and worker
  string request_id = 1;

  // Payload of the message
  oneof content {
  	......
  }
}
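
On the C# side, the code in the earlier sections reads request.ContentCase to find out which member of the oneof is set. The hand-written model below sketches the semantics a oneof implies: at most one member is populated, and setting one replaces whatever was set before. It is an illustration only, not the generated class. StreamingMessageModel is a hypothetical name, the payloads are simplified to strings, and only three of the members are modeled.

```csharp
public sealed class StreamingMessageModel
{
    // Case values reuse the field numbers from FunctionRpc.proto.
    public enum ContentOneofCase
    {
        None = 0,
        RpcLog = 2,
        InvocationRequest = 4,
        StartStream = 20,
    }

    private string _content = "";

    public string RequestId { get; set; } = "";

    // Tells the receiver which member of the oneof is currently populated.
    public ContentOneofCase ContentCase { get; private set; } = ContentOneofCase.None;

    public string RpcLog
    {
        get => Get(ContentOneofCase.RpcLog);
        set => Set(ContentOneofCase.RpcLog, value);
    }

    public string InvocationRequest
    {
        get => Get(ContentOneofCase.InvocationRequest);
        set => Set(ContentOneofCase.InvocationRequest, value);
    }

    public string StartStream
    {
        get => Get(ContentOneofCase.StartStream);
        set => Set(ContentOneofCase.StartStream, value);
    }

    // Resets the oneof so that no member is set.
    public void ClearContent()
    {
        _content = "";
        ContentCase = ContentOneofCase.None;
    }

    // A member that is not the active case reads as empty.
    private string Get(ContentOneofCase which) => ContentCase == which ? _content : "";

    // Setting any member overwrites the previously active one.
    private void Set(ContentOneofCase which, string value)
    {
        _content = value;
        ContentCase = which;
    }
}
```

A receiver can then switch on ContentCase, which is the shape ProcessRequestCoreAsync() follows.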

The oneof covers quite a few message types:

  oneof content {

    // Worker initiates stream
    StartStream start_stream = 20;

    // Host sends capabilities/init data to worker
    WorkerInitRequest worker_init_request = 17;
    // Worker responds after initializing with its capabilities & status
    WorkerInitResponse worker_init_response = 16;

    // MESSAGE NOT USED
    // Worker periodically sends empty heartbeat message to host
    WorkerHeartbeat worker_heartbeat = 15;

    // Host sends terminate message to worker.
    // Worker terminates if it can, otherwise host terminates after a grace period
    WorkerTerminate worker_terminate = 14;

    // Host periodically sends status request to the worker
    WorkerStatusRequest worker_status_request = 12;
    WorkerStatusResponse worker_status_response = 13;

    // On file change event, host sends notification to worker
    FileChangeEventRequest file_change_event_request = 6;

    // Worker requests a desired action (restart worker, reload function)
    WorkerActionResponse worker_action_response = 7;

    // Host sends required metadata to worker to load function
    FunctionLoadRequest function_load_request = 8;
    // Worker responds after loading with the load result
    FunctionLoadResponse function_load_response = 9;

    // Host requests a given invocation
    InvocationRequest invocation_request = 4;

    // Worker responds to a given invocation
    InvocationResponse invocation_response = 5;

    // Host sends cancel message to attempt to cancel an invocation.
    // If an invocation is cancelled, host will receive an invocation response with status cancelled.
    InvocationCancel invocation_cancel = 21;

    // Worker logs a message back to the host
    RpcLog rpc_log = 2;

    FunctionEnvironmentReloadRequest function_environment_reload_request = 25;

    FunctionEnvironmentReloadResponse function_environment_reload_response = 26;

    // Ask the worker to close any open shared memory resources for a given invocation
    CloseSharedMemoryResourcesRequest close_shared_memory_resources_request = 27;
    CloseSharedMemoryResourcesResponse close_shared_memory_resources_response = 28;

    // Worker indexing message types
    FunctionsMetadataRequest functions_metadata_request = 29;
    FunctionMetadataResponse function_metadata_response = 30;

    // Host sends required metadata to worker to load functions
    FunctionLoadRequestCollection function_load_request_collection = 31;

    // Host gets the list of function load responses
    FunctionLoadResponseCollection function_load_response_collection = 32;
    
    // Host sends required metadata to worker to warmup the worker
    WorkerWarmupRequest worker_warmup_request = 33;
    
    // Worker responds after warming up with the warmup result
    WorkerWarmupResponse worker_warmup_response = 34;
    
  }

StartStream

// Worker initiates stream
StartStream start_stream = 20;

// Process.Start required info
//   connection details
//   protocol type
//   protocol version

// Worker sends the host information identifying itself
message StartStream {
  // id of the worker
  string worker_id = 2;
}

TBD: consider adding a version field here.

WorkerInitRequest / WorkerInitResponse

    // Host sends capabilities/init data to worker
    WorkerInitRequest worker_init_request = 17;
    // Worker responds after initializing with its capabilities & status
    WorkerInitResponse worker_init_response = 16;
    

// Host requests the worker to initialize itself
message WorkerInitRequest {
  // version of the host sending init request
  string host_version = 1;

  // A map of host supported features/capabilities
  map<string, string> capabilities = 2;

  // inform worker of supported categories and their levels
  // i.e. Worker = Verbose, Function.MyFunc = None
  map<string, RpcLog.Level> log_categories = 3;

  // Full path of worker.config.json location
  string worker_directory = 4;

  // base directory for function app
  string function_app_directory = 5;
}

// Worker responds with the result of initializing itself
message WorkerInitResponse {
  // PROPERTY NOT USED
  // TODO: Remove from protobuf during next breaking change release
  string worker_version = 1;

  // A map of worker supported features/capabilities
  map<string, string> capabilities = 2;

  // Status of the response
  StatusResult result = 3;

  // Worker metadata captured for telemetry purposes
  WorkerMetadata worker_metadata = 4;
}

WorkerHeartbeat

    // MESSAGE NOT USED
    // Worker periodically sends empty heartbeat message to host
    WorkerHeartbeat worker_heartbeat = 15;
    
// MESSAGE NOT USED
// TODO: Remove from protobuf during next breaking change release
message WorkerHeartbeat {}

WorkerTerminate

    // Host sends terminate message to worker.
    // Worker terminates if it can, otherwise host terminates after a grace period
    WorkerTerminate worker_terminate = 14;

// Warning before killing the process after grace_period
// Worker self terminates ..no response on this
message WorkerTerminate {
  google.protobuf.Duration grace_period = 1;
}

WorkerStatusRequest / WorkerStatusResponse

    // Host periodically sends status request to the worker
    WorkerStatusRequest worker_status_request = 12;
    WorkerStatusResponse worker_status_response = 13;
    
// Used by the host to determine worker health
message WorkerStatusRequest {
}

// Worker responds with status message
// TODO: Add any worker relevant status to response
message WorkerStatusResponse {
}

InvocationRequest / InvocationResponse

    // Host requests a given invocation
    InvocationRequest invocation_request = 4;

    // Worker responds to a given invocation
    InvocationResponse invocation_response = 5;
    
    
// Host requests worker to invoke a Function
message InvocationRequest {
  // Unique id for each invocation
  string invocation_id = 1;

  // Unique id for each Function
  string function_id = 2;

  // Input bindings (include trigger)
  repeated ParameterBinding input_data = 3;

  // binding metadata from trigger
  map<string, TypedData> trigger_metadata = 4;

  // Populates activityId, tracestate and tags from host
  RpcTraceContext trace_context = 5;

  // Current retry context
  RetryContext retry_context = 6;
}

// Worker responds with status of Invocation
message InvocationResponse {
  // Unique id for invocation
  string invocation_id = 1;

  // Output binding data
  repeated ParameterBinding output_data = 2;

  // data returned from Function (for $return and triggers with return support)
  TypedData return_value = 4;

  // Status of the invocation (success/failure/canceled)
  StatusResult result = 3;
}

WorkerWarmupRequest / WorkerWarmupResponse

    // Host sends required metadata to worker to warmup the worker
    WorkerWarmupRequest worker_warmup_request = 33;
    
    // Worker responds after warming up with the warmup result
    WorkerWarmupResponse worker_warmup_response = 34;
    
    
message WorkerWarmupRequest {
  // Full path of worker.config.json location
  string worker_directory = 1;
}

message WorkerWarmupResponse {
  StatusResult result = 1;
}

2 - client run orchestrator

Source-code analysis of how the DurableTask client runs an orchestration

2.1 - Call stack

Call stack when the DurableTask client runs an orchestration

Call stack overview

MyDurableFunction1.dll!Company.Function.HelloOrchestration.RunOrchestrator(Microsoft.DurableTask.TaskOrchestrationContext context) Line 16 (c:\Users\sky\work\code\durabletask\MyDurableFunction1\HelloOrchestration.cs:16)
MyDurableFunction1.dll!MyDurableFunction1.DirectFunctionExecutor.ExecuteAsync(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 32 (GeneratedFunctionExecutor.g.cs:32)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.Pipeline.FunctionExecutionMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 20 (FunctionExecutionMiddleware.cs:20)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseFunctionExecutionMiddleware.AnonymousMethod__1_2(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 57 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:57)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.OutputBindings.OutputBindingsMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 13 (OutputBindingsMiddleware.cs:13)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseOutputBindingsMiddleware.AnonymousMethod__3(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 84 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:84)
Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.dll!Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 34 (FunctionsHttpProxyingMiddleware.cs:34)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseMiddleware.AnonymousMethod__1(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 105 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:105)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsOrchestrator.EnsureSynchronousExecution(Microsoft.Azure.Functions.Worker.FunctionContext functionContext, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next, Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsOrchestrationContext orchestrationContext) Line 72 (c:\Users\sky\work\code\durabletask-fork\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\FunctionsOrchestrator.cs:72)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsOrchestrator.RunAsync(Microsoft.DurableTask.TaskOrchestrationContext context, object input) Line 51 (c:\Users\sky\work\code\durabletask-fork\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\FunctionsOrchestrator.cs:51)
Microsoft.DurableTask.Worker.dll!Microsoft.DurableTask.Worker.Shims.TaskOrchestrationShim.Execute(DurableTask.Core.OrchestrationContext innerContext, string rawInput) Line 52 (c:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\src\Worker\Core\Shims\TaskOrchestrationShim.cs:52)
DurableTask.Core.dll!DurableTask.Core.TaskOrchestrationExecutor.ProcessEvent(DurableTask.Core.History.HistoryEvent historyEvent) Line 211 (TaskOrchestrationExecutor.cs:211)
DurableTask.Core.dll!DurableTask.Core.TaskOrchestrationExecutor.ExecuteCore.__ProcessEvents|12_0(System.Collections.Generic.IEnumerable<DurableTask.Core.History.HistoryEvent> events) Line 135 (TaskOrchestrationExecutor.cs:135)
DurableTask.Core.dll!DurableTask.Core.TaskOrchestrationExecutor.ExecuteCore(System.Collections.Generic.IEnumerable<DurableTask.Core.History.HistoryEvent> pastEvents, System.Collections.Generic.IEnumerable<DurableTask.Core.History.HistoryEvent> newEvents) Line 143 (TaskOrchestrationExecutor.cs:143)
DurableTask.Core.dll!DurableTask.Core.TaskOrchestrationExecutor.Execute() Line 93 (TaskOrchestrationExecutor.cs:93)
Microsoft.DurableTask.Worker.Grpc.dll!Microsoft.DurableTask.Worker.Grpc.GrpcOrchestrationRunner.LoadAndRun(string encodedOrchestratorRequest, Microsoft.DurableTask.ITaskOrchestrator implementation, System.IServiceProvider services) Line 113 (c:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\src\Worker\Grpc\GrpcOrchestrationRunner.cs:113)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskFunctionsMiddleware.RunOrchestrationAsync(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.BindingMetadata triggerBinding, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 59 (c:\Users\sky\work\code\durabletask-fork\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\DurableTaskFunctionsMiddleware.cs:59)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskFunctionsMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext functionContext, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 22 (c:\Users\sky\work\code\durabletask-fork\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\DurableTaskFunctionsMiddleware.cs:22)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseMiddleware.AnonymousMethod__1(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 105 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:105)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.FunctionsApplication.InvokeFunctionAsync(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 77 (FunctionsApplication.cs:77)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.Handlers.InvocationHandler.InvokeAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.InvocationRequest request) Line 88 (InvocationHandler.cs:88)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.InvocationRequestHandlerAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.InvocationRequest request) Line 122 (GrpcWorker.cs:122)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.ProcessRequestCoreAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.StreamingMessage request) Line 81 (GrpcWorker.cs:81)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.Microsoft.Azure.Functions.Worker.Grpc.IMessageProcessor.ProcessMessageAsync.AnonymousMethod__0() Line 66 (GrpcWorker.cs:66)
System.Private.CoreLib.dll!System.Threading.Tasks.Task<System.Threading.Tasks.Task>.InnerInvoke() (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(System.Threading.Thread threadPoolThread, System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.Tasks.Task.ExecuteWithThreadLocal(ref System.Threading.Tasks.Task currentTaskSlot, System.Threading.Thread threadPoolThread) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch() (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart() (Unknown Source:0)
[Native to Managed Transition] (Unknown Source:0)

Azure function dotnet worker

GrpcWorkerClientFactory

            private async Task StartReaderAsync(IAsyncStreamReader<StreamingMessage> responseStream)
            {
                while (await responseStream.MoveNext())
                {
                    await _processor!.ProcessMessageAsync(responseStream.Current);
                }
            }

Here _processor is implemented by Microsoft.Azure.Functions.Worker.GrpcWorker.

GrpcWorker

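When the gRPC worker receives a gRPC message, it calls ProcessRequestCoreAsync() to handle it. Note that the call is asynchronous: it is handed to Task.Run and not awaited: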

        Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
        {
            // Dispatch and return.
            Task.Run(() => ProcessRequestCoreAsync(message));

            return Task.CompletedTask;
        }

Implementation of the ProcessRequestCoreAsync() method:

        private async Task ProcessRequestCoreAsync(StreamingMessage request)
        {
            StreamingMessage responseMessage = new StreamingMessage
            {
                RequestId = request.RequestId
            };

            switch (request.ContentCase)
            {
                case MsgType.InvocationRequest:
                    // Execution reaches this branch.
                    responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
                    break;

                case MsgType.WorkerInitRequest:
                    responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
                    break;

                case MsgType.WorkerStatusRequest:
                    responseMessage.WorkerStatusResponse = new WorkerStatusResponse();
                    break;

                case MsgType.FunctionsMetadataRequest:
                    responseMessage.FunctionMetadataResponse = await GetFunctionMetadataAsync(request.FunctionsMetadataRequest.FunctionAppDirectory);
                    break;

                case MsgType.WorkerTerminate:
                    WorkerTerminateRequestHandler(request.WorkerTerminate);
                    break;

                case MsgType.FunctionLoadRequest:
                    responseMessage.FunctionLoadResponse = FunctionLoadRequestHandler(request.FunctionLoadRequest, _application, _methodInfoLocator);
                    break;

                case MsgType.FunctionEnvironmentReloadRequest:
                    responseMessage.FunctionEnvironmentReloadResponse = EnvironmentReloadRequestHandler(_workerOptions);
                    break;

                case MsgType.InvocationCancel:
                    InvocationCancelRequestHandler(request.InvocationCancel);
                    break;

                default:
                    // TODO: Trace failure here.
                    return;
            }

            await _workerClient!.SendMessageAsync(responseMessage);
        }

The InvocationRequestHandlerAsync() method:

        internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
        {
            return _invocationHandler.InvokeAsync(request);
        }

Here, _invocationHandler is declared as IInvocationHandler; the implementation is Microsoft.Azure.Functions.Worker.Handlers.InvocationHandler.

InvocationHandler

The code is in src\DotNetWorker.Grpc\Handlers\InvocationHandler.cs in the azure-functions-dotnet-worker repository:

        public async Task<InvocationResponse> InvokeAsync(InvocationRequest request)
        {
            using CancellationTokenSource cancellationTokenSource = new();
            FunctionContext? context = null;
            InvocationResponse response = new()
            {
                InvocationId = request.InvocationId,
                Result = new StatusResult()
            };

            if (!_inflightInvocations.TryAdd(request.InvocationId, cancellationTokenSource))
            {
                var exception = new InvalidOperationException("Unable to track CancellationTokenSource");
                response.Result.Status = StatusResult.Types.Status.Failure;
                response.Result.Exception = exception.ToRpcException();

                return response;
            }

            try
            {
                var invocation = new GrpcFunctionInvocation(request);

                IInvocationFeatures invocationFeatures = _invocationFeaturesFactory.Create();
                invocationFeatures.Set<FunctionInvocation>(invocation);
                invocationFeatures.Set<IExecutionRetryFeature>(invocation);

                context = _application.CreateContext(invocationFeatures, cancellationTokenSource.Token);
                invocationFeatures.Set<IFunctionBindingsFeature>(new GrpcFunctionBindingsFeature(context, request, _outputBindingsInfoProvider));

                if (_inputConversionFeatureProvider.TryCreate(typeof(DefaultInputConversionFeature), out var conversion))
                {
                    invocationFeatures.Set<IInputConversionFeature>(conversion!);
                }

                // Execution enters here.
                await _application.InvokeFunctionAsync(context);

                var serializer = _workerOptions.Serializer!;
                var functionBindings = context.GetBindings();

                foreach (var binding in functionBindings.OutputBindingData)
                {
                    var parameterBinding = new ParameterBinding
                    {
                        Name = binding.Key
                    };

                    if (binding.Value is not null)
                    {
                        parameterBinding.Data = await binding.Value.ToRpcAsync(serializer);
                    }

                    response.OutputData.Add(parameterBinding);
                }

                if (functionBindings.InvocationResult is not null)
                {
                    TypedData? returnVal = await functionBindings.InvocationResult.ToRpcAsync(serializer);
                    response.ReturnValue = returnVal;
                }

                response.Result.Status = StatusResult.Types.Status.Success;
            }
            catch (Exception ex)
            {
                response.Result.Exception = _workerOptions.EnableUserCodeException ? ex.ToUserRpcException() : ex.ToRpcException();
                response.Result.Status = StatusResult.Types.Status.Failure;

                if (ex.InnerException is TaskCanceledException or OperationCanceledException)
                {
                    response.Result.Status = StatusResult.Types.Status.Cancelled;
                }
            }
            finally
            {
                _inflightInvocations.TryRemove(request.InvocationId, out var cts);

                if (context is IAsyncDisposable asyncContext)
                {
                    await asyncContext.DisposeAsync();
                }

                (context as IDisposable)?.Dispose();
            }

            return response;
        }

_application is declared as IFunctionsApplication; the actual implementation is Microsoft.Azure.Functions.Worker.FunctionsApplication.

FunctionsApplication

The code is in src\DotNetWorker.Core\FunctionsApplication.cs in the azure-functions-dotnet-worker repository:

        public async Task InvokeFunctionAsync(FunctionContext context)
        {
            var scope = new FunctionInvocationScope(context.FunctionDefinition.Name, context.InvocationId);

            using var logScope = _logger.BeginScope(scope);
            using Activity? invokeActivity = _functionActivitySourceFactory.StartInvoke(context);

            try
            {
                // Execution enters here.
                await _functionExecutionDelegate(context);
            }
            catch (Exception ex)
            {
                invokeActivity?.SetStatus(ActivityStatusCode.Error, ex.Message);

                Log.InvocationError(_logger, context.FunctionDefinition.Name, context.InvocationId, ex);

                throw;
            }
        }

_functionExecutionDelegate is an instance of Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate.

MiddlewareWorkerApplicationBuilderExtensions

        public static IFunctionsWorkerApplicationBuilder UseMiddleware<T>(this IFunctionsWorkerApplicationBuilder builder)
            where T : class, IFunctionsWorkerMiddleware
        {
            builder.Services.AddSingleton<T>();

            builder.Use(next =>
            {
                return context =>
                {
                    var middleware = context.InstanceServices.GetRequiredService<T>();

                    return middleware.Invoke(context, next);
                };
            });

            return builder;
        }

Here, the middleware implementation is DurableTaskFunctionsMiddleware.
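
To make the `next => context => ...` shape easier to follow, here is a minimal sketch of how such components can be chained into one delegate. DemoContext, DemoDelegate and DemoPipelineBuilder are hypothetical stand-ins for FunctionContext, FunctionExecutionDelegate and the real builder. The wrapping order in Build() is an assumption modeled on the common ASP.NET Core style pipeline; it is not copied from the worker source.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for FunctionContext.
public sealed class DemoContext
{
    public List<string> Trace { get; } = new List<string>();
}

// Hypothetical stand-in for FunctionExecutionDelegate.
public delegate Task DemoDelegate(DemoContext context);

public sealed class DemoPipelineBuilder
{
    private readonly List<Func<DemoDelegate, DemoDelegate>> _components =
        new List<Func<DemoDelegate, DemoDelegate>>();

    // Each component receives the next delegate and returns a delegate wrapping it.
    public DemoPipelineBuilder Use(Func<DemoDelegate, DemoDelegate> component)
    {
        _components.Add(component);
        return this;
    }

    // Wrap from the last registered component to the first, so that the
    // first registered component ends up outermost and runs first.
    public DemoDelegate Build(DemoDelegate terminal)
    {
        DemoDelegate pipeline = terminal;
        for (int i = _components.Count - 1; i >= 0; i--)
        {
            pipeline = _components[i](pipeline);
        }
        return pipeline;
    }
}

public static class PipelineDemo
{
    public static string Run()
    {
        DemoPipelineBuilder builder = new DemoPipelineBuilder()
            .Use(next => async ctx =>
            {
                ctx.Trace.Add("outer:before");
                await next(ctx);
                ctx.Trace.Add("outer:after");
            })
            .Use(next => async ctx =>
            {
                ctx.Trace.Add("inner:before");
                await next(ctx);
                ctx.Trace.Add("inner:after");
            });

        // The terminal delegate plays the role of the function executor.
        DemoDelegate pipeline = builder.Build(ctx =>
        {
            ctx.Trace.Add("function");
            return Task.CompletedTask;
        });

        var context = new DemoContext();
        pipeline(context).GetAwaiter().GetResult();
        return string.Join(" > ", context.Trace);
    }

    public static void Main()
    {
        // prints "outer:before > inner:before > function > inner:after > outer:after"
        Console.WriteLine(Run());
    }
}
```

In this sketch a component that skips the call to next(ctx) short-circuits everything behind it, which is the same lever DurableTaskFunctionsMiddleware has when it decides how an orchestration trigger is handled.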

Azure functions durable extension

DurableTaskFunctionsMiddleware

    public Task Invoke(FunctionContext functionContext, FunctionExecutionDelegate next)
    {
        if (IsOrchestrationTrigger(functionContext, out BindingMetadata? triggerBinding))
        {
            // Execution enters here.
            return RunOrchestrationAsync(functionContext, triggerBinding, next);
        }

        if (IsEntityTrigger(functionContext, out triggerBinding))
        {
            return RunEntityAsync(functionContext, triggerBinding, next);
        }

        return next(functionContext);
    }

Implementation of RunOrchestrationAsync():

    static async Task RunOrchestrationAsync(
        FunctionContext context, BindingMetadata triggerBinding, FunctionExecutionDelegate next)
    {
        InputBindingData<object> triggerInputData = await context.BindInputAsync<object>(triggerBinding);
        if (triggerInputData?.Value is not string encodedOrchestratorState)
        {
            throw new InvalidOperationException("Orchestration history state was either missing from the input or not a string value.");
        }

        FunctionsOrchestrator orchestrator = new(context, next, triggerInputData);
        string orchestratorOutput = GrpcOrchestrationRunner.LoadAndRun(
            encodedOrchestratorState, orchestrator, context.InstanceServices);

        // Send the encoded orchestrator output as the return value seen by the functions host extension
        context.GetInvocationResult().Value = orchestratorOutput;
    }

GrpcOrchestrationRunner

The code is in src\Worker\Grpc\GrpcOrchestrationRunner.cs in the durabletask-dotnet repository:

    public static string LoadAndRun(
        string encodedOrchestratorRequest,
        ITaskOrchestrator implementation,
        IServiceProvider? services = null)
    {
        Check.NotNullOrEmpty(encodedOrchestratorRequest);
        Check.NotNull(implementation);

        P.OrchestratorRequest request = P.OrchestratorRequest.Parser.Base64Decode<P.OrchestratorRequest>(
            encodedOrchestratorRequest);

        List<HistoryEvent> pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList();
        IEnumerable<HistoryEvent> newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent);

        // Re-construct the orchestration state from the history.
        // New events must be added using the AddEvent method.
        OrchestrationRuntimeState runtimeState = new(pastEvents);
        foreach (HistoryEvent newEvent in newEvents)
        {
            runtimeState.AddEvent(newEvent);
        }

        TaskName orchestratorName = new(runtimeState.Name);
        ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p
            ? new(new(p.Name), p.OrchestrationInstance.InstanceId)
            : null;

        DurableTaskShimFactory factory = services is null
            ? DurableTaskShimFactory.Default
            : ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);
        TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, parent);
        TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails);
        // Execution enters here.
        OrchestratorExecutionResult result = executor.Execute();

        P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse(
            request.InstanceId,
            result.CustomStatus,
            result.Actions);
        byte[] responseBytes = response.ToByteArray();
        return Convert.ToBase64String(responseBytes);
    }
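
LoadAndRun() takes a Base64 string in and returns a Base64 string out. The sketch below shows only that envelope, using UTF-8 text in place of the protobuf OrchestratorRequest and OrchestratorResponse messages. Base64EnvelopeDemo and its upper-casing "execution" step are hypothetical and exist purely for illustration.

```csharp
using System;
using System.Text;

public static class Base64EnvelopeDemo
{
    // Decode the Base64 request, "execute" it, and encode the response again.
    // The real method parses and serializes protobuf messages instead of text.
    public static string LoadAndRun(string encodedRequest)
    {
        byte[] requestBytes = Convert.FromBase64String(encodedRequest);
        string request = Encoding.UTF8.GetString(requestBytes);

        // Hypothetical stand-in for running the orchestrator.
        string response = request.ToUpperInvariant();

        byte[] responseBytes = Encoding.UTF8.GetBytes(response);
        return Convert.ToBase64String(responseBytes);
    }

    public static void Main()
    {
        string encoded = Convert.ToBase64String(Encoding.UTF8.GetBytes("history"));
        string result = LoadAndRun(encoded);

        // prints "HISTORY"
        Console.WriteLine(Encoding.UTF8.GetString(Convert.FromBase64String(result)));
    }
}
```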

TaskOrchestrationExecutor

The DurableTask.Core.TaskOrchestrationExecutor class lives in the Azure/durabletask project:

        public OrchestratorExecutionResult Execute()
        {
            return this.ExecuteCore(
                pastEvents: this.orchestrationRuntimeState.PastEvents,
                newEvents: this.orchestrationRuntimeState.NewEvents);
        }

On the first execution, PastEvents is empty and NewEvents contains two events:

  • DurableTask.Core.History.OrchestratorStartedEvent
  • DurableTask.Core.History.ExecutionStartedEvent

Implementation of ExecuteCore():

        OrchestratorExecutionResult ExecuteCore(IEnumerable<HistoryEvent> pastEvents, IEnumerable<HistoryEvent> newEvents)
        {
            SynchronizationContext prevCtx = SynchronizationContext.Current;

            try
            {
                SynchronizationContext syncCtx = new TaskOrchestrationSynchronizationContext(this.decisionScheduler);
                SynchronizationContext.SetSynchronizationContext(syncCtx);
                OrchestrationContext.IsOrchestratorThread = true;

                try
                {
                    void ProcessEvents(IEnumerable<HistoryEvent> events)
                    {
                        foreach (HistoryEvent historyEvent in events)
                        {
                            if (historyEvent.EventType == EventType.OrchestratorStarted)
                            {
                                var decisionStartedEvent = (OrchestratorStartedEvent)historyEvent;
                                this.context.CurrentUtcDateTime = decisionStartedEvent.Timestamp;
                                continue;
                            }

                            // Execution enters here.
                            this.ProcessEvent(historyEvent);
                            historyEvent.IsPlayed = true;
                        }
                    }

                    // Replay the old history to rebuild the local state of the orchestration.
                    // TODO: Log a verbose message indicating that the replay has started (include event count?)
                    this.context.IsReplaying = true;
                    ProcessEvents(pastEvents);

                    // Play the newly arrived events to determine the next action to take.
                    // TODO: Log a verbose message indicating that new events are being processed (include event count?)
                    this.context.IsReplaying = false;
                    // The first invocation ends up here, with IsReplaying set to false.
                    ProcessEvents(newEvents);

                    // check if workflow is completed after this replay
                    // TODO: Create a setting that allows orchestrations to complete when the orchestrator
                    //       function completes, even if there are open tasks.
                    if (!this.context.HasOpenTasks)
                    {
                        if (this.result!.IsCompleted)
                        {
                            if (this.result.IsFaulted)
                            {
                                Exception? exception = this.result.Exception?.InnerExceptions.FirstOrDefault();
                                Debug.Assert(exception != null);

                                if (Utils.IsExecutionAborting(exception!))
                                {
                                    // Let this exception propagate out to be handled by the dispatcher
                                    ExceptionDispatchInfo.Capture(exception).Throw();
                                }
                                
                                this.context.FailOrchestration(exception);
                            }
                            else
                            {
                                this.context.CompleteOrchestration(this.result.Result);
                            }
                        }

                        // TODO: It is an error if result is not completed when all OpenTasks are done.
                        // Throw an exception in that case.
                    }
                }
                catch (NonDeterministicOrchestrationException exception)
                {
                    this.context.FailOrchestration(exception);
                }

                return new OrchestratorExecutionResult
                {
                    Actions = this.context.OrchestratorActions,
                    CustomStatus = this.taskOrchestration.GetStatus(),
                };
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(prevCtx);
                OrchestrationContext.IsOrchestratorThread = false;
            }
        }
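
The two-pass structure of ExecuteCore() (past events with IsReplaying set to true, then new events with IsReplaying set to false) can be reduced to the following sketch. ReplayDemo and DemoEvent are hypothetical, heavily simplified stand-ins; the real executor also drives the orchestrator code and collects actions, which is omitted here.

```csharp
using System;
using System.Collections.Generic;

public static class ReplayDemo
{
    // Hypothetical, simplified stand-in for a history event.
    public sealed class DemoEvent
    {
        public DemoEvent(string name) { Name = name; }
        public string Name { get; }
        public bool IsPlayed { get; set; }
    }

    // Processes past events in replay mode, then new events in live mode,
    // and records which mode each event was handled under.
    public static List<string> Execute(IEnumerable<DemoEvent> pastEvents, IEnumerable<DemoEvent> newEvents)
    {
        var log = new List<string>();
        bool isReplaying = true;

        void ProcessEvents(IEnumerable<DemoEvent> events)
        {
            foreach (DemoEvent e in events)
            {
                if (e.Name == "OrchestratorStarted")
                {
                    // The real code only updates the current timestamp here.
                    continue;
                }

                log.Add(e.Name + ":replaying=" + isReplaying);
                e.IsPlayed = true;
            }
        }

        isReplaying = true;
        ProcessEvents(pastEvents);

        isReplaying = false;
        ProcessEvents(newEvents);

        return log;
    }

    public static void Main()
    {
        // First execution: no past events, two new events.
        List<string> first = Execute(
            new DemoEvent[0],
            new[] { new DemoEvent("OrchestratorStarted"), new DemoEvent("ExecutionStarted") });

        // prints "ExecutionStarted:replaying=False"
        Console.WriteLine(string.Join(", ", first));
    }
}
```

On a later execution the same ExecutionStarted event would arrive in the past-events list and therefore be handled with the replay flag set to true.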

The ProcessEvent() method:

        void ProcessEvent(HistoryEvent historyEvent)
        {
            bool overrideSuspension = historyEvent.EventType == EventType.ExecutionResumed || historyEvent.EventType == EventType.ExecutionTerminated;
            if (this.context.IsSuspended && !overrideSuspension)
            {
                this.context.HandleEventWhileSuspended(historyEvent);
            }
            else
            {
                switch (historyEvent.EventType)
                {
                    case EventType.ExecutionStarted:
                        // This is the code that gets executed.
                        var executionStartedEvent = (ExecutionStartedEvent)historyEvent;
                        this.result = this.taskOrchestration.Execute(this.context, executionStartedEvent.Input);
                        break;
                    case EventType.ExecutionTerminated:
                        this.context.HandleExecutionTerminatedEvent((ExecutionTerminatedEvent)historyEvent);
                        break;
                    case EventType.TaskScheduled:
                        this.context.HandleTaskScheduledEvent((TaskScheduledEvent)historyEvent);
                        break;
                    case EventType.TaskCompleted:
                        this.context.HandleTaskCompletedEvent((TaskCompletedEvent)historyEvent);
                        break;
                    case EventType.TaskFailed:
                        this.context.HandleTaskFailedEvent((TaskFailedEvent)historyEvent);
                        break;
                    case EventType.SubOrchestrationInstanceCreated:
                        this.context.HandleSubOrchestrationCreatedEvent((SubOrchestrationInstanceCreatedEvent)historyEvent);
                        break;
                    case EventType.SubOrchestrationInstanceCompleted:
                        this.context.HandleSubOrchestrationInstanceCompletedEvent(
                            (SubOrchestrationInstanceCompletedEvent)historyEvent);
                        break;
                    case EventType.SubOrchestrationInstanceFailed:
                        this.context.HandleSubOrchestrationInstanceFailedEvent((SubOrchestrationInstanceFailedEvent)historyEvent);
                        break;
                    case EventType.TimerCreated:
                        this.context.HandleTimerCreatedEvent((TimerCreatedEvent)historyEvent);
                        break;
                    case EventType.TimerFired:
                        this.context.HandleTimerFiredEvent((TimerFiredEvent)historyEvent);
                        break;
                    case EventType.EventSent:
                        this.context.HandleEventSentEvent((EventSentEvent)historyEvent);
                        break;
                    case EventType.EventRaised:
                        this.context.HandleEventRaisedEvent((EventRaisedEvent)historyEvent, this.skipCarryOverEvents, this.taskOrchestration);
                        break;
                    case EventType.ExecutionSuspended:
                        this.context.HandleExecutionSuspendedEvent((ExecutionSuspendedEvent)historyEvent);
                        break;
                    case EventType.ExecutionResumed:
                        this.context.HandleExecutionResumedEvent((ExecutionResumedEvent)historyEvent, ProcessEvent);
                        break;
                }
            }
        }

versioning TODO: the version field of the ExecutionStartedEvent here is currently empty and needs to be populated later.

TaskOrchestrationShim

    public override async Task<string?> Execute(OrchestrationContext innerContext, string rawInput)
    {
        Check.NotNull(innerContext);
        JsonDataConverterShim converterShim = new(this.invocationContext.Options.DataConverter);
        innerContext.MessageDataConverter = converterShim;
        innerContext.ErrorDataConverter = converterShim;

        object? input = this.DataConverter.Deserialize(rawInput, this.implementation.InputType);
        this.wrapperContext = new(innerContext, this.invocationContext, input);

        try
        {
            object? output = await this.implementation.RunAsync(this.wrapperContext, input);

            // Return the output (if any) as a serialized string.
            return this.DataConverter.Serialize(output);
        }
        finally
        {
            // if user code crashed inside a critical section, or did not exit it, do that now
            this.wrapperContext.ExitCriticalSectionIfNeeded();
        }
    }

versioning TODO: the OrchestrationInstance field of the OrchestrationContext here contains only InstanceId and ExecutionId. An InstanceVersion field needs to be added, with its value taken from the version field of ExecutionStartedEvent.

wrapperContext is a TaskOrchestrationContextWrapper.

versioning TODO: TaskOrchestrationContextWrapper needs an InstanceVersion field whose value is taken from this.innerContext.OrchestrationInstance.InstanceVersion.

this.implementation is a Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsOrchestrator.

FunctionsOrchestrator

    public async Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
    {
        // Set the function input to be the orchestration context wrapped in our own object so that we can
        // intercept any of the calls and inject our own logic or tracking.
        FunctionsOrchestrationContext wrapperContext = new(context, this.functionContext);
        this.contextBinding.Value = wrapperContext;
        this.inputContext.PrepareInput(input);

        try
        {
            // This method will advance to the next middleware and throw if it detects an asynchronous execution.
            await EnsureSynchronousExecution(this.functionContext, this.next, wrapperContext);
        }
        catch (Exception ex)
        {
            this.functionContext.GetLogger<FunctionsOrchestrator>().LogError(
                ex,
                "An error occurred while executing the orchestrator function '{FunctionName}'.",
                this.functionContext.FunctionDefinition.Name);
            throw;
        }

        // Set the raw function output as the orchestrator output
        object? functionOutput = this.functionContext.GetInvocationResult().Value;
        return functionOutput;
    }

Implementation of EnsureSynchronousExecution():

    private static async Task EnsureSynchronousExecution(
        FunctionContext functionContext,
        FunctionExecutionDelegate next,
        FunctionsOrchestrationContext orchestrationContext)
    {
        Task orchestratorTask = next(functionContext);
        if (!orchestratorTask.IsCompleted && !orchestrationContext.IsAccessed)
        {
            // If the middleware returns before the orchestrator function's context object was accessed and before
            // it completes its execution, then we know that either some middleware component went async or that the
            // orchestrator function did some illegal await as its very first action.
            throw new InvalidOperationException(Constants.IllegalAwaitErrorMessage);
        }

        await orchestratorTask;

        // This will throw if either the orchestrator performed an illegal await or if some middleware ahead of this
        // one performed some illegal await.
        orchestrationContext.ThrowIfIllegalAccess();
    }
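
The first check in EnsureSynchronousExecution() relies on inspecting the returned task right after the call: if it is still pending and the context has not been touched, something awaited too early. The sketch below reproduces only that check. SyncCheckDemo, DemoContext and the error text are hypothetical, and the follow-up ThrowIfIllegalAccess() step is not modeled.

```csharp
using System;
using System.Threading.Tasks;

public static class SyncCheckDemo
{
    // Hypothetical stand-in for the orchestration context's IsAccessed flag.
    public sealed class DemoContext
    {
        public bool IsAccessed { get; set; }
    }

    // If the task is still pending and the context was never accessed,
    // some code awaited before the orchestrator touched its context.
    public static async Task EnsureSynchronous(Func<DemoContext, Task> next, DemoContext context)
    {
        Task task = next(context);
        if (!task.IsCompleted && !context.IsAccessed)
        {
            throw new InvalidOperationException("illegal await detected");
        }

        await task;
    }

    // Returns true when the check rejects the given delegate.
    public static bool Throws(Func<DemoContext, Task> next)
    {
        try
        {
            EnsureSynchronous(next, new DemoContext()).GetAwaiter().GetResult();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    public static void Main()
    {
        // Touches the context first and completes synchronously: accepted.
        bool wellBehaved = Throws(ctx =>
        {
            ctx.IsAccessed = true;
            return Task.CompletedTask;
        });

        // Awaits a task that never completes before touching the context: rejected.
        bool illegal = Throws(async ctx =>
        {
            await new TaskCompletionSource<bool>().Task;
            ctx.IsAccessed = true;
        });

        Console.WriteLine(wellBehaved + " " + illegal); // prints "False True"
    }
}
```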

The next delegate is the one defined earlier:

                return context =>
                {
                    var middleware = context.InstanceServices.GetRequiredService<T>();

                    return middleware.Invoke(context, next);
                };

Azure-functions-dotnet-worker

FunctionExecutionMiddleware

        public async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
        {
            // Only use the coordinator for HttpTriggers
            if (!_isHttpTrigger.GetOrAdd(context.FunctionId, static (_, c) => IsHttpTriggerFunction(c), context))
            {
                await next(context);
                return;
            }

            var invocationId = context.InvocationId;

            // this call will block until the ASP.NET middleware pipeline has signaled that it's ready to run the function
            var httpContext = await _coordinator.SetFunctionContextAsync(invocationId, context);

            AddHttpContextToFunctionContext(context, httpContext);

            // Register additional context features
            context.Features.Set<IFromBodyConversionFeature>(FromBodyConverstionFeature.Instance);

            await next(context);

            var invocationResult = context.GetInvocationResult();

            if (invocationResult?.Value is IActionResult actionResult)
            {
                ActionContext actionContext = new ActionContext(httpContext, httpContext.GetRouteData(), new ActionDescriptor());

                await actionResult.ExecuteResultAsync(actionContext);
            }
            else if (invocationResult?.Value is AspNetCoreHttpResponseData)
            {
                // The AspNetCoreHttpResponseData implementation is
                // simply a wrapper over the underlying HttpResponse and
                // all APIs manipulate the request.
                // There's no need to return this result as no additional
                // processing is required.
                invocationResult.Value = null;
            }

            // allows asp.net middleware to continue
            _coordinator.CompleteFunctionInvocation(invocationId);
        }
        public Task Invoke(FunctionContext context)
        {
            return _functionExecutor.ExecuteAsync(context).AsTask();
        }

Here, _functionExecutor is implemented by MyDurableFunction1.DirectFunctionExecutor.

OutputBindingsMiddleware

        public static async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
        {
            await next(context);

            AddOutputBindings(context);
        }

Work.Sdk.Generator.GeneratedFunctionExecutor

Called by GeneratedFunctionExecutor.g.cs:

        public async ValueTask ExecuteAsync(FunctionContext context)
        {
            var inputBindingFeature = context.Features.Get<IFunctionInputBindingFeature>();
            var inputBindingResult = await inputBindingFeature.BindFunctionInputAsync(context);
            var inputArguments = inputBindingResult.Values;

            if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.RunOrchestrator", StringComparison.Ordinal))
            {
                context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.RunOrchestrator((global::Microsoft.DurableTask.TaskOrchestrationContext)inputArguments[0]);
            }
            else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.SayHello", StringComparison.Ordinal))
            {
                context.GetInvocationResult().Value = global::Company.Function.HelloOrchestration.SayHello((string)inputArguments[0], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[1]);
            }
            else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.HttpStart", StringComparison.Ordinal))
            {
                context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.HttpStart((global::Microsoft.Azure.Functions.Worker.Http.HttpRequestData)inputArguments[0], (global::Microsoft.DurableTask.Client.DurableTaskClient)inputArguments[1], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[2]);
            }
        }

The generated code checks the value of context.FunctionDefinition.EntryPoint and calls the matching function:

Value of context.FunctionDefinition.EntryPoint | Function | Function attribute in the source code
"Company.Function.HelloOrchestration.RunOrchestrator" | HelloOrchestration.RunOrchestrator() | [Function(nameof(HelloOrchestration))]
"Company.Function.HelloOrchestration.SayHello" | HelloOrchestration.SayHello() | [Function(nameof(SayHello))]
"Company.Function.HelloOrchestration.HttpStart" | HelloOrchestration.HttpStart() | [Function("HelloOrchestration_HttpStart")]

Of these, HttpStart() is the function that accepts the HTTP request and then triggers the Schedule New Orchestration Instance operation.

The orchestration engine then starts the orchestration, and RunOrchestrator() is executed:

context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.RunOrchestrator((global::Microsoft.DurableTask.TaskOrchestrationContext)inputArguments[0]);

Customer Code

HelloOrchestration function

Taking the quickstart HelloOrchestration.cs as an example:

        [Function(nameof(HelloOrchestration))]
        public static async Task<List<string>> RunOrchestrator(
            [OrchestrationTrigger("1.5.6")] TaskOrchestrationContext context)
        {
           var instanceId = context.InstanceId;
           var InstanceVersion = context.InstanceVersion;
            ......
        }