1 - Unit Tests

Unit tests for Dapr

https://app.codecov.io/gh/dapr

2 - E2E Tests

End-to-end (e2e) tests for Dapr

2.1 - Setting up the e2e tests

Setting up Dapr's e2e tests

Introduction

dapr provides a guide for running the e2e tests, located in the dapr/dapr repository at tests/docs/running-e2e-test.md:

https://github.com/dapr/dapr/blob/master/tests/docs/running-e2e-test.md

The following are detailed steps based on that official document.

Prerequisites

Prepare the Dapr development environment

https://github.com/dapr/dapr/blob/master/docs/development/setup-dapr-development-env.md

Specifically:

  • Prepare docker: install docker and create a Docker Hub account
  • Prepare golang: must be Go 1.17
  • Prepare k8s
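
A quick sanity check of these prerequisites (a sketch; the Go version is just the one this walkthrough was written against):

# verify the local development environment
docker version        # docker client and daemon are reachable
go version            # should report go1.17.x
kubectl cluster-info  # a kubernetes cluster is reachable from this machine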

Prepare Helm v3

Reference: https://helm.sh/docs/intro/install/

On macOS the easiest way is to install it with brew:

$ brew install helm

$ helm version
version.BuildInfo{Version:"v3.8.1", GitCommit:"5cb9af4b1b271d11d7a97a71df3ac337dd94ad37", GitTreeState:"clean", GoVersion:"go1.17.8"}

On Ubuntu, install it via apt:

curl https://baltocdn.com/helm/signing.asc | sudo apt-key add -
sudo apt-get install apt-transport-https --yes
echo "deb https://baltocdn.com/helm/stable/debian/ all main" | sudo tee /etc/apt/sources.list.d/helm-stable-debian.list
sudo apt-get update
sudo apt-get install helm

Prepare Dapr

The e2e tests require kubernetes.

  • Install the Dapr CLI

    Reference: Install the Dapr CLI | Dapr Docs

  • Install the Dapr control plane in kubernetes

    Reference: quickstarts/hello-kubernetes at master · dapr/quickstarts (github.com)

    dapr init --kubernetes --wait
    

    This installs the standard Dapr, whose images are release versions, as shown below for the container of the dapr-sidecar-injector pod:

    Containers:
      dapr-sidecar-injector:
        Container ID:  docker://bbda12f7c31adc380122651ca2b0ccf90cc599735da54df411df5764689e4cd6
        Image:         docker.io/daprio/dapr:1.1.2
    

    Therefore, if you need to modify the Dapr control plane, do not install it this way. If you only need to test and verify daprd (the sidecar), this approach is fine.

Build the Dapr control plane

Taking the master branch as an example, build from the source code of the dapr/dapr repository.

Set the e2e-related environment variables

M1, local testing

On an M1 MacBook, if the build output is to run on the local arm64 docker and arm64 k8s:

export DAPR_REGISTRY=docker.io/skyao
export DAPR_TAG=dev
export DAPR_NAMESPACE=dapr-tests
export DAPR_TEST_NAMESPACE=dapr-tests
export DAPR_TEST_REGISTRY=docker.io/skyao
export DAPR_TEST_TAG=dev-linux-arm64
export GOOS=linux
export GOARCH=arm64						# defaults to amd64; change to arm64 to run locally on M1
export TARGET_OS=linux
export TARGET_ARCH=arm64				# defaults to amd64; change to arm64 to run locally on M1

M1, cross testing

On an M1 MacBook, if the build output is to run on a remote amd64 docker and amd64 k8s:

export DAPR_REGISTRY=docker.io/skyao
export DAPR_TAG=dev
export DAPR_NAMESPACE=dapr-tests
export DAPR_TEST_NAMESPACE=dapr-tests
export DAPR_TEST_REGISTRY=docker.io/skyao
export DAPR_TEST_TAG=dev-linux-amd64
export GOOS=linux
export GOARCH=amd64
export TARGET_OS=linux
export TARGET_ARCH=amd64
#export DAPR_TEST_MINIKUBE_IP=192.168.100.40		# use this in IDE
#export MINIKUBE_NODE_IP=192.168.100.40 			# use this in make command

amd64

On an amd64 machine:

export DAPR_REGISTRY=docker.io/skyao
export DAPR_TAG=dev
export DAPR_NAMESPACE=dapr-tests
export DAPR_TEST_NAMESPACE=dapr-tests
export DAPR_TEST_REGISTRY=docker.io/skyao
export DAPR_TEST_TAG=dev-linux-amd64
export GOOS=linux
export GOARCH=amd64
export TARGET_OS=linux
export TARGET_ARCH=amd64
#export DAPR_TEST_MINIKUBE_IP=192.168.100.40		# use this in IDE
#export MINIKUBE_NODE_IP=192.168.100.40 			# use this in make command

Build the Dapr images

make build-linux
make docker-build
make docker-push

M1, local testing

Because arm64 images for components such as redis are missing, this approach does not work.

Abandoned for now.

M1, cross testing

Build the linux + amd64 binaries on an M1 MacBook:

$ make build-linux 

CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=551722f533afa5dfee97482fe3e63d8ff6233d50 -X github.com/dapr/dapr/pkg/version.gitversion=v1.5.1-rc.3-411-g551722f-dirty -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/daprd ./cmd/daprd/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=551722f533afa5dfee97482fe3e63d8ff6233d50 -X github.com/dapr/dapr/pkg/version.gitversion=v1.5.1-rc.3-411-g551722f-dirty -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/placement ./cmd/placement/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=551722f533afa5dfee97482fe3e63d8ff6233d50 -X github.com/dapr/dapr/pkg/version.gitversion=v1.5.1-rc.3-411-g551722f-dirty -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/operator ./cmd/operator/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=551722f533afa5dfee97482fe3e63d8ff6233d50 -X github.com/dapr/dapr/pkg/version.gitversion=v1.5.1-rc.3-411-g551722f-dirty -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/injector ./cmd/injector/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=551722f533afa5dfee97482fe3e63d8ff6233d50 -X github.com/dapr/dapr/pkg/version.gitversion=v1.5.1-rc.3-411-g551722f-dirty -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/sentry ./cmd/sentry/;

Package the docker images:

$ make docker-build     

Building docker.io/skyao/dapr:dev docker image ...
docker build --build-arg PKG_FILES=* -f ./docker/Dockerfile ./dist/linux_amd64/release -t docker.io/skyao/dapr:dev-linux-amd64
[+] Building 16.1s (12/12) FINISHED    
......
 => => naming to docker.io/skyao/dapr:dev-linux-amd64 
 => => naming to docker.io/skyao/daprd:dev-linux-amd64 
 => => naming to docker.io/skyao/placement:dev-linux-amd64 
 => => naming to docker.io/skyao/sentry:dev-linux-amd64 

Push the docker images

$ make docker-push

Building docker.io/skyao/dapr:dev docker image ...
docker build --build-arg PKG_FILES=* -f ./docker/Dockerfile ./dist/linux_amd64/release -t docker.io/skyao/dapr:dev-linux-amd64
.....

docker push docker.io/skyao/dapr:dev-linux-amd64
The push refers to repository [docker.io/skyao/dapr]

docker push docker.io/skyao/daprd:dev-linux-amd64
The push refers to repository [docker.io/skyao/daprd]

docker push docker.io/skyao/placement:dev-linux-amd64
The push refers to repository [docker.io/skyao/placement]

docker push docker.io/skyao/sentry:dev-linux-amd64
The push refers to repository [docker.io/skyao/sentry]

amd64

Build the linux + amd64 binaries on an x86 machine:

$ make build-linux 

CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=79ffea446dfd14ac25429c8dc9ad792983b46ce2 -X github.com/dapr/dapr/pkg/version.gitversion=v1.0.0-rc.4-1305-g79ffea4 -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/daprd ./cmd/daprd/;
go: downloading github.com/dapr/components-contrib v1.6.0-rc.2.0.20220322152414-4d44c2f04f43
go: downloading github.com/nats-io/nats.go v1.13.1-0.20220308171302-2f2f6968e98d
go: downloading github.com/alibabacloud-go/darabonba-openapi v0.1.16
go: downloading github.com/apache/pulsar-client-go v0.8.1
go: downloading github.com/prometheus/client_golang v1.11.1
go: downloading github.com/klauspost/compress v1.14.4
go: downloading github.com/alibabacloud-go/alibabacloud-gateway-spi v0.0.4
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=79ffea446dfd14ac25429c8dc9ad792983b46ce2 -X github.com/dapr/dapr/pkg/version.gitversion=v1.0.0-rc.4-1305-g79ffea4 -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/placement ./cmd/placement/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=79ffea446dfd14ac25429c8dc9ad792983b46ce2 -X github.com/dapr/dapr/pkg/version.gitversion=v1.0.0-rc.4-1305-g79ffea4 -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/operator ./cmd/operator/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=79ffea446dfd14ac25429c8dc9ad792983b46ce2 -X github.com/dapr/dapr/pkg/version.gitversion=v1.0.0-rc.4-1305-g79ffea4 -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/injector ./cmd/injector/;
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.gitcommit=79ffea446dfd14ac25429c8dc9ad792983b46ce2 -X github.com/dapr/dapr/pkg/version.gitversion=v1.0.0-rc.4-1305-g79ffea4 -X github.com/dapr/dapr/pkg/version.version=edge -X github.com/dapr/kit/logger.DaprVersion=edge -s -w" -o ./dist/linux_amd64/release/sentry ./cmd/sentry/;

Package the docker images:

$ make docker-build     

Building docker.io/skyao/dapr:dev docker image ...
docker build --build-arg PKG_FILES=* -f ./docker/Dockerfile ./dist/linux_amd64/release -t docker.io/skyao/dapr:dev-linux-amd64
Sending build context to Docker daemon  236.2MB

......
Successfully tagged skyao/dapr:dev-linux-amd64
Successfully tagged skyao/daprd:dev-linux-amd64
Successfully tagged skyao/placement:dev-linux-amd64
Successfully tagged skyao/sentry:dev-linux-amd64

Push the docker images

$ make docker-push

Building docker.io/skyao/dapr:dev docker image ...
docker build --build-arg PKG_FILES=* -f ./docker/Dockerfile ./dist/linux_amd64/release -t docker.io/skyao/dapr:dev-linux-amd64
.....

docker push docker.io/skyao/dapr:dev-linux-amd64
The push refers to repository [docker.io/skyao/dapr]

docker push docker.io/skyao/daprd:dev-linux-amd64
The push refers to repository [docker.io/skyao/daprd]

docker push docker.io/skyao/placement:dev-linux-amd64
The push refers to repository [docker.io/skyao/placement]

docker push docker.io/skyao/sentry:dev-linux-amd64
The push refers to repository [docker.io/skyao/sentry]

If you hit the following error when pushing:

denied: requested access to the resource is denied
make: *** [docker/docker.mk:110: docker-push] Error 1

Log in first with docker login, then push again.
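
For example, assuming the docker.io/skyao registry used throughout this page:

# log in to Docker Hub with the account that owns the target registry, then push again
docker login docker.io -u skyao
make docker-push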

Deploy the e2e test applications

Follow the document step by step:

https://github.com/dapr/dapr/blob/master/tests/docs/running-e2e-test.md#option-2-step-by-step-guide

Prepare the test namespace

Prepare the namespace used by the tests; run the following in the dapr/dapr repository:

$ make delete-test-namespace
$ make create-test-namespace
kubectl create namespace dapr-tests
namespace/dapr-tests created

Initialize helm

$ make setup-helm-init

helm repo add bitnami https://charts.bitnami.com/bitnami
"bitnami" already exists with the same configuration, skipping
helm repo add stable https://charts.helm.sh/stable
"stable" already exists with the same configuration, skipping
helm repo add incubator https://charts.helm.sh/incubator
"incubator" already exists with the same configuration, skipping
helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "incubator" chart repository
...Successfully got an update from the "stable" chart repository
...Successfully got an update from the "bitnami" chart repository
Update Complete. ⎈Happy Helming!⎈

Prepare the redis and kafka used by the tests

make setup-test-env-redis
make setup-test-env-kafka
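
These two helm-based installs can take a few minutes; a simple check (not from the official doc) that they finished is to look at the pods in the test namespace:

# redis and kafka pods should eventually show READY 1/1
kubectl get pods -n dapr-tests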

Deploy the Dapr control plane

$ make docker-deploy-k8s

Deploying docker.io/skyao/dapr:dev to the current K8S context...
helm install \
                dapr --namespace=dapr-system --wait --timeout 5m0s \
                --set global.ha.enabled=false --set-string global.tag=dev-linux-amd64 \
                --set-string global.registry=docker.io/skyao --set global.logAsJson=true \
                --set global.daprControlPlaneOs=linux --set global.daprControlPlaneArch=amd64 \
                --set dapr_placement.logLevel=debug --set dapr_sidecar_injector.sidecarImagePullPolicy=Always \
                --set global.imagePullPolicy=Always --set global.imagePullSecrets= \
                --set global.mtls.enabled=true \
                --set dapr_placement.cluster.forceInMemoryLog=true ./charts/dapr
NAME: dapr
LAST DEPLOYED: Thu Mar 24 18:43:41 2022
NAMESPACE: dapr-system
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Thank you for installing Dapr: High-performance, lightweight serverless runtime for cloud and edge

Your release is named dapr.

To get started with Dapr, we recommend using our quickstarts:
https://github.com/dapr/quickstarts

For more information on running Dapr, visit:
https://dapr.io

After deploying, check the pods:

$ k get pods -A                 
NAMESPACE     NAME                                     READY   STATUS    RESTARTS   AGE
dapr-system   dapr-dashboard-7c99fc9c45-cgnb6          1/1     Running   0          101s
dapr-system   dapr-operator-556dfb5987-xfnwt           1/1     Running   0          101s
dapr-system   dapr-placement-server-0                  1/1     Running   0          101s
dapr-system   dapr-sentry-54cb9989d5-hq5m4             1/1     Running   0          101s
dapr-system   dapr-sidecar-injector-58d99b46c7-vxn5r   1/1     Running   0          101s
dapr-tests    dapr-kafka-0                             1/1     Running   0          18m
dapr-tests    dapr-kafka-zookeeper-0                   1/1     Running   0          18m
dapr-tests    dapr-redis-master-0                      1/1     Running   0          18m
......

Disable mTLS:

make setup-disable-mtls
kubectl apply -f ./tests/config/dapr_mtls_off_config.yaml --namespace dapr-tests
configuration.dapr.io/daprsystem created

Deploy the test components

$ make setup-test-components

kubectl apply -f ./tests/config/dapr_observability_test_config.yaml --namespace dapr-tests
configuration.dapr.io/disable-telemetry created
configuration.dapr.io/obs-defaultmetric created
kubectl apply -f ./tests/config/kubernetes_secret.yaml --namespace dapr-tests
secret/daprsecret created
secret/daprsecret2 created
secret/emptysecret created
kubectl apply -f ./tests/config/kubernetes_secret_config.yaml --namespace dapr-tests
configuration.dapr.io/secretappconfig created
kubectl apply -f ./tests/config/kubernetes_redis_secret.yaml --namespace dapr-tests
secret/redissecret created
kubectl apply -f ./tests/config/dapr_redis_state.yaml --namespace dapr-tests
component.dapr.io/statestore created
kubectl apply -f ./tests/config/dapr_mongodb_state.yaml --namespace dapr-tests
component.dapr.io/querystatestore created
kubectl apply -f ./tests/config/dapr_tests_cluster_role_binding.yaml --namespace dapr-tests
rolebinding.rbac.authorization.k8s.io/dapr-secret-reader created
role.rbac.authorization.k8s.io/secret-reader created
kubectl apply -f ./tests/config/dapr_redis_pubsub.yaml --namespace dapr-tests
component.dapr.io/messagebus created
kubectl apply -f ./tests/config/dapr_kafka_bindings.yaml --namespace dapr-tests
component.dapr.io/test-topic created
kubectl apply -f ./tests/config/dapr_kafka_bindings_custom_route.yaml --namespace dapr-tests
component.dapr.io/test-topic-custom-route created
kubectl apply -f ./tests/config/dapr_kafka_bindings_grpc.yaml --namespace dapr-tests
component.dapr.io/test-topic-grpc created
kubectl apply -f ./tests/config/app_topic_subscription_pubsub.yaml --namespace dapr-tests
subscription.dapr.io/c-topic-subscription created
kubectl apply -f ./tests/config/app_topic_subscription_pubsub_grpc.yaml --namespace dapr-tests
subscription.dapr.io/c-topic-subscription-grpc created
kubectl apply -f ./tests/config/kubernetes_allowlists_config.yaml --namespace dapr-tests
configuration.dapr.io/allowlistsappconfig created
kubectl apply -f ./tests/config/kubernetes_allowlists_grpc_config.yaml --namespace dapr-tests
configuration.dapr.io/allowlistsgrpcappconfig created
kubectl apply -f ./tests/config/dapr_redis_state_query.yaml --namespace dapr-tests
component.dapr.io/querystatestore2 created
kubectl apply -f ./tests/config/dapr_redis_state_badhost.yaml --namespace dapr-tests
component.dapr.io/badhost-store created
kubectl apply -f ./tests/config/dapr_redis_state_badpass.yaml --namespace dapr-tests
component.dapr.io/badpass-store created
kubectl apply -f ./tests/config/uppercase.yaml --namespace dapr-tests
component.dapr.io/uppercase created
kubectl apply -f ./tests/config/pipeline.yaml --namespace dapr-tests
configuration.dapr.io/pipeline created
kubectl apply -f ./tests/config/app_reentrant_actor.yaml --namespace dapr-tests
configuration.dapr.io/reentrantconfig created
kubectl apply -f ./tests/config/app_actor_type_metadata.yaml --namespace dapr-tests
configuration.dapr.io/actortypemetadata created
kubectl apply -f ./tests/config/app_topic_subscription_routing.yaml --namespace dapr-tests
subscription.dapr.io/pubsub-routing-crd-http-subscription created
kubectl apply -f ./tests/config/app_topic_subscription_routing_grpc.yaml --namespace dapr-tests
subscription.dapr.io/pubsub-routing-crd-grpc-subscription created
kubectl apply -f ./tests/config/app_pubsub_routing.yaml --namespace dapr-tests
configuration.dapr.io/pubsubroutingconfig created
# Show the installed components
kubectl get components --namespace dapr-tests
NAME                      AGE
badhost-store             23s
badpass-store             20s
messagebus                49s
querystatestore           56s
querystatestore2          27s
statestore                58s
test-topic                45s
test-topic-custom-route   42s
test-topic-grpc           39s
uppercase                 18s
# Show the installed configurations
kubectl get configurations --namespace dapr-tests
NAME                      AGE
actortypemetadata         11s
allowlistsappconfig       33s
allowlistsgrpcappconfig   31s
daprsystem                93s
disable-telemetry         73s
obs-defaultmetric         72s
pipeline                  16s
pubsubroutingconfig       3s
reentrantconfig           13s
secretappconfig           65s

Deploy the test applications

make build-e2e-app-all

make push-e2e-app-all

Run the e2e tests

$ make test-e2e-all

# Note2: use env variable DAPR_E2E_TEST to pick one e2e test to run.

As the note says, if you only want to run a single test case, set the DAPR_E2E_TEST environment variable:

$ export DAPR_E2E_TEST=service_invocation

$ make test-e2e-all

The tests are very unstable; still investigating.

Special cases

Running k8s locally on M1

After deploying, the redis pod fails to start with the error '1 node(s) didn't match Pod's node affinity/selector.'. Inspection shows that the redis pods require scheduling on linux + amd64 nodes, while the k8s node started locally on the M1 is arm64, so there is no node that can run them:

spec:
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
        - matchExpressions:
          - key: kubernetes.io/os
            operator: In
            values:
            - linux
          - key: kubernetes.io/arch
            operator: In
            values:
            - amd64

Shelved for now, since this would mean changing the deployment files. redis does have arm64 images:

https://hub.docker.com/r/arm64v8/redis/

Running docker run --name some-redis -d arm64v8/redis on the M1 starts redis in docker without any problem.

In principle, changing the following in the pod spec should be enough to get it running:

          - key: kubernetes.io/arch
            operator: In
            values:
            - amd64  # change to arm64

image: docker.io/redislabs/rejson:latest  # change to image: docker.io/arm64v8/redis:latest

The helm chart does not yet support redis on arm64, even though this has been requested many times within the past year; it is still not supported.

So the conclusion is: it is not possible to start k8s on an M1 MacBook and complete the development testing there.

2.2 - Common problems when setting up the e2e test environment

Common problems when setting up the Dapr e2e test environment

mongodb deployment failure

Deploying the test mongodb fails: it hangs for a long time and then times out:

$ make setup-test-env-mongodb 
helm install dapr-mongodb bitnami/mongodb -f ./tests/config/mongodb_override.yaml --namespace dapr-tests --wait --timeout 5m0s

Error: INSTALLATION FAILED: timed out waiting for the condition
make: *** [tests/dapr_tests.mk:269: setup-test-env-mongodb] Error 1

The pod stays in Pending:

$ k get pods -A
NAMESPACE     NAME                                     READY   STATUS    RESTARTS       AGE
dapr-tests    dapr-mongodb-77fdf49576-d9lx4            0/1     Pending   0              8m37s

The pod description points to a PersistentVolumeClaims problem:

$ k describe pods dapr-mongodb-77fdf49576-d9lx4 -n dapr-tests
Name:           dapr-mongodb-77fdf49576-d9lx4

Events:
  Type     Reason            Age                    From               Message
  ----     ------            ----                   ----               -------
  Warning  FailedScheduling  8m55s                  default-scheduler  0/3 nodes are available: 3 pod has unbound immediate PersistentVolumeClaims.
  Warning  FailedScheduling  6m49s (x1 over 7m49s)  default-scheduler  0/3 nodes are available: 3 pod has unbound immediate PersistentVolumeClaims.

The PVC is also Pending:

k get pvc -n dapr-tests  
NAME           STATUS    VOLUME   CAPACITY   ACCESS MODES   STORAGECLASS   AGE
dapr-mongodb   Pending                                                     22m

Details:

$ k describe pvc dapr-mongodb -n dapr-tests                  
Name:          dapr-mongodb
Namespace:     dapr-tests
StorageClass:  
Status:        Pending
Volume:        
Labels:        app.kubernetes.io/component=mongodb
               app.kubernetes.io/instance=dapr-mongodb
               app.kubernetes.io/managed-by=Helm
               app.kubernetes.io/name=mongodb
               helm.sh/chart=mongodb-11.1.5
Annotations:   meta.helm.sh/release-name: dapr-mongodb
               meta.helm.sh/release-namespace: dapr-tests
Finalizers:    [kubernetes.io/pvc-protection]
Capacity:      
Access Modes:  
VolumeMode:    Filesystem
Used By:       dapr-mongodb-77fdf49576-d9lx4
Events:
  Type    Reason         Age                   From                         Message
  ----    ------         ----                  ----                         -------
  Normal  FailedBinding  3m34s (x82 over 23m)  persistentvolume-controller  no persistent volumes available for this claim and no storage class is set

The StorageClass is empty, so the claim cannot bind and the install fails.
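
One workaround (a sketch of my own, not from the official doc) is to provide a PersistentVolume that a claim with an empty StorageClass can bind to; the size and path below are assumptions and must be adjusted to whatever the mongodb chart actually requests:

# hypothetical fix: create a hostPath PV for the pending empty-StorageClass PVC
kubectl apply -f - <<EOF
apiVersion: v1
kind: PersistentVolume
metadata:
  name: dapr-mongodb-pv
spec:
  capacity:
    storage: 8Gi                # assumption: at least the size the PVC requests
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: /tmp/dapr-mongodb-pv  # assumption: any writable path on the node
EOF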

References:

Sidecar cannot be injected

This usually happens after reinstalling dapr. The main cause is running kubectl delete namespace dapr-tests directly without first running helm uninstall dapr -n dapr-tests.

See the details at:

https://github.com/dapr/dapr/issues/4612

daprd fails at startup with a sentry certificate error

daprd exits with an error at startup; the log looks like this:

time="2022-12-04T01:35:44.198741493Z" level=info msg="sending workload csr request to sentry" app_id=service-a instance=service-a-8bbd5bf88-fxc6k scope=dapr.runtime.grpc.internal type=log ver=edge
time="2022-12-04T01:35:44.207675954Z" level=fatal msg="failed to start internal gRPC server: error from authenticator CreateSignedWorkloadCert: error from sentry SignCertificate: rpc error: code = Unknown desc = error validating requester identity: csr validation failed: token/id mismatch. received id: dapr-tests:default" app_id=service-a instance=service-a-8bbd5bf88-fxc6k scope=dapr.runtime type=log ver=edge

This is caused by not having run make setup-disable-mtls.
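
A minimal recovery sketch, assuming the control plane is already running and only this configuration is missing (restarting the affected deployment is my assumption, so that the already-created pod comes back with a working sidecar; service-a is just the app from the log above):

# apply the daprsystem configuration that turns mTLS off for the tests
make setup-disable-mtls

# assumption: re-create the failing app pod so its daprd sidecar starts cleanly
kubectl rollout restart deployment/service-a -n dapr-tests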

3 - Performance Tests

Performance tests for Dapr

3.1 - Running the performance tests

Running Dapr's performance tests

Preparation

Largely the same as for the e2e tests.

Set the environment variables

First set the relevant environment variables.

amd64

On an amd64 machine:

export DAPR_REGISTRY=docker.io/skyao
export DAPR_TAG=dev
export DAPR_NAMESPACE=dapr-tests
export TARGET_OS=linux
export TARGET_ARCH=amd64
export GOOS=linux
export GOARCH=amd64
export DAPR_TEST_NAMESPACE=dapr-tests
export DAPR_TEST_REGISTRY=docker.io/skyao
export DAPR_TEST_TAG=dev-linux-amd64
export DAPR_TEST_MINIKUBE_IP=192.168.100.40		# use this in IDE
export MINIKUBE_NODE_IP=192.168.100.40 			# use this in make command

Build and deploy dapr into k8s

$ make create-test-namespace

$ make build-linux
$ make docker-build
$ make docker-push

$ make docker-deploy-k8s

If there was a previous deployment, it must be removed before redeploying. There are two cases:

  1. Remove only the dapr control plane

    $ helm uninstall dapr  -n dapr-tests
    $ make docker-deploy-k8s
    
  2. Remove everything dapr-related

    $ helm uninstall dapr  -n dapr-tests
    $ make delete-test-namespace 
    # then repeat the build and deploy steps above
    

Like the e2e tests, the performance tests originally have an extra step here, which actually installs redis / kafka / mongodb:

$ make setup-3rd-party

helm repo add bitnami https://charts.bitnami.com/bitnami
"bitnami" already exists with the same configuration, skipping
helm repo add stable https://charts.helm.sh/stable
"stable" already exists with the same configuration, skipping
helm repo add incubator https://charts.helm.sh/incubator
"incubator" already exists with the same configuration, skipping
helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "incubator" chart repository
...Successfully got an update from the "bitnami" chart repository
...Successfully got an update from the "stable" chart repository
Update Complete. ⎈Happy Helming!⎈
helm install dapr-redis bitnami/redis --wait --timeout 5m0s --namespace dapr-tests -f ./tests/config/redis_override.yaml
......
helm install dapr-kafka bitnami/kafka -f ./tests/config/kafka_override.yaml --namespace dapr-tests --timeout 10m0s
......
helm install dapr-mongodb bitnami/mongodb -f ./tests/config/mongodb_override.yaml --namespace dapr-tests --wait --timeout 5m0s

For the performance tests this is not necessary; depending on whether you need to test actor-related functionality, decide whether to install redis:

make setup-test-env-redis 

Dapr-related settings

Disable telemetry:

$ make setup-app-configurations

kubectl apply -f ./tests/config/dapr_observability_test_config.yaml --namespace dapr-tests
configuration.dapr.io/disable-telemetry created
configuration.dapr.io/obs-defaultmetric created

Disable mTLS:

$ make setup-disable-mtls
kubectl apply -f ./tests/config/dapr_mtls_off_config.yaml --namespace dapr-tests
configuration.dapr.io/daprsystem created

Prepare the test components:

# do NOT use this command
$ make setup-test-components

This command installs every yaml file under tests/config into k8s, which is a lot, and some of the component files make daprd fail to start when the external components they need are not set up. To be safe, if you are only running individual performance test cases, apply just what you need by hand:

# For actor-related perf test cases, install redis and enable the actor statestore
# The redis config uses a secret, so kubernetes_redis_secret must also be applied
# make setup-test-env-redis 
$ k apply -f ./tests/config/dapr_redis_state_actorstore.yaml -n dapr-tests
$ k apply -f ./tests/config/kubernetes_redis_secret.yaml -n dapr-tests

# For the pubsub tests, only the in-memory pubsub is needed
$ k apply -f ./tests/config/dapr_in_memory_pubsub.yaml -n dapr-tests

# For the state tests, only the in-memory state is needed
$ k apply -f ./tests/config/dapr_in_memory_state.yaml -n dapr-tests
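
After applying, the same verification commands that setup-test-components runs can be used to confirm what is actually installed:

# show the installed components and configurations in the test namespace
kubectl get components -n dapr-tests
kubectl get configurations -n dapr-tests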

Prepare the test applications

$ make build-perf-app-all
$ make push-perf-app-all

If you are developing or modifying a single test case and want to save time, there is no need to build and publish all of them. To build and push just one performance test app, call its make target directly; for example, for the service_invocation_http test case:

$ make build-perf-app-service_invocation_http
$ make push-perf-app-service_invocation_http

Run the performance tests

Install jq before testing; it will be needed later:

brew install jq


## Run the performance tests
# Never run test-perf-all: the env var requirements differ between the perf test cases, so running them all together causes problems.
# Locally (whether from the IDE or a terminal) do not run test-perf-all; only run one perf test case at a time
# make test-perf-all
make test-perf-xxxxx
# Note: if the env vars controlling the perf test inputs are not set and the defaults are used, some perf test cases still run fine
make test-perf-state_get_http
make test-perf-state_get_grpc
make test-perf-service_invoke_http
make test-perf-service_invoke_grpc
make test-perf-pubsub_publish_grpc

# Some perf tests cannot be run this way and will error out
make test-perf-actor_timer    # fortio errors: HTTP response 405 Method Not Allowed, HTTP POST must be used

When running, pay special attention to these lines in the log:

2022/03/25 18:02:05 Installing test apps...
2022/03/25 18:02:09 Adding app {testapp 0  map[] true perf-service_invocation_http:dev-linux-amd64  docker.io/skyao 1 true true   4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
2022/03/25 18:02:09 Adding app {tester 3001  map[] true perf-tester:dev-linux-amd64  docker.io/skyao 1 true true   4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}

Check the apps' image information carefully: the tag should be "dev-linux-amd64" and the image registry should be the one you configured, e.g. "docker.io/skyao"; otherwise the pods will not start.

It is best to run env | grep DAPR first to confirm that the environment variables have taken effect.
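
Concretely, the values should match what was exported earlier on this page:

# confirm the test-related variables are set in the shell that runs make
env | grep DAPR
# expect DAPR_REGISTRY, DAPR_TAG, DAPR_TEST_REGISTRY, DAPR_TEST_TAG, DAPR_TEST_NAMESPACE, ...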

Local debugging

The perf test cases are written with go test, so in principle, once the previous steps are prepared, you can start a perf test case in debug mode in a local IDE and debug it.

Note: for actor-related test cases, remember to set the env vars controlling the perf test inputs.

Special cases

Cleaning up the namespace completely

You also need to clean up the following content saved in the default namespace:

k delete clusterrole dapr-operator-admin -n default
k delete clusterrole dashboard-reader -n default
k delete clusterrolebindings.rbac.authorization.k8s.io dapr-operator -n default
k delete clusterrolebindings.rbac.authorization.k8s.io dapr-role-tokenreview-binding -n default
k delete clusterrolebindings.rbac.authorization.k8s.io dashboard-reader-global -n default
k delete role secret-reader -n default
k delete rolebinding dapr-secret-reader -n default
k delete mutatingwebhookconfiguration dapr-sidecar-injector -n default

Otherwise, when reinstalling the Dapr control plane, if the control plane's namespace has changed, you get an error:

make docker-deploy-k8s

Deploying docker.io/skyao/dapr:dev to the current K8S context...
helm install \
                dapr --namespace=dapr-tests --wait --timeout 5m0s \
                --set global.ha.enabled=false --set-string global.tag=dev-linux-amd64 \
                --set-string global.registry=docker.io/skyao --set global.logAsJson=true \
                --set global.daprControlPlaneOs=linux --set global.daprControlPlaneArch=amd64 \
                --set dapr_placement.logLevel=debug --set dapr_sidecar_injector.sidecarImagePullPolicy=Always \
                --set global.imagePullPolicy=Always --set global.imagePullSecrets= \
                --set global.mtls.enabled=true \
                --set dapr_placement.cluster.forceInMemoryLog=true ./charts/dapr
Error: INSTALLATION FAILED: rendered manifests contain a resource that already exists. Unable to continue with install: RoleBinding "dapr-secret-reader" in namespace "default" exists and cannot be imported into the current release: invalid ownership metadata; annotation validation error: key "meta.helm.sh/release-namespace" must equal "dapr-tests": current value is "dapr-system"
make: *** [docker-deploy-k8s] Error 1

3.2 - How the performance tests use fortio

How Dapr's performance test tooling uses fortio

About fortio

Dapr uses fortio as its load-testing tool.

The Fortio material has been moved to a separate set of notes: Learning Fortio

The tester application

How the tester image is built

The tester application image is built in three stages:

Stage that builds the tester go app binary

FROM golang:1.17 as build_env

ARG GOARCH_ARG=amd64

WORKDIR /app
COPY app.go go.mod ./
RUN go get -d -v && GOOS=linux GOARCH=$GOARCH_ARG go build -o tester .

This dockerfile copies app.go and go.mod from the tests/apps/perf/tester directory of the dapr repository into the image, then runs go get and go build to compile the go code into a binary named tester.

The final output is the /app/tester binary.

Stage that builds the fortio binary

FROM golang:1.17 as fortio_build_env

ARG GOARCH_ARG=amd64

WORKDIR /fortio
ADD "https://api.github.com/repos/fortio/fortio/branches/master" skipcache
RUN git clone https://github.com/fortio/fortio.git
RUN cd fortio && git checkout v1.16.1 && GOOS=linux GOARCH=$GOARCH_ARG go build

This stage builds fortio from the v1.16.1 source.

The final output is the /fortio/fortio/fortio binary.

Final buster-slim image

FROM debian:buster-slim
#RUN apt update
#RUN apt install wget -y
WORKDIR /
COPY --from=build_env /app/tester /
COPY --from=fortio_build_env /fortio/fortio/fortio /usr/local/bin
CMD ["/tester"]

This stage just copies the outputs of the previous two stages: /app/tester goes to the root directory, and fortio goes to /usr/local/bin.

How the tester application works and its code

The core code of tests/apps/perf/tester/app.go is as follows:

The main function

func main() {
	http.HandleFunc("/", handler)
	http.HandleFunc("/test", testHandler)
	log.Fatal(http.ListenAndServe(":3001", nil))
}

The main function starts an http server listening on port 3001 and registers two paths with their handlers.

Liveness probe handler

This handler is as simple as it gets: it does nothing and just returns http 200.

func main() {
	http.HandleFunc("/", handler)
  ......
}

func handler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(200)
}

The test handler

testHandler runs the performance test:

func main() {
	http.HandleFunc("/test", testHandler)
  ......
}

func testHandler(w http.ResponseWriter, r *http.Request) {
	fmt.Println("test execution request received")

  // Step 1: read the test parameters from the request; they are sent by the test case
	var testParams TestParameters
	b, err := io.ReadAll(r.Body)
	if err != nil {
		w.WriteHeader(500)
		w.Write([]byte(fmt.Sprintf("error reading request body: %s", err)))
		return
	}

  // Step 2: parse the test parameters that were read
	err = json.Unmarshal(b, &testParams)
	if err != nil {
		w.WriteHeader(400)
		w.Write([]byte(fmt.Sprintf("error parsing test params: %s", err)))
		return
	}

  // Step 3: start running the performance test
	fmt.Println("executing test")
	results, err := runTest(testParams)
	if err != nil {
		w.WriteHeader(500)
		w.Write([]byte(fmt.Sprintf("error encountered while running test: %s", err)))
		return
	}

  // Step 4: return the performance test results
	fmt.Println("test finished")
	w.Header().Add("Content-Type", "application/json")
	w.Write(results)
}

The actual load generation

The real load test is run by invoking the fortio tool on the command line via exec.Command. In other words, apart from serving to bring up a daprd sidecar, the tester app itself only drives the test flow; the actual load generation is done by fortio.

// runTest accepts a set of test parameters, runs Fortio with the configured setting and returns
// the test results in json format.
func runTest(params TestParameters) ([]byte, error) {
	var args []string

  // Step 1: build different fortio arguments depending on the request parameters
	if len(params.Payload) > 0 {
		args = []string{
			"load", "-json", "result.json", "-content-type", "application/json", "-qps", fmt.Sprint(params.QPS), "-c", fmt.Sprint(params.ClientConnections),
			"-t", params.TestDuration, "-payload", params.Payload,
		}
	} else {
		args = []string{
			"load", "-json", "result.json", "-qps", fmt.Sprint(params.QPS), "-c", fmt.Sprint(params.ClientConnections),
			"-t", params.TestDuration, "-payload-size", fmt.Sprint(params.PayloadSizeKB),
		}
	}
	if params.StdClient {
		args = append(args, "-stdclient")
	}
	args = append(args, params.TargetEndpoint)
	fmt.Printf("running test with params: %s", args)

  // Step 2: invoke fortio to run the load test
	cmd := exec.Command("fortio", args...)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	err := cmd.Run()
	if err != nil {
		return nil, err
	}
  
  // Step 3: return the test results
	return os.ReadFile("result.json")
}
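
The result.json that runTest returns can also be inspected by hand with jq (installed earlier); the field names below assume fortio's usual JSON result layout and may need adjusting for other versions:

# rough sketch: pull the headline numbers out of fortio's result.json
jq '.DurationHistogram.Count, .DurationHistogram.Avg' result.json   # request count and average latency
jq '.RetCodes' result.json                                          # distribution of response codes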

Performance test without payload

When the payload size is 0, the fortio command executed is:

fortio load -json result.json -qps ${QPS} -c ${ClientConnections} -t ${TestDuration} -payload-size ${PayloadSizeKB} ${TargetEndpoint}

Question: the payload is zero, so why set -payload-size at all?

Digging through the logs, the output of this line:

fmt.Printf("running test with params: %s", args)

running test with params: [load -json result.json -qps 1 -c 1 -t 1m -payload-size 0 http://testapp:3000/test]

Performance test with payload

When the payload size is not 0, the fortio command executed is:

fortio load -json result.json -content-type application/json -qps ${QPS} -c ${ClientConnections} -t ${TestDuration} -payload ${Payload} ${TargetEndpoint}

End-to-end parameter passing

During a perf test run, the parameter passing is fairly involved (or rather, indirect); it concerns how the perf test case passes parameters through the tester app and fortio all the way to daprd, as shown in the sequence below:

title parameters transfer in load test
hide footbox
skinparam style strictuml

actor test_case as "Test Case"
participant tester as "Tester App"
box "Fortio" #LightBlue
participant fortio_load as "fortioLoad"
participant fgrpc
participant dapr as "daprResults"
end box
box "daprd"
participant grpc_api as "gRPC API"
end box

test_case -> tester : HTTP Post
note left: TestParameters
tester -> fortio_load : exec
note left: fortio load ... -grpc -dapr k1=v1,k2=v2
fortio_load -> fgrpc : RunGRPCTest()
note left: -grpc -dapr k1=v1,k2=v2
fgrpc -> dapr : RunTest()
note left: -dapr k1=v1,k2=v2
dapr -> grpc_api
dapr <-- grpc_api
fgrpc <-- dapr
fortio_load <-- fgrpc
tester <-- fortio_load
test_case <-- tester

Step 1: the perf test case reads the test parameters

At startup, the perf test case reads the test parameters from environment variables:

  • DAPR_PERF_QPS: default 1
  • DAPR_PERF_CONNECTIONS: default 1
  • DAPR_TEST_DURATION: default "1m", i.e. one minute
  • DAPR_PAYLOAD_SIZE: default 0
  • DAPR_PAYLOAD: default empty (the string "")

Step 2: the perf test case packages the test parameters and sends them to the tester app

The perf test case sends an HTTP request to the tester app with the following content:

  • URL: http://testerAppURL/test

  • Method: POST

  • Body: the TestParameters struct serialized as json

The fields of the TestParameters struct are:

type TestParameters struct {
	QPS               int    `json:"qps"`
	ClientConnections int    `json:"clientConnections"`
	TargetEndpoint    string `json:"targetEndpoint"`
	TestDuration      string `json:"testDuration"`
	PayloadSizeKB     int    `json:"payloadSizeKB"`
	Payload           string `json:"payload"`
	StdClient         bool   `json:"stdClient"`
}
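
For illustration, the same request can be sent by hand; the endpoint below assumes the tester service has been port-forwarded to localhost:3001, and the parameter values are just the defaults listed above:

# hypothetical manual call to the tester app's /test endpoint
curl -X POST http://localhost:3001/test \
  -H "Content-Type: application/json" \
  -d '{
        "qps": 1,
        "clientConnections": 1,
        "targetEndpoint": "http://testapp:3000/test",
        "testDuration": "1m",
        "payloadSizeKB": 0,
        "payload": "",
        "stdClient": false
      }'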

To support gRPC + dapr, since no suitable existing parameters were available, two fields, grpc and dapr, were added:

type TestParameters struct {
	......
	Grpc              bool   `json:"grpc"`
	Dapr              string `json:"dapr"`
}

Step 3: the tester app parses the test parameters and passes them to fortio

In the testHandler() method of the tester app (tests/apps/perf/tester/app.go), the incoming http body is read and parsed as json, and then TestParameters is turned into fortio arguments:

func testHandler(w http.ResponseWriter, r *http.Request) {
	b, err := io.ReadAll(r.Body)
	err = json.Unmarshal(b, &testParams)
	results, err := runTest(testParams)
    ......
}
func runTest(params TestParameters) ([]byte, error) {
	args := buildFortioArgs(params)
    ......
}

The mapping between TestParameters fields and fortio arguments:

Meaning              Environment variable    TestParameters field   fortio argument
(fixed)                                                             load
(fixed)                                                             -json result.json
(fixed)                                                             -content-type application/json
QPS                  DAPR_PERF_QPS           QPS                    -qps
client connections   DAPR_PERF_CONNECTIONS   ClientConnections      -c
test duration        DAPR_TEST_DURATION      TestDuration           -t
payload              DAPR_PAYLOAD            Payload                -payload
payload size         DAPR_PAYLOAD_SIZE       PayloadSizeKB          -payload-size
standard client                              StdClient              -stdclient
gRPC test or not                             Grpc                   -grpc
dapr test parameters                         Dapr                   -dapr

Step 4: fortio parses the dapr flag and uses it in the dapr requests it issues

When fortio runs, the load subcommand is handled by the fortioLoad method; if it finds the -grpc flag, it hands off to the fgrpc.RunGRPCTest() method.

fortio's gRPC support only covers its built-in ping and the standard health service by default; to support dapr we extended the fgrpc.RunGRPCTest() method.

The parameters passed via the -dapr flag are passed through to the dapr extension code and parsed into the following structure:

type DaprRequestParameters struct {
	capability string
	target     string
	method     string
	appId      string
	store      string

	extensions map[string]string
}

These parameters are then used by the fortio extension code when it makes dapr calls.

3.3 - Implementation of the service invoke http performance test case

Implementation of the service invoke http performance test case

Running the performance test

Open the .github/workflows/dapr-perf.yml file in the dapr/dapr repository and find the input conditions for the service_invocation_http performance test:

      - name: Run Perf test service_invocation_http
        if: env.TEST_PREFIX != ''
        env:
          DAPR_PERF_QPS: 1000
          DAPR_PERF_CONNECTIONS: 16
          DAPR_TEST_DURATION: 1m
          DAPR_PAYLOAD_SIZE: 1024
        run: make test-perf-service_invocation_http
# service_invocation_http
export DAPR_PERF_QPS=1000
export DAPR_PERF_CONNECTIONS=16
export DAPR_TEST_DURATION=1m
export DAPR_PAYLOAD_SIZE=1024
unset DAPR_PAYLOAD
make test-perf-service_invocation_http

Detailed analysis of the main flow

The main flow of the performance test is:

func TestMain(m *testing.M) {
  // Step 1: prepare two apps: testapp as the server and tester as the client
	testApps := []kube.AppDescription{
		{
			AppName:           "testapp",
		},
		{
			AppName:           "tester",
	}

	tr = runner.NewTestRunner("serviceinvocationhttp", testApps, nil, nil)
	os.Exit(tr.Start(m))
}
  
func TestServiceInvocationHTTPPerformance(t *testing.T) {
  // Step 4: run the test case
......
}
 

// Start is the entry point of Dapr test runner.
func (tr *TestRunner) Start(m runnable) int {
	// Step 2: set up the test platform
	err := tr.Platform.setup()

  // Optional step 2.5: install components; there are none in this test case
	if tr.components != nil && len(tr.components) > 0 {
		log.Println("Installing components...")
		if err := tr.Platform.addComponents(tr.components); err != nil {
			fmt.Fprintf(os.Stderr, "Failed Platform.addComponents(), %s", err.Error())
			return runnerFailExitCode
		}
	}

  // Optional step 2.75: install init apps; there are none in this test case
	if tr.initApps != nil && len(tr.initApps) > 0 {
		log.Println("Installing init apps...")
		if err := tr.Platform.addApps(tr.initApps); err != nil {
			fmt.Fprintf(os.Stderr, "Failed Platform.addInitApps(), %s", err.Error())
			return runnerFailExitCode
		}
	}

	// Step 3: install the test apps; here these are the server-side testapp and the client-side tester prepared in step 1
	if tr.testApps != nil && len(tr.testApps) > 0 {
		log.Println("Installing test apps...")
		if err := tr.Platform.addApps(tr.testApps); err != nil {
			fmt.Fprintf(os.Stderr, "Failed Platform.addApps(), %s", err.Error())
			return runnerFailExitCode
		}
	}

	// Step 4: run the test cases
	return m.Run()
}
  
func (tr *TestRunner) tearDown() {
  // Step 5: tearDown after the run
  // concretely, it deletes the test apps installed in step 3 (the server-side testapp and the client-side tester)
  // if the testcase is interrupted forcibly, e.g. with ctrl + c, the teardown does not run,
  // and the testapp/tester apps are not removed from k8s
	tr.Platform.tearDown()
}

Preparing the test apps

This corresponds to "step 1: prepare two apps" in the main flow above; it prepares the server-side testapp and the client-side tester:

testApps := []kube.AppDescription{
		{
			AppName:           "testapp",				// testapp, the server side
			DaprEnabled:       true,
			ImageName:         "perf-service_invocation_http",
			IngressEnabled:    true,
			......
		},
		{
			AppName:           "tester",				// tester, the client side
			DaprEnabled:       true,
			ImageName:         "perf-tester",
			IngressEnabled:    true,
			AppPort:           3001,
			......
		},
	}

Note in particular the IngressEnabled: true setting.

The flow of installing the test apps

This corresponds to "step 3: install the test apps" in the main flow above.

The corresponding code is the addApps() method in tests/runner/kube_testplatform.go:

// addApps adds test apps to disposable App Resource queues.
func (c *KubeTestPlatform) addApps(apps []kube.AppDescription) error {
 
  	for _, app := range apps {
      log.Printf("Adding app %v", app)
			c.AppResources.Add(kube.NewAppManager(c.KubeClient, getNamespaceOrDefault(app.Namespace), app))
    }
  
  	// installApps installs the apps in AppResource queue sequentially
	log.Printf("Installing apps ...")
	if err := c.AppResources.setup(); err != nil {
		return err
	}
	log.Printf("Apps are installed.")
}

The corresponding log for adding the apps is:

2022/03/27 11:48:39 Adding app {testapp 0  map[] true perf-service_invocation_http:dev-linux-amd64  docker.io/skyao 1 true true   4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
2022/03/27 11:48:39 Adding app {tester 3001  map[] true perf-tester:dev-linux-amd64  docker.io/skyao 1 true true   4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}

When running into problems, check this log line very carefully and confirm that each parameter (image name, tag, registry, and so on) is OK.

Installing the test apps

A closer look at the code and logs for installing the test apps:

log.Printf("Deploying app %v ...", m.app.AppName)

		// Deploy app and wait until deployment is done
		if _, err := m.Deploy(); err != nil {
			return err
		}

		// Wait until app is deployed completely
		if _, err := m.WaitUntilDeploymentState(m.IsDeploymentDone); err != nil {
			return err
		}

		if m.logPrefix != "" {
			if err := m.StreamContainerLogs(); err != nil {
				log.Printf("Failed to retrieve container logs for %s. Error was: %s", m.app.AppName, err)
			}
		}
	
	log.Printf("App %v has been deployed.", m.app.AppName)

The corresponding log is:

2022/03/27 11:48:41 Deploying app testapp ...
2022/03/27 11:48:48 App testapp has been deployed.

2022/03/27 11:48:50 Deploying app tester ...
2022/03/27 11:48:57 App tester has been deployed.

From the logs, when the images are already cached locally, startup takes roughly 7 seconds.

	// PollInterval is how frequently e2e tests will poll for updates.
	PollInterval = 1 * time.Second
	// PollTimeout is how long e2e tests will wait for resource updates when polling.
	PollTimeout = 10 * time.Minute

// WaitUntilDeploymentState waits until isState returns true.
func (m *AppManager) WaitUntilDeploymentState(isState func(*appsv1.Deployment, error) bool) (*appsv1.Deployment, error) {
	deploymentsClient := m.client.Deployments(m.namespace)

	var lastDeployment *appsv1.Deployment

	waitErr := wait.PollImmediate(PollInterval, PollTimeout, func() (bool, error) {
		var err error
		lastDeployment, err = deploymentsClient.Get(context.TODO(), m.app.AppName, metav1.GetOptions{})
		done := isState(lastDeployment, err)
		if !done && err != nil {
			return true, err
		}
		return done, nil
	})

	if waitErr != nil {
		// get deployment's Pods detail status info
		......
	}

	return lastDeployment, nil
}

From the implementation, the wait for the app deployment is up to 10 minutes, so under normal circumstances the app will finish starting in time, unless the deployment itself is broken, e.g. wrong image information making the image impossible to pull.

Checking the sidecar

The code that checks whether the sidecar started successfully:

	// maxSideCarDetectionRetries is the maximum number of retries to detect Dapr sidecar.
	maxSideCarDetectionRetries = 3

log.Printf("Validating sidecar for app %v ....", m.app.AppName)
		for i := 0; i <= maxSideCarDetectionRetries; i++ {
			// Validate daprd side car is injected
			if err := m.ValidateSidecar(); err != nil {
				if i == maxSideCarDetectionRetries {
					return err
				}

				log.Printf("Did not find sidecar for app %v error %s, retrying ....", m.app.AppName, err)
				time.Sleep(10 * time.Second)
				continue
			}

			break
		}
		log.Printf("Sidecar for app %v has been validated.", m.app.AppName)

The corresponding log is:

2022/03/27 11:48:48 Validating sidecar for app testapp ....
2022/03/27 11:48:48 Streaming Kubernetes logs to ./container_logs/testapp-85d8d9db89-llmrr.daprd.log
2022/03/27 11:48:48 Streaming Kubernetes logs to ./container_logs/testapp-85d8d9db89-llmrr.testapp.log
2022/03/27 11:48:49 Sidecar for app testapp has been validated.

2022/03/27 11:48:57 Validating sidecar for app tester ....
2022/03/27 11:48:57 Streaming Kubernetes logs to ./container_logs/tester-7944b6bb68-wzfj2.tester.log
2022/03/27 11:48:57 Streaming Kubernetes logs to ./container_logs/tester-7944b6bb68-wzfj2.daprd.log
2022/03/27 11:48:58 Sidecar for app tester has been validated.

By default there are 3 retries with a 10-second interval, so after 4 attempts (1 + 3 retries), a total of about 40 seconds, an error is reported if the sidecar still has not started.

Since the sidecar starts almost at the same time as the app, and the app itself already takes about 7 seconds to start, checking the sidecar after the app is up usually completes very quickly.

Creating the ingress

The code that creates the ingress:

		// Create Ingress endpoint
		log.Printf("Creating ingress for app %v ....", m.app.AppName)
		if _, err := m.CreateIngressService(); err != nil {
			return err
		}
		log.Printf("Ingress for app %v has been created.", m.app.AppName)

From the logs, creating the ingress is fast, under 1 second:

2022/03/27 10:41:09 Creating ingress for app testapp ....
2022/03/27 10:41:10 Ingress for app testapp has been created.

2022/03/27 10:41:19 Creating ingress for app tester ....
2022/03/27 10:41:19 Ingress for app tester has been created.

But note: "created" here apparently only means that the request was sent to k8s, i.e. the call returns asynchronously; it does not mean the ingress is immediately usable.

Creating the port forwarder

The code that creates the port forwarder:

		log.Printf("Creating pod port forwarder for app %v ....", m.app.AppName)
		m.forwarder = NewPodPortForwarder(m.client, m.namespace)
		log.Printf("Pod port forwarder for app %v has been created.", m.app.AppName)

From the logs, creating the port forwarder is also very fast, under 1 second:

2022/03/27 10:41:10 Creating pod port forwarder for app testapp ....
2022/03/27 10:41:10 Pod port forwarder for app testapp has been created.

2022/03/27 10:41:19 Creating pod port forwarder for app tester ....
2022/03/27 10:41:19 Pod port forwarder for app tester has been created.

Again, "created" here apparently only means that the request was sent to k8s and the call returns asynchronously; it does not mean the port forwarding is immediately usable.

Full log analysis

Below is a complete log of installing the two apps, testapp and tester, taking 19 seconds:

2022/03/27 11:48:39 Running setup...
2022/03/27 11:48:39 Installing test apps...
2022/03/27 11:48:39 Adding app {testapp 0  map[] true perf-service_invocation_http:dev-linux-amd64  docker.io/skyao 1 true true   4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
2022/03/27 11:48:39 Adding app {tester 3001  map[] true perf-tester:dev-linux-amd64  docker.io/skyao 1 true true   4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
2022/03/27 11:48:39 Installing apps ...
2022/03/27 11:48:41 Deploying app testapp ...
2022/03/27 11:48:48 App testapp has been deployed.
2022/03/27 11:48:48 Validating sidecar for app testapp ....
2022/03/27 11:48:48 Streaming Kubernetes logs to ./container_logs/testapp-85d8d9db89-llmrr.daprd.log
2022/03/27 11:48:48 Streaming Kubernetes logs to ./container_logs/testapp-85d8d9db89-llmrr.testapp.log
2022/03/27 11:48:49 Sidecar for app testapp has been validated.
2022/03/27 11:48:49 Creating ingress for app testapp ....
2022/03/27 11:48:49 Ingress for app testapp has been created.
2022/03/27 11:48:49 Creating pod port forwarder for app testapp ....
2022/03/27 11:48:49 Pod port forwarder for app testapp has been created.
2022/03/27 11:48:50 Deploying app tester ...
2022/03/27 11:48:57 App tester has been deployed.
2022/03/27 11:48:57 Validating sidecar for app tester ....
2022/03/27 11:48:57 Streaming Kubernetes logs to ./container_logs/tester-7944b6bb68-wzfj2.tester.log
2022/03/27 11:48:57 Streaming Kubernetes logs to ./container_logs/tester-7944b6bb68-wzfj2.daprd.log
2022/03/27 11:48:58 Sidecar for app tester has been validated.
2022/03/27 11:48:58 Creating ingress for app tester ....
2022/03/27 11:48:58 Ingress for app tester has been created.
2022/03/27 11:48:58 Creating pod port forwarder for app tester ....
2022/03/27 11:48:58 Pod port forwarder for app tester has been created.
2022/03/27 11:48:58 Apps are installed.
2022/03/27 11:48:58 Running tests...

Of these 19 seconds, the most time-consuming operations are:

  • Installing apps: 2 seconds
  • Deploying testapp: 7 seconds
  • Deploying tester: 7 seconds
  • Validating the sidecar / creating the ingress and port forwarder: 1 second (× 2 apps)

The flow of waiting for the test apps to become ready

Waiting for readiness

Because there is an installation flow for the test apps, and k8s needs some time to deploy and start them, there has to be a mechanism that waits for and detects that the test apps have finished installing, so that the real test, i.e. the testcase, can begin.

Taking TestServiceInvocationHTTPPerformance as an example, with the test-execution details removed:

const numHealthChecks = 60 // Number of times to check for endpoint health per app.

func TestServiceInvocationHTTPPerformance(t *testing.T) {
	p := perf.Params()
	t.Logf("running service invocation http test with params: qps=%v, connections=%v, duration=%s, payload size=%v, payload=%v", p.QPS, p.ClientConnections, p.TestDuration, p.PayloadSizeKB, p.Payload) // line 79


	// Get the ingress external url of test app
	testAppURL := tr.Platform.AcquireAppExternalURL("testapp")
	require.NotEmpty(t, testAppURL, "test app external URL must not be empty")

	// Check if test app endpoint is available
	t.Logf("test app url: %s", testAppURL+"/test")   // line 86
	_, err := utils.HTTPGetNTimes(testAppURL+"/test", numHealthChecks)    // this is where we wait for the testapp to become ready!
	require.NoError(t, err)

	// Get the ingress external url of tester app
	testerAppURL := tr.Platform.AcquireAppExternalURL("tester")
	require.NotEmpty(t, testerAppURL, "tester app external URL must not be empty")

	// Check if tester app endpoint is available
	t.Logf("teter app url: %s", testerAppURL)    // line 95							// 在这里等待测试应用 tester 就绪!
	_, err = utils.HTTPGetNTimes(testerAppURL, numHealthChecks)
	require.NoError(t, err)

	// Perform baseline test
	......

	// Perform dapr test
	......
}

Extracting from the execution log the entries that correspond to the three places above:

    service_invocation_http_test.go:79: running service invocation http test with params: qps=1, connections=1, duration=1m, payload size=0, payload=
2022/03/27 11:48:58 Waiting until service ingress is ready for testapp...
2022/03/27 11:49:03 Service ingress for testapp is ready...
    service_invocation_http_test.go:86: test app url: 20.84.11.6:3000/test
2022/03/27 11:49:04 Waiting until service ingress is ready for tester...
2022/03/27 11:49:33 Service ingress for tester is ready...
    service_invocation_http_test.go:95: teter app url: 20.85.250.98:3000

These two places wait for the test apps testapp and tester to finish starting. The check is to access each app's public url (i.e. the health-check address); if it can be reached (connectable, returning http 200), the app has finished starting; otherwise keep waiting.

There are actually two waits here:

  1. waiting for the test app's ingress to be ready
  2. waiting for the test app itself to be ready

Waiting for the test app's ingress to be ready

Because the pod IP cannot be accessed directly from outside k8s, access goes through the ingress and port forwarding. The installation flow above created the ingress and the port forwarder, but they take time to become effective. The AcquireAppExternalURL() method waits for the ingress to become ready:

	testAppURL := tr.Platform.AcquireAppExternalURL("testapp")
	testerAppURL := tr.Platform.AcquireAppExternalURL("tester")

For k8s, the implementation is in tests/runner/kube_testplatform.go:

// AcquireAppExternalURL returns the external url for 'name'.
func (c *KubeTestPlatform) AcquireAppExternalURL(name string) string {
	app := c.AppResources.FindActiveResource(name)
	return app.(*kube.AppManager).AcquireExternalURL()
}

// AcquireExternalURL gets external ingress endpoint from service when it is ready.
func (m *AppManager) AcquireExternalURL() string {
	log.Printf("Waiting until service ingress is ready for %s...\n", m.app.AppName)
	svc, err := m.WaitUntilServiceState(m.IsServiceIngressReady) // wait until the ingress is ready
	if err != nil {
		return ""
	}

	log.Printf("Service ingress for %s is ready...\n", m.app.AppName)
	return m.AcquireExternalURLFromService(svc)
}

The actual waiting is implemented in the WaitUntilServiceState() method:

	// PollInterval is how frequently e2e tests will poll for updates.
	PollInterval = 1 * time.Second
	// PollTimeout is how long e2e tests will wait for resource updates when polling.
	PollTimeout = 10 * time.Minute
	
	// WaitUntilServiceState waits until isState returns true.
func (m *AppManager) WaitUntilServiceState(isState func(*apiv1.Service, error) bool) (*apiv1.Service, error) {
	serviceClient := m.client.Services(m.namespace)
	var lastService *apiv1.Service

	waitErr := wait.PollImmediate(PollInterval, PollTimeout, func() (bool, error) {
		var err error
		lastService, err = serviceClient.Get(context.TODO(), m.app.AppName, metav1.GetOptions{})
		done := isState(lastService, err)
		if !done && err != nil {
			return true, err
		}

		return done, nil
	})

	......
	return lastService, nil
}

It polls every second for up to 10 minutes, which is a rather extreme duration. I once hit a case where the ingress could not be reached, and the test waited here for the full 10 minutes.

But if things work and are merely slow to start, 10 minutes is more than enough for the ingress to become effective.

Waiting for the test app itself to be ready

The implementation is:

const numHealthChecks = 60 // Number of times to check for endpoint health per app.

// HTTPGetNTimes calls the url n times and returns the first success or last error.
func HTTPGetNTimes(url string, n int) ([]byte, error) {
	var res []byte
	var err error
	for i := n - 1; i >= 0; i-- {
		res, err = HTTPGet(url)
		if i == 0 {
			break
		}

		if err != nil {
			time.Sleep(time.Second)
		} else {
			return res, nil
		}
	}

	return res, err
}

It checks once per second, retrying 60 times, so the app just needs to be up within about 60 seconds.

Note: the check order is testapp first, then the tester app, so if both are slow to start, the tester app, being checked later, gets more time to come up.

Bug: always stuck waiting for the test apps to become ready

During testing, the test case kept getting stuck waiting for the test apps to become ready. Oddly, the logs showed that of the two waits above, the first one (waiting for the ingress to become ready) always passed, while the second one (waiting for the app itself, i.e. accessing http://52.226.222.31:3000/test) never succeeded:

=== Failed
=== FAIL: tests/perf/service_invocation_http TestServiceInvocationHTTPPerformance (248.07s)
    service_invocation_http_test.go:79: running service invocation http test with params: qps=1, connections=1, duration=1m, payload size=0, payload=
2022/03/25 21:49:17 Waiting until service ingress is ready for testapp...
2022/03/25 21:49:20 Service ingress for testapp is ready...
    service_invocation_http_test.go:86: test app url: 52.226.222.31:3000/test
    service_invocation_http_test.go:88: 
                Error Trace:    service_invocation_http_test.go:88
                Error:          Received unexpected error:
                                Get "http://52.226.222.31:3000/test": EOF
                Test:           TestServiceInvocationHTTPPerformance

Looking at the code of tests/apps/perf/service_invocation_http/app.go, the implementation of the server-side app, it is extremely simple:

func handler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(200)
}

func main() {
	http.HandleFunc("/test", handler)
	log.Fatal(http.ListenAndServe(":3000", nil))
}

From the output of k get pods -n dapr-tests, testapp had been in the Running state from early on; such a simple app cannot possibly take more than 60 seconds to start. Strangely, sometimes the problem disappeared and access worked fine; once it worked it kept working, and once it failed it kept failing.

After repeated testing it turned out that with k8s deployed on azure, the pods' external addresses can only be reached while connected to the company VPN (GlobalProtect). If the VPN is not up, the request fails with Empty reply from server, and the test stays stuck here until the 60-second timeout.

# VPN disconnected
$ curl -i 20.81.110.42:3000/test
curl: (52) Empty reply from server

# VPN connected
$ curl -i 20.81.110.42:3000/test
HTTP/1.1 200 OK
Date: Sun, 27 Mar 2022 07:10:02 GMT
Content-Length: 0

# VPN disconnected again
$ curl -i 20.81.110.42:3000/test
curl: (52) Empty reply from server

# VPN connected again
$ curl -i 20.81.110.42:3000/test
HTTP/1.1 200 OK
Date: Sun, 27 Mar 2022 07:12:05 GMT
Content-Length: 0

Once this was ruled out, the performance test cases ran normally.

Note: I was running the test cases on my local MacBook; communication between the test cases and the k8s cluster on azure was always fine, yet sometimes the apps' external urls were reachable and sometimes not. I did not expect this to be the cause.

3.4 - Implementation of the state in-memory performance test case

Implementation of the state in-memory performance test case

Test logic

A complete end-to-end state load test would look like this, using redis as an example:

title state load test with redis store
hide footbox
skinparam style strictuml

actor test_case as "Test Case"
participant tester as "Tester"
participant fortio as "Fortio"
box "daprd" #LightBlue
participant grpc_api as "gRPC API"
participant redis_component as "Redis State Component"
end box
database redis_server as "Redis server"

test_case -> tester : baseline test
note left: baseline test 
tester -> fortio : exec
fortio -> redis_server: redis native protocol
fortio <-- redis_server
tester <-- fortio
test_case <-- tester

|||
|||
|||

test_case -> tester : dapr test
note left: dapr test 
tester -> fortio : exec
fortio -> grpc_api : gRPC
grpc_api -> redis_component
redis_component -> redis_server: redis native protocol
redis_component <-- redis_server
grpc_api <-- redis_component
fortio <-- grpc_api
tester <-- fortio
test_case <-- tester

However, the perf tests in the dapr repository currently target the dapr runtime only, i.e. they exclude the dapr sdk and dapr components. Therefore an in-memory state component is needed for the load test, to remove the interference of external components such as redis. The flow looks roughly like this:

title state load test with in-memory store
hide footbox
skinparam style strictuml

actor test_case as "Test Case"
participant tester as "Tester"
participant fortio as "Fortio"
box "daprd" #LightBlue
participant grpc_api as "gRPC API"
participant in_momory_component as "In-Memory Component"
end box

test_case -> tester : dapr test
note left: dapr test 
tester -> fortio : exec
fortio -> grpc_api : gRPC
grpc_api -> in_momory_component
in_momory_component -> in_momory_component: in-memory operations
grpc_api <-- in_momory_component
fortio <-- grpc_api
tester <-- fortio
test_case <-- tester

The cost of the in-memory state component can be treated as zero, so after removing the remote overhead of accessing redis from both the baseline test and the dapr test, the new baseline test and dapr test look like this:

title state load test with in-memory store
hide footbox
skinparam style strictuml

actor test_case as "Test Case"
participant tester as "Tester"
participant fortio as "Fortio"
box "daprd" #LightBlue
participant grpc_api as "gRPC API"
participant in_momory_component as "In-Memory Component"
end box

test_case -> tester : baseline test
note left: baseline test 
tester -> fortio : exec
fortio -> fortio : do nothing
tester <-- fortio
test_case <-- tester

|||
|||
|||

test_case -> tester : dapr test
note left: dapr test 
tester -> fortio : exec
fortio -> grpc_api : gRPC
grpc_api -> in_momory_component
in_momory_component -> in_momory_component: in-memory operations
grpc_api <-- in_momory_component
fortio <-- grpc_api
tester <-- fortio
test_case <-- tester

The fortio commands

For the no-op in the baseline test, the fortio command is:

./fortio load -json result.json -qps 1 -c 1 -t 1m -payload-size 0 -grpc -dapr capability=state,target=noop http://localhost:50001/

For the state get request in the dapr test, the fortio command is:

 ./fortio load -json result.json -qps 1 -c 1 -t 1m -payload-size 0 -grpc --dapr capability=state,target=dapr,method=get,store=inmemorystate,key=abc123 http://127.0.0.1:50001

The perf test case's request

3.5 - Implementation of the pubsub in-memory performance test case

Implementation of the pubsub in-memory performance test case

Test logic

Similar to the state in-memory case.

The fortio commands

For the no-op in the baseline test, the fortio command is:

./fortio load -json result.json -qps 1 -c 1 -t 1m -payload-size 0 -grpc -dapr capability=pubsub,target=noop http://localhost:50001/

For the publish request in the dapr test, the fortio command is:

./fortio load -json result.json -qps 1 -c 1 -t 1m -payload-size 100 -grpc --dapr capability=pubsub,target=dapr,method=publish,store=inmemorypubsub,topic=topic123,contenttype=text/plain http://127.0.0.1:50001

perf test case 的请求

4 - 认证测试

Dapr的认证测试

4.1 - 认证测试概述

Dapr的认证测试

资料

4.2 - pubsub

Dapr的pubsub认证测试

4.2.1 - kafka pubsub

Dapr的kafka pubsub component认证测试

4.2.1.1 - kafka集群配置

kafka pubsub component认证测试中的kafka集群配置

test plan

按照 README.md 文件中 test plan 的描述:

### Basic tests

* Bring up a 3-node Kafka cluster
    * Configured to have 10+ partitions per topic

kafka集群将有三个节点,每个 topic 配置有10+的分区

TBD: 配置分区在哪里实现的?

docker-compose.yml

在项目根目录下有 docker-compose.yml 文件,定义了测试中需要使用到的 kafka 集群的配置,主要有:

  • zookeeper: 端口 “2181:2181”
  • kafka1:端口 “19094:19094” / “19093:19093” / “19092:19092”
  • kafka2:端口 “29094:29094” / “29093:29093” / “29092:29092”
  • kafka3:端口 “39094:39094” / “39093:39093” / “39092:39092”
  • hydra:端口 “4443:4443” / “4444:4444”
  • hydra-config

以及三个volumes:

  • kafka1-data
  • kafka2-data
  • kafka3-data

kafka证书文件

在 kafka1 / kafka2 / kafka3 三个 kafka 容器的配置中,都有一段相同的证书文件挂载配置,以及各自类似的自定义配置文件挂载:

    volumes:
      - type: bind
        source: ./strimzi-ca-certs
        target: /opt/kafka/cluster-ca-certs
        read_only: true
      - type: bind
        source: ./strimzi-broker-certs
        target: /opt/kafka/broker-certs
        read_only: true
      - type: bind
        source: ./strimzi-client-ca
        target: /opt/kafka/client-ca-certs
        read_only: true
      - type: bind
        source: ./strimzi-listener-certs
        target: /opt/kafka/certificates/custom-mtls-9094-certs
        read_only: true
      - type: bind
        source: ./strimzi-listener-certs
        target: /opt/kafka/certificates/custom-oauth-9093-certs
        read_only: true
      - type: bind
        source: ./strimzi-listener-certs
        target: /opt/kafka/certificates/oauth-oauth-9093-certs
        read_only: true
      - type: bind
        source: ./strimzi-kafka1-config
        target: /opt/kafka/custom-config
        read_only: true

这些证书对应的项目文件为:

kafka-ca-files

4.2.1.2 - kafka组件配置

kafka pubsub component认证测试中的kafka组件配置

组件配置文件

kafka pubsub 组件认证测试中使用到三个kafka pubsub 组件:

  1. consumer1
  2. mtls-consumer
  3. oauth-consumer

kafka-component-files

共性配置

三个 component 的 name 都设定为 messagebus:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: messagebus
spec:
  type: pubsub.kafka
  version: v1
  ......
  metadata:
  - name: initialOffset
    value: oldest
  - name: backOffDuration
    value: 50ms

consumer1

  metadata:
  - name: brokers
    value: localhost:19092,localhost:29092,localhost:39092
  - name: consumerGroup
    value: kafkaCertification1
  - name: authType
    value: "none"

  • brokers 指向端口为 19092 / 29092 / 39092 的 kafka 集群

  • authType 设置为 none,不进行authentication。

  • consumerGroup 设置为 kafkaCertification1

mtls-consumer

  metadata:
  - name: brokers
    value: localhost:19094,localhost:29094,localhost:39094
  - name: consumerGroup
    value: kafkaCertification2
  - name: authType
    value: mtls
  - name: caCert
    value: |
      -----BEGIN CERTIFICATE-----      
      ......
      -----END CERTIFICATE-----
  - name: clientCert
    value: |
      -----BEGIN CERTIFICATE-----      
      ......
      -----END CERTIFICATE-----
  - name: clientKey
    value: |
      -----BEGIN RSA PRIVATE KEY-----
      ......
      -----END RSA PRIVATE KEY-----      

  • brokers 指向端口为 19094 / 29094 / 39094 的 kafka 集群

  • authType 设置为 mtls,进行mtls authentication。

  • consumerGroup 设置为 kafkaCertification2

oauth-consumer

  metadata:
  - name: brokers
    value: localhost:19093,localhost:29093,localhost:39093
  - name: consumerGroup
    value: kafkaCertification2
  - name: authType
    value: "oidc"
  - name: oidcTokenEndpoint
    value: https://localhost:4443/oauth2/token
  - name: oidcClientID
    value: "dapr"
  - name: oidcClientSecret
    value: "dapr-test"
  - name: oidcScopes
    value: openid,kafka
  - name: caCert
    value: |
      -----BEGIN CERTIFICATE-----      
      ......
      -----END CERTIFICATE-----

  • brokers 指向端口为 19093 / 29093 / 39093 的 kafka 集群

  • authType 设置为 oidc,进行 OIDC authentication。

  • consumerGroup 设置为 kafkaCertification2 (和 mtls 一致?)

4.2.1.3 - 测试源代码

Dapr的kafka pubsub component认证测试的源代码

源代码文件只有一个 kafka_test.go

准备工作

kafka component

创建 kafka pubsub component 的代码:

	log := logger.NewLogger("dapr.components")
	component := pubsub_loader.New("kafka", func() pubsub.PubSub {
		return pubsub_kafka.NewKafka(log)
	})

直接创建一个 kafka pubsub component 的实例。

consumer group

	// For Kafka, we should ensure messages are received in order.
	consumerGroup1 := watcher.NewOrdered()
	// This watcher is across multiple consumers in the same group
	// so exact ordering is not expected.
	consumerGroup2 := watcher.NewUnordered()

consumerGroup1 是有序的 watcher;consumerGroup2 因为要跨同一个 group 中的多个 consumer,无法保证消息顺序,所以是无序的 watcher。
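
为了说明 watcher “先声明期望、再观察、最后断言”的工作模式,下面给出一个自制的极简模拟(并非 dapr 仓库中的 watcher 包实现,仅用于说明有序校验的思路):

package main

import "fmt"

// orderedWatcher 模拟认证测试中 watcher 的工作模式:
// 先通过 ExpectStrings 声明期望收到的消息,
// 收到消息时调用 Observe 记录,
// 最后通过 Assert 检查收到的消息与期望是否一致(含顺序)。
type orderedWatcher struct {
	expected []string
	observed []string
}

func (w *orderedWatcher) ExpectStrings(msgs ...string) {
	w.expected = append(w.expected, msgs...)
}

func (w *orderedWatcher) Observe(msg string) {
	w.observed = append(w.observed, msg)
}

func (w *orderedWatcher) Assert() bool {
	if len(w.observed) != len(w.expected) {
		return false
	}
	for i := range w.expected {
		if w.observed[i] != w.expected[i] {
			return false
		}
	}
	return true
}

func main() {
	w := &orderedWatcher{}
	w.ExpectStrings("msg-001", "msg-002")
	w.Observe("msg-001")
	w.Observe("msg-002")
	fmt.Println("all messages received in order:", w.Assert())
}

真实的 watcher(NewOrdered / NewUnordered)由认证测试框架提供,Assert 还带有超时参数,无序版本则不校验顺序。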

application函数

application 用来消费从topic中得到的消息:

	// Application logic that tracks messages from a topic.
	application := func(messages *watcher.Watcher) app.SetupFn {
		return func(ctx flow.Context, s common.Service) error {
			// Simulate periodic errors.
			sim := simulate.PeriodicError(ctx, 100)

			// Setup the /orders event handler.
			return multierr.Combine(
				s.AddTopicEventHandler(&common.Subscription{
					PubsubName: "messagebus",
					Topic:      "neworder",
					Route:      "/orders",
				}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
					if err := sim(); err != nil {
						return true, err
					}

					// Track/Observe the data of the event.
					messages.Observe(e.Data)
					return false, nil
				}),
			)
		}
	}

其中 application 订阅了名为 messagebus 的 pubsub 组件的 neworder topic,对应的接收消息的路由为 “/orders”。

application 在接收到消息时,会将消息保存到 Watcher 中以供后面检查。

另外 application 会模拟偶尔出错的情况,以便进行 retry 的测试。
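
simulate.PeriodicError 的具体实现请以仓库源码为准。为了说明“周期性注入错误以触发 retry”这个思路,下面给出一个自制的极简模拟(并非 dapr 仓库代码,参数 n 为假设的示例值):

package main

import (
	"errors"
	"fmt"
)

// periodicError 返回一个闭包:每调用 n 次就返回一次错误,
// 用来模拟消息处理偶尔失败,从而触发 pubsub 的重试逻辑。
func periodicError(n int) func() error {
	counter := 0
	return func() error {
		counter++
		if counter%n == 0 {
			return errors.New("simulated error")
		}
		return nil
	}
}

func main() {
	sim := periodicError(3)
	for i := 1; i <= 6; i++ {
		if err := sim(); err != nil {
			// 在认证测试中,handler 此时返回 retry=true,消息会被重新投递
			fmt.Printf("call %d: error -> retry\n", i)
			continue
		}
		fmt.Printf("call %d: ok\n", i)
	}
}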

sendRecvTest函数

sendRecvTest函数用于测试发送消息到topic的逻辑,并验证应用是否接收到消息:

const (
	numMessages = 1000
)

// Test logic that sends messages to a topic and
	// verifies the application has received them.
	sendRecvTest := func(metadata map[string]string, messages ...*watcher.Watcher) flow.Runnable {
		_, hasKey := metadata[messageKey]
		return func(ctx flow.Context) error {
			// 获取 client,里面封装的是 dapr go sdk 的 Client interface
			client := sidecar.GetClient(ctx, sidecarName1)

			// Declare what is expected BEFORE performing any steps
			// that will satisfy the test.
			msgs := make([]string, numMessages)
			for i := range msgs {
				msgs[i] = fmt.Sprintf("Hello, Messages %03d", i)
			}
			for _, m := range messages {
				m.ExpectStrings(msgs...)
			}
			// If no key it provided, create a random one.
			// For Kafka, this will spread messages across
			// the topic's partitions.
			if !hasKey {
				metadata[messageKey] = uuid.NewString()
			}

			// Send events that the application above will observe.
			ctx.Log("Sending messages!")
			for _, msg := range msgs {
				ctx.Logf("Sending: %q", msg)
        // 通过 dapr client 的 PublishEvent 方法发送消息到topic
        // 对应的 pubsubName 为 "messagebus",topicName 为 "neworder"
        // 消息内容为前面准备的 "Hello, Messages %03d"
				err := client.PublishEvent(
					ctx, pubsubName, topicName, msg,
					dapr.PublishEventWithMetadata(metadata))
				require.NoError(ctx, err, "error publishing message")
			}

			// Do the messages we observed match what we expect?
			for _, m := range messages {
        // 一分钟内应该能收到全部消息
				m.Assert(ctx, time.Minute)
			}

			return nil
		}
	}

sendMessagesInBackground函数

sendMessagesInBackground 函数用于在后台持续不断地发送消息,后面的重连、网络中断和重平衡测试都会以 StepAsync 的方式异步调用它,以便在故障注入期间维持稳定的消息流。

assertMessages函数

assertMessages 函数用于在故障恢复之后,校验传入的各个 watcher 最终收到了全部期望的消息。
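
其大致逻辑可以这样理解(以下为示意代码,根据后文测试流程中 “assert messages” 步骤的用法推测,并非 kafka_test.go 源码原文,等待时长为假设值):

	// 示意:等待尚在途中的消息到达后,逐个校验 watcher
	assertMessages := func(messages ...*watcher.Watcher) flow.Runnable {
		return func(ctx flow.Context) error {
			// 给仍在传输中的消息留出一点到达时间(时长为示意值)
			time.Sleep(5 * time.Second)
			for _, m := range messages {
				// 校验 watcher 在超时前收到了全部期望的消息
				m.Assert(ctx, 5*time.Minute)
			}
			return nil
		}
	}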

测试流程

准备步骤

启动Docker Compose

clusterName       = "kafkacertification"
dockerComposeYAML = "docker-compose.yml"		

// Run Kafka using Docker Compose.
Step(dockercompose.Run(clusterName, dockerComposeYAML)).

等待broker启动

var (
	brokers          = []string{"localhost:19092", "localhost:29092", "localhost:39092"}
)
Step("wait for broker sockets",
			network.WaitForAddresses(5*time.Minute, brokers...)).

等待 broker 启动完成打开端口。

这些是 consumer1 对应的 kafka 集群端口,不需要认证。

等待kafka集群准备完成

		Step("wait for kafka readiness", retry.Do(time.Second, 30, func(ctx flow.Context) error {
			config := sarama.NewConfig()
			config.ClientID = "test-consumer"
			config.Consumer.Return.Errors = true

			// Create new consumer
			client, err := sarama.NewConsumer(brokers, config)
			if err != nil {
				return err
			}
			defer client.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			_, err = client.ConsumePartition("myTopic", 0, sarama.OffsetOldest)

			return err
		})).

等待oauth认证完成

		Step("wait for Dapr OAuth client", retry.Do(10*time.Second, 6, func(ctx flow.Context) error {
			httpClient := &http.Client{
				Transport: &http.Transport{
					TLSClientConfig: &tls.Config{
						InsecureSkipVerify: true, // test server certificate is not trusted.
					},
				},
			}

			resp, err := httpClient.Get(oauthClientQuery)
			if err != nil {
				return err
			}
			if resp.StatusCode != 200 {
				return fmt.Errorf("oauth client query for 'dapr' not successful")
			}
			return nil
		})).

启动应用和sidecar

启动app-1

准备 app-1 以接收有序消息:

const (
	appID1            = "app-1"
  appPort           = 8000
)
consumerGroup1 := watcher.NewOrdered()

// Run the application logic above.
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
             application(consumerGroup1))).

运行应用"app-1",端口 8000,使用 consumerGroup1 作为watcher,要求收到的消息是有序的。

然后启动 app-1 对应的带 kafka pubsub 组件的 sidecar:

const (
	sidecarName1      = "dapr-1"
)

Step(sidecar.Run(sidecarName1,
			embedded.WithComponentsPath("./components/consumer1"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
			runtime.WithPubSubs(component))).

组件配置文件路径为 “./components/consumer1”,里面的组件配置为无认证,连接的broker为 “localhost:19092,localhost:29092,localhost:39092”, consumer group 为 kafkaCertification1.

备注:这是直接在当前 go test 里面启动了一个dapr runtime,并注入了一个 kafka pubsub component,够狠

启动app-2

运行应用"app-2",使用 consumerGroup2 作为watcher,要求收到的消息可以是无序的。

const (
	appID2            = "app-2"
)
		// Run the second application.
		Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
			application(consumerGroup2))).

同样启动 app-2 对应的带 kafka pubsub 组件的 sidecar:

const (
	sidecarName2      = "dapr-2"
)
		// Run the Dapr sidecar with the Kafka component.
		Step(sidecar.Run(sidecarName2,
			embedded.WithComponentsPath("./components/mtls-consumer"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
			embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset),
			runtime.WithPubSubs(component))).

组件配置文件路径为 “./components/mtls-consumer”,里面的组件配置为 mtls 认证,连接的broker为 “localhost:19094,localhost:29094,localhost:39094”, consumer group 为 kafkaCertification2.

启动app-3

运行应用"app-3",使用 consumerGroup2 作为watcher,要求收到的消息可以是无序的。

const (
	appID3            = "app-3"
)
		// Run the third application.
		Step(app.Run(appID3, fmt.Sprintf(":%d", appPort+portOffset*2),
			application(consumerGroup2))).

同样启动 app-3 对应的带 kafka pubsub 组件的 sidecar:

const (
	sidecarName3      = "dapr-3"
)
		// Run the Dapr sidecar with the Kafka component.
		Step(sidecar.Run(sidecarName3,
			embedded.WithComponentsPath("./components/oauth-consumer"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset*2),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset*2),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset*2),
			embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset*2),
			runtime.WithPubSubs(component))).

组件配置文件路径为 “./components/oauth-consumer”,里面的组件配置为 oauth 认证,连接的broker为 “localhost:19093,localhost:29093,localhost:39093”, consumer group 为 kafkaCertification2.

启动 app-3 时,重置 consumerGroup2:

Step("reset", flow.Reset(consumerGroup2)).

运行简单有序测试

由于所有消息都使用了同一个 “partitionKey”,因此会被发送到 kafka topic 的同一个分区,这样消息接收的顺序就会和发送的顺序保持一致。

const (
	messageKey = "partitionKey"
)

// Set the partition key on all messages so they
	// are written to the same partition.
	// This allows for checking of ordered messages.
	metadata := map[string]string{
		messageKey: "test",
	}

// Send messages using the same metadata/message key so we can expect
// in-order processing.
Step("send and wait", sendRecvTest(metadata, consumerGroup1, consumerGroup2)).

sendRecvTest 函数会往 pubsubName “messagebus” 的 topic “neworder” 中发送 1000 条消息。app-1 的 sidecar dapr-1 启动后会订阅这个 topic,并把消息转发给 app-1,app-1 通过 consumerGroup1 这个 watcher 记录接收到的消息;类似地,app-2 的 sidecar dapr-2 启动后也会订阅这个 topic,并把消息转发给 app-2,app-2 通过 consumerGroup2 这个 watcher 记录接收到的消息。

由于 dapr-1 和 dapr-2 在订阅时使用了不同的 consumerGroup(kafkaCertification1 和 kafkaCertification2),因此每一条消息都会分别发送到两个应用,一式两份。

运行简单无序测试

在启动第三个应用之后,运行简单无序测试:

// 先重置 consumerGroup2
Step("reset", flow.Reset(consumerGroup2)).		
// Send messages with random keys to test message consumption
// across more than one consumer group and consumers per group.
Step("send and wait", sendRecvTest(map[string]string{}, consumerGroup2)).

由于 sidecar dapr-2 和 sidecar dapr-3 在订阅时使用的是同一个 consumerGroup(kafkaCertification2),因此消息会在两个应用之间分流,各自接收到一部分。又因为 app-2 和 app-3 使用的都是同一个 watcher consumerGroup2,因此 consumerGroup2 里面应该接收到所有的消息,但顺序无法保证。

运行重连测试

在收发消息期间,逐个关闭 broker(最后再全部重启),以测试组件在 broker 正常关闭时的重连能力。

// Gradually stop each broker.
// This tests the components ability to handle reconnections
// when brokers are shutdown cleanly.
// 启动异步步骤,在后台发送消息
StepAsync("steady flow of messages to publish", &task,
          sendMessagesInBackground(consumerGroup1, consumerGroup2)).
// 5秒钟之后关闭 kafka1 broker
Step("wait", flow.Sleep(5*time.Second)).
Step("stop broker 1", dockercompose.Stop(clusterName, dockerComposeYAML, "kafka1")).

// 再过5秒钟之后关闭 kafka2 broker
Step("wait", flow.Sleep(5*time.Second)).
// Errors will likely start occurring here since quorum is lost.
Step("stop broker 2", dockercompose.Stop(clusterName, dockerComposeYAML, "kafka2")).

// 再过10秒钟之后关闭 kafka3 broker
Step("wait", flow.Sleep(10*time.Second)).
// Errors will definitely occur here.
Step("stop broker 3", dockercompose.Stop(clusterName, dockerComposeYAML, "kafka3")).

// 等待30秒后,重新启动 kafka1 / kafka2 / kafka3
Step("wait", flow.Sleep(30*time.Second)).
Step("restart broker 3", dockercompose.Start(clusterName, dockerComposeYAML, "kafka3")).
Step("restart broker 2", dockercompose.Start(clusterName, dockerComposeYAML, "kafka2")).
Step("restart broker 1", dockercompose.Start(clusterName, dockerComposeYAML, "kafka1")).

// Component should recover at this point.
// 再等30秒,组件应该能恢复工作
Step("wait", flow.Sleep(30*time.Second)).
// 检查消息已经都接收到
Step("assert messages", assertMessages(consumerGroup1, consumerGroup2)).

运行网络中断测试

在收发消息期间,模拟网络中断,以测试组件在 dapr 与 broker 异常断连后的恢复能力。

		// Simulate a network interruption.
		// This tests the components ability to handle reconnections
		// when Dapr is disconnected abnormally.
		StepAsync("steady flow of messages to publish", &task,
			sendMessagesInBackground(consumerGroup1, consumerGroup2)).
		Step("wait", flow.Sleep(5*time.Second)).
		//
		// Errors will occurring here.
		Step("interrupt network",
			network.InterruptNetwork(30*time.Second, nil, nil, "19092", "29092", "39092")).
		//
		// Component should recover at this point.
		Step("wait", flow.Sleep(30*time.Second)).
		Step("assert messages", assertMessages(consumerGroup1, consumerGroup2)).

备注: 网络中断的方式在我本地跑不起来

运行重平衡测试

验证同一个 consumer group 中有多个 consumer 在运行时,关闭其中一个不会影响消息的接收:

// Reset and test that all messages are received during a
// consumer rebalance.
Step("reset", flow.Reset(consumerGroup2)).
// 后台异步发送消息
StepAsync("steady flow of messages to publish", &task,
          sendMessagesInBackground(consumerGroup2)).
// 15秒之后停止 sidecar dapr-2
Step("wait", flow.Sleep(15*time.Second)).
Step("stop sidecar 2", sidecar.Stop(sidecarName2)).
// 3秒之后停止 app-2
Step("wait", flow.Sleep(3*time.Second)).
Step("stop app 2", app.Stop(appID2)).
// 等待30秒
Step("wait", flow.Sleep(30*time.Second)).
// 预期消息还是能全部收到(app-3/dapr-3还在工作)
Step("assert messages", assertMessages(consumerGroup2)).

4.3 - bindings

Dapr的bindings认证测试

4.3.1 - kafka bindings

Dapr的kafka bindings component认证测试