- 1: 服务调用的主流程
- 1.1: 流程概述
- 1.2: 服务调用相关的Runtime初始化
- 1.3: 客户端sdk发出服务调用的outbound请求
- 1.4: Dapr Runtime接收服务调用的outbound请求
- 1.5: Dapr Runtime转发outbound请求
- 1.6: Dapr Runtime接收服务调用的inbound请求
- 1.7: Dapr Runtime转发inbound请求
- 1.8: 服务器端App接收inbound请求
- 2: 命名解析的设计和实现
- 3: 访问控制的设计和实现
1 - 服务调用的主流程
1.1 - 流程概述
API 和端口
Dapr runtime 对外提供两个 API,分别是 Dapr HTTP API 和 Dapr gRPC API。另外两个 dapr runtime 之间的通讯 (Dapr internal API) 固定用 gRPC 协议。
两个 Dapr API 对外暴露的端口,默认是:
- 3500: HTTP 端口,可以通过命令行参数
设置 - 50001: gRPC 端口,可以通过命令行参数
Dapr internal API 是内部端口,比较特殊,没有固定的默认值,而是取任意随机可用端口。也可以通过命令行参数 dapr-internal-grpc-port
为了向服务器端的应用发送请求,dapr 需要获知应用在哪个端口监听并处理请求,这个信息通过命令行参数 app-port
设置。Dapr 的示例中一般喜欢用 3000 端口。
title Service Invoke via HTTP
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
participant SDK_client [
end box
participant daprd_client [
participant daprd_server [
box "App-2"
participant user_code_server [
end box
user_code_client -> SDK_client : Invoke\nService()
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port
daprd_server -[#blue]> user_code_server : http (localhost)
note right: HTTP endpoint "method-1" @ 3000
daprd_server <[#blue]-- user_code_server
daprd_client <[#red]-- daprd_server
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
gRPC 方式
title Service Invoke via gRPC
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
participant SDK_client [
end box
participant daprd_client [
participant daprd_server [
box "App-2"
participant SDK_server [
participant user_code_server [
end box
user_code_server -> SDK_server: AddServiceInvocationHandler("method-1")
SDK_server -> SDK_server: save handler in invokeHandlers["method-1"]
SDK_server --> user_code_server
user_code_client -> SDK_client : Invoke\nService()
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ random free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
daprd_server -[#blue]> SDK_server : gRPC (localhost)
note right: 50001\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
SDK_server -> SDK_server: get handler by invokeHandlers["method-1"]
SDK_server -> user_code_server : invoke handler of "method-1"
SDK_server <-- user_code_server
daprd_server <[#blue]-- SDK_server
daprd_client <[#red]-- daprd_server
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
gRPC proxying 方式
title Service Invoke via gRPC proxying
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
participant SDK_client [
end box
participant daprd_client [
participant daprd_server [
box "App-2"
participant SDK_server [
participant user_code_server [
end box
user_code_server -> SDK_server
SDK_server --> user_code_server
user_code_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC\n/user.services.ServiceName/Method-1
daprd_client -[#red]> daprd_server : gRPC proxy (remote call)
note right: gRPC\n/user.services.ServiceName/Method-1
daprd_server -[#blue]> SDK_server : gRPC (localhost)
note right: gRPC\n/user.services.ServiceName/Method-1
SDK_server -> user_code_server :
SDK_server <-- user_code_server
daprd_server <[#blue]-- SDK_server
daprd_client <[#red]-- daprd_server
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
1.2 - 服务调用相关的Runtime初始化
在 dapr runtime 启动进行初始化时,需要开启 API 端口并挂载相应的 handler 来接收并处理服务调用的 outbound 请求。另外为了接收来自其他 dapr runtime 的 inbound 请求,还要启动 dapr internal server。
Dapr HTTP API Server(outbound)
在 dapr runtime 中启动 HTTP server
dapr runtime 的 HTTP server 用的是 fasthttp。
在 dapr runtime 启动时的初始化过程中,会启动 HTTP server, 代码在 pkg/runtime/runtime.go
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
// Start HTTP Server
err = a.startHTTPServer(a.runtimeConfig.HTTPPort, a.runtimeConfig.PublicPort, a.runtimeConfig.ProfilePort, a.runtimeConfig.AllowedOrigins, pipeline)
if err != nil {
log.Fatalf("failed to start HTTP server: %s", err)
func (a *DaprRuntime) startHTTPServer(......) error {
a.daprHTTPAPI = http.NewAPI(......)
server := http.NewServer(a.daprHTTPAPI, ......)
if err := server.StartNonBlocking(); err != nil { // StartNonBlocking 启动 fasthttp server
return err
StartNonBlocking() 的实现代码在 pkg/http/server.go
// StartNonBlocking starts a new server in a goroutine.
func (s *server) StartNonBlocking() error {
for _, apiListenAddress := range s.config.APIListenAddresses {
l, err := net.Listen("tcp", fmt.Sprintf("%s:%v", apiListenAddress, s.config.Port))
listeners = append(listeners, l)
for _, listener := range listeners {
// customServer is created in a loop because each instance
// has a handle on the underlying listener.
customServer := &fasthttp.Server{
Handler: handler,
MaxRequestBodySize: s.config.MaxRequestBodySize * 1024 * 1024,
ReadBufferSize: s.config.ReadBufferSize * 1024,
StreamRequestBody: s.config.StreamRequestBody,
s.servers = append(s.servers, customServer)
go func(l net.Listener) {
if err := customServer.Serve(l); err != nil {
挂载 DirectMessaging 的 HTTP 端点
在 HTTP API 的初始化过程中,会在 fast http server 上挂载 DirectMessaging 的 HTTP 端点,代码在 pkg/http/api.go
func NewAPI(
appID string,
appChannel channel.AppChannel,
directMessaging messaging.DirectMessaging,
shutdown func()) API {
api := &api{
appChannel: appChannel,
directMessaging: directMessaging,
// 附加 DirectMessaging 的 HTTP 端点
api.endpoints = append(api.endpoints, api.constructDirectMessagingEndpoints()...)
DirectMessaging 的 HTTP 端点的具体信息在 constructDirectMessagingEndpoints() 方法中:
func (a *api) constructDirectMessagingEndpoints() []Endpoint {
return []Endpoint{
Methods: []string{router.MethodWild},
Route: "invoke/{id}/method/{method:*}",
Alias: "{method:*}",
Version: apiVersionV1,
KeepParamUnescape: true,
Handler: a.onDirectMessage,
注意这里的 Route 路径 “invoke/{id}/method/{method:*}", dapr sdk 就是就通过这样的 url 来发起 HTTP 请求。
title Dapr HTTP API
hide footbox
skinparam style strictuml
participant daprd_client [
-[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500\n/v1.0/invoke/{id}/method/{method}
<[#blue]-- daprd_client
Dapr gRPC API Server(outbound)
启动 gRPC 服务器
在 dapr runtime 启动时的初始化过程中,会启动 gRPC server, 代码在 pkg/runtime/runtime.go
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
// Create and start internal and external gRPC servers
grpcAPI := a.getGRPCAPI()
err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
func (a *DaprRuntime) startGRPCAPIServer(api grpc.API, port int) error {
serverConf := a.getNewServerConfig(a.runtimeConfig.APIListenAddresses, port)
server := grpc.NewAPIServer(api, serverConf, a.globalConfig.Spec.TracingSpec, a.globalConfig.Spec.MetricSpec, a.globalConfig.Spec.APISpec, a.proxy)
if err := server.StartNonBlocking(); err != nil {
return err
// NewAPIServer returns a new user facing gRPC API server.
func NewAPIServer(api API, config ServerConfig, ......) Server {
return &server{
api: api,
config: config,
kind: apiServer, // const apiServer = "apiServer"
注册 Dapr API
为了让 dapr runtime 的 gRPC 服务器能挂载 Dapr API,需要进行注册上去。
注册的代码实现在 pkg/grpc/server.go
中, StartNonBlocking() 方法在启动 grpc 服务器时,会进行服务注册:
func (s *server) StartNonBlocking() error {
if s.kind == internalServer {
internalv1pb.RegisterServiceInvocationServer(server, s.api)
} else if s.kind == apiServer {
runtimev1pb.RegisterDaprServer(server, s.api) // 注意:s.api (即 gRPC api 实现) 被传递进去
而 RegisterDaprServer() 方法的实现代码在 pkg/proto/runtime/v1/dapr_grpc.pb.go
func RegisterDaprServer(s grpc.ServiceRegistrar, srv DaprServer) {
s.RegisterService(&Dapr_ServiceDesc, srv) // srv 即 gRPC api 实现
Dapr_ServiceDesc 定义
在文件 pkg/proto/runtime/v1/dapr_grpc.pb.go
中有 Dapr Service 的 grpc 服务定义,这是 protoc 生成的 gRPC 代码。
Dapr_ServiceDesc 中有 Dapr Service 各个方法的定义,和服务调用相关的是 InvokeService
var Dapr_ServiceDesc = grpc.ServiceDesc{
ServiceName: "dapr.proto.runtime.v1.Dapr",
HandlerType: (*DaprServer)(nil),
Methods: []grpc.MethodDesc{
MethodName: "InvokeService", # 注册方法名
Handler: _Dapr_InvokeService_Handler, # 关联实现的 Handler
Metadata: "dapr/proto/runtime/v1/dapr.proto",
这一段是告诉 gRPC server: 如果收到访问 dapr.proto.runtime.v1.Dapr
服务的 InvokeService
方法的 gRPC 请求,请把请求转给 _Dapr_InvokeService_Handler
title Dapr gRPC API
hide footbox
skinparam style strictuml
participant daprd_client [
-[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
<[#blue]-- daprd_client
而 InvokeService
方法相关联的 handler 方法 _Dapr_InvokeService_Handler
func _Dapr_InvokeService_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(InvokeServiceRequest)
if err := dec(in); err != nil {
return nil, err
if interceptor == nil {
return srv.(DaprServer).InvokeService(ctx, in)
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dapr.proto.runtime.v1.Dapr/InvokeService",
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DaprServer).InvokeService(ctx, req.(*InvokeServiceRequest)) // 这里调用的 srv 即 gRPC api 实现
return interceptor(ctx, in, info, handler)
最后调用到了 DaprServer 接口实现的 InvokeService 方法,也就是 gPRC API 实现。
Dapr Internal API Server(inbound)
启动 gRPC 服务器
在 dapr runtime 启动时的初始化过程中,会启动 gRPC internal server, 代码在 pkg/runtime/runtime.go
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
err = a.startGRPCInternalServer(grpcAPI, a.runtimeConfig.InternalGRPCPort)
if err != nil {
log.Fatalf("failed to start internal gRPC server: %s", err)
log.Infof("internal gRPC server is running on port %v", a.runtimeConfig.InternalGRPCPort)
func (a *DaprRuntime) startGRPCInternalServer(api grpc.API, port int) error {
serverConf := a.getNewServerConfig([]string{""}, port)
server := grpc.NewInternalServer(api, serverConf, a.globalConfig.Spec.TracingSpec, a.globalConfig.Spec.MetricSpec, a.authenticator, a.proxy)
if err := server.StartNonBlocking(); err != nil {
return err
a.apiClosers = append(a.apiClosers, server)
return nil
grpc internal server 的端口比较特殊,可以通过命令行参数 “–dapr-internal-grpc-port” 指定,而如果没有指定,是取一个随机的可用端口,而不是取某个固定值。这一点和 dapr HTTP api server 以及 dapr gRPC api server 不同。
具体代码实现在文件 pkg/runtime/cli.go
func FromFlags() (*DaprRuntime, error) {
var daprInternalGRPC int
if *daprInternalGRPCPort != "" {
daprInternalGRPC, err = strconv.Atoi(*daprInternalGRPCPort)
if err != nil {
return nil, errors.Wrap(err, "error parsing dapr-internal-grpc-port")
} else {
daprInternalGRPC, err = grpc.GetFreePort()
if err != nil {
return nil, errors.Wrap(err, "failed to get free port for internal grpc server")
特殊处理:重用 gRPC API handler
Dapr gRPC internal API 实现时有点特殊:
- 启动了自己的 gRPC server,也有自己的端口。
- 但是注册的负责处理请求的 handler 却重用了 Dapr gRPC internal API
darp runtime 的初始化代码中,grpcAPI 对象是 GRPC API Server 和 GRPC Internal Server 共用的:
grpcAPI := a.getGRPCAPI()
err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
err = a.startGRPCInternalServer(grpcAPI, a.runtimeConfig.InternalGRPCPort)
从设计的角度看,这样做不好:混淆了对 outbound 请求和 inbound 请求的处理,影响代码可读性。
注册 Dapr API
为了让 dapr runtime 的 gRPC 服务器能挂载 Dapr internal API,需要进行注册。
注册的代码实现在 pkg/grpc/server.go
中, StartNonBlocking() 方法在启动 grpc 服务器时,会进行服务注册:
func (s *server) StartNonBlocking() error {
if s.kind == internalServer {
internalv1pb.RegisterServiceInvocationServer(server, s.api) // 注意:s.api (即 gRPC api 实现) 被传递进去
} else if s.kind == apiServer {
runtimev1pb.RegisterDaprServer(server, s.api)
而 RegisterServiceInvocationServer() 方法的实现代码在 pkg/proto/internals/v1/service_invocation_grpc.pb.go
func RegisterServiceInvocationServer(s grpc.ServiceRegistrar, srv ServiceInvocationServer) {
s.RegisterService(&ServiceInvocation_ServiceDesc, srv) // srv 即 gRPC api 实现
ServiceInvocation_ServiceDesc 定义
在文件 pkg/proto/internals/v1/service_invocation_grpc.pb.go
中有 internal Service 的 grpc 服务定义,这是 protoc 生成的 gRPC 代码。
ServiceInvocation_ServiceDesc 中有两个方法的定义,和服务调用相关的是 CallLocal
var ServiceInvocation_ServiceDesc = grpc.ServiceDesc{
ServiceName: "dapr.proto.internals.v1.ServiceInvocation",
HandlerType: (*ServiceInvocationServer)(nil),
Methods: []grpc.MethodDesc{
MethodName: "CallActor",
Handler: _ServiceInvocation_CallActor_Handler,
MethodName: "CallLocal",
Handler: _ServiceInvocation_CallLocal_Handler,
Streams: []grpc.StreamDesc{},
Metadata: "dapr/proto/internals/v1/service_invocation.proto",
这一段是告诉 gRPC server: 如果收到访问 dapr.proto.internals.v1.ServiceInvocation
服务的 CallLocal
方法的 gRPC 请求,请把请求转给 _ServiceInvocation_CallLocal_Handler
title Dapr gRPC internal API
hide footbox
skinparam style strictuml
participant daprd_client [
-[#red]> daprd_client : gRPC (remote call)
note right: gRPC API @ ramdon port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
<[#red]-- daprd_client
而 CallLocal
方法相关联的 handler 方法 _ServiceInvocation_CallLocal_Handler
func _ServiceInvocation_CallLocal_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(InternalInvokeRequest)
if err := dec(in); err != nil {
return nil, err
if interceptor == nil {
return srv.(ServiceInvocationServer).CallLocal(ctx, in)
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dapr.proto.internals.v1.ServiceInvocation/CallLocal",
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
// 这里调用的 srv 即 gRPC api 实现
return srv.(ServiceInvocationServer).CallLocal(ctx, req.(*InternalInvokeRequest))
return interceptor(ctx, in, info, handler)
最后调用到了 ServiceInvocationServer 接口实现的 CallLocal 方法,也就是 gPRC API 实现。
1.3 - 客户端sdk发出服务调用的outbound请求
Java SDK 实现
在业务代码中使用 service invoke 功能的示例可参考文件 java-sdk/examples/src/main/java/io/dapr/examples/invoke/http/InvokeClient.java
DaprClient client = (new DaprClientBuilder()).build();
byte[] response = client.invokeMethod(SERVICE_APP_ID, "say", message, HttpExtension.POST, null,
java sdk 中 service invoke 默认使用 HTTP ,而其他方法默认使用 gRPC,在 DaprClientProxy 类中初始化了两个 daprclient:
- client 字段: 类型为 DaprClientGrpc,连接到
- methodInvocationOverrideClient 字段:类型为 DaprClientHttp,连接到
service invoke 方法默认走 HTTP ,使用的是 DaprClientHttp 类型 (文件为 src/main/java/io/dapr/client/DaprClientHttp.java):
public <T> Mono<T> invokeMethod(String appId, String methodName,......) {
return methodInvocationOverrideClient.invokeMethod(appId, methodName, request, httpExtension, metadata, clazz);
public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef<T> type) {
try {
final String appId = invokeMethodRequest.getAppId();
final String method = invokeMethodRequest.getMethod();
Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
context -> this.client.invokeApi(httpMethod, pathSegments,
httpExtension.getQueryParams(), serializedRequestBody, headers, context)
在这里根据请求条件设置 HTTP 请求的各种参数,debug 时可以看到如下图的数据v:
最后发出 HTTP 请求的代码在 src/main/java/io/dapr/client/DaprHttp.java
中的 doInvokeApi() 方法:
private CompletableFuture<Response> doInvokeApi(String method,
String[] pathSegments,
Map<String, List<String>> urlParameters,
byte[] content, Map<String, String> headers,
Context context) {
Request.Builder requestBuilder = new Request.Builder()
.addHeader(HEADER_DAPR_REQUEST_ID, requestId);
CompletableFuture<Response> future = new CompletableFuture<>();
this.httpClient.newCall(request).enqueue(new ResponseFutureCallback(future));
return future;
发出去给 dapr runtime 的 HTTP 请求如下图所示:
调用的是 dapr runtime 的 HTTP API。
注意: 这里调用的 gRPC 服务是 dapr.proto.runtime.v1.Dapr
, 方法是 InvokeService
,和 dapr runtime 中 gRPC API 对应。
title Service Invoke via HTTP
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
participant SDK_client [
end box
participant daprd_client [
user_code_client -> SDK_client : invokeMethod()
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500\n/v1.0/invoke/app-2/method/method-1
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
Go sdk实现
在 go 业务代码中使用 service invoke 功能的示例可参考 https://github.com/dapr/go-sdk/blob/main/examples/service/client/main.go,代码示意如下:
client, err := dapr.NewClient()
content := &dapr.DataContent{
ContentType: "text/plain",
Data: []byte("hellow"),
// invoke a method named "app-2" on another dapr enabled service named "method-1"
resp, err := client.InvokeMethodWithContent(ctx, "app-2", "method-1", "post", content)
Go sdk 中定义了 Client 接口,文件为 client/client.go
// Client is the interface for Dapr client implementation.
type Client interface {
// InvokeMethod invokes service without raw data
InvokeMethod(ctx context.Context, appID, methodName, verb string) (out []byte, err error)
// InvokeMethodWithContent invokes service with content
InvokeMethodWithContent(ctx context.Context, appID, methodName, verb string, content *DataContent) (out []byte, err error)
// InvokeMethodWithCustomContent invokes app with custom content (struct + content type).
InvokeMethodWithCustomContent(ctx context.Context, appID, methodName, verb string, contentType string, content interface{}) (out []byte, err error)
这三个方法的实现在 client/invoke.go
中,都只是实现了对 InvokeRequest 对象的组装,核心的代码实现在 invokeServiceWithRequest 方法中::
func (c *GRPCClient) invokeServiceWithRequest(ctx context.Context, req *pb.InvokeServiceRequest) (out []byte, err error) {
resp, err := c.protoClient.InvokeService(c.withAuthToken(ctx), req)
InvokeService() 是 protoc 生成的 grpc 代码,在 dapr/proto/runtime/v1/dapr_grpc.pb.go
func (c *daprClient) InvokeService(ctx context.Context, in *InvokeServiceRequest, opts ...grpc.CallOption) (*v1.InvokeResponse, error) {
out := new(v1.InvokeResponse)
err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/InvokeService", in, out, opts...)
注意: 这里调用的 gRPC 服务是 dapr.proto.runtime.v1.Dapr
, 方法是 InvokeService
,和 dapr runtime 中 gRPC API 对应。
title Service Invoke via gRPC
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
participant SDK_client [
end box
participant daprd_client [
user_code_client -> SDK_client : InvokeMethodWithContent()
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
所有的语言 SDK 都会实现了从客户端 SDK API 调用到发出远程调用请求给 dapr runtime的功能。具体实现上会有一些差别:
go sdk
全部请求走 gPRC API。
Java sdk
- service invoke 默认走 HTTP API,其他请求默认走 gRPC API。
- 待更新
1.4 - Dapr Runtime接收服务调用的outbound请求
Dapr runtime 有两种方式接收来自客户端发起的服务调用的 outbound 请求:gRPC API 和 HTTP API。在接收到请求之后,dapr runtime 会将 outbound 请求转发给目标服务的 dapr runtime。
title Daprd Receive inbound Request
hide footbox
skinparam style strictuml
participant daprd_client [
participant daprd_server [
-[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500 \n/v1.0/invoke/app-2/method/method-1
-[#blue]> daprd_client : gRPC (localhost)
note right: GRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
daprd_client -> daprd_client: name resolution
daprd_client -[#red]> daprd_server : gRPC (remote call)
Runtime 初始化时,在注册 HTTP 服务时绑定了 handler 实现和 URL 路由:
func (a *api) constructDirectMessagingEndpoints() []Endpoint {
return []Endpoint{
Methods: []string{router.MethodWild},
Route: "invoke/{id}/method/{method:*}",
Alias: "{method:*}",
Version: apiVersionV1,
KeepParamUnescape: true,
Handler: a.onDirectMessage,
当 service invoke 的 HTTP 请求进来后,就会被 fasthttp 路由到 Handler 即 HTTP API 实现的 onDirectMessage() 方法中进行处理。
onDirectMessage 的实现代码在文件 pkg/http/api.go
, 示意如下:
func (a *api) onDirectMessage(reqCtx *fasthttp.RequestCtx) {
req := invokev1.NewInvokeMethodRequest(...)
resp, err := a.directMessaging.Invoke(reqCtx, targetID, req)
备注: HTTP API 的这个 onDirectMessage() 方法取名不对,应该效仿 gRPC API,取名为 InvokeService(). 理由是:这是暴露给外部调用的方法,取名应该表现出它对外暴露的功能,即InvokeService。而不应该暴露内部的实现是调用 directMessaging。
HTTP API 的实现也简单,同样,除了基本的请求/应答参数处理之外,就是将转发请求的事情交给了 directMessaging。
Runtime 初始化时,在注册 gRPC 服务时绑定了 gPRC API 实现和 InvokeService gRPC 方法。
当 service invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go
中的 InvokeService 方法:
func (a *api) InvokeService(ctx context.Context, in *runtimev1pb.InvokeServiceRequest) (*commonv1pb.InvokeResponse, error) {
resp, err := a.directMessaging.Invoke(ctx, in.Id, req)
return resp.Message(), respError
gRPC API 的实现特别简单,除了基本的请求/应答参数处理之外,就是将转发请求的事情交给了 directMessaging。
Name Resolution
1.5 - Dapr Runtime转发outbound请求
Dapr runtime 之间相互通讯采用的是 gRPC 协议,定义有 Dapr gRPC internal API。比较特殊的是,采用随机空闲端口而不是默认端口。但也可以通过命令行参数 dapr-internal-grpc-port
title Daprd-Daprd Communication
hide footbox
skinparam style strictuml
participant daprd_client [
participant daprd_server [
-[#blue]> daprd_client : HTTP (localhost)
-[#blue]> daprd_client : gRPC (localhost)
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
中的 DirectMessaging 负责实现转发请求给远程 dapr runtime。
DirectMessaging 接口定义,用来调用远程应用:
// DirectMessaging is the API interface for invoking a remote app.
type DirectMessaging interface {
Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
只有一个 invoke 方法。
invoke 方法的实现:
func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
app, err := d.getRemoteApp(targetAppID)
if app.id == d.appID && app.namespace == d.namespace {
return d.invokeLocal(ctx, req) // 如果调用的 appid 就是自己的 appid,这个场景好奇怪。忽略这里的代码先
return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, app, d.invokeRemote, req)
invokeRemote 方法的代码简化如下:
func (d *directMessaging) invokeRemote(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// 建立连接
conn, err := d.connectionCreatorFn(context.TODO(), appAddress, appID, namespace, false, false, false)
// 构建 gRPC stub 作为 client
clientV1 := internalv1pb.NewServiceInvocationClient(conn)
// 调用 gRPC 的 CallLocal 方法发出远程调用请求到另外一个 Dapr runtime
resp, err := clientV1.CallLocal(ctx, req.Proto(), opts...)
// 处理应答
return invokev1.InternalInvokeResponse(resp)
发出 gRPC 请求给远程 dapr runtime
CallLocal() 方法的实现在 service_invocation_grpc.pb.go
中,这是 protoc 成生的 gRPC 代码:
func (c *serviceInvocationClient) CallLocal(ctx context.Context, in *InternalInvokeRequest, opts ...grpc.CallOption) (*InternalInvokeResponse, error) {
out := new(InternalInvokeResponse)
err := c.cc.Invoke(ctx, "/dapr.proto.internals.v1.ServiceInvocation/CallLocal", in, out, opts...)
if err != nil {
return nil, err
return out, nil
可以看到这个 gRPC 请求调用的是 dapr.proto.internals.v1.ServiceInvocation 服务的 CallLocal 方法。
hide footbox
skinparam style strictuml
participant daprd_client [
participant daprd_server [
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
hide footbox
skinparam style strictuml
participant directMessaging
participant "Name resolver\n(consul/kubenetes/mdns)" as localNameReSolver
directMessaging -> localNameReSolver : ResolveID()
localNameReSolver -> localNameReSolver: loadBalance()
note right: kubernetes: dns name\ndns: dns name\nconsul: one address(random)\nmdsn: one address(round robbin)
localNameReSolver --> directMessaging
note right: return only one address in local cluster
hide footbox
skinparam style strictuml
participant directMessaging
participant "Local Name resolver\n(consul/kubenetes/mdns)" as localNameReSolver
participant "External Name resolver\n(synchronizer)" as externalNameReSolver
directMessaging -> localNameReSolver : ResolveID()
localNameReSolver --> directMessaging
note right: return service instance list in local cluster
directMessaging -[#red]> externalNameReSolver : ResolveID()
externalNameReSolver --> directMessaging
note right: return service instance list in external clusters
directMessaging -[#red]> directMessaging: combine the instance list
directMessaging -[#red]> directMessaging: filter by cluster strategy
note right: local-first\nexternal-first\nbroadcast\nlocal-only\nexternal-onluy
directMessaging -> directMessaging: loadBalance()
1.6 - Dapr Runtime接收服务调用的inbound请求
Dapr runtime 之间相互通讯走的是 gRPC internal API,这个 API 也只支持 gRPC 协议。
hide footbox
skinparam style strictuml
participant daprd_client [
participant daprd_server [
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
daprd_server -> daprd_server : interceptor
daprd_server -[#blue]> : appChannel.InvokeMethod()
Runtime 初始化时,在注册 gRPC 服务时绑定了 gPRC Internal API 实现和 CallLocal gRPC 方法。对于访问 dapr.proto.internals.v1.ServiceInvocation
服务的 CallLocal
方法的 gRPC 请求,会将请求转给 _ServiceInvocation_CallLocal_Handler
func _ServiceInvocation_CallLocal_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
if interceptor == nil {
return srv.(ServiceInvocationServer).CallLocal(ctx, in)
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dapr.proto.internals.v1.ServiceInvocation/CallLocal",
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
// 这里调用的 srv 即 gRPC api 实现
return srv.(ServiceInvocationServer).CallLocal(ctx, req.(*InternalInvokeRequest))
return interceptor(ctx, in, info, handler)
最后进入 CallLocal() 方法进行处理。
备注:初始化的细节,请见前面章节 “Runtime初始化”
期间会有一个 interceptor 的处理流程,细节后面展开。
当 internal invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go
中的 CallLocal 方法:
func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
// 1. 构造请求
req, err := invokev1.InternalInvokeRequest(in)
if a.accessControlList != nil {
// 2. 通过 appChannel 向应用发出请求
resp, err := a.appChannel.InvokeMethod(ctx, req)
// 3. 处理应答
return resp.Proto(), err
处理方式很清晰,基本上就是将请求通过 app channel 转发。Runtime 本身并没有什么额外的处理逻辑。InternalInvokeRequest() 只是简单处理一下参数:
// InternalInvokeRequest creates InvokeMethodRequest object from InternalInvokeRequest pb object.
func InternalInvokeRequest(pb *internalv1pb.InternalInvokeRequest) (*InvokeMethodRequest, error) {
req := &InvokeMethodRequest{r: pb}
if pb.Message == nil {
return nil, errors.New("Message field is nil")
return req, nil
期间会有一个 access control (访问控制)的逻辑:
if a.accessControlList != nil {
// An access control policy has been specified for the app. Apply the policies.
operation := req.Message().Method
var httpVerb commonv1pb.HTTPExtension_Verb
// Get the http verb in case the application protocol is http
if a.appProtocol == config.HTTPProtocol && req.Metadata() != nil && len(req.Metadata()) > 0 {
httpExt := req.Message().GetHttpExtension()
if httpExt != nil {
httpVerb = httpExt.GetVerb()
callAllowed, errMsg := acl.ApplyAccessControlPolicies(ctx, operation, httpVerb, a.appProtocol, a.accessControlList)
if !callAllowed {
return nil, status.Errorf(codes.PermissionDenied, errMsg)
1.7 - Dapr Runtime转发inbound请求
Dapr runtime 将 inbound 请求转发给服务器端应用:
title Daprd-Daprd Communication
hide footbox
skinparam style strictuml
participant daprd_client [
participant daprd_server [
participant user_code_server [
daprd_client -[#red]> daprd_server : Dapr gRPC internal API (remote call)
daprd_server -[#blue]> user_code_server : Dapr HTTP channel API (localhost)
note right: HTTP endpoint @ 3000\nVERB http://localhost:3000/method?query1=value1
daprd_server -[#blue]> user_code_server : Dapr gRPC channel API (localhost)
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
- app channel 的通讯协议可以是 HTTP 或者 gRPC 协议,可以通过命令行参数
指定,默认是 HTTP - 应用接收请求的端口可以通过命令行参数
指定,没有默认值。 - 为了控制对应用造成的压力,还引入了最大并发度的概念,可以通过命令行参数
前面分析过,当 internal invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go
中的 CallLocal 方法:
func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
resp, err := a.appChannel.InvokeMethod(ctx, req)
然后通过 appChannel 发送请求。
app channel 的建立
app channel 的建立是在 runtime 初始化时,在 pkg/runtime/runtime.go
的 initRuntime() 方法中:
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
err = a.createAppChannel()
createAppChannel() 的实现,目前只支持 HTTP 和 gRPC:
func (a *DaprRuntime) createAppChannel() error {
// 为了建立 app channel,必须配置有 app port
if a.runtimeConfig.ApplicationPort > 0 {
var channelCreatorFn func(port, maxConcurrency int, spec config.TracingSpec, sslEnabled bool, maxRequestBodySize int, readBufferSize int) (channel.AppChannel, error)
switch a.runtimeConfig.ApplicationProtocol {
case GRPCProtocol:
channelCreatorFn = a.grpc.CreateLocalChannel
case HTTPProtocol:
channelCreatorFn = http_channel.CreateLocalChannel
// 只支持 HTTP 和 gRPC
return errors.Errorf("cannot create app channel for protocol %s", string(a.runtimeConfig.ApplicationProtocol))
ch, err := channelCreatorFn(a.runtimeConfig.ApplicationPort, a.runtimeConfig.MaxConcurrency, a.globalConfig.Spec.TracingSpec, a.runtimeConfig.AppSSL, a.runtimeConfig.MaxRequestBodySize, a.runtimeConfig.ReadBufferSize)
a.appChannel = ch
} else {
log.Warn("app channel is not initialized. did you make sure to configure an app-port?")
return nil
app channel 的配置参数
和 app channel 密切相关的三个配置项,可以从命令行参数中获取:
func FromFlags() (*DaprRuntime, error) {
appPort := flag.String("app-port", "", "The port the application is listening on")
appProtocol := flag.String("app-protocol", string(HTTPProtocol), "Protocol for the application: grpc or http")
appMaxConcurrency := flag.Int("app-max-concurrency", -1, "Controls the concurrency level when forwarding requests to user code")
TracingSpec / AppSSL / MaxRequestBodySize / ReadBufferSize 后面细说,先不展开。
HTTP 通道的实现
HTTP Channel 的实现在文件 pkg/channel/http/http_channel.go
中,其 InvokeMethod()方法:
func (h *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
switch req.APIVersion() {
case internalv1pb.APIVersion_V1:
rsp, err = h.invokeMethodV1(ctx, req)
return rsp, err
暂时只有 invokeMethodV1 版本:
func (h *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// 1. 构建HTTP请求
channelReq := h.constructRequest(ctx, req)
// 2. 发送请求到应用
err := h.client.DoTimeout(channelReq, resp, channel.DefaultChannelRequestTimeout)
// 3. 处理返回的应答
rsp := h.parseChannelResponse(req, resp, err)
return rsp, nil
这是将收到的请求内容,转成HTTP协议的标准格式,然后通过 fasthttp 发给用户代码。其中转为标准http请求的代码在方法 constructRequest() 中:
func (h *Channel) constructRequest(ctx context.Context, req *invokev1.InvokeMethodRequest) *fasthttp.Request {
var channelReq = fasthttp.AcquireRequest()
// Construct app channel URI: VERB http://localhost:3000/method?query1=value1
uri := fmt.Sprintf("%s/%s", h.baseAddress, req.Message().GetMethod())
// Recover headers
invokev1.InternalMetadataToHTTPHeader(ctx, req.Metadata(), channelReq.Header.Set)
这样在服务器端的用户代码中,就可以用不引入 dapr sdk,只需要提供标准 http endpoint 即可。
title Daprd-Daprd Communication
hide footbox
skinparam style strictuml
participant daprd_server [
participant user_code_server [
daprd_server -[#blue]> user_code_server : HTTP (localhost)
note right: HTTP endpoint @ 3000\nVERB http://localhost:3000/method?query1=value1
gRPC 通道的实现
中的 CreateLocalChannel() 方法:
// CreateLocalChannel creates a new gRPC AppChannel.
func (g *Manager) CreateLocalChannel(port, maxConcurrency int, spec config.TracingSpec, sslEnabled bool, maxRequestBodySize int, readBufferSize int) (channel.AppChannel, error) {
// IP地址写死了
conn, err := g.GetGRPCConnection(context.TODO(), fmt.Sprintf("", port), "", "", true, false, sslEnabled)
g.AppClient = conn
ch := grpc_channel.CreateLocalChannel(port, maxConcurrency, conn, spec, maxRequestBodySize, readBufferSize)
return ch, nil
实现代码在 pkg/channel/grpc/grpc_channel.go
的 InvokeMethod()方法中:
func (g *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
switch req.APIVersion() {
case internalv1pb.APIVersion_V1:
rsp, err = g.invokeMethodV1(ctx, req)
return rsp, err
暂时只有 invokeMethodV1 版本:
func (g *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// 1. 创建 AppCallback 的 grpc client
clientV1 := runtimev1pb.NewAppCallbackClient(g.client)
// 2. 调用 AppCallback 的 OnInvoke() 方法
resp, err := clientV1.OnInvoke(ctx, req.Message(), grpc.Header(&header), grpc.Trailer(&trailer))
// 3. 处理返回的应答
return rsp.WithMessage(resp), nil
gRPC channel 是通过 gRPC 协议调用服务器端应用上的 gRPC 服务完成,具体是 AppCallback 的 OnInvoke() 方法。
title Dapr gRPC Channel
hide footbox
skinparam style strictuml
participant daprd_server [
participant user_code_server [
daprd_server -[#blue]> user_code_server : gRPC (localhost)
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
也就是说:如果要支持 gRPC channel,则要求服务器端应用必须实现 AppCallback gRPC 服务器,这一点和 HTTP 不同,对服务器端应用是有侵入的。
1.8 - 服务器端App接收inbound请求
中的 OnInvoke 方法:
// AppCallbackServer is the server API for AppCallback service.
type AppCallbackServer interface {
// Invokes service method with InvokeRequest.
OnInvoke(context.Context, *v1.InvokeRequest) (*v1.InvokeResponse, error)
为了接收来自daprd转发的来自客户端的service invoke 请求,服务器端的应用也需要做一些处理。
对于通过 HTTP channel 过来的标准HTTP请求,服务器端的应用只需要提供标准的HTTP端口即可,无须引入dapr SDK。
title Daprd-Daprd Communication
hide footbox
skinparam style strictuml
participant daprd_server [
participant user_code_server [
daprd_server -[#blue]> user_code_server : HTTP (localhost)
note right: HTTP endpoint @ 3000\nVERB http://localhost:3000/method?query1=value1
对于通过 gRPC channel 过来的 gRPC 请求,服务器端的应用则需要实现 gRPC AppCallback 服务的 OnInvoke() 方法:
title Dapr gRPC Channel
hide footbox
skinparam style strictuml
participant daprd_server [
participant user_code_server [
daprd_server -[#blue]> user_code_server : gRPC (localhost)
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
AppCallbackServer 的 proto 定义在 dapr 仓库下的文件dapr/proto/runtime/v1/appcallback.proto
service AppCallback {
// Invokes service method with InvokeRequest.
rpc OnInvoke (common.v1.InvokeRequest) returns (common.v1.InvokeResponse) {}
而 AppCallbackServer 的具体实现则分布在各个不同语言的 sdk 里面。
实现在 go-sdk 的 service/grpc/invoke.go
文件的 OnInvoke方法,主要流程为:
func (s *Server) OnInvoke(ctx context.Context, in *cpb.InvokeRequest) (*cpb.InvokeResponse, error) {
if fn, ok := s.invokeHandlers[in.Method]; ok {
e := &cc.InvocationEvent{}
ct, er := fn(ctx, e)
return &cpb.InvokeResponse{......}, nil
return nil, fmt.Errorf("method not implemented: %s", in.Method)
其中 s.invokeHandlers
中保存处理请求的方法(由参数method作为key)。AddServiceInvocationHandler() 用于增加方法名和 handler 的映射 :
// Server is the gRPC service implementation for Dapr.
type Server struct {
invokeHandlers map[string]common.ServiceInvocationHandler
type ServiceInvocationHandler func(ctx context.Context, in *InvocationEvent) (out *Content, err error)
func (s *Server) AddServiceInvocationHandler(method string, fn func(ctx context.Context, in *cc.InvocationEvent) (our *cc.Content, err error)) error {
s.invokeHandlers[method] = fn
return nil
这意味着,在服务器端的应用中,并不需要为这些方法提供 gRPC 相关的 proto 定义,也不需要直接通过 gRPC 把这些方法暴露出去,只需要实现 AppCallback 的 OnInvode() 方法,然后把需要对外暴露的方法注册即可,OnInvode() 方法相当于一个简单的 API 网管。
title Dapr AppCallback OnInvoke gRPC impl
hide footbox
skinparam style strictuml
participant AppCallback [
participant invokeHandlers
participant handler
-[#blue]> AppCallback : gRPC OnInvode()
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
AppCallback -> invokeHandlers: find handler by method name
invokeHandlers --> AppCallback: registered handler
AppCallback -> handler: call handler
note right: type ServiceInvocationHandler \nfunc(ctx context.Context, in *InvocationEvent) \n(out *Content, err error)
handler --> AppCallback
<-[#blue]- AppCallback
用户在开发支持 dapr 的 go 服务器端应用时,需要在应用中启动 dapr service server,然后添加各种 handler,包括 ServiceInvocationHandler,如下面这个例子(go-sdk下的 example/serving/grpc/main.go
func main() {
// create a Dapr service server
s, err := daprd.NewService(":50001")
// add a service to service invocation handler
if err := s.AddServiceInvocationHandler("echo", echoHandler); err != nil {
log.Fatalf("error adding invocation handler: %v", err)
// start the server
if err := s.Start(); err != nil {
log.Fatalf("server error: %v", err)
java SDK 中没有找到服务器端实现的代码?待确定。
2 - 命名解析的设计和实现
2.1 - 命名解析概述
Name resolvers provide a common way to interact with different name resolvers, which are used to return the address or IP of other services your applications may connect to.
兼容的名称解析器需要实现 nameresolution.go
文件中的 Resolver
// Resolver是命名解析器的接口。
type Resolver interface {
// Init initializes name resolver.
Init(metadata Metadata) error
// ResolveID resolves name to address.
ResolveID(req ResolveRequest) (string, error)
// ResolveRequest 表示服务发现解析器请求。
type ResolveRequest struct {
ID string
Namespace string
Port int
Data map[string]string
2.2 - 使用方式
name resolver 被调用的地方只有一个:
func (d *directMessaging) getRemoteApp(appID string) (remoteApp, error) {
// 从appID中获取id和namespace
// appID 可能是类似 "appID.namespace" 的格式
id, namespace, err := d.requestAppIDAndNamespace(appID)
if err != nil {
return remoteApp{}, err
// 执行 resolver 的解析
request := nr.ResolveRequest{ID: id, Namespace: namespace, Port: d.grpcPort}
address, err := d.resolver.ResolveID(request)
if err != nil {
return remoteApp{}, err
// 返回 remoteApp 的地址
return remoteApp{
namespace: namespace,
id: id,
address: address,
}, nil
解析出来的地址在 directMessaging 的 Invoke() 中使用,用来执行远程调用:
// Invoke takes a message requests and invokes an app, either local or remote.
func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
app, err := d.getRemoteApp(targetAppID)
if err != nil {
return nil, err
// 如果目标应用的 id 和 namespace 都和 directMessaging 的一致,则执行 invokeLocal()
if app.id == d.appID && app.namespace == d.namespace {
return d.invokeLocal(ctx, req)
// 这是在带有重试机制的情况下调用 invokeRemote
return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, app, d.invokeRemote, req)
invokeWithRetry() 中忽略重试的代码:
func (d *directMessaging) invokeWithRetry(
ctx context.Context,
numRetries int,
backoffInterval time.Duration,
app remoteApp,
fn func(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error),
req *invokev1.InvokeMethodRequest,
) (*invokev1.InvokeMethodResponse, error) {
func (d *directMessaging) invokeRemote(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
conn, teardown, err := d.connectionCreatorFn(context.TODO(), appAddress, appID, namespace, false, false, false)
defer teardown()
if err != nil {
return nil, err
ctx = d.setContextSpan(ctx)
d.addDestinationAppIDHeaderToMetadata(appID, req)
clientV1 := internalv1pb.NewServiceInvocationClient(conn)
var opts []grpc.CallOption
opts = append(opts, grpc.MaxCallRecvMsgSize(d.maxRequestBodySize*1024*1024), grpc.MaxCallSendMsgSize(d.maxRequestBodySize*1024*1024))
resp, err := clientV1.CallLocal(ctx, req.Proto(), opts...)
if err != nil {
return nil, err
return invokev1.InternalInvokeResponse(resp)
2.3 - mdns命名解析
// ResolveID 通过 mDNS 将名称解析为地址。
func (m *Resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
m.browseOne(ctx, req.ID, published)
select {
case addr := <-sub.AddrChan:
return addr, nil
case err := <-sub.ErrChan:
return "", err
case <-time.After(subscriberTimeout):
return "", fmt.Errorf("timeout waiting for address for app id %s", req.ID)
func (m *Resolver) browseOne(ctx context.Context, appID string, published chan struct{}) {
err := m.browse(browseCtx, appID, onFirst)
注意:只用到了 req.ID, 全程没有使用 req.Namespace,也就是 MDNS 根本不支持 Namespace.
mdns 的核心实现在 browseOne() 方法中:
func (m *Resolver) browseOne(ctx context.Context, appID string, published chan struct{}) {
// 启动一个 goroutine 异步执行
go func() {
var addr string
browseCtx, cancel := context.WithCancel(ctx)
defer cancel()
// 准备回调函数,收到第一个地址之后就取消 browse,所以这个函数名为 browseOne
onFirst := func(ip string) {
addr = ip
cancel() // cancel to stop browsing.
m.logger.Debugf("Browsing for first mDNS address for app id %s", appID)
// 执行 browse
err := m.browse(browseCtx, appID, onFirst)
// 忽略错误处理
m.pubAddrToSubs(appID, addr)
published <- struct{}{} // signal that all subscribers have been notified.
继续看 browse 的实现:
// browse 将对所提供的 App ID 进行无阻塞的 mdns 网络浏览
func (m *Resolver) browse(ctx context.Context, appID string, onEach func(ip string)) error {
首先通过 zeroconf.NewResolver 构建一个 Resolver:
import "github.com/grandcat/zeroconf"
resolver, err := zeroconf.NewResolver(nil)
if err != nil {
return fmt.Errorf("failed to initialize resolver: %w", err)
zeroconf 是一个纯Golang库,采用多播 DNS-SD 来浏览和解析网络中的服务,并在本地网络中注册自己的服务。
执行mdns解析的代码是 resolver.Browse() 方法,解析的结果会异步发送到 entries 这个 channel 中:
entries := make(chan *zeroconf.ServiceEntry)
if err = resolver.Browse(ctx, appID, "local.", entries); err != nil {
return fmt.Errorf("failed to browse: %w", err)
每个从 mDNS browse 返回的 service entry 会这样处理:
// handle each service entry returned from the mDNS browse.
go func(results <-chan *zeroconf.ServiceEntry) {
for {
select {
case entry := <-results:
if entry == nil {
// 调用 handleEntry 方法来处理每个返回的 service entry
case <-ctx.Done():
// 如果所有 service entry 都处理完成了,或者是出错(取消或者超时)
// 此时需要推出 browse,但在退出之前需要检查一下是否有已经收到但还没有处理的结果
for len(results) > 0 {
if errors.Is(ctx.Err(), context.Canceled) {
m.logger.Debugf("mDNS browse for app id %s canceled.", appID)
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
m.logger.Debugf("mDNS browse for app id %s timed out.", appID)
return // stop listening for results.
handleEntry() 方法的实现:
handleEntry := func(entry *zeroconf.ServiceEntry) {
for _, text := range entry.Text {
// 检查appID看是否是自己要查找的app
if text != appID {
m.logger.Debugf("mDNS response doesn't match app id %s, skipping.", appID)
m.logger.Debugf("mDNS response for app id %s received.", appID)
// 检查是否有 IPv4 或者 ipv6 地址
hasIPv4Address := len(entry.AddrIPv4) > 0
hasIPv6Address := len(entry.AddrIPv6) > 0
if !hasIPv4Address && !hasIPv6Address {
m.logger.Debugf("mDNS response for app id %s doesn't contain any IPv4 or IPv6 addresses, skipping.", appID)
var addr string
port := entry.Port
// 目前只支持取第一个地址
// TODO: we currently only use the first IPv4 and IPv6 address.
// We should understand the cases in which additional addresses
// are returned and whether we need to support them.
// 加入到缓存中,缓存后面细看
if hasIPv4Address {
addr = fmt.Sprintf("%s:%d", entry.AddrIPv4[0].String(), port)
m.addAppAddressIPv4(appID, addr)
if hasIPv6Address {
addr = fmt.Sprintf("%s:%d", entry.AddrIPv6[0].String(), port)
m.addAppAddressIPv6(appID, addr)
// 开始回调,就是前面说的拿到第一个地址就取消 browse
if onEach != nil {
onEach(addr) // invoke callback.
至此就完成了 mdns 的解析,从 ID 到 address。
mdns 是非常慢的,为了性能就需要缓存解析后的地址,前面的代码在解析完成之后会保存这些地址:
// addAppAddressIPv4 adds an IPv4 address to the
// cache for the provided app id.
func (m *Resolver) addAppAddressIPv4(appID string, addr string) {
defer m.ipv4Mu.Unlock()
m.logger.Debugf("Adding IPv4 address %s for app id %s cache entry.", addr, appID)
if _, ok := m.appAddressesIPv4[appID]; !ok {
var addrList addressList
m.appAddressesIPv4[appID] = &addrList
在解析之前,在 ResolveID() 方法中会线尝试检查缓存中是否有数据,如果有就直接使用:
func (m *Resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
// check for cached IPv4 addresses for this app id first.
if addr := m.nextIPv4Address(req.ID); addr != nil {
return *addr, nil
// check for cached IPv6 addresses for this app id second.
if addr := m.nextIPv6Address(req.ID); addr != nil {
return *addr, nil
// nextIPv4Address returns the next IPv4 address for
// the provided app id from the cache.
func (m *Resolver) nextIPv4Address(appID string) *string {
defer m.ipv4Mu.RUnlock()
addrList, exists := m.appAddressesIPv4[appID]
if exists {
addr := addrList.next()
if addr != nil {
m.logger.Debugf("found mDNS IPv4 address in cache: %s", *addr)
return addr
return nil
addrList.next() 比较有意思,这里不是要获取地址列表,而是取单个地址。也就是说,当有多个地址时,这里 addrList.next() 实际上实现了负载均衡 ^0^
addressList 结构体的组成:
// addressList represents a set of addresses along with
// data used to control and access said addresses.
type addressList struct {
addresses []address
counter int
mu sync.RWMutex
除了地址数组之外,还有一个 counter ,以及并发保护的读写锁。
// max integer value supported on this architecture.
const maxInt = int(^uint(0) >> 1)
// next 从列表中获取下一个地址,考虑到当前的循环实现。除了尽力而为的线性迭代,对选择没有任何保证。
func (a *addressList) next() *string {
// 获取读锁
defer a.mu.RUnlock()
if len(a.addresses) == 0 {
return nil
// 如果 counter 达到 maxInt,就从头再来
if a.counter == maxInt {
a.counter = 0
// 用地址数量 对 counter 求余,去余数所对应的地址,然后counter递增
// 相当于一个最简单常见的 轮询 算法
index := a.counter % len(a.addresses)
addr := a.addresses[index]
return &addr.ip
为了避免多个请求同时去解析同一个 ID,因此设计了并发保护机制,对于单个ID,只容许一个请求执行解析,其他请求会等待这个解析的结果:
// ResolveID resolves name to address via mDNS.
func (m *Resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
sub := NewSubscriber()
// add the sub to the pool of subs for this app id.
appIDSubs, exists := m.subs[req.ID]
if !exists {
// WARN: must set appIDSubs variable for use below.
appIDSubs = NewSubscriberPool(sub)
m.subs[req.ID] = appIDSubs
} else {
// only one subscriber per pool will perform the first browse for the
// requested app id. The rest will subscribe for an address or error.
var once *sync.Once
var published chan struct{}
ctx, cancel := context.WithTimeout(context.Background(), browseOneTimeout)
defer cancel()
appIDSubs.Once.Do(func() {
published = make(chan struct{})
m.browseOne(ctx, req.ID, published)
// once will only be set for the first browser.
once = new(sync.Once)
mdns name resolver 返回的是一个简单的 ip 地址+端口(v4或者v6),形如 “”。
2.4 - kubernetes
kubernetes 的实现超级简单,直接按照 Kubernetes services 的格式要求,评出一个 Kubernetes services 的 name 即可:
// ResolveID resolves name to address in Kubernetes.
func (k *resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
// Dapr requires this formatting for Kubernetes services
return fmt.Sprintf("%s-dapr.%s.svc.%s:%d", req.ID, req.Namespace, k.clusterDomain, req.Port), nil
其中, req.ID 和 req.Namespace 对应到 Kubernetes 的 service name 和 namespace,注意这里的 Kubernetes service 是在 ID 后面加了 “-dapr” 后缀。Port 来自请求参数,简单拼接而已。
clusterDomain 的设置
clusterDomain 稍微复杂一点,默认值是 “cluster.local”,在构建 Resolver 时设置:
const (
DefaultClusterDomain = "cluster.local"
type resolver struct {
logger logger.Logger
clusterDomain string
// NewResolver creates Kubernetes name resolver.
func NewResolver(logger logger.Logger) nameresolution.Resolver {
return &resolver{
logger: logger,
clusterDomain: DefaultClusterDomain,
可以在配置中设置名为 “clusterDomain” 的 metadata 来覆盖默认值:
const (
ClusterDomainKey = "clusterDomain"
func (k *resolver) Init(metadata nameresolution.Metadata) error {
configInterface, err := config.Normalize(metadata.Configuration)
if err != nil {
return err
if config, ok := configInterface.(map[string]string); ok {
clusterDomain := config[ClusterDomainKey]
if clusterDomain != "" {
k.clusterDomain = clusterDomain
return nil
kubernetes name resolver 返回的是一个简单的 Kubernetes services 的 name,形如 “app1-dapr.default.svc.cluster.local:80”。而不是一般意义上的 IP 地址。
2.5 - dns
dns 的实现也是超级简单,类似 kubernetes 的实现,直接按照 DNS 的格式要求,评出一个 Kubernetes services 的 name 即可:
// ResolveID resolves name to address in orchestrator.
func (k *resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
return fmt.Sprintf("%s-dapr.%s.svc:%d", req.ID, req.Namespace, req.Port), nil
DNS name resolver 返回的是一个简单的 Kubernetes services 的 name,形如 “app1-dapr.default.svc:80”。而不是一般意义上的 IP 地址。
2.6 - consul
func (r *resolver) Init(metadata nr.Metadata) error {
var err error
r.config, err = getConfig(metadata)
if err != nil {
return err
if err = r.client.InitClient(r.config.Client); err != nil {
return fmt.Errorf("failed to init consul client: %w", err)
// register service to consul
return nil
在 init 函数中,还可以根据配置的要求执行 consul 的服务注册功能:
// register service to consul
if r.config.Registration != nil {
if err := r.client.Agent().ServiceRegister(r.config.Registration); err != nil {
return fmt.Errorf("failed to register consul service: %w", err)
r.logger.Infof("service:%s registered on consul agent", r.config.Registration.Name)
} else if _, err := r.client.Agent().Self(); err != nil {
return fmt.Errorf("failed check on consul agent: %w", err)
consul 命名解析器的实现比较简单:
// ResolveID resolves name to address via consul.
func (r *resolver) ResolveID(req nr.ResolveRequest) (string, error) {
cfg := r.config
// 查询 consul 中对应服务的健康实例
// 只用到 req.ID,namespace 没有用到
services, _, err := r.client.Health().Service(req.ID, "", true, cfg.QueryOptions)
if err != nil {
return "", fmt.Errorf("failed to query healthy consul services: %w", err)
if len(services) == 0 {
return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID)
// shuffle:洗牌,将传入的 services 按照随机方式对调位置
shuffle := func(services []*consul.ServiceEntry) []*consul.ServiceEntry {
for i := len(services) - 1; i > 0; i-- {
rndbig, _ := rand.Int(rand.Reader, big.NewInt(int64(i+1)))
j := rndbig.Int64()
services[i], services[j] = services[j], services[i]
return services
// 先洗牌,然后取结果中的第一个地址,相当于负载均衡中的随机算法
svc := shuffle(services)[0]
addr := ""
// 取地址和port信息
if port, ok := svc.Service.Meta[cfg.DaprPortMetaKey]; ok {
if svc.Service.Address != "" {
addr = fmt.Sprintf("%s:%s", svc.Service.Address, port)
} else if svc.Node.Address != "" {
addr = fmt.Sprintf("%s:%s", svc.Node.Address, port)
} else {
return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID)
} else {
return "", fmt.Errorf("target service AppID:%s found but DAPR_PORT missing from meta", req.ID)
return addr, nil
consul name resolver 返回的是一个简单的ip/端口字符串,形如 “”。对于多个实例,内部实现了随机算法。