runtime package
- 1: WorkflowRuntime实现
- 2: WorkflowRuntimeBuilder实现
- 3: OrchestratorWrapper实现
- 4: ActivityWrapper实现
- 5: WorkflowActivity实现
1 - WorkflowRuntime实现
WorkflowRuntime 简单封装了 durabletask 的 DurableTaskGrpcWorker:
import com.microsoft.durabletask.DurableTaskGrpcWorker;
public class WorkflowRuntime implements AutoCloseable {
private DurableTaskGrpcWorker worker;
public WorkflowRuntime(DurableTaskGrpcWorker worker) {
this.worker = worker;
}
......
}
然后将 start() 和 close() 方法简单的代理给 durabletask 的 DurableTaskGrpcWorker:
public void start() {
this.start(true);
}
public void start(boolean block) {
if (block) {
this.worker.startAndBlock();
} else {
this.worker.start();
}
}
public void close() {
if (this.worker != null) {
this.worker.close();
this.worker = null;
}
}
2 - WorkflowRuntimeBuilder实现
类定义
WorkflowRuntimeBuilder 用来构建 WorkflowRuntime,类似 WorkflowRuntime 只是简单封装了 durabletask 的 DurableTaskGrpcWorker, WorkflowRuntimeBuilder 的实现也是简单封装了 durabletask 的 DurableTaskGrpcWorkerBuilder:
import com.microsoft.durabletask.DurableTaskGrpcWorkerBuilder;
public class WorkflowRuntimeBuilder {
private static volatile WorkflowRuntime instance;
private DurableTaskGrpcWorkerBuilder builder;
public WorkflowRuntimeBuilder() {
this.builder = new DurableTaskGrpcWorkerBuilder().grpcChannel(NetworkUtils.buildGrpcManagedChannel());
}
......
}
grpcChannel()的细节后面细看。
registerWorkflow()方法
registerWorkflow() 方法注册 workflow 对象,实际代理给 DurableTaskGrpcWorkerBuilder 的 addOrchestration() 方法:
public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
this.builder = this.builder.addOrchestration(
new OrchestratorWrapper<>(clazz)
);
return this;
}
registerActivity() 方法
registerActivity() 方法注册 activity 对象,实际代理给 DurableTaskGrpcWorkerBuilder 的 addActivity() 方法:
public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
this.builder = this.builder.addActivity(
new ActivityWrapper<>(clazz)
);
}
build() 方法
build() 方法实现了一个简单的单例,只容许构建一个 WorkflowRuntime 的 instance:
private static volatile WorkflowRuntime instance;
public WorkflowRuntime build() {
if (instance == null) {
synchronized (WorkflowRuntime.class) {
if (instance == null) {
instance = new WorkflowRuntime(this.builder.build());
}
}
}
return instance;
}
grpcChannel 的构建细节
DurableTaskGrpcWorkerBuilder() 在构建时,需要设置 grpcChannel,而这个 grpcChannel 是通过 NetworkUtils.buildGrpcManagedChannel() 方法来实现的。
NetworkUtils.buildGrpcManagedChannel() 在 sdk/src/main/java/io/dapr/utils/NetworkUtils.java
文件中,是一个通用的网络工具类。buildGrpcManagedChannel() 方法的实现如下:
private static final String DEFAULT_SIDECAR_IP = "127.0.0.1";
private static final Integer DEFAULT_GRPC_PORT = 50001;
public static final Property<String> SIDECAR_IP = new StringProperty(
"dapr.sidecar.ip",
"DAPR_SIDECAR_IP",
DEFAULT_SIDECAR_IP);
public static final Property<Integer> GRPC_PORT = new IntegerProperty(
"dapr.grpc.port",
"DAPR_GRPC_PORT",
DEFAULT_GRPC_PORT);
public static final Property<String> GRPC_ENDPOINT = new StringProperty(
"dapr.grpc.endpoint",
"DAPR_GRPC_ENDPOINT",
null);
public static ManagedChannel buildGrpcManagedChannel() {
// 从系统属性或者环境变量中读取 dapr sidecar 的IP
String address = Properties.SIDECAR_IP.get();
// 从系统属性或者环境变量中读取 dapr grpc 端口
int port = Properties.GRPC_PORT.get();
// 默认不用https
boolean insecure = true;
// 从系统属性或者环境变量中读取 dapr grpc 端点信息
String grpcEndpoint = Properties.GRPC_ENDPOINT.get();
if ((grpcEndpoint != null) && !grpcEndpoint.isEmpty()) {
// 如果 dapr grpc 端点不为空,则用 grpc 端点的内容覆盖
URI uri = URI.create(grpcEndpoint);
// 通过 schema 是不是 http 来判断是 http 还是 https
insecure = uri.getScheme().equalsIgnoreCase("http");
// grpcEndpoint 如果设置有端口则采用,没有设置则根据是 http 还是 https 来选择 80 或者 443 端口
port = uri.getPort() > 0 ? uri.getPort() : (insecure ? 80 : 443);
// 覆盖 dapr sidecar 的地址
address = uri.getHost();
if ((uri.getPath() != null) && !uri.getPath().isEmpty()) {
address += uri.getPath();
}
}
// 构建连接到指定地址的 grpc channel
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(address, port)
.userAgent(Version.getSdkVersion());
if (insecure) {
builder = builder.usePlaintext();
}
return builder.build();
}
从部署来看,runtime 运行在 client 一侧的 app 应用程序内部,然后通过 durabletask 的 sdk 连接到 dapr sidecar 了,走 grpc 协议。
这个设计有点奇怪,dapr sdk 和 dapr sidecar 之间没有走标准的 dapr API,而是通过 durabletask 的 sdk 。
3 - OrchestratorWrapper实现
背景
WorkflowRuntimeBuilder 的 registerWorkflow() 方法在注册 workflow 对象时,实际代理给 DurableTaskGrpcWorkerBuilder 的 addOrchestration() 方法:
import com.microsoft.durabletask.TaskOrchestrationFactory;
public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
this.builder = this.builder.addOrchestration(
new OrchestratorWrapper<>(clazz)
);
return this;
}
而 addOrchestration() 方法的输入参数为 com.microsoft.durabletask.TaskOrchestrationFactory
:
public interface TaskOrchestrationFactory {
String getName();
TaskOrchestration create();
}
因此需要提供一个 TaskOrchestrationFactory 的实现。
类定义
OrchestratorWrapper 类实现了 com.microsoft.durabletask.TaskOrchestrationFactory
接口:
class OrchestratorWrapper<T extends Workflow> implements TaskOrchestrationFactory {
private final Constructor<T> workflowConstructor;
private final String name;
......
}
构造函数:
public OrchestratorWrapper(Class<T> clazz) {
// 获取并设置 name
this.name = clazz.getCanonicalName();
try {
// 获取 Constructor
this.workflowConstructor = clazz.getDeclaredConstructor();
} catch (NoSuchMethodException e) {
throw new RuntimeException(
String.format("No constructor found for workflow class '%s'.", this.name), e
);
}
}
接口实现
TaskOrchestrationFactory 接口要求的 getName() 方法,直接返回前面获取的 name:
@Override
public String getName() {
return name;
}
TaskOrchestrationFactory 接口要求的 create() 方法,要返回一个 durabletask 的 TaskOrchestration ,而 TaskOrchestration 是一个 @FunctionalInterface,仅有一个 run() 方法:
@FunctionalInterface
public interface TaskOrchestration {
void run(TaskOrchestrationContext ctx);
}
因此构建 TaskOrchestration 实例的方式被简写为:
import com.microsoft.durabletask.TaskOrchestration;
@Override
public TaskOrchestration create() {
return ctx -> {
T workflow;
try {
// 通过 workflow 的构造器生成一个 workflow 实例
workflow = this.workflowConstructor.newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(
String.format("Unable to instantiate instance of workflow class '%s'", this.name), e
);
}
// 将 durable task 的 context 包装为 dapr 的 workflow context DaprWorkflowContextImpl
// 然后执行 workflow.run()
workflow.run(new DaprWorkflowContextImpl(ctx));
};
}
4 - ActivityWrapper实现
背景
WorkflowRuntimeBuilder 的 registerActivity() 方法在注册 activity 对象时,实际代理给 DurableTaskGrpcWorkerBuilder 的 addActivity() 方法:
import com.microsoft.durabletask.TaskOrchestrationFactory;
public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
this.builder = this.builder.addActivity(
new ActivityWrapper<>(clazz)
);
}
而 addActivity() 方法的输入参数为 com.microsoft.durabletask.TaskActivityFactory
:
public interface TaskActivityFactory {
String getName();
TaskActivity create();
}
因此需要提供一个 TaskActivityFactory 的实现。
类定义
ActivityWrapper 类实现了 com.microsoft.durabletask.TaskActivityFactory
接口:
public class ActivityWrapper<T extends WorkflowActivity> implements TaskActivityFactory {
private final Constructor<T> activityConstructor;
private final String name;
......
}
构造函数:
public ActivityWrapper(Class<T> clazz) {
this.name = clazz.getCanonicalName();
try {
this.activityConstructor = clazz.getDeclaredConstructor();
} catch (NoSuchMethodException e) {
throw new RuntimeException(
String.format("No constructor found for activity class '%s'.", this.name), e
);
}
}
接口实现
TaskActivityFactory 接口要求的 getName() 方法,直接返回前面获取的 name:
@Override
public String getName() {
return name;
}
TaskActivityFactory 接口要求的 create() 方法,要返回一个 durabletask 的 TaskActivity ,而 TaskActivity 是一个 @FunctionalInterface,仅有一个 run() 方法:
@FunctionalInterface
public interface TaskActivity {
Object run(TaskActivityContext ctx);
}
因此构建 TaskActivity 实例的方式被简写为:
import com.microsoft.durabletask.TaskActivity;
@Override
public TaskActivity create() {
return ctx -> {
Object result;
T activity;
try {
activity = this.activityConstructor.newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(
String.format("Unable to instantiate instance of activity class '%s'", this.name), e
);
}
result = activity.run(new WorkflowActivityContext(ctx));
return result;
};
}
}
5 - WorkflowActivity实现
WorkflowActivity接口定义
WorkflowActivity接口定义了 Activity
public interface WorkflowActivity {
/**
* 执行活动逻辑并返回一个值,该值将被序列化并返回给调用的协调器。
*
* @param ctx 提供有关当前活动执行的信息,如活动名称和协调程序提供给它的输入数据。
* @return 要返回给调用协调器的任何可序列化值。
*/
Object run(WorkflowActivityContext ctx);
}
WorkflowActivity 的 javadoc 描述如下:
任务活动实现的通用接口。
活动(Activity)是 durable task 协调的基本工作单位。活动(Activity)是在业务流程中进行协调的任务。例如,您可以创建一个协调器来处理订单。这些任务包括检查库存、向客户收费和创建装运。每个任务都是一个单独的活动(Activity)。这些活动(Activity)可以串行执行、并行执行或两者结合执行。
与任务协调器不同的是,活动(Activity)在工作类型上不受限制。活动(Activity)函数经常用于进行网络调用或运行 CPU 密集型操作。活动(Activity)还可以将数据返回给协调器函数。 durable task 运行时保证每个被调用的活动(Activity)函数在协调执行期间至少被执行一次。
由于活动(Activity)只能保证至少执行一次,因此建议尽可能将活动(Activity)逻辑作为幂等逻辑来实现。
协调器使用 io.dapr.workflows.WorkflowContext.callActivity 方法重载之一来调度活动。
WorkflowActivityContext
WorkflowActivityContext 简单包装了 durabletask 的 TaskActivityContext :
import com.microsoft.durabletask.TaskActivityContext;
public class WorkflowActivityContext implements TaskActivityContext {
private final TaskActivityContext innerContext;
public WorkflowActivityContext(TaskActivityContext context) throws IllegalArgumentException {
if (context == null) {
throw new IllegalArgumentException("Context cannot be null");
}
this.innerContext = context;
}
......
}
TaskActivityContext 接口要求的 getName() 和 getInput() 方法都简单代理给了内部的 durabletask 的 TaskActivityContext :
public String getName() {
return this.innerContext.getName();
}
public <T> T getInput(Class<T> targetType) {
return this.innerContext.getInput(targetType);
}
备注:这样的包装并没有起到隔离 dapr sdk 和 durabletask sdk 的目的,还是紧密的耦合在一起,包装的意义何在?