Java SDK
- 1: Java SDK 概述
- 2: sdk-autogen
- 2.1: pom.xml
- 3: sdk
- 3.1: 序列化
- 3.1.1: 背景
- 3.1.2: DaprObjectSerializer
- 3.1.3: DefaultObjectSerializer
- 3.1.4: ObjectSerializer
- 3.2: HTTP客户端
- 3.2.1: DaprHttp
- 3.2.2: DaprHttpBuilder
- 3.3: gRPC客户端
- 3.4: Dapr 客户端
- 3.4.1: DaprClient
- 3.4.2: DaprPreviewClient
- 3.4.3: AbstractDaprClient
- 3.4.4: DaprClientHttp
- 3.4.5: DaprClientGrpc
- 3.5: opencensus
- 3.6: 注解
- 4: actors
- 4.1: client
- 4.2: actor runtime
- 5: springboot
- 5.1: spring auto configuration
- 5.2: controller
- 5.3: topic subscription
- 6: workflow
- 6.1: workflow定义
- 6.2: DaprWorkflowContextImpl实现
- 6.3: runtime package
- 6.3.1: WorkflowRuntime实现
- 6.3.2: WorkflowRuntimeBuilder实现
- 6.3.3: OrchestratorWrapper实现
- 6.3.4: ActivityWrapper实现
- 6.3.5: WorkflowActivity实现
- 6.4: client package
- 6.4.1: DaprWorkflowClient代码实现
- 6.4.2: WorkflowInstanceStatus代码实现
- 6.4.3: WorkflowFailureDetails代码实现
1 - Java SDK 概述
项目结构
主要有以下子项目:
- sdk
- sdk-autogen
- sdk-springboot
- sdk-tests
2 - sdk-autogen
2.1 - pom.xml
基本定义
依赖
定义的项目依赖:
- javax.annotation-api: provided
- grpc-netty-shaded: runtime
- grpc-protobuf
- grpc-stub
- grpc-testing: test
其中 grpc 版本为 1.42.1。
<properties>
<grpc.version>1.42.1</grpc.version>
</properties>
<dependencies>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<version>${grpc.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
代码生成的目录
两个目录:
- input: proto
- output: generated-sources
<properties>
<protobuf.output.directory>${project.build.directory}/generated-sources</protobuf.output.directory>
<protobuf.input.directory>${project.build.directory}/proto</protobuf.input.directory>
</properties>
maven插件
download-maven-plugin
download-maven-plugin 用来下载 proto 文件。
插件的功能可以简单理解为:
- 用 wget 命令从 ${dapr.proto.baseurl}/common/v1/common.proto 处下载到 common.proto 文件
- 用 wget 命令从 ${dapr.proto.baseurl}/common/v1/dapr.proto 处下载到 dapr.proto 文件
- 用 wget 命令从 ${dapr.proto.baseurl}/common/v1/appcallback.proto 处下载到 appcallback.proto 文件
- 以上三个文件下载后都会放置到目录 ${protobuf.input.directory}/dapr/proto/common/v1 下
<plugin>
<groupId>com.googlecode.maven-download-plugin</groupId>
<artifactId>download-maven-plugin</artifactId>
<version>1.6.0</version>
<executions>
<execution>
<id>getCommonProto</id>
<!-- the wget goal actually binds itself to this phase by default -->
<phase>initialize</phase>
<goals>
<goal>wget</goal>
</goals>
<configuration>
<url>${dapr.proto.baseurl}/common/v1/common.proto</url>
<outputFileName>common.proto</outputFileName>
<!-- default target location, just to demonstrate the parameter -->
<outputDirectory>${protobuf.input.directory}/dapr/proto/common/v1</outputDirectory>
</configuration>
</execution>
<execution>
<id>getDaprProto</id>
<!-- the wget goal actually binds itself to this phase by default -->
<phase>initialize</phase>
<goals>
<goal>wget</goal>
</goals>
<configuration>
<url>${dapr.proto.baseurl}/runtime/v1/dapr.proto</url>
<outputFileName>dapr.proto</outputFileName>
<!-- default target location, just to demonstrate the parameter -->
<outputDirectory>${protobuf.input.directory}</outputDirectory>
</configuration>
</execution>
<execution>
<id>getDaprClientProto</id>
<!-- the wget goal actually binds itself to this phase by default -->
<phase>initialize</phase>
<goals>
<goal>wget</goal>
</goals>
<configuration>
<url>${dapr.proto.baseurl}/runtime/v1/appcallback.proto</url>
<outputFileName>appcallback.proto</outputFileName>
<!-- default target location, just to demonstrate the parameter -->
<outputDirectory>${protobuf.input.directory}</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
protoc-jar-maven-plugin
最关键的地方,protoc-jar-maven-plugin 用于将 proto 文件生成 java 代码。
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<addProtoSources>inputs</addProtoSources>
<includeMavenTypes>direct</includeMavenTypes>
<includeStdTypes>true</includeStdTypes>
<inputDirectories>
<include>${protobuf.input.directory}/dapr/proto/common/v1</include>
<include>${protobuf.input.directory}</include>
</inputDirectories>
<outputTargets>
<outputTarget>
<type>java</type>
<outputDirectory>${protobuf.output.directory}</outputDirectory>
</outputTarget>
<outputTarget>
<type>grpc-java</type>
<outputDirectory>${protobuf.output.directory}</outputDirectory>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}</pluginArtifact>
</outputTarget>
</outputTargets>
</configuration>
</execution>
</executions>
</plugin>
spotbugs-maven-plugin
没啥特殊,只是为自动生成的代码跳过 findbugs
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
<configuration>
<!-- Skip findbugs for auto-generated code -->
<skip>true</skip>
</configuration>
</plugin>
maven-javadoc-plugin
没啥特殊。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
maven-source-plugin
没啥特殊。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
执行结果和分析
执行代码生成操作
执行 mvn install 命令,就可以看到代码生成的过程和结果。
download-maven-plugin 插件首先会下载 proto 文件到 target/proto 目录:
之后 protoc-jar-maven-plugin 插件会将这些 proto 文件生成 java 代码:
编译完成之后 proto 文件和 class 文件都被放到 target/classes 目录:
最后被打包为 jar 包,以及对应的 sources 和 javadoc 的 jar:
解开这个jar包,可以看到里面的文件内容和 target/classes 目录里面的内容是一致的:
里面不仅仅有 java classes文件,还有 proto 文件。
注意事项
dapr proto 文件是来源于 ${dapr.proto.baseurl},通过 wget 命令下载。
而 dapr.proto.baseurl
的定义在 java-sdk 根目录下的 pom.xml 文件中定义:
<dapr.proto.baseurl>https://raw.githubusercontent.com/dapr/dapr/v1.7.0-rc.2/dapr/proto</dapr.proto.baseurl>
这里就涉及到 proto 文件的版本(所在分支 / tag /commit id)。本地开发时如果涉及到 proto 文件的修改,就需要更新这里的 url 地址以对应正确的 proto 文件。反过来说,如果发现根据 proto 生成的代码没有反映出 proto 中新的修改,则应该第一时间检查这个 url 地址的有效性。
3 - sdk
3.1 - 序列化
3.1.1 - 背景
文档介绍
https://github.com/dapr/java-sdk#how-to-use-a-custom-serializer
dapr java-sdk 项目的 readme 中有这么一段介绍:
How to use a custom serializer
如何使用一个自定义的序列化器
This SDK provides a basic serialization for request/response objects but also for state objects. Applications should provide their own serialization for production scenarios.
这个SDK为请求/响应对象提供了一个基本的序列化,但也为状态对象提供了序列化。应用程序应该为生产场景提供他们自己的序列化。
3.1.2 - DaprObjectSerializer
接口定义
DaprObjectSerializer 接口很简单,定义如下:
// 对应用程序的对象进行序列化和反序列化
public interface DaprObjectSerializer {
// 将给定的对象序列化为byte[].
byte[] serialize(Object o) throws IOException;
// 将给定的byte[]反序列化为一个对象。
<T> T deserialize(byte[] data, TypeRef<T> type) throws IOException;
// 返回请求的内容类型
String getContentType();
}
getContentType() 方法获知内容的类型,serialize() 和 deserialize() 分别实现序列化和反序列化,即实现对象和 byte[] 的相互转换。
3.1.3 - DefaultObjectSerializer
DefaultObjectSerializer 继承自 ObjectSerializer, serialize 和 deserialize 都只是代理给 ObjectSerializer ,而 getContentType() 方法则 hard code 为返回 “application/json”:
public class DefaultObjectSerializer extends ObjectSerializer implements DaprObjectSerializer {
@Override
public byte[] serialize(Object o) throws IOException {
return super.serialize(o);
}
@Override
public <T> T deserialize(byte[] data, TypeRef<T> type) throws IOException {
return super.deserialize(data, type);
}
@Override
public String getContentType() {
return "application/json";
}
}
3.1.4 - ObjectSerializer
类定义
public class ObjectSerializer {
// 默认构造函数,以避免类在包外被实例化,但仍可以被继承。
protected ObjectSerializer() {
}
}
jackson 相关设置
protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
serialize() 方法实现
public byte[] serialize(Object state) throws IOException {
if (state == null) {
return null;
}
if (state.getClass() == Void.class) {
return null;
}
// Have this check here to be consistent with deserialization (see deserialize() method below).
if (state instanceof byte[]) {
return (byte[]) state;
}
// Proto buffer class is serialized directly.
if (state instanceof MessageLite) {
return ((MessageLite) state).toByteArray();
}
// Not string, not primitive, so it is a complex type: we use JSON for that.
return OBJECT_MAPPER.writeValueAsBytes(state);
}
deserialize() 方法实现
这两个方法都是简单代理:
public <T> T deserialize(byte[] content, TypeRef<T> type) throws IOException {
return deserialize(content, OBJECT_MAPPER.constructType(type.getType()));
}
public <T> T deserialize(byte[] content, Class<T> clazz) throws IOException {
return deserialize(content, OBJECT_MAPPER.constructType(clazz));
}
具体实现在这里:
private <T> T deserialize(byte[] content, JavaType javaType) throws IOException {
// 对应 serialize 的做法
if ((javaType == null) || javaType.isTypeOrSubTypeOf(Void.class)) {
return null;
}
// 如果是 java 基本类型,则交给 deserializePrimitives() 方法处理
// 注意此时 content 有可能是 null 或者 空数组
if (javaType.isPrimitive()) {
return deserializePrimitives(content, javaType);
}
// 对应 serialize 的做法
if (content == null) {
return null;
}
// Deserialization of GRPC response fails without this check since it does not come as base64 encoded byte[].
// 如果没有这个检查,GRPC响应的反序列化就会失败,因为它不是以 base64 编码的 byte[] 形式出现的。
// TBD:这里有点不是太理解
if (javaType.hasRawClass(byte[].class)) {
return (T) content;
}
// // 对应 serialize 的做法,但长度为零的检测放在 byte[] 检测之后
if (content.length == 0) {
return null;
}
// 对 CloudEvent 的支持:如果是 CloudEvent,则单独序列化
if (javaType.hasRawClass(CloudEvent.class)) {
return (T) CloudEvent.deserialize(content);
}
// 对 grpc MessageLite 的支持:通过反射调用 parseFrom 方法
if (javaType.isTypeOrSubTypeOf(MessageLite.class)) {
try {
Method method = javaType.getRawClass().getDeclaredMethod("parseFrom", byte[].class);
if (method != null) {
return (T) method.invoke(null, content);
}
} catch (NoSuchMethodException e) {
// It was a best effort. Skip this try.
} catch (Exception e) {
throw new IOException(e);
}
}
// 最后才通过 jackson 进行标准的 json 序列化
return OBJECT_MAPPER.readValue(content, javaType);
}
deserializePrimitives() 方法
对原生类型的解析:
private static <T> T deserializePrimitives(byte[] content, JavaType javaType) throws IOException {
if ((content == null) || (content.length == 0)) {
// content 为null或者空的特殊处理,相当于是缺省值
if (javaType.hasRawClass(boolean.class)) {
return (T) Boolean.FALSE;
}
if (javaType.hasRawClass(byte.class)) {
return (T) Byte.valueOf((byte) 0);
}
if (javaType.hasRawClass(short.class)) {
return (T) Short.valueOf((short) 0);
}
if (javaType.hasRawClass(int.class)) {
return (T) Integer.valueOf(0);
}
if (javaType.hasRawClass(long.class)) {
return (T) Long.valueOf(0L);
}
if (javaType.hasRawClass(float.class)) {
return (T) Float.valueOf(0);
}
if (javaType.hasRawClass(double.class)) {
return (T) Double.valueOf(0);
}
if (javaType.hasRawClass(char.class)) {
return (T) Character.valueOf(Character.MIN_VALUE);
}
return null;
}
// 对于非空值,通过 jackson 进行反序列化
return OBJECT_MAPPER.readValue(content, javaType);
}
总结
这个代码中,在 jackson 处理之前有很多特殊逻辑,这些逻辑理论上应该是独立于 jackson 序列化方案的,如果要引入其他 DaprObjectSerializer 的实现,这些特殊逻辑都要重复 n 次,有代码重复和逻辑不一致的风险。
最好是能把这些逻辑提取出来,在序列化和反序列化时先用这些特殊逻辑出来一遍,最后再交给 DaprObjectSerializer ,会比较合理。
再有就是依赖冲突问题,目前的 DaprObjectSerializer 方案没有给出完整的解决方案。jackson 的依赖还是写死的。
3.2 - HTTP客户端
3.2.1 - DaprHttp
常量定义
public static final String API_VERSION = "v1.0";
public static final String ALPHA_1_API_VERSION = "v1.0-alpha1";
private static final String HEADER_DAPR_REQUEST_ID = "X-DaprRequestId";
private static final String DEFAULT_HTTP_SCHEME = "http";
private static final Set<String> ALLOWED_CONTEXT_IN_HEADERS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("grpc-trace-bin", "traceparent", "tracestate")));
HTTP 方法定义:
public enum HttpMethods {
NONE,
GET,
PUT,
POST,
DELETE,
HEAD,
CONNECT,
OPTIONS,
TRACE
}
基本类定义
public static class Response {
private byte[] body;
private Map<String, String> headers;
private int statusCode;
......
}
DaprHttp 类定义
private final OkHttpClient httpClient;
private final int port;
private final String hostname;
DaprHttp(String hostname, int port, OkHttpClient httpClient) {
this.hostname = hostname;
this.port = port;
this.httpClient = httpClient;
}
invokeApi() 方法实现
这个方法有多个重载,最终的实现如下,用来执行http调用请求:
/**
* 调用API,返回文本格式有效载荷。
*
* @param method HTTP method.
* @param pathSegments Array of path segments (/a/b/c -> ["a", "b", "c"]).
* @param urlParameters Parameters in the URL
* @param content payload to be posted.
* @param headers HTTP headers.
* @param context OpenTelemetry's Context.
* @return CompletableFuture for Response.
*/
private CompletableFuture<Response> doInvokeApi(String method,
String[] pathSegments,
Map<String, List<String>> urlParameters,
byte[] content, Map<String, String> headers,
Context context) {
// 方法人口参数基本就是一个非常简化的HTTP请求的格式抽象
// 取 UUID 为 requestId
final String requestId = UUID.randomUUID().toString();
RequestBody body;
//组装 okhttp3 的 request
String contentType = headers != null ? headers.get(Metadata.CONTENT_TYPE) : null;
MediaType mediaType = contentType == null ? MEDIA_TYPE_APPLICATION_JSON : MediaType.get(contentType);
if (content == null) {
body = mediaType.equals(MEDIA_TYPE_APPLICATION_JSON)
? REQUEST_BODY_EMPTY_JSON
: RequestBody.Companion.create(new byte[0], mediaType);
} else {
body = RequestBody.Companion.create(content, mediaType);
}
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
urlBuilder.scheme(DEFAULT_HTTP_SCHEME)
.host(this.hostname)
.port(this.port);
for (String pathSegment : pathSegments) {
urlBuilder.addPathSegment(pathSegment);
}
Optional.ofNullable(urlParameters).orElse(Collections.emptyMap()).entrySet().stream()
.forEach(urlParameter ->
Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream()
.forEach(urlParameterValue ->
urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue)));
Request.Builder requestBuilder = new Request.Builder()
.url(urlBuilder.build())
.addHeader(HEADER_DAPR_REQUEST_ID, requestId);
if (context != null) {
context.stream()
.filter(entry -> ALLOWED_CONTEXT_IN_HEADERS.contains(entry.getKey().toString().toLowerCase()))
.forEach(entry -> requestBuilder.addHeader(entry.getKey().toString(), entry.getValue().toString()));
}
if (HttpMethods.GET.name().equals(method)) {
requestBuilder.get();
} else if (HttpMethods.DELETE.name().equals(method)) {
requestBuilder.delete();
} else {
requestBuilder.method(method, body);
}
String daprApiToken = Properties.API_TOKEN.get();
if (daprApiToken != null) {
requestBuilder.addHeader(Headers.DAPR_API_TOKEN, daprApiToken);
}
if (headers != null) {
Optional.ofNullable(headers.entrySet()).orElse(Collections.emptySet()).stream()
.forEach(header -> {
requestBuilder.addHeader(header.getKey(), header.getValue());
});
}
// 完成 request 的组装,构建 request 对象
Request request = requestBuilder.build();
// 发出 okhttp3 的请求,然后返回 CompletableFuture
CompletableFuture<Response> future = new CompletableFuture<>();
this.httpClient.newCall(request).enqueue(new ResponseFutureCallback(future));
return future;
}
在 http 请求组装过程中,注意 header 的处理:
- request id: “X-DaprRequestId”,值为 UUID
- dapr api token: “dapr-api-token”,值从系统变量 “dapr.api.token” 或者环境变量 “DAPR_API_TOKEN” 中获取
- 发送请求时明确传递的header: 透传
- OpenTelemetry 相关的值:会试图从传递进来的 OpenTelemetry context 中获取 “grpc-trace-bin”, “traceparent”, “tracestate” 这三个 header 并继续传递下去
3.2.2 - DaprHttpBuilder
代码没啥特殊的,就注意一下 okhttp 的一些参数的获取。
另外 MaxRequestsPerHost 默认为5,这是一个超级大坑!
private DaprHttp buildDaprHttp() {
// 双重检查锁
if (OK_HTTP_CLIENT == null) {
synchronized (LOCK) {
if (OK_HTTP_CLIENT == null) {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
Duration readTimeout = Duration.ofSeconds(Properties.HTTP_CLIENT_READ_TIMEOUT_SECONDS.get());
builder.readTimeout(readTimeout);
Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(Properties.HTTP_CLIENT_MAX_REQUESTS.get());
//这里有一个超级大坑!
// The maximum number of requests for each host to execute concurrently.
// Default value is 5 in okhttp which is totally UNACCEPTABLE!
// For sidecar case, set it the same as maxRequests.
dispatcher.setMaxRequestsPerHost(Properties.HTTP_CLIENT_MAX_REQUESTS.get());
builder.dispatcher(dispatcher);
ConnectionPool pool = new ConnectionPool(Properties.HTTP_CLIENT_MAX_IDLE_CONNECTIONS.get(),
KEEP_ALIVE_DURATION, TimeUnit.SECONDS);
builder.connectionPool(pool);
OK_HTTP_CLIENT = builder.build();
}
}
}
return new DaprHttp(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), OK_HTTP_CLIENT);
}
}
相关的几个参数的获取:
- http read timeout:默认60秒,可以通过系统参数 dapr.http.client.maxRequests 或者环境变量 DAPR_HTTP_CLIENT_READ_TIMEOUT_SECONDS 覆盖
- http max request:默认 1024,可以通过系统参数 dapr.http.client.readTimeoutSeconds 或者环境变量 DAPR_HTTP_CLIENT_MAX_REQUESTS 覆盖
- http max idle connections: 默认 1024,可以通过系统参数 dapr.http.client.maxIdleConnections 或者环境变量 DAPR_HTTP_CLIENT_MAX_IDLE_CONNECTIONS 覆盖
- HTTP keep alive duration: hard code 为 30
对于 okhttp,还必须自动设置 http max request per host 参数,不然默认值为 5 对于 sidecar 来说完全不可用。
3.3 - gRPC客户端
3.4 - Dapr 客户端
3.4.1 - DaprClient
// 无论需要何种GRPC或HTTP客户端实现,都可以使用通用客户端适配器。
public interface DaprClient extends AutoCloseable {
Mono<Void> waitForSidecar(int timeoutInMilliseconds);
Mono<Void> shutdown();
}
其他方法都是和 dapr api 相关的方法,然后所有的方法都是实现了 reactive 风格,如:
Mono<Void> publishEvent(String pubsubName, String topicName, Object data);
Mono<Void> publishEvent(String pubsubName, String topicName, Object data, Map<String, String> metadata);
Mono<Void> publishEvent(PublishEventRequest request);
3.4.2 - DaprPreviewClient
DaprPreviewClient 接口定义,目前只有新增的 configuration api 的方法和 state query 的方法:
// 无论需要何种GRPC或HTTP客户端实现,都可以使用通用客户端适配器。
public interface DaprPreviewClient extends AutoCloseable {
Mono<ConfigurationItem> getConfiguration(String storeName, String key);
Flux<List<ConfigurationItem>> subscribeToConfiguration(String storeName, String... keys);
<T> Mono<QueryStateResponse<T>> queryState(String storeName, String query, TypeRef<T> type);
}
备注:distribuyted lock 的方法还没有加上来,估计是还没有开始实现。
3.4.3 - AbstractDaprClient
// 抽象类,具有客户端实现之间共同的便利方法。
abstract class AbstractDaprClient implements DaprClient, DaprPreviewClient {
// 这里还是写死了 jackson!
// TBD: 看下是哪里在用
protected static final ObjectMapper JSON_REQUEST_MAPPER = new ObjectMapper();
protected DaprObjectSerializer objectSerializer;
protected DaprObjectSerializer stateSerializer;
AbstractDaprClient(
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
this.objectSerializer = objectSerializer;
this.stateSerializer = stateSerializer;
}
}
其他都方法实现基本都是一些代理方法,没有实质性内容,实际实现都应该在子类中实现。
@Override
public Mono<Void> publishEvent(String pubsubName, String topicName, Object data) {
return this.publishEvent(pubsubName, topicName, data, null);
}
@Override
public Mono<Void> publishEvent(String pubsubName, String topicName, Object data, Map<String, String> metadata) {
PublishEventRequest req = new PublishEventRequest(pubsubName, topicName, data)
.setMetadata(metadata);
return this.publishEvent(req).then();
}
这些方法重载可以理解成一些语法糖,可以不用构造复杂的请求对象如 PublishEventRequest 就可以方便的直接使用而已。
3.4.4 - DaprClientHttp
类定义
public class DaprClientHttp extends AbstractDaprClient {
private final DaprHttp client;
private final boolean isObjectSerializerDefault;
private final boolean isStateSerializerDefault;
DaprClientHttp(DaprHttp client, DaprObjectSerializer objectSerializer, DaprObjectSerializer stateSerializer) {
super(objectSerializer, stateSerializer);
this.client = client;
this.isObjectSerializerDefault = objectSerializer.getClass() == DefaultObjectSerializer.class;
this.isStateSerializerDefault = stateSerializer.getClass() == DefaultObjectSerializer.class;
}
DaprClientHttp(DaprHttp client) {
this(client, new DefaultObjectSerializer(), new DefaultObjectSerializer());
}
}
client特有的方法实现
waitForSidecar() 方法
waitForSidecar() 方法通过连接指定的 sidecar ip地址和端口来判断并等待 sidecar 是不是可用。
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
return Mono.fromRunnable(() -> {
try {
NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), timeoutInMilliseconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
close() 方法
close() 方法是实现 java.lang.AutoCloseable 的要求,DaprClient 继承了这个接口:
@Override
public void close() {
// 简单的关闭 http client
client.close();
}
dapr api 方法的实现
publishEvent()方法
publishEvent()方法主要是两个任务:
- 组装发送请求的各种参数,包括 http 请求的 method,path,parameters,以及事件序列化后 byte[] 格式的数据
- 调用 DaprClient 发出 HTTP 请求
@Override
public Mono<Void> publishEvent(PublishEventRequest request) {
try {
String pubsubName = request.getPubsubName();
String topic = request.getTopic();
Object data = request.getData();
Map<String, String> metadata = request.getMetadata();
if (topic == null || topic.trim().isEmpty()) {
throw new IllegalArgumentException("Topic name cannot be null or empty.");
}
byte[] serializedEvent = objectSerializer.serialize(data);
// Content-type can be overwritten on a per-request basis.
// It allows CloudEvents to be handled differently, for example.
String contentType = request.getContentType();
if (contentType == null || contentType.isEmpty()) {
contentType = objectSerializer.getContentType();
}
Map<String, String> headers = Collections.singletonMap("content-type", contentType);
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "publish", pubsubName, topic };
Map<String, List<String>> queryArgs = metadataToQueryArgs(metadata);
return Mono.subscriberContext().flatMap(
context -> this.client.invokeApi(
DaprHttp.HttpMethods.POST.name(), pathSegments, queryArgs, serializedEvent, headers, context
)
).then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}
shutdown() 方法
注意这个 shutdown() 方法是关闭 sidecar,因此也是需要发送请求到 sidecar 的:
@Override
public Mono<Void> shutdown() {
String[] pathSegments = new String[]{ DaprHttp.API_VERSION, "shutdown" };
return Mono.subscriberContext().flatMap(
context -> client.invokeApi(DaprHttp.HttpMethods.POST.name(), pathSegments,
null, null, context))
.then();
}
http 请求最终是通过 DaprClient 发出去的。
3.4.5 - DaprClientGrpc
类定义
public class DaprClientGrpc extends AbstractDaprClient {
private Closeable channel;
private DaprGrpc.DaprStub asyncStub;
DaprClientGrpc(
Closeable closeableChannel,
DaprGrpc.DaprStub asyncStub,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
super(objectSerializer, stateSerializer);
this.channel = closeableChannel;
this.asyncStub = intercept(asyncStub);
}
}
client特有的方法实现
waitForSidecar() 方法
waitForSidecar() 方法通过连接指定的 sidecar ip地址和端口来判断并等待 sidecar 是不是可用。
和 HTTP 的实现差别只是端口不同。
@Override
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
return Mono.fromRunnable(() -> {
try {
NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.GRPC_PORT.get(), timeoutInMilliseconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
close() 方法
close() 方法是实现 java.lang.AutoCloseable 的要求,DaprClient 继承了这个接口:
public void close() throws Exception {
if (channel != null) {
DaprException.wrap(() -> {
// 关闭channel
channel.close();
return true;
}).call();
}
}
dapr api 方法的实现
publishEvent()方法
publishEvent()方法主要是两个任务:
- 组装发送 grpc 请求的各种参数,构建 PublishEventRequest 请求对象
- 调用 gRPC asyncStub 的对应方法发出 gRPC 请求
@Override
public Mono<Void> publishEvent(PublishEventRequest request) {
try {
String pubsubName = request.getPubsubName();
String topic = request.getTopic();
Object data = request.getData();
DaprProtos.PublishEventRequest.Builder envelopeBuilder = DaprProtos.PublishEventRequest.newBuilder()
.setTopic(topic)
.setPubsubName(pubsubName)
.setData(ByteString.copyFrom(objectSerializer.serialize(data)));
// Content-type can be overwritten on a per-request basis.
// It allows CloudEvents to be handled differently, for example.
String contentType = request.getContentType();
if (contentType == null || contentType.isEmpty()) {
contentType = objectSerializer.getContentType();
}
envelopeBuilder.setDataContentType(contentType);
Map<String, String> metadata = request.getMetadata();
if (metadata != null) {
envelopeBuilder.putAllMetadata(metadata);
}
return Mono.subscriberContext().flatMap(
context ->
this.<Empty>createMono(
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
)
).then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}
3.5 - opencensus
3.6 - 注解
3.6.1 - topic注解
@topic
注解用来订阅某个主题, pubsubName, name, metadata 分别对应 dapr pub/sub API 中的 pubsubName, topic,metadata 字段:
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface Topic {
String name();
String pubsubName();
String metadata() default "{}";
// 用于匹配传入的 cloud event 的规则。
Rule rule() default @Rule(match = "", priority = 0);
}
以下是 @topic 注解使用的典型例子:
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}")
@PostMapping(path = "/testingtopic")
public Mono<Void> handleMessage(@RequestBody(required = false) CloudEvent<?> cloudEvent) {
......
}
3.6.2 - rule注解
@topic
注解用来表述匹配规则。
@Documented
@Target(ElementType.ANNOTATION_TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Rule {
// 用于匹配传入的 cloud event 的通用表达式语言( Common Expression Language / CEL)表达。
String match();
// 规则的优先级,用于排序。最低的数字有更高的优先权。
int priority();
}
以下是 @rule 注解使用的典型例子:
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
......
}
4 - actors
4.1 - client
4.2 - actor runtime
5 - springboot
5.1 - spring auto configuration
meta-inf
按照 springboot 的标准做法,src/main/resources/META-INF/spring.factories
文件内容如下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.dapr.springboot.DaprAutoConfiguration
DaprAutoConfiguration
DaprAutoConfiguration 的内容非常简单:
@Configuration
@ConditionalOnWebApplication
@ComponentScan("io.dapr.springboot")
public class DaprAutoConfiguration {
}
DaprBeanPostProcessor
DaprBeanPostProcessor 用来处理 dapr 注解。
@Component
public class DaprBeanPostProcessor implements BeanPostProcessor {
private static final ObjectMapper MAPPER = new ObjectMapper();
private final EmbeddedValueResolver embeddedValueResolver;
DaprBeanPostProcessor(ConfigurableBeanFactory beanFactory) {
embeddedValueResolver = new EmbeddedValueResolver(beanFactory);
}
......
}
BeanPostProcessor 接口的 postProcessBeforeInitialization() 的说明如下:
在任何 Bean 初始化回调(如 InitializingBean 的
afterPropertiesSet
或自定义init-method
)之前,将此 BeanPostProcessor 应用于给定的新 Bean 实例。 该 bean 将已经被填充了属性值。返回的 Bean 实例可能是一个围绕原始 Bean 的包装器。
也就是每个 bean 在初始化后都会调用这个方法以便植入我们需要的逻辑,如在这里就需要扫描 bean 是否带有 dapr 的 topic 注解:
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean == null) {
return null;
}
subscribeToTopics(bean.getClass(), embeddedValueResolver);
return bean;
}
subscribeToTopics() 方法的具体实现后面再详细看,期间还有规则匹配的实现代码。
postProcessAfterInitialization() 方法没有特殊逻辑,简单返回原始bean:
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
5.2 - controller
@RestController
public class DaprController {
}
healthz endpoint
用于 health check 的 endpoint,路径为 “/healthz”,实现为空。
@GetMapping(path = "/healthz")
public void healthz() {
}
TBD:这里是否要考虑 sidecar 的某些状态?目前这是只要 sidecar 进程和端口可以访问就会应答状态OK,而不管sidecar 中的功能是否正常。
dapr configuration endpoint
用于获取 dapr sidecar 的自身配置, 路径为 “/dapr/config”
@GetMapping(path = "/dapr/config", produces = MediaType.APPLICATION_JSON_VALUE)
public byte[] daprConfig() throws IOException {
return ActorRuntime.getInstance().serializeConfig();
}
但看 ActorRuntime 的代码实现,这个 config 是指 actor configuration:
public byte[] serializeConfig() throws IOException {
return INTERNAL_SERIALIZER.serialize(this.config);
}
private ActorRuntime(ManagedChannel channel, DaprClient daprClient) throws IllegalStateException {
this.config = new ActorRuntimeConfig();
}
subscribe endpoint
用于获取当前 dapr sidecar 的 pub/sub 订阅信息,路径为 “/dapr/subscribe”:
@GetMapping(path = "/dapr/subscribe", produces = MediaType.APPLICATION_JSON_VALUE)
public byte[] daprSubscribe() throws IOException {
return SERIALIZER.serialize(DaprRuntime.getInstance().listSubscribedTopics());
}
actor endpoint
用于 actor 的 endpoint,包括 deactive, invoke actor method, invoke actor timer 和 invoke actor reminder:
@DeleteMapping(path = "/actors/{type}/{id}")
public Mono<Void> deactivateActor(@PathVariable("type") String type,
@PathVariable("id") String id) {
return ActorRuntime.getInstance().deactivate(type, id);
}
@PutMapping(path = "/actors/{type}/{id}/method/{method}")
public Mono<byte[]> invokeActorMethod(@PathVariable("type") String type,
@PathVariable("id") String id,
@PathVariable("method") String method,
@RequestBody(required = false) byte[] body) {
return ActorRuntime.getInstance().invoke(type, id, method, body);
}
@PutMapping(path = "/actors/{type}/{id}/method/timer/{timer}")
public Mono<Void> invokeActorTimer(@PathVariable("type") String type,
@PathVariable("id") String id,
@PathVariable("timer") String timer,
@RequestBody byte[] body) {
return ActorRuntime.getInstance().invokeTimer(type, id, timer, body);
}
@PutMapping(path = "/actors/{type}/{id}/method/remind/{reminder}")
public Mono<Void> invokeActorReminder(@PathVariable("type") String type,
@PathVariable("id") String id,
@PathVariable("reminder") String reminder,
@RequestBody(required = false) byte[] body) {
return ActorRuntime.getInstance().invokeReminder(type, id, reminder, body);
}
5.3 - topic subscription
读取 topic 订阅注解
订阅 topic 的具体代码实现在类 DaprBeanPostProcessor 的 subscribeToTopics() 方法中,在 bean 初始化时被调用。
topic 注解使用的例子如下:
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
......
}
读取 topic 注解
现在需要在 postProcessBeforeInitialization() 方法中扫描并解析所有有 topic 注解的 bean:
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
subscribeToTopics(bean.getClass(), embeddedValueResolver);
return bean;
}
private static void subscribeToTopics(Class clazz, EmbeddedValueResolver embeddedValueResolver) {
if (clazz == null) {
return;
}
// 先用 Superclass 做一次递归调用,这样就会从当前类的父类开始先推衍
// 由于每次都是父类先执行,因此这会一直递归到最顶层的 Object 类
subscribeToTopics(clazz.getSuperclass(), embeddedValueResolver);
// 取当前类的所有方法
for (Method method : clazz.getDeclaredMethods()) {
// 然后看方法上是不是标记了 dapr 的 topic 注解
Topic topic = method.getAnnotation(Topic.class);
if (topic == null) {
continue;
}
// 如果方法上有标记 dapr 的 topic 注解,则开始处理
// 先获取 topic 注解上的属性 topic name, pubsub name, rule
Rule rule = topic.rule();
String topicName = embeddedValueResolver.resolveStringValue(topic.name());
String pubSubName = embeddedValueResolver.resolveStringValue(topic.pubsubName());
// rule 也是一个注解,获取 match 属性
String match = embeddedValueResolver.resolveStringValue(rule.match());
if ((topicName != null) && (topicName.length() > 0) && pubSubName != null && pubSubName.length() > 0) {
// topicName 和 pubSubName 不能为空 (metadata 可以为空,rule可以为空)
try {
TypeReference<HashMap<String, String>> typeRef
= new TypeReference<HashMap<String, String>>() {};
// 读取 topic 注解上的 metadata 属性
Map<String, String> metadata = MAPPER.readValue(topic.metadata(), typeRef);
// 读取路由信息,细节看下一节
List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
for (String route : routes) {
// 将读取的路由信息添加到 dapr runtime 中。
// 细节看下一节
DaprRuntime.getInstance().addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata);
}
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Error while parsing metadata: " + e);
}
}
}
}
读取路由信息
路由信息配置方法如下:
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
......
}
getAllCompleteRoutesForPost() 方法负责读取 @rule 注解相关的路由信息:
private static List<String> getAllCompleteRoutesForPost(Class clazz, Method method, String topicName) {
List<String> routesList = new ArrayList<>();
RequestMapping clazzRequestMapping =
(RequestMapping) clazz.getAnnotation(RequestMapping.class);
String[] clazzLevelRoute = null;
if (clazzRequestMapping != null) {
clazzLevelRoute = clazzRequestMapping.value();
}
// 读取该方法上的路由信息,注意必须是 POST
String[] postValueArray = getRoutesForPost(method, topicName);
if (postValueArray != null && postValueArray.length >= 1) {
for (String postValue : postValueArray) {
if (clazzLevelRoute != null && clazzLevelRoute.length >= 1) {
for (String clazzLevelValue : clazzLevelRoute) {
// 完整的路由路径应该是类级别 + 方法级别
String route = clazzLevelValue + confirmLeadingSlash(postValue);
routesList.add(route);
}
} else {
routesList.add(postValue);
}
}
}
return routesList;
}
getRoutesForPost() 方法用来读取 @topic 注解所在方法的 @PostMapping 注解,以便获得路由的 path 信息,对应例子如下:
@Topic(name = "testingtopic", pubsubName = "${myAppProperty:messagebus}",
rule = @Rule(match = "event.type == \"v2\"", priority = 1))
@PostMapping(path = "/testingtopicV2")
public Mono<Void> handleMessageV2(@RequestBody(required = false) CloudEvent cloudEvent) {
......
}
getRoutesForPost() 方法的代码实现如下:
private static String[] getRoutesForPost(Method method, String topicName) {
String[] postValueArray = new String[] {topicName};
// 读取 PostMapping 注解
PostMapping postMapping = method.getAnnotation(PostMapping.class);
if (postMapping != null) {
// 如果有 PostMapping 注解
if (postMapping.path() != null && postMapping.path().length >= 1) {
// 如果 path 属性有设置则从 path 属性取值
postValueArray = postMapping.path();
} else if (postMapping.value() != null && postMapping.value().length >= 1) {
// 如果 path 属性没有设置则直接从 PostMapping 注解的 value 中取值
postValueArray = postMapping.value();
}
} else {
// 如果没有 PostMapping 注解,则尝试读取 RequestMapping 注解
RequestMapping reqMapping = method.getAnnotation(RequestMapping.class);
for (RequestMethod reqMethod : reqMapping.method()) {
// 要求 RequestMethod 为 POST
if (reqMethod == RequestMethod.POST) {
// 同样读取 path 或者 value 的值
if (reqMapping.path() != null && reqMapping.path().length >= 1) {
postValueArray = reqMapping.path();
} else if (reqMapping.value() != null && reqMapping.value().length >= 1) {
postValueArray = reqMapping.value();
}
break;
}
}
}
return postValueArray;
}
getRoutesForPost() 方法的解读,就是从标记了 @topic 注解的方法上读取路由信息,也就是后续订阅的事件应该发送的地址。读取的逻辑为:
- 优先读取 PostMapping 注解,没有的话读取 RequestMethod 为 POST 的 RequestMapping 注解
- 优先读取上述注解的 path 属性,没有的话读取 value
保存 topic 订阅信息
topic 订阅信息在读取之后,就会通过 DaprRuntime 的 addSubscribedTopic() 方法保存起来:
public synchronized void addSubscribedTopic(String pubsubName,
String topicName,
String match,
int priority,
String route,
Map<String,String> metadata) {
// 用 pubsubName 和 topicName 做 key
DaprTopicKey topicKey = new DaprTopicKey(pubsubName, topicName);
// 获取 key 对应的 builder,没有的话就创建一个
DaprSubscriptionBuilder builder = subscriptionBuilders.get(topicKey);
if (builder == null) {
builder = new DaprSubscriptionBuilder(pubsubName, topicName);
subscriptionBuilders.put(topicKey, builder);
}
// match 不为空则添加 rule,为空则采用默认路径
if (match.length() > 0) {
builder.addRule(route, match, priority);
} else {
builder.setDefaultPath(route);
}
if (metadata != null && !metadata.isEmpty()) {
builder.setMetadata(metadata);
}
}
考虑到调用的地方代码是:
// 读取路由信息
List<String> routes = getAllCompleteRoutesForPost(clazz, method, topicName);
for (String route : routes) {
// 将读取的路由信息添加到 dapr runtime 中。
DaprRuntime.getInstance().addSubscribedTopic(
pubSubName, topicName, match, rule.priority(), route, metadata);
}
所以前面的读取流程可以理解为就是读取和 topic 订阅有关的上述6个参数,然后保存起老。
应答 topic 订阅信息
在 DaprController 中,daprSubscribe() 方法对外暴露路径 /dapr/subscribe
,以便让 dapr sidecar 可以通过读取该路径来获取当前应用的 topic 订阅信息:
@GetMapping(path = "/dapr/subscribe", produces = MediaType.APPLICATION_JSON_VALUE)
public byte[] daprSubscribe() throws IOException {
return SERIALIZER.serialize(DaprRuntime.getInstance().listSubscribedTopics());
}
而 DaprRuntime 的 listSubscribedTopics() 方法获取的就是前面保存起来的 topic 订阅信息:
public synchronized DaprTopicSubscription[] listSubscribedTopics() {
List<DaprTopicSubscription> values = subscriptionBuilders.values().stream()
.map(b -> b.build()).collect(Collectors.toList());
return values.toArray(new DaprTopicSubscription[0]);
}
流程总结
整个 topic 订阅流程的示意图如下:
title topic subscription
hide footbox
skinparam style strictuml
box "Application" #LightBlue
participant DaprBeanPostProcessor
participant bean
participant DaprRuntime
participant DaprController
end box
participant daprd
-> DaprBeanPostProcessor: postProcessBeforeInitialization(bean)
DaprBeanPostProcessor -> bean: get @topic
bean --> DaprBeanPostProcessor
alt if bean has @topic
DaprBeanPostProcessor -> bean: parse @topic @rule
bean --> DaprBeanPostProcessor: pubsub name, topic name, match,\n priority, routes, metadata
DaprBeanPostProcessor -> DaprRuntime: addSubscribedTopic()
DaprRuntime -> DaprRuntime: save in map\n subscriptionBuilders
DaprRuntime --> DaprBeanPostProcessor
end
<-- DaprBeanPostProcessor
daprd -> DaprController: get subscription
DaprController -> DaprRuntime: listSubscribedTopics()
DaprRuntime --> DaprController
DaprController --> daprd
6 - workflow
6.1 - workflow定义
workflow
Workflow 定义定义很简单:
public abstract class Workflow {
// 默认构造函数应该可以不用写的
public Workflow(){
}
public abstract WorkflowStub create();
public void run(WorkflowContext ctx) {
this.create().run(ctx);
}
}
create() 方法定义创建 WorkflowStub 的模板方法,然后在 run() 方法通过执行 create() 方法创建 WorkflowStub ,在执行 WorkflowStub 的 run() 方法。
WorkflowStub
WorkflowStub 是一个单方法的接口定义,用于实现函数编程,标注有 java.lang.@FunctionalInterface
注解。
@FunctionalInterface
public interface WorkflowStub {
void run(WorkflowContext ctx);
}
@FunctionalInterface 的 javadoc 描述如下:
一种信息性注解类型,用于表明接口类型声明是 Java 语言规范所定义的函数接口。从概念上讲,一个函数接口只有一个抽象方法。由于默认方法有一个实现,所以它们不是抽象方法。如果一个接口声明了一个覆盖 java.lang.Object 公共方法之一的抽象方法,该方法也不计入接口的抽象方法数,因为接口的任何实现都将有一个来自 java.lang.Object 或其他地方的实现。
请注意,函数接口的实例可以通过 lambda 表达式、方法引用或构造器引用来创建。
如果一个类型被注释为该注释类型,编译器必须生成一条错误信息,除非:
- 该类型是接口类型,而不是注解类型、枚举或类。
- 注解的类型满足函数接口的要求。
然而,无论接口声明中是否有 FunctionalInterface 注解,编译器都会将任何符合函数接口定义的接口视为函数接口。
WorkflowContext
出乎意外的是 WorkflowContext 的定义超级复杂,远远不是一个 上下文 那么简单。
WorkflowContext的基本方法
WorkflowContext 接口上定义了大量的方法,其中部分基本方法
public interface WorkflowContext {
// 通过这个方法传递 logger 对象以供在后续执行时打印日志
Logger getLogger();
// 获取 workflow 的 name
String getName();
// 获取 workflow instance 的 id
String getInstanceId();
//获取当前协调时间(UTC)
Instant getCurrentInstant();
// 完成当前 wofklow,输出是完成的workflow的序列化输出
void complete(Object output);
......
}
waitForExternalEvent()方法
WorkflowContext 接口上定义了三个 waitForExternalEvent() 接口方法和一个默认实现:
public interface WorkflowContext {
......
<V> Task<V> waitForExternalEvent(String name, Duration timeout, Class<V> dataType) throws TaskCanceledException;
<V> Task<Void> waitForExternalEvent(String name, Duration timeout) throws TaskCanceledException;
<V> Task<Void> waitForExternalEvent(String name) throws TaskCanceledException;
default <V> Task<V> waitForExternalEvent(String name, Class<V> dataType) {
try {
return this.waitForExternalEvent(name, null, dataType);
} catch (TaskCanceledException e) {
// This should never happen because of the max duration
throw new RuntimeException("An unexpected exception was throw while waiting for an external event.", e);
}
}
......
}
waitForExternalEvent 的 javadoc 描述如下:
等待名为 name 的事件发生,并返回一个 Task,该任务在收到事件时完成,或在超时时取消。
如果当前协调器尚未等待名为 name 的事件,那么事件将保存在协调器实例状态中,并在调用此方法时立即派发。即使当前协调器在收到事件前取消了等待操作,事件保存也会发生。
协调器可以多次等待同一事件名,因此允许等待多个同名事件。协调器收到的每个外部事件将只完成本方法返回的一个任务。
特别注意: 这个 Task 的类型是 com.microsoft.durabletask.Task
,直接用在 dapr workflow 的接口定义上,意味着 dapr workflow 彻底和 durabletask 绑定。
callActivity()方法
WorkflowContext 接口上定义了 callActivity() 接口方法和多个默认方法来重写不同参数的 callActivity() 方法
public interface WorkflowContext {
......
<V> Task<V> callActivity(String name, Object input, TaskOptions options, Class<V> returnType);
default Task<Void> callActivity(String name) {
return this.callActivity(name, null, null, Void.class);
}
default Task<Void> callActivity(String name, Object input) {
return this.callActivity(name, input, null, Void.class);
}
default <V> Task<V> callActivity(String name, Class<V> returnType) {
return this.callActivity(name, null, null, returnType);
}
default <V> Task<V> callActivity(String name, Object input, Class<V> returnType) {
return this.callActivity(name, input, null, returnType);
}
default Task<Void> callActivity(String name, Object input, TaskOptions options) {
return this.callActivity(name, input, options, Void.class);
}
......
}
waitForExternalEvent 的 javadoc 描述如下:
使用指定的 input 异步调用一个 activity,并在 activity 完成时返回一个新的 task。如果 activity 成功完成,返回的 task 值将是 task 的输出。如果 activity 失败,返回的 task 将以 TaskFailedException 异常完成。
isReplaying() 方法
isReplaying() 用来判断当前工作流当前是否正在重放之前的执行:
public interface WorkflowContext {
......
boolean isReplaying();
}
waitForExternalEvent 的 javadoc 描述如下:
获取一个值,指示工作流当前是否正在重放之前的执行。
工作流函数从内存中卸载后会进行 “重放”,以重建本地变量状态。在重放过程中,先前执行的任务将自动使用存储在工作流历史记录中的先前查看值完成。一旦工作流达到不再重放现有历史记录的程度,此方法将返回 false。
如果您的逻辑只需要在不重放时运行,则可以使用此方法。例如,某些类型的应用程序日志在作为重放的一部分进行复制时可能会变得过于嘈杂。应用程序代码可以检查函数是否正在重放,然后在该值为 false 时发出日志语句。
allOf()和 anyOf()方法
<V> Task<List<V>> allOf(List<Task<V>> tasks) throws CompositeTaskFailedException;
Task<Task<?>> anyOf(List<Task<?>> tasks);
default Task<Task<?>> anyOf(Task<?>... tasks) {
return this.anyOf(Arrays.asList(tasks));
}
allOf 的 javadoc 描述如下:
返回一个新任务,该任务在所有给定任务完成后完成。如果任何给定任务在完成时出现异常,返回的任务也会在完成时出现 CompositeTaskFailedException,其中包含第一次遇到的故障的详细信息。返回的任务值是给定任务返回值的有序列表。如果没有提供任务,则返回值为空的已完成任务。
该方法适用于在继续协调的下一步之前等待一组独立任务的完成,如下面的示例:
Task
t1 = ctx.callActivity(“MyActivity”, String.class); Task t2 = ctx.callActivity(“MyActivity”, String.class); Task t3 = ctx.callActivity(“MyActivity”, String.class); List
orderedResults = ctx.allOf(List.of(t1, t2, t3)).await(); 任何给定任务出现异常都会导致非受查的 CompositeTaskFailedException 异常。可以通过检查该异常来获取单个任务的失败详情。
try { List
orderedResults = ctx.allOf(List.of(t1, t2, t3)).await(); } catch (CompositeTaskFailedException e) { List exceptions = e.getExceptions() } }
特别注意: 这个 CompositeTaskFailedException 的类型是 com.microsoft.durabletask.CompositeTaskFailedException
,直接用在 dapr workflow 的接口定义上,意味着 dapr workflow 彻底和 durabletask 绑定。
anyOf 的 javadoc 描述如下:
当任何给定任务完成时,返回一个已完成的新任务。新任务的值是已完成任务对象的引用。如果没有提供任务,则返回一个永不完成的任务。
该方法适用于等待多个并发任务,并在第一个任务完成时执行特定于任务的操作,如下面的示例:
Task
event1 = ctx.waitForExternalEvent(“Event1”); Task event2 = ctx.waitForExternalEvent(“Event2”); Task event3 = ctx.waitForExternalEvent(“Event3”); Task> winner = ctx.anyOf(event1、event2、event3).await(); 如果(winner == event1){ // … } else if (winner == event2) { // … // … } else if (winner == event3) { // … // … }
anyOf 方法还可用于实现长时间超时,如下面的示例:
Task
activityTask = ctx.callActivity(“SlowActivity”); Task timeoutTask = ctx.createTimer(Duration.ofMinutes(30)); Task> winner = ctx.anyOf(activityTask, timeoutTask).await(); 如果(winner == activityTask){ // 完成情况 } else { // 超时情况 }
createTimer()方法
创建一个在指定延迟后过期的 durable timer。
指定较长的延迟(例如,几天或更长时间的延迟)可能会导致创建多个内部管理的 durable timer。协调器代码不需要意识到这种行为。不过,框架日志和存储的历史状态中可能会显示这种行为。
Task<Void> createTimer(Duration duration);
default Task<Void> createTimer(ZonedDateTime zonedDateTime) {
throw new UnsupportedOperationException("This method is not implemented.");
}
getInput()方法
getInput() 方法获取当前任务协调器的反序列化输入。
<V> V getInput(Class<V> targetType);
callSubWorkflow()
callSubWorkflow() 方法异步调用另一个工作流作为子工作流:
default Task<Void> callSubWorkflow(String name) {
return this.callSubWorkflow(name, null);
}
default Task<Void> callSubWorkflow(String name, Object input) {
return this.callSubWorkflow(name, input, null);
}
default <V> Task<V> callSubWorkflow(String name, Object input, Class<V> returnType) {
return this.callSubWorkflow(name, input, null, returnType);
}
default <V> Task<V> callSubWorkflow(String name, Object input, String instanceID, Class<V> returnType) {
return this.callSubWorkflow(name, input, instanceID, null, returnType);
}
default Task<Void> callSubWorkflow(String name, Object input, String instanceID, TaskOptions options) {
return this.callSubWorkflow(name, input, instanceID, options, Void.class);
}
<V> Task<V> callSubWorkflow(String name,
@Nullable Object input,
@Nullable String instanceID,
@Nullable TaskOptions options,
Class<V> returnType);
callSubWorkflow() 的 javadoc 描述如下:
异步调用另一个工作流作为子工作流,并在子工作流完成时返回一个任务。如果子工作流成功完成,返回的任务值将是 activity 的输出。如果子工作流失败,返回的任务将以 TaskFailedException 异常完成。
子工作流有自己的 instance ID、历史和状态,与启动它的父工作流无关。将大型协调分解为子工作流有很多好处:
- 将大型协调拆分成一系列较小的子工作流可以使代码更易于维护。
- 如果协调逻辑需要协调大量任务,那么在多个计算节点上并发分布协调逻辑就非常有用。
- 通过保持较小的父协调历史记录,可以减少内存使用和 CPU 开销。
缺点是启动子工作流和处理其输出会产生开销。这通常只适用于非常小的协调。
由于子工作流独立于父工作流,因此终止父协调不会影响任何子工作流。
continueAsNew()
callSubWorkflow() 方法使用新输入重启协调并清除其历史记录:
default void continueAsNew(Object input) {
this.continueAsNew(input, true);
}
void continueAsNew(Object input, boolean preserveUnprocessedEvents);
}
continueAsNew() 的 javadoc 描述如下:
使用新输入重启协调并清除其历史记录。
该方法主要针对永恒协调(eternal orchestrations),即可能永远无法完成的协调。它的工作原理是重新启动协调,为其提供新的输入,并截断现有的协调历史。它允许协调无限期地继续运行,而不会让其历史记录无限制地增长。定期截断历史记录的好处包括降低内存使用率、减少存储容量,以及在重建状态时缩短协调器重播时间。
当协调器调用 continueAsNew 时,任何未完成任务的结果都将被丢弃。例如,如果计划了一个定时器,但在定时器启动前调用了 continueAsNew,那么定时器事件将被丢弃。唯一的例外是外部事件。默认情况下,如果协调收到外部事件但尚未处理,则会通过调用 waitForExternalEvent 将该事件保存在协调状态单元中。即使协调器使用 continueAsNew 重新启动,这些事件也会保留在内存中。可以通过为 preserveUnprocessedEvents 参数值指定 false 来禁用此行为。
协调器实现应在调用 continueAsNew 方法后立即完成。
6.2 - DaprWorkflowContextImpl实现
类定义
DaprWorkflowContextImpl 类实现了 WorkflowContext 接口,实现上采用代理给内部字段 innerContext,这是一个 com.microsoft.durabletask.TaskOrchestrationContext
import com.microsoft.durabletask.TaskOrchestrationContext;
public class DaprWorkflowContextImpl implements WorkflowContext {
private final TaskOrchestrationContext innerContext;
private final Logger logger;
......
}
构造函数只是简单赋值,加了一些必要的 null 检测:
public DaprWorkflowContextImpl(TaskOrchestrationContext context) throws IllegalArgumentException {
this(context, LoggerFactory.getLogger(WorkflowContext.class));
}
public DaprWorkflowContextImpl(TaskOrchestrationContext context, Logger logger) throws IllegalArgumentException {
if (context == null) {
throw new IllegalArgumentException("Context cannot be null");
}
if (logger == null) {
throw new IllegalArgumentException("Logger cannot be null");
}
this.innerContext = context;
this.logger = logger;
}
方法实现
除 getLogger() 外的所有方法的实现都是简单的代理给 innerContext 的同名方法:
public Logger getLogger() {
if (this.innerContext.getIsReplaying()) {
return NOPLogger.NOP_LOGGER;
}
return this.logger;
}
public String getName() {
return this.innerContext.getName();
}
public String getInstanceId() {
return this.innerContext.getInstanceId();
}
public Instant getCurrentInstant() {
return this.innerContext.getCurrentInstant();
}
public boolean isReplaying() {
return this.innerContext.getIsReplaying();
}
public <V> Task<V> callSubWorkflow(String name, @Nullable Object input, @Nullable String instanceID,
@Nullable TaskOptions options, Class<V> returnType) {
return this.innerContext.callSubOrchestrator(name, input, instanceID, options, returnType);
}
public void continueAsNew(Object input) {
this.innerContext.continueAsNew(input);
}
小结
这个类基本就是 com.microsoft.durabletask.TaskOrchestrationContext
的简单包裹,所有功能都代理给 com.microsoft.durabletask.TaskOrchestrationContext
, 包括设计甚至方法名。
dapr 的 workflow 实现基本是完全绑定在 durabletask 上的。
6.3 - runtime package
6.3.1 - WorkflowRuntime实现
WorkflowRuntime 简单封装了 durabletask 的 DurableTaskGrpcWorker:
import com.microsoft.durabletask.DurableTaskGrpcWorker;
public class WorkflowRuntime implements AutoCloseable {
private DurableTaskGrpcWorker worker;
public WorkflowRuntime(DurableTaskGrpcWorker worker) {
this.worker = worker;
}
......
}
然后将 start() 和 close() 方法简单的代理给 durabletask 的 DurableTaskGrpcWorker:
public void start() {
this.start(true);
}
public void start(boolean block) {
if (block) {
this.worker.startAndBlock();
} else {
this.worker.start();
}
}
public void close() {
if (this.worker != null) {
this.worker.close();
this.worker = null;
}
}
6.3.2 - WorkflowRuntimeBuilder实现
类定义
WorkflowRuntimeBuilder 用来构建 WorkflowRuntime,类似 WorkflowRuntime 只是简单封装了 durabletask 的 DurableTaskGrpcWorker, WorkflowRuntimeBuilder 的实现也是简单封装了 durabletask 的 DurableTaskGrpcWorkerBuilder:
import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;
public class WorkflowRuntimeBuilder {
private static volatile WorkflowRuntime instance;
private DurableTaskGrpcWorkerBuilder builder;
public WorkflowRuntimeBuilder() {
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(NetworkUtils.buildGrpcManagedChannel());
}
......
}
grpcChannel()的细节后面细看。
registerWorkflow()方法
registerWorkflow() 方法注册 workflow 对象,实际代理给 DurableTaskGrpcWorkerBuilder 的 addOrchestration() 方法:
public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
this.builder = this.builder.addOrchestration(
new OrchestratorWrapper<>(clazz)
);
return this;
}
registerActivity() 方法
registerActivity() 方法注册 activity 对象,实际代理给 DurableTaskGrpcWorkerBuilder 的 addActivity() 方法:
public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
this.builder = this.builder.addActivity(
new ActivityWrapper<>(clazz)
);
}
build() 方法
build() 方法实现了一个简单的单例,只容许构建一个 WorkflowRuntime 的 instance:
private static volatile WorkflowRuntime instance;
public WorkflowRuntime build() {
if (instance == null) {
synchronized (WorkflowRuntime.class) {
if (instance == null) {
instance = new WorkflowRuntime(this.builder.build());
}
}
}
return instance;
}
grpcChannel 的构建细节
DurableTaskGrpcWorkerBuilder() 在构建时,需要设置 grpcChannel,而这个 grpcChannel 是通过 NetworkUtils.buildGrpcManagedChannel() 方法来实现的。
NetworkUtils.buildGrpcManagedChannel() 在 sdk/src/main/java/io/dapr/utils/NetworkUtils.java
文件中,是一个通用的网络工具类。buildGrpcManagedChannel() 方法的实现如下:
private static final String DEFAULT_SIDECAR_IP = "127.0.0.1";
private static final Integer DEFAULT_GRPC_PORT = 50001;
public static final Property<String> SIDECAR_IP = new StringProperty(
"dapr.sidecar.ip",
"DAPR_SIDECAR_IP",
DEFAULT_SIDECAR_IP);
public static final Property<Integer> GRPC_PORT = new IntegerProperty(
"dapr.grpc.port",
"DAPR_GRPC_PORT",
DEFAULT_GRPC_PORT);
public static final Property<String> GRPC_ENDPOINT = new StringProperty(
"dapr.grpc.endpoint",
"DAPR_GRPC_ENDPOINT",
null);
public static ManagedChannel buildGrpcManagedChannel() {
// 从系统属性或者环境变量中读取 dapr sidecar 的IP
String address = Properties.SIDECAR_IP.get();
// 从系统属性或者环境变量中读取 dapr grpc 端口
int port = Properties.GRPC_PORT.get();
// 默认不用https
boolean insecure = true;
// 从系统属性或者环境变量中读取 dapr grpc 端点信息
String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
// 如果 dapr grpc 端点不为空,则用 grpc 端点的内容覆盖
URI uri = URI.create(grpcEndpoint);
// 通过 schema 是不是 http 来判断是 http 还是 https
insecure = uri.getScheme().equalsIgnoreCase("http");
// grpcEndpoint 如果设置有端口则采用,没有设置则根据是 http 还是 https 来选择 80 或者 443 端口
port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443);
// 覆盖 dapr sidecar 的地址
address = uri.getHost();
if ((uri.getPath() != null) && !uri.getPath().isEmpty()) {
address += uri.getPath();
}
}
// 构建连接到指定地址的 grpc channel
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(address, port)
.userAgent(Version.getSdkVersion());
if (insecure) {
builder = builder.usePlaintext();
}
return builder.build();
}
从部署来看,runtime 运行在 client 一侧的 app 应用程序内部,然后通过 durabletask 的 sdk 连接到 dapr sidecar 了,走 grpc 协议。
这个设计有点奇怪,dapr sdk 和 dapr sidecar 之间没有走标准的 dapr API,而是通过 durabletask 的 sdk 。
6.3.3 - OrchestratorWrapper实现
背景
WorkflowRuntimeBuilder 的 registerWorkflow() 方法在注册 workflow 对象时,实际代理给 DurableTaskGrpcWorkerBuilder 的 addOrchestration() 方法:
import com.microsoft.durabletask.TaskOrchestrationFactory;
public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
this.builder = this.builder.addOrchestration(
new OrchestratorWrapper<>(clazz)
);
return this;
}
而 addOrchestration() 方法的输入参数为 com.microsoft.durabletask.TaskOrchestrationFactory
:
public interface TaskOrchestrationFactory {
String getName();
TaskOrchestration create();
}
因此需要提供一个 TaskOrchestrationFactory 的实现。
类定义
OrchestratorWrapper 类实现了 com.microsoft.durabletask.TaskOrchestrationFactory
接口:
class OrchestratorWrapper<T extends Workflow> implements TaskOrchestrationFactory {
private final Constructor<T> workflowConstructor;
private final String name;
......
}
构造函数:
public OrchestratorWrapper(Class<T> clazz) {
// 获取并设置 name
this.name = clazz.getCanonicalName();
try {
// 获取 Constructor
this.workflowConstructor = clazz.getDeclaredConstructor();
} catch (NoSuchMethodException e) {
throw new RuntimeException(
String.format("No constructor found for workflow class '%s'.", this.name), e
);
}
}
接口实现
TaskOrchestrationFactory 接口要求的 getName() 方法,直接返回前面获取的 name:
@Override
public String getName() {
return name;
}
TaskOrchestrationFactory 接口要求的 create() 方法,要返回一个 durabletask 的 TaskOrchestration ,而 TaskOrchestration 是一个 @FunctionalInterface,仅有一个 run() 方法:
@FunctionalInterface
public interface TaskOrchestration {
void run(TaskOrchestrationContext ctx);
}
因此构建 TaskOrchestration 实例的方式被简写为:
import com.microsoft.durabletask.TaskOrchestration;
@Override
public TaskOrchestration create() {
return ctx -> {
T workflow;
try {
// 通过 workflow 的构造器生成一个 workflow 实例
workflow = this.workflowConstructor.newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(
String.format("Unable to instantiate instance of workflow class '%s'", this.name), e
);
}
// 将 durable task 的 context 包装为 dapr 的 workflow context DaprWorkflowContextImpl
// 然后执行 workflow.run()
workflow.run(new DaprWorkflowContextImpl(ctx));
};
}
6.3.4 - ActivityWrapper实现
背景
WorkflowRuntimeBuilder 的 registerActivity() 方法在注册 activity 对象时,实际代理给 DurableTaskGrpcWorkerBuilder 的 addActivity() 方法:
import com.microsoft.durabletask.TaskOrchestrationFactory;
public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
this.builder = this.builder.addActivity(
new ActivityWrapper<>(clazz)
);
}
而 addActivity() 方法的输入参数为 com.microsoft.durabletask.TaskActivityFactory
:
public interface TaskActivityFactory {
String getName();
TaskActivity create();
}
因此需要提供一个 TaskActivityFactory 的实现。
类定义
ActivityWrapper 类实现了 com.microsoft.durabletask.TaskActivityFactory
接口:
public class ActivityWrapper<T extends WorkflowActivity> implements TaskActivityFactory {
private final Constructor<T> activityConstructor;
private final String name;
......
}
构造函数:
public ActivityWrapper(Class<T> clazz) {
this.name = clazz.getCanonicalName();
try {
this.activityConstructor = clazz.getDeclaredConstructor();
} catch (NoSuchMethodException e) {
throw new RuntimeException(
String.format("No constructor found for activity class '%s'.", this.name), e
);
}
}
接口实现
TaskActivityFactory 接口要求的 getName() 方法,直接返回前面获取的 name:
@Override
public String getName() {
return name;
}
TaskActivityFactory 接口要求的 create() 方法,要返回一个 durabletask 的 TaskActivity ,而 TaskActivity 是一个 @FunctionalInterface,仅有一个 run() 方法:
@FunctionalInterface
public interface TaskActivity {
Object run(TaskActivityContext ctx);
}
因此构建 TaskActivity 实例的方式被简写为:
import com.microsoft.durabletask.TaskActivity;
@Override
public TaskActivity create() {
return ctx -> {
Object result;
T activity;
try {
activity = this.activityConstructor.newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(
String.format("Unable to instantiate instance of activity class '%s'", this.name), e
);
}
result = activity.run(new WorkflowActivityContext(ctx));
return result;
};
}
}
6.3.5 - WorkflowActivity实现
WorkflowActivity接口定义
WorkflowActivity接口定义了 Activity
public interface WorkflowActivity {
/**
* 执行活动逻辑并返回一个值,该值将被序列化并返回给调用的协调器。
*
* @param ctx 提供有关当前活动执行的信息,如活动名称和协调程序提供给它的输入数据。
* @return 要返回给调用协调器的任何可序列化值。
*/
Object run(WorkflowActivityContext ctx);
}
WorkflowActivity 的 javadoc 描述如下:
任务活动实现的通用接口。
活动(Activity)是 durable task 协调的基本工作单位。活动(Activity)是在业务流程中进行协调的任务。例如,您可以创建一个协调器来处理订单。这些任务包括检查库存、向客户收费和创建装运。每个任务都是一个单独的活动(Activity)。这些活动(Activity)可以串行执行、并行执行或两者结合执行。
与任务协调器不同的是,活动(Activity)在工作类型上不受限制。活动(Activity)函数经常用于进行网络调用或运行 CPU 密集型操作。活动(Activity)还可以将数据返回给协调器函数。 durable task 运行时保证每个被调用的活动(Activity)函数在协调执行期间至少被执行一次。
由于活动(Activity)只能保证至少执行一次,因此建议尽可能将活动(Activity)逻辑作为幂等逻辑来实现。
协调器使用 io.dapr.workflows.WorkflowContext.callActivity 方法重载之一来调度活动。
WorkflowActivityContext
WorkflowActivityContext 简单包装了 durabletask 的 TaskActivityContext :
import com.microsoft.durabletask.TaskActivityContext;
public class WorkflowActivityContext implements TaskActivityContext {
private final TaskActivityContext innerContext;
public WorkflowActivityContext(TaskActivityContext context) throws IllegalArgumentException {
if (context == null) {
throw new IllegalArgumentException("Context cannot be null");
}
this.innerContext = context;
}
......
}
TaskActivityContext 接口要求的 getName() 和 getInput() 方法都简单代理给了内部的 durabletask 的 TaskActivityContext :
public String getName() {
return this.innerContext.getName();
}
public <T> T getInput(Class<T> targetType) {
return this.innerContext.getInput(targetType);
}
备注:这样的包装并没有起到隔离 dapr sdk 和 durabletask sdk 的目的,还是紧密的耦合在一起,包装的意义何在?
6.4 - client package
6.4.1 - DaprWorkflowClient代码实现
定义和创建
类定义
DaprWorkflowClient 定义管理 Dapr 工作流实例的客户端操作。
注意这里是 “管理” !
import com.microsoft.durabletask.DurableTaskClient;
public class DaprWorkflowClient implements AutoCloseable {
DurableTaskClient innerClient;
ManagedChannel grpcChannel;
public DaprWorkflowClient() {
this(NetworkUtils.buildGrpcManagedChannel());
}
private DaprWorkflowClient(ManagedChannel grpcChannel) {
this(createDurableTaskClient(grpcChannel), grpcChannel);
}
private DaprWorkflowClient(DurableTaskClient innerClient, ManagedChannel grpcChannel) {
this.innerClient = innerClient;
this.grpcChannel = grpcChannel;
}
实现上依然是包装 durabletask 的 DurableTaskClient , 而 durabletask 的 DurableTaskClient 在创建时需要传入一个 grpcChannel。
关键点在于这个 grpcChannel 的创建,可以从外部传入,如果没有传入则可以通过 NetworkUtils.buildGrpcManagedChannel() 方法进行创建。
grpcChannel 的创建
实现和之前 WorkflowRuntimeBuilder 中的一致,都是调用 NetworkUtils.buildGrpcManagedChannel()
方法。
NetworkUtils.buildGrpcManagedChannel()
方法在 dapr java sdk 中一共有3处调用:
-
WorkflowRuntimeBuilder:
public WorkflowRuntimeBuilder() { this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(NetworkUtils.buildGrpcManagedChannel()); }
-
DaprWorkflowClient:
public DaprWorkflowClient() { this(NetworkUtils.buildGrpcManagedChannel()); }
-
DaprClientBuilder
final ManagedChannel channel = NetworkUtils.buildGrpcManagedChannel();
DurableTaskClient 的创建
DurableTaskClient 的创建是简单的调用 durabletask 的 DurableTaskGrpcClientBuilder 来实现的:
import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;
private static DurableTaskClient createDurableTaskClient(ManagedChannel grpcChannel) {
return new DurableTaskGrpcClientBuilder()
.grpcChannel(grpcChannel)
.build();
}
close() 方法
close() 方法用于关闭 DaprWorkflowClient,内部实现为关闭包装的 durabletask 的 DurableTaskClient 以及创建时传入的 grpcChannel:
public void close() throws InterruptedException {
try {
if (this.innerClient != null) {
this.innerClient.close();
this.innerClient = null;
}
} finally {
if (this.grpcChannel != null && !this.grpcChannel.isShutdown()) {
this.grpcChannel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
this.grpcChannel = null;
}
}
}
}
操作 workflow instance
scheduleNewWorkflow() 方法
scheduleNewWorkflow() 方法调度一个新的 workflow ,即创建并开始一个新的 workflow instance,这个方法返回 workflow instance id:
package io.dapr.workflows.client;
public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz) {
return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName());
}
public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, Object input) {
return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input);
}
public <T extends Workflow> String scheduleNewWorkflow(Class<T> clazz, Object input, String instanceId) {
return this.innerClient.scheduleNewOrchestrationInstance(clazz.getCanonicalName(), input, instanceId);
}
实现完全代理给 durabletask 的 DurableTaskClient 。
terminateWorkflow() 方法
terminateWorkflow() 方法终止一个 workflow instance 的执行,需要传入之前从 scheduleNewWorkflow() 方法中得到的 workflow instance id。
public void terminateWorkflow(String workflowInstanceId, @Nullable Object output) {
this.innerClient.terminate(workflowInstanceId, output);
}
output 参数是可选的,用来传递被终止的 workflow instance 的输出。
getInstanceState() 方法
getInstanceState() 方法获取 workflow instance 的状态,同样需要传入之前从 scheduleNewWorkflow() 方法中得到的 workflow instance id:
@Nullable
public WorkflowInstanceStatus getInstanceState(String instanceId, boolean getInputsAndOutputs) {
OrchestrationMetadata metadata = this.innerClient.getInstanceMetadata(instanceId, getInputsAndOutputs);
if (metadata == null) {
return null;
}
return new WorkflowInstanceStatus(metadata);
}
实现为调用 durabletask 的 DurableTaskClient 的 getInstanceMetadata() 方法来获取 OrchestrationMetadata,然后转换为 dapr 定义的 WorkflowInstanceStatus()。
这里的细节在 WorkflowInstanceStatus 类实现中展开。
waitForInstanceStart() 方法
waitForInstanceStart() 方法等待 workflow instance 执行的开始:
@Nullable
public WorkflowInstanceStatus waitForInstanceStart(String instanceId, Duration timeout, boolean getInputsAndOutputs)
throws TimeoutException {
OrchestrationMetadata metadata = this.innerClient.waitForInstanceStart(instanceId, timeout, getInputsAndOutputs);
return metadata == null ? null : new WorkflowInstanceStatus(metadata);
}
waitForInstanceStart() 方法的 javadoc 描述为:
等待工作流开始运行,并返回一个 WorkflowInstanceStatus 对象,该对象包含已启动实例的元数据,以及可选的输入、输出和自定义状态有效载荷。
“已启动” 的工作流实例是指未处于 “Pending” 状态的任何实例。
如果调用该方法时工作流实例已在运行,该方法将立即返回。
waitForInstanceCompletion() 方法
waitForInstanceCompletion() 方法等待 workflow instance 执行的完成:
@Nullable
public WorkflowInstanceStatus waitForInstanceCompletion(String instanceId, Duration timeout,
boolean getInputsAndOutputs) throws TimeoutException {
OrchestrationMetadata metadata =
this.innerClient.waitForInstanceCompletion(instanceId, timeout, getInputsAndOutputs);
return metadata == null ? null : new WorkflowInstanceStatus(metadata);
}
waitForInstanceStart() 方法的 javadoc 描述为:
等待工作流完成,并返回一个包含已完成实例元数据的 WorkflowInstanceStatus 对象。
“已完成” 的工作流实例是指处于终止状态之一的任何实例。例如,“Completed”、“Failed” 或 “Terminated” 状态。
工作流是长期运行的,可能需要数小时、数天或数月才能完成。工作流也可能是长久的,在这种情况下,除非终止,否则永远不会完成。在这种情况下,该调用可能会无限期阻塞,因此必须注意确保使用适当的超时。如果调用该方法时工作流实例已经完成,该方法将立即返回。
purgeInstance() 方法
purgeInstance() 方法从工作流状态存储中清除工作流实例的状态:
public boolean purgeInstance(String workflowInstanceId) {
PurgeResult result = this.innerClient.purgeInstance(workflowInstanceId);
if (result != null) {
return result.getDeletedInstanceCount() > 0;
}
return false;
}
如果找到工作流状态并成功清除,则返回 true,否则返回 false。
raiseEvent() 方法
raiseEvent() 方法向等待中的工作流实例发送事件通知消息:
public void raiseEvent(String workflowInstanceId, String eventName, Object eventPayload) {
this.innerClient.raiseEvent(workflowInstanceId, eventName, eventPayload);
}
TaskHub的方法
这两个方法暂时还知道什么情况下用,暂时忽略。
public void createTaskHub(boolean recreateIfExists) {
this.innerClient.createTaskHub(recreateIfExists);
}
public void deleteTaskHub() {
this.innerClient.deleteTaskHub();
}
6.4.2 - WorkflowInstanceStatus代码实现
类定义和构造函数
WorkflowInstanceStatus 代表工作流实例当前状态的快照,包括元数据。
WorkflowInstanceStatus 的实现依然是包装 durabletask,内部是一个 durabletask 的 OrchestrationMetadata,以及 OrchestrationMetadata 携带的 FailureDetails:
import com.microsoft.durabletask.FailureDetails;
import com.microsoft.durabletask.OrchestrationMetadata;
public class WorkflowInstanceStatus {
private final OrchestrationMetadata orchestrationMetadata;
@Nullable
private final WorkflowFailureDetails failureDetails;
public WorkflowInstanceStatus(OrchestrationMetadata orchestrationMetadata) {
if (orchestrationMetadata == null) {
throw new IllegalArgumentException("OrchestrationMetadata cannot be null");
}
this.orchestrationMetadata = orchestrationMetadata;
FailureDetails details = orchestrationMetadata.getFailureDetails();
if (details != null) {
this.failureDetails = new WorkflowFailureDetails(details);
} else {
this.failureDetails = null;
}
}
获取 FailureDetails 之后将转为 dapr 的 WorkflowFailureDetails, 这里的细节在 WorkflowFailureDetails 类实现中展开。
各种代理方法
6.4.3 - WorkflowFailureDetails代码实现
WorkflowFailureDetails 只是非常简单的包装了 durabletask 的 FailureDetails
public class WorkflowFailureDetails {
FailureDetails workflowFailureDetails;
/**
* Class constructor.
* @param failureDetails failure Details
*/
public WorkflowFailureDetails(FailureDetails failureDetails) {
this.workflowFailureDetails = failureDetails;
}
然后代理各种方法:
public String getErrorType() {
return workflowFailureDetails.getErrorType();
}
public String getErrorMessage() {
return workflowFailureDetails.getErrorMessage();
}
public String getStackTrace() {
return workflowFailureDetails.getStackTrace();
}