这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

springboot

sdk子项目:springboot集成

1 - spring auto configuration

sdk子项目:springboot集成

meta-inf

按照 springboot 的标准做法,src/main/resources/META-INF/spring.factories 文件内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.dapr.springboot.DaprAutoConfiguration

DaprAutoConfiguration

DaprAutoConfiguration 的内容非常简单:

@Configuration
@ConditionalOnWebApplication
@ComponentScan("io.dapr.springboot")
public class DaprAutoConfiguration {
}

DaprBeanPostProcessor

DaprBeanPostProcessor 用来处理 dapr 注解。

@Component
public class DaprBeanPostProcessor implements BeanPostProcessor {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  private final EmbeddedValueResolver embeddedValueResolver;

  DaprBeanPostProcessor(ConfigurableBeanFactory beanFactory) {
    embeddedValueResolver = new EmbeddedValueResolver(beanFactory);
  }
  ......
}

BeanPostProcessor 接口的 postProcessBeforeInitialization() 的说明如下:

在任何 Bean 初始化回调(如 InitializingBean 的 afterPropertiesSet 或自定义 init-method )之前,将此 BeanPostProcessor 应用于给定的新 Bean 实例。 该 bean 将已经被填充了属性值。返回的 Bean 实例可能是一个围绕原始 Bean 的包装器。

也就是每个 bean 在初始化后都会调用这个方法以便植入我们需要的逻辑,如在这里就需要扫描 bean 是否带有 dapr 的 topic 注解:

  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    if (bean == null) {
      return null;
    }

    subscribeToTopics(bean.getClass(), embeddedValueResolver);

    return bean;
  }

subscribeToTopics() 方法的具体实现后面再详细看,期间还有规则匹配的实现代码。

postProcessAfterInitialization() 方法没有特殊逻辑,简单返回原始bean:

  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    return bean;
  }

2 - controller

处理 dapr callback 请求的 springboot controller
@RestController
public class DaprController {
}

healthz endpoint

用于 health check 的 endpoint,路径为 “/healthz”,实现为空。

@GetMapping(path = "/healthz")
public void healthz() {
}

TBD:这里是否要考虑 sidecar 的某些状态?目前这是只要 sidecar 进程和端口可以访问就会应答状态OK,而不管sidecar 中的功能是否正常。

dapr configuration endpoint

用于获取 dapr sidecar 的自身配置, 路径为 “/dapr/config”

@GetMapping(path = "/dapr/config", produces = MediaType.APPLICATION_JSON_VALUE)
public byte[] daprConfig() throws IOException {
  return ActorRuntime.getInstance().serializeConfig();
}

但看 ActorRuntime 的代码实现,这个 config 是指 actor configuration:

  public byte[] serializeConfig() throws IOException {
    return INTERNAL_SERIALIZER.serialize(this.config);
  }

  private ActorRuntime(ManagedChannel channel, DaprClient daprClient) throws IllegalStateException {
    this.config = new ActorRuntimeConfig();
  }

subscribe endpoint

用于获取当前 dapr sidecar 的 pub/sub 订阅信息,路径为 “/dapr/subscribe”:

@GetMapping(path = "/dapr/subscribe", produces = MediaType.APPLICATION_JSON_VALUE)
public byte[] daprSubscribe() throws IOException {
  return SERIALIZER.serialize(DaprRuntime.getInstance().listSubscribedTopics());
}

actor endpoint

用于 actor 的 endpoint,包括 deactive, invoke actor method, invoke actor timer 和 invoke actor reminder:

@DeleteMapping(path = "/actors/{type}/{id}")
  public Mono<Void> deactivateActor(@PathVariable("type") String type,
                                    @PathVariable("id") String id) {
    return ActorRuntime.getInstance().deactivate(type, id);
  }

  @PutMapping(path = "/actors/{type}/{id}/method/{method}")
  public Mono<byte[]> invokeActorMethod(@PathVariable("type") String type,
                                        @PathVariable("id") String id,
                                        @PathVariable("method") String method,
                                        @RequestBody(required = false) byte[] body) {
    return ActorRuntime.getInstance().invoke(type, id, method, body);
  }

  @PutMapping(path = "/actors/{type}/{id}/method/timer/{timer}")
  public Mono<Void> invokeActorTimer(@PathVariable("type") String type,
                                     @PathVariable("id") String id,
                                     @PathVariable("timer") String timer,
                                     @RequestBody byte[] body) {
    return ActorRuntime.getInstance().invokeTimer(type, id, timer, body);
  }

  @PutMapping(path = "/actors/{type}/{id}/method/remind/{reminder}")
  public Mono<Void> invokeActorReminder(@PathVariable("type") String type,
                                        @PathVariable("id") String id,
                                        @PathVariable("reminder") String reminder,
                                        @RequestBody(required = false) byte[] body) {
    return ActorRuntime.getInstance().invokeReminder(type, id, reminder, body);
  }

3 - topic subscription

实现 pub/sub 中的 topic 订阅

读取 topic 订阅注解

订阅 topic 的具体代码实现在类 DaprBeanPostProcessor 的 subscribeToTopics() 方法中,在 bean 初始化时被调用。

topic 注解使用的例子如下:

  @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
          rule = @Rule(match = "event.type == \"v2\"", priority = 1))
  @PostMapping(path = "/testingtopicV2")
  public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
    ......
  }

读取 topic 注解

现在需要在 postProcessBeforeInitialization() 方法中扫描并解析所有有 topic 注解的 bean:

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
  subscribeToTopics(bean.getClass(), embeddedValueResolver);
  return bean;
}

private static void subscribeToTopics(Class clazz, EmbeddedValueResolver embeddedValueResolver) {
    if (clazz == null) {
      return;
    }

    // 先用 Superclass 做一次递归调用,这样就会从当前类的父类开始先推衍
    // 由于每次都是父类先执行,因此这会一直递归到最顶层的 Object 类 
    subscribeToTopics(clazz.getSuperclass(), embeddedValueResolver);
    // 取当前类的所有方法
    for (Method method : clazz.getDeclaredMethods()) {
      // 然后看方法上是不是标记了 dapr 的 topic 注解
      Topic topic = method.getAnnotation(Topic.class);
      if (topic == null) {
        continue;
      }

      // 如果方法上有标记 dapr 的 topic 注解,则开始处理
      // 先获取 topic 注解上的属性 topic name, pubsub name, rule 
      Rule rule = topic.rule();
      String topicName = embeddedValueResolver.resolveStringValue(topic.name());
      String pubSubName = embeddedValueResolver.resolveStringValue(topic.pubsubName());
      // rule 也是一个注解,获取 match 属性
      String match = embeddedValueResolver.resolveStringValue(rule.match());
      if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
        // topicName 和 pubSubName 不能为空 (metadata 可以为空,rule可以为空)
        try {
          TypeReference<HashMap<String, String>> typeRef
                  = new TypeReference<HashMap<String, String>>() {};
          // 读取 topic 注解上的 metadata 属性
          Map<String, String> metadata = MAPPER.readValue(topic.metadata(), typeRef);
          // 读取路由信息,细节看下一节
          List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
          for (String route : routes) {
            // 将读取的路由信息添加到 dapr runtime 中。
            // 细节看下一节
            DaprRuntime.getInstance().addSubscribedTopic(
                pubSubName, topicName, match, rule.priority(), route, metadata);
          }
        } catch (JsonProcessingException e) {
          throw new IllegalArgumentException("Error while parsing metadata: " + e);
        }
      }
    }
  }

读取路由信息

路由信息配置方法如下:

  @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
          rule = @Rule(match = "event.type == \"v2\"", priority = 1))
  @PostMapping(path = "/testingtopicV2")
  public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
    ......
  }

getAllCompleteRoutesForPost() 方法负责读取 @rule 注解相关的路由信息:

private static List<String> getAllCompleteRoutesForPost(Class clazz, Method method, String topicName) {
    List<String> routesList = new ArrayList<>();
    RequestMapping clazzRequestMapping =
        (RequestMapping) clazz.getAnnotation(RequestMapping.class);
    String[] clazzLevelRoute = null;
    if (clazzRequestMapping != null) {
      clazzLevelRoute = clazzRequestMapping.value();
    }
    // 读取该方法上的路由信息,注意必须是 POST
    String[] postValueArray = getRoutesForPost(method, topicName);
    if (postValueArray != null && postValueArray.length >= 1) {
      for (String postValue : postValueArray) {
        if (clazzLevelRoute != null && clazzLevelRoute.length >= 1) {
          for (String clazzLevelValue : clazzLevelRoute) {
            // 完整的路由路径应该是类级别 + 方法级别
            String route = clazzLevelValue + confirmLeadingSlash(postValue);
            routesList.add(route);
          }
        } else {
          routesList.add(postValue);
        }
      }
    }
    return routesList;
  }

getRoutesForPost() 方法用来读取 @topic 注解所在方法的 @PostMapping 注解,以便获得路由的 path 信息,对应例子如下:

  @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
          rule = @Rule(match = "event.type == \"v2\"", priority = 1))
  @PostMapping(path = "/testingtopicV2")
  public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
    ......
  }

getRoutesForPost() 方法的代码实现如下:

private static String[] getRoutesForPost(Method method, String topicName) {
    String[] postValueArray = new String[] {topicName};
    // 读取 PostMapping 注解
    PostMapping postMapping = method.getAnnotation(PostMapping.class);
    if (postMapping != null) {
      // 如果有 PostMapping 注解
      if (postMapping.path() != null && postMapping.path().length >= 1) {
        // 如果 path 属性有设置则从 path 属性取值
        postValueArray = postMapping.path();
      } else if (postMapping.value() != null && postMapping.value().length >= 1) {
        // 如果 path 属性没有设置则直接从 PostMapping 注解的 value 中取值
        postValueArray = postMapping.value();
      }
    } else {
      // 如果没有 PostMapping 注解,则尝试读取 RequestMapping 注解
      RequestMapping reqMapping = method.getAnnotation(RequestMapping.class);
      for (RequestMethod reqMethod : reqMapping.method()) {
        // 要求 RequestMethod 为 POST
        if (reqMethod == RequestMethod.POST) {
          // 同样读取 path 或者 value 的值
          if (reqMapping.path() != null && reqMapping.path().length >= 1) {
            postValueArray = reqMapping.path();
          } else if (reqMapping.value() != null && reqMapping.value().length >= 1) {
            postValueArray = reqMapping.value();
          }
          break;
        }
      }
    }
    return postValueArray;
  }

getRoutesForPost() 方法的解读,就是从标记了 @topic 注解的方法上读取路由信息,也就是后续订阅的事件应该发送的地址。读取的逻辑为:

  1. 优先读取 PostMapping 注解,没有的话读取 RequestMethod 为 POST 的 RequestMapping 注解
  2. 优先读取上述注解的 path 属性,没有的话读取 value

保存 topic 订阅信息

topic 订阅信息在读取之后,就会通过 DaprRuntime 的 addSubscribedTopic() 方法保存起来:

public synchronized void addSubscribedTopic(String pubsubName,
                                              String topicName,
                                              String match,
                                              int priority,
                                              String route,
                                              Map<String,String> metadata) {
    // 用 pubsubName 和 topicName 做 key
    DaprTopicKey topicKey = new DaprTopicKey(pubsubName, topicName);

    // 获取 key 对应的 builder,没有的话就创建一个
    DaprSubscriptionBuilder builder = subscriptionBuilders.get(topicKey);
    if (builder == null) {
      builder = new DaprSubscriptionBuilder(pubsubName, topicName);
      subscriptionBuilders.put(topicKey, builder);
    }

    // match 不为空则添加 rule,为空则采用默认路径
    if (match.length() > 0) {
      builder.addRule(route, match, priority);
    } else {
      builder.setDefaultPath(route);
    }

    if (metadata != null && !metadata.isEmpty()) {
      builder.setMetadata(metadata);
    }
  }

考虑到调用的地方代码是:

// 读取路由信息
List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
for (String route : routes) {
  // 将读取的路由信息添加到 dapr runtime 中。
  DaprRuntime.getInstance().addSubscribedTopic(
      pubSubName, topicName, match, rule.priority(), route, metadata);
}

所以前面的读取流程可以理解为就是读取和 topic 订阅有关的上述6个参数,然后保存起老。

应答 topic 订阅信息

在 DaprController 中,daprSubscribe() 方法对外暴露路径 /dapr/subscribe ,以便让 dapr sidecar 可以通过读取该路径来获取当前应用的 topic 订阅信息:

@GetMapping(path = "/dapr/subscribe", produces = MediaType.APPLICATION_JSON_VALUE)
public byte[] daprSubscribe() throws IOException {
  return SERIALIZER.serialize(DaprRuntime.getInstance().listSubscribedTopics());
}

而 DaprRuntime 的 listSubscribedTopics() 方法获取的就是前面保存起来的 topic 订阅信息:

  public synchronized DaprTopicSubscription[] listSubscribedTopics() {
    List<DaprTopicSubscription> values = subscriptionBuilders.values().stream()
            .map(b -> b.build()).collect(Collectors.toList());
    return values.toArray(new DaprTopicSubscription[0]);
  }

流程总结

整个 topic 订阅流程的示意图如下:

title topic subscription
hide footbox
skinparam style strictuml


box "Application" #LightBlue
participant DaprBeanPostProcessor
participant bean
participant DaprRuntime
participant DaprController
end box
participant daprd

-> DaprBeanPostProcessor: postProcessBeforeInitialization(bean)

DaprBeanPostProcessor -> bean: get @topic
bean --> DaprBeanPostProcessor
 
alt if bean has @topic
DaprBeanPostProcessor -> bean: parse @topic @rule
bean --> DaprBeanPostProcessor: pubsub name, topic name, match,\n priority, routes, metadata

DaprBeanPostProcessor -> DaprRuntime: addSubscribedTopic()

DaprRuntime -> DaprRuntime: save in map\n subscriptionBuilders

DaprRuntime --> DaprBeanPostProcessor
end
<-- DaprBeanPostProcessor

daprd -> DaprController: get subscription
DaprController -> DaprRuntime: listSubscribedTopics()
DaprRuntime --> DaprController
DaprController --> daprd