学习 Dapr 的测试
Dapr测试学习
- 1: 单元测试
- 2: e2e测试
- 2.1: e2e测试的搭建
- 2.2: 搭建e2e测试环境时的常见问题
- 3: 性能测试
- 3.1: 运行性能测试
- 3.2: 如何使用fortio实现性能测试
- 3.3: 性能测试案例 service invoke http 的实现
- 3.4: 性能测试案例 state in-memory 的实现
- 3.5: 性能测试案例 pubsub in-memory 的实现
- 4: 认证测试
- 4.1: 认证测试概述
- 4.2: pubsub
- 4.2.1: kafka pubsub
- 4.3: bindings
- 4.3.1: kafka bindings
2 - e2e测试
2.1 - e2e测试的搭建
前言
dapr 为 e2e 测试提供了一个说明文档,存放在 dapr/dapr
仓库的 tests/docs/running-e2e-test.md 下:
https://github.com/dapr/dapr/blob/master/tests/docs/running-e2e-test.md
以下内容为参考官方文档的详细步骤。
准备工作
准备dapr开发环境
https://github.com/dapr/dapr/blob/master/docs/development/setup-dapr-development-env.md
具体有:
- 准备docker:包括安装 docker ,创建 docker hub 账号
- 准备golang:必须是go 1.17
- 准备k8s
准备helm v3
参考:https://helm.sh/docs/intro/install/
在mac下最简单的方式就是用 brew 安装:
$ brew install helm
$ helm version
version.BuildInfo{Version:"v3.8.1", GitCommit:"5cb9af4b1b271d11d7a97a71df3ac337dd94ad37", GitTreeState:"clean", GoVersion:"go1.17.8"}
ubuntu下通过 apt 命令安装:
curl https://baltocdn.com/helm/signing.asc | sudo apt-key add -
sudo apt-get install apt-transport-https --yes
echo "deb https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list
sudo apt-get update
sudo apt-get install helm
准备dapr
e2e 需要用到 kubernetes。
- 安装 dapr 命令行
- 在 kubernetes 中安装 dapr 控制面,参考:quickstarts/hello-kubernetes at master · dapr/quickstarts (github.com)
dapr init --kubernetes --wait
这样安装出来的是标准的dapr,其镜像版本为release版本,如下所示, dapr-sidecar-injector pod的container:
Containers:
  dapr-sidecar-injector:
    Container ID:  docker://bbda12f7c31adc380122651ca2b0ccf90cc599735da54df411df5764689e4cd6
    Image:         docker.io/daprio/dapr:1.1.2
所以,如果需要改动dapr控制面,则不能用这个方式安装dapr控制面。如果只是测试验证daprd(sidecar),则这个方式是可以的。
搭建 Dapr 控制面
以master分支为例,从dapr/dapr仓库的源码开始构建。
设置e2e相关的环境变量
m1本地测试
在 m1 macbook 上,如果为了构建后给本地的 arm64 docker 和 arm64 k8s 运行:
export DAPR_REGISTRY=docker.io/skyao
export DAPR_TAG=dev
export DAPR_NAMESPACE=dapr-tests
export DAPR_TEST_NAMESPACE=dapr-tests
export DAPR_TEST_REGISTRY=docker.io/skyao
export DAPR_TEST_TAG=dev-linux-arm64
export GOOS=linux
export GOARCH=arm64 # 默认是amd64,m1上本地运行需要修改为arm64
export TARGET_OS=linux
export TARGET_ARCH=arm64 # 默认是amd64,m1上本地运行需要修改为arm64
m1 交叉测试
在 m1 macbook 上,如果为了构建后给远程的 amd64 docker 和 amd64 k8s 运行:
export DAPR_REGISTRY=docker.io/skyao
export DAPR_TAG=dev
export DAPR_NAMESPACE=dapr-tests
export DAPR_TEST_NAMESPACE=dapr-tests
export DAPR_TEST_REGISTRY=docker.io/skyao
export DAPR_TEST_TAG=dev-linux-amd64
export GOOS=linux
export GOARCH=amd64
export TARGET_OS=linux
export TARGET_ARCH=amd64
#export DAPR_TEST_MINIKUBE_IP=192.168.100.40 # use this in IDE
#export MINIKUBE_NODE_IP=192.168.100.40 # use this in make command
amd64
在 amd64 机器上:
export DAPR_REGISTRY=docker.io/skyao
export DAPR_TAG=dev
export DAPR_NAMESPACE=dapr-tests
export DAPR_TEST_NAMESPACE=dapr-tests
export DAPR_TEST_REGISTRY=docker.io/skyao
export DAPR_TEST_TAG=dev-linux-amd64
export GOOS=linux
export GOARCH=amd64
export TARGET_OS=linux
export TARGET_ARCH=amd64
#export DAPR_TEST_MINIKUBE_IP=192.168.100.40 # use this in IDE
#export MINIKUBE_NODE_IP=192.168.100.40 # use this in make command
构建dapr镜像
make build-linux
make docker-build
make docker-push
m1 本地测试
由于缺乏 redis 之类的 arm64 镜像,这个方案跑不通,暂时放弃。
m1 交叉测试
在 m1 macbook 上构建 linux + amd64 的二进制文件:
$ make build-linux
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=551722f533afa5dfee97482fe3e63d8ff6233d50 -X github.com/dapr/dapr/pkg/version.gitversion=v1.5.1-rc.3-411-g551722f-dirty -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/daprd ./cmd/daprd/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=551722f533afa5dfee97482fe3e63d8ff6233d50 -X github.com/dapr/dapr/pkg/version.gitversion=v1.5.1-rc.3-411-g551722f-dirty -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/placement ./cmd/placement/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=551722f533afa5dfee97482fe3e63d8ff6233d50 -X github.com/dapr/dapr/pkg/version.gitversion=v1.5.1-rc.3-411-g551722f-dirty -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/operator ./cmd/operator/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=551722f533afa5dfee97482fe3e63d8ff6233d50 -X github.com/dapr/dapr/pkg/version.gitversion=v1.5.1-rc.3-411-g551722f-dirty -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/injector ./cmd/injector/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=551722f533afa5dfee97482fe3e63d8ff6233d50 -X github.com/dapr/dapr/pkg/version.gitversion=v1.5.1-rc.3-411-g551722f-dirty -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/sentry ./cmd/sentry/;
打包 docker 镜像:
$ make docker-build
Building docker.io/skyao/dapr:dev docker image ...
docker build --build-arg PKG_FILES=* -f ./docker/Dockerfile ./dist/linux_amd64/release -t docker.io/skyao/dapr:dev-linux-amd64
[+] Building 16.1s (12/12) FINISHED
......
=> => naming to docker.io/skyao/dapr:dev-linux-amd64
=> => naming to docker.io/skyao/daprd:dev-linux-amd64
=> => naming to docker.io/skyao/placement:dev-linux-amd64
=> => naming to docker.io/skyao/sentry:dev-linux-amd64
推送 docker 镜像
$ make docker-push
Building docker.io/skyao/dapr:dev docker image ...
docker build --build-arg PKG_FILES=* -f ./docker/Dockerfile ./dist/linux_amd64/release -t docker.io/skyao/dapr:dev-linux-amd64
.....
docker push docker.io/skyao/dapr:dev-linux-amd64
The push refers to repository [docker.io/skyao/dapr]
docker push docker.io/skyao/daprd:dev-linux-amd64
The push refers to repository [docker.io/skyao/daprd]
docker push docker.io/skyao/placement:dev-linux-amd64
The push refers to repository [docker.io/skyao/placement]
docker push docker.io/skyao/sentry:dev-linux-amd64
The push refers to repository [docker.io/skyao/sentry]
amd64
在 x86 机器上构建 linux + amd64 的二进制文件:
$ make build-linux
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=79ffea446dfd14ac25429c8dc9ad792983b46ce2 -X github.com/dapr/dapr/pkg/version.gitversion=v1.0.0-rc.4-1305-g79ffea4 -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/daprd ./cmd/daprd/;
go: downloading github.com/dapr/components-contrib v1.6.0-rc.2.0.20220322152414-4d44c2f04f43
go: downloading github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d
go: downloading github.com/alibabacloud-go/darabonba-openapi v0.1.16
go: downloading github.com/apache/pulsar-client-go v0.8.1
go: downloading github.com/prometheus/client_golang v1.11.1
go: downloading github.com/klauspost/compress v1.14.4
go: downloading github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.4
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=79ffea446dfd14ac25429c8dc9ad792983b46ce2 -X github.com/dapr/dapr/pkg/version.gitversion=v1.0.0-rc.4-1305-g79ffea4 -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/placement ./cmd/placement/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=79ffea446dfd14ac25429c8dc9ad792983b46ce2 -X github.com/dapr/dapr/pkg/version.gitversion=v1.0.0-rc.4-1305-g79ffea4 -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/operator ./cmd/operator/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=79ffea446dfd14ac25429c8dc9ad792983b46ce2 -X github.com/dapr/dapr/pkg/version.gitversion=v1.0.0-rc.4-1305-g79ffea4 -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/injector ./cmd/injector/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=79ffea446dfd14ac25429c8dc9ad792983b46ce2 -X github.com/dapr/dapr/pkg/version.gitversion=v1.0.0-rc.4-1305-g79ffea4 -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/sentry ./cmd/sentry/;
打包 docker 镜像:
$ make docker-build
Building docker.io/skyao/dapr:dev docker image ...
docker build --build-arg PKG_FILES=* -f ./docker/Dockerfile ./dist/linux_amd64/release -t docker.io/skyao/dapr:dev-linux-amd64
Sending build context to Docker daemon 236.2MB
......
Successfully tagged skyao/dapr:dev-linux-amd64
Successfully tagged skyao/daprd:dev-linux-amd64
Successfully tagged skyao/placement:dev-linux-amd64
Successfully tagged skyao/sentry:dev-linux-amd64
推送 docker 镜像
$ make docker-push
Building docker.io/skyao/dapr:dev docker image ...
docker build --build-arg PKG_FILES=* -f ./docker/Dockerfile ./dist/linux_amd64/release -t docker.io/skyao/dapr:dev-linux-amd64
.....
docker push docker.io/skyao/dapr:dev-linux-amd64
The push refers to repository [docker.io/skyao/dapr]
docker push docker.io/skyao/daprd:dev-linux-amd64
The push refers to repository [docker.io/skyao/daprd]
docker push docker.io/skyao/placement:dev-linux-amd64
The push refers to repository [docker.io/skyao/placement]
docker push docker.io/skyao/sentry:dev-linux-amd64
The push refers to repository [docker.io/skyao/sentry]
如果执行时遇到报错:
denied: requested access to the resource is denied
make: *** [docker/docker.mk:110: docker-push] Error 1
可以通过 docker login
命令先登录再 push。
部署e2e测试的应用
参考文档指示,一步一步来:
https://github.com/dapr/dapr/blob/master/tests/docs/running-e2e-test.md#option-2-step-by-step-guide
准备测试用的namespace
准备测试用的 namespace,在 dapr/dapr
仓库下执行:
$ make delete-test-namespace
$ make create-test-namespace
kubectl create namespace dapr-tests
namespace/dapr-tests created
初始化 helm
$ make setup-helm-init
helm repo add bitnami https://charts.bitnami.com/bitnami
"bitnami" already exists with the same configuration, skipping
helm repo add stable https://charts.helm.sh/stable
"stable" already exists with the same configuration, skipping
helm repo add incubator https://charts.helm.sh/incubator
"incubator" already exists with the same configuration, skipping
helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "incubator" chart repository
...Successfully got an update from the "stable" chart repository
...Successfully got an update from the "bitnami" chart repository
Update Complete. ⎈Happy Helming!⎈
准备测试会用到的redis和kafka
make setup-test-env-redis
make setup-test-env-kafka
部署 dapr 控制面
$ make docker-deploy-k8s
Deploying docker.io/skyao/dapr:dev to the current K8S context...
helm install \
dapr --namespace=dapr-system --wait --timeout 5m0s \
--set global.ha.enabled=false --set-string global.tag=dev-linux-amd64 \
--set-string global.registry=docker.io/skyao --set global.logAsJson=true \
--set global.daprControlPlaneOs=linux --set global.daprControlPlaneArch=amd64 \
--set dapr_placement.logLevel=debug --set dapr_sidecar_injector.sidecarImagePullPolicy=Always \
--set global.imagePullPolicy=Always --set global.imagePullSecrets= \
--set global.mtls.enabled=true \
--set dapr_placement.cluster.forceInMemoryLog=true ./charts/dapr
NAME: dapr
LAST DEPLOYED: Thu Mar 24 18:43:41 2022
NAMESPACE: dapr-system
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Thank you for installing Dapr: High-performance, lightweight serverless runtime for cloud and edge
Your release is named dapr.
To get started with Dapr, we recommend using our quickstarts:
https://github.com/dapr/quickstarts
For more information on running Dapr, visit:
https://dapr.io
部署之后,检查一下pod 的情况
$ k get pods -A
NAMESPACE NAME READY STATUS RESTARTS AGE
dapr-system dapr-dashboard-7c99fc9c45-cgnb6 1/1 Running 0 101s
dapr-system dapr-operator-556dfb5987-xfnwt 1/1 Running 0 101s
dapr-system dapr-placement-server-0 1/1 Running 0 101s
dapr-system dapr-sentry-54cb9989d5-hq5m4 1/1 Running 0 101s
dapr-system dapr-sidecar-injector-58d99b46c7-vxn5r 1/1 Running 0 101s
dapr-tests dapr-kafka-0 1/1 Running 0 18m
dapr-tests dapr-kafka-zookeeper-0 1/1 Running 0 18m
dapr-tests dapr-redis-master-0 1/1 Running 0 18m
......
关闭 mtls:
make setup-disable-mtls
kubectl apply -f ./tests/config/dapr_mtls_off_config.yaml --namespace dapr-tests
configuration.dapr.io/daprsystem created
部署测试用的组件
$ make setup-test-components
kubectl apply -f ./tests/config/dapr_observability_test_config.yaml --namespace dapr-tests
configuration.dapr.io/disable-telemetry created
configuration.dapr.io/obs-defaultmetric created
kubectl apply -f ./tests/config/kubernetes_secret.yaml --namespace dapr-tests
secret/daprsecret created
secret/daprsecret2 created
secret/emptysecret created
kubectl apply -f ./tests/config/kubernetes_secret_config.yaml --namespace dapr-tests
configuration.dapr.io/secretappconfig created
kubectl apply -f ./tests/config/kubernetes_redis_secret.yaml --namespace dapr-tests
secret/redissecret created
kubectl apply -f ./tests/config/dapr_redis_state.yaml --namespace dapr-tests
component.dapr.io/statestore created
kubectl apply -f ./tests/config/dapr_mongodb_state.yaml --namespace dapr-tests
component.dapr.io/querystatestore created
kubectl apply -f ./tests/config/dapr_tests_cluster_role_binding.yaml --namespace dapr-tests
rolebinding.rbac.authorization.k8s.io/dapr-secret-reader created
role.rbac.authorization.k8s.io/secret-reader created
kubectl apply -f ./tests/config/dapr_redis_pubsub.yaml --namespace dapr-tests
component.dapr.io/messagebus created
kubectl apply -f ./tests/config/dapr_kafka_bindings.yaml --namespace dapr-tests
component.dapr.io/test-topic created
kubectl apply -f ./tests/config/dapr_kafka_bindings_custom_route.yaml --namespace dapr-tests
component.dapr.io/test-topic-custom-route created
kubectl apply -f ./tests/config/dapr_kafka_bindings_grpc.yaml --namespace dapr-tests
component.dapr.io/test-topic-grpc created
kubectl apply -f ./tests/config/app_topic_subscription_pubsub.yaml --namespace dapr-tests
subscription.dapr.io/c-topic-subscription created
kubectl apply -f ./tests/config/app_topic_subscription_pubsub_grpc.yaml --namespace dapr-tests
subscription.dapr.io/c-topic-subscription-grpc created
kubectl apply -f ./tests/config/kubernetes_allowlists_config.yaml --namespace dapr-tests
configuration.dapr.io/allowlistsappconfig created
kubectl apply -f ./tests/config/kubernetes_allowlists_grpc_config.yaml --namespace dapr-tests
configuration.dapr.io/allowlistsgrpcappconfig created
kubectl apply -f ./tests/config/dapr_redis_state_query.yaml --namespace dapr-tests
component.dapr.io/querystatestore2 created
kubectl apply -f ./tests/config/dapr_redis_state_badhost.yaml --namespace dapr-tests
component.dapr.io/badhost-store created
kubectl apply -f ./tests/config/dapr_redis_state_badpass.yaml --namespace dapr-tests
component.dapr.io/badpass-store created
kubectl apply -f ./tests/config/uppercase.yaml --namespace dapr-tests
component.dapr.io/uppercase created
kubectl apply -f ./tests/config/pipeline.yaml --namespace dapr-tests
configuration.dapr.io/pipeline created
kubectl apply -f ./tests/config/app_reentrant_actor.yaml --namespace dapr-tests
configuration.dapr.io/reentrantconfig created
kubectl apply -f ./tests/config/app_actor_type_metadata.yaml --namespace dapr-tests
configuration.dapr.io/actortypemetadata created
kubectl apply -f ./tests/config/app_topic_subscription_routing.yaml --namespace dapr-tests
subscription.dapr.io/pubsub-routing-crd-http-subscription created
kubectl apply -f ./tests/config/app_topic_subscription_routing_grpc.yaml --namespace dapr-tests
subscription.dapr.io/pubsub-routing-crd-grpc-subscription created
kubectl apply -f ./tests/config/app_pubsub_routing.yaml --namespace dapr-tests
configuration.dapr.io/pubsubroutingconfig created
# Show the installed components
kubectl get components --namespace dapr-tests
NAME AGE
badhost-store 23s
badpass-store 20s
messagebus 49s
querystatestore 56s
querystatestore2 27s
statestore 58s
test-topic 45s
test-topic-custom-route 42s
test-topic-grpc 39s
uppercase 18s
# Show the installed configurations
kubectl get configurations --namespace dapr-tests
NAME AGE
actortypemetadata 11s
allowlistsappconfig 33s
allowlistsgrpcappconfig 31s
daprsystem 93s
disable-telemetry 73s
obs-defaultmetric 72s
pipeline 16s
pubsubroutingconfig 3s
reentrantconfig 13s
secretappconfig 65s
部署测试用的应用
make build-e2e-app-all
make push-e2e-app-all
运行e2e测试
$ make test-e2e-all
# Note2: use env variable DAPR_E2E_TEST to pick one e2e test to run.
根据提示,如果只想单独执行某个测试案例,可以设置环境变量 DAPR_E2E_TEST:
$ export DAPR_E2E_TEST=service_invocation
$ make test-e2e-all
测试非常不稳定,待查。
特殊情况
m1 本地启动k8s
部署之后 redis pod 启动不起来,报错 ‘1 node(s) didn’t match Pod’s node affinity/selector.’,经检查是因为 redis pods 的设置中要求部署在 linux + amd64 下,m1 本地启动的 k8s 节点是 arm64,所以没有节点可以启动:
spec:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/os
operator: In
values:
- linux
- key: kubernetes.io/arch
operator: In
values:
- amd64
暂时作罢,这需要改动部署文件了。redis 有 arm64 的镜像:
https://hub.docker.com/r/arm64v8/redis/
试了一下在 m1 上执行 docker run --name some-redis -d arm64v8/redis
命令是可以正常在docker中启动 redis 的。
原则上修改 pod 的以下内容应该就可以跑起来:
- key: kubernetes.io/arch
operator: In
values:
- amd64 # 修改为 arm64
image: docker.io/redislabs/rejson:latest # 修改为 image: docker.io/arm64v8/redis:latest
查了一下,helm 对 redis arm64 还没有提供支持,而且这个需求一年内被提出了多次,但至今仍未支持。
所以,结论是:没法在 m1 macbook 上启动 k8s 并完成开发测试。
2.2 - 搭建e2e测试环境时的常见问题
mongodb 部署失败
部署测试用的 mongodb 失败,长时间挂住,然后超时报错:
$ make setup-test-env-mongodb
helm install dapr-mongodb bitnami/mongodb -f ./tests/config/mongodb_override.yaml --namespace dapr-tests --wait --timeout 5m0s
Error: INSTALLATION FAILED: timed out waiting for the condition
make: *** [tests/dapr_tests.mk:269: setup-test-env-mongodb] Error 1
pod 状态一直是 Pending:
$ k get pods -A
NAMESPACE NAME READY STATUS RESTARTS AGE
dapr-tests dapr-mongodb-77fdf49576-d9lx4 0/1 Pending 0 8m37s
pod的描述提示是 PersistentVolumeClaims 的问题:
$ k describe pods dapr-mongodb-77fdf49576-d9lx4 -n dapr-tests
Name: dapr-mongodb-77fdf49576-d9lx4
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Warning FailedScheduling 8m55s default-scheduler 0/3 nodes are available: 3 pod has unbound immediate PersistentVolumeClaims.
Warning FailedScheduling 6m49s (x1 over 7m49s) default-scheduler 0/3 nodes are available: 3 pod has unbound immediate PersistentVolumeClaims.
pvc 也是 pending 状态:
k get pvc -n dapr-tests
NAME STATUS VOLUME CAPACITY ACCESS MODES STORAGECLASS AGE
dapr-mongodb Pending 22m
详细状态:
$ k describe pvc dapr-mongodb -n dapr-tests
Name: dapr-mongodb
Namespace: dapr-tests
StorageClass:
Status: Pending
Volume:
Labels: app.kubernetes.io/component=mongodb
app.kubernetes.io/instance=dapr-mongodb
app.kubernetes.io/managed-by=Helm
app.kubernetes.io/name=mongodb
helm.sh/chart=mongodb-11.1.5
Annotations: meta.helm.sh/release-name: dapr-mongodb
meta.helm.sh/release-namespace: dapr-tests
Finalizers: [kubernetes.io/pvc-protection]
Capacity:
Access Modes:
VolumeMode: Filesystem
Used By: dapr-mongodb-77fdf49576-d9lx4
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal FailedBinding 3m34s (x82 over 23m) persistentvolume-controller no persistent volumes available for this claim and no storage class is set
“StorageClass” 为空,所以失败。
参考资料:
- https://netapp-trident.readthedocs.io/en/stable-v18.07/kubernetes/operations/tasks/storage-classes.html
- https://blog.zuolinux.com/2020/06/10/nfs-client-provisioner.html
sidecar 无法注入
一般发生在重新安装 dapr 之后,主要原因是直接执行了 kubectl delete namespace dapr-tests,而没有先执行 helm uninstall dapr -n dapr-tests。
详细说明见:
https://github.com/dapr/dapr/issues/4612
daprd启动时 sentry 证书报错
daprd启动时报错退出,日志如下:
time="2022-12-04T01:35:44.198741493Z" level=info msg="sending workload csr request to sentry" app_id=service-a instance=service-a-8bbd5bf88-fxc6k scope=dapr.runtime.grpc.internal type=log ver=edge
time="2022-12-04T01:35:44.207675954Z" level=fatal msg="failed to start internal gRPC server: error from authenticator CreateSignedWorkloadCert: error from sentry SignCertificate: rpc error: code = Unknown desc = error validating requester identity: csr validation failed: token/id mismatch. received id: dapr-tests:default" app_id=service-a instance=service-a-8bbd5bf88-fxc6k scope=dapr.runtime type=log ver=edge
原因是没有执行 make setup-disable-mtls。
3 - 性能测试
3.1 - 运行性能测试
准备工作
基本类似 e2e 测试。
设置环境变量
首先要设置相关的环境变量。
amd64
在 amd64 机器上:
export DAPR_REGISTRY=docker.io/skyao
export DAPR_TAG=dev
export DAPR_NAMESPACE=dapr-tests
export TARGET_OS=linux
export TARGET_ARCH=amd64
export GOOS=linux
export GOARCH=amd64
export DAPR_TEST_NAMESPACE=dapr-tests
export DAPR_TEST_REGISTRY=docker.io/skyao
export DAPR_TEST_TAG=dev-linux-amd64
export DAPR_TEST_MINIKUBE_IP=192.168.100.40 # use this in IDE
export MINIKUBE_NODE_IP=192.168.100.40 # use this in make command
构建并部署dapr到k8s中
$ make create-test-namespace
$ make build-linux
$ make docker-build
$ make docker-push
$ make docker-deploy-k8s
如果之前有过部署,则需要在重新部署前删除之前的部署,有两种情况:
- 只清除 dapr 的控制平面:
$ helm uninstall dapr -n dapr-tests
$ make docker-deploy-k8s
- 清除所有 dapr 内容:
$ helm uninstall dapr -n dapr-tests
$ make delete-test-namespace
# 再重复上面的构建和部署过程
类似 e2e 测试,性能测试中原本有一个额外的操作,实际是安装 redis / kafka / mongodb:
$ make setup-3rd-party
helm repo add bitnami https://charts.bitnami.com/bitnami
"bitnami" already exists with the same configuration, skipping
helm repo add stable https://charts.helm.sh/stable
"stable" already exists with the same configuration, skipping
helm repo add incubator https://charts.helm.sh/incubator
"incubator" already exists with the same configuration, skipping
helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "incubator" chart repository
...Successfully got an update from the "bitnami" chart repository
...Successfully got an update from the "stable" chart repository
Update Complete. ⎈Happy Helming!⎈
helm install dapr-redis bitnami/redis --wait --timeout 5m0s --namespace dapr-tests -f ./tests/config/redis_override.yaml
......
helm install dapr-kafka bitnami/kafka -f ./tests/config/kafka_override.yaml --namespace dapr-tests --timeout 10m0s
......
helm install dapr-mongodb bitnami/mongodb -f ./tests/config/mongodb_override.yaml --namespace dapr-tests --wait --timeout 5m0s
对于性能测试,这一步并非必须,可以视情况(是否要测试 actor 相关的功能)决定是否只安装 redis:
make setup-test-env-redis
dapr相关的设置
关闭遥测:
$ make setup-app-configurations
kubectl apply -f ./tests/config/dapr_observability_test_config.yaml --namespace dapr-tests
configuration.dapr.io/disable-telemetry created
configuration.dapr.io/obs-defaultmetric created
关闭mtls:
$ make setup-disable-mtls
kubectl apply -f ./tests/config/dapr_mtls_off_config.yaml --namespace dapr-tests
configuration.dapr.io/daprsystem created
准备测试用的components:
# 切记不要用这个命令
$ make setup-test-components
这个命令会将 tests/config
下的 yaml 文件都安装到 k8s 下,有些多,而且部分 component 文件在没有配置好外部组件时会导致 daprd 启动失败。安全起见,如果只是跑个别性能测试的 test case,手工安装需要的就好了。
# 对于 actor 相关的性能测试案例,需要安装 redis 并开启 actor 使用的 statestore 组件
# 由于redis 配置中使用到了 secret,因此需要安装 kubernetes_redis_secret
# make setup-test-env-redis
$ k apply -f ./tests/config/dapr_redis_state_actorstore.yaml -n dapr-tests
$ k apply -f ./tests/config/kubernetes_redis_secret.yaml -n dapr-tests
# 对于 pubsub 的测试,只需要开启 in-memory pubsub
$ k apply -f ./tests/config/dapr_in_memory_pubsub.yaml -n dapr-tests
# 对于 state 的测试,只需要开启 in-memory state
$ k apply -f ./tests/config/dapr_in_memory_state.yaml -n dapr-tests
准备测试用的应用
$ make build-perf-app-all
$ make push-perf-app-all
如果是在开发或者修改某一个测试案例,为了节约时间,不需要构建和发布所有的测试案例。若只需单独构建和推送某一个性能测试的应用,可以直接调用对应的 make target,如针对 service_invocation_http 这个 test case:
$ make build-perf-app-service_invocation_http
$ make push-perf-app-service_invocation_http
执行性能测试
测试前先安装一下 jq,后面会用到:
brew install jq
# 切记不要跑 test-perf-all,由于环境变量在各个perf test case中的设置要求不同,全部一起跑会有问题。
# 在本地(无论是IDE还是终端)不要跑 test-perf-all,只能单独跑某一个 perf test case
# make test-perf-all
make test-perf-xxxxx
# 特别注意,如果没有设置性能测试输入条件相关的环境变量,直接默认跑,有一些 perf test case 是可以跑起来的
make test-perf-state_get_http
make test-perf-state_get_grpc
make test-perf-service_invoke_http
make test-perf-service_invoke_grpc
make test-perf-pubsub_publish_grpc
# 有部分 perf test 是跑不起来的,会报错
make test-perf-actor_timer # fortio报错,HTTP响应为 405 Method Not Allowed,必须用 HTTP POST
跑的时候特别注意日志文件中的这些内容:
2022/03/25 18:02:05 Installing test apps...
2022/03/25 18:02:09 Adding app {testapp 0 map[] true perf-service_invocation_http:dev-linux-amd64 docker.io/skyao 1 true true 4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
2022/03/25 18:02:09 Adding app {tester 3001 map[] true perf-tester:dev-linux-amd64 docker.io/skyao 1 true true 4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
仔细检查 app 的镜像信息,包括 tag 要求是 “dev-linux-amd64”, image registry 要求是自己设定的类似 “docker.io/skyao”,否则 pod 会无法启动。
最好先 env | grep DAPR
看一下设置的环境变量是否有生效。
本地debug
perf test 的测试案例都是用 go test 编写,原则上只要前面步骤准备好,是可以在本地 IDE 中以 debug 方式启动 perf test 的测试案例,然后进行 debug 的。
特别注意:actor 相关的 test case 要设置好性能测试输入条件的环境变量
特殊情况
彻底清理namespace
还要清理以下在 default namespace 中保存的内容:
k delete clusterrole dapr-operator-admin -n default
k delete clusterrole dashboard-reader -n default
k delete clusterrolebindings.rbac.authorization.k8s.io dapr-operator -n default
k delete clusterrolebindings.rbac.authorization.k8s.io dapr-role-tokenreview-binding -n default
k delete clusterrolebindings.rbac.authorization.k8s.io dashboard-reader-global -n default
k delete role secret-reader -n default
k delete rolebinding dapr-secret-reader -n default
k delete mutatingwebhookconfiguration dapr-sidecar-injector -n default
否则,重新安装 dapr 控制面时,如果 dapr 控制面的 namespace 发生变化,就会报错:
make docker-deploy-k8s
Deploying docker.io/skyao/dapr:dev to the current K8S context...
helm install \
dapr --namespace=dapr-tests --wait --timeout 5m0s \
--set global.ha.enabled=false --set-string global.tag=dev-linux-amd64 \
--set-string global.registry=docker.io/skyao --set global.logAsJson=true \
--set global.daprControlPlaneOs=linux --set global.daprControlPlaneArch=amd64 \
--set dapr_placement.logLevel=debug --set dapr_sidecar_injector.sidecarImagePullPolicy=Always \
--set global.imagePullPolicy=Always --set global.imagePullSecrets= \
--set global.mtls.enabled=true \
--set dapr_placement.cluster.forceInMemoryLog=true ./charts/dapr
Error: INSTALLATION FAILED: rendered manifests contain a resource that already exists. Unable to continue with install: RoleBinding "dapr-secret-reader" in namespace "default" exists and cannot be imported into the current release: invalid ownership metadata; annotation validation error: key "meta.helm.sh/release-namespace" must equal "dapr-tests": current value is "dapr-system"
make: *** [docker-deploy-k8s] Error 1
3.2 - 如何使用fortio实现性能测试
fortio介绍
Dapr 采用 fortio 作为性能测试工具。
Fortio 相关内容已经转移到单独的笔记中:Learning Fortio
tester 应用
tester 镜像的生成方法
tester 应用的镜像通过一个多阶段(multi-stage)Dockerfile 生成,由三个构建阶段组成:
构建 tester go app 二进制文件的镜像
FROM golang:1.17 as build_env
ARG GOARCH_ARG=amd64
WORKDIR /app
COPY app.go go.mod ./
RUN go get -d -v && GOOS=linux GOARCH=$GOARCH_ARG go build -o tester .
这个 dockerfile 会将 dapr 仓库下 tests/apps/perf/tester
目录中的 app.go 和 go.mod 文件复制到镜像中,然后执行 go get 和 go build 命令将 go 代码打包为名为 tester 的二进制可执行文件。
最终的产出物是 /app/tester
这个二进制可执行文件。
构建 fortio 二进制文件的镜像
FROM golang:1.17 as fortio_build_env
ARG GOARCH_ARG=amd64
WORKDIR /fortio
ADD "https://api.github.com/repos/fortio/fortio/branches/master" skipcache
RUN git clone https://github.com/fortio/fortio.git
RUN cd fortio && git checkout v1.16.1 && GOOS=linux GOARCH=$GOARCH_ARG go build
这个阶段构建的是 fortio v1.16.1 的代码。
最终的产出物是 /fortio/fortio/fortio
这个二进制可执行文件。
构建 buster-slim 的镜像
FROM debian:buster-slim
#RUN apt update
#RUN apt install wget -y
WORKDIR /
COPY --from=build_env /app/tester /
COPY --from=fortio_build_env /fortio/fortio/fortio /usr/local/bin
CMD ["/tester"]
这个阶段只是复制前两个阶段的产出物:将 /app/tester
复制到根目录,将 fortio 复制到 /usr/local/bin 目录。
tester 应用的工作原理和实现代码
tests/apps/perf/tester/app.go
的核心代码如下:
main 函数
func main() {
http.HandleFunc("/", handler)
http.HandleFunc("/test", testHandler)
log.Fatal(http.ListenAndServe(":3001", nil))
}
main 函数启动 http server,监听 3001 端口,然后注册了两个路径和对应的 handler。
简单探活 handler
这个 handler 超级简单,什么都不做,只是返回 http 200 。
func main() {
http.HandleFunc("/", handler)
......
}
func handler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}
测试handler
testHandler 执行性能测试:
func main() {
http.HandleFunc("/test", testHandler)
......
}
func testHandler(w http.ResponseWriter, r *http.Request) {
fmt.Println("test execution request received")
// 步骤1: 从请求中读取测试相关的配置参数,这些参数是从 test case 中发出的
var testParams TestParameters
b, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("error reading request body: %s", err)))
return
}
// 步骤2: 解析读取的测试相关的配置参数
err = json.Unmarshal(b, &testParams)
if err != nil {
w.WriteHeader(400)
w.Write([]byte(fmt.Sprintf("error parsing test params: %s", err)))
return
}
// 步骤3: 开始执行性能测试
fmt.Println("executing test")
results, err := runTest(testParams)
if err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("error encountered while running test: %s", err)))
return
}
// 步骤4: 返回性能测试的结果
fmt.Println("test finished")
w.Header().Add("Content-Type", "application/json")
w.Write(results)
}
真正的性能测试
真正的性能测试是通过 exec.Command 来执行命令行,通过调用 fortio 工具来进行的,也即是说,前面的 tester 应用除了用来启动 daprd 外,tester 自身只是配合走完性能测试的流程,真正的性能测试是由 fortio 进行。
// runTest accepts a set of test parameters, runs Fortio with the configured setting and returns
// the test results in json format.
func runTest(params TestParameters) ([]byte, error) {
var args []string
// 步骤1: 根据请求参数构建不同的 fortio 执行参数
if len(params.Payload) > 0 {
args = []string{
"load", "-json", "result.json", "-content-type", "application/json", "-qps", fmt.Sprint(params.QPS), "-c", fmt.Sprint(params.ClientConnections),
"-t", params.TestDuration, "-payload", params.Payload,
}
} else {
args = []string{
"load", "-json", "result.json", "-qps", fmt.Sprint(params.QPS), "-c", fmt.Sprint(params.ClientConnections),
"-t", params.TestDuration, "-payload-size", fmt.Sprint(params.PayloadSizeKB),
}
}
if params.StdClient {
args = append(args, "-stdclient")
}
args = append(args, params.TargetEndpoint)
fmt.Printf("running test with params: %s", args)
// 步骤2: 调用 fortio 执行性能测试
cmd := exec.Command("fortio", args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
if err != nil {
return nil, err
}
// 步骤3: 返回性能测试执行结果
return os.ReadFile("result.json")
}
无负载的性能测试
对于 payload 大小为 0 的情况,执行的是如下的 fortio 命令:
fortio load -json result.json -qps ${QPS} -c ${ClientConnections} -t ${TestDuration} -payload-size ${PayloadSizeKB} ${TargetEndpoint}
疑问:payload 都为零了,为啥还要设置 -payload-size ?
翻了一下相关的日志,找到对应的日志内容为:
fmt.Printf("running test with params: %s", args)
running test with params: [load -json result.json -qps 1 -c 1 -t 1m -payload-size 0 http://testapp:3000/test]
带负载的性能测试
对于 payload 大小不为 0 的情况,执行的是如下的 fortio 命令:
fortio load -json result.json -content-type application/json -qps ${QPS} -c ${ClientConnections} -t ${TestDuration} -payload ${Payload} ${TargetEndpoint}
全流程参数传递
在 perf test 的测试过程中,参数传递比较复杂(或者说比较绕),期间涉及到 perf test case 如何将测试参数一路传递给 tester app、fortio 以及 daprd,流程如下图所示:
title parameters transfer in load test
hide footbox
skinparam style strictuml
actor test_case as "Test Case"
participant tester as "Tester App"
box "Fortio" #LightBlue
participant fortio_load as "fortioLoad"
participant fgrpc
participant dapr as "daprResults"
end box
box "daprd"
participant grpc_api as "gRPC API"
end box
test_case -> tester : HTTP Post
note left: TestParameters
tester -> fortio_load : exec
note left: fortio load ... -grpc -dapr k1=v1,k2=v2
fortio_load -> fgrpc : RunGRPCTest()
note left: -grpc -dapr k1=v1,k2=v2
fgrpc -> dapr : RunTest()
note left: -dapr k1=v1,k2=v2
dapr -> grpc_api
dapr <-- grpc_api
fgrpc <-- dapr
fortio_load <-- fgrpc
tester <-- fortio_load
test_case <-- tester
步骤1:perf test case 获取测试参数
perf test case 在启动时,会从环境变量中获取测试参数
- DAPR_PERF_QPS: 默认1
- DAPR_PERF_CONNECTIONS: 默认1
- DAPR_TEST_DURATION: 默认 “1m”,即1分钟
- DAPR_PAYLOAD_SIZE: 默认0
- DAPR_PAYLOAD: 默认为空(字符串"")
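下面用一小段 Go 代码示意"按上述默认值从环境变量读取测试参数"的做法。这只是示意,并非 dapr 仓库中 perf 包的真实实现,getEnvInt / getEnvString 等函数名均为假设:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// getEnvInt 为假设的辅助函数:读取整型环境变量,未设置或非法时返回默认值
func getEnvInt(name string, def int) int {
	if v := os.Getenv(name); v != "" {
		if n, err := strconv.Atoi(v); err == nil {
			return n
		}
	}
	return def
}

// getEnvString 为假设的辅助函数:读取字符串环境变量,未设置时返回默认值
func getEnvString(name, def string) string {
	if v := os.Getenv(name); v != "" {
		return v
	}
	return def
}

func main() {
	qps := getEnvInt("DAPR_PERF_QPS", 1)                 // 默认 1
	connections := getEnvInt("DAPR_PERF_CONNECTIONS", 1) // 默认 1
	duration := getEnvString("DAPR_TEST_DURATION", "1m") // 默认 1 分钟
	payloadSize := getEnvInt("DAPR_PAYLOAD_SIZE", 0)     // 默认 0
	payload := getEnvString("DAPR_PAYLOAD", "")          // 默认为空字符串

	fmt.Printf("qps=%d connections=%d duration=%s payloadSizeKB=%d payload=%q\n",
		qps, connections, duration, payloadSize, payload)
}
```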
步骤2:perf test case打包测试参数发送给tester app
perf test case 会发送一个 HTTP 请求到 tester app,其内容如下:
- URL: http://testerAppURL/test
- Method: POST
- Body: 将 TestParameters 结构体序列化为 json
TestParameters 结构体的字段如下所示:
type TestParameters struct {
QPS int `json:"qps"`
ClientConnections int `json:"clientConnections"`
TargetEndpoint string `json:"targetEndpoint"`
TestDuration string `json:"testDuration"`
PayloadSizeKB int `json:"payloadSizeKB"`
Payload string `json:"payload"`
StdClient bool `json:"stdClient"`
}
在支持 gRPC + dapr 时,由于没有合适的参数可以使用,因此增加了 grpc 和 dapr 两个字段:
type TestParameters struct {
......
Grpc bool `json:"grpc"`
Dapr string `json:"dapr"`
}
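下面是一段示意代码,演示 perf test case 如何把 TestParameters 序列化为 JSON 并 POST 到 tester app 的 /test 端点。仅为示意:其中 testerAppURL、TargetEndpoint 等取值都是示例,真实测试中 URL 来自平台的 AcquireAppExternalURL:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// TestParameters 字段与上文一致(这里省略了 grpc/dapr 等扩展字段)
type TestParameters struct {
	QPS               int    `json:"qps"`
	ClientConnections int    `json:"clientConnections"`
	TargetEndpoint    string `json:"targetEndpoint"`
	TestDuration      string `json:"testDuration"`
	PayloadSizeKB     int    `json:"payloadSizeKB"`
	Payload           string `json:"payload"`
	StdClient         bool   `json:"stdClient"`
}

func main() {
	params := TestParameters{
		QPS:               1000,
		ClientConnections: 16,
		TargetEndpoint:    "http://testapp:3000/test", // 示例:被测应用的地址
		TestDuration:      "1m",
		PayloadSizeKB:     1024,
	}
	body, err := json.Marshal(params)
	if err != nil {
		panic(err)
	}

	testerAppURL := "http://tester:3001" // 示例地址,真实测试中来自 AcquireAppExternalURL
	resp, err := http.Post(testerAppURL+"/test", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// tester app 返回的是 fortio 生成的 result.json 内容
	result, _ := io.ReadAll(resp.Body)
	fmt.Println(string(result))
}
```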
步骤3:tester app解析测试参数,传递给fortio
在 tester app (tests/apps/perf/tester/app.go
) 的 testHandler() 方法中,会对传入http body进行读取和json解析,然后将 TestParameters 转为 fortio 的参数:
func testHandler(w http.ResponseWriter, r *http.Request) {
b, err := io.ReadAll(r.Body)
err = json.Unmarshal(b, &testParams)
results, err := runTest(testParams)
......
}
func runTest(params TestParameters) ([]byte, error) {
args := buildFortioArgs(params)
......
}
TestParameters 字段和 fortio 参数的对应关系:
| 语义 | 环境变量 | TestParameters 字段 | fortio 参数 |
|---|---|---|---|
| (固定参数) | | | load |
| (固定参数) | | | -json result.json |
| (固定参数) | | | -content-type application/json |
| QPS | DAPR_PERF_QPS | QPS | -qps |
| 客户端连接数 | DAPR_PERF_CONNECTIONS | ClientConnections | -c |
| 测试时长 | DAPR_TEST_DURATION | TestDuration | -t |
| 负载 | DAPR_PAYLOAD | Payload | -payload |
| 负载大小 | DAPR_PAYLOAD_SIZE | PayloadSizeKB | -payload-size |
| | | StdClient | -stdclient |
| 是否是grpc测试 | | Grpc | -grpc |
| Dapr测试参数 | | Dapr | -dapr |
步骤4:fortio解析 dapr flag,在发起的dapr 请求中使用
在 fortio 的执行中, load
子命令由 fortioLoad
方法负责,检查发现有 -grpc
flag,则会转到 fgrpc.RunGRPCTest()
方法。
fortio 的 grpc 支持默认只有自带的 ping 和标准的 health,为了支持 dapr ,我们扩展了 fgrpc.RunGRPCTest()
方法。
-dapr
flag 传递的参数会被透传到 dapr 的扩展代码中, 然后解析为下面的结构:
type DaprRequestParameters struct {
capability string
target string
method string
appId string
store string
extensions map[string]string
}
这些参数将在后面 fortio 扩展代码中进行 dapr 调用时被使用到。
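下面是一段解析 -dapr flag 的示意代码:把形如 capability=state,target=dapr,method=get 的逗号分隔键值对解析成 map。这只是解析思路的示意,并非 fortio dapr 扩展中的真实实现:

```go
package main

import (
	"fmt"
	"strings"
)

// parseDaprFlag 把 "k1=v1,k2=v2" 形式的字符串解析为键值对
func parseDaprFlag(flag string) map[string]string {
	params := map[string]string{}
	for _, pair := range strings.Split(flag, ",") {
		kv := strings.SplitN(pair, "=", 2)
		if len(kv) == 2 {
			params[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1])
		}
	}
	return params
}

func main() {
	// 示例取自后文 state 性能测试的 fortio 命令
	p := parseDaprFlag("capability=state,target=dapr,method=get,store=inmemorystate,key=abc123")
	fmt.Println(p["capability"], p["target"], p["method"], p["store"], p["key"])
}
```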
3.3 - 性能测试案例 service invoke http 的实现
运行性能测试
打开 dapr/dapr 仓库下的 .github/workflows/dapr-perf.yml
文件,找到 service_invocation_http 的性能测试输入条件:
- name: Run Perf test service_invocation_http
if: env.TEST_PREFIX != ''
env:
DAPR_PERF_QPS: 1000
DAPR_PERF_CONNECTIONS: 16
DAPR_TEST_DURATION: 1m
DAPR_PAYLOAD_SIZE: 1024
run: make test-perf-service_invocation_http
对应地,在本地运行该测试案例前需要设置相同的环境变量,再执行 make:
# service_invocation_http
export DAPR_PERF_QPS=1000
export DAPR_PERF_CONNECTIONS=16
export DAPR_TEST_DURATION=1m
export DAPR_PAYLOAD_SIZE=1024
unset DAPR_PAYLOAD
make test-perf-service_invocation_http
主流程详细分析
性能测试的主流程为:
func TestMain(m *testing.M) {
// 步骤1: 准备两个app:作为服务器端的 testapp 和作为客户端的 tester
testApps := []kube.AppDescription{
{
AppName: "testapp",
},
{
AppName: "tester",
},
}
tr = runner.NewTestRunner("serviceinvocationhttp", testApps, nil, nil)
os.Exit(tr.Start(m))
}
func TestServiceInvocationHTTPPerformance(t *testing.T) {
// 步骤4: 执行测试案例
......
}
// Start is the entry point of Dapr test runner.
func (tr *TestRunner) Start(m runnable) int {
// 步骤2: 启动测试平台
err := tr.Platform.setup()
// 可选步骤2.5: 安装组件,这个测试案例中没有
if tr.components != nil && len(tr.components) > 0 {
log.Println("Installing components...")
if err := tr.Platform.addComponents(tr.components); err != nil {
fmt.Fprintf(os.Stderr, "Failed Platform.addComponents(), %s", err.Error())
return runnerFailExitCode
}
}
// 可选步骤2.75: 安装初始化应用,这个测试案例中没有
if tr.initApps != nil && len(tr.initApps) > 0 {
log.Println("Installing init apps...")
if err := tr.Platform.addApps(tr.initApps); err != nil {
fmt.Fprintf(os.Stderr, "Failed Platform.addInitApps(), %s", err.Error())
return runnerFailExitCode
}
}
// 步骤3: 安装测试应用,这个测试案例中是前面步骤1中准备的作为服务器端的 testapp 和作为客户端的 tester
if tr.testApps != nil && len(tr.testApps) > 0 {
log.Println("Installing test apps...")
if err := tr.Platform.addApps(tr.testApps); err != nil {
fmt.Fprintf(os.Stderr, "Failed Platform.addApps(), %s", err.Error())
return runnerFailExitCode
}
}
// 步骤4: 执行测试案例
return m.Run()
}
func (tr *TestRunner) tearDown() {
// 步骤5: 执行完成后的 tearDown
// 具体为删除前面步骤3中安装的测试应用(作为服务器端的 testapp 和作为客户端的 tester)
// 如果用 ctrl + c 等方式强行中断 testcase 的执行,就会导致 teardown 没有执行,
// testapp/tester 两个应用就不会从k8s中删除
tr.Platform.tearDown()
}
测试应用准备
对应到上面主流程中的 “步骤1: 准备两个app”, 这里需要准备作为服务器端的 testapp 和作为客户端的 tester:
testApps := []kube.AppDescription{
{
AppName: "testapp", // 作为服务器端的 testapp
DaprEnabled: true,
ImageName: "perf-service_invocation_http",
IngressEnabled: true,
......
},
{
AppName: "tester", // 作为客户端的 tester
DaprEnabled: true,
ImageName: "perf-tester",
IngressEnabled: true,
AppPort: 3001,
......
},
}
特别注意:IngressEnabled: true
的设置。
测试应用安装的流程
对应到上面主流程中的 “步骤3: 安装测试应用”。
对应代码在 tests/runner/kube_testplatform.go
中的 addApps() 方法:
// addApps adds test apps to disposable App Resource queues.
func (c *KubeTestPlatform) addApps(apps []kube.AppDescription) error {
for _, app := range apps {
log.Printf("Adding app %v", app)
c.AppResources.Add(kube.NewAppManager(c.KubeClient, getNamespaceOrDefault(app.Namespace), app))
}
// installApps installs the apps in AppResource queue sequentially
log.Printf("Installing apps ...")
if err := c.AppResources.setup(); err != nil {
return err
}
log.Printf("Apps are installed.")
}
添加 app 这里对应的日志为:
2022/03/27 11:48:39 Adding app {testapp 0 map[] true perf-service_invocation_http:dev-linux-amd64 docker.io/skyao 1 true true 4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
2022/03/27 11:48:39 Adding app {tester 3001 map[] true perf-tester:dev-linux-amd64 docker.io/skyao 1 true true 4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
遇到问题时建议特别小心的检查这行日志,确认各个参数(如 image 的 name, tag,registry)等是否OK。
安装测试应用
详细看安装测试应用的代码和日志:
log.Printf("Deploying app %v ...", m.app.AppName)
// Deploy app and wait until deployment is done
if _, err := m.Deploy(); err != nil {
return err
}
// Wait until app is deployed completely
if _, err := m.WaitUntilDeploymentState(m.IsDeploymentDone); err != nil {
return err
}
if m.logPrefix != "" {
if err := m.StreamContainerLogs(); err != nil {
log.Printf("Failed to retrieve container logs for %s. Error was: %s", m.app.AppName, err)
}
}
log.Printf("App %v has been deployed.", m.app.AppName)
对应日志为:
2022/03/27 11:48:41 Deploying app testapp ...
2022/03/27 11:48:48 App testapp has been deployed.
2022/03/27 11:48:50 Deploying app tester ...
2022/03/27 11:48:57 App tester has been deployed.
从日志上看,在镜像本地已经有缓存的情况下,启动时间也就大概7秒左右。
// PollInterval is how frequently e2e tests will poll for updates.
PollInterval = 1 * time.Second
// PollTimeout is how long e2e tests will wait for resource updates when polling.
PollTimeout = 10 * time.Minute
// WaitUntilDeploymentState waits until isState returns true.
func (m *AppManager) WaitUntilDeploymentState(isState func(*appsv1.Deployment, error) bool) (*appsv1.Deployment, error) {
deploymentsClient := m.client.Deployments(m.namespace)
var lastDeployment *appsv1.Deployment
waitErr := wait.PollImmediate(PollInterval, PollTimeout, func() (bool, error) {
var err error
lastDeployment, err = deploymentsClient.Get(context.TODO(), m.app.AppName, metav1.GetOptions{})
done := isState(lastDeployment, err)
if !done && err != nil {
return true, err
}
return done, nil
})
if waitErr != nil {
// get deployment's Pods detail status info
......
}
return lastDeployment, nil
}
从代码实现看,等待应用部署的时间长达10分钟,所以正常情况下还是能等到应用启动完成的,除非应用的部署出问题了,比如镜像信息不对导致无法下载镜像。
检查 sidecar
检查 sidecar 是否启动成功的代码:
// maxSideCarDetectionRetries is the maximum number of retries to detect Dapr sidecar.
maxSideCarDetectionRetries = 3
log.Printf("Validating sidecar for app %v ....", m.app.AppName)
for i := 0; i <= maxSideCarDetectionRetries; i++ {
// Validate daprd side car is injected
if err := m.ValidateSidecar(); err != nil {
if i == maxSideCarDetectionRetries {
return err
}
log.Printf("Did not find sidecar for app %v error %s, retrying ....", m.app.AppName, err)
time.Sleep(10 * time.Second)
continue
}
break
}
log.Printf("Sidecar for app %v has been validated.", m.app.AppName)
对应的日志为:
2022/03/27 11:48:48 Validating sidecar for app testapp ....
2022/03/27 11:48:48 Streaming Kubernetes logs to ./container_logs/testapp-85d8d9db89-llmrr.daprd.log
2022/03/27 11:48:48 Streaming Kubernetes logs to ./container_logs/testapp-85d8d9db89-llmrr.testapp.log
2022/03/27 11:48:49 Sidecar for app testapp has been validated.
2022/03/27 11:48:57 Validating sidecar for app tester ....
2022/03/27 11:48:57 Streaming Kubernetes logs to ./container_logs/tester-7944b6bb68-wzfj2.tester.log
2022/03/27 11:48:57 Streaming Kubernetes logs to ./container_logs/tester-7944b6bb68-wzfj2.daprd.log
2022/03/27 11:48:58 Sidecar for app tester has been validated.
默认重试3次,每次间隔时间为 10 秒,所以执行4次 (1次 + 3次重试)总共40 秒之后,如果 sidecar 还没能启动起来,就会报错。
由于 sidecar 几乎是和应用同时启动,而应用自身启动时间就高达7秒,所以在应用启动完成后再检查 sidecar 通常会很快完成。
创建 ingress
创建 ingress 的代码实现:
// Create Ingress endpoint
log.Printf("Creating ingress for app %v ....", m.app.AppName)
if _, err := m.CreateIngressService(); err != nil {
return err
}
log.Printf("Ingress for app %v has been created.", m.app.AppName)
从日志上看,创建 ingress 的速度很快,不到1秒:
2022/03/27 10:41:09 Creating ingress for app testapp ....
2022/03/27 10:41:10 Ingress for app testapp has been created.
2022/03/27 10:41:19 Creating ingress for app tester ....
2022/03/27 10:41:19 Ingress for app tester has been created.
但特别注意:这里的所谓created,应该只是将命令发给了k8s,也就是这里是异步返回。并不是 ingress 立即可用。
创建端口转发
创建端口转发的代码实现:
log.Printf("Creating pod port forwarder for app %v ....", m.app.AppName)
m.forwarder = NewPodPortForwarder(m.client, m.namespace)
log.Printf("Pod port forwarder for app %v has been created.", m.app.AppName)
从日志上看,创建端口转发的速度也非常快,不到1秒:
2022/03/27 10:41:10 Creating pod port forwarder for app testapp ....
2022/03/27 10:41:10 Pod port forwarder for app testapp has been created.
2022/03/27 10:41:19 Creating pod port forwarder for app tester ....
2022/03/27 10:41:19 Pod port forwarder for app tester has been created.
但特别注意:这里的所谓created,应该只是将命令发给了k8s,也就是这里是异步返回。并不是端口转发立即可用。
完整的日志分析
下面是一个完整的日志,安装 testapp 和 tester 两个应用,耗时 19秒:
2022/03/27 11:48:39 Running setup...
2022/03/27 11:48:39 Installing test apps...
2022/03/27 11:48:39 Adding app {testapp 0 map[] true perf-service_invocation_http:dev-linux-amd64 docker.io/skyao 1 true true 4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
2022/03/27 11:48:39 Adding app {tester 3001 map[] true perf-tester:dev-linux-amd64 docker.io/skyao 1 true true 4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
2022/03/27 11:48:39 Installing apps ...
2022/03/27 11:48:41 Deploying app testapp ...
2022/03/27 11:48:48 App testapp has been deployed.
2022/03/27 11:48:48 Validating sidecar for app testapp ....
2022/03/27 11:48:48 Streaming Kubernetes logs to ./container_logs/testapp-85d8d9db89-llmrr.daprd.log
2022/03/27 11:48:48 Streaming Kubernetes logs to ./container_logs/testapp-85d8d9db89-llmrr.testapp.log
2022/03/27 11:48:49 Sidecar for app testapp has been validated.
2022/03/27 11:48:49 Creating ingress for app testapp ....
2022/03/27 11:48:49 Ingress for app testapp has been created.
2022/03/27 11:48:49 Creating pod port forwarder for app testapp ....
2022/03/27 11:48:49 Pod port forwarder for app testapp has been created.
2022/03/27 11:48:50 Deploying app tester ...
2022/03/27 11:48:57 App tester has been deployed.
2022/03/27 11:48:57 Validating sidecar for app tester ....
2022/03/27 11:48:57 Streaming Kubernetes logs to ./container_logs/tester-7944b6bb68-wzfj2.tester.log
2022/03/27 11:48:57 Streaming Kubernetes logs to ./container_logs/tester-7944b6bb68-wzfj2.daprd.log
2022/03/27 11:48:58 Sidecar for app tester has been validated.
2022/03/27 11:48:58 Creating ingress for app tester ....
2022/03/27 11:48:58 Ingress for app tester has been created.
2022/03/27 11:48:58 Creating pod port forwarder for app tester ....
2022/03/27 11:48:58 Pod port forwarder for app tester has been created.
2022/03/27 11:48:58 Apps are installed.
2022/03/27 11:48:58 Running tests...
这19秒总时长中比较耗时的操作主要是:
- Installing apps: 2 秒
- 部署 testapp: 7秒
- 部署 tester: 7秒
- 验证sidecar/创建ingress和端口转发:1秒 (两个app x 2)
等待测试应用就绪的流程
等待流程就绪
由于存在一个安装测试应用的流程, 而 k8s 部署/启动完成测试应用是需要一段时间的,因此,就需要一个机制能等待并检测到测试应用是否安装完成,这样才可以开启真正的测试即开始执行 testcase。
以 TestServiceInvocationHTTPPerformance 为例,删除测试执行的细节代码:
const numHealthChecks = 60 // Number of times to check for endpoint health per app.
func TestServiceInvocationHTTPPerformance(t *testing.T) {
p := perf.Params()
t.Logf("running service invocation http test with params: qps=%v, connections=%v, duration=%s, payload size=%v, payload=%v", p.QPS, p.ClientConnections, p.TestDuration, p.PayloadSizeKB, p.Payload) // line 79
// Get the ingress external url of test app
testAppURL := tr.Platform.AcquireAppExternalURL("testapp")
require.NotEmpty(t, testAppURL, "test app external URL must not be empty")
// Check if test app endpoint is available
t.Logf("test app url: %s", testAppURL+"/test") // line 86
_, err := utils.HTTPGetNTimes(testAppURL+"/test", numHealthChecks) // 在这里等待测试应用 testapp 就绪!
require.NoError(t, err)
// Get the ingress external url of tester app
testerAppURL := tr.Platform.AcquireAppExternalURL("tester")
require.NotEmpty(t, testerAppURL, "tester app external URL must not be empty")
// Check if tester app endpoint is available
t.Logf("teter app url: %s", testerAppURL) // line 95 // 在这里等待测试应用 tester 就绪!
_, err = utils.HTTPGetNTimes(testerAppURL, numHealthChecks)
require.NoError(t, err)
// Perform baseline test
......
// Perform dapr test
......
}
从执行日志中分别提取对应上面三处的日志内容:
service_invocation_http_test.go:79: running service invocation http test with params: qps=1, connections=1, duration=1m, payload size=0, payload=
2022/03/27 11:48:58 Waiting until service ingress is ready for testapp...
2022/03/27 11:49:03 Service ingress for testapp is ready...
service_invocation_http_test.go:86: test app url: 20.84.11.6:3000/test
2022/03/27 11:49:04 Waiting until service ingress is ready for tester...
2022/03/27 11:49:33 Service ingress for tester is ready...
service_invocation_http_test.go:95: teter app url: 20.85.250.98:3000
这两处就是在等待测试应用 testapp 和 tester 启动完成。检查的方式就是访问这两个应用的 public url (也就是 health check 的地址),如果能访问(可连接,返回 http 200)则说明应用启动完成。如果失败,则继续等待。
这里面实际是有两个等待:
- 等待测试应用的 ingress 就绪
- 等待测试应用自身就绪
等待测试应用的 ingress 就绪
因为 pod ip 不能从 k8s 集群外直接访问,因此需要通过 ingress 和端口转发。前面安装测试应用的流程中做了 ingress 的创建和端口转发的创建,但生效是需要时间的。在 AcquireAppExternalURL() 方法中会等待 ingress 就绪:
testAppURL := tr.Platform.AcquireAppExternalURL("testapp")
testerAppURL := tr.Platform.AcquireAppExternalURL("tester")
k8s下,实现的代码在 tests/runner/kube_testplatform.go
中:
// AcquireAppExternalURL returns the external url for 'name'.
func (c *KubeTestPlatform) AcquireAppExternalURL(name string) string {
app := c.AppResources.FindActiveResource(name)
return app.(*kube.AppManager).AcquireExternalURL()
}
// AcquireExternalURL gets external ingress endpoint from service when it is ready.
func (m *AppManager) AcquireExternalURL() string {
log.Printf("Waiting until service ingress is ready for %s...\n", m.app.AppName)
svc, err := m.WaitUntilServiceState(m.IsServiceIngressReady) // 等待直到 ingress reday
if err != nil {
return ""
}
log.Printf("Service ingress for %s is ready...\n", m.app.AppName)
return m.AcquireExternalURLFromService(svc)
}
具体的等待实现在 WaitUntilServiceState() 方法中:
// PollInterval is how frequently e2e tests will poll for updates.
PollInterval = 1 * time.Second
// PollTimeout is how long e2e tests will wait for resource updates when polling.
PollTimeout = 10 * time.Minute
// WaitUntilServiceState waits until isState returns true.
func (m *AppManager) WaitUntilServiceState(isState func(*apiv1.Service, error) bool) (*apiv1.Service, error) {
serviceClient := m.client.Services(m.namespace)
var lastService *apiv1.Service
waitErr := wait.PollImmediate(PollInterval, PollTimeout, func() (bool, error) {
var err error
lastService, err = serviceClient.Get(context.TODO(), m.app.AppName, metav1.GetOptions{})
done := isState(lastService, err)
if !done && err != nil {
return true, err
}
return done, nil
})
......
return lastService, nil
}
每隔1秒,时长 10 分钟,这个时间长度有点离谱。之前遇到过 ingress 无法访问的案例,就在这里等待长达10 分钟。
但如果能正常工作,只是启动速度慢,那这里的10分钟怎么也够 ingress 生效的。
等待测试应用自身就绪
实现代码如下:
const numHealthChecks = 60 // Number of times to check for endpoint health per app.
// HTTPGetNTimes calls the url n times and returns the first success or last error.
func HTTPGetNTimes(url string, n int) ([]byte, error) {
var res []byte
var err error
for i := n - 1; i >= 0; i-- {
res, err = HTTPGet(url)
if i == 0 {
break
}
if err != nil {
time.Sleep(time.Second)
} else {
return res, nil
}
}
return res, err
}
每秒检查一次,重试 60 次。60 秒之后只要启动成功就可以。
备注:这里的检查顺序,先检查 testapp,再检查 tester app,所以如果两者都启动的慢的话,顺序偏后的 tester app 有更多的启动时间。
bug:总是卡在等待测试应用就绪上
测试中发现,测试案例总是卡在等待测试应用就绪上,但奇怪的是,从打印日志上看,上面的两个等待过程中,第一个等待 ingress 达到 ready 状态总是通过,然后等待测试应用自身就绪(也就是通过 http://52.226.222.31:3000/test 地址访问)就总是不能成功:
=== Failed
=== FAIL: tests/perf/service_invocation_http TestServiceInvocationHTTPPerformance (248.07s)
service_invocation_http_test.go:79: running service invocation http test with params: qps=1, connections=1, duration=1m, payload size=0, payload=
2022/03/25 21:49:17 Waiting until service ingress is ready for testapp...
2022/03/25 21:49:20 Service ingress for testapp is ready...
service_invocation_http_test.go:86: test app url: 52.226.222.31:3000/test
service_invocation_http_test.go:88:
Error Trace: service_invocation_http_test.go:88
Error: Received unexpected error:
Get "http://52.226.222.31:3000/test": EOF
Test: TestServiceInvocationHTTPPerformance
翻了一下 tests/apps/perf/service_invocation_http/app.go
的代码,这是服务器端 app 的实现,超级简单:
func handler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}
func main() {
http.HandleFunc("/test", handler)
log.Fatal(http.ListenAndServe(":3000", nil))
}
从 k get pods -n dapr-tests
命令的输出看,testapp 的状态很早就是 running了。这个简单的应用不存在 60 秒还启动不起来的情况。但奇怪的是,有时这个问题又不存在,能正常的访问。而且一旦正常就会一直都正常,一旦不正常就一直不正常。
经过反复测试排查发现:在使用 azure 部署 k8s 时,pod 的外部访问地址,必须在连接公司 VPN (GlobalProtect)时才能正常访问,如果 VPN 未能开启,则会报错 Empty reply from server ,一直卡在这里直到 60 秒超时。
# 断开vpn
$ curl -i 20.81.110.42:3000/test
curl: (52) Empty reply from server
# 连接vpn
$ curl -i 20.81.110.42:3000/test
HTTP/1.1 200 OK
Date: Sun, 27 Mar 2022 07:10:02 GMT
Content-Length: 0
# 再次断开vpn
$ curl -i 20.81.110.42:3000/test
curl: (52) Empty reply from server
# 再次连接vpn
$ curl -i 20.81.110.42:3000/test
HTTP/1.1 200 OK
Date: Sun, 27 Mar 2022 07:12:05 GMT
Content-Length: 0
排除这个问题之后,性能测试的案例就可以正常运行了。
备注:我是在我本地机器 macbook 上跑测试案例的,案例和 azure 上的 k8s 集群通讯一直正常,但是就是有时可以访问 app 的 external url,有时不能,没想到是这个原因。
3.4 - 性能测试案例 state in-memory 的实现
测试逻辑
完整的端到端的 state 压力测试应该是这样的,以 redis 为例:
title state load test with redis store
hide footbox
skinparam style strictuml
actor test_case as "Test Case"
participant tester as "Tester"
participant fortio as "Fortio"
box "daprd" #LightBlue
participant grpc_api as "gRPC API"
participant redis_component as "Redis State Component"
end box
database redis_server as "Redis server"
test_case -> tester : baseline test
note left: baseline test
tester -> fortio : exec
fortio -> redis_server: redis native protocol
fortio <-- redis_server
tester <-- fortio
test_case <-- tester
|||
|||
|||
test_case -> tester : dapr test
note left: dapr test
tester -> fortio : exec
fortio -> grpc_api : gRPC
grpc_api -> redis_component
redis_component -> redis_server: redis native protocol
redis_component <-- redis_server
grpc_api <-- redis_component
fortio <-- grpc_api
tester <-- fortio
test_case <-- tester
但目前 dapr 仓库中的 perf test 的测试目标都是 dapr runtime,也就是不包括 dapr sdk 和 dapr components,因此,需要一个 in-memory 的 state 组件来进行压力测试以排除 redis 等外部组件的干扰,流程大体是这样:
title state load test with in-memory store
hide footbox
skinparam style strictuml
actor test_case as "Test Case"
participant tester as "Tester"
participant fortio as "Fortio"
box "daprd" #LightBlue
participant grpc_api as "gRPC API"
participant in_momory_component as "In-Memory Component"
end box
test_case -> tester : dapr test
note left: dapr test
tester -> fortio : exec
fortio -> grpc_api : gRPC
grpc_api -> in_momory_component
in_momory_component -> in_momory_component: in-memory operations
grpc_api <-- in_momory_component
fortio <-- grpc_api
tester <-- fortio
test_case <-- tester
in-memory 的 state 组件的性能消耗可以视为0,因此将访问 redis 的远程开销在 baseline test 和 dapr test 中同时去除之后,得到的新的 baseline test 和 dapr test 如下图:
title state load test with in-memory store
hide footbox
skinparam style strictuml
actor test_case as "Test Case"
participant tester as "Tester"
participant fortio as "Fortio"
box "daprd" #LightBlue
participant grpc_api as "gRPC API"
participant in_momory_component as "In-Memory Component"
end box
test_case -> tester : baseline test
note left: baseline test
tester -> fortio : exec
fortio -> fortio : do nothing
tester <-- fortio
test_case <-- tester
|||
|||
|||
test_case -> tester : dapr test
note left: dapr test
tester -> fortio : exec
fortio -> grpc_api : gRPC
grpc_api -> in_momory_component
in_momory_component -> in_momory_component: in-memory operations
grpc_api <-- in_momory_component
fortio <-- grpc_api
tester <-- fortio
test_case <-- tester
fortio命令
对于 baseline test 中的 no-op,fortio 的命令为:
./fortio load -json result.json -qps 1 -c 1 -t 1m -payload-size 0 -grpc -dapr capability=state,target=noop http://localhost:50001/
对于 dapr test 中的 state get 请求,fortio 的命令为:
./fortio load -json result.json -qps 1 -c 1 -t 1m -payload-size 0 -grpc --dapr capability=state,target=dapr,method=get,store=inmemorystate,key=abc123 http://127.0.0.1:50001
perf test case 的请求
3.5 - 性能测试案例 pubsub in-memory 的实现
测试逻辑
和 state in-memory 类似。
fortio命令
对于 baseline test 中的 no-op,fortio 的命令为:
./fortio load -json result.json -qps 1 -c 1 -t 1m -payload-size 0 -grpc -dapr capability=pubsub,target=noop http://localhost:50001/
对于 dapr test 中的 pubsub publish 请求,fortio 的命令为:
./fortio load -json result.json -qps 1 -c 1 -t 1m -payload-size 100 -grpc --dapr capability=pubsub,target=dapr,method=publish,store=inmemorypubsub,topic=topic123,contenttype=text/plain http://127.0.0.1:50001
perf test case 的请求
4 - 认证测试
4.1 - 认证测试概述
资料
- Certification lifecycle | Dapr Docs: dapr文档中有关认证测试的说明
- 认证生命周期 | Dapr 文档库: 上文的中文翻译
4.2 - pubsub
4.2.1 - kafka pubsub
4.2.1.1 - kafka集群配置
test plan
按照 README.md 文件中 test plan 描述的:
### Basic tests
* Bring up a 3-node Kafka cluster
* Configured to have 10+ partitions per topic
kafka集群将有三个节点,每个 topic 配置有10+的分区
TBD: 配置分区在哪里实现的?
docker-compose.yml
在项目根目录下有 docker-compose.yml 文件,定义了测试中需要使用到的 kafka 集群的配置,主要有:
- zookeeper: 端口 “2181:2181”
- kafka1:端口 “19094:19094” / “19093:19093” / “19092:19092”
- Kafka2:端口 “29094:29094” / “29093:29093” / “29092:29092”
- kafka3:端口 “39094:39094” / “39093:39093” / “39092:39092”
- hydra:端口 “4443:4443” / “4444:4444”
- hydra-config
以及三个volumes:
- kafka1-data
- kafka2-data
- kafka3-data
kafka证书文件
在 kafka1 / kafka2 / kafka3 三个kafka 容器的配置中, 都有一段相同的证书文件配置和类似的自定义配置文件:
volumes:
- type: bind
source: ./strimzi-ca-certs
target: /opt/kafka/cluster-ca-certs
read_only: true
- type: bind
source: ./strimzi-broker-certs
target: /opt/kafka/broker-certs
read_only: true
- type: bind
source: ./strimzi-client-ca
target: /opt/kafka/client-ca-certs
read_only: true
- type: bind
source: ./strimzi-listener-certs
target: /opt/kafka/certificates/custom-mtls-9094-certs
read_only: true
- type: bind
source: ./strimzi-listener-certs
target: /opt/kafka/certificates/custom-oauth-9093-certs
read_only: true
- type: bind
source: ./strimzi-listener-certs
target: /opt/kafka/certificates/oauth-oauth-9093-certs
read_only: true
- type: bind
source: ./strimzi-kafka1-config
target: /opt/kafka/custom-config
read_only: true
这些证书对应的项目文件为:
4.2.1.2 - kafka组件配置
组件配置文件
kafka pubsub 组件认证测试中使用到三个kafka pubsub 组件:
- consumer1
- mtls-consumer
- oauth-consumer
共性配置
三个 component 的 name 都设定为 messagebus:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.kafka
version: v1
......
metadata:
- name: initialOffset
value: oldest
- name: backOffDuration
value: 50ms
consumer1
metadata:
- name: brokers
value: localhost:19092,localhost:29092,localhost:39092
- name: consumerGroup
value: kafkaCertification1
- name: authType
value: "none"
-
brokers 指向端口为 19092 / 29092 / 39092 的kafka集群
-
authType 设置为 none,不进行authentication。
-
consumerGroup 设置为 kafkaCertification1
mtls-consumer
metadata:
- name: brokers
value: localhost:19094,localhost:29094,localhost:39094
- name: consumerGroup
value: kafkaCertification2
- name: authType
value: mtls
- name: caCert
value: |
-----BEGIN CERTIFICATE-----
......
-----END CERTIFICATE-----
- name: clientCert
value: |
-----BEGIN CERTIFICATE-----
......
-----END CERTIFICATE-----
- name: clientKey
value: |
-----BEGIN RSA PRIVATE KEY-----
......
-----END RSA PRIVATE KEY-----
- brokers points to the Kafka cluster on ports 19094 / 29094 / 39094
- authType is set to mtls, so mTLS authentication is performed
- consumerGroup is set to kafkaCertification2
oauth-consumer
metadata:
- name: brokers
value: localhost:19093,localhost:29093,localhost:39093
- name: consumerGroup
value: kafkaCertification2
- name: authType
value: "oidc"
- name: oidcTokenEndpoint
value: https://localhost:4443/oauth2/token
- name: oidcClientID
value: "dapr"
- name: oidcClientSecret
value: "dapr-test"
- name: oidcScopes
value: openid,kafka
- name: caCert
value: |
-----BEGIN CERTIFICATE-----
......
-----END CERTIFICATE-----
- brokers points to the Kafka cluster on ports 19093 / 29093 / 39093
- authType is set to oidc, so OIDC (OAuth2) authentication is performed
- consumerGroup is set to kafkaCertification2 (intentionally the same group as the mtls consumer; the unordered test later relies on this)
4.2.1.3 - Test source code
There is only one source file: kafka_test.go
Preparation
kafka component
Code that creates the Kafka pubsub component:
log := logger.NewLogger("dapr.components")
component := pubsub_loader.New("kafka", func() pubsub.PubSub {
return pubsub_kafka.NewKafka(log)
})
This directly creates an instance of the Kafka pubsub component.
consumer group
// For Kafka, we should ensure messages are received in order.
consumerGroup1 := watcher.NewOrdered()
// This watcher is across multiple consumers in the same group
// so exact ordering is not expected.
consumerGroup2 := watcher.NewUnordered()
consumerGroup1 is an ordered watcher; consumerGroup2 spans multiple consumers within the same group, so it only verifies unordered delivery (see the short sketch below).
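A short sketch of the watcher semantics, using only the methods that appear later in this test (ExpectStrings, Observe, Assert). The import path and the fact that Assert accepts a *testing.T are assumptions made to keep the example standalone:

```go
package kafka_sketch

import (
	"testing"
	"time"

	"github.com/dapr/components-contrib/tests/certification/flow/watcher" // assumed import path
)

func TestWatcherSemantics(t *testing.T) {
	// Ordered watcher: observed messages must match the expected
	// messages in exactly the same order.
	ordered := watcher.NewOrdered()
	ordered.ExpectStrings("msg-001", "msg-002", "msg-003")
	ordered.Observe("msg-001", "msg-002", "msg-003") // same order -> passes

	// Unordered watcher: only the set of messages matters, not the order.
	unordered := watcher.NewUnordered()
	unordered.ExpectStrings("msg-001", "msg-002", "msg-003")
	unordered.Observe("msg-002", "msg-003", "msg-001") // order ignored -> passes

	// Assert blocks until everything expected has been observed,
	// or fails after the timeout.
	ordered.Assert(t, time.Minute)
	unordered.Assert(t, time.Minute)
}
```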
The application function
application consumes the messages received from the topic:
// Application logic that tracks messages from a topic.
application := func(messages *watcher.Watcher) app.SetupFn {
return func(ctx flow.Context, s common.Service) error {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 100)
// Setup the /orders event handler.
return multierr.Combine(
s.AddTopicEventHandler(&common.Subscription{
PubsubName: "messagebus",
Topic: "neworder",
Route: "/orders",
}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
if err := sim(); err != nil {
return true, err
}
// Track/Observe the data of the event.
messages.Observe(e.Data)
return false, nil
}),
)
}
}
The application subscribes to the neworder topic of the pubsub component named messagebus, so messages are delivered on the route "/orders".
When the application receives a message, it records the message in the Watcher for later verification.
The application also simulates occasional errors so that retry handling can be tested; a sketch of such a periodic error simulator is shown after this paragraph.
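simulate.PeriodicError comes from the certification test framework and its implementation is not shown in this document. A minimal sketch of a simulator with the same shape (fail roughly once every n calls, so that Dapr's redelivery path is exercised) might look like the following; it is an illustration, not the framework's actual code:

```go
package simulate_sketch

import (
	"errors"
	"sync/atomic"
)

// periodicError returns a function that fails on every n-th call.
// Returning an error from the topic event handler makes Dapr retry
// delivery of that message, which is what the test wants to exercise.
func periodicError(n uint64) func() error {
	var count uint64
	return func() error {
		if atomic.AddUint64(&count, 1)%n == 0 {
			return errors.New("simulated periodic error")
		}
		return nil
	}
}
```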
The sendRecvTest function
sendRecvTest sends messages to the topic and verifies that the application has received them:
const (
numMessages = 1000
)
// Test logic that sends messages to a topic and
// verifies the application has received them.
sendRecvTest := func(metadata map[string]string, messages ...*watcher.Watcher) flow.Runnable {
_, hasKey := metadata[messageKey]
return func(ctx flow.Context) error {
// Get the client, which wraps the Dapr Go SDK Client interface.
client := sidecar.GetClient(ctx, sidecarName1)
// Declare what is expected BEFORE performing any steps
// that will satisfy the test.
msgs := make([]string, numMessages)
for i := range msgs {
msgs[i] = fmt.Sprintf("Hello, Messages %03d", i)
}
for _, m := range messages {
m.ExpectStrings(msgs...)
}
// If no key is provided, create a random one.
// For Kafka, this will spread messages across
// the topic's partitions.
if !hasKey {
metadata[messageKey] = uuid.NewString()
}
// Send events that the application above will observe.
ctx.Log("Sending messages!")
for _, msg := range msgs {
ctx.Logf("Sending: %q", msg)
// Publish the message to the topic via the Dapr client's PublishEvent method.
// The pubsubName is "messagebus" and the topicName is "neworder".
// The message content is one of the "Hello, Messages %03d" strings prepared above.
err := client.PublishEvent(
ctx, pubsubName, topicName, msg,
dapr.PublishEventWithMetadata(metadata))
require.NoError(ctx, err, "error publishing message")
}
// Do the messages we observed match what we expect?
for _, m := range messages {
// All messages should be received within one minute.
m.Assert(ctx, time.Minute)
}
return nil
}
}
The sendMessagesInBackground function
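The body of sendMessagesInBackground is not shown in this document. Judging from how it is invoked later (started with StepAsync together with a flow.AsyncTask named task, publishing through sidecar dapr-1 while brokers are stopped and restarted), a hedged sketch could look roughly like the following. The task.Done() channel and the watcher Reset/Add helpers are assumptions about the framework API, not confirmed by this document:

```go
// Sketch only: keep publishing messages through sidecar "dapr-1" until the
// async task is cancelled, and register every successfully published message
// with the given watchers so that assertMessages can verify them later.
sendMessagesInBackground := func(messages ...*watcher.Watcher) flow.Runnable {
	return func(ctx flow.Context) error {
		client := sidecar.GetClient(ctx, sidecarName1)
		for _, m := range messages {
			m.Reset() // assumed helper: start from a clean expectation set
		}

		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()

		for counter := 1; ; counter++ {
			select {
			case <-task.Done(): // assumed AsyncTask API: stop when cancelled
				return nil
			case <-ticker.C:
				msg := fmt.Sprintf("Background message - %03d", counter)
				// Publishing may fail while brokers are down; only messages
				// that were actually accepted are expected by the watchers.
				if err := client.PublishEvent(ctx, pubsubName, topicName, msg); err != nil {
					continue
				}
				for _, m := range messages {
					m.Add(msg) // assumed helper: expect and track this message
				}
			}
		}
	}
}
```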
The assertMessages function
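assertMessages is not shown in the document either. From its usage (it is the step that ends every background-publishing phase), a hedged sketch follows; task.CancelAndWait() is an assumption about the flow.AsyncTask API:

```go
// Sketch only: stop the background publisher, then verify that every watcher
// has observed all of the messages it was told to expect.
assertMessages := func(messages ...*watcher.Watcher) flow.Runnable {
	return func(ctx flow.Context) error {
		// Signal sendMessagesInBackground to stop and wait for it to return.
		task.CancelAndWait() // assumed AsyncTask API

		for _, m := range messages {
			// Allow a generous window for the consumers to drain everything
			// that was published while brokers were down or restarting.
			m.Assert(ctx, 5*time.Minute)
		}
		return nil
	}
}
```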
Test flow
Preparation steps
Start Docker Compose
clusterName = "kafkacertification"
dockerComposeYAML = "docker-compose.yml"
// Run Kafka using Docker Compose.
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Wait for the brokers to start
var (
brokers = []string{"localhost:19092", "localhost:29092", "localhost:39092"}
)
Step("wait for broker sockets",
network.WaitForAddresses(5*time.Minute, brokers...)).
Wait until the brokers have started and opened their ports.
These are the brokers of the Kafka cluster used by consumer1, which requires no authentication.
Wait for the Kafka cluster to become ready
Step("wait for kafka readiness", retry.Do(time.Second, 30, func(ctx flow.Context) error {
config := sarama.NewConfig()
config.ClientID = "test-consumer"
config.Consumer.Return.Errors = true
// Create new consumer
client, err := sarama.NewConsumer(brokers, config)
if err != nil {
return err
}
defer client.Close()
// Ensure the brokers are ready by attempting to consume
// a topic partition.
_, err = client.ConsumePartition("myTopic", 0, sarama.OffsetOldest)
return err
})).
Wait for the OAuth client to be ready
Step("wait for Dapr OAuth client", retry.Do(10*time.Second, 6, func(ctx flow.Context) error {
httpClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true, // test server certificate is not trusted.
},
},
}
resp, err := httpClient.Get(oauthClientQuery)
if err != nil {
return err
}
if resp.StatusCode != 200 {
return fmt.Errorf("oauth client query for 'dapr' not successful")
}
return nil
})).
Start the applications and sidecars
Start app-1
Prepare app-1 to receive ordered messages:
const (
appID1 = "app-1"
appPort = 8000
)
consumerGroup1 := watcher.NewOrdered()
// Run the application logic above.
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
application(consumerGroup1))).
Run the application "app-1" on port 8000 with consumerGroup1 as its watcher, which requires the received messages to be in order.
Then start the sidecar for app-1 with the Kafka pubsub component:
const (
sidecarName1 = "dapr-1"
)
Step(sidecar.Run(sidecarName1,
embedded.WithComponentsPath("./components/consumer1"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
runtime.WithPubSubs(component))).
The component path is "./components/consumer1"; the component configured there uses no authentication, connects to the brokers "localhost:19092,localhost:29092,localhost:39092", and uses the consumer group kafkaCertification1.
Note: this starts a Dapr runtime embedded directly inside the current go test and injects a Kafka pubsub component into it, which is quite a heavyweight approach.
Start app-2
Run the application "app-2" with consumerGroup2 as its watcher, which allows the received messages to be unordered.
const (
appID2 = "app-2"
)
// Run the second application.
Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
application(consumerGroup2))).
Likewise, start the sidecar for app-2 with the Kafka pubsub component:
const (
sidecarName2 = "dapr-2"
)
// Run the Dapr sidecar with the Kafka component.
Step(sidecar.Run(sidecarName2,
embedded.WithComponentsPath("./components/mtls-consumer"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset),
runtime.WithPubSubs(component))).
The component path is "./components/mtls-consumer"; the component configured there uses mTLS authentication, connects to the brokers "localhost:19094,localhost:29094,localhost:39094", and uses the consumer group kafkaCertification2.
Start app-3
Run the application "app-3" with consumerGroup2 as its watcher, which allows the received messages to be unordered.
const (
appID3 = "app-3"
)
// Run the third application.
Step(app.Run(appID3, fmt.Sprintf(":%d", appPort+portOffset*2),
application(consumerGroup2))).
Likewise, start the sidecar for app-3 with the Kafka pubsub component:
const (
sidecarName3 = "dapr-3"
)
// Run the Dapr sidecar with the Kafka component.
Step(sidecar.Run(sidecarName3,
embedded.WithComponentsPath("./components/oauth-consumer"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset*2),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset*2),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset*2),
embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset*2),
runtime.WithPubSubs(component))).
The component path is "./components/oauth-consumer"; the component configured there uses OIDC (OAuth2) authentication, connects to the brokers "localhost:19093,localhost:29093,localhost:39093", and uses the consumer group kafkaCertification2.
When app-3 is started, consumerGroup2 is reset:
Step("reset", flow.Reset(consumerGroup2)).
Run the simple ordered test
Because all messages use the same "partitionKey", they are written to the same partition of the Kafka topic, so the messages are received in the same order in which they were sent.
const(
messageKey = "partitionKey"
)
// Set the partition key on all messages so they
// are written to the same partition.
// This allows for checking of ordered messages.
metadata := map[string]string{
messageKey: "test",
}
// Send messages using the same metadata/message key so we can expect
// in-order processing.
Step("send and wait", sendRecvTest(metadata, consumerGroup1, consumerGroup2)).
sendRecvTest sends 1000 messages to the topic "neworder" of the pubsub component "messagebus". Sidecar dapr-1, started with app-1, subscribes to this topic and delivers the messages to app-1, which records what it receives in the consumerGroup1 watcher; likewise sidecar dapr-2 delivers the messages to app-2, which records them in the consumerGroup2 watcher.
Because dapr-1 and dapr-2 subscribe with different consumer groups (kafkaCertification1 and kafkaCertification2), every message is delivered to both applications, one copy each.
Run the simple unordered test
After the third application has started, run the simple unordered test:
// Reset consumerGroup2 first.
Step("reset", flow.Reset(consumerGroup2)).
// Send messages with random keys to test message consumption
// across more than one consumer group and consumers per group.
Step("send and wait", sendRecvTest(map[string]string{}, consumerGroup2)).
Because sidecars dapr-2 and dapr-3 subscribe with the same consumer group (kafkaCertification2), the messages are split across the two applications, each receiving only part of them. Since both feed the same watcher (consumerGroup2), consumerGroup2 should still observe all of the messages, but their order cannot be guaranteed.
Run the reconnect test
While messages are being sent and received, shut down the brokers one by one to test the component's ability to reconnect:
// Gradually stop each broker.
// This tests the components ability to handle reconnections
// when brokers are shutdown cleanly.
// Start an async step that sends messages in the background.
StepAsync("steady flow of messages to publish", &task,
sendMessagesInBackground(consumerGroup1, consumerGroup2)).
// Stop the kafka1 broker after 5 seconds.
Step("wait", flow.Sleep(5*time.Second)).
Step("stop broker 1", dockercompose.Stop(clusterName, dockerComposeYAML, "kafka1")).
// Stop the kafka2 broker after another 5 seconds.
Step("wait", flow.Sleep(5*time.Second)).
// Errors will likely start occurring here since quorum is lost.
Step("stop broker 2", dockercompose.Stop(clusterName, dockerComposeYAML, "kafka2")).
// Stop the kafka3 broker after another 10 seconds.
Step("wait", flow.Sleep(10*time.Second)).
// Errors will definitely occur here.
Step("stop broker 3", dockercompose.Stop(clusterName, dockerComposeYAML, "kafka3")).
// After waiting 30 seconds, restart the brokers (kafka3 / kafka2 / kafka1).
Step("wait", flow.Sleep(30*time.Second)).
Step("restart broker 3", dockercompose.Start(clusterName, dockerComposeYAML, "kafka3")).
Step("restart broker 2", dockercompose.Start(clusterName, dockerComposeYAML, "kafka2")).
Step("restart broker 1", dockercompose.Start(clusterName, dockerComposeYAML, "kafka1")).
// Component should recover at this point.
// Wait another 30 seconds; the component should have recovered by then.
Step("wait", flow.Sleep(30*time.Second)).
// Verify that all messages have been received.
Step("assert messages", assertMessages(consumerGroup1, consumerGroup2)).
Run the network interruption test
While messages are being sent and received, interrupt the network to test the component's ability to recover:
// Simulate a network interruption.
// This tests the components ability to handle reconnections
// when Dapr is disconnected abnormally.
StepAsync("steady flow of messages to publish", &task,
sendMessagesInBackground(consumerGroup1, consumerGroup2)).
Step("wait", flow.Sleep(5*time.Second)).
//
// Errors will occur here.
Step("interrupt network",
network.InterruptNetwork(30*time.Second, nil, nil, "19092", "29092", "39092")).
//
// Component should recover at this point.
Step("wait", flow.Sleep(30*time.Second)).
Step("assert messages", assertMessages(consumerGroup1, consumerGroup2)).
Note: the network interruption step does not work on my local machine.
Run the rebalance test
Verify that, when multiple consumers are running, shutting one of them down does not affect message delivery:
// Reset and test that all messages are received during a
// consumer rebalance.
Step("reset", flow.Reset(consumerGroup2)).
// Send messages asynchronously in the background.
StepAsync("steady flow of messages to publish", &task,
sendMessagesInBackground(consumerGroup2)).
// Stop sidecar dapr-2 after 15 seconds.
Step("wait", flow.Sleep(15*time.Second)).
Step("stop sidecar 2", sidecar.Stop(sidecarName2)).
// Stop app-2 after another 3 seconds.
Step("wait", flow.Sleep(3*time.Second)).
Step("stop app 2", app.Stop(appID2)).
// Wait another 30 seconds.
Step("wait", flow.Sleep(30*time.Second)).
// All messages should still be received (app-3/dapr-3 is still running).
Step("assert messages", assertMessages(consumerGroup2)).