Dapr Runtime接收服务调用的inbound请求

Dapr Runtime通过gRPC internal API接收来自客户端Dapr Runtime的inbound请求

Dapr runtime 之间相互通讯走的是 gRPC internal API,这个 API 也只支持 gRPC 协议。

hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
daprd_server -> daprd_server : interceptor
daprd_server -[#blue]>  : appChannel.InvokeMethod()

接收请求

Runtime 初始化时,在注册 gRPC 服务时绑定了 gPRC Internal API 实现和 CallLocal gRPC 方法。对于访问 dapr.proto.internals.v1.ServiceInvocation 服务的 CallLocal 方法的 gRPC 请求,会将请求转给 _ServiceInvocation_CallLocal_Handler 处理:

func _ServiceInvocation_CallLocal_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	......
	if interceptor == nil {
		return srv.(ServiceInvocationServer).CallLocal(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/dapr.proto.internals.v1.ServiceInvocation/CallLocal",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        // 这里调用的 srv 即 gRPC api 实现
		return srv.(ServiceInvocationServer).CallLocal(ctx, req.(*InternalInvokeRequest))  
	}
	return interceptor(ctx, in, info, handler)
}

最后进入 CallLocal() 方法进行处理。

备注:初始化的细节,请见前面章节 “Runtime初始化”

期间会有一个 interceptor 的处理流程,细节后面展开。

转发请求

当 internal invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go 中的 CallLocal 方法:

func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
	// 1. 构造请求
	req, err := invokev1.InternalInvokeRequest(in)
  if a.accessControlList != nil {
		......
	}
	// 2. 通过 appChannel 向应用发出请求
	resp, err := a.appChannel.InvokeMethod(ctx, req)
    // 3. 处理应答
	return resp.Proto(), err
}

处理方式很清晰,基本上就是将请求通过 app channel 转发。Runtime 本身并没有什么额外的处理逻辑。InternalInvokeRequest() 只是简单处理一下参数:

// InternalInvokeRequest creates InvokeMethodRequest object from InternalInvokeRequest pb object.
func InternalInvokeRequest(pb *internalv1pb.InternalInvokeRequest) (*InvokeMethodRequest, error) {
	req := &InvokeMethodRequest{r: pb}
	if pb.Message == nil {
		return nil, errors.New("Message field is nil")
	}

	return req, nil
}

访问控制

期间会有一个 access control (访问控制)的逻辑:

	if a.accessControlList != nil {
		// An access control policy has been specified for the app. Apply the policies.
		operation := req.Message().Method
		var httpVerb commonv1pb.HTTPExtension_Verb
		// Get the http verb in case the application protocol is http
		if a.appProtocol == config.HTTPProtocol && req.Metadata() != nil && len(req.Metadata()) > 0 {
			httpExt := req.Message().GetHttpExtension()
			if httpExt != nil {
				httpVerb = httpExt.GetVerb()
			}
		}
		callAllowed, errMsg := acl.ApplyAccessControlPolicies(ctx, operation, httpVerb, a.appProtocol, a.accessControlList)

		if !callAllowed {
			return nil, status.Errorf(codes.PermissionDenied, errMsg)
		}
	}

细节后面展开。