Dapr Runtime转发outbound请求
客户端的Dapr Runtime将outbound请求转发给远程服务器端的Dapr Runtime
Dapr runtime 之间相互通讯采用的是 gRPC 协议,定义有 Dapr gRPC internal API。比较特殊的是,采用随机空闲端口而不是默认端口。但也可以通过命令行参数 dapr-internal-grpc-port
指定。
title Daprd-Daprd Communication
hide footbox
skinparam style strictuml
participant daprd_client [
=daprd
----
client
]
participant daprd_server [
=daprd
----
server
]
-[#blue]> daprd_client : HTTP (localhost)
-[#blue]> daprd_client : gRPC (localhost)
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
pkg/messaging/direct_messaging.go
中的 DirectMessaging 负责实现转发请求给远程 dapr runtime。
接口
DirectMessaging 接口定义,用来调用远程应用:
// DirectMessaging is the API interface for invoking a remote app.
type DirectMessaging interface {
Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}
只有一个 invoke 方法。
实现流程
流程概况
invoke 方法的实现:
func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
app, err := d.getRemoteApp(targetAppID)
if app.id == d.appID && app.namespace == d.namespace {
return d.invokeLocal(ctx, req) // 如果调用的 appid 就是自己的 appid,这个场景好奇怪。忽略这里的代码先
}
return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, app, d.invokeRemote, req)
}
invokeRemote 方法的代码简化如下:
func (d *directMessaging) invokeRemote(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// 建立连接
conn, err := d.connectionCreatorFn(context.TODO(), appAddress, appID, namespace, false, false, false)
// 构建 gRPC stub 作为 client
clientV1 := internalv1pb.NewServiceInvocationClient(conn)
// 调用 gRPC 的 CallLocal 方法发出远程调用请求到另外一个 Dapr runtime
resp, err := clientV1.CallLocal(ctx, req.Proto(), opts...)
// 处理应答
return invokev1.InternalInvokeResponse(resp)
}
发出 gRPC 请求给远程 dapr runtime
CallLocal() 方法的实现在 service_invocation_grpc.pb.go
中,这是 protoc 成生的 gRPC 代码:
func (c *serviceInvocationClient) CallLocal(ctx context.Context, in *InternalInvokeRequest, opts ...grpc.CallOption) (*InternalInvokeResponse, error) {
out := new(InternalInvokeResponse)
err := c.cc.Invoke(ctx, "/dapr.proto.internals.v1.ServiceInvocation/CallLocal", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
可以看到这个 gRPC 请求调用的是 dapr.proto.internals.v1.ServiceInvocation 服务的 CallLocal 方法。
hide footbox
skinparam style strictuml
participant daprd_client [
=daprd
----
client
]
participant daprd_server [
=daprd
----
server
]
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
实现细节
获取远程地址
hide footbox
skinparam style strictuml
participant directMessaging
participant "Name resolver\n(consul/kubenetes/mdns)" as localNameReSolver
directMessaging -> localNameReSolver : ResolveID()
localNameReSolver -> localNameReSolver: loadBalance()
note right: kubernetes: dns name\ndns: dns name\nconsul: one address(random)\nmdsn: one address(round robbin)
localNameReSolver --> directMessaging
note right: return only one address in local cluster
hide footbox
skinparam style strictuml
participant directMessaging
participant "Local Name resolver\n(consul/kubenetes/mdns)" as localNameReSolver
participant "External Name resolver\n(synchronizer)" as externalNameReSolver
directMessaging -> localNameReSolver : ResolveID()
localNameReSolver --> directMessaging
note right: return service instance list in local cluster
directMessaging -[#red]> externalNameReSolver : ResolveID()
externalNameReSolver --> directMessaging
note right: return service instance list in external clusters
directMessaging -[#red]> directMessaging: combine the instance list
directMessaging -[#red]> directMessaging: filter by cluster strategy
note right: local-first\nexternal-first\nbroadcast\nlocal-only\nexternal-onluy
directMessaging -> directMessaging: loadBalance()