FunctionRpcClient
FunctionRpcClient
Azure/azure-functions-dotnet-worker
仓库下的 FunctionRpcClient protobuf 定义。
protobuf
proto文件地址:
protos\azure-functions-language-worker-protobuf\src\proto\FunctionRpc.proto
FunctionRpc service
FunctionRpc.proto 定义了 FunctionRpc 这个 grpc service:
option java_multiple_files = true;
option java_package = "com.microsoft.azure.functions.rpc.messages";
option java_outer_classname = "FunctionProto";
option csharp_namespace = "Microsoft.Azure.Functions.Worker.Grpc.Messages";
option go_package ="github.com/Azure/azure-functions-go-worker/internal/rpc";
package AzureFunctionsRpcMessages;
import "google/protobuf/duration.proto";
import "identity/ClaimsIdentityRpc.proto";
import "shared/NullableTypes.proto";
// Interface exported by the server.
service FunctionRpc {
}
EventStream 方法
只定义了一个 EventStream 方法:
rpc EventStream (stream StreamingMessage) returns (stream StreamingMessage) {}
request 和 response 都是 stream,类型都是 StreamingMessage。
StreamingMessage
StreamingMessage 除了一个 request_id 用来在 host 和 worker 之间做唯一标识外,就只有一个 oneof content 字段:
message StreamingMessage {
// Used to identify message between host and worker
string request_id = 1;
// Payload of the message
oneof content {
......
}
}
消息类型还挺多:
oneof content {
// Worker initiates stream
StartStream start_stream = 20;
// Host sends capabilities/init data to worker
WorkerInitRequest worker_init_request = 17;
// Worker responds after initializing with its capabilities & status
WorkerInitResponse worker_init_response = 16;
// MESSAGE NOT USED
// Worker periodically sends empty heartbeat message to host
WorkerHeartbeat worker_heartbeat = 15;
// Host sends terminate message to worker.
// Worker terminates if it can, otherwise host terminates after a grace period
WorkerTerminate worker_terminate = 14;
// Host periodically sends status request to the worker
WorkerStatusRequest worker_status_request = 12;
WorkerStatusResponse worker_status_response = 13;
// On file change event, host sends notification to worker
FileChangeEventRequest file_change_event_request = 6;
// Worker requests a desired action (restart worker, reload function)
WorkerActionResponse worker_action_response = 7;
// Host sends required metadata to worker to load function
FunctionLoadRequest function_load_request = 8;
// Worker responds after loading with the load result
FunctionLoadResponse function_load_response = 9;
// Host requests a given invocation
InvocationRequest invocation_request = 4;
// Worker responds to a given invocation
InvocationResponse invocation_response = 5;
// Host sends cancel message to attempt to cancel an invocation.
// If an invocation is cancelled, host will receive an invocation response with status cancelled.
InvocationCancel invocation_cancel = 21;
// Worker logs a message back to the host
RpcLog rpc_log = 2;
FunctionEnvironmentReloadRequest function_environment_reload_request = 25;
FunctionEnvironmentReloadResponse function_environment_reload_response = 26;
// Ask the worker to close any open shared memory resources for a given invocation
CloseSharedMemoryResourcesRequest close_shared_memory_resources_request = 27;
CloseSharedMemoryResourcesResponse close_shared_memory_resources_response = 28;
// Worker indexing message types
FunctionsMetadataRequest functions_metadata_request = 29;
FunctionMetadataResponse function_metadata_response = 30;
// Host sends required metadata to worker to load functions
FunctionLoadRequestCollection function_load_request_collection = 31;
// Host gets the list of function load responses
FunctionLoadResponseCollection function_load_response_collection = 32;
// Host sends required metadata to worker to warmup the worker
WorkerWarmupRequest worker_warmup_request = 33;
// Worker responds after warming up with the warmup result
WorkerWarmupResponse worker_warmup_response = 34;
}
StartStream
// Worker initiates stream
StartStream start_stream = 20;
// Process.Start required info
// connection details
// protocol type
// protocol version
// Worker sends the host information identifying itself
message StartStream {
// id of the worker
string worker_id = 2;
}
TBD: 这里可以考虑增加一个 version 字段。
WorkerInitRequest / WorkerInitResponse
// Host sends capabilities/init data to worker
WorkerInitRequest worker_init_request = 17;
// Worker responds after initializing with its capabilities & status
WorkerInitResponse worker_init_response = 16;
// Host requests the worker to initialize itself
message WorkerInitRequest {
// version of the host sending init request
string host_version = 1;
// A map of host supported features/capabilities
map<string, string> capabilities = 2;
// inform worker of supported categories and their levels
// i.e. Worker = Verbose, Function.MyFunc = None
map<string, RpcLog.Level> log_categories = 3;
// Full path of worker.config.json location
string worker_directory = 4;
// base directory for function app
string function_app_directory = 5;
}
// Worker responds with the result of initializing itself
message WorkerInitResponse {
// PROPERTY NOT USED
// TODO: Remove from protobuf during next breaking change release
string worker_version = 1;
// A map of worker supported features/capabilities
map<string, string> capabilities = 2;
// Status of the response
StatusResult result = 3;
// Worker metadata captured for telemetry purposes
WorkerMetadata worker_metadata = 4;
}
WorkerHeartbeat
// MESSAGE NOT USED
// Worker periodically sends empty heartbeat message to host
WorkerHeartbeat worker_heartbeat = 15;
// MESSAGE NOT USED
// TODO: Remove from protobuf during next breaking change release
message WorkerHeartbeat {}
WorkerTerminate
// Host sends terminate message to worker.
// Worker terminates if it can, otherwise host terminates after a grace period
WorkerTerminate worker_terminate = 14;
// Warning before killing the process after grace_period
// Worker self terminates ..no response on this
message WorkerTerminate {
google.protobuf.Duration grace_period = 1;
}
WorkerStatusRequest / WorkerStatusResponse
// Host periodically sends status request to the worker
WorkerStatusRequest worker_status_request = 12;
WorkerStatusResponse worker_status_response = 13;
// Used by the host to determine worker health
message WorkerStatusRequest {
}
// Worker responds with status message
// TODO: Add any worker relevant status to response
message WorkerStatusResponse {
}
InvocationRequest / InvocationResponse
// Host requests a given invocation
InvocationRequest invocation_request = 4;
// Worker responds to a given invocation
InvocationResponse invocation_response = 5;
// Host requests worker to invoke a Function
message InvocationRequest {
// Unique id for each invocation
string invocation_id = 1;
// Unique id for each Function
string function_id = 2;
// Input bindings (include trigger)
repeated ParameterBinding input_data = 3;
// binding metadata from trigger
map<string, TypedData> trigger_metadata = 4;
// Populates activityId, tracestate and tags from host
RpcTraceContext trace_context = 5;
// Current retry context
RetryContext retry_context = 6;
}
// Worker responds with status of Invocation
message InvocationResponse {
// Unique id for invocation
string invocation_id = 1;
// Output binding data
repeated ParameterBinding output_data = 2;
// data returned from Function (for $return and triggers with return support)
TypedData return_value = 4;
// Status of the invocation (success/failure/canceled)
StatusResult result = 3;
}
WorkerWarmupRequest / WorkerWarmupResponse
// Host sends required metadata to worker to warmup the worker
WorkerWarmupRequest worker_warmup_request = 33;
// Worker responds after warming up with the warmup result
WorkerWarmupResponse worker_warmup_response = 34;
message WorkerWarmupRequest {
// Full path of worker.config.json location
string worker_directory = 1;
}
message WorkerWarmupResponse {
StatusResult result = 1;
}