sdk
- 1: Serialization
- 1.1: Background
- 1.2: DaprObjectSerializer
- 1.3: DefaultObjectSerializer
- 1.4: ObjectSerializer
- 2: HTTP client
- 2.1: DaprHttp
- 2.2: DaprHttpBuilder
- 3: gRPC client
- 4: Dapr client
- 4.1: DaprClient
- 4.2: DaprPreviewClient
- 4.3: AbstractDaprClient
- 4.4: DaprClientHttp
- 4.5: DaprClientGrpc
- 5: opencensus
- 6: Annotations
1 - Serialization
1.1 - Background
Documentation
https://github.com/dapr/java-sdk#how-to-use-a-custom-serializer
The README of the dapr java-sdk project includes the following passage:
How to use a custom serializer
This SDK provides a basic serialization for request/response objects but also for state objects. Applications should provide their own serialization for production scenarios.
1.2 - DaprObjectSerializer
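Interface definition
The DaprObjectSerializer interface is simple. It is defined as follows:
// Serializes and deserializes the application's objects.
public interface DaprObjectSerializer {
// Serializes the given object into a byte[].
byte[] serialize(Object o) throws IOException;
// Deserializes the given byte[] into an object.
<T> T deserialize(byte[] data, TypeRef<T> type) throws IOException;
// Returns the content type of the request.
String getContentType();
}
getContentType() reports the content type, while serialize() and deserialize() implement serialization and deserialization, that is, the conversion between objects and byte[].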
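To make the contract concrete, here is a minimal sketch of a custom serializer. It is not taken from the SDK: `SketchSerializer` is a simplified stand-in for `DaprObjectSerializer` that uses `Class<T>` instead of `TypeRef<T>` so the example compiles without the SDK, and `PlainTextSerializer` is a hypothetical implementation that handles only `String` payloads.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Simplified stand-in for DaprObjectSerializer (assumption: Class<T> replaces TypeRef<T>
// so that the sketch compiles without the SDK on the classpath).
interface SketchSerializer {
    byte[] serialize(Object o) throws IOException;

    <T> T deserialize(byte[] data, Class<T> type) throws IOException;

    String getContentType();
}

// Hypothetical custom serializer that only handles String payloads as UTF-8 text.
final class PlainTextSerializer implements SketchSerializer {
    @Override
    public byte[] serialize(Object o) throws IOException {
        if (o == null) {
            return null;
        }
        if (!(o instanceof String)) {
            throw new IOException("Only String is supported, got " + o.getClass().getName());
        }
        return ((String) o).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public <T> T deserialize(byte[] data, Class<T> type) throws IOException {
        if (data == null) {
            return null;
        }
        if (type != String.class) {
            throw new IOException("Only String is supported, got " + type.getName());
        }
        return type.cast(new String(data, StandardCharsets.UTF_8));
    }

    @Override
    public String getContentType() {
        // The content type must describe what serialize() actually produces.
        return "text/plain";
    }
}
```

The point of the sketch is that the content type and the byte format travel together: a serializer that changes one has to change the other.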
1.3 - DefaultObjectSerializer
DefaultObjectSerializer extends ObjectSerializer. Its serialize() and deserialize() simply delegate to ObjectSerializer, while getContentType() is hard-coded to return "application/json":
public class DefaultObjectSerializer extends ObjectSerializer implements DaprObjectSerializer {
@Override
public byte[] serialize(Object o) throws IOException {
return super.serialize(o);
}
@Override
public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
return super.deserialize(data, type);
}
@Override
public String getContentType() {
return "application/json";
}
}
1.4 - ObjectSerializer
Class definition
public class ObjectSerializer {
// Protected default constructor: prevents instantiation outside the package while still allowing subclassing.
protected ObjectSerializer() {
}
}
Jackson settings
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
Implementation of serialize()
public byte[] serialize(Object state) throws IOException {
if (state == null) {
return null;
}
if (state.getClass() == Void.class) {
return null;
}
// Have this check here to be consistent with deserialization (see deserialize() method below).
if (state instanceof byte[]) {
return (byte[]) state;
}
// Proto buffer class is serialized directly.
if (state instanceof MessageLite) {
return ((MessageLite) state).toByteArray();
}
// Not string, not primitive, so it is a complex type: we use JSON for that.
return OBJECT_MAPPER.writeValueAsBytes(state);
}
Implementation of deserialize()
These two methods are simple delegates:
public <T> T deserialize(byte[] content, TypeRef<T> type) throws IOException {
return deserialize(content, OBJECT_MAPPER.constructType(type.getType()));
}
public <T> T deserialize(byte[] content, Class<T> clazz) throws IOException {
return deserialize(content, OBJECT_MAPPER.constructType(clazz));
}
The actual implementation is here:
private <T> T deserialize(byte[] content, JavaType javaType) throws IOException {
// Mirrors the handling in serialize().
if ((javaType == null) || javaType.isTypeOrSubTypeOf(Void.class)) {
return null;
}
// Java primitive types are handed to deserializePrimitives().
// Note that content may still be null or an empty array at this point.
if (javaType.isPrimitive()) {
return deserializePrimitives(content, javaType);
}
// Mirrors the handling in serialize().
if (content == null) {
return null;
}
// Deserialization of GRPC response fails without this check since it does not come as base64 encoded byte[].
// TBD: it is not yet clear to me why this check is needed.
if (javaType.hasRawClass(byte[].class)) {
return (T) content;
}
// Mirrors the handling in serialize(), but the zero-length check comes after the byte[] check.
if (content.length == 0) {
return null;
}
// CloudEvent support: a CloudEvent is deserialized by its own dedicated method.
if (javaType.hasRawClass(CloudEvent.class)) {
return (T) CloudEvent.deserialize(content);
}
// Support for gRPC MessageLite: parseFrom is invoked through reflection.
if (javaType.isTypeOrSubTypeOf(MessageLite.class)) {
try {
Method method = javaType.getRawClass().getDeclaredMethod("parseFrom", byte[].class);
if (method != null) {
return (T) method.invoke(null, content);
}
} catch (NoSuchMethodException e) {
// It was a best effort. Skip this try.
} catch (Exception e) {
throw new IOException(e);
}
}
// Only as a last resort, standard JSON deserialization through Jackson.
return OBJECT_MAPPER.readValue(content, javaType);
}
deserializePrimitives() method
Parsing of primitive types:
private static <T> T deserializePrimitives(byte[] content, JavaType javaType) throws IOException {
if ((content == null) || (content.length == 0)) {
// Special handling when content is null or empty: return the type's default value.
if (javaType.hasRawClass(boolean.class)) {
return (T) Boolean.FALSE;
}
if (javaType.hasRawClass(byte.class)) {
return (T) Byte.valueOf((byte) 0);
}
if (javaType.hasRawClass(short.class)) {
return (T) Short.valueOf((short) 0);
}
if (javaType.hasRawClass(int.class)) {
return (T) Integer.valueOf(0);
}
if (javaType.hasRawClass(long.class)) {
return (T) Long.valueOf(0L);
}
if (javaType.hasRawClass(float.class)) {
return (T) Float.valueOf(0);
}
if (javaType.hasRawClass(double.class)) {
return (T) Double.valueOf(0);
}
if (javaType.hasRawClass(char.class)) {
return (T) Character.valueOf(Character.MIN_VALUE);
}
return null;
}
// Non-empty values are deserialized through Jackson.
return OBJECT_MAPPER.readValue(content, javaType);
}
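The default-value branch can be looked at in isolation. The sketch below is a simplified illustration, not SDK code: it uses `Class<?>` in place of Jackson's `JavaType`, and the class name `PrimitiveDefaults` is hypothetical.

```java
// Hypothetical helper mirroring the empty-content branch of deserializePrimitives().
final class PrimitiveDefaults {
    private PrimitiveDefaults() {
    }

    // Returns the boxed default for a primitive type, or null for any other type.
    static Object defaultFor(Class<?> type) {
        if (type == boolean.class) {
            return Boolean.FALSE;
        }
        if (type == byte.class) {
            return (byte) 0;
        }
        if (type == short.class) {
            return (short) 0;
        }
        if (type == int.class) {
            return 0;
        }
        if (type == long.class) {
            return 0L;
        }
        if (type == float.class) {
            return 0f;
        }
        if (type == double.class) {
            return 0d;
        }
        if (type == char.class) {
            return Character.MIN_VALUE;
        }
        return null;
    }
}
```

Returning a default instead of null matters because a null cannot be unboxed into a primitive; the caller would otherwise hit a NullPointerException.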
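Summary
In this code, a lot of special-case logic runs before Jackson is involved. In principle that logic is independent of the Jackson serialization scheme; if other DaprObjectSerializer implementations are introduced, it has to be repeated in each of them, with the risk of duplicated code and inconsistent behavior.
It would be more reasonable to extract this logic, apply it first during serialization and deserialization, and only then hand over to the DaprObjectSerializer.
There is also the dependency conflict problem: the current DaprObjectSerializer approach does not offer a complete solution, and the Jackson dependency is still hard-wired.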
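One possible shape for the extraction suggested in this summary is a wrapper that applies the format-independent checks once and then delegates. This is only a sketch under assumptions: `PayloadCodec` and `SpecialCaseCodec` are hypothetical names, `Class<?>` stands in for `TypeRef`/`JavaType`, and the primitive, CloudEvent, and MessageLite branches are left out for brevity.

```java
import java.io.IOException;

// Hypothetical minimal codec contract; in the SDK this role is played by DaprObjectSerializer.
interface PayloadCodec {
    byte[] encode(Object value) throws IOException;

    Object decode(byte[] data, Class<?> type) throws IOException;
}

// Applies the format-independent special cases once, then delegates to any codec.
final class SpecialCaseCodec implements PayloadCodec {
    private final PayloadCodec delegate;

    SpecialCaseCodec(PayloadCodec delegate) {
        this.delegate = delegate;
    }

    @Override
    public byte[] encode(Object value) throws IOException {
        if (value == null) {
            return null;
        }
        // byte[] is passed through untouched, whatever format the delegate uses.
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
        return delegate.encode(value);
    }

    @Override
    public Object decode(byte[] data, Class<?> type) throws IOException {
        if (type == null || type == Void.class) {
            return null;
        }
        if (data == null) {
            return null;
        }
        if (type == byte[].class) {
            return data;
        }
        // The zero-length check comes after the byte[] check, as in ObjectSerializer.
        if (data.length == 0) {
            return null;
        }
        return delegate.decode(data, type);
    }
}
```

With this arrangement a new format only has to implement the delegate, and the special cases cannot drift apart between implementations.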
2 - HTTP client
2.1 - DaprHttp
Constant definitions
public static final String API_VERSION = "v1.0";
public static final String ALPHA_1_API_VERSION = "v1.0-alpha1";
private static final String HEADER_DAPR_REQUEST_ID = "X-DaprRequestId";
private static final String DEFAULT_HTTP_SCHEME = "http";
private static final Set<String> ALLOWED_CONTEXT_IN_HEADERS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("grpc-trace-bin", "traceparent", "tracestate")));
HTTP method definitions:
public enum HttpMethods {
NONE,
GET,
PUT,
POST,
DELETE,
HEAD,
CONNECT,
OPTIONS,
TRACE
}
Basic class definition
public static class Response {
private byte[] body;
private Map<String, String> headers;
private int statusCode;
......
}
DaprHttp class definition
private final OkHttpClient httpClient;
private final int port;
private final String hostname;
DaprHttp(String hostname, int port, OkHttpClient httpClient) {
this.hostname = hostname;
this.port = port;
this.httpClient = httpClient;
}
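Implementation of invokeApi()
This method has several overloads. They all end up in the implementation below (doInvokeApi), which executes the HTTP request:
/**
* Invokes the API and returns a text-format payload.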
*
* @param method HTTP method.
* @param pathSegments Array of path segments (/a/b/c -> ["a", "b", "c"]).
* @param urlParameters Parameters in the URL
* @param content payload to be posted.
* @param headers HTTP headers.
* @param context OpenTelemetry's Context.
* @return CompletableFuture for Response.
*/
private CompletableFuture<Response> doInvokeApi(String method,
String[] pathSegments,
Map<String, List<String>> urlParameters,
byte[] content, Map<String, String> headers,
Context context) {
// The parameters are essentially a very simplified abstraction of an HTTP request.
// Use a random UUID as the requestId.
final String requestId = UUID.randomUUID().toString();
RequestBody body;
// Assemble the okhttp3 request.
String contentType = headers != null ? headers.get(Metadata.CONTENT_TYPE) : null;
MediaType mediaType = contentType == null ? MEDIA_TYPE_APPLICATION_JSON : MediaType.get(contentType);
if (content == null) {
body = mediaType.equals(MEDIA_TYPE_APPLICATION_JSON)
? REQUEST_BODY_EMPTY_JSON
: RequestBody.Companion.create(new byte[0], mediaType);
} else {
body = RequestBody.Companion.create(content, mediaType);
}
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
urlBuilder.scheme(DEFAULT_HTTP_SCHEME)
.host(this.hostname)
.port(this.port);
for (String pathSegment : pathSegments) {
urlBuilder.addPathSegment(pathSegment);
}
Optional.ofNullable(urlParameters).orElse(Collections.emptyMap()).entrySet().stream()
.forEach(urlParameter ->
Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream()
.forEach(urlParameterValue ->
urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue)));
Request.Builder requestBuilder = new Request.Builder()
.url(urlBuilder.build())
.addHeader(HEADER_DAPR_REQUEST_ID, requestId);
if (context != null) {
context.stream()
.filter(entry -> ALLOWED_CONTEXT_IN_HEADERS.contains(entry.getKey().toString().toLowerCase()))
.forEach(entry -> requestBuilder.addHeader(entry.getKey().toString(), entry.getValue().toString()));
}
if (HttpMethods.GET.name().equals(method)) {
requestBuilder.get();
} else if (HttpMethods.DELETE.name().equals(method)) {
requestBuilder.delete();
} else {
requestBuilder.method(method, body);
}
String daprApiToken = Properties.API_TOKEN.get();
if (daprApiToken != null) {
requestBuilder.addHeader(Headers.DAPR_API_TOKEN, daprApiToken);
}
if (headers != null) {
Optional.ofNullable(headers.entrySet()).orElse(Collections.emptySet()).stream()
.forEach(header -> {
requestBuilder.addHeader(header.getKey(), header.getValue());
});
}
// Request assembly is complete; build the request object.
Request request = requestBuilder.build();
// Send the okhttp3 request and return a CompletableFuture.
CompletableFuture<Response> future = new CompletableFuture<>();
this.httpClient.newCall(request).enqueue(new ResponseFutureCallback(future));
return future;
}
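When the HTTP request is assembled, note how headers are handled:
- request id: "X-DaprRequestId", whose value is a random UUID
- dapr api token: "dapr-api-token", whose value is read from the system property "dapr.api.token" or the environment variable "DAPR_API_TOKEN"
- headers passed explicitly with the request: forwarded as-is
- OpenTelemetry values: the method looks in the supplied OpenTelemetry context for the three headers "grpc-trace-bin", "traceparent", and "tracestate" and propagates the ones it finds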
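The allow-list filtering of context entries can be sketched on its own. The example below assumes the context has already been flattened into a plain `Map<String, String>`; the class and method names are hypothetical, and only the three header names come from the code above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: keeps only the tracing entries that may be forwarded as headers.
final class TracingHeaderFilter {
    private static final Set<String> ALLOWED = Collections.unmodifiableSet(
        new HashSet<>(Arrays.asList("grpc-trace-bin", "traceparent", "tracestate")));

    private TracingHeaderFilter() {
    }

    static Map<String, String> select(Map<String, String> contextEntries) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (contextEntries == null) {
            return headers;
        }
        for (Map.Entry<String, String> entry : contextEntries.entrySet()) {
            String key = entry.getKey();
            // The comparison is case-insensitive, but the original key spelling is kept.
            if (key != null && ALLOWED.contains(key.toLowerCase(Locale.ROOT))) {
                headers.put(key, entry.getValue());
            }
        }
        return headers;
    }
}
```

An allow-list keeps unrelated context entries, which may hold arbitrary application data, from leaking into outgoing requests.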
2.2 - DaprHttpBuilder
Nothing special in the code; just note how some of the okhttp parameters are obtained.
Also, MaxRequestsPerHost defaults to 5 in okhttp, which is a major pitfall!
private DaprHttp buildDaprHttp() {
// Double-checked locking
if (OK_HTTP_CLIENT == null) {
synchronized (LOCK) {
if (OK_HTTP_CLIENT == null) {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
Duration readTimeout = Duration.ofSeconds(Properties.HTTP_CLIENT_READ_TIMEOUT_SECONDS.get());
builder.readTimeout(readTimeout);
Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(Properties.HTTP_CLIENT_MAX_REQUESTS.get());
// A major pitfall here!
// The maximum number of requests for each host to execute concurrently.
// Default value is 5 in okhttp which is totally UNACCEPTABLE!
// For sidecar case, set it the same as maxRequests.
dispatcher.setMaxRequestsPerHost(Properties.HTTP_CLIENT_MAX_REQUESTS.get());
builder.dispatcher(dispatcher);
ConnectionPool pool = new ConnectionPool(Properties.HTTP_CLIENT_MAX_IDLE_CONNECTIONS.get(),
KEEP_ALIVE_DURATION, TimeUnit.SECONDS);
builder.connectionPool(pool);
OK_HTTP_CLIENT = builder.build();
}
}
}
return new DaprHttp(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), OK_HTTP_CLIENT);
}
}
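For reference, the double-checked locking idiom used above is only safe under the Java memory model when the shared field is declared volatile; the excerpt does not show how OK_HTTP_CLIENT is declared. A minimal sketch of the idiom, with a hypothetical holder class and a plain `Object` standing in for the OkHttpClient:

```java
// Hypothetical holder illustrating lazy, thread-safe initialization of a shared instance.
final class SharedResourceHolder {
    private static final Object LOCK = new Object();

    // volatile guarantees that other threads see a fully constructed instance.
    private static volatile Object shared;

    private SharedResourceHolder() {
    }

    static Object get() {
        Object local = shared;          // first check, without locking
        if (local == null) {
            synchronized (LOCK) {
                local = shared;         // second check, under the lock
                if (local == null) {
                    local = new Object();
                    shared = local;
                }
            }
        }
        return local;
    }
}
```

Reading the field into a local variable is an optional refinement that avoids a second volatile read on the fast path.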
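How the related parameters are obtained:
- http read timeout: defaults to 60 seconds; can be overridden with the system property dapr.http.client.readTimeoutSeconds or the environment variable DAPR_HTTP_CLIENT_READ_TIMEOUT_SECONDS
- http max requests: defaults to 1024; can be overridden with the system property dapr.http.client.maxRequests or the environment variable DAPR_HTTP_CLIENT_MAX_REQUESTS
- http max idle connections: defaults to 1024; can be overridden with the system property dapr.http.client.maxIdleConnections or the environment variable DAPR_HTTP_CLIENT_MAX_IDLE_CONNECTIONS
- HTTP keep alive duration: hard-coded to 30 seconds
For okhttp, the max requests per host parameter must also be set explicitly. Otherwise its default of 5 is far too low for the sidecar scenario, where every request goes to the same host.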
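The "system property or environment variable, else default" lookup can be sketched as follows. This is an illustration, not the SDK's Properties class: the helper name is hypothetical, and the precedence shown (system property first, then environment variable) is an assumption.

```java
// Hypothetical helper resolving an integer setting from a system property,
// then an environment variable, then a default (precedence is an assumption).
final class SettingLookup {
    private SettingLookup() {
    }

    static int resolveInt(String propertyName, String envName, int defaultValue) {
        String raw = System.getProperty(propertyName);
        if (raw == null || raw.trim().isEmpty()) {
            raw = System.getenv(envName);
        }
        if (raw == null || raw.trim().isEmpty()) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            // An unparsable value falls back to the default in this sketch.
            return defaultValue;
        }
    }
}
```

A call such as `SettingLookup.resolveInt("dapr.http.client.maxRequests", "DAPR_HTTP_CLIENT_MAX_REQUESTS", 1024)` would then yield the value that, in the builder above, is applied to both setMaxRequests and setMaxRequestsPerHost.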
3 - gRPC client
4 - Dapr client
4.1 - DaprClient
// Generic client adapter to be used regardless of the GRPC or HTTP client implementation required.
public interface DaprClient extends AutoCloseable {
Mono<Void> waitForSidecar(int timeoutInMilliseconds);
Mono<Void> shutdown();
}
The other methods all correspond to the dapr API, and all of them are reactive in style, for example:
Mono<Void> publishEvent(String pubsubName, String topicName, Object data);
Mono<Void> publishEvent(String pubsubName, String topicName, Object data, Map<String, String> metadata);
Mono<Void> publishEvent(PublishEventRequest request);
4.2 - DaprPreviewClient
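The DaprPreviewClient interface currently contains only the methods of the newly added configuration API and the state query method:
// Generic client adapter to be used regardless of the GRPC or HTTP client implementation required.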
public interface DaprPreviewClient extends AutoCloseable {
Mono<ConfigurationItem> getConfiguration(String storeName, String key);
Flux<List<ConfigurationItem>> subscribeToConfiguration(String storeName, String... keys);
<T> Mono<QueryStateResponse<T>> queryState(String storeName, String query, TypeRef<T> type);
}
Note: the distributed lock methods have not been added yet, presumably because their implementation has not started.
4.3 - AbstractDaprClient
// Abstract class with convenience methods shared by the client implementations.
abstract class AbstractDaprClient implements DaprClient, DaprPreviewClient {
// Jackson is hard-wired here as well!
// TBD: find out where this is used.
protected static final ObjectMapper JSON_REQUEST_MAPPER = new ObjectMapper();
protected DaprObjectSerializer objectSerializer;
protected DaprObjectSerializer stateSerializer;
AbstractDaprClient(
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
this.objectSerializer = objectSerializer;
this.stateSerializer = stateSerializer;
}
}
The remaining methods are mostly delegating methods with no real logic; the actual work is done in the subclasses.
@Override
public Mono<Void> publishEvent(String pubsubName, String topicName, Object data) {
return this.publishEvent(pubsubName, topicName, data, null);
}
@Override
public Mono<Void> publishEvent(String pubsubName, String topicName, Object data, Map<String, String> metadata) {
PublishEventRequest req = new PublishEventRequest(pubsubName, topicName, data)
.setMetadata(metadata);
return this.publishEvent(req).then();
}
These overloads can be seen as syntactic sugar: callers can use them directly without building a more complex request object such as PublishEventRequest.
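The delegation pattern can be shown without the SDK or Reactor. In the sketch below every name is hypothetical, and the methods return a `String` instead of `Mono<Void>` purely to keep the example self-contained.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical request object, standing in for PublishEventRequest.
final class SketchPublishRequest {
    final String pubsubName;
    final String topic;
    final Object data;
    Map<String, String> metadata = Collections.emptyMap();

    SketchPublishRequest(String pubsubName, String topic, Object data) {
        this.pubsubName = pubsubName;
        this.topic = topic;
        this.data = data;
    }

    SketchPublishRequest setMetadata(Map<String, String> metadata) {
        this.metadata = metadata == null ? Collections.<String, String>emptyMap() : metadata;
        return this;
    }
}

// Hypothetical abstract client: the convenience overloads funnel into one abstract method.
abstract class SketchClient {
    // The only method a concrete client has to implement.
    abstract String publishEvent(SketchPublishRequest request);

    String publishEvent(String pubsubName, String topic, Object data) {
        return publishEvent(pubsubName, topic, data, null);
    }

    String publishEvent(String pubsubName, String topic, Object data, Map<String, String> metadata) {
        return publishEvent(new SketchPublishRequest(pubsubName, topic, data).setMetadata(metadata));
    }
}
```

Because every overload ends in the request-object variant, a transport-specific subclass implements each API exactly once.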
4.4 - DaprClientHttp
Class definition
public class DaprClientHttp extends AbstractDaprClient {
private final DaprHttp client;
private final boolean isObjectSerializerDefault;
private final boolean isStateSerializerDefault;
DaprClientHttp(DaprHttp client, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
super(objectSerializer, stateSerializer);
this.client = client;
this.isObjectSerializerDefault = objectSerializer.getClass() == DefaultObjectSerializer.class;
this.isStateSerializerDefault = stateSerializer.getClass() == DefaultObjectSerializer.class;
}
DaprClientHttp(DaprHttp client) {
this(client, new DefaultObjectSerializer(), new DefaultObjectSerializer());
}
}
Client-specific method implementations
waitForSidecar() method
waitForSidecar() waits for the sidecar to become available by connecting to the configured sidecar IP address and port.
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
return Mono.fromRunnable(() -> {
try {
NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), timeoutInMilliseconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
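The implementation of NetworkUtils.waitForSocket is not shown here. As a rough idea of what such a helper could look like, the sketch below retries a TCP connect until it succeeds or the timeout elapses. It is an assumption-laden illustration: the class name is hypothetical, and it returns a boolean instead of throwing, which differs from the signature used above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: polls a TCP endpoint until it accepts a connection.
final class SocketWaiter {
    private SocketWaiter() {
    }

    // Returns true as soon as a connection succeeds, false once the timeout elapses.
    static boolean waitForSocket(String host, int port, int timeoutInMilliseconds) {
        long deadline = System.currentTimeMillis() + timeoutInMilliseconds;
        while (true) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false;
            }
            try (Socket socket = new Socket()) {
                // Each attempt is capped so that the overall deadline is respected.
                socket.connect(new InetSocketAddress(host, port), (int) Math.min(remaining, 1000));
                return true;
            } catch (IOException e) {
                try {
                    // Short pause before retrying a refused or timed-out connection.
                    Thread.sleep(Math.min(100, remaining));
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
    }
}
```

A successful connect only proves that the port is open; it says nothing about whether the sidecar has finished initializing its components.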
close() method
close() is required by java.lang.AutoCloseable, which DaprClient extends:
@Override
public void close() {
// Simply close the http client.
client.close();
}
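Implementation of the dapr API methods
publishEvent() method
publishEvent() has two main tasks:
- Assemble the parameters of the request, including the HTTP method, path, and parameters, plus the event serialized as byte[]
- Call DaprHttp to send the HTTP request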
@Override
public Mono<Void> publishEvent(PublishEventRequest request) {
try {
String pubsubName = request.getPubsubName();
String topic = request.getTopic();
Object data = request.getData();
Map<String, String> metadata = request.getMetadata();
if (topic == null || topic.trim().isEmpty()) {
throw new IllegalArgumentException("Topic name cannot be null or empty.");
}
byte[] serializedEvent = objectSerializer.serialize(data);
// Content-type can be overwritten on a per-request basis.
// It allows CloudEvents to be handled differently, for example.
String contentType = request.getContentType();
if (contentType == null || contentType.isEmpty()) {
contentType = objectSerializer.getContentType();
}
Map<String, String> headers = Collections.singletonMap("content-type", contentType);
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic };
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
return Mono.subscriberContext().flatMap(
context -> this.client.invokeApi(
DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedEvent, headers, context
)
).then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}
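The two pieces of request assembly that are easiest to get wrong are the path and the metadata. The sketch below illustrates both; metadataToQueryArgs is not shown in the excerpt, so the "metadata." key prefix used here is an assumption, and the class name is hypothetical.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper showing how the publish URL parts could be assembled.
final class PublishRequestParts {
    private PublishRequestParts() {
    }

    // Path segments for the publish endpoint: /v1.0/publish/<pubsubName>/<topic>.
    static String[] pathSegments(String pubsubName, String topic) {
        return new String[] {"v1.0", "publish", pubsubName, topic};
    }

    // Assumption: each metadata entry becomes a "metadata.<key>" query parameter.
    static Map<String, List<String>> metadataToQueryArgs(Map<String, String> metadata) {
        if (metadata == null) {
            return Collections.emptyMap();
        }
        Map<String, List<String>> args = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : metadata.entrySet()) {
            if (entry.getKey() != null && entry.getValue() != null) {
                args.put("metadata." + entry.getKey(), Collections.singletonList(entry.getValue()));
            }
        }
        return args;
    }
}
```

Passing the segments individually, rather than as one pre-joined string, lets the URL builder escape each segment on its own.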
shutdown() method
Note that this shutdown() method shuts down the sidecar, so it also has to send a request to the sidecar:
@Override
public Mono<Void> shutdown() {
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "shutdown" };
return Mono.subscriberContext().flatMap(
context -> client.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments,
null, null, context))
.then();
}
The HTTP request is ultimately sent through DaprHttp.
4.5 - DaprClientGrpc
Class definition
public class DaprClientGrpc extends AbstractDaprClient {
private Closeable channel;
private DaprGrpc.DaprStub asyncStub;
DaprClientGrpc(
Closeable closeableChannel,
DaprGrpc.DaprStub asyncStub,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
super(objectSerializer, stateSerializer);
this.channel = closeableChannel;
this.asyncStub = intercept(asyncStub);
}
}
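Client-specific method implementations
waitForSidecar() method
waitForSidecar() waits for the sidecar to become available by connecting to the configured sidecar IP address and port.
The only difference from the HTTP implementation is the port.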
@Override
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
return Mono.fromRunnable(() -> {
try {
NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.GRPC_PORT.get(), timeoutInMilliseconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
close() method
close() is required by java.lang.AutoCloseable, which DaprClient extends:
public void close() throws Exception {
if (channel != null) {
DaprException.wrap(() -> {
// Close the channel.
channel.close();
return true;
}).call();
}
}
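Implementation of the dapr API methods
publishEvent() method
publishEvent() has two main tasks:
- Assemble the parameters of the gRPC request and build the PublishEventRequest object
- Call the corresponding method of the gRPC asyncStub to send the gRPC request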
@Override
public Mono<Void> publishEvent(PublishEventRequest request) {
try {
String pubsubName = request.getPubsubName();
String topic = request.getTopic();
Object data = request.getData();
DaprProtos.PublishEventRequest.Builder envelopeBuilder = DaprProtos.PublishEventRequest.newBuilder()
.setTopic(topic)
.setPubsubName(pubsubName)
.setData(ByteString.copyFrom(objectSerializer.serialize(data)));
// Content-type can be overwritten on a per-request basis.
// It allows CloudEvents to be handled differently, for example.
String contentType = request.getContentType();
if (contentType == null || contentType.isEmpty()) {
contentType = objectSerializer.getContentType();
}
envelopeBuilder.setDataContentType(contentType);
Map<String, String> metadata = request.getMetadata();
if (metadata != null) {
envelopeBuilder.putAllMetadata(metadata);
}
return Mono.subscriberContext().flatMap(
context ->
this.<Empty>createMono(
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
)
).then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}
5 - opencensus
6 - Annotations
6.1 - topic annotation
@Topic
The annotation subscribes to a topic. pubsubName, name, and metadata correspond to the pubsubName, topic, and metadata fields of the dapr pub/sub API:
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Topic {
String name();
String pubsubName();
String metadata() default "{}";
// Rule used to match the incoming cloud event.
Rule rule() default @Rule(match = "", priority = 0);
}
The following is a typical example of using the @Topic annotation:
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<?> cloudEvent) {
......
}
6.2 - rule annotation
@Rule
The annotation describes a matching rule.
@Documented
@Target(ElementType.ANNOTATION_TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Rule {
// Common Expression Language (CEL) expression used to match the incoming cloud event.
String match();
// Priority of the rule, used for ordering. Lower numbers have higher priority.
int priority();
}
The following is a typical example of using the @Rule annotation:
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
......
}
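The "lowest number wins" ordering can be illustrated with a small sort. This sketch only demonstrates the ordering rule stated in the annotation comment; it does not evaluate CEL expressions, and `RouteRule` and `RuleOrdering` are hypothetical names that do not come from the SDK.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical value object pairing a rule with the handler path it routes to.
final class RouteRule {
    final String match;
    final int priority;
    final String path;

    RouteRule(String match, int priority, String path) {
        this.match = match;
        this.priority = priority;
        this.path = path;
    }
}

// Hypothetical helper that orders rules so that lower priority numbers come first.
final class RuleOrdering {
    private RuleOrdering() {
    }

    static List<RouteRule> byPriority(List<RouteRule> rules) {
        List<RouteRule> sorted = new ArrayList<>(rules);
        // List.sort is stable, so rules with equal priority keep their original order.
        sorted.sort(Comparator.comparingInt((RouteRule rule) -> rule.priority));
        return sorted;
    }
}
```

With the two handlers from the examples above, the rule `event.type == "v2"` with priority 1 would be ordered ahead of a rule with a larger priority number.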