Worker initialization

Source code analysis of DurableTask client initialization

1 - Obsolete initialization

The DurableTask worker initialization path that is now obsolete

GrpcDurableTaskWorker

File src\Worker\Grpc\GrpcDurableTaskWorker.Processor.cs in the durabletask-dotnet repository:

        public async Task ExecuteAsync(CancellationToken cancellation)
        {
            while (!cancellation.IsCancellationRequested)
            {
                try
                {
                    AsyncServerStreamingCall<P.WorkItem> stream = await this.ConnectAsync(cancellation);
                    await this.ProcessWorkItemsAsync(stream, cancellation);
                }
                ......
            }
        }
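The elided part of the loop is where a failed connection gets handled before the next attempt. The following is a minimal sketch of that reconnect-loop pattern, not the durabletask-dotnet implementation: `ReconnectLoop`, the `connectAndProcess` delegate and the fixed `retryDelay` are hypothetical names and choices made for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ReconnectLoop
{
    // Runs connectAndProcess again and again until cancellation is requested.
    // A failure other than cancellation is followed by a delay and a new attempt.
    // Returns the number of attempts that were started.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task> connectAndProcess,
        TimeSpan retryDelay,
        CancellationToken cancellation)
    {
        int attempts = 0;
        while (!cancellation.IsCancellationRequested)
        {
            attempts++;
            try
            {
                await connectAndProcess(cancellation);
            }
            catch (OperationCanceledException) when (cancellation.IsCancellationRequested)
            {
                break; // shutdown was requested: leave the loop quietly
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Attempt {attempts} failed: {ex.Message}; retrying");
                try
                {
                    await Task.Delay(retryDelay, cancellation);
                }
                catch (OperationCanceledException)
                {
                    break; // cancelled while waiting for the next attempt
                }
            }
        }

        return attempts;
    }
}
```

A caller would pass a delegate that connects and then processes work items, for example `ReconnectLoop.RunAsync(ct => ConnectAndProcessAsync(ct), TimeSpan.FromSeconds(5), token)`, where `ConnectAndProcessAsync` is again a placeholder.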

ConnectAsync() calls the GetWorkItems() method defined in the gRPC protobuf file:

        async Task<AsyncServerStreamingCall<P.WorkItem>> ConnectAsync(CancellationToken cancellation)
        {
            await this.sidecar!.HelloAsync(EmptyMessage, cancellationToken: cancellation);
            this.Logger.EstablishedWorkItemConnection();

            Console.WriteLine("********GrpcDurableTaskWorker call GetWorkItems()********");

            // Get the stream for receiving work-items
            return this.sidecar!.GetWorkItems(new P.GetWorkItemsRequest(), cancellationToken: cancellation);
        }

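This should have been the normal worker initialization flow, but it has since been abandoned.

Note: this is recorded here to avoid confusion. I lost a lot of time on this path.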

2 - Call stack

Call stack when the DurableTask client starts a new instance

Call stack overview

Add logging to src\DotNetWorker.Grpc\GrpcWorker.cs in the azure-functions-dotnet-worker repository:

        public Task StartAsync(CancellationToken token)
        {
            Console.WriteLine(new System.Diagnostics.StackTrace(true));

            _workerClient = _workerClientFactory.CreateClient(this);

            Console.WriteLine("_workerClient is " + _workerClient.GetType().Name);

            return _workerClient.StartAsync(token);
        }

This yields the call stack that initializes the worker at startup:

[2024-04-08T09:07:04.591Z]    at Microsoft.Azure.Functions.Worker.GrpcWorker.StartAsync(CancellationToken token) in C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Grpc\GrpcWorker.cs:line 58
[2024-04-08T09:07:04.592Z]    at Microsoft.Azure.Functions.Worker.WorkerHostedService.StartAsync(CancellationToken cancellationToken) in C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Core\WorkerHostedService.cs:line 25
[2024-04-08T09:07:04.592Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.593Z]    at Microsoft.Azure.Functions.Worker.WorkerHostedService.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.593Z]    at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>b__15_1(IHostedService service, CancellationToken token)
[2024-04-08T09:07:04.593Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.593Z]    at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>b__15_1(IHostedService service, CancellationToken token)
[2024-04-08T09:07:04.594Z]    at Microsoft.Extensions.Hosting.Internal.Host.ForeachService[T](IEnumerable`1 services, CancellationToken token, Boolean concurrent, Boolean abortOnFirstException, List`1 exceptions, Func`3 operation)
[2024-04-08T09:07:04.594Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.594Z]    at Microsoft.Extensions.Hosting.Internal.Host.ForeachService[T](IEnumerable`1 services, CancellationToken token, Boolean concurrent, Boolean abortOnFirstException, List`1 exceptions, Func`3 operation)
[2024-04-08T09:07:04.594Z]    at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.595Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.595Z]    at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.595Z]    at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
[2024-04-08T09:07:04.596Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.596Z]    at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
[2024-04-08T09:07:04.596Z]    at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.Run(IHost host)
[2024-04-08T09:07:04.596Z]    at Program.<Main>$(String[] args) in C:\Users\sky\work\code\durabletask\MyDurableFunction1\Program.cs:line 13
    
......
[2024-04-08T09:07:04.598Z] _workerClient is GrpcWorkerClient
[2024-04-08T09:07:04.629Z] Worker process started and initialized.
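The technique used above, printing `new StackTrace(true)` from inside the method under investigation, works in any .NET code. A self-contained sketch follows; `CallerProbe`, `Capture` and `StartWorker` are hypothetical names, and file and line information only appears when debug symbols are available.

```csharp
using System;
using System.Diagnostics;
using System.Runtime.CompilerServices;

public static class CallerProbe
{
    // Returns the current call stack as text. Passing true asks for file and
    // line information, which needs debug symbols (PDB files) to be present.
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static string Capture()
    {
        return new StackTrace(true).ToString();
    }

    // Stands in for the method being investigated: it dumps who is calling it.
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static string StartWorker()
    {
        string stack = Capture();
        Console.WriteLine(stack);
        return stack;
    }
}
```

`NoInlining` keeps the JIT from folding the two methods into their callers, so both frames remain visible in the printed stack.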

Call stack

GrpcWorker

        public Task StartAsync(CancellationToken token)
        {
          _workerClient = _workerClientFactory.CreateClient(this);
          return _workerClient.StartAsync(token);
        }

GrpcWorkerClient

GrpcWorkerClient, defined in src\DotNetWorker.Grpc\GrpcWorkerClientFactory.cs:


            public async Task StartAsync(CancellationToken token)
            {
                if (_running)
                {
                    throw new InvalidOperationException($"The client is already running. Multiple calls to {nameof(StartAsync)} are not supported.");
                }

                _running = true;

                var eventStream = _grpcClient.EventStream(cancellationToken: token);

                await SendStartStreamMessageAsync(eventStream.RequestStream);

                _ = StartWriterAsync(eventStream.RequestStream);
                _ = StartReaderAsync(eventStream.ResponseStream);
            }

3 - GrpcWorkerClient

DurableTask GrpcWorkerClient

GrpcWorkerClient, defined in src\DotNetWorker.Grpc\GrpcWorkerClientFactory.cs:

Class definition

        private class GrpcWorkerClient : IWorkerClient
        {
            private readonly FunctionRpcClient _grpcClient;
            private readonly GrpcWorkerStartupOptions _startupOptions;
            private readonly ChannelReader<StreamingMessage> _outputReader;
            private readonly ChannelWriter<StreamingMessage> _outputWriter;
            private bool _running;
            private IMessageProcessor? _processor;
        }

Constructor

            public GrpcWorkerClient(GrpcHostChannel outputChannel, GrpcWorkerStartupOptions startupOptions, IMessageProcessor processor)
            {
                _startupOptions = startupOptions ?? throw new ArgumentNullException(nameof(startupOptions));
                _processor = processor ?? throw new ArgumentNullException(nameof(processor));

                // Initialize the reader and the writer; both come from outputChannel
                _outputReader = outputChannel.Channel.Reader;
                _outputWriter = outputChannel.Channel.Writer;

                // Create _grpcClient, which is of type FunctionRpcClient
                _grpcClient = CreateClient();
            }

The CreateClient() method:

            private FunctionRpcClient CreateClient()
            {
#if NET5_0_OR_GREATER
                GrpcChannel grpcChannel = GrpcChannel.ForAddress(_startupOptions.HostEndpoint!.AbsoluteUri, new GrpcChannelOptions()
                {
                    MaxReceiveMessageSize = _startupOptions.GrpcMaxMessageLength,
                    MaxSendMessageSize = _startupOptions.GrpcMaxMessageLength,
                    Credentials = ChannelCredentials.Insecure
                });
#else

                var options = new ChannelOption[]
                {
                    new ChannelOption(GrpcCore.ChannelOptions.MaxReceiveMessageLength, _startupOptions.GrpcMaxMessageLength),
                    new ChannelOption(GrpcCore.ChannelOptions.MaxSendMessageLength, _startupOptions.GrpcMaxMessageLength)
                };

                GrpcCore.Channel grpcChannel = new GrpcCore.Channel(_startupOptions.HostEndpoint!.Host, _startupOptions.HostEndpoint.Port, ChannelCredentials.Insecure, options);

#endif
                return new FunctionRpcClient(grpcChannel);
            }
        }

Startup process

StartAsync


            public async Task StartAsync(CancellationToken token)
            {
                if (_running)
                {
                    throw new InvalidOperationException($"The client is already running. Multiple calls to {nameof(StartAsync)} are not supported.");
                }

                _running = true;

                var eventStream = _grpcClient.EventStream(cancellationToken: token);

                await SendStartStreamMessageAsync(eventStream.RequestStream);

                _ = StartWriterAsync(eventStream.RequestStream);
                _ = StartReaderAsync(eventStream.ResponseStream);
            }
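The bodies of StartWriterAsync and StartReaderAsync are not shown here. As a rough sketch of the overall shape (guard against a second start, send the first message, then start a writer loop and a reader loop without awaiting them), the following model replaces the gRPC stream with in-memory channels and StreamingMessage with plain strings. `MiniWorkerClient`, `PumpOutputAsync`, `PumpInputAsync` and the message strings are assumptions made for illustration, not the real implementation.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the worker client. Strings replace StreamingMessage,
// and two in-memory channels replace the two directions of the gRPC stream.
public sealed class MiniWorkerClient
{
    private readonly Channel<string> _output = Channel.CreateUnbounded<string>();
    private readonly ChannelWriter<string> _toHost;    // plays eventStream.RequestStream
    private readonly ChannelReader<string> _fromHost;  // plays eventStream.ResponseStream
    private readonly Func<string, Task> _processor;
    private bool _running;

    public MiniWorkerClient(ChannelWriter<string> toHost, ChannelReader<string> fromHost, Func<string, Task> processor)
    {
        _toHost = toHost ?? throw new ArgumentNullException(nameof(toHost));
        _fromHost = fromHost ?? throw new ArgumentNullException(nameof(fromHost));
        _processor = processor ?? throw new ArgumentNullException(nameof(processor));
    }

    // Exposed only so that callers can wait for the loops to finish.
    public Task WriterLoop { get; private set; } = Task.CompletedTask;
    public Task ReaderLoop { get; private set; } = Task.CompletedTask;

    public async Task StartAsync()
    {
        if (_running)
        {
            throw new InvalidOperationException("The client is already running.");
        }

        _running = true;

        // The first message on the stream announces the worker to the host.
        await _toHost.WriteAsync("start-stream");

        // Neither loop is awaited: StartAsync returns while both keep running.
        WriterLoop = PumpOutputAsync();
        ReaderLoop = PumpInputAsync();
    }

    // Queues a message; the writer loop forwards it to the host.
    public ValueTask SendMessageAsync(string message) => _output.Writer.WriteAsync(message);

    // Signals that no more messages will be queued.
    public void Stop() => _output.Writer.TryComplete();

    private async Task PumpOutputAsync()
    {
        await foreach (string message in _output.Reader.ReadAllAsync())
        {
            await _toHost.WriteAsync(message);
        }
        _toHost.TryComplete();
    }

    private async Task PumpInputAsync()
    {
        await foreach (string message in _fromHost.ReadAllAsync())
        {
            await _processor(message);
        }
    }
}
```

Routing every outgoing message through one queue means a single loop writes to the stream, so callers of SendMessageAsync never write to it concurrently.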

4 - GrpcWorker

DurableTask GrpcWorker

The GrpcWorker class in src\DotNetWorker.Grpc\GrpcWorker.cs. It implements IMessageProcessor and passes itself to the GrpcWorkerClient described in the previous section, whose class definition, constructor and CreateClient() method are listed there.

Message handling

ProcessMessageAsync

        Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
        {
            // Dispatch and return.
            Task.Run(() => ProcessRequestCoreAsync(message));

            return Task.CompletedTask;
        }
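The "dispatch and return" comment means the method hands the message to the thread pool and reports completion immediately, so whoever reads the stream is not held up by a slow handler. A small sketch of the pattern follows. `Dispatcher` and the `onError` callback are hypothetical; the callback is added here only because a task that nobody awaits would otherwise hide its exceptions.

```csharp
using System;
using System.Threading.Tasks;

public static class Dispatcher
{
    // Starts the handler on the thread pool and returns an already completed
    // task, so the caller never waits for the handler itself.
    public static Task Dispatch(Func<Task> handler, Action<Exception> onError)
    {
        _ = Task.Run(async () =>
        {
            try
            {
                await handler();
            }
            catch (Exception ex)
            {
                // Nobody awaits this task, so failures have to be reported here.
                onError(ex);
            }
        });

        return Task.CompletedTask;
    }
}
```

The trade-off is ordering: handlers for consecutive messages may run concurrently and finish in a different order than the messages arrived.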

ProcessRequestCoreAsync

        private async Task ProcessRequestCoreAsync(StreamingMessage request)
        {
            StreamingMessage responseMessage = new StreamingMessage
            {
                RequestId = request.RequestId
            };

            switch (request.ContentCase)
            {
                case MsgType.InvocationRequest:
                    responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
                    break;

                case MsgType.WorkerInitRequest:
                    Console.WriteLine("GrpcWorker received WorkerInitRequest");
                    responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
                    break;

                case MsgType.WorkerStatusRequest:
                    responseMessage.WorkerStatusResponse = new WorkerStatusResponse();
                    break;

                case MsgType.FunctionsMetadataRequest:
                    responseMessage.FunctionMetadataResponse = await GetFunctionMetadataAsync(request.FunctionsMetadataRequest.FunctionAppDirectory);
                    break;

                case MsgType.WorkerTerminate:
                    WorkerTerminateRequestHandler(request.WorkerTerminate);
                    break;

                case MsgType.FunctionLoadRequest:
                    responseMessage.FunctionLoadResponse = FunctionLoadRequestHandler(request.FunctionLoadRequest, _application, _methodInfoLocator);
                    break;

                case MsgType.FunctionEnvironmentReloadRequest:
                    responseMessage.FunctionEnvironmentReloadResponse = EnvironmentReloadRequestHandler(_workerOptions);
                    break;

                case MsgType.InvocationCancel:
                    InvocationCancelRequestHandler(request.InvocationCancel);
                    break;

                default:
                    // TODO: Trace failure here.
                    return;
            }

            await _workerClient!.SendMessageAsync(responseMessage);
        }
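Two things stand out in this method: the response copies the RequestId of the request, and the handler is chosen by the content case of the message. Reduced to a toy model with only two request types, the idea looks roughly like this. `Message`, `ContentCase` and `MiniProcessor` are simplified stand-ins invented for this sketch, not the generated protobuf types.

```csharp
using System;

public enum ContentCase
{
    None,
    WorkerInitRequest,
    WorkerInitResponse,
    WorkerStatusRequest,
    WorkerStatusResponse
}

public sealed class Message
{
    public string RequestId { get; set; } = "";
    public ContentCase Content { get; set; } = ContentCase.None;
}

public static class MiniProcessor
{
    // Builds the reply for a request. Returns false when the content case is
    // not handled, in which case the caller sends nothing back.
    public static bool TryProcess(Message request, out Message response)
    {
        // The reply carries the id of the request so the host can pair them up.
        response = new Message { RequestId = request.RequestId };

        switch (request.Content)
        {
            case ContentCase.WorkerInitRequest:
                response.Content = ContentCase.WorkerInitResponse;
                break;

            case ContentCase.WorkerStatusRequest:
                response.Content = ContentCase.WorkerStatusResponse;
                break;

            default:
                return false;
        }

        return true;
    }
}
```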

WorkerInitRequest

                case MsgType.WorkerInitRequest:
                    responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
                    break;

Implementation of WorkerInitRequestHandler():

        internal static WorkerInitResponse WorkerInitRequestHandler(WorkerInitRequest request, WorkerOptions workerOptions)
        {
            var response = new WorkerInitResponse
            {
                Result = new StatusResult { Status = StatusResult.Types.Status.Success },
                WorkerVersion = WorkerInformation.Instance.WorkerVersion,
                WorkerMetadata = GetWorkerMetadata()
            };

            response.Capabilities.Add(GetWorkerCapabilities(workerOptions));

            return response;
        }

TBD: where does WorkerVersion come from?

InvocationRequest

                case MsgType.InvocationRequest:
                    responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
                    break;

Implementation of InvocationRequestHandlerAsync:

        internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
        {
            return _invocationHandler.InvokeAsync(request);
        }

5 - FunctionRpcClient

FunctionRpcClient

The protobuf definition behind FunctionRpcClient, in the Azure/azure-functions-dotnet-worker repository.

protobuf

Location of the proto file:

protos\azure-functions-language-worker-protobuf\src\proto\FunctionRpc.proto

FunctionRpc service

FunctionRpc.proto defines the gRPC service FunctionRpc:

option java_multiple_files = true;
option java_package = "com.microsoft.azure.functions.rpc.messages";
option java_outer_classname = "FunctionProto";
option csharp_namespace = "Microsoft.Azure.Functions.Worker.Grpc.Messages";
option go_package ="github.com/Azure/azure-functions-go-worker/internal/rpc";

package AzureFunctionsRpcMessages;

import "google/protobuf/duration.proto";
import "identity/ClaimsIdentityRpc.proto";
import "shared/NullableTypes.proto";

// Interface exported by the server.
service FunctionRpc {
}

EventStream 方法

It defines a single method, EventStream:

  rpc EventStream (stream StreamingMessage) returns (stream StreamingMessage) {}

Both the request and the response are streams of StreamingMessage.

StreamingMessage

Apart from request_id, which identifies a message between host and worker, StreamingMessage contains only a oneof content field:

message StreamingMessage {
  // Used to identify message between host and worker
  string request_id = 1;

  // Payload of the message
  oneof content {
  	......
  }
}
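In C#, a oneof surfaces as one property per member plus a ContentCase property that reports which member is set. Assigning one member replaces whichever was set before, which is why a receiver can branch on ContentCase alone. The class below is a hand-written model of that behaviour, limited to two members represented as plain strings. It is not the generated code, and `MiniStreamingMessage` is a made-up name.

```csharp
#nullable disable

// Hand-written model of the shape protoc generates for a oneof in C#.
// Only two of the payload types are modelled, and as plain strings.
public sealed class MiniStreamingMessage
{
    public enum ContentOneofCase { None = 0, WorkerInitRequest = 17, StartStream = 20 }

    private object _content; // the single slot shared by all oneof members

    public string RequestId { get; set; } = "";

    // Tells the receiver which member, if any, is currently set.
    public ContentOneofCase ContentCase { get; private set; } = ContentOneofCase.None;

    public string StartStream
    {
        get => ContentCase == ContentOneofCase.StartStream ? (string)_content : null;
        set => Set(value, ContentOneofCase.StartStream);
    }

    public string WorkerInitRequest
    {
        get => ContentCase == ContentOneofCase.WorkerInitRequest ? (string)_content : null;
        set => Set(value, ContentOneofCase.WorkerInitRequest);
    }

    // Assigning any member overwrites the shared slot, which clears the previous member.
    private void Set(string value, ContentOneofCase which)
    {
        _content = value;
        ContentCase = value == null ? ContentOneofCase.None : which;
    }
}
```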

There are quite a few message types:

  oneof content {

    // Worker initiates stream
    StartStream start_stream = 20;

    // Host sends capabilities/init data to worker
    WorkerInitRequest worker_init_request = 17;
    // Worker responds after initializing with its capabilities & status
    WorkerInitResponse worker_init_response = 16;

    // MESSAGE NOT USED
    // Worker periodically sends empty heartbeat message to host
    WorkerHeartbeat worker_heartbeat = 15;

    // Host sends terminate message to worker.
    // Worker terminates if it can, otherwise host terminates after a grace period
    WorkerTerminate worker_terminate = 14;

    // Host periodically sends status request to the worker
    WorkerStatusRequest worker_status_request = 12;
    WorkerStatusResponse worker_status_response = 13;

    // On file change event, host sends notification to worker
    FileChangeEventRequest file_change_event_request = 6;

    // Worker requests a desired action (restart worker, reload function)
    WorkerActionResponse worker_action_response = 7;

    // Host sends required metadata to worker to load function
    FunctionLoadRequest function_load_request = 8;
    // Worker responds after loading with the load result
    FunctionLoadResponse function_load_response = 9;

    // Host requests a given invocation
    InvocationRequest invocation_request = 4;

    // Worker responds to a given invocation
    InvocationResponse invocation_response = 5;

    // Host sends cancel message to attempt to cancel an invocation.
    // If an invocation is cancelled, host will receive an invocation response with status cancelled.
    InvocationCancel invocation_cancel = 21;

    // Worker logs a message back to the host
    RpcLog rpc_log = 2;

    FunctionEnvironmentReloadRequest function_environment_reload_request = 25;

    FunctionEnvironmentReloadResponse function_environment_reload_response = 26;

    // Ask the worker to close any open shared memory resources for a given invocation
    CloseSharedMemoryResourcesRequest close_shared_memory_resources_request = 27;
    CloseSharedMemoryResourcesResponse close_shared_memory_resources_response = 28;

    // Worker indexing message types
    FunctionsMetadataRequest functions_metadata_request = 29;
    FunctionMetadataResponse function_metadata_response = 30;

    // Host sends required metadata to worker to load functions
    FunctionLoadRequestCollection function_load_request_collection = 31;

    // Host gets the list of function load responses
    FunctionLoadResponseCollection function_load_response_collection = 32;
    
    // Host sends required metadata to worker to warmup the worker
    WorkerWarmupRequest worker_warmup_request = 33;
    
    // Worker responds after warming up with the warmup result
    WorkerWarmupResponse worker_warmup_response = 34;
    
  }

StartStream

// Worker initiates stream
StartStream start_stream = 20;

// Process.Start required info
//   connection details
//   protocol type
//   protocol version

// Worker sends the host information identifying itself
message StartStream {
  // id of the worker
  string worker_id = 2;
}

TBD: a version field could be added here.

WorkerInitRequest / WorkerInitResponse

    // Host sends capabilities/init data to worker
    WorkerInitRequest worker_init_request = 17;
    // Worker responds after initializing with its capabilities & status
    WorkerInitResponse worker_init_response = 16;
    

// Host requests the worker to initialize itself
message WorkerInitRequest {
  // version of the host sending init request
  string host_version = 1;

  // A map of host supported features/capabilities
  map<string, string> capabilities = 2;

  // inform worker of supported categories and their levels
  // i.e. Worker = Verbose, Function.MyFunc = None
  map<string, RpcLog.Level> log_categories = 3;

  // Full path of worker.config.json location
  string worker_directory = 4;

  // base directory for function app
  string function_app_directory = 5;
}

// Worker responds with the result of initializing itself
message WorkerInitResponse {
  // PROPERTY NOT USED
  // TODO: Remove from protobuf during next breaking change release
  string worker_version = 1;

  // A map of worker supported features/capabilities
  map<string, string> capabilities = 2;

  // Status of the response
  StatusResult result = 3;

  // Worker metadata captured for telemetry purposes
  WorkerMetadata worker_metadata = 4;
}

WorkerHeartbeat

    // MESSAGE NOT USED
    // Worker periodically sends empty heartbeat message to host
    WorkerHeartbeat worker_heartbeat = 15;
    
// MESSAGE NOT USED
// TODO: Remove from protobuf during next breaking change release
message WorkerHeartbeat {}

WorkerTerminate

    // Host sends terminate message to worker.
    // Worker terminates if it can, otherwise host terminates after a grace period
    WorkerTerminate worker_terminate = 14;

// Warning before killing the process after grace_period
// Worker self terminates ..no response on this
message WorkerTerminate {
  google.protobuf.Duration grace_period = 1;
}
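The comments describe a two-step shutdown: the worker is asked to stop, and the host only terminates it if the grace period runs out first. How the host implements this is not shown here. One common way to express "wait for exit, but no longer than the grace period" is sketched below, with `GracefulStop` and `WaitForExitAsync` as hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

public static class GracefulStop
{
    // Waits for the worker to exit on its own. Returns true if it did so within
    // the grace period, false if the caller should now terminate it forcibly.
    public static async Task<bool> WaitForExitAsync(Task workerExit, TimeSpan gracePeriod)
    {
        Task first = await Task.WhenAny(workerExit, Task.Delay(gracePeriod));
        return first == workerExit;
    }
}
```

The caller supplies a task that completes when the worker process exits, and kills the process only when the method returns false.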

WorkerStatusRequest / WorkerStatusResponse

    // Host periodically sends status request to the worker
    WorkerStatusRequest worker_status_request = 12;
    WorkerStatusResponse worker_status_response = 13;
    
// Used by the host to determine worker health
message WorkerStatusRequest {
}

// Worker responds with status message
// TODO: Add any worker relevant status to response
message WorkerStatusResponse {
}

InvocationRequest / InvocationResponse

    // Host requests a given invocation
    InvocationRequest invocation_request = 4;

    // Worker responds to a given invocation
    InvocationResponse invocation_response = 5;
    
    
// Host requests worker to invoke a Function
message InvocationRequest {
  // Unique id for each invocation
  string invocation_id = 1;

  // Unique id for each Function
  string function_id = 2;

  // Input bindings (include trigger)
  repeated ParameterBinding input_data = 3;

  // binding metadata from trigger
  map<string, TypedData> trigger_metadata = 4;

  // Populates activityId, tracestate and tags from host
  RpcTraceContext trace_context = 5;

  // Current retry context
  RetryContext retry_context = 6;
}

// Worker responds with status of Invocation
message InvocationResponse {
  // Unique id for invocation
  string invocation_id = 1;

  // Output binding data
  repeated ParameterBinding output_data = 2;

  // data returned from Function (for $return and triggers with return support)
  TypedData return_value = 4;

  // Status of the invocation (success/failure/canceled)
  StatusResult result = 3;
}

WorkerWarmupRequest / WorkerWarmupResponse

    // Host sends required metadata to worker to warmup the worker
    WorkerWarmupRequest worker_warmup_request = 33;
    
    // Worker responds after warming up with the warmup result
    WorkerWarmupResponse worker_warmup_response = 34;
    
    
message WorkerWarmupRequest {
  // Full path of worker.config.json location
  string worker_directory = 1;
}

message WorkerWarmupResponse {
  StatusResult result = 1;
}