1 - DaprHttp
Dapr HTTP 的 okhttp3 + jackson 实现
常量定义
public static final String API_VERSION = "v1.0";
public static final String ALPHA_1_API_VERSION = "v1.0-alpha1";
private static final String HEADER_DAPR_REQUEST_ID = "X-DaprRequestId";
private static final String DEFAULT_HTTP_SCHEME = "http";
private static final Set<String> ALLOWED_CONTEXT_IN_HEADERS =
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("grpc-trace-bin", "traceparent", "tracestate")));
HTTP 方法定义:
public enum HttpMethods {
NONE,
GET,
PUT,
POST,
DELETE,
HEAD,
CONNECT,
OPTIONS,
TRACE
}
基本类定义
public static class Response {
private byte[] body;
private Map<String, String> headers;
private int statusCode;
......
}
DaprHttp 类定义
private final OkHttpClient httpClient;
private final int port;
private final String hostname;
DaprHttp(String hostname, int port, OkHttpClient httpClient) {
this.hostname = hostname;
this.port = port;
this.httpClient = httpClient;
}
invokeApi() 方法实现
这个方法有多个重载,最终的实现如下,用来执行http调用请求:
/**
* 调用API,返回文本格式有效载荷。
*
* @param method HTTP method.
* @param pathSegments Array of path segments (/a/b/c -> ["a", "b", "c"]).
* @param urlParameters Parameters in the URL
* @param content payload to be posted.
* @param headers HTTP headers.
* @param context OpenTelemetry's Context.
* @return CompletableFuture for Response.
*/
private CompletableFuture<Response> doInvokeApi(String method,
String[] pathSegments,
Map<String, List<String>> urlParameters,
byte[] content, Map<String, String> headers,
Context context) {
// 方法人口参数基本就是一个非常简化的HTTP请求的格式抽象
// 取 UUID 为 requestId
final String requestId = UUID.randomUUID().toString();
RequestBody body;
//组装 okhttp3 的 request
String contentType = headers != null ? headers.get(Metadata.CONTENT_TYPE) : null;
MediaType mediaType = contentType == null ? MEDIA_TYPE_APPLICATION_JSON : MediaType.get(contentType);
if (content == null) {
body = mediaType.equals(MEDIA_TYPE_APPLICATION_JSON)
? REQUEST_BODY_EMPTY_JSON
: RequestBody.Companion.create(new byte[0], mediaType);
} else {
body = RequestBody.Companion.create(content, mediaType);
}
HttpUrl.Builder urlBuilder = new HttpUrl.Builder();
urlBuilder.scheme(DEFAULT_HTTP_SCHEME)
.host(this.hostname)
.port(this.port);
for (String pathSegment : pathSegments) {
urlBuilder.addPathSegment(pathSegment);
}
Optional.ofNullable(urlParameters).orElse(Collections.emptyMap()).entrySet().stream()
.forEach(urlParameter ->
Optional.ofNullable(urlParameter.getValue()).orElse(Collections.emptyList()).stream()
.forEach(urlParameterValue ->
urlBuilder.addQueryParameter(urlParameter.getKey(), urlParameterValue)));
Request.Builder requestBuilder = new Request.Builder()
.url(urlBuilder.build())
.addHeader(HEADER_DAPR_REQUEST_ID, requestId);
if (context != null) {
context.stream()
.filter(entry -> ALLOWED_CONTEXT_IN_HEADERS.contains(entry.getKey().toString().toLowerCase()))
.forEach(entry -> requestBuilder.addHeader(entry.getKey().toString(), entry.getValue().toString()));
}
if (HttpMethods.GET.name().equals(method)) {
requestBuilder.get();
} else if (HttpMethods.DELETE.name().equals(method)) {
requestBuilder.delete();
} else {
requestBuilder.method(method, body);
}
String daprApiToken = Properties.API_TOKEN.get();
if (daprApiToken != null) {
requestBuilder.addHeader(Headers.DAPR_API_TOKEN, daprApiToken);
}
if (headers != null) {
Optional.ofNullable(headers.entrySet()).orElse(Collections.emptySet()).stream()
.forEach(header -> {
requestBuilder.addHeader(header.getKey(), header.getValue());
});
}
// 完成 request 的组装,构建 request 对象
Request request = requestBuilder.build();
// 发出 okhttp3 的请求,然后返回 CompletableFuture
CompletableFuture<Response> future = new CompletableFuture<>();
this.httpClient.newCall(request).enqueue(new ResponseFutureCallback(future));
return future;
}
在 http 请求组装过程中,注意 header 的处理:
- request id: “X-DaprRequestId”,值为 UUID
- dapr api token: “dapr-api-token”,值从系统变量 “dapr.api.token” 或者环境变量 “DAPR_API_TOKEN” 中获取
- 发送请求时明确传递的header: 透传
- OpenTelemetry 相关的值:会试图从传递进来的 OpenTelemetry context 中获取 “grpc-trace-bin”, “traceparent”, “tracestate” 这三个 header 并继续传递下去
2 - DaprHttpBuilder
builder for DaprHttp,基于 okhttp
代码没啥特殊的,就注意一下 okhttp 的一些参数的获取。
另外 MaxRequestsPerHost 默认为5,这是一个超级大坑!
private DaprHttp buildDaprHttp() {
// 双重检查锁
if (OK_HTTP_CLIENT == null) {
synchronized (LOCK) {
if (OK_HTTP_CLIENT == null) {
OkHttpClient.Builder builder = new OkHttpClient.Builder();
Duration readTimeout = Duration.ofSeconds(Properties.HTTP_CLIENT_READ_TIMEOUT_SECONDS.get());
builder.readTimeout(readTimeout);
Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(Properties.HTTP_CLIENT_MAX_REQUESTS.get());
//这里有一个超级大坑!
// The maximum number of requests for each host to execute concurrently.
// Default value is 5 in okhttp which is totally UNACCEPTABLE!
// For sidecar case, set it the same as maxRequests.
dispatcher.setMaxRequestsPerHost(Properties.HTTP_CLIENT_MAX_REQUESTS.get());
builder.dispatcher(dispatcher);
ConnectionPool pool = new ConnectionPool(Properties.HTTP_CLIENT_MAX_IDLE_CONNECTIONS.get(),
KEEP_ALIVE_DURATION, TimeUnit.SECONDS);
builder.connectionPool(pool);
OK_HTTP_CLIENT = builder.build();
}
}
}
return new DaprHttp(Properties.SIDECAR_IP.get(), Properties.HTTP_PORT.get(), OK_HTTP_CLIENT);
}
}
相关的几个参数的获取:
- http read timeout:默认60秒,可以通过系统参数 dapr.http.client.maxRequests 或者环境变量 DAPR_HTTP_CLIENT_READ_TIMEOUT_SECONDS 覆盖
- http max request:默认 1024,可以通过系统参数 dapr.http.client.readTimeoutSeconds 或者环境变量 DAPR_HTTP_CLIENT_MAX_REQUESTS 覆盖
- http max idle connections: 默认 1024,可以通过系统参数 dapr.http.client.maxIdleConnections 或者环境变量 DAPR_HTTP_CLIENT_MAX_IDLE_CONNECTIONS 覆盖
- HTTP keep alive duration: hard code 为 30
对于 okhttp,还必须自动设置 http max request per host 参数,不然默认值为 5 对于 sidecar 来说完全不可用。