Worker initialization
- 1: Obsolete initialization
- 2: Call stack
- 3: GrpcWorkerClient
- 4: GrpcWorker
- 5: FunctionRpcClient
1 - Obsolete initialization
GrpcDurableTaskWorker
The file src\Worker\Grpc\GrpcDurableTaskWorker.Processor.cs in the durabletask-dotnet repository:
public async Task ExecuteAsync(CancellationToken cancellation)
{
    while (!cancellation.IsCancellationRequested)
    {
        try
        {
            AsyncServerStreamingCall<P.WorkItem> stream = await this.ConnectAsync(cancellation);
            await this.ProcessWorkItemsAsync(stream, cancellation);
        }
        ......
    }
}
ConnectAsync() calls the GetWorkItems() method defined in the gRPC protobuf file:
async Task<AsyncServerStreamingCall<P.WorkItem>> ConnectAsync(CancellationToken cancellation)
{
    await this.sidecar!.HelloAsync(EmptyMessage, cancellationToken: cancellation);
    this.Logger.EstablishedWorkItemConnection();
    Console.WriteLine("********GrpcDurableTaskWorker call GetWorkItems()********");
    // Get the stream for receiving work-items
    return this.sidecar!.GetWorkItems(new P.GetWorkItemsRequest(), cancellationToken: cancellation);
}
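This would be the normal worker initialization flow, but it is now obsolete.
Note: this is recorded here to avoid misunderstanding. I actually lost a lot of time on it.
2 - Call stack
Call stack overview
Add logging to src\DotNetWorker.Grpc\GrpcWorker.cs in the azure-functions-dotnet-worker repository: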
public Task StartAsync(CancellationToken token)
{
    Console.WriteLine(new System.Diagnostics.StackTrace(true));
    _workerClient = _workerClientFactory.CreateClient(this);
    Console.WriteLine("_workerClient is " + _workerClient.GetType().Name);
    return _workerClient.StartAsync(token);
}
This yields the call stack for worker initialization at startup:
[2024-04-08T09:07:04.591Z] at Microsoft.Azure.Functions.Worker.GrpcWorker.StartAsync(CancellationToken token) in C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Grpc\GrpcWorker.cs:line 58
[2024-04-08T09:07:04.592Z] at Microsoft.Azure.Functions.Worker.WorkerHostedService.StartAsync(CancellationToken cancellationToken) in C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Core\WorkerHostedService.cs:line 25
[2024-04-08T09:07:04.592Z] at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.593Z] at Microsoft.Azure.Functions.Worker.WorkerHostedService.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.593Z] at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>b__15_1(IHostedService service, CancellationToken token)
[2024-04-08T09:07:04.593Z] at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.593Z] at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>b__15_1(IHostedService service, CancellationToken token)
[2024-04-08T09:07:04.594Z] at Microsoft.Extensions.Hosting.Internal.Host.ForeachService[T](IEnumerable`1 services, CancellationToken token, Boolean concurrent, Boolean abortOnFirstException, List`1 exceptions, Func`3 operation)
[2024-04-08T09:07:04.594Z] at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.594Z] at Microsoft.Extensions.Hosting.Internal.Host.ForeachService[T](IEnumerable`1 services, CancellationToken token, Boolean concurrent, Boolean abortOnFirstException, List`1 exceptions, Func`3 operation)
[2024-04-08T09:07:04.594Z] at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.595Z] at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.595Z] at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.595Z] at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
[2024-04-08T09:07:04.596Z] at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.596Z] at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
[2024-04-08T09:07:04.596Z] at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.Run(IHost host)
[2024-04-08T09:07:04.596Z] at Program.<Main>$(String[] args) in C:\Users\sky\work\code\durabletask\MyDurableFunction1\Program.cs:line 13
......
[2024-04-08T09:07:04.598Z] _workerClient is GrpcWorkerClient
[2024-04-08T09:07:04.629Z] Worker process started and initialized.
Call stack
GrpcWorker
public Task StartAsync(CancellationToken token)
{
    _workerClient = _workerClientFactory.CreateClient(this);
    return _workerClient.StartAsync(token);
}
GrpcWorkerClient
The GrpcWorkerClient class in src\DotNetWorker.Grpc\GrpcWorkerClientFactory.cs:
public async Task StartAsync(CancellationToken token)
{
    if (_running)
    {
        throw new InvalidOperationException($"The client is already running. Multiple calls to {nameof(StartAsync)} are not supported.");
    }
    _running = true;
    var eventStream = _grpcClient.EventStream(cancellationToken: token);
    await SendStartStreamMessageAsync(eventStream.RequestStream);
    _ = StartWriterAsync(eventStream.RequestStream);
    _ = StartReaderAsync(eventStream.ResponseStream);
}
3 - GrpcWorkerClient
The GrpcWorkerClient class in src\DotNetWorker.Grpc\GrpcWorkerClientFactory.cs.
Class definition
private class GrpcWorkerClient : IWorkerClient
{
    private readonly FunctionRpcClient _grpcClient;
    private readonly GrpcWorkerStartupOptions _startupOptions;
    private readonly ChannelReader<StreamingMessage> _outputReader;
    private readonly ChannelWriter<StreamingMessage> _outputWriter;
    private bool _running;
    private IMessageProcessor? _processor;
}
Constructor
public GrpcWorkerClient(GrpcHostChannel outputChannel, GrpcWorkerStartupOptions startupOptions, IMessageProcessor processor)
{
    _startupOptions = startupOptions ?? throw new ArgumentNullException(nameof(startupOptions));
    _processor = processor ?? throw new ArgumentNullException(nameof(processor));
    // Initialize the reader and writer; both come from outputChannel
    _outputReader = outputChannel.Channel.Reader;
    _outputWriter = outputChannel.Channel.Writer;
    // Create _grpcClient, which is of type FunctionRpcClient
    _grpcClient = CreateClient();
}
The CreateClient() method:
    private FunctionRpcClient CreateClient()
    {
#if NET5_0_OR_GREATER
        GrpcChannel grpcChannel = GrpcChannel.ForAddress(_startupOptions.HostEndpoint!.AbsoluteUri, new GrpcChannelOptions()
        {
            MaxReceiveMessageSize = _startupOptions.GrpcMaxMessageLength,
            MaxSendMessageSize = _startupOptions.GrpcMaxMessageLength,
            Credentials = ChannelCredentials.Insecure
        });
#else
        var options = new ChannelOption[]
        {
            new ChannelOption(GrpcCore.ChannelOptions.MaxReceiveMessageLength, _startupOptions.GrpcMaxMessageLength),
            new ChannelOption(GrpcCore.ChannelOptions.MaxSendMessageLength, _startupOptions.GrpcMaxMessageLength)
        };
        GrpcCore.Channel grpcChannel = new GrpcCore.Channel(_startupOptions.HostEndpoint!.Host, _startupOptions.HostEndpoint.Port, ChannelCredentials.Insecure, options);
#endif
        return new FunctionRpcClient(grpcChannel);
    }
}
Start process
StartAsync
public async Task StartAsync(CancellationToken token)
{
    if (_running)
    {
        throw new InvalidOperationException($"The client is already running. Multiple calls to {nameof(StartAsync)} are not supported.");
    }
    _running = true;
    var eventStream = _grpcClient.EventStream(cancellationToken: token);
    await SendStartStreamMessageAsync(eventStream.RequestStream);
    _ = StartWriterAsync(eventStream.RequestStream);
    _ = StartReaderAsync(eventStream.ResponseStream);
}
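The last two lines of StartAsync start the writer and reader loops without awaiting them. The following is a minimal sketch of that pattern, not the actual GrpcWorkerClient code. It assumes the writer loop simply drains the output channel and forwards each message to the request stream. SketchClient, Sent, WriterLoop and Complete are hypothetical names; strings stand in for StreamingMessage and a list stands in for the gRPC stream.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for GrpcWorkerClient (an assumption, not the real class).
public sealed class SketchClient
{
    private readonly Channel<string> _output = Channel.CreateUnbounded<string>();
    private bool _running;

    // Records everything "sent to the host", in order.
    public List<string> Sent { get; } = new List<string>();

    // Completes once the writer loop has drained the channel.
    public Task WriterLoop { get; private set; } = Task.CompletedTask;

    public void Start()
    {
        if (_running)
        {
            throw new InvalidOperationException("The client is already running.");
        }

        _running = true;
        Sent.Add("StartStream");   // the first message goes out before the loop starts
        WriterLoop = PumpAsync();  // started without awaiting, like `_ = StartWriterAsync(...)`
    }

    // Other components enqueue responses here; the writer loop sends them.
    public ValueTask SendMessageAsync(string message) => _output.Writer.WriteAsync(message);

    // Signals that no more messages will be written, which lets the loop finish.
    public void Complete() => _output.Writer.Complete();

    private async Task PumpAsync()
    {
        await foreach (string message in _output.Reader.ReadAllAsync())
        {
            Sent.Add(message);
        }
    }
}
```

In this sketch, calling Start() twice throws, and messages passed to SendMessageAsync appear in Sent after the initial "StartStream" entry once WriterLoop has completed.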
4 - GrpcWorker
The GrpcWorker class in src\DotNetWorker.Grpc\GrpcWorker.cs.
Message processing
ProcessMessageAsync
Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
{
    // Dispatch and return.
    Task.Run(() => ProcessRequestCoreAsync(message));
    return Task.CompletedTask;
}
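ProcessMessageAsync hands the message to the thread pool and returns an already completed task, so the caller can move on to the next message while earlier ones are still being handled. A minimal sketch of this dispatch-and-return behavior follows. DispatchSketch and its members are hypothetical names introduced only for illustration.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical illustration of "dispatch and return" (not the GrpcWorker code).
public static class DispatchSketch
{
    // Returns immediately; the handler keeps running on the thread pool.
    public static Task Dispatch(Func<Task> handler)
    {
        _ = Task.Run(handler);
        return Task.CompletedTask;
    }

    // Returns true when Dispatch came back while the handler was still pending.
    public static async Task<bool> ReturnsBeforeHandlerFinishesAsync()
    {
        var gate = new TaskCompletionSource<bool>();
        var done = new TaskCompletionSource<bool>();

        Task dispatched = Dispatch(async () =>
        {
            await gate.Task;       // the handler waits until the gate opens
            done.SetResult(true);
        });

        bool returnedEarly = dispatched.IsCompleted && !done.Task.IsCompleted;

        gate.SetResult(true);      // let the handler finish
        await done.Task;
        return returnedEarly;
    }
}
```

One consequence of this design is that handlers may run concurrently and finish in a different order than the messages arrived, which is presumably why each response copies the RequestId of its request.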
ProcessRequestCoreAsync
private async Task ProcessRequestCoreAsync(StreamingMessage request)
{
    StreamingMessage responseMessage = new StreamingMessage
    {
        RequestId = request.RequestId
    };
    switch (request.ContentCase)
    {
        case MsgType.InvocationRequest:
            responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
            break;
        case MsgType.WorkerInitRequest:
            Console.WriteLine("GrpcWorker received WorkerInitRequest");
            responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
            break;
        case MsgType.WorkerStatusRequest:
            responseMessage.WorkerStatusResponse = new WorkerStatusResponse();
            break;
        case MsgType.FunctionsMetadataRequest:
            responseMessage.FunctionMetadataResponse = await GetFunctionMetadataAsync(request.FunctionsMetadataRequest.FunctionAppDirectory);
            break;
        case MsgType.WorkerTerminate:
            WorkerTerminateRequestHandler(request.WorkerTerminate);
            break;
        case MsgType.FunctionLoadRequest:
            responseMessage.FunctionLoadResponse = FunctionLoadRequestHandler(request.FunctionLoadRequest, _application, _methodInfoLocator);
            break;
        case MsgType.FunctionEnvironmentReloadRequest:
            responseMessage.FunctionEnvironmentReloadResponse = EnvironmentReloadRequestHandler(_workerOptions);
            break;
        case MsgType.InvocationCancel:
            InvocationCancelRequestHandler(request.InvocationCancel);
            break;
        default:
            // TODO: Trace failure here.
            return;
    }
    await _workerClient!.SendMessageAsync(responseMessage);
}
WorkerInitRequest
case MsgType.WorkerInitRequest:
    responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
    break;
The implementation of WorkerInitRequestHandler():
internal static WorkerInitResponse WorkerInitRequestHandler(WorkerInitRequest request, WorkerOptions workerOptions)
{
    var response = new WorkerInitResponse
    {
        Result = new StatusResult { Status = StatusResult.Types.Status.Success },
        WorkerVersion = WorkerInformation.Instance.WorkerVersion,
        WorkerMetadata = GetWorkerMetadata()
    };
    response.Capabilities.Add(GetWorkerCapabilities(workerOptions));
    return response;
}
TBD: where does WorkerVersion come from?
InvocationRequest
case MsgType.InvocationRequest:
    responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
    break;
The implementation of InvocationRequestHandlerAsync():
internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
{
    return _invocationHandler.InvokeAsync(request);
}
5 - FunctionRpcClient
The protobuf definition behind FunctionRpcClient, in the Azure/azure-functions-dotnet-worker repository.
protobuf
Proto file location:
protos\azure-functions-language-worker-protobuf\src\proto\FunctionRpc.proto
FunctionRpc service
FunctionRpc.proto defines the FunctionRpc gRPC service:
option java_multiple_files = true;
option java_package = "com.microsoft.azure.functions.rpc.messages";
option java_outer_classname = "FunctionProto";
option csharp_namespace = "Microsoft.Azure.Functions.Worker.Grpc.Messages";
option go_package ="github.com/Azure/azure-functions-go-worker/internal/rpc";
package AzureFunctionsRpcMessages;
import "google/protobuf/duration.proto";
import "identity/ClaimsIdentityRpc.proto";
import "shared/NullableTypes.proto";
// Interface exported by the server.
service FunctionRpc {
}
EventStream method
The service defines only one method, EventStream:
rpc EventStream (stream StreamingMessage) returns (stream StreamingMessage) {}
Both the request and the response are streams, and both carry StreamingMessage.
StreamingMessage
Apart from a request_id that identifies a message between host and worker, StreamingMessage has only a single oneof content field:
message StreamingMessage {
    // Used to identify message between host and worker
    string request_id = 1;
    // Payload of the message
    oneof content {
        ......
    }
}
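On the C# side, a oneof surfaces as a case enum plus one property per member, and setting one member replaces whatever was set before. The following hand-written sketch imitates that behavior for two of the cases. SketchMessage and Describe are hypothetical, the payloads are plain strings rather than message types, and the real generated StreamingMessage class is considerably larger.

```csharp
using System;

// Simplified, hand-written imitation of a protobuf oneof in C#.
// SketchMessage is hypothetical; it is not the generated StreamingMessage.
public sealed class SketchMessage
{
    // Case values mirror the field numbers in the .proto file.
    public enum ContentOneofCase
    {
        None = 0,
        InvocationRequest = 4,
        StartStream = 20,
    }

    private object _content;

    public string RequestId { get; set; } = "";

    public ContentOneofCase ContentCase { get; private set; } = ContentOneofCase.None;

    public string InvocationRequest
    {
        get => ContentCase == ContentOneofCase.InvocationRequest ? (string)_content : null;
        set => Set(value, ContentOneofCase.InvocationRequest);
    }

    public string StartStream
    {
        get => ContentCase == ContentOneofCase.StartStream ? (string)_content : null;
        set => Set(value, ContentOneofCase.StartStream);
    }

    // Setting any member replaces whatever the oneof held before.
    private void Set(string value, ContentOneofCase which)
    {
        _content = value;
        ContentCase = value == null ? ContentOneofCase.None : which;
    }

    // Receivers branch on ContentCase, as ProcessRequestCoreAsync does.
    public static string Describe(SketchMessage message)
    {
        switch (message.ContentCase)
        {
            case ContentOneofCase.StartStream:
                return "start_stream from worker " + message.StartStream;
            case ContentOneofCase.InvocationRequest:
                return "invocation_request " + message.InvocationRequest;
            default:
                return "no content";
        }
    }
}
```

In this sketch, assigning InvocationRequest after StartStream switches ContentCase to InvocationRequest and makes StartStream read as null again, which is the property that lets one message type carry every kind of payload on a single stream.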
There are quite a few message types:
oneof content {
    // Worker initiates stream
    StartStream start_stream = 20;
    // Host sends capabilities/init data to worker
    WorkerInitRequest worker_init_request = 17;
    // Worker responds after initializing with its capabilities & status
    WorkerInitResponse worker_init_response = 16;
    // MESSAGE NOT USED
    // Worker periodically sends empty heartbeat message to host
    WorkerHeartbeat worker_heartbeat = 15;
    // Host sends terminate message to worker.
    // Worker terminates if it can, otherwise host terminates after a grace period
    WorkerTerminate worker_terminate = 14;
    // Host periodically sends status request to the worker
    WorkerStatusRequest worker_status_request = 12;
    WorkerStatusResponse worker_status_response = 13;
    // On file change event, host sends notification to worker
    FileChangeEventRequest file_change_event_request = 6;
    // Worker requests a desired action (restart worker, reload function)
    WorkerActionResponse worker_action_response = 7;
    // Host sends required metadata to worker to load function
    FunctionLoadRequest function_load_request = 8;
    // Worker responds after loading with the load result
    FunctionLoadResponse function_load_response = 9;
    // Host requests a given invocation
    InvocationRequest invocation_request = 4;
    // Worker responds to a given invocation
    InvocationResponse invocation_response = 5;
    // Host sends cancel message to attempt to cancel an invocation.
    // If an invocation is cancelled, host will receive an invocation response with status cancelled.
    InvocationCancel invocation_cancel = 21;
    // Worker logs a message back to the host
    RpcLog rpc_log = 2;
    FunctionEnvironmentReloadRequest function_environment_reload_request = 25;
    FunctionEnvironmentReloadResponse function_environment_reload_response = 26;
    // Ask the worker to close any open shared memory resources for a given invocation
    CloseSharedMemoryResourcesRequest close_shared_memory_resources_request = 27;
    CloseSharedMemoryResourcesResponse close_shared_memory_resources_response = 28;
    // Worker indexing message types
    FunctionsMetadataRequest functions_metadata_request = 29;
    FunctionMetadataResponse function_metadata_response = 30;
    // Host sends required metadata to worker to load functions
    FunctionLoadRequestCollection function_load_request_collection = 31;
    // Host gets the list of function load responses
    FunctionLoadResponseCollection function_load_response_collection = 32;
    // Host sends required metadata to worker to warmup the worker
    WorkerWarmupRequest worker_warmup_request = 33;
    // Worker responds after warming up with the warmup result
    WorkerWarmupResponse worker_warmup_response = 34;
}
StartStream
// Worker initiates stream
StartStream start_stream = 20;
// Process.Start required info
// connection details
// protocol type
// protocol version
// Worker sends the host information identifying itself
message StartStream {
    // id of the worker
    string worker_id = 2;
}
TBD: consider adding a version field here.
WorkerInitRequest / WorkerInitResponse
// Host sends capabilities/init data to worker
WorkerInitRequest worker_init_request = 17;
// Worker responds after initializing with its capabilities & status
WorkerInitResponse worker_init_response = 16;
// Host requests the worker to initialize itself
message WorkerInitRequest {
    // version of the host sending init request
    string host_version = 1;
    // A map of host supported features/capabilities
    map<string, string> capabilities = 2;
    // inform worker of supported categories and their levels
    // i.e. Worker = Verbose, Function.MyFunc = None
    map<string, RpcLog.Level> log_categories = 3;
    // Full path of worker.config.json location
    string worker_directory = 4;
    // base directory for function app
    string function_app_directory = 5;
}
// Worker responds with the result of initializing itself
message WorkerInitResponse {
    // PROPERTY NOT USED
    // TODO: Remove from protobuf during next breaking change release
    string worker_version = 1;
    // A map of worker supported features/capabilities
    map<string, string> capabilities = 2;
    // Status of the response
    StatusResult result = 3;
    // Worker metadata captured for telemetry purposes
    WorkerMetadata worker_metadata = 4;
}
WorkerHeartbeat
// MESSAGE NOT USED
// Worker periodically sends empty heartbeat message to host
WorkerHeartbeat worker_heartbeat = 15;
// MESSAGE NOT USED
// TODO: Remove from protobuf during next breaking change release
message WorkerHeartbeat {}
WorkerTerminate
// Host sends terminate message to worker.
// Worker terminates if it can, otherwise host terminates after a grace period
WorkerTerminate worker_terminate = 14;
// Warning before killing the process after grace_period
// Worker self terminates ..no response on this
message WorkerTerminate {
    google.protobuf.Duration grace_period = 1;
}
WorkerStatusRequest / WorkerStatusResponse
// Host periodically sends status request to the worker
WorkerStatusRequest worker_status_request = 12;
WorkerStatusResponse worker_status_response = 13;
// Used by the host to determine worker health
message WorkerStatusRequest {
}
// Worker responds with status message
// TODO: Add any worker relevant status to response
message WorkerStatusResponse {
}
InvocationRequest / InvocationResponse
// Host requests a given invocation
InvocationRequest invocation_request = 4;
// Worker responds to a given invocation
InvocationResponse invocation_response = 5;
// Host requests worker to invoke a Function
message InvocationRequest {
    // Unique id for each invocation
    string invocation_id = 1;
    // Unique id for each Function
    string function_id = 2;
    // Input bindings (include trigger)
    repeated ParameterBinding input_data = 3;
    // binding metadata from trigger
    map<string, TypedData> trigger_metadata = 4;
    // Populates activityId, tracestate and tags from host
    RpcTraceContext trace_context = 5;
    // Current retry context
    RetryContext retry_context = 6;
}
// Worker responds with status of Invocation
message InvocationResponse {
    // Unique id for invocation
    string invocation_id = 1;
    // Output binding data
    repeated ParameterBinding output_data = 2;
    // data returned from Function (for $return and triggers with return support)
    TypedData return_value = 4;
    // Status of the invocation (success/failure/canceled)
    StatusResult result = 3;
}
WorkerWarmupRequest / WorkerWarmupResponse
// Host sends required metadata to worker to warmup the worker
WorkerWarmupRequest worker_warmup_request = 33;
// Worker responds after warming up with the warmup result
WorkerWarmupResponse worker_warmup_response = 34;
message WorkerWarmupRequest {
    // Full path of worker.config.json location
    string worker_directory = 1;
}
message WorkerWarmupResponse {
    StatusResult result = 1;
}