OrchestratorWrapper实现
OrchestratorWrapper的代码实现
背景
WorkflowRuntimeBuilder 的 registerWorkflow() 方法在注册 workflow 对象时,实际代理给 DurableTaskGrpcWorkerBuilder 的 addOrchestration() 方法:
import com.microsoft.durabletask.TaskOrchestrationFactory;
public <T extends Workflow> WorkflowRuntimeBuilder registerWorkflow(Class<T> clazz) {
this.builder = this.builder.addOrchestration(
new OrchestratorWrapper<>(clazz)
);
return this;
}
而 addOrchestration() 方法的输入参数为 com.microsoft.durabletask.TaskOrchestrationFactory
:
public interface TaskOrchestrationFactory {
String getName();
TaskOrchestration create();
}
因此需要提供一个 TaskOrchestrationFactory 的实现。
类定义
OrchestratorWrapper 类实现了 com.microsoft.durabletask.TaskOrchestrationFactory
接口:
class OrchestratorWrapper<T extends Workflow> implements TaskOrchestrationFactory {
private final Constructor<T> workflowConstructor;
private final String name;
......
}
构造函数:
public OrchestratorWrapper(Class<T> clazz) {
// 获取并设置 name
this.name = clazz.getCanonicalName();
try {
// 获取 Constructor
this.workflowConstructor = clazz.getDeclaredConstructor();
} catch (NoSuchMethodException e) {
throw new RuntimeException(
String.format("No constructor found for workflow class '%s'.", this.name), e
);
}
}
接口实现
TaskOrchestrationFactory 接口要求的 getName() 方法,直接返回前面获取的 name:
@Override
public String getName() {
return name;
}
TaskOrchestrationFactory 接口要求的 create() 方法,要返回一个 durabletask 的 TaskOrchestration ,而 TaskOrchestration 是一个 @FunctionalInterface,仅有一个 run() 方法:
@FunctionalInterface
public interface TaskOrchestration {
void run(TaskOrchestrationContext ctx);
}
因此构建 TaskOrchestration 实例的方式被简写为:
import com.microsoft.durabletask.TaskOrchestration;
@Override
public TaskOrchestration create() {
return ctx -> {
T workflow;
try {
// 通过 workflow 的构造器生成一个 workflow 实例
workflow = this.workflowConstructor.newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(
String.format("Unable to instantiate instance of workflow class '%s'", this.name), e
);
}
// 将 durable task 的 context 包装为 dapr 的 workflow context DaprWorkflowContextImpl
// 然后执行 workflow.run()
workflow.run(new DaprWorkflowContextImpl(ctx));
};
}