WorkflowRuntimeBuilder实现
类定义
WorkflowRuntimeBuilder 用来构建 WorkflowRuntime,类似 WorkflowRuntime 只是简单封装了 durabletask 的 DurableTaskGrpcWorker, WorkflowRuntimeBuilder 的实现也是简单封装了 durabletask 的 DurableTaskGrpcWorkerBuilder:
import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;
public class WorkflowRuntimeBuilder {
private static volatile WorkflowRuntime instance;
private DurableTaskGrpcWorkerBuilder builder;
public WorkflowRuntimeBuilder() {
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(NetworkUtils.buildGrpcManagedChannel());
}
......
}
grpcChannel()的细节后面细看。
registerWorkflow()方法
registerWorkflow() 方法注册 workflow 对象,实际代理给 DurableTaskGrpcWorkerBuilder 的 addOrchestration() 方法:
public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
this.builder = this.builder.addOrchestration(
new OrchestratorWrapper<>(clazz)
);
return this;
}
registerActivity() 方法
registerActivity() 方法注册 activity 对象,实际代理给 DurableTaskGrpcWorkerBuilder 的 addActivity() 方法:
public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
this.builder = this.builder.addActivity(
new ActivityWrapper<>(clazz)
);
}
build() 方法
build() 方法实现了一个简单的单例,只容许构建一个 WorkflowRuntime 的 instance:
private static volatile WorkflowRuntime instance;
public WorkflowRuntime build() {
if (instance == null) {
synchronized (WorkflowRuntime.class) {
if (instance == null) {
instance = new WorkflowRuntime(this.builder.build());
}
}
}
return instance;
}
grpcChannel 的构建细节
DurableTaskGrpcWorkerBuilder() 在构建时,需要设置 grpcChannel,而这个 grpcChannel 是通过 NetworkUtils.buildGrpcManagedChannel() 方法来实现的。
NetworkUtils.buildGrpcManagedChannel() 在 sdk/src/main/java/io/dapr/utils/NetworkUtils.java
文件中,是一个通用的网络工具类。buildGrpcManagedChannel() 方法的实现如下:
private static final String DEFAULT_SIDECAR_IP = "127.0.0.1";
private static final Integer DEFAULT_GRPC_PORT = 50001;
public static final Property<String> SIDECAR_IP = new StringProperty(
"dapr.sidecar.ip",
"DAPR_SIDECAR_IP",
DEFAULT_SIDECAR_IP);
public static final Property<Integer> GRPC_PORT = new IntegerProperty(
"dapr.grpc.port",
"DAPR_GRPC_PORT",
DEFAULT_GRPC_PORT);
public static final Property<String> GRPC_ENDPOINT = new StringProperty(
"dapr.grpc.endpoint",
"DAPR_GRPC_ENDPOINT",
null);
public static ManagedChannel buildGrpcManagedChannel() {
// 从系统属性或者环境变量中读取 dapr sidecar 的IP
String address = Properties.SIDECAR_IP.get();
// 从系统属性或者环境变量中读取 dapr grpc 端口
int port = Properties.GRPC_PORT.get();
// 默认不用https
boolean insecure = true;
// 从系统属性或者环境变量中读取 dapr grpc 端点信息
String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
// 如果 dapr grpc 端点不为空,则用 grpc 端点的内容覆盖
URI uri = URI.create(grpcEndpoint);
// 通过 schema 是不是 http 来判断是 http 还是 https
insecure = uri.getScheme().equalsIgnoreCase("http");
// grpcEndpoint 如果设置有端口则采用,没有设置则根据是 http 还是 https 来选择 80 或者 443 端口
port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443);
// 覆盖 dapr sidecar 的地址
address = uri.getHost();
if ((uri.getPath() != null) && !uri.getPath().isEmpty()) {
address += uri.getPath();
}
}
// 构建连接到指定地址的 grpc channel
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(address, port)
.userAgent(Version.getSdkVersion());
if (insecure) {
builder = builder.usePlaintext();
}
return builder.build();
}
从部署来看,runtime 运行在 client 一侧的 app 应用程序内部,然后通过 durabletask 的 sdk 连接到 dapr sidecar 了,走 grpc 协议。
这个设计有点奇怪,dapr sdk 和 dapr sidecar 之间没有走标准的 dapr API,而是通过 durabletask 的 sdk 。