1 - 流程概述
API 和端口
订阅流程实际包含三个子流程:
-
获取应用订阅消息
daprd 需要获知应用的订阅信息。
实现中,dapr 会要求应用收集订阅信息并通过指定方式暴露(SDK 可以提供帮助),以便 daprd 可以通过给应用发送请求来获取这些订阅信息。
-
执行消息订阅
Daprd 在拿到应用的订阅信息之后,就可以使用底层组件的订阅机制进行消息订阅。
-
转发消息给应用
daprd 收到来自底层组件的订阅的消息之后,需要将消息转发给应用。
以上子流程1和3都需要 daprd 主动访问应用,因此 dapr 需要获知应用在哪个端口监听并处理订阅请求,这个信息通过命令行参数 app-port
设置。Dapr 的示例中一般喜欢用 3000 端口。
gRPC API
gRPC API 定义在 dapr/proto/runtime/v1/appcallback.proto
文件中的 AppCallback service 中:
service AppCallback {
// 子流程1:获取应用订阅消息
rpc ListTopicSubscriptions(google.protobuf.Empty) returns (ListTopicSubscriptionsResponse) {}
// 子流程3:转发消息给应用
rpc OnTopicEvent(TopicEventRequest) returns (TopicEventResponse) {}
......
}
ListTopicSubscriptionsResponse 的定义:
message ListTopicSubscriptionsResponse {
repeated common.v1.TopicSubscription subscriptions = 1;
}
message TopicSubscription {
// pubsub的组件名
string pubsub_name = 1;
// 要订阅的topic
string topic = 2;
// 可选参数,后面展开
map<string,string> metadata = 3;
TopicRoutes routes = 5;
string dead_letter_topic = 6;
}
即应用可以有多个消息订阅,每个订阅都必须提供 pubsub_name 和 topic 参数。
TopicEventRequest 的定义:
message TopicEventRequest {
// 这几个参数先忽略
string id = 1;
string source = 2;
string type = 3;
string spec_version = 4;
string path = 9;
// 事件的基本信息
string data_content_type = 5;
bytes data = 7;
string topic = 6;
string pubsub_name = 8;
}
HTTP API
发布流程
HTTP 协议
title Subscribe via http
hide footbox
skinparam style strictuml
box "App-1"
participant user_code [
=App-1
----
producer
]
participant SDK [
=SDK
----
producer
]
end box
participant daprd [
=daprd
----
producer
]
participant message_broker as "Message Broker"
SDK -> user_code: collection subscribe
user_code --> SDK
daprd -[#blue]> SDK : http
note left: appChannel.InvokeMethod("dapr/subscribe")
SDK --[#blue]> daprd :
daprd -[#red]> message_broker : subscribe topics
message_broker --[#red]> daprd
|||
|||
|||
|||
message_broker -[#red]> daprd: event
daprd -[#blue]> SDK : http
note left: appChannel.InvokeMethod("/{route}")
SDK -> user_code :
user_code --> SDK
SDK --[#blue]> daprd
|||
gRPC 方式
title Subscribe via gRPC
hide footbox
skinparam style strictuml
box "App-1"
participant user_code [
=App-1
----
producer
]
participant SDK [
=SDK
----
producer
]
end box
participant daprd [
=daprd
----
producer
]
participant message_broker as "Message Broker"
SDK -> user_code: collection subscribe
user_code --> SDK
daprd -[#blue]> SDK : gRPC
note left: appChannel.ListTopicSubscriptions()
SDK --[#blue]> daprd :
daprd -[#red]> message_broker : subscribe topics
message_broker --[#red]> daprd
|||
|||
|||
|||
message_broker -[#red]> daprd: event
daprd -[#blue]> SDK : gRPC
note left: appChannel.OnTopicEvent()
SDK -> user_code :
user_code --> SDK
SDK --[#blue]> daprd
|||
2 - 订阅相关的Runtime初始化
在 dapr runtime 启动进行初始化时,需要
- 访问应用以获取应用的订阅信息:比如应用订阅了哪些topic
- 根据配置文件启动 subscribe component 以便连接到外部 message broker 进行订阅
- 将订阅更新的 event 转发给应用
Dapr runtime初始化component列表
dapr runtime 初始化时会创建和 app 的连接,称为 app channel,然后开始发布订阅的初始化:
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
// 有一个单独的 go routine 负责处理 component 的初始化
go a.processComponents()
err = a.loadComponents(opts)
// 等待应用ready: 前提是设置了 app port
a.blockUntilAppIsReady()
// 创建 app channel
err = a.createAppChannel()
// app channel 支持 http 和 grpc
a.daprHTTPAPI.SetAppChannel(a.appChannel)
grpcAPI.SetAppChannel(a.appChannel)
......
// 开始发布订阅的初始化
a.startSubscribing()
}
这里有一段复杂的并行初始化components并处理相互依赖的逻辑,忽略这些细节,只看执行 component 初始化的代码:
func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
switch category {
case pubsubComponent:
return a.initPubSub(comp)
......
}
return nil
}
func (a *DaprRuntime) initPubSub(c components_v1alpha1.Component) error {
pubSub, err := a.pubSubRegistry.Create(c.Spec.Type, c.Spec.Version)
// 初始化 pubSub component
err = pubSub.Init(pubsub.Metadata{
Properties: properties,
})
pubsubName := c.ObjectMeta.Name
a.pubSubs[pubsubName] = pubSub
return nil
}
这个执行完成之后,a.pubSubs 中便保存有当前配置并初始化好的 pubsub 组件列表。
pubsub组件启动
订阅的初始化在 dapr runtime 启动过程的最后阶段
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
// 开始发布订阅的初始化
a.startSubscribing()
}
startSubscribing() 方法逐个处理 pubSub 组件:
func (a *DaprRuntime) startSubscribing() {
for name, pubsub := range a.pubSubs {
if err := a.beginPubSub(name, pubsub); err != nil {
log.Errorf("error occurred while beginning pubsub %s: %s", name, err)
}
}
}
beginPubSub 方法做了两个事情: 1. 获取应用的订阅信息 2. 让组件开始订阅
func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error {
var publishFunc func(ctx context.Context, msg *pubsubSubscribedMessage) error
......
topicRoutes, err := a.getTopicRoutes()
......
}
获取应用订阅信息(AppCallback)
在 getTopicRoutes() 方法中,可以通过 HTTP 或者 gRPC 的方式来获取应用订阅信息:
func (a *DaprRuntime) getTopicRoutes() (map[string]TopicRoute, error) {
......
if a.runtimeConfig.ApplicationProtocol == HTTPProtocol {
// 走 http channel
subscriptions, err = runtime_pubsub.GetSubscriptionsHTTP(a.appChannel, log)
} else if a.runtimeConfig.ApplicationProtocol == GRPCProtocol {
// 走 grpc channel
client := runtimev1pb.NewAppCallbackClient(a.grpc.AppClient)
subscriptions, err = runtime_pubsub.GetSubscriptionsGRPC(client, log)
}
......
}
对于 HTTP 方式,调用的是 AppChannel 上定义的 InvokeMethod 方法,这个方法原来设计是用来实现 service invoke 的,dapr runtime 用来通过它将 service invoke 的 http inbound 请求转发给作为服务器端的应用。而在这里,被用来调用 dapr/subscribe
路径:
func GetSubscriptionsHTTP(channel channel.AppChannel, log logger.Logger) ([]Subscription, error) {
req := invokev1.NewInvokeMethodRequest("dapr/subscribe")
channel.InvokeMethod(ctx, req)
......
}
感想:理论上说这也不是为一种方便的方式,只是总感觉有点怪怪,pubsub 模块的初始化用到了 service invoke 模块的功能。直接发个http请求代码也不复杂。另外 http AppChannel / app callback 的方法和 grpc AppChannel / app callback 不对称,这在设计上缺乏美感。
对于 gRPC 方式,就比较老实的调用了 gRPC AppCallbackClient 的方法 ListTopicSubscriptions():
resp, err = channel.ListTopicSubscriptions(context.Background(), &emptypb.Empty{})
pubsub 组件开始订阅
在获取到应用的订阅信息之后,dapr runtime 就知道这个应用需要订阅哪些topic了。因此就可以继续开始订阅操作:
func (a *DaprRuntime) beginPubSub(name string, ps pubsub.PubSub) error {
var publishFunc func(ctx context.Context, msg *pubsubSubscribedMessage) error
......
// 获取订阅信息
topicRoutes, err := a.getTopicRoutes()
......
// 开始订阅
for topic, route := range v.routes {
// 在当前 pubsub 组件上为每个 topic 进行订阅
err := ps.Subscribe(pubsub.SubscribeRequest{
Topic: topic,
Metadata: route.metadata,
}, func(ctx context.Context, msg *pubsub.NewMessage) error {......}
}
}
这里的 Subscribe() 方法的定义在 PubSub 接口上,每个 dapr pubsub 组件都会实现这个接口:
type PubSub interface {
Publish(req *PublishRequest) error
Subscribe(req SubscribeRequest, handler Handler) error
}
handler 方法的具体实现后面再展开。
3 - 客户端sdk为dapr提供订阅信息
工作原理
对于订阅信息而言,有四个关键的信息。在 dapr proto 中的定义如下:
message TopicSubscription {
// Required. The name of the pubsub containing the topic below to subscribe to.
string pubsub_name = 1;
// Required. The name of topic which will be subscribed
string topic = 2;
// The optional properties used for this topic's subscription e.g. session id
map<string,string> metadata = 3;
// The optional routing rules to match against. In the gRPC interface, OnTopicEvent
// is still invoked but the matching path is sent in the TopicEventRequest.
TopicRoutes routes = 5;
}
pubsub_name 指定要使用的 pubsub component,topic 是要订阅的主题, metadata 携带扩展信息,而 routes 路由则是标记 dapr 应该如何将订阅到的事件发送给应用。
TODO:对于 HTTP 协议和 gRPC 协议处理会有不同。
java sdk中的封装如下:
public class DaprTopicSubscription {
private final String pubsubName;
private final String topic;
private final String route;
private final Map<String, String> metadata;
}
dapr sdk 需要帮助应用方便的提供上述订阅信息。
Java SDK 实现
在业务代码中使用 subscribe 功能的示例可参考文件 dapr java-sdk 中的代码 /src/main/java/io/dapr/examples/pubsub/http/subscribe.java
,代码示意如下:
// 启动应用,监听端口,一般喜欢使用 3000
public static void main(String[] args) throws Exception {
......
DaprApplication.start(port);
}
@RestController
public class SubscriberController {
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<String> cloudEvent) {
......
}
}
sdk收集订阅信息
上面代码中的 @Topic 注解是 dapr java sdk 提供的,用来标记需要进行 subscribe 的 topic,代码在src/main/java/io/dapr/Topic.java
:
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Topic {
String name();
String pubsubName();
String metadata() default "{}";
}
topic 的收集是典型的 springboot 风格,代码在 sdk-springboot/src/main/java/io/dapr/springboot/DaprBeanPostProcessor.java
:
@Component
public class DaprBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
subscribeToTopics(bean.getClass(), embeddedValueResolver);
return bean;
}
}
subscribeToTopics() 方法通过扫描 @topic 注解和 @PostMapping 注解来获取订阅相关的信息:
private static void subscribeToTopics(Class clazz, EmbeddedValueResolver embeddedValueResolver) {
for (Method method : clazz.getDeclaredMethods()) {
// 获取 @topic 注解
Topic topic = method.getAnnotation(Topic.class);
if (topic == null) {
continue;
}
String route = topic.name();
// 获取 @PostMapping 注解
PostMapping mapping = method.getAnnotation(PostMapping.class);
// 根据 PostMapping 注解获取 route 信息
if (mapping != null && mapping.path() != null && mapping.path().length >= 1) {
route = mapping.path()[0];
} else if (mapping != null && mapping.value() != null && mapping.value().length >= 1) {
route = mapping.value()[0];
}
String topicName = embeddedValueResolver.resolveStringValue(topic.name());
String pubSubName = embeddedValueResolver.resolveStringValue(topic.pubsubName());
if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
try {
TypeReference<HashMap<String, String>> typeRef
= new TypeReference<HashMap<String, String>>() {};
Map<String, String> metadata = MAPPER.readValue(topic.metadata(), typeRef);
// 保存 subscribe 信息
DaprRuntime.getInstance().addSubscribedTopic(pubSubName, topicName, route, metadata);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Error while parsing metadata: " + e.toString());
}
}
}
}
DaprRuntime 是一个单例对象,这里保存有订阅的 topic 列表:
class DaprRuntime {
private final Set<String> subscribedTopics = new HashSet<>();
private final List<DaprTopicSubscription> subscriptions = new ArrayList<>();
public synchronized void addSubscribedTopic(String pubsubName,
String topicName,
String route,
Map<String,String> metadata) {
if (!this.subscribedTopics.contains(topicName)) {
this.subscribedTopics.add(topicName);
this.subscriptions.add(new DaprTopicSubscription(pubsubName, topicName, route, metadata));
}
}
}
sdk暴露订阅信息
为了让 dapr 在 springboot 体系中方便使用,dapr java sdk 提供了 DaprController ,以提供诸如健康检查等通用功能,还有和dapr相关的各种端点,其中就有为 dapr runtime 提供订阅信息的接口:
@RestController
public class DaprController {
......
@GetMapping(path = "/dapr/subscribe", produces = MediaType.APPLICATION_JSON_VALUE)
public byte[] daprSubscribe() throws IOException {
return SERIALIZER.serialize(DaprRuntime.getInstance().listSubscribedTopics());
}
}
通过这个URL,就可以将之前收集到的 topic 信息都暴露出去,可以在浏览器中直接访问 http://127.0.0.1:3000/dapr/subscribe
,应答内容为:
[{"pubsubName":"messagebus","topic":"testingtopic","route":"/testingtopic","metadata":{}}]
Go sdk实现
在 go 业务代码中使用 subscribe 功能的示例可参考 https://github.com/dapr/go-sdk/blob/main/examples/pubsub/sub/sub.go,代码示意如下:
func main() {
s := daprd.NewService(":8080")
err := s.AddTopicEventHandler(defaultSubscription, eventHandler)
err = s.Start()
}
func eventHandler(ctx context.Context, e *common.TopicEvent) (retry bool, err error) {
......
return false, nil
}
sdk收集订阅信息
Go sdk 中定义了 Service 接口
// Service represents Dapr callback service.
type Service interface {
// AddTopicEventHandler appends provided event handler with its topic and optional metadata to the service.
// Note, retries are only considered when there is an error. Lack of error is considered as a success
AddTopicEventHandler(sub *Subscription, fn TopicEventHandler) error
......
}
Subscription 的定义如下:
// Subscription represents single topic subscription.
type Subscription struct {
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata,omitempty"`
Route string `json:"route"`
......
}
这样订阅相关的主要4个参数就通过这个方式指明了。
sdk暴露订阅信息
go sdk 中有 http 和 grpc 两套机制可以实现对外暴露访问端点。
http 的实现在 http/topic.go
中:
func (s *Server) AddTopicEventHandler(sub *common.Subscription, fn common.TopicEventHandler) error {
if err := s.topicRegistrar.AddSubscription(sub, fn); err != nil {
return err
}
// 注册 http handle,关联 Route 和 fn
s.mux.Handle(sub.Route, optionsHandler(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
......
retry, err := fn(r.Context(), &te)
......
}
}
grpc类似。
其他SDK
TODO