发布的主流程
- 1: 流程概述
- 2: 发布相关的Runtime初始化
- 3: 客户端sdk发出publish请求
- 4: Dapr Runtime 处理来自客户端的 publish 请求
- 5: 组件实现
1 - 流程概述
API 和端口
Dapr runtime 对外提供两个 API,分别是 Dapr HTTP API 和 Dapr gRPC API。两个 Dapr API 对外暴露的端口,默认是:
- 3500: HTTP 端口,可以通过命令行参数
dapr-http-port
设置 - 50001: gRPC 端口,可以通过命令行参数
dapr-grpc-port
设置
gRPC API
gRPC API 定义在 dapr/proto/runtime/v1/dapr.proto
文件中的 Dapr service 中:
service Dapr {
// Publishes events to the specific topic.
rpc PublishEvent(PublishEventRequest) returns (google.protobuf.Empty) {}
......
}
// PublishEventRequest is the message to publish event data to pubsub topic
message PublishEventRequest {
// The name of the pubsub component
string pubsub_name = 1;
// The pubsub topic
string topic = 2;
// The data which will be published to topic.
bytes data = 3;
// The content type for the data (optional).
string data_content_type = 4;
// The metadata passing to pub components
//
// metadata property:
// - key : the key of the message.
map<string, string> metadata = 5;
}
主要的参数是:
- pubsub_name:dapr pubsub component的名字
- topic:发布消息的目标topic
- data:消息的数据
可选参数有:
- data_content_type:消息数据的内容类型
- metadata:可选的元数据信息,用于扩展
HTTP API
HTTP API 没有明确的单独定义,不过可以从代码中获知。在 pkg/http/api.go
中,构建用于 publish 的 endpoint 的代码如下:
func (a *api) constructPubSubEndpoints() []Endpoint {
return []Endpoint{
{
// 发送 POST 或者 PUT 请求
Methods: []string{fasthttp.MethodPost, fasthttp.MethodPut},
// 到这个 URL
Route: "publish/{pubsubname}/{topic:*}",
Version: apiVersionV1,
Handler: a.onPublish,
},
}
}
因此,用于 publish 的 daprd URL 类似于 http://localhost:3500/v1.0/publish/pubsubname1/topic1
。
处理请求的 handler 方法 a.onPublish() 中读取参数的代码如下(忽略其他细节):
const (
pubsubnameparam = "pubsubname"
)
// 从 url 中读取 pubsubname
pubsubName := reqCtx.UserValue(pubsubnameparam).(string)
// 从 url 中读取 topic
topic := reqCtx.UserValue(topicParam).(string)
// 从 HTTP body
body := reqCtx.PostBody()
// 从 HTTP 的 Content-Type header 中读取 data_content_type
contentType := string(reqCtx.Request.Header.Peek("Content-Type"))
// 从 HTTP URL query 中读取 metadata
metadata := getMetadataFromRequest(reqCtx)
Metadata 的读取要稍微复杂一些,需要读取所有的 url query 参数,然后根据 key 的前缀判断是不是 metadata:
const (
metadataPrefix = "metadata."
)
func getMetadataFromRequest(reqCtx *fasthttp.RequestCtx) map[string]string {
metadata := map[string]string{}
// 游历所有的 url query 参数
reqCtx.QueryArgs().VisitAll(func(key []byte, value []byte) {
queryKey := string(key)
// 如果 query 参数的 key 以 "metadata." 开头,就视为一个 metadata 的key
if strings.HasPrefix(queryKey, metadataPrefix) {
// key 的 前缀 "metadata." 要去掉
k := strings.TrimPrefix(queryKey, metadataPrefix)
metadata[k] = string(value)
}
})
return metadata
}
总结:用于 publish 的完整的 daprd URL 类似于 http://localhost:3500/v1.0/publish/pubsubname1/topic1?metadata.k1=v1&metadata.k2=v2&metadata.k3=v3
。消息内容通过 HTTP body 传递,另外可以通过 Content-Type header 传递消息内容类型参数。
发布流程
gRPC 协议
默认情况下使用 gRPC 协议进行消息发布,daprd 在默认的 50001 端口,通过注册的 dapr service 的 PublishEvent() 方法接收来自客户端通过 dapr SDK 发出的 gRPC 请求,之后根据具体的组件实现,对底层实际使用的消息中间件发布事件。流程大体如下:
title Pub-Sub via gRPC Protocol
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
=User Code
----
producer
]
participant SDK_client [
=Dapr SDK
----
producer
]
end box
participant daprd_client [
=daprd
----
producer
]
participant message_broker as "Message Broker"
user_code_client -> SDK_client : PublishEvent()
note left: pubsub_name="name-1"\ntopic="topic-1"\ndata="[...]"\ndata_content_type=""\nmetadata="[...]"
note right: PublishEvent() @ Dapr service
SDK_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001
|||
daprd_client -[#red]> message_broker : native protocol (remote call)
|||
message_broker --[#red]> daprd_client :
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
HTTP 协议
HTTP协议类似,daprd 在默认的 3500 端口,通过前面所述的URL接收客户端通过 dapr SDK 发出的 HTTP 请求。流程大体如下:
title Pub-Sub via HTTP Protocol
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
=User Code
----
producer
]
participant SDK_client [
=Dapr SDK
----
producer
]
end box
participant daprd_client [
=daprd
----
producer
]
participant message_broker as "Message Broker"
user_code_client -> SDK_client : PublishEvent()
note left: pubsub_name="name-1"\ntopic="topic-1"\ndata="[...]"\ndata_content_type=""\nmetadata="[...]"
note right: POST http://localhost:3500/v1.0/publish/pubsubname1/topic1?\nmetadata.k1=v1&metadata.k2=v2&metadata.k3=v3
SDK_client -[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500
|||
daprd_client -[#red]> message_broker : native protocol (remote call)
|||
message_broker --[#red]> daprd_client :
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
2 - 发布相关的Runtime初始化
在 dapr runtime 启动进行初始化时,需要开启 API 端口并挂载相应的 handler 来接收并处理发布订阅中的发布请求。另外需要根据配置文件启动 pubsub component 以便连接到外部 message broker。
启动 Dapr gRPC API Server(outbound)
启动 gRPC 服务器
在 dapr runtime 启动时的初始化过程中,会启动 gRPC server, 代码在 pkg/runtime/runtime.go
中:
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
// Create and start internal and external gRPC servers
grpcAPI := a.getGRPCAPI()
err = a.startGRPCAPIServer(grpcAPI, a.runtimeConfig.APIGRPCPort)
......
}
func (a *DaprRuntime) startGRPCAPIServer(api grpc.API, port int) error {
serverConf := a.getNewServerConfig(a.runtimeConfig.APIListenAddresses, port)
server := grpc.NewAPIServer(api, serverConf, a.globalConfig.Spec.TracingSpec, a.globalConfig.Spec.MetricSpec, a.globalConfig.Spec.APISpec, a.proxy)
if err := server.StartNonBlocking(); err != nil {
return err
}
......
}
// NewAPIServer returns a new user facing gRPC API server.
func NewAPIServer(api API, config ServerConfig, ......) Server {
return &server{
api: api,
config: config,
kind: apiServer, // const apiServer = "apiServer"
......
}
}
注册 Dapr API
为了让 dapr runtime 的 gRPC 服务器能挂载 Dapr API,需要将定义 dapr api 的 dapr service 注册到 gRPC 服务器上去。
注册的代码实现在 pkg/grpc/server.go
中, StartNonBlocking() 方法在启动 grpc 服务器时,会进行服务注册:
func (s *server) StartNonBlocking() error {
if s.kind == internalServer {
internalv1pb.RegisterServiceInvocationServer(server, s.api)
} else if s.kind == apiServer {
runtimev1pb.RegisterDaprServer(server, s.api) // 注意:s.api (即 gRPC api 实现) 被传递进去
}
......
}
而 RegisterDaprServer() 方法的实现代码在 pkg/proto/runtime/v1/dapr_grpc.pb.go
:
func RegisterDaprServer(s grpc.ServiceRegistrar, srv DaprServer) {
s.RegisterService(&Dapr_ServiceDesc, srv) // srv 即 gRPC api 实现
}
Dapr_ServiceDesc 定义
在文件 pkg/proto/runtime/v1/dapr_grpc.pb.go
中有 Dapr Service 的 grpc 服务定义,这是 protoc 生成的 gRPC 代码。
Dapr_ServiceDesc 中有 Dapr Service 各个方法的定义,和发布相关的是 PublishEvent
方法:
var Dapr_ServiceDesc = grpc.ServiceDesc{
ServiceName: "dapr.proto.runtime.v1.Dapr",
HandlerType: (*DaprServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "PublishEvent", # 注册方法名
Handler: _Dapr_PublishEvent_Handler, # 关联实现的 Handler
},
......
},
},
Metadata: "dapr/proto/runtime/v1/dapr.proto",
}
这一段是告诉 gRPC server: 如果收到访问 dapr.proto.runtime.v1.Dapr
服务的 PublishEvent
方法的 gRPC 请求,请把请求转给 _Dapr_PublishEvent_Handler
处理。
title Dapr publish gRPC API
hide footbox
skinparam style strictuml
participant daprd_client [
=daprd
----
producer
]
-[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n/dapr.proto.runtime.v1.Dapr/PublishEvent
|||
<[#blue]-- daprd_client
而 PublishEvent
方法相关联的 handler 方法 _Dapr_PublishEvent_Handler
的实现代码是:
func _Dapr_PublishEvent_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(PublishEventRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(DaprServer).PublishEvent(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/dapr.proto.runtime.v1.Dapr/PublishEvent",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(DaprServer).PublishEvent(ctx, req.(*PublishEventRequest))
}
return interceptor(ctx, in, info, handler)
}
最后调用到了 DaprServer 接口实现的 PublishEvent 方法,也就是 gPRC API 实现。
启动 Dapr HTTP API Server(outbound)
在 dapr runtime 中启动 HTTP server
在 dapr runtime 启动时的初始化过程中,会启动 HTTP server, 代码在 pkg/runtime/runtime.go
中
dapr runtime 的 HTTP server 用的是 fasthttp。
在 dapr runtime 启动时的初始化过程中,会启动 HTTP server, 代码在 pkg/runtime/runtime.go 中:
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
// Start HTTP Server
err = a.startHTTPServer(a.runtimeConfig.HTTPPort, a.runtimeConfig.PublicPort, a.runtimeConfig.ProfilePort, a.runtimeConfig.AllowedOrigins, pipeline)
if err != nil {
log.Fatalf("failed to start HTTP server: %s", err)
}
......
}
func (a *DaprRuntime) startHTTPServer(......) error {
a.daprHTTPAPI = http.NewAPI(......)
server := http.NewServer(a.daprHTTPAPI, ......)
if err := server.StartNonBlocking(); err != nil { // StartNonBlocking 启动 fasthttp server
return err
}
}
挂载 PubSub 的 HTTP 端点
在 HTTP API 的初始化过程中,会在 fast http server 上挂载 PubSub 的 HTTP 端点,代码在 pkg/http/api.go
中:
func NewAPI(
appID string,
appChannel channel.AppChannel,
directMessaging messaging.DirectMessaging,
......
shutdown func()) API {
api := &api{
appChannel: appChannel,
directMessaging: directMessaging,
......
}
// 附加 PubSub 的 HTTP 端点
api.endpoints = append(api.endpoints, api.constructPubSubEndpoints()...)
}
PubSub 的 HTTP 端点的具体信息在 constructPubSubEndpoints() 方法中:
func (a *api) constructPubSubEndpoints() []Endpoint {
return []Endpoint{
{
Methods: []string{fasthttp.MethodPost, fasthttp.MethodPut},
Route: "publish/{pubsubname}/{topic:*}",
Version: apiVersionV1,
Handler: a.onPublish,
},
}
}
注意这里的 Route 路径 “publish/{pubsubname}/{topic:*}", dapr sdk 就是就通过这样的 url 来发起 HTTP publish 请求。
title Dapr Publish HTTP API
hide footbox
skinparam style strictuml
participant daprd_client [
=daprd
----
producer
]
-[#blue]> daprd_client : HTTP (localhost)
note right: HTTP API @ 3500\n/v1.0/publish/{pubsubname}/{topic:*}
|||
<[#blue]-- daprd_client
pubsub 组件初始化
为了提供对 pubsub 的功能支持,需要为 dapr runtime 配置 pubsub component。
pubSubRegistry 和 pubSubs 列表
DaprRuntime 的结构体中保存有 pubSubRegistry 和 pubSubs 列表:
type DaprRuntime struct {
......
pubSubRegistry pubsub_loader.Registry
pubSubs map[string]pubsub.PubSub
......
}
runtime 构建时会初始化这两个结构体:
func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration, accessControlList *config.AccessControlList, resiliencyProvider resiliency.Provider) *DaprRuntime {
ctx, cancel := context.WithCancel(context.Background())
return &DaprRuntime{
......
pubSubs: map[string]pubsub.PubSub{},
pubSubRegistry: pubsub_loader.NewRegistry(),
......
PubSubRegistry 保存pubsub组件列表
pubSubRegistry 用于保存 dapr runtime 中支持的所有 pubsub component :
pubSubRegistry struct {
messageBuses map[string]func() pubsub.PubSub
}
在 runtime binary (cmd/daprd/main.go
)的代码中,会列举出所有的 pubsub component ,这也是 darp 和 conponents-contrib 两个仓库的直接联系:
err = rt.Run(
......
runtime.WithPubSubs(
pubsub_loader.New("azure.eventhubs", func() pubs.PubSub {
return pubsub_eventhubs.NewAzureEventHubs(logContrib)
}),
pubsub_loader.New("azure.servicebus", func() pubs.PubSub {
return servicebus.NewAzureServiceBus(logContrib)
}),
pubsub_loader.New("gcp.pubsub", func() pubs.PubSub {
return pubsub_gcp.NewGCPPubSub(logContrib)
}),
pubsub_loader.New("hazelcast", func() pubs.PubSub {
return pubsub_hazelcast.NewHazelcastPubSub(logContrib)
}),
pubsub_loader.New("jetstream", func() pubs.PubSub {
return pubsub_jetstream.NewJetStream(logContrib)
}),
pubsub_loader.New("kafka", func() pubs.PubSub {
return pubsub_kafka.NewKafka(logContrib)
}),
pubsub_loader.New("mqtt", func() pubs.PubSub {
return pubsub_mqtt.NewMQTTPubSub(logContrib)
}),
pubsub_loader.New("natsstreaming", func() pubs.PubSub {
return natsstreaming.NewNATSStreamingPubSub(logContrib)
}),
pubsub_loader.New("pulsar", func() pubs.PubSub {
return pubsub_pulsar.NewPulsar(logContrib)
}),
pubsub_loader.New("rabbitmq", func() pubs.PubSub {
return rabbitmq.NewRabbitMQ(logContrib)
}),
pubsub_loader.New("redis", func() pubs.PubSub {
return pubsub_redis.NewRedisStreams(logContrib)
}),
pubsub_loader.New("snssqs", func() pubs.PubSub {
return pubsub_snssqs.NewSnsSqs(logContrib)
}),
pubsub_loader.New("in-memory", func() pubs.PubSub {
return pubsub_inmemory.New(logContrib)
}),
),
......
)
runtime 在初始化时会将这些 pubsub component 信息保存在 pubSubRegistry 中:
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
a.pubSubRegistry.Register(opts.pubsubs...)
}
需要注意的是,pubSubRegistry 中保存的组件列表是所有的被 dapr runtime 支持的组件列表,但是,不是每个组件在 runtime 启动时都会被装载。组件的安装时按需的,由组件配置文件(yaml)来决定装载和初始化那些组件的示例。
runtime 装载 pubsub 组件
组件在 dapr runtime 初始化时统一装载:
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
a.pubSubRegistry.Register(opts.pubsubs...)
a.secretStoresRegistry.Register(opts.secretStores...)
a.stateStoreRegistry.Register(opts.states...)
......
err = a.loadComponents(opts)
a.flushOutstandingComponents()
......
}
有两种实现,KubernetesMode 和 StandaloneMode:
func (a *DaprRuntime) loadComponents(opts *runtimeOpts) error {
var loader components.ComponentLoader
switch a.runtimeConfig.Mode {
case modes.KubernetesMode:
loader = components.NewKubernetesComponents(a.runtimeConfig.Kubernetes, a.namespace, a.operatorClient, a.podName)
case modes.StandaloneMode:
loader = components.NewStandaloneComponents(a.runtimeConfig.Standalone)
default:
return errors.Errorf("components loader for mode %s not found", a.runtimeConfig.Mode)
}
comps, err := loader.LoadComponents()
......
}
KubernetesMode 下读取的是 k8s 下的 component CRD:
func (k *KubernetesComponents) LoadComponents() ([]components_v1alpha1.Component, error) {
resp, err := k.client.ListComponents(context.Background(), &operatorv1pb.ListComponentsRequest{
Namespace: k.namespace,
PodName: k.podName,
}, ......
}
StandaloneMode 下读取的是由 ComponentsPath 配置(--componentspath
)指定的目录下的 component CRD 文件:
func (s *StandaloneComponents) LoadComponents() ([]components_v1alpha1.Component, error) {
files, err := os.ReadDir(s.config.ComponentsPath)
......
}
总结
在完成 HTTP server 和 gRPC server 的初始化之后,dapr runtime 就做好了接收 publish 请求的准备。
3 - 客户端sdk发出publish请求
Java SDK 实现
在业务代码中使用 pubsub 功能的示例可参考文件 dapr java-sdk 中的代码 /src/main/java/io/dapr/examples/pubsub/http/Publisher.java
,代码示意如下:
DaprClient client = (new DaprClientBuilder()).build();
String message = String.format("This is message #%d", i);
client.publishEvent(
"messagebus",
"testingtopic",
message,
singletonMap(Metadata."ttlInSeconds", "1000")).block();
java SDK 中除了 service invoke 默认使用 HTTP ,其他方法都是默认使用 gRPC,在 DaprClientProxy 类中初始化了两个 daprclient:
- client 字段: 类型为 DaprClientGrpc,连接到 127.0.0.1:5001
- methodInvocationOverrideClient 字段:类型为 DaprClientHttp,连接到 127.0.0.1:3500
pubsub 方法默认走 gRPC ,使用的是 DaprClientGrpc
类型 (文件为 src/main/java/io/dapr/client/DaprClientGrpc.java
):
@Override
public Mono<Void> publishEvent(PublishEventRequest request) {
try {
String pubsubName = request.getPubsubName();
String topic = request.getTopic();
Object data = request.getData();
DaprProtos.PublishEventRequest.Builder envelopeBuilder = DaprProtos.PublishEventRequest.newBuilder()
......
return Mono.subscriberContext().flatMap(
context ->
this.<Empty>createMono(
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
)
).then();
}
在这里根据请求条件设置 PublishEvent 请求的各种参数,debug 时可以看到如下图的数据:
发出去给 dapr runtime 的 gRPC 请求如下图所示:
这里调用的 gRPC 服务是 dapr.proto.runtime.v1.Dapr
, 方法是 PublishEvent
,和前一章中 dapr runtime 初始化中设定的 gRPC API 对应。
title PublishEvent via gRPC
hide footbox
skinparam style strictuml
box "App-1"
participant user_code_client [
=App-1
----
producer
]
participant SDK_client [
=SDK
----
producer
]
end box
participant daprd_client [
=daprd
----
producer
]
user_code_client -> SDK_client : PublishEvent()
note left: pubsub_name="name-1"\ntopic="topic-1"\ndata="[...]"\ndata_content_type=""\nmetadata="[...]"
SDK_client -[#blue]> daprd_client : gRPC (localhost)
note right: gRPC API @ 50001\n"dapr.proto.runtime.v1.Dapr/PublishEvent"
|||
SDK_client <[#blue]-- daprd_client
user_code_client <-- SDK_client
Go sdk实现
在 go 业务代码中使用 service invoke 功能的示例可参考 https://github.com/dapr/go-sdk/blob/main/examples/pubsub/pub/pub.go,代码示意如下:
client, err := dapr.NewClient()
err := client.PublishEvent(ctx, pubsubName, topicName, data)
Go SDK 中定义了 Client 接口,文件为 client/client.go
:
// Client is the interface for Dapr client implementation.
type Client interface {
// PublishEvent publishes data onto topic in specific pubsub component.
PublishEvent(ctx context.Context, pubsubName, topicName string, data interface{}, opts ...PublishEventOption) error
......
}
方法的实现在 client/pubsub.go
中,都只是实现了对 PublishEventRequest 对象的组装:
func (c *GRPCClient) invokeServiceWithRequest(ctx context.Context, req *pb.InvokeServiceRequest) (out []byte, err error) {
request := &pb.PublishEventRequest{
PubsubName: pubsubName,
Topic: topicName,
}
_, err := c.protoClient.PublishEvent(c.withAuthToken(ctx), request)
......
}
PublishEvent() 是 protoc 生成的 grpc 代码,在 dapr/proto/runtime/v1/dapr_grpc.pb.go
中,实现如下:
func (c *daprClient) PublishEvent(ctx context.Context, in *PublishEventRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/PublishEvent", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
注意: 这里调用的 gRPC 服务是 dapr.proto.runtime.v1.Dapr
, 方法是 InvokeService
,和 dapr runtime 中 gRPC API 对应。
其他SDK
TODO
4 - Dapr Runtime 处理来自客户端的 publish 请求
在 dapr runtime 中,提供 HTTP 和 gRPC 两种协议,前面 runtime 初始化时介绍了 HTTP 和 gRPC 两种协议是如何在 runtime 初始化时准备好接收来自客户端的 publish 请求的。现在我们介绍在接收到来自客户端的 publish 请求后,dapr runtime 是如何处理请求的。
gRPC API
在 gRPC API 的实现中,PublishEvent() 方法负责处理接收到的 publish 请求,其主要流程大体是如下4个步骤:
type api struct {
pubsubAdapter runtimePubsub.Adapter
}
func (a *api) PublishEvent(ctx context.Context, in *runtimev1pb.PublishEventRequest) (*emptypb.Empty, error) {
// 1. 根据名称找到可以处理请求的 pubsub 组件
thepubsub := a.pubsubAdapter.GetPubSub(pubsubName)
// 2. 处理参数的细节:如是否要封装为 cloudevent
// 细节忽略,后续展开
// 3. 构建 PublishRequest 请求对象
req := pubsub.PublishRequest{
PubsubName: pubsubName,
Topic: topic,
Data: data,
Metadata: in.Metadata,
}
// 4. 未退 pubsub 组件来负责具体的请求发送
err := a.pubsubAdapter.Publish(&req)
}
查找处理请求的 pubsub 组件
// 检查是否有初始化 pubsubAdapter,没有的话报错退出
if a.pubsubAdapter == nil {
err := status.Error(codes.FailedPrecondition, messages.ErrPubsubNotConfigured)
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}
pubsubName := in.PubsubName
// 检查请求,pubsubName 参数不能为空
if pubsubName == "" {
err := status.Error(codes.InvalidArgument, messages.ErrPubsubEmpty)
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}
// 根据 pubsubName 参数在 pubsubAdapter 中找到对应的组件
thepubsub := a.pubsubAdapter.GetPubSub(pubsubName)
if thepubsub == nil {
// 如果找不到,则报错退出
err := status.Errorf(codes.InvalidArgument, messages.ErrPubsubNotFound, pubsubName)
apiServerLogger.Debug(err)
return &emptypb.Empty{}, err
}
GetPubSub() 方法的实现很简单,就是根据 pubsubName 在现有已经初始化的 pubsub 组件中进行简单的map查找:
// GetPubSub is an adapter method to find a pubsub by name.
func (a *DaprRuntime) GetPubSub(pubsubName string) pubsub.PubSub {
ps, ok := a.pubSubs[pubsubName]
if !ok {
return nil
}
return ps.component
}
委托 pubsub 组件发送请求
func (a *DaprRuntime) Publish(req *pubsub.PublishRequest) error {
// 这里又根据名称做了一次查找
// TBD:可以考虑做代码优化了,从前面把找到的组件传递过来就好了
ps, ok := a.pubSubs[req.PubsubName]
if !ok {
return runtimePubsub.NotFoundError{PubsubName: req.PubsubName}
}
// 检查 pubsub 操作是否被容许
if allowed := a.isPubSubOperationAllowed(req.PubsubName, req.Topic, ps.scopedPublishings); !allowed {
return runtimePubsub.NotAllowedError{Topic: req.Topic, ID: a.runtimeConfig.ID}
}
// 执行策略
policy := a.resiliency.ComponentOutboundPolicy(a.ctx, req.PubsubName)
return policy(func(ctx context.Context) (err error) {
// 最终调用到底层实际组件的 Publish 方法来发送请求
return ps.component.Publish(req)
})
}
HTTP API
HTTP API 的处理方式和 gRPC API 是一致的,只是 HTTP API 这边由于 HTTP 协议的原因,在请求参数的获取上无法像 gRPC API 那样有一个的 runtimev1pb.PublishEventRequest 对象可以完整的封装所有请求参数,HTTP API 会多出一个请求参数的获取过程。
从 HTTP 请求中获取所有参数
HTTP API 实现中的 onPublish() 方法的前面一段代码就是在处理如何从 HTTP 请求中获取 publish 所需的所有参数:
func (a *api) onPublish(reqCtx *fasthttp.RequestCtx) {
// 1. pubsubName
pubsubName := reqCtx.UserValue(pubsubnameparam).(string)
// 2. topic
topic := reqCtx.UserValue(topicParam).(string)
// 3. data
body := reqCtx.PostBody()
// 4. data content type
contentType := string(reqCtx.Request.Header.Peek("Content-Type"))
// 5. metadata
metadata := getMetadataFromRequest(reqCtx)
// 后续处理和 gRPC 协议一致
......
}
5 - 组件实现
组件接口中的 Publish() 方法定义
在 dapr runtime API 实现(包括 HTTP API 和 gRPC API)和底层 pubsub 组件之间,还有一个简单的内部接口,定义了 pubsub 组件的功能:
// PubSub is the interface for message buses.
type PubSub interface {
Init(metadata Metadata) error
Features() []Feature
Publish(req *PublishRequest) error
Subscribe(ctx context.Context, req SubscribeRequest, handler Handler) error
Close() error
}
其中的 Publish() 用来发送消息。请求参数 PublishRequest 的字段和 Dapr API 定义中保持一致:
// PublishRequest is the request to publish a message.
type PublishRequest struct {
Data []byte `json:"data"`
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata"`
ContentType *string `json:"contentType,omitempty"`
}
redis 组件实现
以 redis stream 为例,看看 publish 方法的实现:
func (r *redisStreams) Publish(req *pubsub.PublishRequest) error {
_, err := r.client.XAdd(r.ctx, &redis.XAddArgs{
Stream: req.Topic,
MaxLenApprox: r.metadata.maxLenApprox,
Values: map[string]interface{}{"data": req.Data},
}).Result()
if err != nil {
return fmt.Errorf("redis streams: error from publish: %s", err)
}
return nil
}
redis stream 的实现很简单,req.Topic 参数指定要写入的 redis stream,内容为一个map,其中 key “data” 的值为 req.Data。