1 - 流程概述

Dapr服务调用的流程和API概述

API 和端口

Dapr runtime 对外提供两个 API,分别是 Dapr HTTP API 和 Dapr gRPC API。另外两个 dapr runtime 之间的通讯 (Dapr internal API) 固定用 gRPC 协议。

两个 Dapr API 对外暴露的端口,默认是:

  • 3500: HTTP 端口,可以通过命令行参数 dapr-http-port 设置
  • 50001: gRPC 端口,可以通过命令行参数 dapr-grpc-port 设置

Dapr internal API 是内部端口,比较特殊,没有固定的默认值,而是取任意随机可用端口。也可以通过命令行参数 dapr-internal-grpc-port 设置。

为了向服务器端的应用发送请求,dapr 需要获知应用在哪个端口监听并处理请求,这个信息通过命令行参数 app-port 设置。Dapr 的示例中一般喜欢用 3000 端口。

调用流程

HTTP 方式

title Service Invoke via HTTP
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =App-1
    ----
    client
]
participant SDK_client [
    =SDK
    ----
    client
]
end box
participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

box "App-2"
participant user_code_server [
    =App-2
    ----
    server
]
end box

user_code_client -> SDK_client : Invoke\nService() 
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port
|||
daprd_server -[#blue]> user_code_server :  http (localhost)
note right: HTTP endpoint "method-1" @ 3000

daprd_server <[#blue]-- user_code_server
daprd_client <[#red]-- daprd_server
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

gRPC 方式

title Service Invoke via gRPC
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =App-1
    ----
    client
]
participant SDK_client [
    =SDK
    ----
    client
]
end box
participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

box "App-2"
participant SDK_server [
    =SDK
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]
end box
user_code_server -> SDK_server: AddServiceInvocationHandler("method-1")
SDK_server -> SDK_server: save handler in invokeHandlers["method-1"]
SDK_server --> user_code_server
user_code_client -> SDK_client : Invoke\nService() 
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ random free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
|||
daprd_server -[#blue]> SDK_server : gRPC (localhost)
note right: 50001\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
SDK_server -> SDK_server: get handler by invokeHandlers["method-1"]
SDK_server -> user_code_server : invoke handler of "method-1"

SDK_server <-- user_code_server
daprd_server <[#blue]-- SDK_server
daprd_client <[#red]-- daprd_server
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

gRPC proxying 方式

title Service Invoke via gRPC proxying
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =App-1
    ----
    client
]
participant SDK_client [
    =SDK
    ----
    client
]
end box
participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

box "App-2"
participant SDK_server [
    =gRPC
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]
end box
user_code_server -> SDK_server
SDK_server --> user_code_server
user_code_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC\n/user.services.ServiceName/Method-1
|||
daprd_client -[#red]> daprd_server : gRPC proxy (remote call)
note right: gRPC\n/user.services.ServiceName/Method-1
|||
daprd_server -[#blue]> SDK_server : gRPC (localhost)
note right: gRPC\n/user.services.ServiceName/Method-1
SDK_server -> user_code_server : 

SDK_server <-- user_code_server
daprd_server <[#blue]-- SDK_server
daprd_client <[#red]-- daprd_server
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

2 - 服务调用相关的Runtime初始化

Dapr Runtime中和服务调用相关的初始化流程

在 dapr runtime 启动进行初始化时,需要开启 API 端口并挂载相应的 handler 来接收并处理服务调用的 outbound 请求。另外为了接收来自其他 dapr runtime 的 inbound 请求,还要启动 dapr internal server。

Dapr HTTP API Server(outbound)

在 dapr runtime 中启动 HTTP server

dapr runtime 的 HTTP server 用的是 fasthttp。

在 dapr runtime 启动时的初始化过程中,会启动 HTTP server, 代码在 pkg/runtime/runtime.go 中:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
  ......
  // Start HTTP Server
	err = a.startHTTPServer(a.runtimeConfig.HTTPPort, a.runtimeConfig.PublicPort, a.runtimeConfig.ProfilePort, a.runtimeConfig.AllowedOrigins, pipeline)
	if err != nil {
		log.Fatalf("failed to start HTTP server: %s", err)
	}
  ......
}

func (a *DaprRuntime) startHTTPServer(......) error {
	a.daprHTTPAPI = http.NewAPI(......)

	server := http.NewServer(a.daprHTTPAPI, ......)
  if err := server.StartNonBlocking(); err != nil {		// StartNonBlocking 启动 fasthttp server
		return err
	}
}

StartNonBlocking() 的实现代码在 pkg/http/server.go 中:

// StartNonBlocking starts a new server in a goroutine.
func (s *server) StartNonBlocking() error {
  	......
  	for _, apiListenAddress := range s.config.APIListenAddresses {
			l, err := net.Listen("tcp", fmt.Sprintf("%s:%v", apiListenAddress, s.config.Port))
      listeners = append(listeners, l)
		}
  
  	for _, listener := range listeners {
		// customServer is created in a loop because each instance
		// has a handle on the underlying listener.
		customServer := &fasthttp.Server{
			Handler:            handler,
			MaxRequestBodySize: s.config.MaxRequestBodySize * 1024 * 1024,
			ReadBufferSize:     s.config.ReadBufferSize * 1024,
			StreamRequestBody:  s.config.StreamRequestBody,
		}
		s.servers = append(s.servers, customServer)

		go func(l net.Listener) {
			if err := customServer.Serve(l); err != nil {
				log.Fatal(err)
			}
		}(listener)
	}
}

挂载 DirectMessaging 的 HTTP 端点

在 HTTP API 的初始化过程中,会在 fast http server 上挂载 DirectMessaging 的 HTTP 端点,代码在 pkg/http/api.go 中:

func NewAPI(
  appID string,
	appChannel channel.AppChannel,
	directMessaging messaging.DirectMessaging,
  ......
  	shutdown func()) API {
  
  	api := &api{
		appChannel:               appChannel,
		directMessaging:          directMessaging,
		......
	}
  
  	// 附加 DirectMessaging 的 HTTP 端点
  	api.endpoints = append(api.endpoints, api.constructDirectMessagingEndpoints()...)
}

DirectMessaging 的 HTTP 端点的具体信息在 constructDirectMessagingEndpoints() 方法中:

func (a *api) constructDirectMessagingEndpoints() []Endpoint {
	return []Endpoint{
		{
			Methods:           []string{router.MethodWild},
			Route:             "invoke/{id}/method/{method:*}",
			Alias:             "{method:*}",
			Version:           apiVersionV1,
			KeepParamUnescape: true,
			Handler:           a.onDirectMessage,
		},
	}
}

注意这里的 Route 路径 “invoke/{id}/method/{method:*}", dapr sdk 就是就通过这样的 url 来发起 HTTP 请求。

title Dapr HTTP API 
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]

-[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500\n/v1.0/invoke/{id}/method/{method}
|||
<[#blue]-- daprd_client

Dapr gRPC API Server(outbound)

启动 gRPC 服务器

在 dapr runtime 启动时的初始化过程中,会启动 gRPC server, 代码在 pkg/runtime/runtime.go 中:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
    // Create and start internal and external gRPC servers
	grpcAPI := a.getGRPCAPI()
    
	err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
    ......
}

func (a *DaprRuntime) startGRPCAPIServer(api grpc.API, port int) error {
	serverConf := a.getNewServerConfig(a.runtimeConfig.APIListenAddresses, port)
	server := grpc.NewAPIServer(api, serverConf, a.globalConfig.Spec.TracingSpec, a.globalConfig.Spec.MetricSpec, a.globalConfig.Spec.APISpec, a.proxy)
    if err := server.StartNonBlocking(); err != nil {
		return err
	}
	......
}

// NewAPIServer returns a new user facing gRPC API server.
func NewAPIServer(api API, config ServerConfig, ......) Server {
	return &server{
		api:         api,
		config:      config,
		kind:        apiServer, // const apiServer = "apiServer"
		......
	}
}

注册 Dapr API

为了让 dapr runtime 的 gRPC 服务器能挂载 Dapr API,需要进行注册上去。

注册的代码实现在 pkg/grpc/server.go 中, StartNonBlocking() 方法在启动 grpc 服务器时,会进行服务注册:

func (s *server) StartNonBlocking() error {
		if s.kind == internalServer {
			internalv1pb.RegisterServiceInvocationServer(server, s.api)
		} else if s.kind == apiServer {
            runtimev1pb.RegisterDaprServer(server, s.api)		// 注意:s.api (即 gRPC api 实现) 被传递进去
		}
		......
}

而 RegisterDaprServer() 方法的实现代码在 pkg/proto/runtime/v1/dapr_grpc.pb.go:

func RegisterDaprServer(s grpc.ServiceRegistrar, srv DaprServer) {
	s.RegisterService(&Dapr_ServiceDesc, srv)					// srv 即 gRPC api 实现
}

Dapr_ServiceDesc 定义

在文件 pkg/proto/runtime/v1/dapr_grpc.pb.go 中有 Dapr Service 的 grpc 服务定义,这是 protoc 生成的 gRPC 代码。

Dapr_ServiceDesc 中有 Dapr Service 各个方法的定义,和服务调用相关的是 InvokeService 方法:

var Dapr_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "dapr.proto.runtime.v1.Dapr",
	HandlerType: (*DaprServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "InvokeService",				# 注册方法名
			Handler:    _Dapr_InvokeService_Handler,	# 关联实现的 Handler
		},
        ......
        },
	},
	Metadata: "dapr/proto/runtime/v1/dapr.proto",
}

这一段是告诉 gRPC server: 如果收到访问 dapr.proto.runtime.v1.Dapr 服务的 InvokeService 方法的 gRPC 请求,请把请求转给 _Dapr_InvokeService_Handler 处理。

title Dapr gRPC API 
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]

-[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
|||
<[#blue]-- daprd_client

InvokeService 方法相关联的 handler 方法 _Dapr_InvokeService_Handler 的实现代码是:

func _Dapr_InvokeService_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(InvokeServiceRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(DaprServer).InvokeService(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/dapr.proto.runtime.v1.Dapr/InvokeService",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(DaprServer).InvokeService(ctx, req.(*InvokeServiceRequest))		// 这里调用的 srv 即 gRPC api 实现
	}
	return interceptor(ctx, in, info, handler)
}

最后调用到了 DaprServer 接口实现的 InvokeService 方法,也就是 gPRC API 实现。

Dapr Internal API Server(inbound)

启动 gRPC 服务器

在 dapr runtime 启动时的初始化过程中,会启动 gRPC internal server, 代码在 pkg/runtime/runtime.go 中:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
	err = a.startGRPCInternalServer(grpcAPI, a.runtimeConfig.InternalGRPCPort)
	if err != nil {
		log.Fatalf("failed to start internal gRPC server: %s", err)
	}
	log.Infof("internal gRPC server is running on port %v", a.runtimeConfig.InternalGRPCPort)
    ......
}

func (a *DaprRuntime) startGRPCInternalServer(api grpc.API, port int) error {
	serverConf := a.getNewServerConfig([]string{""}, port)
	server := grpc.NewInternalServer(api, serverConf, a.globalConfig.Spec.TracingSpec, a.globalConfig.Spec.MetricSpec, a.authenticator, a.proxy)
	if err := server.StartNonBlocking(); err != nil {
		return err
	}
	a.apiClosers = append(a.apiClosers, server)

	return nil
}

特殊处理:端口

grpc internal server 的端口比较特殊,可以通过命令行参数 “–dapr-internal-grpc-port” 指定,而如果没有指定,是取一个随机的可用端口,而不是取某个固定值。这一点和 dapr HTTP api server 以及 dapr gRPC api server 不同。

具体代码实现在文件 pkg/runtime/cli.go 中:

func FromFlags() (*DaprRuntime, error) {	
	var daprInternalGRPC int
	if *daprInternalGRPCPort != "" {
		daprInternalGRPC, err = strconv.Atoi(*daprInternalGRPCPort)
		if err != nil {
			return nil, errors.Wrap(err, "error parsing dapr-internal-grpc-port")
		}
	} else {
		daprInternalGRPC, err = grpc.GetFreePort()
		if err != nil {
			return nil, errors.Wrap(err, "failed to get free port for internal grpc server")
		}
	}
    ......
}

特殊处理:重用 gRPC API handler

Dapr gRPC internal API 实现时有点特殊:

  • 启动了自己的 gRPC server,也有自己的端口。
  • 但是注册的负责处理请求的 handler 却重用了 Dapr gRPC internal API

darp runtime 的初始化代码中,grpcAPI 对象是 GRPC API Server 和 GRPC Internal Server 共用的:

grpcAPI := a.getGRPCAPI()

err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
err = a.startGRPCInternalServer(grpcAPI, a.runtimeConfig.InternalGRPCPort)

从设计的角度看,这样做不好:混淆了对 outbound 请求和 inbound 请求的处理,影响代码可读性。

注册 Dapr API

为了让 dapr runtime 的 gRPC 服务器能挂载 Dapr internal API,需要进行注册。

注册的代码实现在 pkg/grpc/server.go 中, StartNonBlocking() 方法在启动 grpc 服务器时,会进行服务注册:

func (s *server) StartNonBlocking() error {
		if s.kind == internalServer {
			internalv1pb.RegisterServiceInvocationServer(server, s.api)		// 注意:s.api (即 gRPC api 实现) 被传递进去
		} else if s.kind == apiServer {
			runtimev1pb.RegisterDaprServer(server, s.api)
		}
		......
}

而 RegisterServiceInvocationServer() 方法的实现代码在 pkg/proto/internals/v1/service_invocation_grpc.pb.go:

func RegisterServiceInvocationServer(s grpc.ServiceRegistrar, srv ServiceInvocationServer) {
	s.RegisterService(&ServiceInvocation_ServiceDesc, srv)  					// srv 即 gRPC api 实现
}

ServiceInvocation_ServiceDesc 定义

在文件 pkg/proto/internals/v1/service_invocation_grpc.pb.go 中有 internal Service 的 grpc 服务定义,这是 protoc 生成的 gRPC 代码。

ServiceInvocation_ServiceDesc 中有两个方法的定义,和服务调用相关的是 CallLocal 方法:

var ServiceInvocation_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "dapr.proto.internals.v1.ServiceInvocation",
	HandlerType: (*ServiceInvocationServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "CallActor",
			Handler:    _ServiceInvocation_CallActor_Handler,
		},
		{
			MethodName: "CallLocal",
			Handler:    _ServiceInvocation_CallLocal_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "dapr/proto/internals/v1/service_invocation.proto",
}

这一段是告诉 gRPC server: 如果收到访问 dapr.proto.internals.v1.ServiceInvocation 服务的 CallLocal 方法的 gRPC 请求,请把请求转给 _ServiceInvocation_CallLocal_Handler 处理。

title Dapr gRPC internal API 
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]

-[#red]> daprd_client : gRPC (remote call)
note right: gRPC API @ ramdon port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
|||
<[#red]-- daprd_client

CallLocal 方法相关联的 handler 方法 _ServiceInvocation_CallLocal_Handler 的实现代码是:

func _ServiceInvocation_CallLocal_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(InternalInvokeRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(ServiceInvocationServer).CallLocal(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/dapr.proto.internals.v1.ServiceInvocation/CallLocal",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        // 这里调用的 srv 即 gRPC api 实现
		return srv.(ServiceInvocationServer).CallLocal(ctx, req.(*InternalInvokeRequest))  
	}
	return interceptor(ctx, in, info, handler)
}

最后调用到了 ServiceInvocationServer 接口实现的 CallLocal 方法,也就是 gPRC API 实现。

3 - 客户端sdk发出服务调用的outbound请求

Dapr客户端sdk封装dapr api,发出服务调用的outbound请求

Java SDK 实现

在业务代码中使用 service invoke 功能的示例可参考文件 java-sdk/examples/src/main/java/io/dapr/examples/invoke/http/InvokeClient.java,代码示意如下:

DaprClient client = (new DaprClientBuilder()).build();
byte[] response = client.invokeMethod(SERVICE_APP_ID, "say", message, HttpExtension.POST, null,
            byte[].class).block();

java sdk 中 service invoke 默认使用 HTTP ,而其他方法默认使用 gRPC,在 DaprClientProxy 类中初始化了两个 daprclient:

  1. client 字段: 类型为 DaprClientGrpc,连接到 127.0.0.1:5001
  2. methodInvocationOverrideClient 字段:类型为 DaprClientHttp,连接到 127.0.0.1:3500

service invoke 方法默认走 HTTP ,使用的是 DaprClientHttp 类型 (文件为 src/main/java/io/dapr/client/DaprClientHttp.java):

  @Override
  public <T> Mono<T> invokeMethod(String appId, String methodName,......) {
    return methodInvocationOverrideClient.invokeMethod(appId, methodName, request, httpExtension, metadata, clazz);
  }
  
  public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef<T> type) {
    try {
      final String appId = invokeMethodRequest.getAppId();
      final String method = invokeMethodRequest.getMethod();
      ......
      Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
          context -> this.client.invokeApi(httpMethod, pathSegments,
              httpExtension.getQueryParams(), serializedRequestBody, headers, context)
      );
  }

在这里根据请求条件设置 HTTP 请求的各种参数,debug 时可以看到如下图的数据v:

最后发出 HTTP 请求的代码在 src/main/java/io/dapr/client/DaprHttp.java 中的 doInvokeApi() 方法:

  private CompletableFuture<Response> doInvokeApi(String method,
                               String[] pathSegments,
                               Map<String, List<String>> urlParameters,
                               byte[] content, Map<String, String> headers,
                               Context context) {
      ......
      Request.Builder requestBuilder = new Request.Builder()
        .url(urlBuilder.build())
        .addHeader(HEADER_DAPR_REQUEST_ID, requestId);
      
    CompletableFuture<Response> future = new CompletableFuture<>();
    this.httpClient.newCall(request).enqueue(new ResponseFutureCallback(future));
    return future;
  }

发出去给 dapr runtime 的 HTTP 请求如下图所示:

调用的是 dapr runtime 的 HTTP API。

注意: 这里调用的 gRPC 服务是 dapr.proto.runtime.v1.Dapr, 方法是 InvokeService,和 dapr runtime 中 gRPC API 对应。

title Service Invoke via HTTP
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =App-1
    ----
    client
]
participant SDK_client [
    =SDK
    ----
    client
]
end box
participant daprd_client [
    =daprd
    ----
    client
]

user_code_client -> SDK_client : invokeMethod() 
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500\n/v1.0/invoke/app-2/method/method-1
|||
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

Go sdk实现

在 go 业务代码中使用 service invoke 功能的示例可参考 https://github.com/dapr/go-sdk/blob/main/examples/service/client/main.go,代码示意如下:

client, err := dapr.NewClient()
content := &dapr.DataContent{
		ContentType: "text/plain",
		Data:        []byte("hellow"),
	}
// invoke a method named "app-2" on another dapr enabled service named "method-1"
resp, err := client.InvokeMethodWithContent(ctx, "app-2", "method-1", "post", content)

Go sdk 中定义了 Client 接口,文件为 client/client.go

// Client is the interface for Dapr client implementation.
type Client interface {
    	// InvokeMethod invokes service without raw data
	InvokeMethod(ctx context.Context, appID, methodName, verb string) (out []byte, err error)

	// InvokeMethodWithContent invokes service with content
	InvokeMethodWithContent(ctx context.Context, appID, methodName, verb string, content *DataContent) (out []byte, err error)

	// InvokeMethodWithCustomContent invokes app with custom content (struct + content type).
	InvokeMethodWithCustomContent(ctx context.Context, appID, methodName, verb string, contentType string, content interface{}) (out []byte, err error)
    ......
}

这三个方法的实现在 client/invoke.go 中,都只是实现了对 InvokeRequest 对象的组装,核心的代码实现在 invokeServiceWithRequest 方法中::

func (c *GRPCClient) invokeServiceWithRequest(ctx context.Context, req *pb.InvokeServiceRequest) (out []byte, err error) {
	resp, err := c.protoClient.InvokeService(c.withAuthToken(ctx), req)
	......
}

InvokeService() 是 protoc 生成的 grpc 代码,在 dapr/proto/runtime/v1/dapr_grpc.pb.go 中,实现如下:

func (c *daprClient) InvokeService(ctx context.Context, in *InvokeServiceRequest, opts ...grpc.CallOption) (*v1.InvokeResponse, error) {
	out := new(v1.InvokeResponse)
	err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/InvokeService", in, out, opts...)
	......
}

注意: 这里调用的 gRPC 服务是 dapr.proto.runtime.v1.Dapr, 方法是 InvokeService,和 dapr runtime 中 gRPC API 对应。

title Service Invoke via gRPC
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =App-1
    ----
    client
]
participant SDK_client [
    =SDK
    ----
    client
]
end box
participant daprd_client [
    =daprd
    ----
    client
]

user_code_client -> SDK_client : InvokeMethodWithContent() 
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
|||
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

其他SDK

TODO

分析总结

所有的语言 SDK 都会实现了从客户端 SDK API 调用到发出远程调用请求给 dapr runtime的功能。具体实现上会有一些差别:

  • go sdk

    全部请求走 gPRC API。

  • Java sdk

    • service invoke 默认走 HTTP API,其他请求默认走 gRPC API。
  • 其他SDK

    • 待更新

4 - Dapr Runtime接收服务调用的outbound请求

Dapr Runtime通过gRPC API 和 HTTP API接收来自应用的outbound请求

Dapr runtime 有两种方式接收来自客户端发起的服务调用的 outbound 请求:gRPC API 和 HTTP API。在接收到请求之后,dapr runtime 会将 outbound 请求转发给目标服务的 dapr runtime。

title Daprd Receive inbound Request
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

 -[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500 \n/v1.0/invoke/app-2/method/method-1
 -[#blue]> daprd_client : gRPC (localhost)
note right: GRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
|||
daprd_client -> daprd_client: name resolution
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)

HTTP API

Runtime 初始化时,在注册 HTTP 服务时绑定了 handler 实现和 URL 路由:

func (a *api) constructDirectMessagingEndpoints() []Endpoint {
	return []Endpoint{
		{
			Methods:           []string{router.MethodWild},
			Route:             "invoke/{id}/method/{method:*}",
			Alias:             "{method:*}",
			Version:           apiVersionV1,
			KeepParamUnescape: true,
			Handler:           a.onDirectMessage,
		},
	}
}

当 service invoke 的 HTTP 请求进来后,就会被 fasthttp 路由到 Handler 即 HTTP API 实现的 onDirectMessage() 方法中进行处理。

onDirectMessage 的实现代码在文件 pkg/http/api.go, 示意如下:

func (a *api) onDirectMessage(reqCtx *fasthttp.RequestCtx) {
	......
  req := invokev1.NewInvokeMethodRequest(...)
	resp, err := a.directMessaging.Invoke(reqCtx, targetID, req)
	......
}

备注: HTTP API 的这个 onDirectMessage() 方法取名不对,应该效仿 gRPC API,取名为 InvokeService(). 理由是:这是暴露给外部调用的方法,取名应该表现出它对外暴露的功能,即InvokeService。而不应该暴露内部的实现是调用 directMessaging。

HTTP API 的实现也简单,同样,除了基本的请求/应答参数处理之外,就是将转发请求的事情交给了 directMessaging。

gRPC API

Runtime 初始化时,在注册 gRPC 服务时绑定了 gPRC API 实现和 InvokeService gRPC 方法。

当 service invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go 中的 InvokeService 方法:

func (a *api) InvokeService(ctx context.Context, in *runtimev1pb.InvokeServiceRequest) (*commonv1pb.InvokeResponse, error) {
	......
	resp, err := a.directMessaging.Invoke(ctx, in.Id, req)
	......
	return resp.Message(), respError
}

gRPC API 的实现特别简单,除了基本的请求/应答参数处理之外,就是将转发请求的事情交给了 directMessaging。

Name Resolution

TBD

5 - Dapr Runtime转发outbound请求

客户端的Dapr Runtime将outbound请求转发给远程服务器端的Dapr Runtime

Dapr runtime 之间相互通讯采用的是 gRPC 协议,定义有 Dapr gRPC internal API。比较特殊的是,采用随机空闲端口而不是默认端口。但也可以通过命令行参数 dapr-internal-grpc-port 指定。

title Daprd-Daprd Communication
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

 -[#blue]> daprd_client : HTTP (localhost)
 -[#blue]> daprd_client : gRPC (localhost)
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal

pkg/messaging/direct_messaging.go 中的 DirectMessaging 负责实现转发请求给远程 dapr runtime。

接口

DirectMessaging 接口定义,用来调用远程应用:

// DirectMessaging is the API interface for invoking a remote app.
type DirectMessaging interface {
	Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}

只有一个 invoke 方法。

实现流程

流程概况

invoke 方法的实现:

func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	app, err := d.getRemoteApp(targetAppID)

	if app.id == d.appID && app.namespace == d.namespace {
		return d.invokeLocal(ctx, req)   // 如果调用的 appid 就是自己的 appid,这个场景好奇怪。忽略这里的代码先
	}
	return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, app, d.invokeRemote, req)
}

invokeRemote 方法的代码简化如下:

func (d *directMessaging) invokeRemote(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
    // 建立连接
	conn, err := d.connectionCreatorFn(context.TODO(), appAddress, appID, namespace, false, false, false)
    // 构建 gRPC stub 作为 client
	clientV1 := internalv1pb.NewServiceInvocationClient(conn)
    // 调用 gRPC 的 CallLocal 方法发出远程调用请求到另外一个 Dapr runtime
	resp, err := clientV1.CallLocal(ctx, req.Proto(), opts...)
    // 处理应答
	return invokev1.InternalInvokeResponse(resp)
}

发出 gRPC 请求给远程 dapr runtime

CallLocal() 方法的实现在 service_invocation_grpc.pb.go 中,这是 protoc 成生的 gRPC 代码:

func (c *serviceInvocationClient) CallLocal(ctx context.Context, in *InternalInvokeRequest, opts ...grpc.CallOption) (*InternalInvokeResponse, error) {
	out := new(InternalInvokeResponse)
	err := c.cc.Invoke(ctx, "/dapr.proto.internals.v1.ServiceInvocation/CallLocal", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

可以看到这个 gRPC 请求调用的是 dapr.proto.internals.v1.ServiceInvocation 服务的 CallLocal 方法。

hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal

实现细节

获取远程地址

hide footbox
skinparam style strictuml

participant directMessaging 
participant "Name resolver\n(consul/kubenetes/mdns)" as localNameReSolver

directMessaging -> localNameReSolver : ResolveID()
localNameReSolver -> localNameReSolver: loadBalance()
note right: kubernetes: dns name\ndns: dns name\nconsul: one address(random)\nmdsn: one address(round robbin)
localNameReSolver --> directMessaging
note right: return only one address in local cluster
hide footbox
skinparam style strictuml

participant directMessaging 
participant "Local Name resolver\n(consul/kubenetes/mdns)" as localNameReSolver
participant "External Name resolver\n(synchronizer)" as externalNameReSolver

directMessaging -> localNameReSolver : ResolveID()
localNameReSolver --> directMessaging
note right: return service instance list in local cluster
directMessaging -[#red]> externalNameReSolver : ResolveID()
externalNameReSolver --> directMessaging
note right: return service instance list in external clusters
directMessaging -[#red]> directMessaging: combine the instance list
directMessaging -[#red]> directMessaging: filter by cluster strategy
note right: local-first\nexternal-first\nbroadcast\nlocal-only\nexternal-onluy
directMessaging -> directMessaging: loadBalance()

6 - Dapr Runtime接收服务调用的inbound请求

Dapr Runtime通过gRPC internal API接收来自客户端Dapr Runtime的inbound请求

Dapr runtime 之间相互通讯走的是 gRPC internal API,这个 API 也只支持 gRPC 协议。

hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
daprd_server -> daprd_server : interceptor
daprd_server -[#blue]>  : appChannel.InvokeMethod()

接收请求

Runtime 初始化时,在注册 gRPC 服务时绑定了 gPRC Internal API 实现和 CallLocal gRPC 方法。对于访问 dapr.proto.internals.v1.ServiceInvocation 服务的 CallLocal 方法的 gRPC 请求,会将请求转给 _ServiceInvocation_CallLocal_Handler 处理:

func _ServiceInvocation_CallLocal_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	......
	if interceptor == nil {
		return srv.(ServiceInvocationServer).CallLocal(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/dapr.proto.internals.v1.ServiceInvocation/CallLocal",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        // 这里调用的 srv 即 gRPC api 实现
		return srv.(ServiceInvocationServer).CallLocal(ctx, req.(*InternalInvokeRequest))  
	}
	return interceptor(ctx, in, info, handler)
}

最后进入 CallLocal() 方法进行处理。

备注:初始化的细节,请见前面章节 “Runtime初始化”

期间会有一个 interceptor 的处理流程,细节后面展开。

转发请求

当 internal invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go 中的 CallLocal 方法:

func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
	// 1. 构造请求
	req, err := invokev1.InternalInvokeRequest(in)
  if a.accessControlList != nil {
		......
	}
	// 2. 通过 appChannel 向应用发出请求
	resp, err := a.appChannel.InvokeMethod(ctx, req)
    // 3. 处理应答
	return resp.Proto(), err
}

处理方式很清晰,基本上就是将请求通过 app channel 转发。Runtime 本身并没有什么额外的处理逻辑。InternalInvokeRequest() 只是简单处理一下参数:

// InternalInvokeRequest creates InvokeMethodRequest object from InternalInvokeRequest pb object.
func InternalInvokeRequest(pb *internalv1pb.InternalInvokeRequest) (*InvokeMethodRequest, error) {
	req := &InvokeMethodRequest{r: pb}
	if pb.Message == nil {
		return nil, errors.New("Message field is nil")
	}

	return req, nil
}

访问控制

期间会有一个 access control (访问控制)的逻辑:

	if a.accessControlList != nil {
		// An access control policy has been specified for the app. Apply the policies.
		operation := req.Message().Method
		var httpVerb commonv1pb.HTTPExtension_Verb
		// Get the http verb in case the application protocol is http
		if a.appProtocol == config.HTTPProtocol && req.Metadata() != nil && len(req.Metadata()) > 0 {
			httpExt := req.Message().GetHttpExtension()
			if httpExt != nil {
				httpVerb = httpExt.GetVerb()
			}
		}
		callAllowed, errMsg := acl.ApplyAccessControlPolicies(ctx, operation, httpVerb, a.appProtocol, a.accessControlList)

		if !callAllowed {
			return nil, status.Errorf(codes.PermissionDenied, errMsg)
		}
	}

细节后面展开。

7 - Dapr Runtime转发inbound请求

服务器端的Dapr Runtime将inbound请求转发给服务器端的应用

协议和端口的配置

Dapr runtime 将 inbound 请求转发给服务器端应用:

title Daprd-Daprd Communication
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]

daprd_client -[#red]> daprd_server : Dapr gRPC internal API (remote call)
daprd_server -[#blue]> user_code_server : Dapr HTTP channel API (localhost)
note right: HTTP endpoint @ 3000\nVERB http://localhost:3000/method?query1=value1
daprd_server -[#blue]> user_code_server : Dapr gRPC channel API (localhost)
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
  • app channel 的通讯协议可以是 HTTP 或者 gRPC 协议,可以通过命令行参数 app-port 指定,默认是 HTTP
  • 应用接收请求的端口可以通过命令行参数 app-protocol 指定,没有默认值。
  • 为了控制对应用造成的压力,还引入了最大并发度的概念,可以通过命令行参数 app-max-concurrency 指定。

请求发送的流程

前面分析过,当 internal invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go 中的 CallLocal 方法:

func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
	......
	resp, err := a.appChannel.InvokeMethod(ctx, req)
  ......
}

然后通过 appChannel 发送请求。

app channel 的建立

app channel 的建立是在 runtime 初始化时,在 pkg/runtime/runtime.go 的 initRuntime() 方法中:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
    ......
    a.blockUntilAppIsReady()

	err = a.createAppChannel()
	a.daprHTTPAPI.SetAppChannel(a.appChannel)
	grpcAPI.SetAppChannel(a.appChannel)
    ......
}

createAppChannel() 的实现,目前只支持 HTTP 和 gRPC:

func (a *DaprRuntime) createAppChannel() error {
    // 为了建立 app channel,必须配置有 app port
	if a.runtimeConfig.ApplicationPort > 0 {
		var channelCreatorFn func(port, maxConcurrency int, spec config.TracingSpec, sslEnabled bool, maxRequestBodySize int, readBufferSize int) (channel.AppChannel, error)

		switch a.runtimeConfig.ApplicationProtocol {
		case GRPCProtocol:
			channelCreatorFn = a.grpc.CreateLocalChannel
		case HTTPProtocol:
			channelCreatorFn = http_channel.CreateLocalChannel
		default:
      // 只支持 HTTP 和 gRPC
			return errors.Errorf("cannot create app channel for protocol %s", string(a.runtimeConfig.ApplicationProtocol))
		}

		ch, err := channelCreatorFn(a.runtimeConfig.ApplicationPort, a.runtimeConfig.MaxConcurrency, a.globalConfig.Spec.TracingSpec, a.runtimeConfig.AppSSL, a.runtimeConfig.MaxRequestBodySize, a.runtimeConfig.ReadBufferSize)
		a.appChannel = ch
	} else {
		log.Warn("app channel is not initialized. did you make sure to configure an app-port?")
	}

	return nil
}

app channel 的配置参数

和 app channel 密切相关的三个配置项,可以从命令行参数中获取:

func FromFlags() (*DaprRuntime, error) {
    ......
    appPort := flag.String("app-port", "", "The port the application is listening on")
	appProtocol := flag.String("app-protocol", string(HTTPProtocol), "Protocol for the application: grpc or http")	
	appMaxConcurrency := flag.Int("app-max-concurrency", -1, "Controls the concurrency level when forwarding requests to user code")

TracingSpec / AppSSL / MaxRequestBodySize / ReadBufferSize 后面细说,先不展开。

HTTP 通道的实现

HTTP Channel 的实现在文件 pkg/channel/http/http_channel.go 中,其 InvokeMethod()方法:

func (h *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  ......
	switch req.APIVersion() {
	case internalv1pb.APIVersion_V1:
		rsp, err = h.invokeMethodV1(ctx, req)
  ......
	return rsp, err
}

暂时只有 invokeMethodV1 版本:

func (h *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  // 1. 构建HTTP请求
	channelReq := h.constructRequest(ctx, req)
  // 2. 发送请求到应用
	err := h.client.DoTimeout(channelReq, resp, channel.DefaultChannelRequestTimeout)
  // 3. 处理返回的应答
	rsp := h.parseChannelResponse(req, resp, err)
	return rsp, nil
}

这是将收到的请求内容,转成HTTP协议的标准格式,然后通过 fasthttp 发给用户代码。其中转为标准http请求的代码在方法 constructRequest() 中:

func (h *Channel) constructRequest(ctx context.Context, req *invokev1.InvokeMethodRequest) *fasthttp.Request {
	var channelReq = fasthttp.AcquireRequest()

	// Construct app channel URI: VERB http://localhost:3000/method?query1=value1
	uri := fmt.Sprintf("%s/%s", h.baseAddress, req.Message().GetMethod())
	channelReq.SetRequestURI(uri)
	channelReq.URI().SetQueryString(req.EncodeHTTPQueryString())
	channelReq.Header.SetMethod(req.Message().HttpExtension.Verb.String())

	// Recover headers
	invokev1.InternalMetadataToHTTPHeader(ctx, req.Metadata(), channelReq.Header.Set)

  ......
}

这样在服务器端的用户代码中,就可以用不引入 dapr sdk,只需要提供标准 http endpoint 即可。

title Daprd-Daprd Communication
hide footbox
skinparam style strictuml

participant daprd_server [
    =daprd
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]

daprd_server -[#blue]> user_code_server : HTTP (localhost)
note right: HTTP endpoint @ 3000\nVERB http://localhost:3000/method?query1=value1

gRPC 通道的实现

pkg/grpc/grpc.go 中的 CreateLocalChannel() 方法:

// CreateLocalChannel creates a new gRPC AppChannel.
func (g *Manager) CreateLocalChannel(port, maxConcurrency int, spec config.TracingSpec, sslEnabled bool, maxRequestBodySize int, readBufferSize int) (channel.AppChannel, error) {
  // IP地址写死了 127.0.0.1
	conn, err := g.GetGRPCConnection(context.TODO(), fmt.Sprintf("127.0.0.1:%v", port), "", "", true, false, sslEnabled)
  ......
	g.AppClient = conn
	ch := grpc_channel.CreateLocalChannel(port, maxConcurrency, conn, spec, maxRequestBodySize, readBufferSize)
	return ch, nil
}

实现代码在 pkg/channel/grpc/grpc_channel.go 的 InvokeMethod()方法中:

func (g *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  ......
	switch req.APIVersion() {
	case internalv1pb.APIVersion_V1:
		rsp, err = g.invokeMethodV1(ctx, req)
  ......
	return rsp, err
}

暂时只有 invokeMethodV1 版本:

func (g *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  // 1. 创建 AppCallback 的 grpc client
	clientV1 := runtimev1pb.NewAppCallbackClient(g.client)
  // 2. 调用 AppCallback 的 OnInvoke() 方法
	resp, err := clientV1.OnInvoke(ctx, req.Message(), grpc.Header(&header), grpc.Trailer(&trailer))
  // 3. 处理返回的应答
	return rsp.WithMessage(resp), nil
}

gRPC channel 是通过 gRPC 协议调用服务器端应用上的 gRPC 服务完成,具体是 AppCallback 的 OnInvoke() 方法。

title Dapr gRPC Channel
hide footbox
skinparam style strictuml

participant daprd_server [
    =daprd
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]


daprd_server -[#blue]> user_code_server : gRPC (localhost)
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke

也就是说:如果要支持 gRPC channel,则要求服务器端应用必须实现 AppCallback gRPC 服务器,这一点和 HTTP 不同,对服务器端应用是有侵入的。

8 - 服务器端App接收inbound请求

服务器端App接收标准HTTP请求,或者实现AppCallbackServer以接受gRPC请求

pkg/proto/runtime/v1/appcallback.pb.go 中的 OnInvoke 方法:

// AppCallbackServer is the server API for AppCallback service.
type AppCallbackServer interface {
	// Invokes service method with InvokeRequest.
	OnInvoke(context.Context, *v1.InvokeRequest) (*v1.InvokeResponse, error)
}

为了接收来自daprd转发的来自客户端的service invoke 请求,服务器端的应用也需要做一些处理。

接收HTTP请求

对于通过 HTTP channel 过来的标准HTTP请求,服务器端的应用只需要提供标准的HTTP端口即可,无须引入dapr SDK。

title Daprd-Daprd Communication
hide footbox
skinparam style strictuml

participant daprd_server [
    =daprd
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]

daprd_server -[#blue]> user_code_server : HTTP (localhost)
note right: HTTP endpoint @ 3000\nVERB http://localhost:3000/method?query1=value1

接收gRPC请求

对于通过 gRPC channel 过来的 gRPC 请求,服务器端的应用则需要实现 gRPC AppCallback 服务的 OnInvoke() 方法:

title Dapr gRPC Channel
hide footbox
skinparam style strictuml

participant daprd_server [
    =daprd
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]


daprd_server -[#blue]> user_code_server : gRPC (localhost)
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke

AppCallbackServer 的 proto 定义在 dapr 仓库下的文件dapr/proto/runtime/v1/appcallback.proto中:

service AppCallback {
  // Invokes service method with InvokeRequest.
  rpc OnInvoke (common.v1.InvokeRequest) returns (common.v1.InvokeResponse) {}
  ......
}

而 AppCallbackServer 的具体实现则分布在各个不同语言的 sdk 里面。

go-sdk实现

实现在 go-sdk 的 service/grpc/invoke.go 文件的 OnInvoke方法,主要流程为:

func (s *Server) OnInvoke(ctx context.Context, in *cpb.InvokeRequest) (*cpb.InvokeResponse, error) {
	if fn, ok := s.invokeHandlers[in.Method]; ok {
		e := &cc.InvocationEvent{}
		ct, er := fn(ctx, e)
		return &cpb.InvokeResponse{......}, nil
	}
	return nil, fmt.Errorf("method not implemented: %s", in.Method)
}

其中 s.invokeHandlers 中保存处理请求的方法(由参数method作为key)。AddServiceInvocationHandler() 用于增加方法名和 handler 的映射 :

// Server is the gRPC service implementation for Dapr.
type Server struct {
	invokeHandlers  map[string]common.ServiceInvocationHandler
}
type  ServiceInvocationHandler func(ctx context.Context, in *InvocationEvent) (out *Content, err error)

func (s *Server) AddServiceInvocationHandler(method string, fn func(ctx context.Context, in *cc.InvocationEvent) (our *cc.Content, err error)) error {
	s.invokeHandlers[method] = fn
	return nil
}

这意味着,在服务器端的应用中,并不需要为这些方法提供 gRPC 相关的 proto 定义,也不需要直接通过 gRPC 把这些方法暴露出去,只需要实现 AppCallback 的 OnInvode() 方法,然后把需要对外暴露的方法注册即可,OnInvode() 方法相当于一个简单的 API 网管。

title Dapr AppCallback OnInvoke gRPC impl
hide footbox
skinparam style strictuml

participant AppCallback [
    =AppCallback
    ----
    OnInvoke()
]

participant invokeHandlers
participant handler

-[#blue]> AppCallback : gRPC OnInvode()
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
AppCallback -> invokeHandlers: find handler by method name
invokeHandlers --> AppCallback: registered handler
AppCallback -> handler: call handler
note right: type  ServiceInvocationHandler \nfunc(ctx context.Context, in *InvocationEvent) \n(out *Content, err error)
handler --> AppCallback
<-[#blue]- AppCallback

用户代码实现示例

用户在开发支持 dapr 的 go 服务器端应用时,需要在应用中启动 dapr service server,然后添加各种 handler,包括 ServiceInvocationHandler,如下面这个例子(go-sdk下的 example/serving/grpc/main.go ):

func main() {
	// create a Dapr service server
	s, err := daprd.NewService(":50001")

	// add a service to service invocation handler
	if err := s.AddServiceInvocationHandler("echo", echoHandler); err != nil {
		log.Fatalf("error adding invocation handler: %v", err)
	}

	// start the server
	if err := s.Start(); err != nil {
		log.Fatalf("server error: %v", err)
	}
}

java-sdk实现

java SDK 中没有找到服务器端实现的代码?待确定。