springboot
1 - spring auto configuration
meta-inf
按照 springboot 的标准做法,src/main/resources/META-INF/spring.factories
文件内容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.dapr.springboot.DaprAutoConfiguration
DaprAutoConfiguration
DaprAutoConfiguration 的内容非常简单:
@Configuration
@ConditionalOnWebApplication
@ComponentScan("io.dapr.springboot")
public class DaprAutoConfiguration {
}
DaprBeanPostProcessor
DaprBeanPostProcessor 用来处理 dapr 注解。
@Component
public class DaprBeanPostProcessor implements BeanPostProcessor {
private static final ObjectMapper MAPPER = new ObjectMapper();
private final EmbeddedValueResolver embeddedValueResolver;
DaprBeanPostProcessor(ConfigurableBeanFactory beanFactory) {
embeddedValueResolver = new EmbeddedValueResolver(beanFactory);
}
......
}
BeanPostProcessor 接口的 postProcessBeforeInitialization() 的说明如下:
在任何 Bean 初始化回调(如 InitializingBean 的
afterPropertiesSet
或自定义init-method
)之前,将此 BeanPostProcessor 应用于给定的新 Bean 实例。 该 bean 将已经被填充了属性值。返回的 Bean 实例可能是一个围绕原始 Bean 的包装器。
也就是每个 bean 在初始化后都会调用这个方法以便植入我们需要的逻辑,如在这里就需要扫描 bean 是否带有 dapr 的 topic 注解:
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean == null) {
return null;
}
subscribeToTopics(bean.getClass(), embeddedValueResolver);
return bean;
}
subscribeToTopics() 方法的具体实现后面再详细看,期间还有规则匹配的实现代码。
postProcessAfterInitialization() 方法没有特殊逻辑,简单返回原始bean:
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
2 - controller
@RestController
public class DaprController {
}
healthz endpoint
用于 health check 的 endpoint,路径为 “/healthz”,实现为空。
@GetMapping(path = "/healthz")
public void healthz() {
}
TBD:这里是否要考虑 sidecar 的某些状态?目前这是只要 sidecar 进程和端口可以访问就会应答状态OK,而不管sidecar 中的功能是否正常。
dapr configuration endpoint
用于获取 dapr sidecar 的自身配置, 路径为 “/dapr/config”
@GetMapping(path = "/dapr/config", produces = MediaType.APPLICATION_JSON_VALUE)
public byte[] daprConfig() throws IOException {
return ActorRuntime.getInstance().serializeConfig();
}
但看 ActorRuntime 的代码实现,这个 config 是指 actor configuration:
public byte[] serializeConfig() throws IOException {
return INTERNAL_SERIALIZER.serialize(this.config);
}
private ActorRuntime(ManagedChannel channel, DaprClient daprClient) throws IllegalStateException {
this.config = new ActorRuntimeConfig();
}
subscribe endpoint
用于获取当前 dapr sidecar 的 pub/sub 订阅信息,路径为 “/dapr/subscribe”:
@GetMapping(path = "/dapr/subscribe", produces = MediaType.APPLICATION_JSON_VALUE)
public byte[] daprSubscribe() throws IOException {
return SERIALIZER.serialize(DaprRuntime.getInstance().listSubscribedTopics());
}
actor endpoint
用于 actor 的 endpoint,包括 deactive, invoke actor method, invoke actor timer 和 invoke actor reminder:
@DeleteMapping(path = "/actors/{type}/{id}")
public Mono<Void> deactivateActor(@PathVariable("type") String type,
@PathVariable("id") String id) {
return ActorRuntime.getInstance().deactivate(type, id);
}
@PutMapping(path = "/actors/{type}/{id}/method/{method}")
public Mono<byte[]> invokeActorMethod(@PathVariable("type") String type,
@PathVariable("id") String id,
@PathVariable("method") String method,
@RequestBody(required = false) byte[] body) {
return ActorRuntime.getInstance().invoke(type, id, method, body);
}
@PutMapping(path = "/actors/{type}/{id}/method/timer/{timer}")
public Mono<Void> invokeActorTimer(@PathVariable("type") String type,
@PathVariable("id") String id,
@PathVariable("timer") String timer,
@RequestBody byte[] body) {
return ActorRuntime.getInstance().invokeTimer(type, id, timer, body);
}
@PutMapping(path = "/actors/{type}/{id}/method/remind/{reminder}")
public Mono<Void> invokeActorReminder(@PathVariable("type") String type,
@PathVariable("id") String id,
@PathVariable("reminder") String reminder,
@RequestBody(required = false) byte[] body) {
return ActorRuntime.getInstance().invokeReminder(type, id, reminder, body);
}
3 - topic subscription
读取 topic 订阅注解
订阅 topic 的具体代码实现在类 DaprBeanPostProcessor 的 subscribeToTopics() 方法中,在 bean 初始化时被调用。
topic 注解使用的例子如下:
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
......
}
读取 topic 注解
现在需要在 postProcessBeforeInitialization() 方法中扫描并解析所有有 topic 注解的 bean:
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
subscribeToTopics(bean.getClass(), embeddedValueResolver);
return bean;
}
private static void subscribeToTopics(Class clazz, EmbeddedValueResolver embeddedValueResolver) {
if (clazz == null) {
return;
}
// 先用 Superclass 做一次递归调用,这样就会从当前类的父类开始先推衍
// 由于每次都是父类先执行,因此这会一直递归到最顶层的 Object 类
subscribeToTopics(clazz.getSuperclass(), embeddedValueResolver);
// 取当前类的所有方法
for (Method method : clazz.getDeclaredMethods()) {
// 然后看方法上是不是标记了 dapr 的 topic 注解
Topic topic = method.getAnnotation(Topic.class);
if (topic == null) {
continue;
}
// 如果方法上有标记 dapr 的 topic 注解,则开始处理
// 先获取 topic 注解上的属性 topic name, pubsub name, rule
Rule rule = topic.rule();
String topicName = embeddedValueResolver.resolveStringValue(topic.name());
String pubSubName = embeddedValueResolver.resolveStringValue(topic.pubsubName());
// rule 也是一个注解,获取 match 属性
String match = embeddedValueResolver.resolveStringValue(rule.match());
if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
// topicName 和 pubSubName 不能为空 (metadata 可以为空,rule可以为空)
try {
TypeReference<HashMap<String, String>> typeRef
= new TypeReference<HashMap<String, String>>() {};
// 读取 topic 注解上的 metadata 属性
Map<String, String> metadata = MAPPER.readValue(topic.metadata(), typeRef);
// 读取路由信息,细节看下一节
List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
for (String route : routes) {
// 将读取的路由信息添加到 dapr runtime 中。
// 细节看下一节
DaprRuntime.getInstance().addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata);
}
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Error while parsing metadata: " + e);
}
}
}
}
读取路由信息
路由信息配置方法如下:
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
......
}
getAllCompleteRoutesForPost() 方法负责读取 @rule 注解相关的路由信息:
private static List<String> getAllCompleteRoutesForPost(Class clazz, Method method, String topicName) {
List<String> routesList = new ArrayList<>();
RequestMapping clazzRequestMapping =
(RequestMapping) clazz.getAnnotation(RequestMapping.class);
String[] clazzLevelRoute = null;
if (clazzRequestMapping != null) {
clazzLevelRoute = clazzRequestMapping.value();
}
// 读取该方法上的路由信息,注意必须是 POST
String[] postValueArray = getRoutesForPost(method, topicName);
if (postValueArray != null && postValueArray.length >= 1) {
for (String postValue : postValueArray) {
if (clazzLevelRoute != null && clazzLevelRoute.length >= 1) {
for (String clazzLevelValue : clazzLevelRoute) {
// 完整的路由路径应该是类级别 + 方法级别
String route = clazzLevelValue + confirmLeadingSlash(postValue);
routesList.add(route);
}
} else {
routesList.add(postValue);
}
}
}
return routesList;
}
getRoutesForPost() 方法用来读取 @topic 注解所在方法的 @PostMapping 注解,以便获得路由的 path 信息,对应例子如下:
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
......
}
getRoutesForPost() 方法的代码实现如下:
private static String[] getRoutesForPost(Method method, String topicName) {
String[] postValueArray = new String[] {topicName};
// 读取 PostMapping 注解
PostMapping postMapping = method.getAnnotation(PostMapping.class);
if (postMapping != null) {
// 如果有 PostMapping 注解
if (postMapping.path() != null && postMapping.path().length >= 1) {
// 如果 path 属性有设置则从 path 属性取值
postValueArray = postMapping.path();
} else if (postMapping.value() != null && postMapping.value().length >= 1) {
// 如果 path 属性没有设置则直接从 PostMapping 注解的 value 中取值
postValueArray = postMapping.value();
}
} else {
// 如果没有 PostMapping 注解,则尝试读取 RequestMapping 注解
RequestMapping reqMapping = method.getAnnotation(RequestMapping.class);
for (RequestMethod reqMethod : reqMapping.method()) {
// 要求 RequestMethod 为 POST
if (reqMethod == RequestMethod.POST) {
// 同样读取 path 或者 value 的值
if (reqMapping.path() != null && reqMapping.path().length >= 1) {
postValueArray = reqMapping.path();
} else if (reqMapping.value() != null && reqMapping.value().length >= 1) {
postValueArray = reqMapping.value();
}
break;
}
}
}
return postValueArray;
}
getRoutesForPost() 方法的解读,就是从标记了 @topic 注解的方法上读取路由信息,也就是后续订阅的事件应该发送的地址。读取的逻辑为:
- 优先读取 PostMapping 注解,没有的话读取 RequestMethod 为 POST 的 RequestMapping 注解
- 优先读取上述注解的 path 属性,没有的话读取 value
保存 topic 订阅信息
topic 订阅信息在读取之后,就会通过 DaprRuntime 的 addSubscribedTopic() 方法保存起来:
public synchronized void addSubscribedTopic(String pubsubName,
String topicName,
String match,
int priority,
String route,
Map<String,String> metadata) {
// 用 pubsubName 和 topicName 做 key
DaprTopicKey topicKey = new DaprTopicKey(pubsubName, topicName);
// 获取 key 对应的 builder,没有的话就创建一个
DaprSubscriptionBuilder builder = subscriptionBuilders.get(topicKey);
if (builder == null) {
builder = new DaprSubscriptionBuilder(pubsubName, topicName);
subscriptionBuilders.put(topicKey, builder);
}
// match 不为空则添加 rule,为空则采用默认路径
if (match.length() > 0) {
builder.addRule(route, match, priority);
} else {
builder.setDefaultPath(route);
}
if (metadata != null && !metadata.isEmpty()) {
builder.setMetadata(metadata);
}
}
考虑到调用的地方代码是:
// 读取路由信息
List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
for (String route : routes) {
// 将读取的路由信息添加到 dapr runtime 中。
DaprRuntime.getInstance().addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata);
}
所以前面的读取流程可以理解为就是读取和 topic 订阅有关的上述6个参数,然后保存起老。
应答 topic 订阅信息
在 DaprController 中,daprSubscribe() 方法对外暴露路径 /dapr/subscribe
,以便让 dapr sidecar 可以通过读取该路径来获取当前应用的 topic 订阅信息:
@GetMapping(path = "/dapr/subscribe", produces = MediaType.APPLICATION_JSON_VALUE)
public byte[] daprSubscribe() throws IOException {
return SERIALIZER.serialize(DaprRuntime.getInstance().listSubscribedTopics());
}
而 DaprRuntime 的 listSubscribedTopics() 方法获取的就是前面保存起来的 topic 订阅信息:
public synchronized DaprTopicSubscription[] listSubscribedTopics() {
List<DaprTopicSubscription> values = subscriptionBuilders.values().stream()
.map(b -> b.build()).collect(Collectors.toList());
return values.toArray(new DaprTopicSubscription[0]);
}
流程总结
整个 topic 订阅流程的示意图如下:
title topic subscription
hide footbox
skinparam style strictuml
box "Application" #LightBlue
participant DaprBeanPostProcessor
participant bean
participant DaprRuntime
participant DaprController
end box
participant daprd
-> DaprBeanPostProcessor: postProcessBeforeInitialization(bean)
DaprBeanPostProcessor -> bean: get @topic
bean --> DaprBeanPostProcessor
alt if bean has @topic
DaprBeanPostProcessor -> bean: parse @topic @rule
bean --> DaprBeanPostProcessor: pubsub name, topic name, match,\n priority, routes, metadata
DaprBeanPostProcessor -> DaprRuntime: addSubscribedTopic()
DaprRuntime -> DaprRuntime: save in map\n subscriptionBuilders
DaprRuntime --> DaprBeanPostProcessor
end
<-- DaprBeanPostProcessor
daprd -> DaprController: get subscription
DaprController -> DaprRuntime: listSubscribedTopics()
DaprRuntime --> DaprController
DaprController --> daprd