1 - 服务调用的主流程

服务调用的主流程分析

1.1 - 流程概述

Dapr服务调用的流程和API概述

API 和端口

Dapr runtime 对外提供两个 API,分别是 Dapr HTTP API 和 Dapr gRPC API。另外两个 dapr runtime 之间的通讯 (Dapr internal API) 固定用 gRPC 协议。

两个 Dapr API 对外暴露的端口,默认是:

  • 3500: HTTP 端口,可以通过命令行参数 dapr-http-port 设置
  • 50001: gRPC 端口,可以通过命令行参数 dapr-grpc-port 设置

Dapr internal API 是内部端口,比较特殊,没有固定的默认值,而是取任意随机可用端口。也可以通过命令行参数 dapr-internal-grpc-port 设置。

为了向服务器端的应用发送请求,dapr 需要获知应用在哪个端口监听并处理请求,这个信息通过命令行参数 app-port 设置。Dapr 的示例中一般喜欢用 3000 端口。

调用流程

HTTP 方式

title Service Invoke via HTTP
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =App-1
    ----
    client
]
participant SDK_client [
    =SDK
    ----
    client
]
end box
participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

box "App-2"
participant user_code_server [
    =App-2
    ----
    server
]
end box

user_code_client -> SDK_client : Invoke\nService() 
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port
|||
daprd_server -[#blue]> user_code_server :  http (localhost)
note right: HTTP endpoint "method-1" @ 3000

daprd_server <[#blue]-- user_code_server
daprd_client <[#red]-- daprd_server
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

gRPC 方式

title Service Invoke via gRPC
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =App-1
    ----
    client
]
participant SDK_client [
    =SDK
    ----
    client
]
end box
participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

box "App-2"
participant SDK_server [
    =SDK
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]
end box
user_code_server -> SDK_server: AddServiceInvocationHandler("method-1")
SDK_server -> SDK_server: save handler in invokeHandlers["method-1"]
SDK_server --> user_code_server
user_code_client -> SDK_client : Invoke\nService() 
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ random free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
|||
daprd_server -[#blue]> SDK_server : gRPC (localhost)
note right: 50001\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
SDK_server -> SDK_server: get handler by invokeHandlers["method-1"]
SDK_server -> user_code_server : invoke handler of "method-1"

SDK_server <-- user_code_server
daprd_server <[#blue]-- SDK_server
daprd_client <[#red]-- daprd_server
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

gRPC proxying 方式

title Service Invoke via gRPC proxying
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =App-1
    ----
    client
]
participant SDK_client [
    =SDK
    ----
    client
]
end box
participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

box "App-2"
participant SDK_server [
    =gRPC
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]
end box
user_code_server -> SDK_server
SDK_server --> user_code_server
user_code_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC\n/user.services.ServiceName/Method-1
|||
daprd_client -[#red]> daprd_server : gRPC proxy (remote call)
note right: gRPC\n/user.services.ServiceName/Method-1
|||
daprd_server -[#blue]> SDK_server : gRPC (localhost)
note right: gRPC\n/user.services.ServiceName/Method-1
SDK_server -> user_code_server : 

SDK_server <-- user_code_server
daprd_server <[#blue]-- SDK_server
daprd_client <[#red]-- daprd_server
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

1.2 - 服务调用相关的Runtime初始化

Dapr Runtime中和服务调用相关的初始化流程

在 dapr runtime 启动进行初始化时,需要开启 API 端口并挂载相应的 handler 来接收并处理服务调用的 outbound 请求。另外为了接收来自其他 dapr runtime 的 inbound 请求,还要启动 dapr internal server。

Dapr HTTP API Server(outbound)

在 dapr runtime 中启动 HTTP server

dapr runtime 的 HTTP server 用的是 fasthttp。

在 dapr runtime 启动时的初始化过程中,会启动 HTTP server, 代码在 pkg/runtime/runtime.go 中:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
  ......
  // Start HTTP Server
	err = a.startHTTPServer(a.runtimeConfig.HTTPPort, a.runtimeConfig.PublicPort, a.runtimeConfig.ProfilePort, a.runtimeConfig.AllowedOrigins, pipeline)
	if err != nil {
		log.Fatalf("failed to start HTTP server: %s", err)
	}
  ......
}

func (a *DaprRuntime) startHTTPServer(......) error {
	a.daprHTTPAPI = http.NewAPI(......)

	server := http.NewServer(a.daprHTTPAPI, ......)
  if err := server.StartNonBlocking(); err != nil {		// StartNonBlocking 启动 fasthttp server
		return err
	}
}

StartNonBlocking() 的实现代码在 pkg/http/server.go 中:

// StartNonBlocking starts a new server in a goroutine.
func (s *server) StartNonBlocking() error {
  	......
  	for _, apiListenAddress := range s.config.APIListenAddresses {
			l, err := net.Listen("tcp", fmt.Sprintf("%s:%v", apiListenAddress, s.config.Port))
      listeners = append(listeners, l)
		}
  
  	for _, listener := range listeners {
		// customServer is created in a loop because each instance
		// has a handle on the underlying listener.
		customServer := &fasthttp.Server{
			Handler:            handler,
			MaxRequestBodySize: s.config.MaxRequestBodySize * 1024 * 1024,
			ReadBufferSize:     s.config.ReadBufferSize * 1024,
			StreamRequestBody:  s.config.StreamRequestBody,
		}
		s.servers = append(s.servers, customServer)

		go func(l net.Listener) {
			if err := customServer.Serve(l); err != nil {
				log.Fatal(err)
			}
		}(listener)
	}
}

挂载 DirectMessaging 的 HTTP 端点

在 HTTP API 的初始化过程中,会在 fast http server 上挂载 DirectMessaging 的 HTTP 端点,代码在 pkg/http/api.go 中:

func NewAPI(
  appID string,
	appChannel channel.AppChannel,
	directMessaging messaging.DirectMessaging,
  ......
  	shutdown func()) API {
  
  	api := &api{
		appChannel:               appChannel,
		directMessaging:          directMessaging,
		......
	}
  
  	// 附加 DirectMessaging 的 HTTP 端点
  	api.endpoints = append(api.endpoints, api.constructDirectMessagingEndpoints()...)
}

DirectMessaging 的 HTTP 端点的具体信息在 constructDirectMessagingEndpoints() 方法中:

func (a *api) constructDirectMessagingEndpoints() []Endpoint {
	return []Endpoint{
		{
			Methods:           []string{router.MethodWild},
			Route:             "invoke/{id}/method/{method:*}",
			Alias:             "{method:*}",
			Version:           apiVersionV1,
			KeepParamUnescape: true,
			Handler:           a.onDirectMessage,
		},
	}
}

注意这里的 Route 路径 “invoke/{id}/method/{method:*}", dapr sdk 就是就通过这样的 url 来发起 HTTP 请求。

title Dapr HTTP API 
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]

-[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500\n/v1.0/invoke/{id}/method/{method}
|||
<[#blue]-- daprd_client

Dapr gRPC API Server(outbound)

启动 gRPC 服务器

在 dapr runtime 启动时的初始化过程中,会启动 gRPC server, 代码在 pkg/runtime/runtime.go 中:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
    // Create and start internal and external gRPC servers
	grpcAPI := a.getGRPCAPI()
    
	err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
    ......
}

func (a *DaprRuntime) startGRPCAPIServer(api grpc.API, port int) error {
	serverConf := a.getNewServerConfig(a.runtimeConfig.APIListenAddresses, port)
	server := grpc.NewAPIServer(api, serverConf, a.globalConfig.Spec.TracingSpec, a.globalConfig.Spec.MetricSpec, a.globalConfig.Spec.APISpec, a.proxy)
    if err := server.StartNonBlocking(); err != nil {
		return err
	}
	......
}

// NewAPIServer returns a new user facing gRPC API server.
func NewAPIServer(api API, config ServerConfig, ......) Server {
	return &server{
		api:         api,
		config:      config,
		kind:        apiServer, // const apiServer = "apiServer"
		......
	}
}

注册 Dapr API

为了让 dapr runtime 的 gRPC 服务器能挂载 Dapr API,需要进行注册上去。

注册的代码实现在 pkg/grpc/server.go 中, StartNonBlocking() 方法在启动 grpc 服务器时,会进行服务注册:

func (s *server) StartNonBlocking() error {
		if s.kind == internalServer {
			internalv1pb.RegisterServiceInvocationServer(server, s.api)
		} else if s.kind == apiServer {
            runtimev1pb.RegisterDaprServer(server, s.api)		// 注意:s.api (即 gRPC api 实现) 被传递进去
		}
		......
}

而 RegisterDaprServer() 方法的实现代码在 pkg/proto/runtime/v1/dapr_grpc.pb.go:

func RegisterDaprServer(s grpc.ServiceRegistrar, srv DaprServer) {
	s.RegisterService(&Dapr_ServiceDesc, srv)					// srv 即 gRPC api 实现
}

Dapr_ServiceDesc 定义

在文件 pkg/proto/runtime/v1/dapr_grpc.pb.go 中有 Dapr Service 的 grpc 服务定义,这是 protoc 生成的 gRPC 代码。

Dapr_ServiceDesc 中有 Dapr Service 各个方法的定义,和服务调用相关的是 InvokeService 方法:

var Dapr_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "dapr.proto.runtime.v1.Dapr",
	HandlerType: (*DaprServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "InvokeService",				# 注册方法名
			Handler:    _Dapr_InvokeService_Handler,	# 关联实现的 Handler
		},
        ......
        },
	},
	Metadata: "dapr/proto/runtime/v1/dapr.proto",
}

这一段是告诉 gRPC server: 如果收到访问 dapr.proto.runtime.v1.Dapr 服务的 InvokeService 方法的 gRPC 请求,请把请求转给 _Dapr_InvokeService_Handler 处理。

title Dapr gRPC API 
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]

-[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
|||
<[#blue]-- daprd_client

InvokeService 方法相关联的 handler 方法 _Dapr_InvokeService_Handler 的实现代码是:

func _Dapr_InvokeService_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(InvokeServiceRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(DaprServer).InvokeService(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/dapr.proto.runtime.v1.Dapr/InvokeService",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
		return srv.(DaprServer).InvokeService(ctx, req.(*InvokeServiceRequest))		// 这里调用的 srv 即 gRPC api 实现
	}
	return interceptor(ctx, in, info, handler)
}

最后调用到了 DaprServer 接口实现的 InvokeService 方法,也就是 gPRC API 实现。

Dapr Internal API Server(inbound)

启动 gRPC 服务器

在 dapr runtime 启动时的初始化过程中,会启动 gRPC internal server, 代码在 pkg/runtime/runtime.go 中:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
	err = a.startGRPCInternalServer(grpcAPI, a.runtimeConfig.InternalGRPCPort)
	if err != nil {
		log.Fatalf("failed to start internal gRPC server: %s", err)
	}
	log.Infof("internal gRPC server is running on port %v", a.runtimeConfig.InternalGRPCPort)
    ......
}

func (a *DaprRuntime) startGRPCInternalServer(api grpc.API, port int) error {
	serverConf := a.getNewServerConfig([]string{""}, port)
	server := grpc.NewInternalServer(api, serverConf, a.globalConfig.Spec.TracingSpec, a.globalConfig.Spec.MetricSpec, a.authenticator, a.proxy)
	if err := server.StartNonBlocking(); err != nil {
		return err
	}
	a.apiClosers = append(a.apiClosers, server)

	return nil
}

特殊处理:端口

grpc internal server 的端口比较特殊,可以通过命令行参数 “–dapr-internal-grpc-port” 指定,而如果没有指定,是取一个随机的可用端口,而不是取某个固定值。这一点和 dapr HTTP api server 以及 dapr gRPC api server 不同。

具体代码实现在文件 pkg/runtime/cli.go 中:

func FromFlags() (*DaprRuntime, error) {	
	var daprInternalGRPC int
	if *daprInternalGRPCPort != "" {
		daprInternalGRPC, err = strconv.Atoi(*daprInternalGRPCPort)
		if err != nil {
			return nil, errors.Wrap(err, "error parsing dapr-internal-grpc-port")
		}
	} else {
		daprInternalGRPC, err = grpc.GetFreePort()
		if err != nil {
			return nil, errors.Wrap(err, "failed to get free port for internal grpc server")
		}
	}
    ......
}

特殊处理:重用 gRPC API handler

Dapr gRPC internal API 实现时有点特殊:

  • 启动了自己的 gRPC server,也有自己的端口。
  • 但是注册的负责处理请求的 handler 却重用了 Dapr gRPC internal API

darp runtime 的初始化代码中,grpcAPI 对象是 GRPC API Server 和 GRPC Internal Server 共用的:

grpcAPI := a.getGRPCAPI()

err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
err = a.startGRPCInternalServer(grpcAPI, a.runtimeConfig.InternalGRPCPort)

从设计的角度看,这样做不好:混淆了对 outbound 请求和 inbound 请求的处理,影响代码可读性。

注册 Dapr API

为了让 dapr runtime 的 gRPC 服务器能挂载 Dapr internal API,需要进行注册。

注册的代码实现在 pkg/grpc/server.go 中, StartNonBlocking() 方法在启动 grpc 服务器时,会进行服务注册:

func (s *server) StartNonBlocking() error {
		if s.kind == internalServer {
			internalv1pb.RegisterServiceInvocationServer(server, s.api)		// 注意:s.api (即 gRPC api 实现) 被传递进去
		} else if s.kind == apiServer {
			runtimev1pb.RegisterDaprServer(server, s.api)
		}
		......
}

而 RegisterServiceInvocationServer() 方法的实现代码在 pkg/proto/internals/v1/service_invocation_grpc.pb.go:

func RegisterServiceInvocationServer(s grpc.ServiceRegistrar, srv ServiceInvocationServer) {
	s.RegisterService(&ServiceInvocation_ServiceDesc, srv)  					// srv 即 gRPC api 实现
}

ServiceInvocation_ServiceDesc 定义

在文件 pkg/proto/internals/v1/service_invocation_grpc.pb.go 中有 internal Service 的 grpc 服务定义,这是 protoc 生成的 gRPC 代码。

ServiceInvocation_ServiceDesc 中有两个方法的定义,和服务调用相关的是 CallLocal 方法:

var ServiceInvocation_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "dapr.proto.internals.v1.ServiceInvocation",
	HandlerType: (*ServiceInvocationServer)(nil),
	Methods: []grpc.MethodDesc{
		{
			MethodName: "CallActor",
			Handler:    _ServiceInvocation_CallActor_Handler,
		},
		{
			MethodName: "CallLocal",
			Handler:    _ServiceInvocation_CallLocal_Handler,
		},
	},
	Streams:  []grpc.StreamDesc{},
	Metadata: "dapr/proto/internals/v1/service_invocation.proto",
}

这一段是告诉 gRPC server: 如果收到访问 dapr.proto.internals.v1.ServiceInvocation 服务的 CallLocal 方法的 gRPC 请求,请把请求转给 _ServiceInvocation_CallLocal_Handler 处理。

title Dapr gRPC internal API 
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]

-[#red]> daprd_client : gRPC (remote call)
note right: gRPC API @ ramdon port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
|||
<[#red]-- daprd_client

CallLocal 方法相关联的 handler 方法 _ServiceInvocation_CallLocal_Handler 的实现代码是:

func _ServiceInvocation_CallLocal_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	in := new(InternalInvokeRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
	if interceptor == nil {
		return srv.(ServiceInvocationServer).CallLocal(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/dapr.proto.internals.v1.ServiceInvocation/CallLocal",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        // 这里调用的 srv 即 gRPC api 实现
		return srv.(ServiceInvocationServer).CallLocal(ctx, req.(*InternalInvokeRequest))  
	}
	return interceptor(ctx, in, info, handler)
}

最后调用到了 ServiceInvocationServer 接口实现的 CallLocal 方法,也就是 gPRC API 实现。

1.3 - 客户端sdk发出服务调用的outbound请求

Dapr客户端sdk封装dapr api,发出服务调用的outbound请求

Java SDK 实现

在业务代码中使用 service invoke 功能的示例可参考文件 java-sdk/examples/src/main/java/io/dapr/examples/invoke/http/InvokeClient.java,代码示意如下:

DaprClient client = (new DaprClientBuilder()).build();
byte[] response = client.invokeMethod(SERVICE_APP_ID, "say", message, HttpExtension.POST, null,
            byte[].class).block();

java sdk 中 service invoke 默认使用 HTTP ,而其他方法默认使用 gRPC,在 DaprClientProxy 类中初始化了两个 daprclient:

  1. client 字段: 类型为 DaprClientGrpc,连接到 127.0.0.1:5001
  2. methodInvocationOverrideClient 字段:类型为 DaprClientHttp,连接到 127.0.0.1:3500

service invoke 方法默认走 HTTP ,使用的是 DaprClientHttp 类型 (文件为 src/main/java/io/dapr/client/DaprClientHttp.java):

  @Override
  public <T> Mono<T> invokeMethod(String appId, String methodName,......) {
    return methodInvocationOverrideClient.invokeMethod(appId, methodName, request, httpExtension, metadata, clazz);
  }
  
  public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef<T> type) {
    try {
      final String appId = invokeMethodRequest.getAppId();
      final String method = invokeMethodRequest.getMethod();
      ......
      Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
          context -> this.client.invokeApi(httpMethod, pathSegments,
              httpExtension.getQueryParams(), serializedRequestBody, headers, context)
      );
  }

在这里根据请求条件设置 HTTP 请求的各种参数,debug 时可以看到如下图的数据v:

最后发出 HTTP 请求的代码在 src/main/java/io/dapr/client/DaprHttp.java 中的 doInvokeApi() 方法:

  private CompletableFuture<Response> doInvokeApi(String method,
                               String[] pathSegments,
                               Map<String, List<String>> urlParameters,
                               byte[] content, Map<String, String> headers,
                               Context context) {
      ......
      Request.Builder requestBuilder = new Request.Builder()
        .url(urlBuilder.build())
        .addHeader(HEADER_DAPR_REQUEST_ID, requestId);
      
    CompletableFuture<Response> future = new CompletableFuture<>();
    this.httpClient.newCall(request).enqueue(new ResponseFutureCallback(future));
    return future;
  }

发出去给 dapr runtime 的 HTTP 请求如下图所示:

调用的是 dapr runtime 的 HTTP API。

注意: 这里调用的 gRPC 服务是 dapr.proto.runtime.v1.Dapr, 方法是 InvokeService,和 dapr runtime 中 gRPC API 对应。

title Service Invoke via HTTP
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =App-1
    ----
    client
]
participant SDK_client [
    =SDK
    ----
    client
]
end box
participant daprd_client [
    =daprd
    ----
    client
]

user_code_client -> SDK_client : invokeMethod() 
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500\n/v1.0/invoke/app-2/method/method-1
|||
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

Go sdk实现

在 go 业务代码中使用 service invoke 功能的示例可参考 https://github.com/dapr/go-sdk/blob/main/examples/service/client/main.go,代码示意如下:

client, err := dapr.NewClient()
content := &dapr.DataContent{
		ContentType: "text/plain",
		Data:        []byte("hellow"),
	}
// invoke a method named "app-2" on another dapr enabled service named "method-1"
resp, err := client.InvokeMethodWithContent(ctx, "app-2", "method-1", "post", content)

Go sdk 中定义了 Client 接口,文件为 client/client.go

// Client is the interface for Dapr client implementation.
type Client interface {
    	// InvokeMethod invokes service without raw data
	InvokeMethod(ctx context.Context, appID, methodName, verb string) (out []byte, err error)

	// InvokeMethodWithContent invokes service with content
	InvokeMethodWithContent(ctx context.Context, appID, methodName, verb string, content *DataContent) (out []byte, err error)

	// InvokeMethodWithCustomContent invokes app with custom content (struct + content type).
	InvokeMethodWithCustomContent(ctx context.Context, appID, methodName, verb string, contentType string, content interface{}) (out []byte, err error)
    ......
}

这三个方法的实现在 client/invoke.go 中,都只是实现了对 InvokeRequest 对象的组装,核心的代码实现在 invokeServiceWithRequest 方法中::

func (c *GRPCClient) invokeServiceWithRequest(ctx context.Context, req *pb.InvokeServiceRequest) (out []byte, err error) {
	resp, err := c.protoClient.InvokeService(c.withAuthToken(ctx), req)
	......
}

InvokeService() 是 protoc 生成的 grpc 代码,在 dapr/proto/runtime/v1/dapr_grpc.pb.go 中,实现如下:

func (c *daprClient) InvokeService(ctx context.Context, in *InvokeServiceRequest, opts ...grpc.CallOption) (*v1.InvokeResponse, error) {
	out := new(v1.InvokeResponse)
	err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/InvokeService", in, out, opts...)
	......
}

注意: 这里调用的 gRPC 服务是 dapr.proto.runtime.v1.Dapr, 方法是 InvokeService,和 dapr runtime 中 gRPC API 对应。

title Service Invoke via gRPC
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
    =App-1
    ----
    client
]
participant SDK_client [
    =SDK
    ----
    client
]
end box
participant daprd_client [
    =daprd
    ----
    client
]

user_code_client -> SDK_client : InvokeMethodWithContent() 
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
|||
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client

其他SDK

TODO

分析总结

所有的语言 SDK 都会实现了从客户端 SDK API 调用到发出远程调用请求给 dapr runtime的功能。具体实现上会有一些差别:

  • go sdk

    全部请求走 gPRC API。

  • Java sdk

    • service invoke 默认走 HTTP API,其他请求默认走 gRPC API。
  • 其他SDK

    • 待更新

1.4 - Dapr Runtime接收服务调用的outbound请求

Dapr Runtime通过gRPC API 和 HTTP API接收来自应用的outbound请求

Dapr runtime 有两种方式接收来自客户端发起的服务调用的 outbound 请求:gRPC API 和 HTTP API。在接收到请求之后,dapr runtime 会将 outbound 请求转发给目标服务的 dapr runtime。

title Daprd Receive inbound Request
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

 -[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500 \n/v1.0/invoke/app-2/method/method-1
 -[#blue]> daprd_client : gRPC (localhost)
note right: GRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
|||
daprd_client -> daprd_client: name resolution
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)

HTTP API

Runtime 初始化时,在注册 HTTP 服务时绑定了 handler 实现和 URL 路由:

func (a *api) constructDirectMessagingEndpoints() []Endpoint {
	return []Endpoint{
		{
			Methods:           []string{router.MethodWild},
			Route:             "invoke/{id}/method/{method:*}",
			Alias:             "{method:*}",
			Version:           apiVersionV1,
			KeepParamUnescape: true,
			Handler:           a.onDirectMessage,
		},
	}
}

当 service invoke 的 HTTP 请求进来后,就会被 fasthttp 路由到 Handler 即 HTTP API 实现的 onDirectMessage() 方法中进行处理。

onDirectMessage 的实现代码在文件 pkg/http/api.go, 示意如下:

func (a *api) onDirectMessage(reqCtx *fasthttp.RequestCtx) {
	......
  req := invokev1.NewInvokeMethodRequest(...)
	resp, err := a.directMessaging.Invoke(reqCtx, targetID, req)
	......
}

备注: HTTP API 的这个 onDirectMessage() 方法取名不对,应该效仿 gRPC API,取名为 InvokeService(). 理由是:这是暴露给外部调用的方法,取名应该表现出它对外暴露的功能,即InvokeService。而不应该暴露内部的实现是调用 directMessaging。

HTTP API 的实现也简单,同样,除了基本的请求/应答参数处理之外,就是将转发请求的事情交给了 directMessaging。

gRPC API

Runtime 初始化时,在注册 gRPC 服务时绑定了 gPRC API 实现和 InvokeService gRPC 方法。

当 service invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go 中的 InvokeService 方法:

func (a *api) InvokeService(ctx context.Context, in *runtimev1pb.InvokeServiceRequest) (*commonv1pb.InvokeResponse, error) {
	......
	resp, err := a.directMessaging.Invoke(ctx, in.Id, req)
	......
	return resp.Message(), respError
}

gRPC API 的实现特别简单,除了基本的请求/应答参数处理之外,就是将转发请求的事情交给了 directMessaging。

Name Resolution

TBD

1.5 - Dapr Runtime转发outbound请求

客户端的Dapr Runtime将outbound请求转发给远程服务器端的Dapr Runtime

Dapr runtime 之间相互通讯采用的是 gRPC 协议,定义有 Dapr gRPC internal API。比较特殊的是,采用随机空闲端口而不是默认端口。但也可以通过命令行参数 dapr-internal-grpc-port 指定。

title Daprd-Daprd Communication
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

 -[#blue]> daprd_client : HTTP (localhost)
 -[#blue]> daprd_client : gRPC (localhost)
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal

pkg/messaging/direct_messaging.go 中的 DirectMessaging 负责实现转发请求给远程 dapr runtime。

接口

DirectMessaging 接口定义,用来调用远程应用:

// DirectMessaging is the API interface for invoking a remote app.
type DirectMessaging interface {
	Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}

只有一个 invoke 方法。

实现流程

流程概况

invoke 方法的实现:

func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	app, err := d.getRemoteApp(targetAppID)

	if app.id == d.appID && app.namespace == d.namespace {
		return d.invokeLocal(ctx, req)   // 如果调用的 appid 就是自己的 appid,这个场景好奇怪。忽略这里的代码先
	}
	return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, app, d.invokeRemote, req)
}

invokeRemote 方法的代码简化如下:

func (d *directMessaging) invokeRemote(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
    // 建立连接
	conn, err := d.connectionCreatorFn(context.TODO(), appAddress, appID, namespace, false, false, false)
    // 构建 gRPC stub 作为 client
	clientV1 := internalv1pb.NewServiceInvocationClient(conn)
    // 调用 gRPC 的 CallLocal 方法发出远程调用请求到另外一个 Dapr runtime
	resp, err := clientV1.CallLocal(ctx, req.Proto(), opts...)
    // 处理应答
	return invokev1.InternalInvokeResponse(resp)
}

发出 gRPC 请求给远程 dapr runtime

CallLocal() 方法的实现在 service_invocation_grpc.pb.go 中,这是 protoc 成生的 gRPC 代码:

func (c *serviceInvocationClient) CallLocal(ctx context.Context, in *InternalInvokeRequest, opts ...grpc.CallOption) (*InternalInvokeResponse, error) {
	out := new(InternalInvokeResponse)
	err := c.cc.Invoke(ctx, "/dapr.proto.internals.v1.ServiceInvocation/CallLocal", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

可以看到这个 gRPC 请求调用的是 dapr.proto.internals.v1.ServiceInvocation 服务的 CallLocal 方法。

hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal

实现细节

获取远程地址

hide footbox
skinparam style strictuml

participant directMessaging 
participant "Name resolver\n(consul/kubenetes/mdns)" as localNameReSolver

directMessaging -> localNameReSolver : ResolveID()
localNameReSolver -> localNameReSolver: loadBalance()
note right: kubernetes: dns name\ndns: dns name\nconsul: one address(random)\nmdsn: one address(round robbin)
localNameReSolver --> directMessaging
note right: return only one address in local cluster
hide footbox
skinparam style strictuml

participant directMessaging 
participant "Local Name resolver\n(consul/kubenetes/mdns)" as localNameReSolver
participant "External Name resolver\n(synchronizer)" as externalNameReSolver

directMessaging -> localNameReSolver : ResolveID()
localNameReSolver --> directMessaging
note right: return service instance list in local cluster
directMessaging -[#red]> externalNameReSolver : ResolveID()
externalNameReSolver --> directMessaging
note right: return service instance list in external clusters
directMessaging -[#red]> directMessaging: combine the instance list
directMessaging -[#red]> directMessaging: filter by cluster strategy
note right: local-first\nexternal-first\nbroadcast\nlocal-only\nexternal-onluy
directMessaging -> directMessaging: loadBalance()

1.6 - Dapr Runtime接收服务调用的inbound请求

Dapr Runtime通过gRPC internal API接收来自客户端Dapr Runtime的inbound请求

Dapr runtime 之间相互通讯走的是 gRPC internal API,这个 API 也只支持 gRPC 协议。

hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]

daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
daprd_server -> daprd_server : interceptor
daprd_server -[#blue]>  : appChannel.InvokeMethod()

接收请求

Runtime 初始化时,在注册 gRPC 服务时绑定了 gPRC Internal API 实现和 CallLocal gRPC 方法。对于访问 dapr.proto.internals.v1.ServiceInvocation 服务的 CallLocal 方法的 gRPC 请求,会将请求转给 _ServiceInvocation_CallLocal_Handler 处理:

func _ServiceInvocation_CallLocal_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
	......
	if interceptor == nil {
		return srv.(ServiceInvocationServer).CallLocal(ctx, in)
	}
	info := &grpc.UnaryServerInfo{
		Server:     srv,
		FullMethod: "/dapr.proto.internals.v1.ServiceInvocation/CallLocal",
	}
	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        // 这里调用的 srv 即 gRPC api 实现
		return srv.(ServiceInvocationServer).CallLocal(ctx, req.(*InternalInvokeRequest))  
	}
	return interceptor(ctx, in, info, handler)
}

最后进入 CallLocal() 方法进行处理。

备注:初始化的细节,请见前面章节 “Runtime初始化”

期间会有一个 interceptor 的处理流程,细节后面展开。

转发请求

当 internal invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go 中的 CallLocal 方法:

func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
	// 1. 构造请求
	req, err := invokev1.InternalInvokeRequest(in)
  if a.accessControlList != nil {
		......
	}
	// 2. 通过 appChannel 向应用发出请求
	resp, err := a.appChannel.InvokeMethod(ctx, req)
    // 3. 处理应答
	return resp.Proto(), err
}

处理方式很清晰,基本上就是将请求通过 app channel 转发。Runtime 本身并没有什么额外的处理逻辑。InternalInvokeRequest() 只是简单处理一下参数:

// InternalInvokeRequest creates InvokeMethodRequest object from InternalInvokeRequest pb object.
func InternalInvokeRequest(pb *internalv1pb.InternalInvokeRequest) (*InvokeMethodRequest, error) {
	req := &InvokeMethodRequest{r: pb}
	if pb.Message == nil {
		return nil, errors.New("Message field is nil")
	}

	return req, nil
}

访问控制

期间会有一个 access control (访问控制)的逻辑:

	if a.accessControlList != nil {
		// An access control policy has been specified for the app. Apply the policies.
		operation := req.Message().Method
		var httpVerb commonv1pb.HTTPExtension_Verb
		// Get the http verb in case the application protocol is http
		if a.appProtocol == config.HTTPProtocol && req.Metadata() != nil && len(req.Metadata()) > 0 {
			httpExt := req.Message().GetHttpExtension()
			if httpExt != nil {
				httpVerb = httpExt.GetVerb()
			}
		}
		callAllowed, errMsg := acl.ApplyAccessControlPolicies(ctx, operation, httpVerb, a.appProtocol, a.accessControlList)

		if !callAllowed {
			return nil, status.Errorf(codes.PermissionDenied, errMsg)
		}
	}

细节后面展开。

1.7 - Dapr Runtime转发inbound请求

服务器端的Dapr Runtime将inbound请求转发给服务器端的应用

协议和端口的配置

Dapr runtime 将 inbound 请求转发给服务器端应用:

title Daprd-Daprd Communication
hide footbox
skinparam style strictuml

participant daprd_client [
    =daprd
    ----
    client
]
participant daprd_server [
    =daprd
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]

daprd_client -[#red]> daprd_server : Dapr gRPC internal API (remote call)
daprd_server -[#blue]> user_code_server : Dapr HTTP channel API (localhost)
note right: HTTP endpoint @ 3000\nVERB http://localhost:3000/method?query1=value1
daprd_server -[#blue]> user_code_server : Dapr gRPC channel API (localhost)
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
  • app channel 的通讯协议可以是 HTTP 或者 gRPC 协议,可以通过命令行参数 app-port 指定,默认是 HTTP
  • 应用接收请求的端口可以通过命令行参数 app-protocol 指定,没有默认值。
  • 为了控制对应用造成的压力,还引入了最大并发度的概念,可以通过命令行参数 app-max-concurrency 指定。

请求发送的流程

前面分析过,当 internal invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go 中的 CallLocal 方法:

func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
	......
	resp, err := a.appChannel.InvokeMethod(ctx, req)
  ......
}

然后通过 appChannel 发送请求。

app channel 的建立

app channel 的建立是在 runtime 初始化时,在 pkg/runtime/runtime.go 的 initRuntime() 方法中:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
    ......
    a.blockUntilAppIsReady()

	err = a.createAppChannel()
	a.daprHTTPAPI.SetAppChannel(a.appChannel)
	grpcAPI.SetAppChannel(a.appChannel)
    ......
}

createAppChannel() 的实现,目前只支持 HTTP 和 gRPC:

func (a *DaprRuntime) createAppChannel() error {
    // 为了建立 app channel,必须配置有 app port
	if a.runtimeConfig.ApplicationPort > 0 {
		var channelCreatorFn func(port, maxConcurrency int, spec config.TracingSpec, sslEnabled bool, maxRequestBodySize int, readBufferSize int) (channel.AppChannel, error)

		switch a.runtimeConfig.ApplicationProtocol {
		case GRPCProtocol:
			channelCreatorFn = a.grpc.CreateLocalChannel
		case HTTPProtocol:
			channelCreatorFn = http_channel.CreateLocalChannel
		default:
      // 只支持 HTTP 和 gRPC
			return errors.Errorf("cannot create app channel for protocol %s", string(a.runtimeConfig.ApplicationProtocol))
		}

		ch, err := channelCreatorFn(a.runtimeConfig.ApplicationPort, a.runtimeConfig.MaxConcurrency, a.globalConfig.Spec.TracingSpec, a.runtimeConfig.AppSSL, a.runtimeConfig.MaxRequestBodySize, a.runtimeConfig.ReadBufferSize)
		a.appChannel = ch
	} else {
		log.Warn("app channel is not initialized. did you make sure to configure an app-port?")
	}

	return nil
}

app channel 的配置参数

和 app channel 密切相关的三个配置项,可以从命令行参数中获取:

func FromFlags() (*DaprRuntime, error) {
    ......
    appPort := flag.String("app-port", "", "The port the application is listening on")
	appProtocol := flag.String("app-protocol", string(HTTPProtocol), "Protocol for the application: grpc or http")	
	appMaxConcurrency := flag.Int("app-max-concurrency", -1, "Controls the concurrency level when forwarding requests to user code")

TracingSpec / AppSSL / MaxRequestBodySize / ReadBufferSize 后面细说,先不展开。

HTTP 通道的实现

HTTP Channel 的实现在文件 pkg/channel/http/http_channel.go 中,其 InvokeMethod()方法:

func (h *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  ......
	switch req.APIVersion() {
	case internalv1pb.APIVersion_V1:
		rsp, err = h.invokeMethodV1(ctx, req)
  ......
	return rsp, err
}

暂时只有 invokeMethodV1 版本:

func (h *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  // 1. 构建HTTP请求
	channelReq := h.constructRequest(ctx, req)
  // 2. 发送请求到应用
	err := h.client.DoTimeout(channelReq, resp, channel.DefaultChannelRequestTimeout)
  // 3. 处理返回的应答
	rsp := h.parseChannelResponse(req, resp, err)
	return rsp, nil
}

这是将收到的请求内容,转成HTTP协议的标准格式,然后通过 fasthttp 发给用户代码。其中转为标准http请求的代码在方法 constructRequest() 中:

func (h *Channel) constructRequest(ctx context.Context, req *invokev1.InvokeMethodRequest) *fasthttp.Request {
	var channelReq = fasthttp.AcquireRequest()

	// Construct app channel URI: VERB http://localhost:3000/method?query1=value1
	uri := fmt.Sprintf("%s/%s", h.baseAddress, req.Message().GetMethod())
	channelReq.SetRequestURI(uri)
	channelReq.URI().SetQueryString(req.EncodeHTTPQueryString())
	channelReq.Header.SetMethod(req.Message().HttpExtension.Verb.String())

	// Recover headers
	invokev1.InternalMetadataToHTTPHeader(ctx, req.Metadata(), channelReq.Header.Set)

  ......
}

这样在服务器端的用户代码中,就可以用不引入 dapr sdk,只需要提供标准 http endpoint 即可。

title Daprd-Daprd Communication
hide footbox
skinparam style strictuml

participant daprd_server [
    =daprd
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]

daprd_server -[#blue]> user_code_server : HTTP (localhost)
note right: HTTP endpoint @ 3000\nVERB http://localhost:3000/method?query1=value1

gRPC 通道的实现

pkg/grpc/grpc.go 中的 CreateLocalChannel() 方法:

// CreateLocalChannel creates a new gRPC AppChannel.
func (g *Manager) CreateLocalChannel(port, maxConcurrency int, spec config.TracingSpec, sslEnabled bool, maxRequestBodySize int, readBufferSize int) (channel.AppChannel, error) {
  // IP地址写死了 127.0.0.1
	conn, err := g.GetGRPCConnection(context.TODO(), fmt.Sprintf("127.0.0.1:%v", port), "", "", true, false, sslEnabled)
  ......
	g.AppClient = conn
	ch := grpc_channel.CreateLocalChannel(port, maxConcurrency, conn, spec, maxRequestBodySize, readBufferSize)
	return ch, nil
}

实现代码在 pkg/channel/grpc/grpc_channel.go 的 InvokeMethod()方法中:

func (g *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  ......
	switch req.APIVersion() {
	case internalv1pb.APIVersion_V1:
		rsp, err = g.invokeMethodV1(ctx, req)
  ......
	return rsp, err
}

暂时只有 invokeMethodV1 版本:

func (g *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  // 1. 创建 AppCallback 的 grpc client
	clientV1 := runtimev1pb.NewAppCallbackClient(g.client)
  // 2. 调用 AppCallback 的 OnInvoke() 方法
	resp, err := clientV1.OnInvoke(ctx, req.Message(), grpc.Header(&header), grpc.Trailer(&trailer))
  // 3. 处理返回的应答
	return rsp.WithMessage(resp), nil
}

gRPC channel 是通过 gRPC 协议调用服务器端应用上的 gRPC 服务完成,具体是 AppCallback 的 OnInvoke() 方法。

title Dapr gRPC Channel
hide footbox
skinparam style strictuml

participant daprd_server [
    =daprd
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]


daprd_server -[#blue]> user_code_server : gRPC (localhost)
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke

也就是说:如果要支持 gRPC channel,则要求服务器端应用必须实现 AppCallback gRPC 服务器,这一点和 HTTP 不同,对服务器端应用是有侵入的。

1.8 - 服务器端App接收inbound请求

服务器端App接收标准HTTP请求,或者实现AppCallbackServer以接受gRPC请求

pkg/proto/runtime/v1/appcallback.pb.go 中的 OnInvoke 方法:

// AppCallbackServer is the server API for AppCallback service.
type AppCallbackServer interface {
	// Invokes service method with InvokeRequest.
	OnInvoke(context.Context, *v1.InvokeRequest) (*v1.InvokeResponse, error)
}

为了接收来自daprd转发的来自客户端的service invoke 请求,服务器端的应用也需要做一些处理。

接收HTTP请求

对于通过 HTTP channel 过来的标准HTTP请求,服务器端的应用只需要提供标准的HTTP端口即可,无须引入dapr SDK。

title Daprd-Daprd Communication
hide footbox
skinparam style strictuml

participant daprd_server [
    =daprd
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]

daprd_server -[#blue]> user_code_server : HTTP (localhost)
note right: HTTP endpoint @ 3000\nVERB http://localhost:3000/method?query1=value1

接收gRPC请求

对于通过 gRPC channel 过来的 gRPC 请求,服务器端的应用则需要实现 gRPC AppCallback 服务的 OnInvoke() 方法:

title Dapr gRPC Channel
hide footbox
skinparam style strictuml

participant daprd_server [
    =daprd
    ----
    server
]
participant user_code_server [
    =App-2
    ----
    server
]


daprd_server -[#blue]> user_code_server : gRPC (localhost)
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke

AppCallbackServer 的 proto 定义在 dapr 仓库下的文件dapr/proto/runtime/v1/appcallback.proto中:

service AppCallback {
  // Invokes service method with InvokeRequest.
  rpc OnInvoke (common.v1.InvokeRequest) returns (common.v1.InvokeResponse) {}
  ......
}

而 AppCallbackServer 的具体实现则分布在各个不同语言的 sdk 里面。

go-sdk实现

实现在 go-sdk 的 service/grpc/invoke.go 文件的 OnInvoke方法,主要流程为:

func (s *Server) OnInvoke(ctx context.Context, in *cpb.InvokeRequest) (*cpb.InvokeResponse, error) {
	if fn, ok := s.invokeHandlers[in.Method]; ok {
		e := &cc.InvocationEvent{}
		ct, er := fn(ctx, e)
		return &cpb.InvokeResponse{......}, nil
	}
	return nil, fmt.Errorf("method not implemented: %s", in.Method)
}

其中 s.invokeHandlers 中保存处理请求的方法(由参数method作为key)。AddServiceInvocationHandler() 用于增加方法名和 handler 的映射 :

// Server is the gRPC service implementation for Dapr.
type Server struct {
	invokeHandlers  map[string]common.ServiceInvocationHandler
}
type  ServiceInvocationHandler func(ctx context.Context, in *InvocationEvent) (out *Content, err error)

func (s *Server) AddServiceInvocationHandler(method string, fn func(ctx context.Context, in *cc.InvocationEvent) (our *cc.Content, err error)) error {
	s.invokeHandlers[method] = fn
	return nil
}

这意味着,在服务器端的应用中,并不需要为这些方法提供 gRPC 相关的 proto 定义,也不需要直接通过 gRPC 把这些方法暴露出去,只需要实现 AppCallback 的 OnInvode() 方法,然后把需要对外暴露的方法注册即可,OnInvode() 方法相当于一个简单的 API 网管。

title Dapr AppCallback OnInvoke gRPC impl
hide footbox
skinparam style strictuml

participant AppCallback [
    =AppCallback
    ----
    OnInvoke()
]

participant invokeHandlers
participant handler

-[#blue]> AppCallback : gRPC OnInvode()
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
AppCallback -> invokeHandlers: find handler by method name
invokeHandlers --> AppCallback: registered handler
AppCallback -> handler: call handler
note right: type  ServiceInvocationHandler \nfunc(ctx context.Context, in *InvocationEvent) \n(out *Content, err error)
handler --> AppCallback
<-[#blue]- AppCallback

用户代码实现示例

用户在开发支持 dapr 的 go 服务器端应用时,需要在应用中启动 dapr service server,然后添加各种 handler,包括 ServiceInvocationHandler,如下面这个例子(go-sdk下的 example/serving/grpc/main.go ):

func main() {
	// create a Dapr service server
	s, err := daprd.NewService(":50001")

	// add a service to service invocation handler
	if err := s.AddServiceInvocationHandler("echo", echoHandler); err != nil {
		log.Fatalf("error adding invocation handler: %v", err)
	}

	// start the server
	if err := s.Start(); err != nil {
		log.Fatalf("server error: %v", err)
	}
}

java-sdk实现

java SDK 中没有找到服务器端实现的代码?待确定。

2 - 命名解析的设计和实现

命名解析

2.1 - 命名解析概述

命名解析

介绍

Name resolvers provide a common way to interact with different name resolvers, which are used to return the address or IP of other services your applications may connect to.

命名解析器提供了一种与不同命名解析器互动的通用方法,这些解析器用于返回你的应用程序可能要连接到的其他服务的地址或IP。

接口定义

兼容的名称解析器需要实现 nameresolution.go 文件中的 Resolver 接口。

// Resolver是命名解析器的接口。
type Resolver interface {
	// Init initializes name resolver.
	Init(metadata Metadata) error
	// ResolveID resolves name to address.
	ResolveID(req ResolveRequest) (string, error)
}
// ResolveRequest 表示服务发现解析器请求。
type ResolveRequest struct {
	ID        string
	Namespace string
	Port      int
	Data      map[string]string
}

2.2 - 使用方式

命名解析在service invoke 流程中的使用方式

解析地址

name resolver 被调用的地方只有一个:

func (d *directMessaging) getRemoteApp(appID string) (remoteApp, error) {
  // 从appID中获取id和namespace
  // appID 可能是类似 "appID.namespace" 的格式
	id, namespace, err := d.requestAppIDAndNamespace(appID)
	if err != nil {
		return remoteApp{}, err
	}

  // 执行 resolver 的解析
	request := nr.ResolveRequest{ID: id, Namespace: namespace, Port: d.grpcPort}
	address, err := d.resolver.ResolveID(request)
	if err != nil {
		return remoteApp{}, err
	}

  // 返回 remoteApp 的地址
	return remoteApp{
		namespace: namespace,
		id:        id,
		address:   address,
	}, nil
}

解析出来的地址在 directMessaging 的 Invoke() 中使用,用来执行远程调用:

// Invoke takes a message requests and invokes an app, either local or remote.
func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
	app, err := d.getRemoteApp(targetAppID)
	if err != nil {
		return nil, err
	}

  // 如果目标应用的 id 和 namespace 都和 directMessaging 的一致,则执行 invokeLocal()
	if app.id == d.appID && app.namespace == d.namespace {
		return d.invokeLocal(ctx, req)
	}
  
  // 这是在带有重试机制的情况下调用 invokeRemote
	return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, app, d.invokeRemote, req)
}

invokeWithRetry() 中忽略重试的代码:

func (d *directMessaging) invokeWithRetry(
	ctx context.Context,
	numRetries int,
	backoffInterval time.Duration,
	app remoteApp,
	fn func(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error),
	req *invokev1.InvokeMethodRequest,
) (*invokev1.InvokeMethodResponse, error) {
  
}

invokeRemote()

func (d *directMessaging) invokeRemote(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
  // 
	conn, teardown, err := d.connectionCreatorFn(context.TODO(), appAddress, appID, namespace, false, false, false)
	defer teardown()
	if err != nil {
		return nil, err
	}

	ctx = d.setContextSpan(ctx)

	d.addForwardedHeadersToMetadata(req)
	d.addDestinationAppIDHeaderToMetadata(appID, req)

	clientV1 := internalv1pb.NewServiceInvocationClient(conn)

	var opts []grpc.CallOption
	opts = append(opts, grpc.MaxCallRecvMsgSize(d.maxRequestBodySize*1024*1024), grpc.MaxCallSendMsgSize(d.maxRequestBodySize*1024*1024))

	resp, err := clientV1.CallLocal(ctx, req.Proto(), opts...)
	if err != nil {
		return nil, err
	}

	return invokev1.InternalInvokeResponse(resp)
}

2.3 - mdns命名解析

mdns命名解析实现

基本输入输出

跳过细节和错误处理,尤其是去除所有同步保护代码(很复杂),只简单看输入和输出:

// ResolveID 通过 mDNS 将名称解析为地址。
func (m *Resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
	m.browseOne(ctx, req.ID, published)

	select {
	case addr := <-sub.AddrChan:
		return addr, nil
	case err := <-sub.ErrChan:
		return "", err
	case <-time.After(subscriberTimeout):
		return "", fmt.Errorf("timeout waiting for address for app id %s", req.ID)
	}
}

func (m *Resolver) browseOne(ctx context.Context, appID string, published chan struct{}) {
  err := m.browse(browseCtx, appID, onFirst)
}

注意:只用到了 req.ID, 全程没有使用 req.Namespace,也就是 MDNS 根本不支持 Namespace.

mdns解析方式

mdns 的核心实现在 browseOne() 方法中:

func (m *Resolver) browseOne(ctx context.Context, appID string, published chan struct{}) {
  // 启动一个 goroutine 异步执行
	go func() {
		var addr string

		browseCtx, cancel := context.WithCancel(ctx)
		defer cancel()

    // 准备回调函数,收到第一个地址之后就取消 browse,所以这个函数名为 browseOne
		onFirst := func(ip string) {
			addr = ip
			cancel() // cancel to stop browsing.
		}

		m.logger.Debugf("Browsing for first mDNS address for app id %s", appID)

    // 执行 browse
		err := m.browse(browseCtx, appID, onFirst)
		// 忽略错误处理
    ......
    
		m.pubAddrToSubs(appID, addr)

		published <- struct{}{} // signal that all subscribers have been notified.
	}()
}

继续看 browse 的实现:

// browse 将对所提供的 App ID 进行无阻塞的 mdns 网络浏览
func (m *Resolver) browse(ctx context.Context, appID string, onEach func(ip string)) error {
  ......
}

首先通过 zeroconf.NewResolver 构建一个 Resolver:

  import "github.com/grandcat/zeroconf"

	resolver, err := zeroconf.NewResolver(nil)
  if err != nil {
		return fmt.Errorf("failed to initialize resolver: %w", err)
	}
  ......

zeroconf 是一个纯Golang库,采用多播 DNS-SD 来浏览和解析网络中的服务,并在本地网络中注册自己的服务。

执行mdns解析的代码是 resolver.Browse() 方法,解析的结果会异步发送到 entries 这个 channel 中:

	entries := make(chan *zeroconf.ServiceEntry)	
  if err = resolver.Browse(ctx, appID, "local.", entries); err != nil {
		return fmt.Errorf("failed to browse: %w", err)
	}

每个从 mDNS browse 返回的 service entry 会这样处理:

	// handle each service entry returned from the mDNS browse.
	go func(results <-chan *zeroconf.ServiceEntry) {
		for {
			select {
			case entry := <-results:
				if entry == nil {
					break
				}
        // 调用 handleEntry 方法来处理每个返回的 service entry
				handleEntry(entry)
			case <-ctx.Done():
        // 如果所有 service entry 都处理完成了,或者是出错(取消或者超时)
        // 此时需要推出 browse,但在退出之前需要检查一下是否有已经收到但还没有处理的结果
				for len(results) > 0 {
					handleEntry(<-results)
				}

				if errors.Is(ctx.Err(), context.Canceled) {
					m.logger.Debugf("mDNS browse for app id %s canceled.", appID)
				} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
					m.logger.Debugf("mDNS browse for app id %s timed out.", appID)
				}

				return // stop listening for results.
			}
		}
	}(entries)

handleEntry() 方法的实现:

	handleEntry := func(entry *zeroconf.ServiceEntry) {
		for _, text := range entry.Text {
      // 检查appID看是否是自己要查找的app
			if text != appID {
				m.logger.Debugf("mDNS response doesn't match app id %s, skipping.", appID)
				break
			}

			m.logger.Debugf("mDNS response for app id %s received.", appID)

      // 检查是否有 IPv4 或者 ipv6 地址
			hasIPv4Address := len(entry.AddrIPv4) > 0
			hasIPv6Address := len(entry.AddrIPv6) > 0

			if !hasIPv4Address && !hasIPv6Address {
				m.logger.Debugf("mDNS response for app id %s doesn't contain any IPv4 or IPv6 addresses, skipping.", appID)
				break
			}

			var addr string
			port := entry.Port
      // 目前只支持取第一个地址
			// TODO: we currently only use the first IPv4 and IPv6 address.
			// We should understand the cases in which additional addresses
			// are returned and whether we need to support them.
      // 加入到缓存中,缓存后面细看
			if hasIPv4Address {
				addr = fmt.Sprintf("%s:%d", entry.AddrIPv4[0].String(), port)
				m.addAppAddressIPv4(appID, addr)
			}
			if hasIPv6Address {
				addr = fmt.Sprintf("%s:%d", entry.AddrIPv6[0].String(), port)
				m.addAppAddressIPv6(appID, addr)
			}

      // 开始回调,就是前面说的拿到第一个地址就取消 browse
			if onEach != nil {
				onEach(addr) // invoke callback.
			}
		}
	}

至此就完成了 mdns 的解析,从 ID 到 address。

缓存设计

mdns 是非常慢的,为了性能就需要缓存解析后的地址,前面的代码在解析完成之后会保存这些地址:

// addAppAddressIPv4 adds an IPv4 address to the
// cache for the provided app id.
func (m *Resolver) addAppAddressIPv4(appID string, addr string) {
	m.ipv4Mu.Lock()
	defer m.ipv4Mu.Unlock()

	m.logger.Debugf("Adding IPv4 address %s for app id %s cache entry.", addr, appID)
	if _, ok := m.appAddressesIPv4[appID]; !ok {
		var addrList addressList
		m.appAddressesIPv4[appID] = &addrList
	}
	m.appAddressesIPv4[appID].add(addr)
}

在解析之前,在 ResolveID() 方法中会线尝试检查缓存中是否有数据,如果有就直接使用:

func (m *Resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
	// check for cached IPv4 addresses for this app id first.
	if addr := m.nextIPv4Address(req.ID); addr != nil {
		return *addr, nil
	}

	// check for cached IPv6 addresses for this app id second.
	if addr := m.nextIPv6Address(req.ID); addr != nil {
		return *addr, nil
	}
  ......
}

从缓存中获取appID对应的地址:

// nextIPv4Address returns the next IPv4 address for
// the provided app id from the cache.
func (m *Resolver) nextIPv4Address(appID string) *string {
	m.ipv4Mu.RLock()
	defer m.ipv4Mu.RUnlock()
	addrList, exists := m.appAddressesIPv4[appID]
	if exists {
		addr := addrList.next()
		if addr != nil {
			m.logger.Debugf("found mDNS IPv4 address in cache: %s", *addr)

			return addr
		}
	}

	return nil
}

addrList.next() 比较有意思,这里不是要获取地址列表,而是取单个地址。也就是说,当有多个地址时,这里 addrList.next() 实际上实现了负载均衡 ^0^

负载均衡

addressList 结构体的组成:

// addressList represents a set of addresses along with
// data used to control and access said addresses.
type addressList struct {
	addresses []address
	counter   int
	mu        sync.RWMutex
}

除了地址数组之外,还有一个 counter ,以及并发保护的读写锁。

// max integer value supported on this architecture.
const maxInt = int(^uint(0) >> 1)

// next 从列表中获取下一个地址,考虑到当前的循环实现。除了尽力而为的线性迭代,对选择没有任何保证。
func (a *addressList) next() *string {
  // 获取读锁
	a.mu.RLock()
	defer a.mu.RUnlock()

	if len(a.addresses) == 0 {
		return nil
	}
  // 如果 counter 达到 maxInt,就从头再来
	if a.counter == maxInt {
		a.counter = 0
	}
  // 用地址数量 对 counter 求余,去余数所对应的地址,然后counter递增
  // 相当于一个最简单常见的 轮询 算法
	index := a.counter % len(a.addresses)
	addr := a.addresses[index]
	a.counter++

	return &addr.ip
}

并发保护

为了避免多个请求同时去解析同一个 ID,因此设计了并发保护机制,对于单个ID,只容许一个请求执行解析,其他请求会等待这个解析的结果:


// ResolveID resolves name to address via mDNS.
func (m *Resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {

	sub := NewSubscriber()

	// add the sub to the pool of subs for this app id.
	m.subMu.Lock()
	appIDSubs, exists := m.subs[req.ID]
	if !exists {
		// WARN: must set appIDSubs variable for use below.
		appIDSubs = NewSubscriberPool(sub)
		m.subs[req.ID] = appIDSubs
	} else {
		appIDSubs.Add(sub)
	}
	m.subMu.Unlock()

	// only one subscriber per pool will perform the first browse for the
	// requested app id. The rest will subscribe for an address or error.
	var once *sync.Once
	var published chan struct{}
	ctx, cancel := context.WithTimeout(context.Background(), browseOneTimeout)
	defer cancel()
	appIDSubs.Once.Do(func() {
		published = make(chan struct{})
		m.browseOne(ctx, req.ID, published)

		// once will only be set for the first browser.
		once = new(sync.Once)
	})
	......
}

总结

mdns name resolver 返回的是一个简单的 ip 地址+端口(v4或者v6),形如 “192.168.0.100:8000”。

2.4 - kubernetes

kubernetes 命名解析实现

实现

kubernetes 的实现超级简单,直接按照 Kubernetes services 的格式要求,评出一个 Kubernetes services 的 name 即可:

// ResolveID resolves name to address in Kubernetes.
func (k *resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
	// Dapr requires this formatting for Kubernetes services
	return fmt.Sprintf("%s-dapr.%s.svc.%s:%d", req.ID, req.Namespace, k.clusterDomain, req.Port), nil
}

其中, req.ID 和 req.Namespace 对应到 Kubernetes 的 service name 和 namespace,注意这里的 Kubernetes service 是在 ID 后面加了 “-dapr” 后缀。Port 来自请求参数,简单拼接而已。

clusterDomain 的设置

clusterDomain 稍微复杂一点,默认值是 “cluster.local”,在构建 Resolver 时设置:

const (
	DefaultClusterDomain = "cluster.local"
)

type resolver struct {
	logger        logger.Logger
	clusterDomain string
}

// NewResolver creates Kubernetes name resolver.
func NewResolver(logger logger.Logger) nameresolution.Resolver {
	return &resolver{
		logger:        logger,
		clusterDomain: DefaultClusterDomain,
	}
}

可以在配置中设置名为 “clusterDomain” 的 metadata 来覆盖默认值:

const (
	ClusterDomainKey     = "clusterDomain"
)

func (k *resolver) Init(metadata nameresolution.Metadata) error {
	configInterface, err := config.Normalize(metadata.Configuration)
	if err != nil {
		return err
	}
	if config, ok := configInterface.(map[string]string); ok {
		clusterDomain := config[ClusterDomainKey]
		if clusterDomain != "" {
			k.clusterDomain = clusterDomain
		}
	}

	return nil
}

总结

kubernetes name resolver 返回的是一个简单的 Kubernetes services 的 name,形如 “app1-dapr.default.svc.cluster.local:80”。而不是一般意义上的 IP 地址。

2.5 - dns

dns 命名解析实现

实现

dns 的实现也是超级简单,类似 kubernetes 的实现,直接按照 DNS 的格式要求,评出一个 Kubernetes services 的 name 即可:

// ResolveID resolves name to address in orchestrator.
func (k *resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
	return fmt.Sprintf("%s-dapr.%s.svc:%d", req.ID, req.Namespace, req.Port), nil
}

所有参数都来自请求,只是拼接而已。

总结

DNS name resolver 返回的是一个简单的 Kubernetes services 的 name,形如 “app1-dapr.default.svc:80”。而不是一般意义上的 IP 地址。

2.6 - consul

consul 命名解析实现

初始化

初始化需要读取配置,建立连接:

func (r *resolver) Init(metadata nr.Metadata) error {
	var err error

	r.config, err = getConfig(metadata)
	if err != nil {
		return err
	}

	if err = r.client.InitClient(r.config.Client); err != nil {
		return fmt.Errorf("failed to init consul client: %w", err)
	}

	// register service to consul
	......

	return nil
}

服务注册

在 init 函数中,还可以根据配置的要求执行 consul 的服务注册功能:

	// register service to consul
	if r.config.Registration != nil {
		if err := r.client.Agent().ServiceRegister(r.config.Registration); err != nil {
			return fmt.Errorf("failed to register consul service: %w", err)
		}

		r.logger.Infof("service:%s registered on consul agent", r.config.Registration.Name)
	} else if _, err := r.client.Agent().Self(); err != nil {
		return fmt.Errorf("failed check on consul agent: %w", err)
	}

解析器实现

consul 命名解析器的实现比较简单:

// ResolveID resolves name to address via consul.
func (r *resolver) ResolveID(req nr.ResolveRequest) (string, error) {
	cfg := r.config
  // 查询 consul 中对应服务的健康实例
  // 只用到 req.ID,namespace 没有用到
	services, _, err := r.client.Health().Service(req.ID, "", true, cfg.QueryOptions)
	if err != nil {
		return "", fmt.Errorf("failed to query healthy consul services: %w", err)
	}

	if len(services) == 0 {
		return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID)
	}

  // shuffle:洗牌,将传入的 services 按照随机方式对调位置
	shuffle := func(services []*consul.ServiceEntry) []*consul.ServiceEntry {
		for i := len(services) - 1; i > 0; i-- {
			rndbig, _ := rand.Int(rand.Reader, big.NewInt(int64(i+1)))
			j := rndbig.Int64()

			services[i], services[j] = services[j], services[i]
		}

		return services
	}

  // 先洗牌,然后取结果中的第一个地址,相当于负载均衡中的随机算法
	svc := shuffle(services)[0]

	addr := ""

  // 取地址和port信息
	if port, ok := svc.Service.Meta[cfg.DaprPortMetaKey]; ok {
		if svc.Service.Address != "" {
			addr = fmt.Sprintf("%s:%s", svc.Service.Address, port)
		} else if svc.Node.Address != "" {
			addr = fmt.Sprintf("%s:%s", svc.Node.Address, port)
		} else {
			return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID)
		}
	} else {
		return "", fmt.Errorf("target service AppID:%s found but DAPR_PORT missing from meta", req.ID)
	}

	return addr, nil
}

总结

consul name resolver 返回的是一个简单的ip/端口字符串,形如 “192.168.0.100:80”。对于多个实例,内部实现了随机算法。

3 - 访问控制的设计和实现

访问控制