服务调用的主流程
- 1: 流程概述
- 2: 服务调用相关的Runtime初始化
- 3: 客户端sdk发出服务调用的outbound请求
- 4: Dapr Runtime接收服务调用的outbound请求
- 5: Dapr Runtime转发outbound请求
- 6: Dapr Runtime接收服务调用的inbound请求
- 7: Dapr Runtime转发inbound请求
- 8: 服务器端App接收inbound请求
1 - 流程概述
API 和端口
Dapr runtime 对外提供两个 API,分别是 Dapr HTTP API 和 Dapr gRPC API。另外两个 dapr runtime 之间的通讯 (Dapr internal API) 固定用 gRPC 协议。
两个 Dapr API 对外暴露的端口,默认是:
- 3500: HTTP 端口,可以通过命令行参数
dapr-http-port
设置 - 50001: gRPC 端口,可以通过命令行参数
dapr-grpc-port
设置
Dapr internal API 是内部端口,比较特殊,没有固定的默认值,而是取任意随机可用端口。也可以通过命令行参数 dapr-internal-grpc-port
设置。
为了向服务器端的应用发送请求,dapr 需要获知应用在哪个端口监听并处理请求,这个信息通过命令行参数 app-port
设置。Dapr 的示例中一般喜欢用 3000 端口。
调用流程
HTTP 方式
title Service Invoke via HTTP
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
=App-1
----
client
]
participant SDK_client [
=SDK
----
client
]
end box
participant daprd_client [
=daprd
----
client
]
participant daprd_server [
=daprd
----
server
]
box "App-2"
participant user_code_server [
=App-2
----
server
]
end box
user_code_client -> SDK_client : Invoke\nService()
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port
|||
daprd_server -[#blue]> user_code_server : http (localhost)
note right: HTTP endpoint "method-1" @ 3000
daprd_server <[#blue]-- user_code_server
daprd_client <[#red]-- daprd_server
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
gRPC 方式
title Service Invoke via gRPC
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
=App-1
----
client
]
participant SDK_client [
=SDK
----
client
]
end box
participant daprd_client [
=daprd
----
client
]
participant daprd_server [
=daprd
----
server
]
box "App-2"
participant SDK_server [
=SDK
----
server
]
participant user_code_server [
=App-2
----
server
]
end box
user_code_server -> SDK_server: AddServiceInvocationHandler("method-1")
SDK_server -> SDK_server: save handler in invokeHandlers["method-1"]
SDK_server --> user_code_server
user_code_client -> SDK_client : Invoke\nService()
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ random free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
|||
daprd_server -[#blue]> SDK_server : gRPC (localhost)
note right: 50001\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
SDK_server -> SDK_server: get handler by invokeHandlers["method-1"]
SDK_server -> user_code_server : invoke handler of "method-1"
SDK_server <-- user_code_server
daprd_server <[#blue]-- SDK_server
daprd_client <[#red]-- daprd_server
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
gRPC proxying 方式
title Service Invoke via gRPC proxying
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
=App-1
----
client
]
participant SDK_client [
=SDK
----
client
]
end box
participant daprd_client [
=daprd
----
client
]
participant daprd_server [
=daprd
----
server
]
box "App-2"
participant SDK_server [
=gRPC
----
server
]
participant user_code_server [
=App-2
----
server
]
end box
user_code_server -> SDK_server
SDK_server --> user_code_server
user_code_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC\n/user.services.ServiceName/Method-1
|||
daprd_client -[#red]> daprd_server : gRPC proxy (remote call)
note right: gRPC\n/user.services.ServiceName/Method-1
|||
daprd_server -[#blue]> SDK_server : gRPC (localhost)
note right: gRPC\n/user.services.ServiceName/Method-1
SDK_server -> user_code_server :
SDK_server <-- user_code_server
daprd_server <[#blue]-- SDK_server
daprd_client <[#red]-- daprd_server
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
2 - 服务调用相关的Runtime初始化
在 dapr runtime 启动进行初始化时,需要开启 API 端口并挂载相应的 handler 来接收并处理服务调用的 outbound 请求。另外为了接收来自其他 dapr runtime 的 inbound 请求,还要启动 dapr internal server。
Dapr HTTP API Server(outbound)
在 dapr runtime 中启动 HTTP server
dapr runtime 的 HTTP server 用的是 fasthttp。
在 dapr runtime 启动时的初始化过程中,会启动 HTTP server, 代码在 pkg/runtime/runtime.go
中:
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
// Start HTTP Server
err = a.startHTTPServer(a.runtimeConfig.HTTPPort, a.runtimeConfig.PublicPort, a.runtimeConfig.ProfilePort, a.runtimeConfig.AllowedOrigins, pipeline)
if err != nil {
log.Fatalf("failed to start HTTP server: %s", err)
}
......
}
func (a *DaprRuntime) startHTTPServer(......) error {
a.daprHTTPAPI = http.NewAPI(......)
server := http.NewServer(a.daprHTTPAPI, ......)
if err := server.StartNonBlocking(); err != nil { // StartNonBlocking 启动 fasthttp server
return err
}
}
StartNonBlocking() 的实现代码在 pkg/http/server.go
中:
// StartNonBlocking starts a new server in a goroutine.
func (s *server) StartNonBlocking() error {
......
for _, apiListenAddress := range s.config.APIListenAddresses {
l, err := net.Listen("tcp", fmt.Sprintf("%s:%v", apiListenAddress, s.config.Port))
listeners = append(listeners, l)
}
for _, listener := range listeners {
// customServer is created in a loop because each instance
// has a handle on the underlying listener.
customServer := &fasthttp.Server{
Handler: handler,
MaxRequestBodySize: s.config.MaxRequestBodySize * 1024 * 1024,
ReadBufferSize: s.config.ReadBufferSize * 1024,
StreamRequestBody: s.config.StreamRequestBody,
}
s.servers = append(s.servers, customServer)
go func(l net.Listener) {
if err := customServer.Serve(l); err != nil {
log.Fatal(err)
}
}(listener)
}
}
挂载 DirectMessaging 的 HTTP 端点
在 HTTP API 的初始化过程中,会在 fast http server 上挂载 DirectMessaging 的 HTTP 端点,代码在 pkg/http/api.go
中:
func NewAPI(
appID string,
appChannel channel.AppChannel,
directMessaging messaging.DirectMessaging,
......
shutdown func()) API {
api := &api{
appChannel: appChannel,
directMessaging: directMessaging,
......
}
// 附加 DirectMessaging 的 HTTP 端点
api.endpoints = append(api.endpoints, api.constructDirectMessagingEndpoints()...)
}
DirectMessaging 的 HTTP 端点的具体信息在 constructDirectMessagingEndpoints() 方法中:
func (a *api) constructDirectMessagingEndpoints() []Endpoint {
return []Endpoint{
{
Methods: []string{router.MethodWild},
Route: "invoke/{id}/method/{method:*}",
Alias: "{method:*}",
Version: apiVersionV1,
KeepParamUnescape: true,
Handler: a.onDirectMessage,
},
}
}
注意这里的 Route 路径 “invoke/{id}/method/{method:*}", dapr sdk 就是就通过这样的 url 来发起 HTTP 请求。
title Dapr HTTP API
hide footbox
skinparam style strictuml
participant daprd_client [
=daprd
----
client
]
-[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500\n/v1.0/invoke/{id}/method/{method}
|||
<[#blue]-- daprd_client
Dapr gRPC API Server(outbound)
启动 gRPC 服务器
在 dapr runtime 启动时的初始化过程中,会启动 gRPC server, 代码在 pkg/runtime/runtime.go
中:
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
// Create and start internal and external gRPC servers
grpcAPI := a.getGRPCAPI()
err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
......
}
func (a *DaprRuntime) startGRPCAPIServer(api grpc.API, port int) error {
serverConf := a.getNewServerConfig(a.runtimeConfig.APIListenAddresses, port)
server := grpc.NewAPIServer(api, serverConf, a.globalConfig.Spec.TracingSpec, a.globalConfig.Spec.MetricSpec, a.globalConfig.Spec.APISpec, a.proxy)
if err := server.StartNonBlocking(); err != nil {
return err
}
......
}
// NewAPIServer returns a new user facing gRPC API server.
func NewAPIServer(api API, config ServerConfig, ......) Server {
return &server{
api: api,
config: config,
kind: apiServer, // const apiServer = "apiServer"
......
}
}
注册 Dapr API
为了让 dapr runtime 的 gRPC 服务器能挂载 Dapr API,需要进行注册上去。
注册的代码实现在 pkg/grpc/server.go
中, StartNonBlocking() 方法在启动 grpc 服务器时,会进行服务注册:
func (s *server) StartNonBlocking() error {
if s.kind == internalServer {
internalv1pb.RegisterServiceInvocationServer(server, s.api)
} else if s.kind == apiServer {
runtimev1pb.RegisterDaprServer(server, s.api) // 注意:s.api (即 gRPC api 实现) 被传递进去
}
......
}
而 RegisterDaprServer() 方法的实现代码在 pkg/proto/runtime/v1/dapr_grpc.pb.go
:
func RegisterDaprServer(s grpc.ServiceRegistrar, srv DaprServer) {
s.RegisterService(&Dapr_ServiceDesc, srv) // srv 即 gRPC api 实现
}
Dapr_ServiceDesc 定义
在文件 pkg/proto/runtime/v1/dapr_grpc.pb.go
中有 Dapr Service 的 grpc 服务定义,这是 protoc 生成的 gRPC 代码。
Dapr_ServiceDesc 中有 Dapr Service 各个方法的定义,和服务调用相关的是 InvokeService
方法:
var Dapr_ServiceDesc = grpc.ServiceDesc{
ServiceName: "dapr.proto.runtime.v1.Dapr",
HandlerType: (*DaprServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "InvokeService", # 注册方法名
Handler: _Dapr_InvokeService_Handler, # 关联实现的 Handler
},
......
},
},
Metadata: "dapr/proto/runtime/v1/dapr.proto",
}
这一段是告诉 gRPC server: 如果收到访问 dapr.proto.runtime.v1.Dapr
服务的 InvokeService
方法的 gRPC 请求,请把请求转给 _Dapr_InvokeService_Handler
处理。
title Dapr gRPC API
hide footbox
skinparam style strictuml
participant daprd_client [
=daprd
----
client
]
-[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
|||
<[#blue]-- daprd_client
而 InvokeService
方法相关联的 handler 方法 _Dapr_InvokeService_Handler
的实现代码是:
func _Dapr_InvokeService_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(InvokeServiceRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DaprServer).InvokeService(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dapr.proto.runtime.v1.Dapr/InvokeService",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DaprServer).InvokeService(ctx, req.(*InvokeServiceRequest)) // 这里调用的 srv 即 gRPC api 实现
}
return interceptor(ctx, in, info, handler)
}
最后调用到了 DaprServer 接口实现的 InvokeService 方法,也就是 gPRC API 实现。
Dapr Internal API Server(inbound)
启动 gRPC 服务器
在 dapr runtime 启动时的初始化过程中,会启动 gRPC internal server, 代码在 pkg/runtime/runtime.go
中:
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
err = a.startGRPCInternalServer(grpcAPI, a.runtimeConfig.InternalGRPCPort)
if err != nil {
log.Fatalf("failed to start internal gRPC server: %s", err)
}
log.Infof("internal gRPC server is running on port %v", a.runtimeConfig.InternalGRPCPort)
......
}
func (a *DaprRuntime) startGRPCInternalServer(api grpc.API, port int) error {
serverConf := a.getNewServerConfig([]string{""}, port)
server := grpc.NewInternalServer(api, serverConf, a.globalConfig.Spec.TracingSpec, a.globalConfig.Spec.MetricSpec, a.authenticator, a.proxy)
if err := server.StartNonBlocking(); err != nil {
return err
}
a.apiClosers = append(a.apiClosers, server)
return nil
}
特殊处理:端口
grpc internal server 的端口比较特殊,可以通过命令行参数 “–dapr-internal-grpc-port” 指定,而如果没有指定,是取一个随机的可用端口,而不是取某个固定值。这一点和 dapr HTTP api server 以及 dapr gRPC api server 不同。
具体代码实现在文件 pkg/runtime/cli.go
中:
func FromFlags() (*DaprRuntime, error) {
var daprInternalGRPC int
if *daprInternalGRPCPort != "" {
daprInternalGRPC, err = strconv.Atoi(*daprInternalGRPCPort)
if err != nil {
return nil, errors.Wrap(err, "error parsing dapr-internal-grpc-port")
}
} else {
daprInternalGRPC, err = grpc.GetFreePort()
if err != nil {
return nil, errors.Wrap(err, "failed to get free port for internal grpc server")
}
}
......
}
特殊处理:重用 gRPC API handler
Dapr gRPC internal API 实现时有点特殊:
- 启动了自己的 gRPC server,也有自己的端口。
- 但是注册的负责处理请求的 handler 却重用了 Dapr gRPC internal API
darp runtime 的初始化代码中,grpcAPI 对象是 GRPC API Server 和 GRPC Internal Server 共用的:
grpcAPI := a.getGRPCAPI()
err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
err = a.startGRPCInternalServer(grpcAPI, a.runtimeConfig.InternalGRPCPort)
从设计的角度看,这样做不好:混淆了对 outbound 请求和 inbound 请求的处理,影响代码可读性。
注册 Dapr API
为了让 dapr runtime 的 gRPC 服务器能挂载 Dapr internal API,需要进行注册。
注册的代码实现在 pkg/grpc/server.go
中, StartNonBlocking() 方法在启动 grpc 服务器时,会进行服务注册:
func (s *server) StartNonBlocking() error {
if s.kind == internalServer {
internalv1pb.RegisterServiceInvocationServer(server, s.api) // 注意:s.api (即 gRPC api 实现) 被传递进去
} else if s.kind == apiServer {
runtimev1pb.RegisterDaprServer(server, s.api)
}
......
}
而 RegisterServiceInvocationServer() 方法的实现代码在 pkg/proto/internals/v1/service_invocation_grpc.pb.go
:
func RegisterServiceInvocationServer(s grpc.ServiceRegistrar, srv ServiceInvocationServer) {
s.RegisterService(&ServiceInvocation_ServiceDesc, srv) // srv 即 gRPC api 实现
}
ServiceInvocation_ServiceDesc 定义
在文件 pkg/proto/internals/v1/service_invocation_grpc.pb.go
中有 internal Service 的 grpc 服务定义,这是 protoc 生成的 gRPC 代码。
ServiceInvocation_ServiceDesc 中有两个方法的定义,和服务调用相关的是 CallLocal
方法:
var ServiceInvocation_ServiceDesc = grpc.ServiceDesc{
ServiceName: "dapr.proto.internals.v1.ServiceInvocation",
HandlerType: (*ServiceInvocationServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "CallActor",
Handler: _ServiceInvocation_CallActor_Handler,
},
{
MethodName: "CallLocal",
Handler: _ServiceInvocation_CallLocal_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "dapr/proto/internals/v1/service_invocation.proto",
}
这一段是告诉 gRPC server: 如果收到访问 dapr.proto.internals.v1.ServiceInvocation
服务的 CallLocal
方法的 gRPC 请求,请把请求转给 _ServiceInvocation_CallLocal_Handler
处理。
title Dapr gRPC internal API
hide footbox
skinparam style strictuml
participant daprd_client [
=daprd
----
client
]
-[#red]> daprd_client : gRPC (remote call)
note right: gRPC API @ ramdon port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
|||
<[#red]-- daprd_client
而 CallLocal
方法相关联的 handler 方法 _ServiceInvocation_CallLocal_Handler
的实现代码是:
func _ServiceInvocation_CallLocal_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(InternalInvokeRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ServiceInvocationServer).CallLocal(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dapr.proto.internals.v1.ServiceInvocation/CallLocal",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
// 这里调用的 srv 即 gRPC api 实现
return srv.(ServiceInvocationServer).CallLocal(ctx, req.(*InternalInvokeRequest))
}
return interceptor(ctx, in, info, handler)
}
最后调用到了 ServiceInvocationServer 接口实现的 CallLocal 方法,也就是 gPRC API 实现。
3 - 客户端sdk发出服务调用的outbound请求
Java SDK 实现
在业务代码中使用 service invoke 功能的示例可参考文件 java-sdk/examples/src/main/java/io/dapr/examples/invoke/http/InvokeClient.java
,代码示意如下:
DaprClient client = (new DaprClientBuilder()).build();
byte[] response = client.invokeMethod(SERVICE_APP_ID, "say", message, HttpExtension.POST, null,
byte[].class).block();
java sdk 中 service invoke 默认使用 HTTP ,而其他方法默认使用 gRPC,在 DaprClientProxy 类中初始化了两个 daprclient:
- client 字段: 类型为 DaprClientGrpc,连接到 127.0.0.1:5001
- methodInvocationOverrideClient 字段:类型为 DaprClientHttp,连接到 127.0.0.1:3500
service invoke 方法默认走 HTTP ,使用的是 DaprClientHttp 类型 (文件为 src/main/java/io/dapr/client/DaprClientHttp.java):
@Override
public <T> Mono<T> invokeMethod(String appId, String methodName,......) {
return methodInvocationOverrideClient.invokeMethod(appId, methodName, request, httpExtension, metadata, clazz);
}
public <T> Mono<T> invokeMethod(InvokeMethodRequest invokeMethodRequest, TypeRef<T> type) {
try {
final String appId = invokeMethodRequest.getAppId();
final String method = invokeMethodRequest.getMethod();
......
Mono<DaprHttp.Response> response = Mono.subscriberContext().flatMap(
context -> this.client.invokeApi(httpMethod, pathSegments,
httpExtension.getQueryParams(), serializedRequestBody, headers, context)
);
}
在这里根据请求条件设置 HTTP 请求的各种参数,debug 时可以看到如下图的数据v:
最后发出 HTTP 请求的代码在 src/main/java/io/dapr/client/DaprHttp.java
中的 doInvokeApi() 方法:
private CompletableFuture<Response> doInvokeApi(String method,
String[] pathSegments,
Map<String, List<String>> urlParameters,
byte[] content, Map<String, String> headers,
Context context) {
......
Request.Builder requestBuilder = new Request.Builder()
.url(urlBuilder.build())
.addHeader(HEADER_DAPR_REQUEST_ID, requestId);
CompletableFuture<Response> future = new CompletableFuture<>();
this.httpClient.newCall(request).enqueue(new ResponseFutureCallback(future));
return future;
}
发出去给 dapr runtime 的 HTTP 请求如下图所示:
调用的是 dapr runtime 的 HTTP API。
注意: 这里调用的 gRPC 服务是 dapr.proto.runtime.v1.Dapr
, 方法是 InvokeService
,和 dapr runtime 中 gRPC API 对应。
title Service Invoke via HTTP
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
=App-1
----
client
]
participant SDK_client [
=SDK
----
client
]
end box
participant daprd_client [
=daprd
----
client
]
user_code_client -> SDK_client : invokeMethod()
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500\n/v1.0/invoke/app-2/method/method-1
|||
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
Go sdk实现
在 go 业务代码中使用 service invoke 功能的示例可参考 https://github.com/dapr/go-sdk/blob/main/examples/service/client/main.go,代码示意如下:
client, err := dapr.NewClient()
content := &dapr.DataContent{
ContentType: "text/plain",
Data: []byte("hellow"),
}
// invoke a method named "app-2" on another dapr enabled service named "method-1"
resp, err := client.InvokeMethodWithContent(ctx, "app-2", "method-1", "post", content)
Go sdk 中定义了 Client 接口,文件为 client/client.go
:
// Client is the interface for Dapr client implementation.
type Client interface {
// InvokeMethod invokes service without raw data
InvokeMethod(ctx context.Context, appID, methodName, verb string) (out []byte, err error)
// InvokeMethodWithContent invokes service with content
InvokeMethodWithContent(ctx context.Context, appID, methodName, verb string, content *DataContent) (out []byte, err error)
// InvokeMethodWithCustomContent invokes app with custom content (struct + content type).
InvokeMethodWithCustomContent(ctx context.Context, appID, methodName, verb string, contentType string, content interface{}) (out []byte, err error)
......
}
这三个方法的实现在 client/invoke.go
中,都只是实现了对 InvokeRequest 对象的组装,核心的代码实现在 invokeServiceWithRequest 方法中::
func (c *GRPCClient) invokeServiceWithRequest(ctx context.Context, req *pb.InvokeServiceRequest) (out []byte, err error) {
resp, err := c.protoClient.InvokeService(c.withAuthToken(ctx), req)
......
}
InvokeService() 是 protoc 生成的 grpc 代码,在 dapr/proto/runtime/v1/dapr_grpc.pb.go
中,实现如下:
func (c *daprClient) InvokeService(ctx context.Context, in *InvokeServiceRequest, opts ...grpc.CallOption) (*v1.InvokeResponse, error) {
out := new(v1.InvokeResponse)
err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/InvokeService", in, out, opts...)
......
}
注意: 这里调用的 gRPC 服务是 dapr.proto.runtime.v1.Dapr
, 方法是 InvokeService
,和 dapr runtime 中 gRPC API 对应。
title Service Invoke via gRPC
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
=App-1
----
client
]
participant SDK_client [
=SDK
----
client
]
end box
participant daprd_client [
=daprd
----
client
]
user_code_client -> SDK_client : InvokeMethodWithContent()
note left: appId="app-2"\nmethodName="method-1"
SDK_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
|||
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
其他SDK
TODO
分析总结
所有的语言 SDK 都会实现了从客户端 SDK API 调用到发出远程调用请求给 dapr runtime的功能。具体实现上会有一些差别:
-
go sdk
全部请求走 gPRC API。
-
Java sdk
- service invoke 默认走 HTTP API,其他请求默认走 gRPC API。
-
其他SDK
- 待更新
4 - Dapr Runtime接收服务调用的outbound请求
Dapr runtime 有两种方式接收来自客户端发起的服务调用的 outbound 请求:gRPC API 和 HTTP API。在接收到请求之后,dapr runtime 会将 outbound 请求转发给目标服务的 dapr runtime。
title Daprd Receive inbound Request
hide footbox
skinparam style strictuml
participant daprd_client [
=daprd
----
client
]
participant daprd_server [
=daprd
----
server
]
-[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500 \n/v1.0/invoke/app-2/method/method-1
-[#blue]> daprd_client : gRPC (localhost)
note right: GRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/InvokeService
|||
daprd_client -> daprd_client: name resolution
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)
HTTP API
Runtime 初始化时,在注册 HTTP 服务时绑定了 handler 实现和 URL 路由:
func (a *api) constructDirectMessagingEndpoints() []Endpoint {
return []Endpoint{
{
Methods: []string{router.MethodWild},
Route: "invoke/{id}/method/{method:*}",
Alias: "{method:*}",
Version: apiVersionV1,
KeepParamUnescape: true,
Handler: a.onDirectMessage,
},
}
}
当 service invoke 的 HTTP 请求进来后,就会被 fasthttp 路由到 Handler 即 HTTP API 实现的 onDirectMessage() 方法中进行处理。
onDirectMessage 的实现代码在文件 pkg/http/api.go
, 示意如下:
func (a *api) onDirectMessage(reqCtx *fasthttp.RequestCtx) {
......
req := invokev1.NewInvokeMethodRequest(...)
resp, err := a.directMessaging.Invoke(reqCtx, targetID, req)
......
}
备注: HTTP API 的这个 onDirectMessage() 方法取名不对,应该效仿 gRPC API,取名为 InvokeService(). 理由是:这是暴露给外部调用的方法,取名应该表现出它对外暴露的功能,即InvokeService。而不应该暴露内部的实现是调用 directMessaging。
HTTP API 的实现也简单,同样,除了基本的请求/应答参数处理之外,就是将转发请求的事情交给了 directMessaging。
gRPC API
Runtime 初始化时,在注册 gRPC 服务时绑定了 gPRC API 实现和 InvokeService gRPC 方法。
当 service invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go
中的 InvokeService 方法:
func (a *api) InvokeService(ctx context.Context, in *runtimev1pb.InvokeServiceRequest) (*commonv1pb.InvokeResponse, error) {
......
resp, err := a.directMessaging.Invoke(ctx, in.Id, req)
......
return resp.Message(), respError
}
gRPC API 的实现特别简单,除了基本的请求/应答参数处理之外,就是将转发请求的事情交给了 directMessaging。
Name Resolution
TBD
5 - Dapr Runtime转发outbound请求
Dapr runtime 之间相互通讯采用的是 gRPC 协议,定义有 Dapr gRPC internal API。比较特殊的是,采用随机空闲端口而不是默认端口。但也可以通过命令行参数 dapr-internal-grpc-port
指定。
title Daprd-Daprd Communication
hide footbox
skinparam style strictuml
participant daprd_client [
=daprd
----
client
]
participant daprd_server [
=daprd
----
server
]
-[#blue]> daprd_client : HTTP (localhost)
-[#blue]> daprd_client : gRPC (localhost)
|||
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
pkg/messaging/direct_messaging.go
中的 DirectMessaging 负责实现转发请求给远程 dapr runtime。
接口
DirectMessaging 接口定义,用来调用远程应用:
// DirectMessaging is the API interface for invoking a remote app.
type DirectMessaging interface {
Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}
只有一个 invoke 方法。
实现流程
流程概况
invoke 方法的实现:
func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
app, err := d.getRemoteApp(targetAppID)
if app.id == d.appID && app.namespace == d.namespace {
return d.invokeLocal(ctx, req) // 如果调用的 appid 就是自己的 appid,这个场景好奇怪。忽略这里的代码先
}
return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, app, d.invokeRemote, req)
}
invokeRemote 方法的代码简化如下:
func (d *directMessaging) invokeRemote(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// 建立连接
conn, err := d.connectionCreatorFn(context.TODO(), appAddress, appID, namespace, false, false, false)
// 构建 gRPC stub 作为 client
clientV1 := internalv1pb.NewServiceInvocationClient(conn)
// 调用 gRPC 的 CallLocal 方法发出远程调用请求到另外一个 Dapr runtime
resp, err := clientV1.CallLocal(ctx, req.Proto(), opts...)
// 处理应答
return invokev1.InternalInvokeResponse(resp)
}
发出 gRPC 请求给远程 dapr runtime
CallLocal() 方法的实现在 service_invocation_grpc.pb.go
中,这是 protoc 成生的 gRPC 代码:
func (c *serviceInvocationClient) CallLocal(ctx context.Context, in *InternalInvokeRequest, opts ...grpc.CallOption) (*InternalInvokeResponse, error) {
out := new(InternalInvokeResponse)
err := c.cc.Invoke(ctx, "/dapr.proto.internals.v1.ServiceInvocation/CallLocal", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
可以看到这个 gRPC 请求调用的是 dapr.proto.internals.v1.ServiceInvocation 服务的 CallLocal 方法。
hide footbox
skinparam style strictuml
participant daprd_client [
=daprd
----
client
]
participant daprd_server [
=daprd
----
server
]
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
实现细节
获取远程地址
hide footbox
skinparam style strictuml
participant directMessaging
participant "Name resolver\n(consul/kubenetes/mdns)" as localNameReSolver
directMessaging -> localNameReSolver : ResolveID()
localNameReSolver -> localNameReSolver: loadBalance()
note right: kubernetes: dns name\ndns: dns name\nconsul: one address(random)\nmdsn: one address(round robbin)
localNameReSolver --> directMessaging
note right: return only one address in local cluster
hide footbox
skinparam style strictuml
participant directMessaging
participant "Local Name resolver\n(consul/kubenetes/mdns)" as localNameReSolver
participant "External Name resolver\n(synchronizer)" as externalNameReSolver
directMessaging -> localNameReSolver : ResolveID()
localNameReSolver --> directMessaging
note right: return service instance list in local cluster
directMessaging -[#red]> externalNameReSolver : ResolveID()
externalNameReSolver --> directMessaging
note right: return service instance list in external clusters
directMessaging -[#red]> directMessaging: combine the instance list
directMessaging -[#red]> directMessaging: filter by cluster strategy
note right: local-first\nexternal-first\nbroadcast\nlocal-only\nexternal-onluy
directMessaging -> directMessaging: loadBalance()
6 - Dapr Runtime接收服务调用的inbound请求
Dapr runtime 之间相互通讯走的是 gRPC internal API,这个 API 也只支持 gRPC 协议。
hide footbox
skinparam style strictuml
participant daprd_client [
=daprd
----
client
]
participant daprd_server [
=daprd
----
server
]
daprd_client -[#red]> daprd_server : gRPC (remote call)
note right: internal API @ ramdon free port\n/dapr.proto.internals.v1.ServiceInvocation/CallLocal
daprd_server -> daprd_server : interceptor
daprd_server -[#blue]> : appChannel.InvokeMethod()
接收请求
Runtime 初始化时,在注册 gRPC 服务时绑定了 gPRC Internal API 实现和 CallLocal gRPC 方法。对于访问 dapr.proto.internals.v1.ServiceInvocation
服务的 CallLocal
方法的 gRPC 请求,会将请求转给 _ServiceInvocation_CallLocal_Handler
处理:
func _ServiceInvocation_CallLocal_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
......
if interceptor == nil {
return srv.(ServiceInvocationServer).CallLocal(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dapr.proto.internals.v1.ServiceInvocation/CallLocal",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
// 这里调用的 srv 即 gRPC api 实现
return srv.(ServiceInvocationServer).CallLocal(ctx, req.(*InternalInvokeRequest))
}
return interceptor(ctx, in, info, handler)
}
最后进入 CallLocal() 方法进行处理。
备注:初始化的细节,请见前面章节 “Runtime初始化”
期间会有一个 interceptor 的处理流程,细节后面展开。
转发请求
当 internal invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go
中的 CallLocal 方法:
func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
// 1. 构造请求
req, err := invokev1.InternalInvokeRequest(in)
if a.accessControlList != nil {
......
}
// 2. 通过 appChannel 向应用发出请求
resp, err := a.appChannel.InvokeMethod(ctx, req)
// 3. 处理应答
return resp.Proto(), err
}
处理方式很清晰,基本上就是将请求通过 app channel 转发。Runtime 本身并没有什么额外的处理逻辑。InternalInvokeRequest() 只是简单处理一下参数:
// InternalInvokeRequest creates InvokeMethodRequest object from InternalInvokeRequest pb object.
func InternalInvokeRequest(pb *internalv1pb.InternalInvokeRequest) (*InvokeMethodRequest, error) {
req := &InvokeMethodRequest{r: pb}
if pb.Message == nil {
return nil, errors.New("Message field is nil")
}
return req, nil
}
访问控制
期间会有一个 access control (访问控制)的逻辑:
if a.accessControlList != nil {
// An access control policy has been specified for the app. Apply the policies.
operation := req.Message().Method
var httpVerb commonv1pb.HTTPExtension_Verb
// Get the http verb in case the application protocol is http
if a.appProtocol == config.HTTPProtocol && req.Metadata() != nil && len(req.Metadata()) > 0 {
httpExt := req.Message().GetHttpExtension()
if httpExt != nil {
httpVerb = httpExt.GetVerb()
}
}
callAllowed, errMsg := acl.ApplyAccessControlPolicies(ctx, operation, httpVerb, a.appProtocol, a.accessControlList)
if !callAllowed {
return nil, status.Errorf(codes.PermissionDenied, errMsg)
}
}
细节后面展开。
7 - Dapr Runtime转发inbound请求
协议和端口的配置
Dapr runtime 将 inbound 请求转发给服务器端应用:
title Daprd-Daprd Communication
hide footbox
skinparam style strictuml
participant daprd_client [
=daprd
----
client
]
participant daprd_server [
=daprd
----
server
]
participant user_code_server [
=App-2
----
server
]
daprd_client -[#red]> daprd_server : Dapr gRPC internal API (remote call)
daprd_server -[#blue]> user_code_server : Dapr HTTP channel API (localhost)
note right: HTTP endpoint @ 3000\nVERB http://localhost:3000/method?query1=value1
daprd_server -[#blue]> user_code_server : Dapr gRPC channel API (localhost)
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
- app channel 的通讯协议可以是 HTTP 或者 gRPC 协议,可以通过命令行参数
app-port
指定,默认是 HTTP - 应用接收请求的端口可以通过命令行参数
app-protocol
指定,没有默认值。 - 为了控制对应用造成的压力,还引入了最大并发度的概念,可以通过命令行参数
app-max-concurrency
指定。
请求发送的流程
前面分析过,当 internal invoke 的 gRPC 请求进来后,就会进入 pkc/grpc/api.go
中的 CallLocal 方法:
func (a *api) CallLocal(ctx context.Context, in *internalv1pb.InternalInvokeRequest) (*internalv1pb.InternalInvokeResponse, error) {
......
resp, err := a.appChannel.InvokeMethod(ctx, req)
......
}
然后通过 appChannel 发送请求。
app channel 的建立
app channel 的建立是在 runtime 初始化时,在 pkg/runtime/runtime.go
的 initRuntime() 方法中:
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
a.blockUntilAppIsReady()
err = a.createAppChannel()
a.daprHTTPAPI.SetAppChannel(a.appChannel)
grpcAPI.SetAppChannel(a.appChannel)
......
}
createAppChannel() 的实现,目前只支持 HTTP 和 gRPC:
func (a *DaprRuntime) createAppChannel() error {
// 为了建立 app channel,必须配置有 app port
if a.runtimeConfig.ApplicationPort > 0 {
var channelCreatorFn func(port, maxConcurrency int, spec config.TracingSpec, sslEnabled bool, maxRequestBodySize int, readBufferSize int) (channel.AppChannel, error)
switch a.runtimeConfig.ApplicationProtocol {
case GRPCProtocol:
channelCreatorFn = a.grpc.CreateLocalChannel
case HTTPProtocol:
channelCreatorFn = http_channel.CreateLocalChannel
default:
// 只支持 HTTP 和 gRPC
return errors.Errorf("cannot create app channel for protocol %s", string(a.runtimeConfig.ApplicationProtocol))
}
ch, err := channelCreatorFn(a.runtimeConfig.ApplicationPort, a.runtimeConfig.MaxConcurrency, a.globalConfig.Spec.TracingSpec, a.runtimeConfig.AppSSL, a.runtimeConfig.MaxRequestBodySize, a.runtimeConfig.ReadBufferSize)
a.appChannel = ch
} else {
log.Warn("app channel is not initialized. did you make sure to configure an app-port?")
}
return nil
}
app channel 的配置参数
和 app channel 密切相关的三个配置项,可以从命令行参数中获取:
func FromFlags() (*DaprRuntime, error) {
......
appPort := flag.String("app-port", "", "The port the application is listening on")
appProtocol := flag.String("app-protocol", string(HTTPProtocol), "Protocol for the application: grpc or http")
appMaxConcurrency := flag.Int("app-max-concurrency", -1, "Controls the concurrency level when forwarding requests to user code")
TracingSpec / AppSSL / MaxRequestBodySize / ReadBufferSize 后面细说,先不展开。
HTTP 通道的实现
HTTP Channel 的实现在文件 pkg/channel/http/http_channel.go
中,其 InvokeMethod()方法:
func (h *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
......
switch req.APIVersion() {
case internalv1pb.APIVersion_V1:
rsp, err = h.invokeMethodV1(ctx, req)
......
return rsp, err
}
暂时只有 invokeMethodV1 版本:
func (h *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// 1. 构建HTTP请求
channelReq := h.constructRequest(ctx, req)
// 2. 发送请求到应用
err := h.client.DoTimeout(channelReq, resp, channel.DefaultChannelRequestTimeout)
// 3. 处理返回的应答
rsp := h.parseChannelResponse(req, resp, err)
return rsp, nil
}
这是将收到的请求内容,转成HTTP协议的标准格式,然后通过 fasthttp 发给用户代码。其中转为标准http请求的代码在方法 constructRequest() 中:
func (h *Channel) constructRequest(ctx context.Context, req *invokev1.InvokeMethodRequest) *fasthttp.Request {
var channelReq = fasthttp.AcquireRequest()
// Construct app channel URI: VERB http://localhost:3000/method?query1=value1
uri := fmt.Sprintf("%s/%s", h.baseAddress, req.Message().GetMethod())
channelReq.SetRequestURI(uri)
channelReq.URI().SetQueryString(req.EncodeHTTPQueryString())
channelReq.Header.SetMethod(req.Message().HttpExtension.Verb.String())
// Recover headers
invokev1.InternalMetadataToHTTPHeader(ctx, req.Metadata(), channelReq.Header.Set)
......
}
这样在服务器端的用户代码中,就可以用不引入 dapr sdk,只需要提供标准 http endpoint 即可。
title Daprd-Daprd Communication
hide footbox
skinparam style strictuml
participant daprd_server [
=daprd
----
server
]
participant user_code_server [
=App-2
----
server
]
daprd_server -[#blue]> user_code_server : HTTP (localhost)
note right: HTTP endpoint @ 3000\nVERB http://localhost:3000/method?query1=value1
gRPC 通道的实现
pkg/grpc/grpc.go
中的 CreateLocalChannel() 方法:
// CreateLocalChannel creates a new gRPC AppChannel.
func (g *Manager) CreateLocalChannel(port, maxConcurrency int, spec config.TracingSpec, sslEnabled bool, maxRequestBodySize int, readBufferSize int) (channel.AppChannel, error) {
// IP地址写死了 127.0.0.1
conn, err := g.GetGRPCConnection(context.TODO(), fmt.Sprintf("127.0.0.1:%v", port), "", "", true, false, sslEnabled)
......
g.AppClient = conn
ch := grpc_channel.CreateLocalChannel(port, maxConcurrency, conn, spec, maxRequestBodySize, readBufferSize)
return ch, nil
}
实现代码在 pkg/channel/grpc/grpc_channel.go
的 InvokeMethod()方法中:
func (g *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
......
switch req.APIVersion() {
case internalv1pb.APIVersion_V1:
rsp, err = g.invokeMethodV1(ctx, req)
......
return rsp, err
}
暂时只有 invokeMethodV1 版本:
func (g *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// 1. 创建 AppCallback 的 grpc client
clientV1 := runtimev1pb.NewAppCallbackClient(g.client)
// 2. 调用 AppCallback 的 OnInvoke() 方法
resp, err := clientV1.OnInvoke(ctx, req.Message(), grpc.Header(&header), grpc.Trailer(&trailer))
// 3. 处理返回的应答
return rsp.WithMessage(resp), nil
}
gRPC channel 是通过 gRPC 协议调用服务器端应用上的 gRPC 服务完成,具体是 AppCallback 的 OnInvoke() 方法。
title Dapr gRPC Channel
hide footbox
skinparam style strictuml
participant daprd_server [
=daprd
----
server
]
participant user_code_server [
=App-2
----
server
]
daprd_server -[#blue]> user_code_server : gRPC (localhost)
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
也就是说:如果要支持 gRPC channel,则要求服务器端应用必须实现 AppCallback gRPC 服务器,这一点和 HTTP 不同,对服务器端应用是有侵入的。
8 - 服务器端App接收inbound请求
pkg/proto/runtime/v1/appcallback.pb.go
中的 OnInvoke 方法:
// AppCallbackServer is the server API for AppCallback service.
type AppCallbackServer interface {
// Invokes service method with InvokeRequest.
OnInvoke(context.Context, *v1.InvokeRequest) (*v1.InvokeResponse, error)
}
为了接收来自daprd转发的来自客户端的service invoke 请求,服务器端的应用也需要做一些处理。
接收HTTP请求
对于通过 HTTP channel 过来的标准HTTP请求,服务器端的应用只需要提供标准的HTTP端口即可,无须引入dapr SDK。
title Daprd-Daprd Communication
hide footbox
skinparam style strictuml
participant daprd_server [
=daprd
----
server
]
participant user_code_server [
=App-2
----
server
]
daprd_server -[#blue]> user_code_server : HTTP (localhost)
note right: HTTP endpoint @ 3000\nVERB http://localhost:3000/method?query1=value1
接收gRPC请求
对于通过 gRPC channel 过来的 gRPC 请求,服务器端的应用则需要实现 gRPC AppCallback 服务的 OnInvoke() 方法:
title Dapr gRPC Channel
hide footbox
skinparam style strictuml
participant daprd_server [
=daprd
----
server
]
participant user_code_server [
=App-2
----
server
]
daprd_server -[#blue]> user_code_server : gRPC (localhost)
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
AppCallbackServer 的 proto 定义在 dapr 仓库下的文件dapr/proto/runtime/v1/appcallback.proto
中:
service AppCallback {
// Invokes service method with InvokeRequest.
rpc OnInvoke (common.v1.InvokeRequest) returns (common.v1.InvokeResponse) {}
......
}
而 AppCallbackServer 的具体实现则分布在各个不同语言的 sdk 里面。
go-sdk实现
实现在 go-sdk 的 service/grpc/invoke.go
文件的 OnInvoke方法,主要流程为:
func (s *Server) OnInvoke(ctx context.Context, in *cpb.InvokeRequest) (*cpb.InvokeResponse, error) {
if fn, ok := s.invokeHandlers[in.Method]; ok {
e := &cc.InvocationEvent{}
ct, er := fn(ctx, e)
return &cpb.InvokeResponse{......}, nil
}
return nil, fmt.Errorf("method not implemented: %s", in.Method)
}
其中 s.invokeHandlers
中保存处理请求的方法(由参数method作为key)。AddServiceInvocationHandler() 用于增加方法名和 handler 的映射 :
// Server is the gRPC service implementation for Dapr.
type Server struct {
invokeHandlers map[string]common.ServiceInvocationHandler
}
type ServiceInvocationHandler func(ctx context.Context, in *InvocationEvent) (out *Content, err error)
func (s *Server) AddServiceInvocationHandler(method string, fn func(ctx context.Context, in *cc.InvocationEvent) (our *cc.Content, err error)) error {
s.invokeHandlers[method] = fn
return nil
}
这意味着,在服务器端的应用中,并不需要为这些方法提供 gRPC 相关的 proto 定义,也不需要直接通过 gRPC 把这些方法暴露出去,只需要实现 AppCallback 的 OnInvode() 方法,然后把需要对外暴露的方法注册即可,OnInvode() 方法相当于一个简单的 API 网管。
title Dapr AppCallback OnInvoke gRPC impl
hide footbox
skinparam style strictuml
participant AppCallback [
=AppCallback
----
OnInvoke()
]
participant invokeHandlers
participant handler
-[#blue]> AppCallback : gRPC OnInvode()
note right: gRPC endpoint @ 3000\n/dapr.proto.runtime.v1.AppCallback/OnInvoke
AppCallback -> invokeHandlers: find handler by method name
invokeHandlers --> AppCallback: registered handler
AppCallback -> handler: call handler
note right: type ServiceInvocationHandler \nfunc(ctx context.Context, in *InvocationEvent) \n(out *Content, err error)
handler --> AppCallback
<-[#blue]- AppCallback
用户代码实现示例
用户在开发支持 dapr 的 go 服务器端应用时,需要在应用中启动 dapr service server,然后添加各种 handler,包括 ServiceInvocationHandler,如下面这个例子(go-sdk下的 example/serving/grpc/main.go
):
func main() {
// create a Dapr service server
s, err := daprd.NewService(":50001")
// add a service to service invocation handler
if err := s.AddServiceInvocationHandler("echo", echoHandler); err != nil {
log.Fatalf("error adding invocation handler: %v", err)
}
// start the server
if err := s.Start(); err != nil {
log.Fatalf("server error: %v", err)
}
}
java-sdk实现
java SDK 中没有找到服务器端实现的代码?待确定。