Dapr Client

The Dapr client in the Java SDK

1 - DaprClient

DaprClient interface definition
// Generic client adapter to be used regardless of the gRPC or HTTP client implementation required.
public interface DaprClient extends AutoCloseable {

  Mono<Void> waitForSidecar(int timeoutInMilliseconds);

  Mono<Void> shutdown();
}

The remaining methods all map to the Dapr API, and every one of them is written in reactive style, returning a Mono or Flux. For example:

  Mono<Void> publishEvent(String pubsubName, String topicName, Object data);

  Mono<Void> publishEvent(String pubsubName, String topicName, Object data, Map<String, String> metadata);

  Mono<Void> publishEvent(PublishEventRequest request);

2 - DaprPreviewClient

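The DaprPreviewClient interface defines the preview and alpha APIs.

DaprPreviewClient interface definition. At present it only contains the newly added configuration API methods and the state query methods:

// Generic client adapter to be used regardless of the gRPC or HTTP client implementation required.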
public interface DaprPreviewClient extends AutoCloseable {

  Mono<ConfigurationItem> getConfiguration(String storeName, String key);

  Flux<List<ConfigurationItem>> subscribeToConfiguration(String storeName, String... keys);

  <T> Mono<QueryStateResponse<T>> queryState(String storeName, String query, TypeRef<T> type);
}

Note: the distributed lock methods have not been added yet, presumably because their implementation has not started.

3 - AbstractDaprClient

AbstractDaprClient abstract base class implementation
// Abstract class with convenient methods common between client implementations.
abstract class AbstractDaprClient implements DaprClient, DaprPreviewClient {
  // Jackson is still hard-coded here!
  // TBD: find out where this mapper is used
  protected static final ObjectMapper JSON_REQUEST_MAPPER = new ObjectMapper();

  protected DaprObjectSerializer objectSerializer;

  protected DaprObjectSerializer stateSerializer;

  AbstractDaprClient(
      DaprObjectSerializer objectSerializer,
      DaprObjectSerializer stateSerializer) {
    this.objectSerializer = objectSerializer;
    this.stateSerializer = stateSerializer;
  }
}

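Most of the other method implementations are simple delegating methods with no substantive logic; the real work is expected to be implemented in the subclasses.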

  @Override
  public Mono<Void> publishEvent(String pubsubName, String topicName, Object data) {
    return this.publishEvent(pubsubName, topicName, data, null);
  }

  @Override
  public Mono<Void> publishEvent(String pubsubName, String topicName, Object data, Map<String, String> metadata) {
    PublishEventRequest req = new PublishEventRequest(pubsubName, topicName, data)
        .setMetadata(metadata);
    return this.publishEvent(req).then();
  }

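These overloads can be regarded as syntactic sugar: they let callers use the API directly without having to construct a more complex request object such as PublishEventRequest.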
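
The delegation pattern described above can be sketched in isolation. The following is a minimal illustration, not the SDK code: SketchPublishRequest, SketchPublisher and SketchEchoPublisher are hypothetical names, and the methods return a plain String instead of a Mono so that the example needs nothing beyond the JDK.

```java
import java.util.Map;

// Hypothetical request object standing in for PublishEventRequest.
final class SketchPublishRequest {
  final String pubsubName;
  final String topic;
  final Object data;
  Map<String, String> metadata;

  SketchPublishRequest(String pubsubName, String topic, Object data) {
    this.pubsubName = pubsubName;
    this.topic = topic;
    this.data = data;
  }

  // Fluent setter, so the request can be built in a single expression.
  SketchPublishRequest setMetadata(Map<String, String> metadata) {
    this.metadata = metadata;
    return this;
  }
}

// Hypothetical base class: the short overloads funnel into the request-based method.
abstract class SketchPublisher {
  // The only method a concrete client has to implement.
  abstract String publishEvent(SketchPublishRequest request);

  // Convenience overload without metadata.
  String publishEvent(String pubsubName, String topic, Object data) {
    return publishEvent(pubsubName, topic, data, null);
  }

  // Convenience overload that builds the request object for the caller.
  String publishEvent(String pubsubName, String topic, Object data, Map<String, String> metadata) {
    return publishEvent(new SketchPublishRequest(pubsubName, topic, data).setMetadata(metadata));
  }
}

// Toy subclass that only describes the request it received.
final class SketchEchoPublisher extends SketchPublisher {
  @Override
  String publishEvent(SketchPublishRequest request) {
    return request.pubsubName + "/" + request.topic + " metadata=" + request.metadata;
  }
}
```

With this shape a subclass implements only the request-based method, and both shorter call forms work without any further code.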

4 - DaprClientHttp

Dapr client HTTP implementation

Class definition

public class DaprClientHttp extends AbstractDaprClient {

  private final DaprHttp client;
  private final boolean isObjectSerializerDefault;
  private final boolean isStateSerializerDefault;

  DaprClientHttp(DaprHttp client, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
    super(objectSerializer, stateSerializer);
    this.client = client;
    this.isObjectSerializerDefault = objectSerializer.getClass() == DefaultObjectSerializer.class;
    this.isStateSerializerDefault = stateSerializer.getClass() == DefaultObjectSerializer.class;
  }

  DaprClientHttp(DaprHttp client) {
    this(client, new DefaultObjectSerializer(), new DefaultObjectSerializer());
  }
}

Client-specific method implementations

waitForSidecar() method

waitForSidecar() waits for the sidecar to become available by trying to connect to the configured sidecar IP address and HTTP port.

  public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
    return Mono.fromRunnable(() -> {
      try {
        NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), timeoutInMilliseconds);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    });
  }
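
The source of NetworkUtils.waitForSocket is not shown here, so the following is only a sketch of how such a wait could be written with plain JDK sockets. The class name SocketWaitSketch, the retry pause of 100 ms, and the boolean return value are all assumptions made for this illustration; the real method is declared void in the call above and may report a timeout differently.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class SocketWaitSketch {
  // Returns true once a TCP connection succeeds, false if the timeout elapses first.
  static boolean waitForSocket(String host, int port, int timeoutInMilliseconds)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutInMilliseconds;
    while (true) {
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0) {
        return false;
      }
      try (Socket socket = new Socket()) {
        // A connect timeout of 0 means "wait forever", so use at least 1 ms.
        socket.connect(new InetSocketAddress(host, port), (int) Math.max(1, remaining));
        return true;
      } catch (IOException e) {
        // Nothing is listening yet (or the connect timed out): pause briefly, then retry.
        Thread.sleep(Math.min(100, Math.max(1, deadline - System.currentTimeMillis())));
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // 3500 is the default Dapr HTTP port; adjust it to the local setup.
    System.out.println("sidecar reachable: " + waitForSocket("127.0.0.1", 3500, 2000));
  }
}
```

Only TCP reachability is checked, which matches the description above: the sidecar counts as available as soon as its port accepts connections.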

close() method

close() is required by java.lang.AutoCloseable, which the DaprClient interface extends:

  @Override
  public void close() {
    // Simply close the underlying HTTP client
    client.close();
  }

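Implementation of the Dapr API methods

publishEvent() method

publishEvent() has two main tasks:

  1. Assemble the parameters of the request: the HTTP method, path and query parameters, plus the event data serialized to byte[]
  2. Call DaprHttp to send the HTTP request

  @Override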
  public Mono<Void> publishEvent(PublishEventRequest request) {
    try {
      String pubsubName = request.getPubsubName();
      String topic = request.getTopic();
      Object data = request.getData();
      Map<String, String> metadata = request.getMetadata();

      if (topic == null || topic.trim().isEmpty()) {
        throw new IllegalArgumentException("Topic name cannot be null or empty.");
      }

      byte[] serializedEvent = objectSerializer.serialize(data);
      // Content-type can be overwritten on a per-request basis.
      // It allows CloudEvents to be handled differently, for example.
      String contentType = request.getContentType();
      if (contentType == null || contentType.isEmpty()) {
        contentType = objectSerializer.getContentType();
      }
      Map<String, String> headers = Collections.singletonMap("content-type", contentType);

      String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic };

      Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
      return Mono.subscriberContext().flatMap(
          context -> this.client.invokeApi(
              DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedEvent, headers, context
          )
      ).then();
    } catch (Exception ex) {
      return DaprException.wrapMono(ex);
    }
  }
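
The assembly step can be tried out without a sidecar. The sketch below mirrors the three decisions made in the method above: the content-type fallback, the path segments, and the conversion of metadata into query arguments. PublishRequestSketch is a hypothetical class, and the "metadata." prefix used in metadataToQueryArgs is an assumption about what the helper of the same name does, since its body is not shown here.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class PublishRequestSketch {
  // Falls back to the serializer's content type when the request does not set one.
  static String resolveContentType(String requestContentType, String serializerContentType) {
    if (requestContentType == null || requestContentType.isEmpty()) {
      return serializerContentType;
    }
    return requestContentType;
  }

  // Joins the same segments the method above puts into pathSegments.
  static String publishPath(String apiVersion, String pubsubName, String topic) {
    if (topic == null || topic.trim().isEmpty()) {
      throw new IllegalArgumentException("Topic name cannot be null or empty.");
    }
    return String.join("/", apiVersion, "publish", pubsubName, topic);
  }

  // Assumption: each metadata entry becomes a "metadata.<key>" query parameter.
  static Map<String, List<String>> metadataToQueryArgs(Map<String, String> metadata) {
    Map<String, List<String>> queryArgs = new LinkedHashMap<>();
    if (metadata == null) {
      return queryArgs;
    }
    for (Map.Entry<String, String> entry : metadata.entrySet()) {
      if (entry.getKey() != null) {
        queryArgs.put("metadata." + entry.getKey(), Collections.singletonList(entry.getValue()));
      }
    }
    return queryArgs;
  }
}
```

For example, publishPath("v1.0", "pubsub", "orders") yields v1.0/publish/pubsub/orders, assuming "v1.0" is the value of DaprHttp.API_VERSION.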

shutdown() method

Note that shutdown() shuts down the sidecar itself, so it also has to send a request to the sidecar:

  @Override
  public Mono<Void> shutdown() {
    String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "shutdown" };
    return Mono.subscriberContext().flatMap(
            context -> client.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments,
                null, null, context))
        .then();
  }

In both cases the HTTP request is ultimately sent through DaprHttp.
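
To make the shape of the shutdown call concrete, here is a sketch that builds the equivalent request with the JDK's java.net.http API (Java 11 or later) rather than with DaprHttp. ShutdownRequestSketch is a hypothetical class; the host and port are passed in, with 127.0.0.1 and 3500 used as the usual sidecar defaults. The request is only built and printed, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class ShutdownRequestSketch {
  // Builds (but does not send) a POST to /v1.0/shutdown with an empty body.
  static HttpRequest buildShutdownRequest(String sidecarHost, int httpPort) {
    URI uri = URI.create("http://" + sidecarHost + ":" + httpPort + "/v1.0/shutdown");
    return HttpRequest.newBuilder(uri)
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();
  }

  public static void main(String[] args) {
    HttpRequest request = buildShutdownRequest("127.0.0.1", 3500);
    System.out.println(request.method() + " " + request.uri());
  }
}
```

Sending it would be a matter of passing the request to an HttpClient, at which point a running sidecar would begin to shut down.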

5 - DaprClientGrpc

Dapr client gRPC implementation

Class definition

public class DaprClientGrpc extends AbstractDaprClient {

  private Closeable channel;

  private DaprGrpc.DaprStub asyncStub;

  DaprClientGrpc(
      Closeable closeableChannel,
      DaprGrpc.DaprStub asyncStub,
      DaprObjectSerializer objectSerializer,
      DaprObjectSerializer stateSerializer) {
    super(objectSerializer, stateSerializer);
    this.channel = closeableChannel;
    this.asyncStub = intercept(asyncStub);
  }

}

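Client-specific method implementations

waitForSidecar() method

waitForSidecar() waits for the sidecar to become available by trying to connect to the configured sidecar IP address and port.

The only difference from the HTTP implementation is the port: GRPC_PORT is used instead of HTTP_PORT.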

  @Override
  public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
    return Mono.fromRunnable(() -> {
      try {
        NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.GRPC_PORT.get(), timeoutInMilliseconds);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    });
  }

close() method

close() is required by java.lang.AutoCloseable, which the DaprClient interface extends:

  public void close() throws Exception {
    if (channel != null) {
      DaprException.wrap(() -> {
        // Close the channel
        channel.close();
        return true;
      }).call();
    }
  }
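
The close() method above routes channel.close() through DaprException.wrap, whose body is not shown here. The sketch below illustrates one plausible reading of that idiom: a Callable is decorated so that any failure surfaces as a single exception type. SketchClientException and CloseWrapSketch are hypothetical stand-ins, and the exact behaviour of the real DaprException.wrap is an assumption.

```java
import java.io.Closeable;
import java.util.concurrent.Callable;

// Hypothetical stand-in for DaprException.
final class SketchClientException extends RuntimeException {
  SketchClientException(Throwable cause) {
    super(cause);
  }
}

final class CloseWrapSketch {
  // Returns a Callable that rethrows any failure as SketchClientException.
  static <T> Callable<T> wrap(Callable<T> callable) {
    return () -> {
      try {
        return callable.call();
      } catch (SketchClientException e) {
        // Already the right type, so pass it through unchanged.
        throw e;
      } catch (Exception e) {
        throw new SketchClientException(e);
      }
    };
  }

  // Same shape as the close() method above: null-safe, with wrapped failures.
  static void closeWrapped(Closeable channel) throws Exception {
    if (channel != null) {
      wrap(() -> {
        channel.close();
        return true;
      }).call();
    }
  }
}
```

The return true in the lambda exists only because a Callable must return a value; the result is discarded.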

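Implementation of the Dapr API methods

publishEvent() method

publishEvent() has two main tasks:

  1. Assemble the parameters of the gRPC request and build the DaprProtos.PublishEventRequest message
  2. Call the corresponding method on the gRPC asyncStub to send the gRPC request

  @Override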
  public Mono<Void> publishEvent(PublishEventRequest request) {
    try {
      String pubsubName = request.getPubsubName();
      String topic = request.getTopic();
      Object data = request.getData();
      DaprProtos.PublishEventRequest.Builder envelopeBuilder = DaprProtos.PublishEventRequest.newBuilder()
          .setTopic(topic)
          .setPubsubName(pubsubName)
          .setData(ByteString.copyFrom(objectSerializer.serialize(data)));

      // Content-type can be overwritten on a per-request basis.
      // It allows CloudEvents to be handled differently, for example.
      String contentType = request.getContentType();
      if (contentType == null || contentType.isEmpty()) {
        contentType = objectSerializer.getContentType();
      }
      envelopeBuilder.setDataContentType(contentType);

      Map<String, String> metadata = request.getMetadata();
      if (metadata != null) {
        envelopeBuilder.putAllMetadata(metadata);
      }

      return Mono.subscriberContext().flatMap(
          context ->
              this.<Empty>createMono(
                  it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
              )
      ).then();
    } catch (Exception ex) {
      return DaprException.wrapMono(ex);
    }
  }
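
The createMono helper used above is not shown in this section. Judging from the call site, it hands an observer (the it parameter) to the async stub and turns the callbacks into a reactive result. The sketch below shows the same bridging idea using only the JDK, with CompletableFuture swapped in for Mono. SketchObserver and CallbackBridgeSketch are hypothetical names; the observer merely mirrors the onNext/onError/onCompleted shape of a gRPC StreamObserver.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical observer mirroring the callback shape of a gRPC StreamObserver.
interface SketchObserver<T> {
  void onNext(T value);

  void onError(Throwable error);

  void onCompleted();
}

final class CallbackBridgeSketch {
  // Runs the callback-style call and exposes its outcome as a CompletableFuture.
  static <T> CompletableFuture<T> createFuture(Consumer<SketchObserver<T>> call) {
    CompletableFuture<T> future = new CompletableFuture<>();
    call.accept(new SketchObserver<T>() {
      @Override
      public void onNext(T value) {
        future.complete(value);
      }

      @Override
      public void onError(Throwable error) {
        future.completeExceptionally(error);
      }

      @Override
      public void onCompleted() {
        // A no-op if onNext already supplied a value; otherwise completes empty.
        future.complete(null);
      }
    });
    return future;
  }
}
```

One difference is worth keeping in mind: this sketch starts the call immediately, whereas a Mono does nothing until it is subscribed, which is why the SDK method can be built inside Mono.subscriberContext().flatMap(...) and still run lazily.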