性能测试
- 1: 运行性能测试
- 2: 如何使用fortio实现性能测试
- 3: 性能测试案例 service invoke http 的实现
- 4: 性能测试案例 state in-momery 的实现
- 5: 性能测试案例 pubsub in-momery 的实现
1 - 运行性能测试
准备工作
基本类似 e2e 测试。
设置环境变量
首先要设置相关的环境变量。
amd64
在 amd64 机器上:
export DAPR_REGISTRY=docker.io/skyao
export DAPR_TAG=dev
export DAPR_NAMESPACE=dapr-tests
export TARGET_OS=linux
export TARGET_ARCH=amd64
export GOOS=linux
export GOARCH=amd64
export DAPR_TEST_NAMESPACE=dapr-tests
export DAPR_TEST_REGISTRY=docker.io/skyao
export DAPR_TEST_TAG=dev-linux-amd64
export DAPR_TEST_MINIKUBE_IP=192.168.100.40 # use this in IDE
export MINIKUBE_NODE_IP=192.168.100.40 # use this in make command
构建并部署dapr到k8s中
$ make create-test-namespace
$ make build-linux
$ make docker-build
$ make docker-push
$ make docker-deploy-k8s
如果之前有过部署,则需要在重新部署前删除之前的部署,有两种情况:
-
只清除 dapr 的控制平面
$ helm uninstall dapr -n dapr-tests $ make docker-deploy-k8s
-
清除所有 dapr 内容
$ helm uninstall dapr -n dapr-tests $ make delete-test-namespace # 再重复上面的构建和部署过程
类似 e2e 测试,性能测试中本有一个额外的操作,实际是安装 redis / kafka / mongodb :
$ make setup-3rd-party
helm repo add bitnami https://charts.bitnami.com/bitnami
"bitnami" already exists with the same configuration, skipping
helm repo add stable https://charts.helm.sh/stable
"stable" already exists with the same configuration, skipping
helm repo add incubator https://charts.helm.sh/incubator
"incubator" already exists with the same configuration, skipping
helm repo update
Hang tight while we grab the latest from your chart repositories...
...Successfully got an update from the "incubator" chart repository
...Successfully got an update from the "bitnami" chart repository
...Successfully got an update from the "stable" chart repository
Update Complete. ⎈Happy Helming!⎈
helm install dapr-redis bitnami/redis --wait --timeout 5m0s --namespace dapr-tests -f ./tests/config/redis_override.yaml
......
helm install dapr-kafka bitnami/kafka -f ./tests/config/kafka_override.yaml --namespace dapr-tests --timeout 10m0s
......
helm install dapr-mongodb bitnami/mongodb -f ./tests/config/mongodb_override.yaml --namespace dapr-tests --wait --timeout 5m0s
对于性能测试,没有必要,可以视情况(是否要测试 actor 相关的功能)看是否要安装 redis:
make setup-test-env-redis
dapr相关的设置
关闭遥测:
$ make setup-app-configurations
kubectl apply -f ./tests/config/dapr_observability_test_config.yaml --namespace dapr-tests
configuration.dapr.io/disable-telemetry created
configuration.dapr.io/obs-defaultmetric created
关闭mtls:
$ make setup-disable-mtls
kubectl apply -f ./tests/config/dapr_mtls_off_config.yaml --namespace dapr-tests
configuration.dapr.io/daprsystem created
准备测试用的components:
# 切记不要用这个命令
$ make setup-test-components
这个命令会将 tests/config
下的yaml文件都安装到k8s下,有些多,而且部分component文件在没有配置好外部组件时会导致 daprd 启动失败。安全起见,如果只是跑个别性能测试的test case,手工安装需要的就好了
# 对于 actor 相关的性能测试案例,需要安装 redis 并开启 statestore-actos
# 由于redis 配置中使用到了 secret,因此需要安装 kubernetes_redis_secret
# make setup-test-env-redis
$ k apply -f ./tests/config/dapr_redis_state_actorstore.yaml -n dapr-tests
$ k apply -f ./tests/config/kubernetes_redis_secret.yaml -n dapr-tests
# 对于 pubsub 的测试,只需要开启 in-memory pubsub
$ k apply -f ./tests/config/dapr_in_memory_pubsub.yaml -n dapr-tests
# 对于 state 的测试,只需要开启 in-memory state
$ k apply -f ./tests/config/dapr_in_memory_state.yaml -n dapr-tests
准备测试用的应用
$ make build-perf-app-all
$ make push-perf-app-all
如果是在开发或者修改某一个测试案例,要节约时间,不需要构建和发布所有的测试案例。只单独构建和推送某一个性能测试的应用,可以直接调用 make target,如针对 service_invocation_http 这个 test case :
$ make build-perf-app-service_invocation_http
$ make push-perf-app-service_invocation_http
执行性能测试
测试前先安装一下 jq,后面会用到:
brew install jq
=======
## 运行性能测试
# 切记不要跑 test-perf-all,由于环境变量在各个perf test case中的设置要求不同,全部一起跑会有问题。
# 在本地(无论是IDE还是终端)不要跑 test-perf-all,只能单独跑某一个 perf test case
# make test-perf-all
make test-perf-xxxxx
# 特别注意,如果没有设置性能测试输入条件相关的环境变量,直接默认跑,有一些 perf test case 是可以跑起来的
make test-perf-state_get_http
make test-perf-state_get_grpc
make test-perf-service_invoke_http
make test-perf-service_invoke_grpc
make test-perf-pubsub_publish_grpc
# 有部分 perf test 是跑不起来的,会报错
make test-perf-actor_timer. # fortio报错,HTTP响应为 405 Method Not Allow,必须用 HTTP POST
跑的时候特别注意日志文件中的这些内容:
2022/03/25 18:02:05 Installing test apps...
2022/03/25 18:02:09 Adding app {testapp 0 map[] true perf-service_invocation_http:dev-linux-amd64 docker.io/skyao 1 true true 4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
2022/03/25 18:02:09 Adding app {tester 3001 map[] true perf-tester:dev-linux-amd64 docker.io/skyao 1 true true 4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
仔细检查 app 的镜像信息,包括 tag 要求是 “dev-linux-amd64”, image registry 要求是自己设定的类似 “docker.io/skyao”,否则 pod 会无法启动。
最好先 env | grep DAPR
看一下设置的环境变量是否有生效。
本地debug
perf test 的测试案例都是用 go test 编写,原则上只要前面步骤准备好,是可以在本地 IDE 中以 debug 方式启动 perf test 的测试案例,然后进行 debug 的。
特别注意:actor 相关的 test case 要设置好性能测试输入条件的环境变量
特殊情况
彻底清理namespace
还要清理以下在 default namespace 中保存的内容:
k delete clusterrole dapr-operator-admin -n default
k delete clusterrole dashboard-reader -n default
k delete clusterrolebindings.rbac.authorization.k8s.io dapr-operator -n default
k delete clusterrolebindings.rbac.authorization.k8s.io dapr-role-tokenreview-binding -n default
k delete clusterrolebindings.rbac.authorization.k8s.io dashboard-reader-global -n default
k delete role secret-reader -n default
k delete rolebinding dapr-secret-reader -n default
k delete mutatingwebhookconfiguration dapr-sidecar-injector -n default
否则,重新安装 dapr 控制面时,如果 dapr 控制面的 namespace 发生变化,就会出报错:
make docker-deploy-k8s
Deploying docker.io/skyao/dapr:dev to the current K8S context...
helm install \
dapr --namespace=dapr-tests --wait --timeout 5m0s \
--set global.ha.enabled=false --set-string global.tag=dev-linux-amd64 \
--set-string global.registry=docker.io/skyao --set global.logAsJson=true \
--set global.daprControlPlaneOs=linux --set global.daprControlPlaneArch=amd64 \
--set dapr_placement.logLevel=debug --set dapr_sidecar_injector.sidecarImagePullPolicy=Always \
--set global.imagePullPolicy=Always --set global.imagePullSecrets= \
--set global.mtls.enabled=true \
--set dapr_placement.cluster.forceInMemoryLog=true ./charts/dapr
Error: INSTALLATION FAILED: rendered manifests contain a resource that already exists. Unable to continue with install: RoleBinding "dapr-secret-reader" in namespace "default" exists and cannot be imported into the current release: invalid ownership metadata; annotation validation error: key "meta.helm.sh/release-namespace" must equal "dapr-tests": current value is "dapr-system"
make: *** [docker-deploy-k8s] Error 1
2 - 如何使用fortio实现性能测试
fortio介绍
Dapr 采用 fortio 作为性能测试工具。
Fortio 相关内容已经转移到单独的笔记中:Learning Fortio
tester 应用
tester 镜像的生成方法
tester 应用的镜像生成由三个镜像组成:
构建 tester go app 二进制文件的镜像
FROM golang:1.17 as build_env
ARG GOARCH_ARG=amd64
WORKDIR /app
COPY app.go go.mod ./
RUN go get -d -v && GOOS=linux GOARCH=$GOARCH_ARG go build -o tester .
这个 dockerfile 会将 dapr 仓库下 tests/apps/perf/tester
目录中的 app.go 和 go.mod 文件复制到镜像中,然后执行 go get 和 go build 命令将 go 代码打包为名为 tester 的二进制可执行文件。
最终的产出物是 /app/tester
这个二进制可执行文件。
构建 fortio 二进制文件的镜像
FROM golang:1.17 as fortio_build_env
ARG GOARCH_ARG=amd64
WORKDIR /fortio
ADD "https://api.github.com/repos/fortio/fortio/branches/master" skipcache
RUN git clone https://github.com/fortio/fortio.git
RUN cd fortio && git checkout v1.16.1 && GOOS=linux GOARCH=$GOARCH_ARG go build
这个镜像是构建 fortio v1.16.1 的代码。
最终的产出物是 /fortio/fortio/fortio
这个二进制可执行文件。
构建 buster-slim 的镜像
FROM debian:buster-slim
#RUN apt update
#RUN apt install wget -y
WORKDIR /
COPY --from=build_env /app/tester /
COPY --from=fortio_build_env /fortio/fortio/fortio /usr/local/bin
CMD ["/tester"]
这个就只是复制前两个镜像的产出物了,将 /app/tester
复制到根目录,将 fortio 复制到
/usr/local/bin
目录。
tester 应用的工作原理和实现代码
tests/apps/perf/tester/app.go
的核心代码如下:
main 函数
func main() {
http.HandleFunc("/", handler)
http.HandleFunc("/test", testHandler)
log.Fatal(http.ListenAndServe(":3001", nil))
}
main 函数启动 http server,监听 3001 端口,然后注册了两个路径和对应的 handler。
简单探活 handler
这个 handler 超级简单,什么都不做,只是返回 http 200 。
func main() {
http.HandleFunc("/", handler)
......
}
func handler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}
测试handler
testHandler 执行性能测试:
func main() {
http.HandleFunc("/test", testHandler)
......
}
func testHandler(w http.ResponseWriter, r *http.Request) {
fmt.Println("test execution request received")
// 步骤1: 从请求中读取测试相关的配置参数,这些参数是从 test case 中发出的
var testParams TestParameters
b, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("error reading request body: %s", err)))
return
}
// 步骤2: 解析读取的测试相关的配置参数
err = json.Unmarshal(b, &testParams)
if err != nil {
w.WriteHeader(400)
w.Write([]byte(fmt.Sprintf("error parsing test params: %s", err)))
return
}
// 步骤3: 开始执行性能测试
fmt.Println("executing test")
results, err := runTest(testParams)
if err != nil {
w.WriteHeader(500)
w.Write([]byte(fmt.Sprintf("error encountered while running test: %s", err)))
return
}
// 步骤4: 返回性能测试的结果
fmt.Println("test finished")
w.Header().Add("Content-Type", "application/json")
w.Write(results)
}
真正的性能测试
真正的性能测试是通过 exec.Command 来执行命令行,通过调用 fortio 工具来进行的,也即是说,前面的 tester 应用除了用来启动 daprd 外,tester 自身只是配合走完性能测试的流程,真正的性能测试是由 fortio 进行。
// runTest accepts a set of test parameters, runs Fortio with the configured setting and returns
// the test results in json format.
func runTest(params TestParameters) ([]byte, error) {
var args []string
// 步骤1: 根据请求参数构建不同的 fortio 执行参数
if len(params.Payload) > 0 {
args = []string{
"load", "-json", "result.json", "-content-type", "application/json", "-qps", fmt.Sprint(params.QPS), "-c", fmt.Sprint(params.ClientConnections),
"-t", params.TestDuration, "-payload", params.Payload,
}
} else {
args = []string{
"load", "-json", "result.json", "-qps", fmt.Sprint(params.QPS), "-c", fmt.Sprint(params.ClientConnections),
"-t", params.TestDuration, "-payload-size", fmt.Sprint(params.PayloadSizeKB),
}
}
if params.StdClient {
args = append(args, "-stdclient")
}
args = append(args, params.TargetEndpoint)
fmt.Printf("running test with params: %s", args)
// 步骤2: 调用 fortio 执行性能测试
cmd := exec.Command("fortio", args...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
err := cmd.Run()
if err != nil {
return nil, err
}
// 步骤3: 返回性能测试执行结果
return os.ReadFile("result.json")
}
无负载的性能测试
对于 payload 大小为 0 的情况,执行的是如下的 fortio 命令:
fortio load -json result.json -qps ${QPS} -c ${ClientConnections} -t ${TestDuration} -payload-size ${PayloadSizeKB} ${TargetEndpoint}
疑问:payload 都为零了,为啥了还要设置 -payload-size ?
翻了一下相关的日志,找到对应的日志内容为:
fmt.Printf("running test with params: %s", args)
running test with params: [load -json result.json -qps 1 -c 1 -t 1m -payload-size 0 http://testapp:3000/test]
带负载的性能测试
对于 payload 大小不为 0 的情况,执行的是如下的 fortio 命令:
fortio load -json result.json -content-type application/json -qps ${QPS} -c ${ClientConnections} -t ${TestDuration} -payload ${Payload} ${TargetEndpoint}
全流程参数传递
在 perf test 的测试过程中,参数传递比较复杂(或者说比较绕),期间涉及到 perf test case 如何
title parameters transfer in load test
hide footbox
skinparam style strictuml
actor test_case as "Test Case"
participant tester as "Tester App"
box "Fortio" #LightBlue
participant fortio_load as "fortioLoad"
participant fgrpc
participant dapr as "daprResults"
end box
box "daprd"
participant grpc_api as "gRPC API"
end box
test_case -> tester : HTTP Post
note left: TestParameters
tester -> fortio_load : exec
note left: fortio load ... -grpc -dapr k1=v1,k2=v2
fortio_load -> fgrpc : RunGRPCTest()
note left: -grpc -dapr k1=v1,k2=v2
fgrpc -> dapr : RunTest()
note left: -dapr k1=v1,k2=v2
dapr -> grpc_api
dapr <-- grpc_api
fgrpc <-- dapr
fortio_load <-- fgrpc
tester <-- fortio_load
test_case <-- tester
步骤1:perf test case 获取测试参数
perf test case 在启动时,会从环境变量中获取测试参数
- DAPR_PERF_QPS: 默认1
- DAPR_PERF_CONNECTIONS: 默认1
- DAPR_TEST_DURATION: 默认 “1m”,即1分钟
- DAPR_PAYLOAD_SIZE: 默认0
- DAPR_PAYLOAD: 默认为空(字符串"")
步骤2:perf test case打包测试参数发送给tester app
perf test case 会发送一个 HTTP 请求到 tester app,其内容如下:
-
URL: http://testerAppURL/test
-
Method: POST
-
Body: 将 TestParameters 结构体系列化为 json
TestParameters 结构体的字段如下所示:
type TestParameters struct {
QPS int `json:"qps"`
ClientConnections int `json:"clientConnections"`
TargetEndpoint string `json:"targetEndpoint"`
TestDuration string `json:"testDuration"`
PayloadSizeKB int `json:"payloadSizeKB"`
Payload string `json:"payload"`
StdClient bool `json:"stdClient"`
}
在支持 gRPC + dapr 时,由于没有合适的参数可以使用,因此增加了 grpc 和 dapr 两个字段:
type TestParameters struct {
......
Grpc bool `json:"grpc"`
Dapr string `json:"dapr"`
}
步骤3:tester app解析测试参数,传递给fortio
在 tester app (tests/apps/perf/tester/app.go
) 的 testHandler() 方法中,会对传入http body进行读取和json解析,然后将 TestParameters 转为 fortio 的参数:
func testHandler(w http.ResponseWriter, r *http.Request) {
b, err := io.ReadAll(r.Body)
err = json.Unmarshal(b, &testParams)
results, err := runTest(testParams)
......
}
func runTest(params TestParameters) ([]byte, error) {
args := buildFortioArgs(params)
......
}
TestParameters 字段和 fortio 参数的对应关系:
语义 | 环境变量 | TestParameters 字段 | fortio 参数 |
---|---|---|---|
load | |||
-json result.json |
|||
-content-type application/json |
|||
QPS | DAPR_PERF_QPS | QPS | -qps |
客户端连接数 | DAPR_PERF_CONNECTIONS | ClientConnections | -c |
测试时长 | DAPR_TEST_DURATION | TestDuration | -t |
负载 | DAPR_PAYLOAD | Payload | -payload |
负载大小 | DAPR_PAYLOAD_SIZE | PayloadSizeKB | -payload-size |
StdClient | -stdclient |
||
是否是grpc测试 | Grpc | -grpc |
|
Dapr测试参数 | Dapr | -dapr |
步骤4:fortio解析 dapr flag,在发起的dapr 请求中使用
在 fortio 的执行中, load
子命令会有 fortioLoad
方法负责,检查发现有 -grpc
flag,则会转到 fgrpc.RunGRPCTest()
方法。
fortio 的 grpc 支持默认只有自带的 ping 和标准的 health,为了支持 dapr ,我们扩展了 fgrpc.RunGRPCTest()
方法。
-dapr
flag 传递的参数会被透传到 dapr 的扩展代码中, 然后解析为下面的结构:
type DaprRequestParameters struct {
capability string
target string
method string
appId string
store string
extensions map[string]string
}
这些参数将在后面 fortio 扩展代码中进行 dapr 调用时被使用到。
3 - 性能测试案例 service invoke http 的实现
运行性能测试
打开 dapr/dapr 仓库下的 .github/workflows/dapr-perf.yml
文件,找到 service_invocation_http 的性能测试输入条件:
- name: Run Perf test service_invocation_http
if: env.TEST_PREFIX != ''
env:
DAPR_PERF_QPS: 1000
DAPR_PERF_CONNECTIONS: 16
DAPR_TEST_DURATION: 1m
DAPR_PAYLOAD_SIZE: 1024
run: make test-perf-service_invocation_http
# service_invocation_http
export DAPR_PERF_QPS=1000
export DAPR_PERF_CONNECTIONS=16
export DAPR_TEST_DURATION=1m
export DAPR_PAYLOAD_SIZE=1024
unset DAPR_PAYLOAD
make test-perf-service_invocation_http
主流程详细分析
性能测试的主流程为:
func TestMain(m *testing.M) {
// 步骤1: 准备两个app:作为服务器端的 testapp 和作为客户端的 tester
testApps := []kube.AppDescription{
{
AppName: "testapp",
},
{
AppName: "tester",
}
tr = runner.NewTestRunner("serviceinvocationhttp", testApps, nil, nil)
os.Exit(tr.Start(m))
}
func TestServiceInvocationHTTPPerformance(t *testing.T) {
// 步骤4: 执行测试案例
......
}
// Start is the entry point of Dapr test runner.
func (tr *TestRunner) Start(m runnable) int {
// 步骤2: 启动测试平台
err := tr.Platform.setup()
// 可选步骤2.5: 安装组件,这个测试案例中没有
if tr.components != nil && len(tr.components) > 0 {
log.Println("Installing components...")
if err := tr.Platform.addComponents(tr.components); err != nil {
fmt.Fprintf(os.Stderr, "Failed Platform.addComponents(), %s", err.Error())
return runnerFailExitCode
}
}
// 可选步骤2.75: 安装初始化应用,这个测试案例中没有
if tr.initApps != nil && len(tr.initApps) > 0 {
log.Println("Installing init apps...")
if err := tr.Platform.addApps(tr.initApps); err != nil {
fmt.Fprintf(os.Stderr, "Failed Platform.addInitApps(), %s", err.Error())
return runnerFailExitCode
}
}
// 步骤3: 安装测试应用,这个测试案例中是前面步骤1中准备的作为服务器端的 testapp 和作为客户端的 tester
if tr.testApps != nil && len(tr.testApps) > 0 {
log.Println("Installing test apps...")
if err := tr.Platform.addApps(tr.testApps); err != nil {
fmt.Fprintf(os.Stderr, "Failed Platform.addApps(), %s", err.Error())
return runnerFailExitCode
}
}
// 步骤4: 执行测试案例
return m.Run()
}
func (tr *TestRunner) tearDown() {
// 步骤5: 执行完成后的 tearDown
// 具体为删除前面步骤3中安装的测试应用(作为服务器端的 testapp 和作为客户端的 tester)
// 如果用 ctrl + c 等方式强行中断 testcase 的执行,就会导致 teardown 没有执行,
// testapp/tester 两个应用就不会从k8s中删除
tr.Platform.tearDown()
}
测试应用准备
对应到上面主流程中的 “步骤1: 准备两个app”, 这里需要准备作为服务器端的 testapp 和作为客户端的 tester:
testApps := []kube.AppDescription{
{
AppName: "testapp", // 作为服务器端的 testapp
DaprEnabled: true,
ImageName: "perf-service_invocation_http",
IngressEnabled: true,
......
},
{
AppName: "tester", // 作为客户端的 tester
DaprEnabled: true,
ImageName: "perf-tester",
IngressEnabled: true,
AppPort: 3001,
......
},
}
特别注意:IngressEnabled: true
的设置。
测试应用安装的流程
对应到上面主流程中的 “步骤3: 安装测试应用”。
对应代码在 tests/runner/kube_testplatform.go
中的 addApps() 方法:
// addApps adds test apps to disposable App Resource queues.
func (c *KubeTestPlatform) addApps(apps []kube.AppDescription) error {
for _, app := range apps {
log.Printf("Adding app %v", app)
c.AppResources.Add(kube.NewAppManager(c.KubeClient, getNamespaceOrDefault(app.Namespace), app))
}
// installApps installs the apps in AppResource queue sequentially
log.Printf("Installing apps ...")
if err := c.AppResources.setup(); err != nil {
return err
}
log.Printf("Apps are installed.")
}
添加 app 这里对应的日志为:
2022/03/27 11:48:39 Adding app {testapp 0 map[] true perf-service_invocation_http:dev-linux-amd64 docker.io/skyao 1 true true 4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
2022/03/27 11:48:39 Adding app {tester 3001 map[] true perf-tester:dev-linux-amd64 docker.io/skyao 1 true true 4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
遇到问题时建议特别小心的检查这行日志,确认各个参数(如 image 的 name, tag,registry)等是否OK。
安装测试应用
详细看安装测试应用的代码和日志:
log.Printf("Deploying app %v ...", m.app.AppName)
// Deploy app and wait until deployment is done
if _, err := m.Deploy(); err != nil {
return err
}
// Wait until app is deployed completely
if _, err := m.WaitUntilDeploymentState(m.IsDeploymentDone); err != nil {
return err
}
if m.logPrefix != "" {
if err := m.StreamContainerLogs(); err != nil {
log.Printf("Failed to retrieve container logs for %s. Error was: %s", m.app.AppName, err)
}
}
log.Printf("App %v has been deployed.", m.app.AppName)
对应日志为:
2022/03/27 11:48:41 Deploying app testapp ...
2022/03/27 11:48:48 App testapp has been deployed.
2022/03/27 11:48:50 Deploying app tester ...
2022/03/27 11:48:57 App tester has been deployed.
从日志上看,在镜像本地已经有缓存的情况下,启动时间也就大概7秒左右。
// PollInterval is how frequently e2e tests will poll for updates.
PollInterval = 1 * time.Second
// PollTimeout is how long e2e tests will wait for resource updates when polling.
PollTimeout = 10 * time.Minute
// WaitUntilDeploymentState waits until isState returns true.
func (m *AppManager) WaitUntilDeploymentState(isState func(*appsv1.Deployment, error) bool) (*appsv1.Deployment, error) {
deploymentsClient := m.client.Deployments(m.namespace)
var lastDeployment *appsv1.Deployment
waitErr := wait.PollImmediate(PollInterval, PollTimeout, func() (bool, error) {
var err error
lastDeployment, err = deploymentsClient.Get(context.TODO(), m.app.AppName, metav1.GetOptions{})
done := isState(lastDeployment, err)
if !done && err != nil {
return true, err
}
return done, nil
})
if waitErr != nil {
// get deployment's Pods detail status info
......
}
return lastDeployment, nil
}
从代码实现看,等待应用部署的时间长达10分钟,所以正常情况下还是能等到应用启动完成的,除非应用的部署出问题了,比如镜像信息不对导致无法下载镜像。
检查 sidecar
检查 sidecar 是否启动成功的代码:
// maxSideCarDetectionRetries is the maximum number of retries to detect Dapr sidecar.
maxSideCarDetectionRetries = 3
log.Printf("Validating sidecar for app %v ....", m.app.AppName)
for i := 0; i <= maxSideCarDetectionRetries; i++ {
// Validate daprd side car is injected
if err := m.ValidateSidecar(); err != nil {
if i == maxSideCarDetectionRetries {
return err
}
log.Printf("Did not find sidecar for app %v error %s, retrying ....", m.app.AppName, err)
time.Sleep(10 * time.Second)
continue
}
break
}
log.Printf("Sidecar for app %v has been validated.", m.app.AppName)
对应的日志为:
2022/03/27 11:48:48 Validating sidecar for app testapp ....
2022/03/27 11:48:48 Streaming Kubernetes logs to ./container_logs/testapp-85d8d9db89-llmrr.daprd.log
2022/03/27 11:48:48 Streaming Kubernetes logs to ./container_logs/testapp-85d8d9db89-llmrr.testapp.log
2022/03/27 11:48:49 Sidecar for app testapp has been validated.
2022/03/27 11:48:57 Validating sidecar for app tester ....
2022/03/27 11:48:57 Streaming Kubernetes logs to ./container_logs/tester-7944b6bb68-wzfj2.tester.log
2022/03/27 11:48:57 Streaming Kubernetes logs to ./container_logs/tester-7944b6bb68-wzfj2.daprd.log
2022/03/27 11:48:58 Sidecar for app tester has been validated.
默认重试3次,每次间隔时间为 10 秒,所以执行4次 (1次 + 3次重试)总共40 秒之后,如果 sidecar 还没能启动起来,就会报错。
由于 sidecar 几乎是和应用同时启动,所以在应用启动完成后在检查 sidecar 通常会很快完成,由于应用自身启动时间就高达7秒。
创建 ingress
创建 ingress 的代码实现:
// Create Ingress endpoint
log.Printf("Creating ingress for app %v ....", m.app.AppName)
if _, err := m.CreateIngressService(); err != nil {
return err
}
log.Printf("Ingress for app %v has been created.", m.app.AppName)
从日志上看,创建 ingress 的速度很快,不到1秒:
2022/03/27 10:41:09 Creating ingress for app testapp ....
2022/03/27 10:41:10 Ingress for app testapp has been created.
2022/03/27 10:41:19 Creating ingress for app tester ....
2022/03/27 10:41:19 Ingress for app tester has been created.
但特别注意:这里的所谓created,应该只是将命令发给了k8s,也就是这里是异步返回。并不是 ingress 立即可用。
创建端口转发
创建 ingress 的代码实现:
log.Printf("Creating pod port forwarder for app %v ....", m.app.AppName)
m.forwarder = NewPodPortForwarder(m.client, m.namespace)
log.Printf("Pod port forwarder for app %v has been created.", m.app.AppName)
从日志上看,创建端口转发的速度也非常快,不到1秒:
2022/03/27 10:41:10 Creating pod port forwarder for app testapp ....
2022/03/27 10:41:10 Pod port forwarder for app testapp has been created.
2022/03/27 10:41:19 Creating pod port forwarder for app tester ....
2022/03/27 10:41:19 Pod port forwarder for app tester has been created.
但特别注意:这里的所谓created,应该只是将命令发给了k8s,也就是这里是异步返回。并不是端口转发立即可用。
完整的日志分析
下面是一个完整的日志,安装 testapp 和 tester 两个应用,耗时 19秒:
2022/03/27 11:48:39 Running setup...
2022/03/27 11:48:39 Installing test apps...
2022/03/27 11:48:39 Adding app {testapp 0 map[] true perf-service_invocation_http:dev-linux-amd64 docker.io/skyao 1 true true 4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
2022/03/27 11:48:39 Adding app {tester 3001 map[] true perf-tester:dev-linux-amd64 docker.io/skyao 1 true true 4.0 0.1 800Mi 2500Mi 4.0 0.1 512Mi 250Mi <nil> false}
2022/03/27 11:48:39 Installing apps ...
2022/03/27 11:48:41 Deploying app testapp ...
2022/03/27 11:48:48 App testapp has been deployed.
2022/03/27 11:48:48 Validating sidecar for app testapp ....
2022/03/27 11:48:48 Streaming Kubernetes logs to ./container_logs/testapp-85d8d9db89-llmrr.daprd.log
2022/03/27 11:48:48 Streaming Kubernetes logs to ./container_logs/testapp-85d8d9db89-llmrr.testapp.log
2022/03/27 11:48:49 Sidecar for app testapp has been validated.
2022/03/27 11:48:49 Creating ingress for app testapp ....
2022/03/27 11:48:49 Ingress for app testapp has been created.
2022/03/27 11:48:49 Creating pod port forwarder for app testapp ....
2022/03/27 11:48:49 Pod port forwarder for app testapp has been created.
2022/03/27 11:48:50 Deploying app tester ...
2022/03/27 11:48:57 App tester has been deployed.
2022/03/27 11:48:57 Validating sidecar for app tester ....
2022/03/27 11:48:57 Streaming Kubernetes logs to ./container_logs/tester-7944b6bb68-wzfj2.tester.log
2022/03/27 11:48:57 Streaming Kubernetes logs to ./container_logs/tester-7944b6bb68-wzfj2.daprd.log
2022/03/27 11:48:58 Sidecar for app tester has been validated.
2022/03/27 11:48:58 Creating ingress for app tester ....
2022/03/27 11:48:58 Ingress for app tester has been created.
2022/03/27 11:48:58 Creating pod port forwarder for app tester ....
2022/03/27 11:48:58 Pod port forwarder for app tester has been created.
2022/03/27 11:48:58 Apps are installed.
2022/03/27 11:48:58 Running tests...
这19秒总时长中比较耗时的操作主要是:
- Installing apps: 2 秒
- 部署 testapp: 7秒
- 部署 tester: 7秒
- 验证sidecar/创建ingress和端口转发:1秒 (两个app x 2)
等待测试应用就绪的流程
等待流程就绪
由于存在一个安装测试应用的流程, 而 k8s 部署/启动完成测试应用是需要一段时间的,因此,就需要一个机制能等待并检测到测试应用是否安装完成,这样才可以开启真正的测试即开始执行 testcase。
以 TestServiceInvocationHTTPPerformance 为例,删除测试执行的细节代码:
const numHealthChecks = 60 // Number of times to check for endpoint health per app.
func TestServiceInvocationHTTPPerformance(t *testing.T) {
p := perf.Params()
t.Logf("running service invocation http test with params: qps=%v, connections=%v, duration=%s, payload size=%v, payload=%v", p.QPS, p.ClientConnections, p.TestDuration, p.PayloadSizeKB, p.Payload) // line 79
// Get the ingress external url of test app
testAppURL := tr.Platform.AcquireAppExternalURL("testapp")
require.NotEmpty(t, testAppURL, "test app external URL must not be empty")
// Check if test app endpoint is available
t.Logf("test app url: %s", testAppURL+"/test") // line 86
_, err := utils.HTTPGetNTimes(testAppURL+"/test", numHealthChecks) // 在这里等待测试应用 testapp 就绪!
require.NoError(t, err)
// Get the ingress external url of tester app
testerAppURL := tr.Platform.AcquireAppExternalURL("tester")
require.NotEmpty(t, testerAppURL, "tester app external URL must not be empty")
// Check if tester app endpoint is available
t.Logf("teter app url: %s", testerAppURL) // line 95 // 在这里等待测试应用 tester 就绪!
_, err = utils.HTTPGetNTimes(testerAppURL, numHealthChecks)
require.NoError(t, err)
// Perform baseline test
......
// Perform dapr test
......
}
从执行日志中分别提取对应上面三处的日志内容:
service_invocation_http_test.go:79: running service invocation http test with params: qps=1, connections=1, duration=1m, payload size=0, payload=
2022/03/27 11:48:58 Waiting until service ingress is ready for testapp...
2022/03/27 11:49:03 Service ingress for testapp is ready...
service_invocation_http_test.go:86: test app url: 20.84.11.6:3000/test
2022/03/27 11:49:04 Waiting until service ingress is ready for tester...
2022/03/27 11:49:33 Service ingress for tester is ready...
service_invocation_http_test.go:95: teter app url: 20.85.250.98:3000
这两处就是在等待测试应用 testapp 和 tester 启动完成。检查的方式就是访问这两个应用的 public url (也就是 health check的地址),如果能访问(可连接,返回 htltp 200)则说明应用启动完成。如果失败,则继续等待。
这里面实际是有两个等待:
- 等待测试应用的 ingress 就绪
- 等待测试应用自身就绪
等待测试应用的 ingress 就绪
因为 pod ip 不能直接在 k8s 下访问,因此需要通过 ingress 和 端口转发。前面安装测试应用的流程中作了 ingress 的创建和端口转发的创建,但生效是需要时间的。在 AcquireAppExternalURL() 方法中会等待 ingress 就绪:
testAppURL := tr.Platform.AcquireAppExternalURL("testapp")
testerAppURL := tr.Platform.AcquireAppExternalURL("tester")
k8s下,实现的代码在 tests/runner/kube_testplatform.go
中:
// AcquireAppExternalURL returns the external url for 'name'.
func (c *KubeTestPlatform) AcquireAppExternalURL(name string) string {
app := c.AppResources.FindActiveResource(name)
return app.(*kube.AppManager).AcquireExternalURL()
}
// AcquireExternalURL gets external ingress endpoint from service when it is ready.
func (m *AppManager) AcquireExternalURL() string {
log.Printf("Waiting until service ingress is ready for %s...\n", m.app.AppName)
svc, err := m.WaitUntilServiceState(m.IsServiceIngressReady) // 等待直到 ingress reday
if err != nil {
return ""
}
log.Printf("Service ingress for %s is ready...\n", m.app.AppName)
return m.AcquireExternalURLFromService(svc)
}
具体的等待实现在 WaitUntilServiceState() 方法中:
// PollInterval is how frequently e2e tests will poll for updates.
PollInterval = 1 * time.Second
// PollTimeout is how long e2e tests will wait for resource updates when polling.
PollTimeout = 10 * time.Minute
// WaitUntilServiceState waits until isState returns true.
func (m *AppManager) WaitUntilServiceState(isState func(*apiv1.Service, error) bool) (*apiv1.Service, error) {
serviceClient := m.client.Services(m.namespace)
var lastService *apiv1.Service
waitErr := wait.PollImmediate(PollInterval, PollTimeout, func() (bool, error) {
var err error
lastService, err = serviceClient.Get(context.TODO(), m.app.AppName, metav1.GetOptions{})
done := isState(lastService, err)
if !done && err != nil {
return true, err
}
return done, nil
})
......
return lastService, nil
}
每隔1秒,时长 10 分钟,这个时间长度有点离谱。之前遇到过 ingress 无法访问的案例,就在这里等待长达10 分钟。
但如果能正常工作,只是启动速度慢,那这里的10分钟怎么也够 ingress 生效的。
等待测试应用自身就绪
实现代码如下:
const numHealthChecks = 60 // Number of times to check for endpoint health per app.
// HTTPGetNTimes calls the url n times and returns the first success or last error.
func HTTPGetNTimes(url string, n int) ([]byte, error) {
var res []byte
var err error
for i := n - 1; i >= 0; i-- {
res, err = HTTPGet(url)
if i == 0 {
break
}
if err != nil {
time.Sleep(time.Second)
} else {
return res, nil
}
}
return res, err
}
每秒检查一次,重试 60 次。60 秒之后只要启动成功就可以。
备注:这里的检查顺序,先检查 testapp,再检查 tester app,所以如果两者都启动的慢的话,顺序偏后的 tester app 有更多的启动时间。
bug:总是卡在等待测试应用就绪上
测试中发现,测试案例总是卡在等待测试应用就绪上,但奇怪的是,从打印日志上看,上面的两个等待过程中,第一个等待 ingress 达到 ready 状态总是通过,然后等待测试应用自身就绪(也就是通过 http://52.226.222.31:3000/test 地址访问)就总是不能成功:
=== Failed
=== FAIL: tests/perf/service_invocation_http TestServiceInvocationHTTPPerformance (248.07s)
service_invocation_http_test.go:79: running service invocation http test with params: qps=1, connections=1, duration=1m, payload size=0, payload=
2022/03/25 21:49:17 Waiting until service ingress is ready for testapp...
2022/03/25 21:49:20 Service ingress for testapp is ready...
service_invocation_http_test.go:86: test app url: 52.226.222.31:3000/test
service_invocation_http_test.go:88:
Error Trace: service_invocation_http_test.go:88
Error: Received unexpected error:
Get "http://52.226.222.31:3000/test": EOF
Test: TestServiceInvocationHTTPPerformance
翻了一下 tests/apps/perf/service_invocation_http/app.go
的代码,这是服务器端appt的实现,超级简单:
func handler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}
func main() {
http.HandleFunc("/test", handler)
log.Fatal(http.ListenAndServe(":3000", nil))
}
从 k get pods -n dapr-tests
命令的输出看,testapp 的状态很早就是 running了。这个简单的应用不存在 60 秒还启动不起来的情况。但奇怪的是,有时这个问题又不存在,能正常的访问。而且一旦正常就会一直都正常,一旦不正常就一直不正常。
经过反复测试排查发现:在使用 azure 部署 k8s 时,pod 的外部访问地址,必须在连接公司 VPN (GlobalProtect)时才能正常访问,如果 VPN 未能开启,则会报错 Empty reply from server ,一直卡在这里直到 60 秒超时。
# 断开vpn
$ curl -i 20.81.110.42:3000/test
curl: (52) Empty reply from server
# 连接vpn
$ curl -i 20.81.110.42:3000/test
HTTP/1.1 200 OK
Date: Sun, 27 Mar 2022 07:10:02 GMT
Content-Length: 0
# 再次断开vpn
$ curl -i 20.81.110.42:3000/test
curl: (52) Empty reply from server
# 再次连接vpn
$ curl -i 20.81.110.42:3000/test
HTTP/1.1 200 OK
Date: Sun, 27 Mar 2022 07:12:05 GMT
Content-Length: 0
排除这个问题之后,性能测试的案例就可以正常
备注:我是在我本地机器 macbook 上跑测试案例的,案例和 azure 上的k8s集群通讯一直正常,但是就是有时可以访问 app 的external url,有时不能。没想到是这个原因
4 - 性能测试案例 state in-momery 的实现
测试逻辑
完整的端到端的 state 压力测试应该是这样的,以 redis 为例:
title state load test with redis store
hide footbox
skinparam style strictuml
actor test_case as "Test Case"
participant tester as "Tester"
participant fortio as "Fortio"
box "daprd" #LightBlue
participant grpc_api as "gRPC API"
participant redis_component as "Redis State Component"
end box
database redis_server as "Redis server"
test_case -> tester : baseline test
note left: baseline test
tester -> fortio : exec
fortio -> redis_server: redis native protocol
fortio <-- redis_server
tester <-- fortio
test_case <-- tester
|||
|||
|||
test_case -> tester : dapr test
note left: dapr test
tester -> fortio : exec
fortio -> grpc_api : gRPC
grpc_api -> redis_component
redis_component -> redis_server: redis native protocol
redis_component <-- redis_server
grpc_api <-- redis_component
fortio <-- grpc_api
tester <-- fortio
test_case <-- tester
但目前 dapr 仓库中的 perf test 的测试目标都是 dapr runtime,也就是不包括 dapr sdk 和 dapr components,因此,需要一个 in-memory 的 state 组件来进行压力测试以排除 redis 等外部组件的干扰,流程大体是这样:
title state load test with in-memory store
hide footbox
skinparam style strictuml
actor test_case as "Test Case"
participant tester as "Tester"
participant fortio as "Fortio"
box "daprd" #LightBlue
participant grpc_api as "gRPC API"
participant in_momory_component as "In-Memory Component"
end box
test_case -> tester : dapr test
note left: dapr test
tester -> fortio : exec
fortio -> grpc_api : gRPC
grpc_api -> in_momory_component
in_momory_component -> in_momory_component: in-memory operations
grpc_api <-- in_momory_component
fortio <-- grpc_api
tester <-- fortio
test_case <-- tester
in-memory 的 state 组件的性能消耗可以视为0,因此将访问 redis 的远程开销在 baseline test 和 dapr test 中同时去除之后,得到的新的 baseline test 和 dapr test 如下图:
title state load test with in-memory store
hide footbox
skinparam style strictuml
actor test_case as "Test Case"
participant tester as "Tester"
participant fortio as "Fortio"
box "daprd" #LightBlue
participant grpc_api as "gRPC API"
participant in_momory_component as "In-Memory Component"
end box
test_case -> tester : baseline test
note left: baseline test
tester -> fortio : exec
fortio -> fortio : do nothing
tester <-- fortio
test_case <-- tester
|||
|||
|||
test_case -> tester : dapr test
note left: dapr test
tester -> fortio : exec
fortio -> grpc_api : gRPC
grpc_api -> in_momory_component
in_momory_component -> in_momory_component: in-memory operations
grpc_api <-- in_momory_component
fortio <-- grpc_api
tester <-- fortio
test_case <-- tester
fortio命令
对于 baseline test 中的 no-op,fortio 的命令为:
./fortio load -json result.json -qps 1 -c 1 -t 1m -payload-size 0 -grpc -dapr capability=state,target=noop http://localhost:50001/
对于 dapr test 中的 state get 请求,fortio 的命令为:
./fortio load -json result.json -qps 1 -c 1 -t 1m -payload-size 0 -grpc --dapr capability=state,target=dapr,method=get,store=inmemorystate,key=abc123 http://127.0.0.1:50001
perf test case 的请求
5 - 性能测试案例 pubsub in-momery 的实现
测试逻辑
和 state in-momery 类似。
fortio命令
对于 baseline test 中的 no-op,fortio 的命令为:
./fortio load -json result.json -qps 1 -c 1 -t 1m -payload-size 0 -grpc -dapr capability=pubsub,target=noop http://localhost:50001/
对于 dapr test 中的 state get 请求,fortio 的命令为:
./fortio load -json result.json -qps 1 -c 1 -t 1m -payload-size 100 -grpc --dapr capability=pubsub,target=dapr,method=publish,store=inmemorypubsub,topic=topic123,contenttype=text/plain http://127.0.0.1:50001