DaprClientGrpc
Dapr Client gRPC 实现
类定义
public class DaprClientGrpc extends AbstractDaprClient {
private Closeable channel;
private DaprGrpc.DaprStub asyncStub;
DaprClientGrpc(
Closeable closeableChannel,
DaprGrpc.DaprStub asyncStub,
DaprObjectSerializer objectSerializer,
DaprObjectSerializer stateSerializer) {
super(objectSerializer, stateSerializer);
this.channel = closeableChannel;
this.asyncStub = intercept(asyncStub);
}
}
client特有的方法实现
waitForSidecar() 方法
waitForSidecar() 方法通过连接指定的 sidecar ip地址和端口来判断并等待 sidecar 是不是可用。
和 HTTP 的实现差别只是端口不同。
@Override
public Mono<Void> waitForSidecar(int timeoutInMilliseconds) {
return Mono.fromRunnable(() -> {
try {
NetworkUtils.waitForSocket(Properties.SIDECAR_IP.get(), Properties.GRPC_PORT.get(), timeoutInMilliseconds);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
}
close() 方法
close() 方法是实现 java.lang.AutoCloseable 的要求,DaprClient 继承了这个接口:
public void close() throws Exception {
if (channel != null) {
DaprException.wrap(() -> {
// 关闭channel
channel.close();
return true;
}).call();
}
}
dapr api 方法的实现
publishEvent()方法
publishEvent()方法主要是两个任务:
- 组装发送 grpc 请求的各种参数,构建 PublishEventRequest 请求对象
- 调用 gRPC asyncStub 的对应方法发出 gRPC 请求
@Override
public Mono<Void> publishEvent(PublishEventRequest request) {
try {
String pubsubName = request.getPubsubName();
String topic = request.getTopic();
Object data = request.getData();
DaprProtos.PublishEventRequest.Builder envelopeBuilder = DaprProtos.PublishEventRequest.newBuilder()
.setTopic(topic)
.setPubsubName(pubsubName)
.setData(ByteString.copyFrom(objectSerializer.serialize(data)));
// Content-type can be overwritten on a per-request basis.
// It allows CloudEvents to be handled differently, for example.
String contentType = request.getContentType();
if (contentType == null || contentType.isEmpty()) {
contentType = objectSerializer.getContentType();
}
envelopeBuilder.setDataContentType(contentType);
Map<String, String> metadata = request.getMetadata();
if (metadata != null) {
envelopeBuilder.putAllMetadata(metadata);
}
return Mono.subscriberContext().flatMap(
context ->
this.<Empty>createMono(
it -> intercept(context, asyncStub).publishEvent(envelopeBuilder.build(), it)
)
).then();
} catch (Exception ex) {
return DaprException.wrapMono(ex);
}
}