1 - 序列化

java sdk 中序列化的设计和实现

1.1 - 背景

java sdk 中序列化的背景

文档介绍

https://github.com/dapr/java-sdk#how-to-use-a-custom-serializer

dapr java-sdk 项目的 readme 中有这么一段介绍:

How to use a custom serializer

如何使用一个自定义的序列化器

This SDK provides a basic serialization for request/response objects but also for state objects. Applications should provide their own serialization for production scenarios.

这个SDK为请求/响应对象提供了一个基本的序列化,但也为状态对象提供了序列化。应用程序应该为生产场景提供他们自己的序列化。

1.2 - DaprObjectSerializer

DaprObjectSerializer 接口定义了 dapr 的对象序列化器

接口定义

DaprObjectSerializer 接口很简单,定义如下:

// 对应用程序的对象进行序列化和反序列化
public interface DaprObjectSerializer {

  // 将给定的对象序列化为byte[].
  byte[] serialize(Object o) throws IOException;

  // 将给定的byte[]反序列化为一个对象。
  <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException;

  // 返回请求的内容类型
  String getContentType();
}

getContentType() 方法获知内容的类型,serialize() 和 deserialize() 分别实现序列化和反序列化,即实现对象和 byte[] 的相互转换。

1.3 - DefaultObjectSerializer

DefaultObjectSerializer 是 dapr 的默认对象序列化器

DefaultObjectSerializer 继承自 ObjectSerializer, serialize 和 deserialize 都只是代理给 ObjectSerializer ,而 getContentType() 方法则 hard code 为返回 “application/json”:

public class DefaultObjectSerializer extends ObjectSerializer implements DaprObjectSerializer {

  @Override
  public byte[] serialize(Object o) throws IOException {
    return super.serialize(o);
  }

  @Override
  public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
    return super.deserialize(data, type);
  }

  @Override
  public String getContentType() {
    return "application/json";
  }
}

1.4 - ObjectSerializer

ObjectSerializer 是 dapr 的默认对象序列化器

类定义

public class ObjectSerializer {
  // 默认构造函数,以避免类在包外被实例化,但仍可以被继承。
  protected ObjectSerializer() {
  }
}

jackson 相关设置

  protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
      .setSerializationInclusion(JsonInclude.Include.NON_NULL);

serialize() 方法实现

public byte[] serialize(Object state) throws IOException {
    if (state == null) {
      return null;
    }

    if (state.getClass() == Void.class) {
      return null;
    }

    // Have this check here to be consistent with deserialization (see deserialize() method below).
    if (state instanceof byte[]) {
      return (byte[]) state;
    }

    // Proto buffer class is serialized directly.
    if (state instanceof MessageLite) {
      return ((MessageLite) state).toByteArray();
    }

    // Not string, not primitive, so it is a complex type: we use JSON for that.
    return OBJECT_MAPPER.writeValueAsBytes(state);
  }

deserialize() 方法实现

这两个方法都是简单代理:

  public <T> T deserialize(byte[] content, TypeRef<T> type) throws IOException {
    return deserialize(content, OBJECT_MAPPER.constructType(type.getType()));
  }

  public <T> T deserialize(byte[] content, Class<T> clazz) throws IOException {
    return deserialize(content, OBJECT_MAPPER.constructType(clazz));
  }

具体实现在这里:

  private <T> T deserialize(byte[] content, JavaType javaType) throws IOException {
    // 对应 serialize 的做法
    if ((javaType == null) || javaType.isTypeOrSubTypeOf(Void.class)) {
      return null;
    }

    // 如果是 java 基本类型,则交给 deserializePrimitives() 方法处理
    // 注意此时 content 有可能是 null 或者 空数组
    if (javaType.isPrimitive()) {
      return deserializePrimitives(content, javaType);
    }

    // 对应 serialize 的做法
    if (content == null) {
      return null;
    }

    // Deserialization of GRPC response fails without this check since it does not come as base64 encoded byte[].
    // 如果没有这个检查,GRPC响应的反序列化就会失败,因为它不是以 base64 编码的 byte[] 形式出现的。
    // TBD:这里有点不是太理解
    if (javaType.hasRawClass(byte[].class)) {
      return (T) content;
    }

    // // 对应 serialize 的做法,但长度为零的检测放在 byte[] 检测之后
    if (content.length == 0) {
      return null;
    }

    // 对 CloudEvent 的支持:如果是 CloudEvent,则单独序列化
    if (javaType.hasRawClass(CloudEvent.class)) {
      return (T) CloudEvent.deserialize(content);
    }

    // 对 grpc MessageLite 的支持:通过反射调用 parseFrom 方法
    if (javaType.isTypeOrSubTypeOf(MessageLite.class)) {
      try {
        Method method = javaType.getRawClass().getDeclaredMethod("parseFrom", byte[].class);
        if (method != null) {
          return (T) method.invoke(null, content);
        }
      } catch (NoSuchMethodException e) {
        // It was a best effort. Skip this try.
      } catch (Exception e) {
        throw new IOException(e);
      }
    }

    // 最后才通过 jackson 进行标准的 json 序列化
    return OBJECT_MAPPER.readValue(content, javaType);
  }

deserializePrimitives() 方法

对原生类型的解析:

private static <T> T deserializePrimitives(byte[] content, JavaType javaType) throws IOException {
    if ((content == null) || (content.length == 0)) {
      // content 为null或者空的特殊处理,相当于是缺省值
      if (javaType.hasRawClass(boolean.class)) {
        return (T) Boolean.FALSE;
      }

      if (javaType.hasRawClass(byte.class)) {
        return (T) Byte.valueOf((byte) 0);
      }

      if (javaType.hasRawClass(short.class)) {
        return (T) Short.valueOf((short) 0);
      }

      if (javaType.hasRawClass(int.class)) {
        return (T) Integer.valueOf(0);
      }

      if (javaType.hasRawClass(long.class)) {
        return (T) Long.valueOf(0L);
      }

      if (javaType.hasRawClass(float.class)) {
        return (T) Float.valueOf(0);
      }

      if (javaType.hasRawClass(double.class)) {
        return (T) Double.valueOf(0);
      }

      if (javaType.hasRawClass(char.class)) {
        return (T) Character.valueOf(Character.MIN_VALUE);
      }

      return null;
    }

    // 对于非空值,通过 jackson 进行反序列化
    return OBJECT_MAPPER.readValue(content, javaType);
  }

总结

这个代码中,在 jackson 处理之前有很多特殊逻辑,这些逻辑理论上应该是独立于 jackson 序列化方案的,如果要引入其他 DaprObjectSerializer 的实现,这些特殊逻辑都要重复 n 次,有代码重复和逻辑不一致的风险。

最好是能把这些逻辑提取出来,在序列化和反序列化时先用这些特殊逻辑出来一遍,最后再交给 DaprObjectSerializer ,会比较合理。

再有就是依赖冲突问题,目前的 DaprObjectSerializer 方案没有给出完整的解决方案。jackson 的依赖还是写死的。

2 - HTTP客户端

java sdk 中的HTTP客户端

2.1 - DaprHttp

Dapr HTTP 的 okhttp3 + jackson 实现

常量定义

  public static final String API_VERSION = "v1.0";

  public static final String ALPHA_1_API_VERSION = "v1.0-alpha1";

  private static final String HEADER_DAPR_REQUEST_ID = "X-DaprRequestId";

  private static final String DEFAULT_HTTP_SCHEME = "http";

  private static final Set<String> ALLOWED_CONTEXT_IN_HEADERS =
      Collections.unmodifiableSet(new HashSet<>(Arrays.asList("grpc-trace-bin", "traceparent", "tracestate")));

HTTP 方法定义:

  public enum HttpMethods {
    NONE,
    GET,
    PUT,
    POST,
    DELETE,
    HEAD,
    CONNECT,
    OPTIONS,
    TRACE
  }

基本类定义

  public static class Response {
    private byte[] body;
    private Map<String, String> headers;
    private int statusCode;
    ......
  }

DaprHttp 类定义

  private final OkHttpClient httpClient;
  private final int port;
  private final String hostname;

  DaprHttp(String hostname, int port, OkHttpClient httpClient) {
    this.hostname = hostname;
    this.port = port;
    this.httpClient = httpClient;
  }

invokeApi() 方法实现

这个方法有多个重载,最终的实现如下,用来执行http调用请求:

  /**
   * 调用API,返回文本格式有效载荷。
   *
   * @param method        HTTP method.
   * @param pathSegments  Array of path segments (/a/b/c -> ["a", "b", "c"]).
   * @param urlParameters Parameters in the URL
   * @param content       payload to be posted.
   * @param headers       HTTP headers.
   * @param context       OpenTelemetry's Context.
   * @return CompletableFuture for Response.
   */
private CompletableFuture<Response> doInvokeApi(String method,
                               String[] pathSegments,
                               Map<String, List<String>> urlParameters,
                               byte[] content, Map<String, String> headers,
                               Context context) {
    // 方法人口参数基本就是一个非常简化的HTTP请求的格式抽象

    // 取 UUID 为 requestId
    final String requestId = UUID.randomUUID().toString();
    RequestBody body;

    //组装 okhttp3 的 request
    String contentType = headers != null ? headers.get(Metadata.CONTENT_TYPE) : null;
    MediaType mediaType = contentType == null ? MEDIA_TYPE_APPLICATION_JSON : MediaType.get(contentType);
    if (content == null) {
      body = mediaType.equals(MEDIA_TYPE_APPLICATION_JSON)
          ? REQUEST_BODY_EMPTY_JSON
          : RequestBody.Companion.create(new byte[0], mediaType);
    } else {
      body = RequestBody.Companion.create(content, mediaType);
    }
    HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
    urlBuilder.scheme(DEFAULT_HTTP_SCHEME)
        .host(this.hostname)
        .port(this.port);
    for (String pathSegment : pathSegments) {
      urlBuilder.addPathSegment(pathSegment);
    }
    Optional.ofNullable(urlParameters).orElse(Collections.emptyMap()).entrySet().stream()
        .forEach(urlParameter ->
            Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream()
              .forEach(urlParameterValue ->
                  urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue)));

    Request.Builder requestBuilder = new Request.Builder()
        .url(urlBuilder.build())
        .addHeader(HEADER_DAPR_REQUEST_ID, requestId);
    if (context != null) {
      context.stream()
          .filter(entry -> ALLOWED_CONTEXT_IN_HEADERS.contains(entry.getKey().toString().toLowerCase()))
          .forEach(entry -> requestBuilder.addHeader(entry.getKey().toString(), entry.getValue().toString()));
    }
    if (HttpMethods.GET.name().equals(method)) {
      requestBuilder.get();
    } else if (HttpMethods.DELETE.name().equals(method)) {
      requestBuilder.delete();
    } else {
      requestBuilder.method(method, body);
    }

    String daprApiToken = Properties.API_TOKEN.get();
    if (daprApiToken != null) {
      requestBuilder.addHeader(Headers.DAPR_API_TOKEN, daprApiToken);
    }

    if (headers != null) {
      Optional.ofNullable(headers.entrySet()).orElse(Collections.emptySet()).stream()
          .forEach(header -> {
            requestBuilder.addHeader(header.getKey(), header.getValue());
          });
    }
    // 完成 request 的组装,构建 request 对象
    Request request = requestBuilder.build();

    // 发出 okhttp3 的请求,然后返回 CompletableFuture
    CompletableFuture<Response> future = new CompletableFuture<>();
    this.httpClient.newCall(request).enqueue(new ResponseFutureCallback(future));
    return future;
  }

在 http 请求组装过程中,注意 header 的处理:

  • request id: “X-DaprRequestId”,值为 UUID
  • dapr api token: “dapr-api-token”,值从系统变量 “dapr.api.token” 或者环境变量 “DAPR_API_TOKEN” 中获取
  • 发送请求时明确传递的header: 透传
  • OpenTelemetry 相关的值:会试图从传递进来的 OpenTelemetry context 中获取 “grpc-trace-bin”, “traceparent”, “tracestate” 这三个 header 并继续传递下去

2.2 - DaprHttpBuilder

builder for DaprHttp,基于 okhttp

代码没啥特殊的,就注意一下 okhttp 的一些参数的获取。

另外 MaxRequestsPerHost 默认为5,这是一个超级大坑!

private DaprHttp buildDaprHttp() {
    // 双重检查锁
    if (OK_HTTP_CLIENT == null) {
      synchronized (LOCK) {
        if (OK_HTTP_CLIENT == null) {
          OkHttpClient.Builder builder = new OkHttpClient.Builder();
          Duration readTimeout = Duration.ofSeconds(Properties.HTTP_CLIENT_READ_TIMEOUT_SECONDS.get());
          builder.readTimeout(readTimeout);

          Dispatcher dispatcher = new Dispatcher();
          dispatcher.setMaxRequests(Properties.HTTP_CLIENT_MAX_REQUESTS.get());
          //这里有一个超级大坑!
          // The maximum number of requests for each host to execute concurrently.
          // Default value is 5 in okhttp which is totally UNACCEPTABLE!
          // For sidecar case, set it the same as maxRequests.
          dispatcher.setMaxRequestsPerHost(Properties.HTTP_CLIENT_MAX_REQUESTS.get());
          builder.dispatcher(dispatcher);

          ConnectionPool pool = new ConnectionPool(Properties.HTTP_CLIENT_MAX_IDLE_CONNECTIONS.get(),
                  KEEP_ALIVE_DURATION, TimeUnit.SECONDS);
          builder.connectionPool(pool);

          OK_HTTP_CLIENT = builder.build();
        }
      }
    }

    return new DaprHttp(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), OK_HTTP_CLIENT);
  }
}

相关的几个参数的获取:

  • http read timeout:默认60秒,可以通过系统参数 dapr.http.client.maxRequests 或者环境变量 DAPR_HTTP_CLIENT_READ_TIMEOUT_SECONDS 覆盖
  • http max request:默认 1024,可以通过系统参数 dapr.http.client.readTimeoutSeconds 或者环境变量 DAPR_HTTP_CLIENT_MAX_REQUESTS 覆盖
  • http max idle connections: 默认 1024,可以通过系统参数 dapr.http.client.maxIdleConnections 或者环境变量 DAPR_HTTP_CLIENT_MAX_IDLE_CONNECTIONS 覆盖
  • HTTP keep alive duration: hard code 为 30

对于 okhttp,还必须自动设置 http max request per host 参数,不然默认值为 5 对于 sidecar 来说完全不可用。

3 - gRPC客户端

java sdk 中的gRPC客户端

4 - Dapr 客户端

java sdk 中的 Dapr 客户端

4.1 - DaprClient

DaprClient 接口定义
// 无论需要何种GRPC或HTTP客户端实现,都可以使用通用客户端适配器。
public interface DaprClient extends AutoCloseable {

  Mono<Void> waitForSidecar(int timeoutInMilliseconds);

  Mono<Void> shutdown();
}

其他方法都是和 dapr api 相关的方法,然后所有的方法都是实现了 reactive 风格,如:

  Mono<Void> publishEvent(String pubsubName, String topicName, Object data);

  Mono<Void> publishEvent(String pubsubName, String topicName, Object data, Map<String, String> metadata);

  Mono<Void> publishEvent(PublishEventRequest request);

4.2 - DaprPreviewClient

DaprPreviewClient 接口用于定义 preview 和 alpha 的 API

DaprPreviewClient 接口定义,目前只有新增的 configuration api 的方法和 state query 的方法:

// 无论需要何种GRPC或HTTP客户端实现,都可以使用通用客户端适配器。
public interface DaprPreviewClient extends AutoCloseable {

  Mono<ConfigurationItem> getConfiguration(String storeName, String key);

  Flux<List<ConfigurationItem>> subscribeToConfiguration(String storeName, String... keys);

  <T> Mono<QueryStateResponse<T>> queryState(String storeName, String query, TypeRef<T> type);
}

备注:distribuyted lock 的方法还没有加上来,估计是还没有开始实现。

4.3 - AbstractDaprClient

AbstractDaprClient 抽象基类实现
// 抽象类,具有客户端实现之间共同的便利方法。
abstract class AbstractDaprClient implements DaprClient, DaprPreviewClient {
  // 这里还是写死了 jackson!
  // TBD: 看下是哪里在用
  protected static final ObjectMapper JSON_REQUEST_MAPPER = new ObjectMapper();

  protected DaprObjectSerializer objectSerializer;

  protected DaprObjectSerializer stateSerializer;

    AbstractDaprClient(
      DaprObjectSerializer objectSerializer,
      DaprObjectSerializer stateSerializer) {
    this.objectSerializer = objectSerializer;
    this.stateSerializer = stateSerializer;
  }
}

其他都方法实现基本都是一些代理方法,没有实质性内容,实际实现都应该在子类中实现。

  @Override
  public Mono<Void> publishEvent(String pubsubName, String topicName, Object data) {
    return this.publishEvent(pubsubName, topicName, data, null);
  }

    @Override
  public Mono<Void> publishEvent(String pubsubName, String topicName, Object data, Map<String, String> metadata) {
    PublishEventRequest req = new PublishEventRequest(pubsubName, topicName, data)
        .setMetadata(metadata);
    return this.publishEvent(req).then();
  }

这些方法重载可以理解成一些语法糖,可以不用构造复杂的请求对象如 PublishEventRequest 就可以方便的直接使用而已。

4.4 - DaprClientHttp

Dapr Client Http 实现

类定义

public class DaprClientHttp extends AbstractDaprClient {

  private final DaprHttp client;
  private final boolean isObjectSerializerDefault;
  private final boolean isStateSerializerDefault;

  DaprClientHttp(DaprHttp client, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
    super(objectSerializer, stateSerializer);
    this.client = client;
    this.isObjectSerializerDefault = objectSerializer.getClass() == DefaultObjectSerializer.class;
    this.isStateSerializerDefault = stateSerializer.getClass() == DefaultObjectSerializer.class;
  }

   DaprClientHttp(DaprHttp client) {
    this(client, new DefaultObjectSerializer(), new DefaultObjectSerializer());
  }
}

client特有的方法实现

waitForSidecar() 方法

waitForSidecar() 方法通过连接指定的 sidecar ip地址和端口来判断并等待 sidecar 是不是可用。

  public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
    return Mono.fromRunnable(() -> {
      try {
        NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), timeoutInMilliseconds);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    });
  }

close() 方法

close() 方法是实现 java.lang.AutoCloseable 的要求,DaprClient 继承了这个接口:

  @Override
  public void close() {
    // 简单的关闭 http client
    client.close();
  }

dapr api 方法的实现

publishEvent()方法

publishEvent()方法主要是两个任务:

  1. 组装发送请求的各种参数,包括 http 请求的 method,path,parameters,以及事件序列化后 byte[] 格式的数据
  2. 调用 DaprClient 发出 HTTP 请求
@Override
  public Mono<Void> publishEvent(PublishEventRequest request) {
    try {
      String pubsubName = request.getPubsubName();
      String topic = request.getTopic();
      Object data = request.getData();
      Map<String, String> metadata = request.getMetadata();

      if (topic == null || topic.trim().isEmpty()) {
        throw new IllegalArgumentException("Topic name cannot be null or empty.");
      }

      byte[] serializedEvent = objectSerializer.serialize(data);
      // Content-type can be overwritten on a per-request basis.
      // It allows CloudEvents to be handled differently, for example.
      String contentType = request.getContentType();
      if (contentType == null || contentType.isEmpty()) {
        contentType = objectSerializer.getContentType();
      }
      Map<String, String> headers = Collections.singletonMap("content-type", contentType);

      String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic };

      Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
      return Mono.subscriberContext().flatMap(
          context -> this.client.invokeApi(
              DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedEvent, headers, context
          )
      ).then();
    } catch (Exception ex) {
      return DaprException.wrapMono(ex);
    }
  }

shutdown() 方法

注意这个 shutdown() 方法是关闭 sidecar,因此也是需要发送请求到 sidecar 的:

  @Override
  public Mono<Void> shutdown() {
    String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "shutdown" };
    return Mono.subscriberContext().flatMap(
            context -> client.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments,
                null, null, context))
        .then();
  }

http 请求最终是通过 DaprClient 发出去的。

4.5 - DaprClientGrpc

Dapr Client gRPC 实现

类定义


public class DaprClientGrpc extends AbstractDaprClient {

  private Closeable channel;

  private DaprGrpc.DaprStub asyncStub;

  DaprClientGrpc(
      Closeable closeableChannel,
      DaprGrpc.DaprStub asyncStub,
      DaprObjectSerializer objectSerializer,
      DaprObjectSerializer stateSerializer) {
    super(objectSerializer, stateSerializer);
    this.channel = closeableChannel;
    this.asyncStub = intercept(asyncStub);
  }

}

client特有的方法实现

waitForSidecar() 方法

waitForSidecar() 方法通过连接指定的 sidecar ip地址和端口来判断并等待 sidecar 是不是可用。

和 HTTP 的实现差别只是端口不同。

  @Override
  public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
    return Mono.fromRunnable(() -> {
      try {
        NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.GRPC_PORT.get(), timeoutInMilliseconds);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    });
  }

close() 方法

close() 方法是实现 java.lang.AutoCloseable 的要求,DaprClient 继承了这个接口:

  public void close() throws Exception {
    if (channel != null) {
      DaprException.wrap(() -> {
        // 关闭channel
        channel.close();
        return true;
      }).call();
    }
  }

dapr api 方法的实现

publishEvent()方法

publishEvent()方法主要是两个任务:

  1. 组装发送 grpc 请求的各种参数,构建 PublishEventRequest 请求对象
  2. 调用 gRPC asyncStub 的对应方法发出 gRPC 请求
@Override
  public Mono<Void> publishEvent(PublishEventRequest request) {
    try {
      String pubsubName = request.getPubsubName();
      String topic = request.getTopic();
      Object data = request.getData();
      DaprProtos.PublishEventRequest.Builder envelopeBuilder = DaprProtos.PublishEventRequest.newBuilder()
          .setTopic(topic)
          .setPubsubName(pubsubName)
          .setData(ByteString.copyFrom(objectSerializer.serialize(data)));

      // Content-type can be overwritten on a per-request basis.
      // It allows CloudEvents to be handled differently, for example.
      String contentType = request.getContentType();
      if (contentType == null || contentType.isEmpty()) {
        contentType = objectSerializer.getContentType();
      }
      envelopeBuilder.setDataContentType(contentType);

      Map<String, String> metadata = request.getMetadata();
      if (metadata != null) {
        envelopeBuilder.putAllMetadata(metadata);
      }

      return Mono.subscriberContext().flatMap(
          context ->
              this.<Empty>createMono(
                  it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
              )
      ).then();
    } catch (Exception ex) {
      return DaprException.wrapMono(ex);
    }
  }

5 - opencensus

dapr java sdk 提供对 opencensus 的支持

6 - 注解

dapr java sdk 提供注解支持

6.1 - topic注解

topic 注解提供对 subscribe 的支持

@topic 注解用来订阅某个主题, pubsubName, name, metadata 分别对应 dapr pub/sub API 中的 pubsubName, topic,metadata 字段:

@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Topic {
  String name();
  String pubsubName();
  String metadata() default "{}";

  // 用于匹配传入的 cloud event 的规则。
  Rule rule() default @Rule(match = "", priority = 0);
}

以下是 @topic 注解使用的典型例子:

  @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}")
  @PostMapping(path = "/testingtopic")
  public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<?> cloudEvent) {
    ......
  }

6.2 - rule注解

rule 注解用来表述匹配规则

@topic 注解用来表述匹配规则。

@Documented
@Target(ElementType.ANNOTATION_TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Rule {

  // 用于匹配传入的 cloud event 的通用表达式语言( Common Expression Language / CEL)表达。
  String match();

  // 规则的优先级,用于排序。最低的数字有更高的优先权。
  int priority();
}

以下是 @rule 注解使用的典型例子:

  @Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
          rule = @Rule(match = "event.type == \"v2\"", priority = 1))
  @PostMapping(path = "/testingtopicV2")
  public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
    ......
  }