ActivityWrapper实现
ActivityWrapper的代码实现
背景
WorkflowRuntimeBuilder 的 registerActivity() 方法在注册 activity 对象时,实际代理给 DurableTaskGrpcWorkerBuilder 的 addActivity() 方法:
import com.microsoft.durabletask.TaskOrchestrationFactory;
public <T extends WorkflowActivity> void registerActivity(Class<T> clazz) {
this.builder = this.builder.addActivity(
new ActivityWrapper<>(clazz)
);
}
而 addActivity() 方法的输入参数为 com.microsoft.durabletask.TaskActivityFactory
:
public interface TaskActivityFactory {
String getName();
TaskActivity create();
}
因此需要提供一个 TaskActivityFactory 的实现。
类定义
ActivityWrapper 类实现了 com.microsoft.durabletask.TaskActivityFactory
接口:
public class ActivityWrapper<T extends WorkflowActivity> implements TaskActivityFactory {
private final Constructor<T> activityConstructor;
private final String name;
......
}
构造函数:
public ActivityWrapper(Class<T> clazz) {
this.name = clazz.getCanonicalName();
try {
this.activityConstructor = clazz.getDeclaredConstructor();
} catch (NoSuchMethodException e) {
throw new RuntimeException(
String.format("No constructor found for activity class '%s'.", this.name), e
);
}
}
接口实现
TaskActivityFactory 接口要求的 getName() 方法,直接返回前面获取的 name:
@Override
public String getName() {
return name;
}
TaskActivityFactory 接口要求的 create() 方法,要返回一个 durabletask 的 TaskActivity ,而 TaskActivity 是一个 @FunctionalInterface,仅有一个 run() 方法:
@FunctionalInterface
public interface TaskActivity {
Object run(TaskActivityContext ctx);
}
因此构建 TaskActivity 实例的方式被简写为:
import com.microsoft.durabletask.TaskActivity;
@Override
public TaskActivity create() {
return ctx -> {
Object result;
T activity;
try {
activity = this.activityConstructor.newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(
String.format("Unable to instantiate instance of activity class '%s'", this.name), e
);
}
result = activity.run(new WorkflowActivityContext(ctx));
return result;
};
}
}