这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

Dapr学习笔记

1 - 介绍

Dapr的介绍,以及Dapr的资料收集

1.1 - Dapr介绍

Dapr介绍

Dapr是什么?

Dapr 是 Distributed Application Runtime (分布式应用运行时)的缩写。

Dapr github首页的介绍是:

Dapr is a portable, event-driven, runtime for building distributed applications across cloud and edge.

Dapr是一种可移植的,事件驱动的运行时,用于构建跨云和边缘的分布式应用。

号称可以:

Any language, any framework, anywhere

详细介绍是:

Dapr是一种可移植的,serverless的,事件驱动的运行时,它使开发人员可以轻松构建弹性,无状态和有状态微服务,这些服务运行在云和边缘上,并包含多种语言和开发框架。

Dapr 整理了构建微服务应用为开放,独立的构建块的最佳实践,使您能够使用自己选择的语言和框架来构建可移植的应用程序。 每个构建块都是独立的,您可以在应用中使用其中的一个或多个。

注意:Dapr目前正处于社区开发中的 Alpha 阶段。 在其1.0稳定版本发布之前,不要将 Dapr 用于生产工作负载。

下图是 Dapr 的概念模型:

以下内容来自:https://github.com/dapr/dapr

目标

  • 使开发人员能够使用任何语言或框架来编写分布式应用
  • 通过提供最佳实践构建模块来解决开发人员构建微服务应用时面临的难题
  • 社区驱动,开放,供应商无关
  • 获得新的贡献者
  • 通过开放的API提供一致性和可移植性
  • 跨云和边缘,与平台无关
  • 拥抱可扩展性并提供可插入组件,而无需供应商锁定
  • 通过高性能和轻量级实现物联网(IoT)和边缘场景
  • 可以从现有代码中逐步采用,而没有运行时依赖

工作方式

Dapr向每个计算单元注入了一个Sidecar容器/进程。Sidecar与事件触发器进行交互,并通过标准HTTP或gRPC协议与计算单元进行通信。这使Dapr能够支持所有现有和将来的编程语言,而无需您导入框架或库。

Dapr通过标准的HTTP verbs 或gRPC interface 提供内置的状态管理,可靠消息传递(至少一次传递),触发器和绑定。这使您可以遵循相同的编程范例编写无状态,有状态和类似于actor的服务。您可以自由选择一致性模型,线程模型和消息传递模式。

Dapr在Kubernetes上原生运行,也可以作为机器上的独立二进制文件,在IoT设备上运行,也可以作为容器注入到任何系统中,无论是在云端还是在本地。

Dapr使用可插拔状态存储和消息总线(例如Redis)以及gRPC来提供广泛的通信方法,包括使用gRPC的直接 dapr-to-dapr 通讯和具有保证传递和至少一次语义的异步Pub-Sub。

为什么用 Dapr?

编写高性能,可伸缩和可靠的分布式应用很困难。 Dapr带给您成熟的模式和实践。 它将事件驱动和 actor 的语义统一到一个简单而一致的编程模型中。 它支持所有编程语言,没有框架锁定。 您不会接触到低级原语,例如线程,并发控制,分区和伸缩。 相反,您可以通过使用所选的熟悉的Web框架实现简单的Web服务器来编写代码。

Dapr在线程和状态一致性模型方面很灵活。 如果愿意,可以利用多线程,还可以在不同的一致性模型中进行选择。 这种灵活性使得无需人为约束即可实施高级方案。 您可能还选择利用其他Actor框架中熟悉的单线程调用。 Dapr是独一无二的,因为您可以在这些模型之间无缝转换而无需重写代码。

特性

  • 事件驱动的Pub-Sub系统,具有可插拔提供商和至少一次的语义
  • 输入和输出绑定,使用可插拔提供商
  • 具有可插拔数据存储的状态管理
  • 一致的服务到服务发现和调用
  • 选择加入状态模型:强大/最终的一致性,首次写入/最后写入获胜
  • 跨平台虚拟 actor
  • 密钥管理,从安全密钥库中提取密钥。
  • 限速
  • 内置可观测性支持
  • 使用专用的Operator和CRD在Kubernetes上原生运行
  • 通过HTTP和gRPC支持所有编程语言
  • 来自Azure,AWS,GCP的多云,开放式组件(绑定,发布-订阅,状态)
  • 在任何地方运行,无论是进程还是容器化
  • 轻量级(58MB二进制,4MB物理内存)
  • 作为Sidecar运行-无需特殊的SDK或类库
  • 专用的CLI-开发人员友好的体验,易于调试
  • Java,.NET Core,Go,Javascript,Python,Rust和C ++的客户端

1.2 - 概述

Dapr的概述

1.2.1 - Dapr的构建

Dapr的构建

build

在项目根目录下执行 :

$ make build
CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.commit=v0.8.0-rc.2-96-g79a1f14 -X github.com/dapr/dapr/pkg/version.version=edge -s -w" -o ./dist/darwin_amd64/release/daprd ./cmd/daprd/main.go;
CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.commit=v0.8.0-rc.2-96-g79a1f14 -X github.com/dapr/dapr/pkg/version.version=edge -s -w" -o ./dist/darwin_amd64/release/placement ./cmd/placement/main.go;
CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.commit=v0.8.0-rc.2-96-g79a1f14 -X github.com/dapr/dapr/pkg/version.version=edge -s -w" -o ./dist/darwin_amd64/release/operator ./cmd/operator/main.go;
CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.commit=v0.8.0-rc.2-96-g79a1f14 -X github.com/dapr/dapr/pkg/version.version=edge -s -w" -o ./dist/darwin_amd64/release/injector ./cmd/injector/main.go;
CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build  -ldflags="-X github.com/dapr/dapr/pkg/version.commit=v0.8.0-rc.2-96-g79a1f14 -X github.com/dapr/dapr/pkg/version.version=edge -s -w" -o ./dist/darwin_amd64/release/sentry ./cmd/sentry/main.go;

构建完成之后得到的文件:

$ ls -lh ./dist/darwin_amd64/release/
total 429128
-rwxr-xr-x  1 aoxiaojian  staff    89M Aug 13 17:19 daprd
-rwxr-xr-x  1 aoxiaojian  staff    33M Aug 13 17:20 injector
-rwxr-xr-x  1 aoxiaojian  staff    35M Aug 13 17:20 operator
-rwxr-xr-x  1 aoxiaojian  staff    12M Aug 13 17:19 placement
-rwxr-xr-x  1 aoxiaojian  staff    33M Aug 13 17:20 sentry

1.2.2 - Dapr的命令行参数

Dapr的命令行参数
name default value 可选值 说明
mode standalone standalone / kubernetes Runtime mode for Dapr
dapr-http-port 3500 HTTP port for Dapr API to listen on
dapr-grpc-port 50001 gRPC port for the Dapr API to listen on
dapr-internal-grpc-port "" gRPC port for the Dapr Internal API to listen on
如果不指定,则dapr会自动获取一个随机可用的空闲端口
app-port "" The port the application is listening on
如果不设置,则不能建立dapr和应用之间的 app channel
也就意味着dapr 无法发送请求给应用了
同时dapr也无法从应用读取配置: http://localhost:<app_port>/dapr/config
注意:app的地址在代码中写死 127.0.0.1
profile-port 7777 The port for the profile server
app-protocol http http / grpc Protocol for the application: grpc or http
components-path "" Path for components directory. If empty, components will not be loaded. Self-hosted mode only
仅在dapr mode为standalone时有效
config "" Path to config file, or name of a configuration object
被称为 global configuration
如果是kubernetes mode,读取k8s的配置
如果是standalone mode,读取配置文件
如果没有配置,则装载默认配置
app-id "" A unique ID for Dapr. Used for Service Discovery and state
control-plane-address "" Address for a Dapr control plane
仅在dapr mode为kubernetes时有效
sentry-address "" Address for the Sentry CA service
placement-host-address "" Address for the Dapr placement service
allowed-origins * Allowed HTTP origins
enable-profiling false True / false Enable profiling
version false True / false Prints the runtime version (然后dapr就会退出)
app-max-concurrency -1 Controls the concurrency level when forwarding requests to user code
enable-mtls false True / false Enables automatic mTLS for daprd to daprd communication channels

1.0 之后被弃用的flag:

name 说明
placement-address 改为 placement-host-address
如果同时设置,以 placement-host-address 为准
max-concurrency 改为 app-max-concurrency
如果同时设置,以 app-max-concurrency 为准
protocol 改为 app-protocol
如果同时设置,以 app-protocol 为准

-app-id hellogrpc -app-port 3000 -protocol grpc -dapr-http-port 3005 -dapr-grpc-port 52000 -placement-address localhost:50005 -components-path components

1.3 - Quickstart

Dapr的Quickstarts

从空白的操作系统开始,运行 dapr 的 quickstarts 。

1.3.1 - Quickstart的准备工作

运行 dapr 的 Quickstart 的准备工作

1.3.1.1 - 安装docker

quickstart 是运行在 docker 上的

安装 docker

参考: https://skyao.io/learning-docker/docs/installation.html

ubuntu 20.04

参考: https://skyao.io/learning-docker/docs/installation/ubuntu.html

添加 GPG key:

sudo install -m 0755 -d /etc/apt/keyrings
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /etc/apt/keyrings/docker.gpg
sudo chmod a+r /etc/apt/keyrings/docker.gpg

准备用于 apt 的仓库:

echo \
  "deb [arch="$(dpkg --print-architecture)" signed-by=/etc/apt/keyrings/docker.gpg] https://download.docker.com/linux/ubuntu \
  "$(. /etc/os-release && echo "$VERSION_CODENAME")" stable" | \
  sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

执行 apt update :

sudo apt-get update

安装 20.10.21 版本:

VERSION_STRING=5:20.10.21~3-0~ubuntu-focal
sudo apt-get install docker-ce=$VERSION_STRING docker-ce-cli=$VERSION_STRING containerd.io docker-buildx-plugin docker-compose-plugin

为了以非 root 用户使用 docker:

sudo usermod -aG docker sky

重新登录或者重启。

1.3.1.2 - 安装pythod

为运行基于 pythod 的 quickstart 做准备

安装 pythod

docker 的 quickstart 要求 quickstart 3.7 版本及以上

ubuntu 20.04

默认自带 pythod 3.8, 可以直接使用:

$ which python3
/usr/bin/python3

但奇怪的是默认不自带pip3:

$ pip3 install -r requirements.txt

zsh: command not found: pip3

因此需要手工安装 python3-pip :

sudo apt install -y python3-pip

1.3.1.3 - 安装.net

为运行基于 .net 的 quickstart 做准备

安装 .net

docker 的 quickstart 对 .net 的要求是

实际测试不能用 .NET 7, 只能用 .NET 6

ubuntu 20.04

参考: https://learn.microsoft.com/en-us/dotnet/core/install/linux-ubuntu-2004

添加仓库:

wget https://packages.microsoft.com/config/ubuntu/20.04/packages-microsoft-prod.deb -O packages-microsoft-prod.deb
sudo dpkg -i packages-microsoft-prod.deb
rm packages-microsoft-prod.deb
sudo apt-get update

安装 dotnet-sdk-6.0 :

sudo apt-get install -y dotnet-sdk-6.0

安装 aspnetcore-runtime-6.0:

sudo apt-get install -y aspnetcore-runtime-6.0

1.3.1.4 - 安装jdk

为运行基于 java 的 quickstart 做准备

要求

docker 的 quickstart 对 java 的要求是 jdk 11

为了方便管理不同版本的 jdk, 推荐使用 sdkman

安装 sdkman

参考: https://skyao.io/learning-ubuntu-server/docs/development/common/sdkman.html

ubuntu 20.04

sudo apt install unzip zip
curl -s "https://get.sdkman.io" | bash

安装 jdk 11

ubuntu 20.04

执行

sdk list java

可以看到各种可选择的 jdk ,选择 11.0.20-zulu

sdk install java 11.0.20-zulu

安装 maven

ubuntu20.04

sudo apt install maven

1.3.2 - 安装 dapr cli

安装Dapr并进行初始化

安装 dapr CLI

参考: https://docs.dapr.io/getting-started/install-dapr-cli/

ubuntu 20.04

wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash

输出为:

Getting the latest Dapr CLI...
Your system is linux_amd64
Installing Dapr CLI...

Installing v1.11.0 Dapr CLI...
Downloading https://github.com/dapr/cli/releases/download/v1.11.0/dapr_linux_amd64.tar.gz ...
dapr installed into /usr/local/bin successfully.
CLI version: 1.11.0 
Runtime version: 1.11.3

To get started with Dapr, please visit https://docs.dapr.io/getting-started/

安装后检查:

dapr -h

初始化 dapr

参考: https://docs.dapr.io/getting-started/install-dapr-selfhost/

ubuntu 20.04

dapr init

输出为:

⌛  Making the jump to hyperspace...
ℹ️  Container images will be pulled from Docker Hub
ℹ️  Installing runtime version 1.11.3
←  Downloading binaries and setting up components...
Dapr runtime installed to /home/sky/.dapr/bin, you may run the following to add it to your path if you want to run daprd directly:
    export PATH=$PATH:/home/sky/.dapr/bin
✅  Downloading binaries and setting up components...
✅  Downloaded binaries and completed components set up.
ℹ️  daprd binary has been installed to /home/sky/.dapr/bin.
ℹ️  dapr_placement container is running.
ℹ️  dapr_redis container is running.
ℹ️  dapr_zipkin container is running.
ℹ️  Use `docker ps` to check running containers.
✅  Success! Dapr is up and running. To get started, go here: https://aka.ms/dapr-getting-started

此时执行:

docker ps

可以看到当前和 dapr 相关的几个容器正在运行:


CONTAINER ID   IMAGE                COMMAND                  CREATED              STATUS                   PORTS                                                 NAMES
bf674f47c1b9   daprio/dapr:1.11.3   "./placement"            About a minute ago   Up About a minute        0.0.0.0:50005->50005/tcp, :::50005->50005/tcp         dapr_placement
b26edce4fd09   openzipkin/zipkin    "start-zipkin"           6 hours ago          Up 2 minutes (healthy)   9410/tcp, 0.0.0.0:9411->9411/tcp, :::9411->9411/tcp   dapr_zipkin
0bb3b7b8e9dc   redis:6              "docker-entrypoint.s…"   6 hours ago          Up 2 minutes             0.0.0.0:6379->6379/tcp, :::6379->6379/tcp             dapr_redis

1.3.3 - 克隆代码仓库

克隆 quickstarts 的代码仓库

代码仓库

准备代码目录:

mkdir -p work/code/dapr
cd work/code/dapr

git clone quickstarts 的代码仓库:

git clone https://github.com/dapr/quickstarts.git
cd quickstarts

1.3.4 - 运行 workflow 的 quickstart

运行 dapr workflow 的 quickstart

1.3.4.1 - .net

运行 dapr workflow 的 .net quickstart

运行 quickstart

执行:

cd workflows/csharp/sdk/order-processor
dapr run --app-id order-processor dotnet run

输出为:

ℹ️  Starting Dapr with id order-processor. HTTP Port: 43787. gRPC Port: 36499
ℹ️  Checking if Dapr sidecar is listening on HTTP port 43787
INFO[0000] starting Dapr Runtime -- version 1.11.3 -- commit 9f99c6adca78dfc04b8063974f27b3a7534ae798  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] log level set to: info                        app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] metrics server started on :45063/             app_id=order-processor instance=dapr15 scope=dapr.metrics type=log ver=1.11.3
INFO[0000] Resiliency configuration loaded               app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] standalone mode configured                    app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] app id: order-processor                       app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] mTLS is disabled. Skipping certificate request and tls validation  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Dapr trace sampler initialized: DaprTraceSampler(P=1.000000)  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] local service entry announced: order-processor -> 192.168.99.15:46019  app_id=order-processor component="mdns (nameResolution/v1)" instance=dapr15 scope=dapr.contrib type=log ver=1.11.3
INFO[0000] Initialized name resolution to mdns           app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Loading components…                           app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Component loaded: pubsub (pubsub.redis/v1)    app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Waiting for all outstanding components to be processed  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Using 'statestore' as actor state store       app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Component loaded: statestore (state.redis/v1)  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] All outstanding components processed          app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Loading endpoints                             app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Waiting for all outstanding http endpoints to be processed  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] All outstanding http endpoints processed      app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] gRPC proxy enabled                            app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] gRPC server listening on TCP address: :36499  app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.api type=log ver=1.11.3
INFO[0000] Enabled gRPC tracing middleware               app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.api type=log ver=1.11.3
INFO[0000] Enabled gRPC metrics middleware               app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.api type=log ver=1.11.3
INFO[0000] Registering workflow engine for gRPC endpoint: [::]:36499  app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.api type=log ver=1.11.3
INFO[0000] API gRPC server is running on port 36499      app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] enabled metrics http middleware               app_id=order-processor instance=dapr15 scope=dapr.runtime.http type=log ver=1.11.3
INFO[0000] enabled tracing http middleware               app_id=order-processor instance=dapr15 scope=dapr.runtime.http type=log ver=1.11.3
INFO[0000] HTTP server listening on TCP address: :43787  app_id=order-processor instance=dapr15 scope=dapr.runtime.http type=log ver=1.11.3
INFO[0000] http server is running on port 43787          app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] The request body size parameter is: 4         app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] gRPC server listening on TCP address: :46019  app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.internal type=log ver=1.11.3
INFO[0000] Enabled gRPC tracing middleware               app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.internal type=log ver=1.11.3
INFO[0000] Enabled gRPC metrics middleware               app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.internal type=log ver=1.11.3
INFO[0000] internal gRPC server is running on port 46019  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
WARN[0000] App channel is not initialized. Did you configure an app-port?  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] actor runtime started. actor idle timeout: 1h0m0s. actor scan interval: 30s  app_id=order-processor instance=dapr15 scope=dapr.runtime.actor type=log ver=1.11.3
INFO[0000] Configuring workflow engine with actors backend  app_id=order-processor instance=dapr15 scope=dapr.runtime.wfengine type=log ver=1.11.3
INFO[0000] Registering component for dapr workflow engine...  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] initializing Dapr workflow component          app_id=order-processor component="dapr (workflow.dapr/v1)" instance=dapr15 scope=dapr.contrib type=log ver=1.11.3
WARN[0000] app channel not initialized, make sure -app-port is specified if pubsub subscription is required  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
WARN[0000] failed to read from bindings: app channel not initialized   app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] dapr initialized. Status: Running. Init Elapsed 10ms  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] placement tables updated, version: 0          app_id=order-processor instance=dapr15 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.3
ℹ️  Checking if Dapr sidecar is listening on GRPC port 36499
ℹ️  Dapr sidecar is up and running.
ℹ️  Updating metadata for appPID: 5624
ℹ️  Updating metadata for app command: dotnet run
✅  You're up and running! Both Dapr and your app logs will appear here.

== APP == info: Microsoft.DurableTask[1]
== APP ==       Durable Task worker is connecting to sidecar at localhost:36499.
== APP == info: Microsoft.Hosting.Lifetime[0]
== APP ==       Application started. Press Ctrl+C to shut down.
== APP == info: Microsoft.Hosting.Lifetime[0]
== APP ==       Hosting environment: Production
== APP == info: Microsoft.Hosting.Lifetime[0]
== APP ==       Content root path: /home/sky/work/code/dapr/quickstarts/workflows/csharp/sdk/order-processor
== APP == Starting workflow 375d349f purchasing 10 Cars
INFO[0003] Error processing operation DaprBuiltInActorNotFoundRetries. Retrying in 1s…  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
== APP == info: Microsoft.DurableTask[4]
== APP ==       Sidecar work-item streaming connection established.
INFO[0003] work item stream established by user-agent: [grpc-dotnet/2.50.0 (.NET 6.0.22; CLR 6.0.22; net6.0; linux; x64)]  app_id=order-processor instance=dapr15 scope=dapr.runtime.wfengine type=log ver=1.11.3
INFO[0003] worker started with backend dapr.actors/v1-alpha  app_id=order-processor instance=dapr15 scope=wfengine.backend type=log ver=1.11.3
INFO[0003] Workflow engine started                       app_id=order-processor instance=dapr15 scope=dapr.runtime.wfengine type=log ver=1.11.3
INFO[0006] placement tables updated, version: 1          app_id=order-processor instance=dapr15 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.3
INFO[0006] Recovered processing operation DaprBuiltInActorNotFoundRetries.  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
WARN[0006] Redis does not support transaction rollbacks and should not be used in production as an actor state store.  app_id=order-processor component="statestore (state.redis/v1)" instance=dapr15 scope=dapr.contrib type=log ver=1.11.3
INFO[0006] created new workflow instance with ID '375d349f'  app_id=order-processor component="dapr (workflow.dapr/v1)" instance=dapr15 scope=dapr.contrib type=log ver=1.11.3
INFO[0006] 375d349f: starting new 'OrderProcessingWorkflow' instance with ID = '375d349f'.  app_id=order-processor instance=dapr15 scope=wfengine.backend type=log ver=1.11.3
== APP == info: WorkflowConsoleApp.Activities.NotifyActivity[0]
== APP ==       Received order 375d349f for 10 Cars at $15000
== APP == Your workflow has started. Here is the status of the workflow: Running
== APP == info: WorkflowConsoleApp.Activities.ReserveInventoryActivity[0]
== APP ==       Reserving inventory for order 375d349f of 10 Cars
== APP == info: WorkflowConsoleApp.Activities.ReserveInventoryActivity[0]
== APP ==       There are: 100, Cars available for purchase
== APP == info: WorkflowConsoleApp.Activities.ProcessPaymentActivity[0]
== APP ==       Processing payment: 375d349f for 10 Cars at $15000
== APP == info: WorkflowConsoleApp.Activities.ProcessPaymentActivity[0]
== APP ==       Payment for request ID '375d349f' processed successfully
== APP == info: WorkflowConsoleApp.Activities.UpdateInventoryActivity[0]
== APP ==       Checking Inventory for: Order# 375d349f for 10 Cars
== APP == info: WorkflowConsoleApp.Activities.UpdateInventoryActivity[0]
== APP ==       There are now: 90 Cars left in stock
== APP == info: WorkflowConsoleApp.Activities.NotifyActivity[0]
== APP ==       Order 375d349f has completed!
INFO[0021] 375d349f: 'OrderProcessingWorkflow' completed with a COMPLETED status.  app_id=order-processor instance=dapr15 scope=wfengine.backend type=log ver=1.11.3
== APP == Workflow Status: Completed
INFO[0021] work item stream closed                       app_id=order-processor instance=dapr15 scope=dapr.runtime.wfengine type=log ver=1.11.3
✅  Exited App successfully
ℹ️  
terminated signal received: shutting down
✅  Exited Dapr successfully

存在问题

.net quickstart 运行没问题,但是 zipkin 显示的 trace 和文档中不一致,startinstance 这个 span 没有了,导致无法看到调用关系。

等待修复。

1.3.4.2 - python

运行 dapr workflow 的 python quickstart

运行 quickstart

执行:

cd workflows/python/sdk/order-processor
pip3 install -r requirements.txt
dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py

输出为:

ℹ️  Starting Dapr with id order-processor. HTTP Port: 32971. gRPC Port: 43899
ℹ️  Checking if Dapr sidecar is listening on HTTP port 32971
INFO[0000] starting Dapr Runtime -- version 1.11.3 -- commit 9f99c6adca78dfc04b8063974f27b3a7534ae798  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] log level set to: info                        app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] metrics server started on :45495/             app_id=order-processor instance=dapr15 scope=dapr.metrics type=log ver=1.11.3
INFO[0000] Resiliency configuration loaded               app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] standalone mode configured                    app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] app id: order-processor                       app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] mTLS is disabled. Skipping certificate request and tls validation  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Dapr trace sampler initialized: DaprTraceSampler(P=1.000000)  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] local service entry announced: order-processor -> 192.168.99.15:46027  app_id=order-processor component="mdns (nameResolution/v1)" instance=dapr15 scope=dapr.contrib type=log ver=1.11.3
INFO[0000] Initialized name resolution to mdns           app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Loading components…                           app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Waiting for all outstanding components to be processed  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Using 'statestore-actors' as actor state store  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Component loaded: statestore-actors (state.redis/v1)  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] All outstanding components processed          app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Loading endpoints                             app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Waiting for all outstanding http endpoints to be processed  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] All outstanding http endpoints processed      app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] gRPC proxy enabled                            app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] gRPC server listening on TCP address: :43899  app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.api type=log ver=1.11.3
INFO[0000] Enabled gRPC tracing middleware               app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.api type=log ver=1.11.3
INFO[0000] Enabled gRPC metrics middleware               app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.api type=log ver=1.11.3
INFO[0000] Registering workflow engine for gRPC endpoint: [::]:43899  app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.api type=log ver=1.11.3
INFO[0000] API gRPC server is running on port 43899      app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] enabled metrics http middleware               app_id=order-processor instance=dapr15 scope=dapr.runtime.http type=log ver=1.11.3
INFO[0000] enabled tracing http middleware               app_id=order-processor instance=dapr15 scope=dapr.runtime.http type=log ver=1.11.3
INFO[0000] HTTP server listening on TCP address: :32971  app_id=order-processor instance=dapr15 scope=dapr.runtime.http type=log ver=1.11.3
INFO[0000] http server is running on port 32971          app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] The request body size parameter is: 4         app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] gRPC server listening on TCP address: :46027  app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.internal type=log ver=1.11.3
INFO[0000] Enabled gRPC tracing middleware               app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.internal type=log ver=1.11.3
INFO[0000] Enabled gRPC metrics middleware               app_id=order-processor instance=dapr15 scope=dapr.runtime.grpc.internal type=log ver=1.11.3
INFO[0000] internal gRPC server is running on port 46027  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
WARN[0000] App channel is not initialized. Did you configure an app-port?  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] actor runtime started. actor idle timeout: 1h0m0s. actor scan interval: 30s  app_id=order-processor instance=dapr15 scope=dapr.runtime.actor type=log ver=1.11.3
INFO[0000] Configuring workflow engine with actors backend  app_id=order-processor instance=dapr15 scope=dapr.runtime.wfengine type=log ver=1.11.3
INFO[0000] Registering component for dapr workflow engine...  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] initializing Dapr workflow component          app_id=order-processor component="dapr (workflow.dapr/v1)" instance=dapr15 scope=dapr.contrib type=log ver=1.11.3
WARN[0000] failed to read from bindings: app channel not initialized   app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] dapr initialized. Status: Running. Init Elapsed 8ms  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] placement tables updated, version: 2          app_id=order-processor instance=dapr15 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.3
ℹ️  Checking if Dapr sidecar is listening on GRPC port 43899
ℹ️  Dapr sidecar is up and running.
ℹ️  Updating metadata for appPID: 11366
ℹ️  Updating metadata for app command: python3 app.py
✅  You're up and running! Both Dapr and your app logs will appear here.

== APP == *** Welcome to the Dapr Workflow console app sample!
== APP == *** Using this app, you can place orders that start workflows.
== APP == 2023-09-27 07:44:02.567 durabletask-worker INFO: Starting gRPC worker that connects to 127.0.0.1:43899
INFO[0006] work item stream established by user-agent: [grpc-python/1.58.0 grpc-c/35.0.0 (linux; chttp2)]  app_id=order-processor instance=dapr15 scope=dapr.runtime.wfengine type=log ver=1.11.3
INFO[0006] worker started with backend dapr.actors/v1-alpha  app_id=order-processor instance=dapr15 scope=wfengine.backend type=log ver=1.11.3
== APP == 2023-09-27 07:44:02.579 durabletask-worker INFO: Successfully connected to 127.0.0.1:43899. Waiting for work items...
INFO[0006] Workflow engine started                       app_id=order-processor instance=dapr15 scope=dapr.runtime.wfengine type=log ver=1.11.3
== APP == item: InventoryItem(item_name=Paperclip, per_item_cost=5, quantity=100)
== APP == item: InventoryItem(item_name=Cars, per_item_cost=15000, quantity=100)
== APP == item: InventoryItem(item_name=Computers, per_item_cost=500, quantity=100)
== APP == ==========Begin the purchase of item:==========
== APP == Starting order workflow, purchasing 10 of cars
INFO[0006] Error processing operation DaprBuiltInActorNotFoundRetries. Retrying in 1s…  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0009] placement tables updated, version: 3          app_id=order-processor instance=dapr15 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.3
INFO[0010] Recovered processing operation DaprBuiltInActorNotFoundRetries.  app_id=order-processor instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
WARN[0010] Redis does not support transaction rollbacks and should not be used in production as an actor state store.  app_id=order-processor component="statestore-actors (state.redis/v1)" instance=dapr15 scope=dapr.contrib type=log ver=1.11.3
INFO[0010] created new workflow instance with ID '662f09df-fcfb-4ee1-bda0-1327e3a7116f'  app_id=order-processor component="dapr (workflow.dapr/v1)" instance=dapr15 scope=dapr.contrib type=log ver=1.11.3
INFO[0010] 662f09df-fcfb-4ee1-bda0-1327e3a7116f: starting new 'order_processing_workflow' instance with ID = '662f09df-fcfb-4ee1-bda0-1327e3a7116f'.  app_id=order-processor instance=dapr15 scope=wfengine.backend type=log ver=1.11.3
== APP == app.py:49: UserWarning: The Workflow API is an Alpha version and is subject to change.
== APP ==   start_resp = daprClient.start_workflow(workflow_component=workflow_component,
== APP == app.py:86: UserWarning: The Workflow API is an Alpha version and is subject to change.
== APP ==   state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
== APP == 2023-09-27 07:44:06.592 durabletask-worker INFO: 662f09df-fcfb-4ee1-bda0-1327e3a7116f: Waiting for 1 task(s) and 0 event(s).
== APP == INFO:NotifyActivity:Received order 662f09df-fcfb-4ee1-bda0-1327e3a7116f for 10 cars at $150000 !
== APP == 2023-09-27 07:44:06.605 durabletask-worker INFO: 662f09df-fcfb-4ee1-bda0-1327e3a7116f: Waiting for 1 task(s) and 0 event(s).
== APP == INFO:VerifyInventoryActivity:Verifying inventory for order 662f09df-fcfb-4ee1-bda0-1327e3a7116f of 10 cars
== APP == INFO:VerifyInventoryActivity:There are 100 Cars available for purchase
== APP == 2023-09-27 07:44:06.617 durabletask-worker INFO: 662f09df-fcfb-4ee1-bda0-1327e3a7116f: Waiting for 1 task(s) and 0 event(s).
== APP == INFO:RequestApprovalActivity:Requesting approval for payment of 150000 USD for 10 cars
== APP == 2023-09-27 07:44:06.627 durabletask-worker INFO: 662f09df-fcfb-4ee1-bda0-1327e3a7116f: Waiting for 1 task(s) and 1 event(s).
INFO[0020] Raised event manager_approval on workflow instance '662f09df-fcfb-4ee1-bda0-1327e3a7116f'  app_id=order-processor component="dapr (workflow.dapr/v1)" instance=dapr15 scope=dapr.contrib type=log ver=1.11.3
== APP == app.py:95: UserWarning: The Workflow API is an Alpha version and is subject to change.
== APP ==   state = daprClient.get_workflow(instance_id=_id, workflow_component=workflow_component)
== APP == app.py:79: UserWarning: The Workflow API is an Alpha version and is subject to change.
== APP ==   daprClient.raise_workflow_event(instance_id=_id, workflow_component=workflow_component,
== APP == 2023-09-27 07:44:16.598 durabletask-worker INFO: 662f09df-fcfb-4ee1-bda0-1327e3a7116f Event raised: manager_approval
== APP == 2023-09-27 07:44:16.598 durabletask-worker INFO: 662f09df-fcfb-4ee1-bda0-1327e3a7116f: Waiting for 2 task(s) and 0 event(s).
== APP == INFO:NotifyActivity:Payment for order 662f09df-fcfb-4ee1-bda0-1327e3a7116f has been approved!
== APP == 2023-09-27 07:44:16.608 durabletask-worker INFO: 662f09df-fcfb-4ee1-bda0-1327e3a7116f: Waiting for 2 task(s) and 0 event(s).
== APP == INFO:ProcessPaymentActivity:Processing payment: 662f09df-fcfb-4ee1-bda0-1327e3a7116f for 10 cars at 150000 USD
== APP == INFO:ProcessPaymentActivity:Payment for request ID 662f09df-fcfb-4ee1-bda0-1327e3a7116f processed successfully
== APP == 2023-09-27 07:44:16.618 durabletask-worker INFO: 662f09df-fcfb-4ee1-bda0-1327e3a7116f: Waiting for 2 task(s) and 0 event(s).
== APP == INFO:UpdateInventoryActivity:Checking inventory for order 662f09df-fcfb-4ee1-bda0-1327e3a7116f for 10 cars
== APP == INFO:UpdateInventoryActivity:There are now 90 cars left in stock
== APP == 2023-09-27 07:44:16.633 durabletask-worker INFO: 662f09df-fcfb-4ee1-bda0-1327e3a7116f: Waiting for 2 task(s) and 0 event(s).
== APP == INFO:NotifyActivity:Order 662f09df-fcfb-4ee1-bda0-1327e3a7116f has completed!
== APP == 2023-09-27 07:44:16.643 durabletask-worker INFO: 662f09df-fcfb-4ee1-bda0-1327e3a7116f: Orchestration completed with status: COMPLETED
INFO[0020] 662f09df-fcfb-4ee1-bda0-1327e3a7116f: 'order_processing_workflow' completed with a COMPLETED status.  app_id=order-processor instance=dapr15 scope=wfengine.backend type=log ver=1.11.3
== APP == Workflow completed! Result: Completed
== APP == Purchase of item is  Completed

1.3.4.3 - Java

运行 dapr workflow 的 Java quickstart

背景

Java 的 quickstart 还没有 merge:

https://github.com/dapr/quickstarts/pull/925

要执行的话需要用到

https://github.com/skyao/quickstarts/tree/java-workflow-quickstart

运行 quickstart

执行:

cd workflows/java/sdk/order-processor
mvn clean install
dapr run --app-id WorkflowConsoleApp --resources-path ../../../components/ --dapr-grpc-port 50001 -- java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar io.dapr.quickstarts.workflows.WorkflowConsoleApp

输出为:

ℹ️  Starting Dapr with id WorkflowConsoleApp. HTTP Port: 45063. gRPC Port: 50001
ℹ️  Checking if Dapr sidecar is listening on HTTP port 45063
INFO[0000] starting Dapr Runtime -- version 1.11.3 -- commit 9f99c6adca78dfc04b8063974f27b3a7534ae798  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] log level set to: info                        app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] metrics server started on :45793/             app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.metrics type=log ver=1.11.3
INFO[0000] Resiliency configuration loaded               app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] standalone mode configured                    app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] app id: WorkflowConsoleApp                    app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] mTLS is disabled. Skipping certificate request and tls validation  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Dapr trace sampler initialized: DaprTraceSampler(P=1.000000)  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] local service entry announced: WorkflowConsoleApp -> 192.168.99.15:43665  app_id=WorkflowConsoleApp component="mdns (nameResolution/v1)" instance=dapr15 scope=dapr.contrib type=log ver=1.11.3
INFO[0000] Initialized name resolution to mdns           app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Loading components…                           app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Waiting for all outstanding components to be processed  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Using 'statestore-actors' as actor state store  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Component loaded: statestore-actors (state.redis/v1)  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] All outstanding components processed          app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Loading endpoints                             app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] Waiting for all outstanding http endpoints to be processed  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] All outstanding http endpoints processed      app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] gRPC proxy enabled                            app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] gRPC server listening on TCP address: :50001  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.grpc.api type=log ver=1.11.3
INFO[0000] Enabled gRPC tracing middleware               app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.grpc.api type=log ver=1.11.3
INFO[0000] Enabled gRPC metrics middleware               app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.grpc.api type=log ver=1.11.3
INFO[0000] Registering workflow engine for gRPC endpoint: [::]:50001  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.grpc.api type=log ver=1.11.3
INFO[0000] API gRPC server is running on port 50001      app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] enabled metrics http middleware               app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.http type=log ver=1.11.3
INFO[0000] enabled tracing http middleware               app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.http type=log ver=1.11.3
INFO[0000] HTTP server listening on TCP address: :45063  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.http type=log ver=1.11.3
INFO[0000] http server is running on port 45063          app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] The request body size parameter is: 4         app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] gRPC server listening on TCP address: :43665  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.grpc.internal type=log ver=1.11.3
INFO[0000] Enabled gRPC tracing middleware               app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.grpc.internal type=log ver=1.11.3
INFO[0000] Enabled gRPC metrics middleware               app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.grpc.internal type=log ver=1.11.3
INFO[0000] internal gRPC server is running on port 43665  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
WARN[0000] App channel is not initialized. Did you configure an app-port?  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] actor runtime started. actor idle timeout: 1h0m0s. actor scan interval: 30s  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.actor type=log ver=1.11.3
INFO[0000] Configuring workflow engine with actors backend  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.wfengine type=log ver=1.11.3
INFO[0000] Registering component for dapr workflow engine...  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] initializing Dapr workflow component          app_id=WorkflowConsoleApp component="dapr (workflow.dapr/v1)" instance=dapr15 scope=dapr.contrib type=log ver=1.11.3
WARN[0000] failed to read from bindings: app channel not initialized   app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] dapr initialized. Status: Running. Init Elapsed 7ms  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0000] placement tables updated, version: 5          app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.3
ℹ️  Checking if Dapr sidecar is listening on GRPC port 50001
ℹ️  Dapr sidecar is up and running.
ℹ️  Updating metadata for appPID: 13886
ℹ️  Updating metadata for app command: java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar io.dapr.quickstarts.workflows.WorkflowConsoleApp
✅  You're up and running! Both Dapr and your app logs will appear here.

== APP == *** Welcome to the Dapr Workflow console app sample!
== APP == *** Using this app, you can place orders that start workflows.
INFO[0003] placement tables updated, version: 6          app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.3
== APP == Start workflow runtime
== APP == Sep 27, 2023 7:51:22 AM com.microsoft.durabletask.DurableTaskGrpcWorker startAndBlock
== APP == INFO: Durable Task worker is connecting to sidecar at 127.0.0.1:50001.
INFO[0007] work item stream established by user-agent: [dapr-sdk-java/v1.10.0-SNAPSHOT grpc-java-netty/1.46.0]  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.wfengine type=log ver=1.11.3
INFO[0007] worker started with backend dapr.actors/v1-alpha  app_id=WorkflowConsoleApp instance=dapr15 scope=wfengine.backend type=log ver=1.11.3
INFO[0007] Workflow engine started                       app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.wfengine type=log ver=1.11.3
== APP == ==========Begin the purchase of item:==========
== APP == Starting order workflow, purchasing 10 of cars
INFO[0007] Error processing operation DaprBuiltInActorNotFoundRetries. Retrying in 1s…  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
INFO[0010] placement tables updated, version: 7          app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime.actor.internal.placement type=log ver=1.11.3
INFO[0010] Recovered processing operation DaprBuiltInActorNotFoundRetries.  app_id=WorkflowConsoleApp instance=dapr15 scope=dapr.runtime type=log ver=1.11.3
WARN[0010] Redis does not support transaction rollbacks and should not be used in production as an actor state store.  app_id=WorkflowConsoleApp component="statestore-actors (state.redis/v1)" instance=dapr15 scope=dapr.contrib type=log ver=1.11.3
INFO[0010] d4d383b9-d615-489f-bfad-f94fad4c169c: starting new 'io.dapr.quickstarts.workflows.OrderProcessingWorkflow' instance with ID = 'd4d383b9-d615-489f-bfad-f94fad4c169c'.  app_id=WorkflowConsoleApp instance=dapr15 scope=wfengine.backend type=log ver=1.11.3
== APP == scheduled new workflow instance of OrderProcessingWorkflow with instance ID: d4d383b9-d615-489f-bfad-f94fad4c169c
== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Starting Workflow: io.dapr.quickstarts.workflows.OrderProcessingWorkflow
== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Instance ID(order ID): d4d383b9-d615-489f-bfad-f94fad4c169c
== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Current Orchestration Time: 2023-09-27T07:51:26.473Z
== APP == [Thread-0] INFO io.dapr.workflows.WorkflowContext - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10]
== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Received Order: OrderPayload [itemName=cars, totalCost=150000, quantity=10]
== APP == workflow instance d4d383b9-d615-489f-bfad-f94fad4c169c started
== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserving inventory for order 'd4d383b9-d615-489f-bfad-f94fad4c169c' of 10 cars
== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - There are 100 cars available for purchase
== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ReserveInventoryActivity - Reserved inventory for order 'd4d383b9-d615-489f-bfad-f94fad4c169c' of 10 cars
== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10]
== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.RequestApprovalActivity - Approved requesting approval for order: OrderPayload [itemName=cars, totalCost=150000, quantity=10]
== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Processing payment: d4d383b9-d615-489f-bfad-f94fad4c169c for 10 cars at $150000
== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.ProcessPaymentActivity - Payment for request ID 'd4d383b9-d615-489f-bfad-f94fad4c169c' processed successfully
== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updating inventory for order 'd4d383b9-d615-489f-bfad-f94fad4c169c' of 10 cars
== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.UpdateInventoryActivity - Updated inventory for order 'd4d383b9-d615-489f-bfad-f94fad4c169c': there are now 90 cars left in stock
== APP == there are now 90 cars left in stock
== APP == [Thread-0] INFO io.dapr.quickstarts.workflows.activities.NotifyActivity - Order completed! : d4d383b9-d615-489f-bfad-f94fad4c169c
INFO[0021] d4d383b9-d615-489f-bfad-f94fad4c169c: 'io.dapr.quickstarts.workflows.OrderProcessingWorkflow' completed with a COMPLETED status.  app_id=WorkflowConsoleApp instance=dapr15 scope=wfengine.backend type=log ver=1.11.3
== APP == workflow instance completed, out is: {"processed":true}

1.4 - 资料收集

收集Dapr的各种资料

官方网站

社区

  • dapr-cn :Dapr中文社区,专注于dapr的文档、新闻稿本地化、新特性贡献以及中文社区推广
  • Dapr 中文文档库: Dapr 中文文档库,由 dapr-cn 创建并维护的对 docs.dapr.io 内容的翻译,旨在为更熟悉中文的开发者提供一些文档上的帮助。

备注:由于dapr翻译计划已经启动,我也参与其中,我在学习笔记中翻译的部分官方文档内容都将陆续迁移过去,之后会删除学习笔记中的官方文档翻译内容。

文档

文章&演讲

介绍性的文章:

实践性的文章:

视频

相关资料

2 - 服务调用

Dapr的服务调用/Service Invoke

2.1 - 服务调用

Dapr的Service Invoke概述

服务调用/Service Invoke 构建块

2.2 - 服务调用的API

Dapr的服务调用的API

2.2.1 - 服务调用API的Proto定义

Dapr服务调用API的Proto定义

InvokeService的定义

Service Invoken API 定义在 proto文件 dapr/proto/runtime/v1/dapr.proto 中:

service Dapr {
  // Invokes a method on a remote Dapr app.
  rpc InvokeService(InvokeServiceRequest) returns (common.v1.InvokeResponse) {}
  ...
}

InvokeServiceRequest 包含一个被调用服务的ID,和通用的 InvokeRequest:

// InvokeServiceRequest represents the request message for Service invocation.
message InvokeServiceRequest {
  // Required. Callee's app id.
  string id = 1;

  // Required. message which will be delivered to callee.
  common.v1.InvokeRequest message = 3;
}

AppCallback的定义

AppCallback API 定义在 proto文件 dapr/proto/runtime/v1/appcallback.proto 中:

service AppCallback {
  // Invokes service method with InvokeRequest.
  rpc OnInvoke (common.v1.InvokeRequest) returns (common.v1.InvokeResponse) {}
  ...
}

Invoke的通用定义

Invoke的通用定义在 proto文件 dapr/proto/common/v1/common.proto 中。

InvokeRequest是用来携带数据调用方法的消息,这个消息在Dapr gRPC服务的InvokeService和AppCallback gRPC服务的OnInvoke中使用:

// InvokeRequest is the message to invoke a method with the data.
// This message is used in InvokeService of Dapr gRPC Service and OnInvoke
// of AppCallback gRPC service.
message InvokeRequest {
  // Required. method is a method name which will be invoked by caller.
  string method = 1;

  // Required. Bytes value or Protobuf message which caller sent.
  // Dapr treats Any.value as bytes type if Any.type_url is unset.
  google.protobuf.Any data = 2;

  // The type of data content.
  //
  // This field is required if data delivers http request body
  // Otherwise, this is optional.
  string content_type = 3;

  // HTTP specific fields if request conveys http-compatible request.
  //
  // This field is required for http-compatible request. Otherwise,
  // this field is optional.
  HTTPExtension http_extension = 4;
}

InvokeResponse是包括应用程序回调的数据和内容类型的响应消息,该消息在Dapr gRPC服务的InvokeService方法和AppCallback gRPC服务的OnInvoke方法中使用:

// InvokeResponse is the response message inclduing data and its content type
// from app callback.
// This message is used in InvokeService of Dapr gRPC Service and OnInvoke
// of AppCallback gRPC service.
message InvokeResponse {
  // Required. The content body of InvokeService response.
  google.protobuf.Any data = 1;

  // Required. The type of data content.
  string content_type = 2;
}

相关的消息定义

HTTPExtension 消息的定义:

// 当Dapr运行时传递HTTP内容时,HTTPExtension包括HTTP verb和querystring。
// 
// For example, when callers calls http invoke api
// POST http://localhost:3500/v1.0/invoke/<app_id>/method/<method>?query1=value1&query2=value2
// 
// Dapr runtime will parse POST as a verb and extract querystring to quersytring map.
message HTTPExtension {
  // Type of HTTP 1.1 Methods
  // RFC 7231: https://tools.ietf.org/html/rfc7231#page-24
  enum Verb {
    NONE = 0;
    GET = 1;
    HEAD = 2;
    POST = 3;
    PUT = 4;
    DELETE = 5;
    CONNECT = 6;
    OPTIONS = 7;
    TRACE = 8;
  }

  // Required. HTTP verb.
  Verb verb = 1;

  // querystring includes HTTP querystring.
  map<string, string> querystring = 2;
}

Any消息的定义:

message Any {
  string type_url = 1;

  // Must be a valid serialized protocol buffer of the above specified type.
  bytes value = 2;
}

2.2.2 - 服务调用API的golang生成代码

服务调用API的golang生成代码

从proto api定义文件生成的golang代码,被存放在dapr项目的 pkg/proto/ 目录下。

grpc服务定义

DaprServer 是 dapr 服务的服务器端API定义,包含 InvokeService方法:

// DaprServer is the server API for Dapr service.
type DaprServer interface {
	// Invokes a method on a remote Dapr app.
	InvokeService(context.Context, *InvokeServiceRequest) (*v1.InvokeResponse, error)
   ......
}

AppCallbackServer 是 AppCallback 服务的服务器端API定义,包含 OnInvoke 方法:

// AppCallbackServer is the server API for AppCallback service.
type AppCallbackServer interface {
	// Invokes service method with InvokeRequest.
	OnInvoke(context.Context, *v1.InvokeRequest) (*v1.InvokeResponse, error)
	......
}

InvokeServiceRequest的定义

https://github.com/dapr/dapr/blob/11741c6cd697e08b2e776943e61bb2e3388c85a8/pkg/proto/runtime/v1/dapr.pb.go

type InvokeServiceRequest struct {
	// Required. Callee's app id.
	Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
	// Required. message which will be delivered to callee.
	Message              *v1.InvokeRequest `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"`
	XXX_NoUnkeyedLiteral struct{}          `json:"-"`
	XXX_unrecognized     []byte            `json:"-"`
	XXX_sizecache        int32             `json:"-"`
}

InvokeRequest的定义

https://github.com/dapr/dapr/blob/de49fe260c8f7c53e146e27150faad8c0880fe90/pkg/proto/common/v1/common.pb.go

type InvokeRequest struct {
	// Required. method is a method name which will be invoked by caller.
	Method string `protobuf:"bytes,1,opt,name=method,proto3" json:"method,omitempty"`
	// Required. Bytes value or Protobuf message which caller sent.
	// Dapr treats Any.value as bytes type if Any.type_url is unset.
	Data *any.Any `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
	// The type of data content.
	//
	// This field is required if data delivers http request body
	// Otherwise, this is optional.
	ContentType string `protobuf:"bytes,3,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"`
	// HTTP specific fields if request conveys http-compatible request.
	//
	// This field is required for http-compatible request. Otherwise,
	// this field is optional.
	HttpExtension        *HTTPExtension `protobuf:"bytes,4,opt,name=http_extension,json=httpExtension,proto3" json:"http_extension,omitempty"`
	XXX_NoUnkeyedLiteral struct{}       `json:"-"`
	XXX_unrecognized     []byte         `json:"-"`
	XXX_sizecache        int32          `json:"-"`
}

InvokeResponse的定义

type InvokeResponse struct {
	// Required. The content body of InvokeService response.
	Data *any.Any `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
	// Required. The type of data content.
	ContentType          string   `protobuf:"bytes,2,opt,name=content_type,json=contentType,proto3" json:"content_type,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

备注:只是在proto定义的字段上增加了一些 XXX_ 字段。

2.2.3 - 服务调用的go client定义

Dapr服务调用的go client定义

DaprClient

https://github.com/dapr/dapr/blob/11741c6cd697e08b2e776943e61bb2e3388c85a8/pkg/proto/runtime/v1/dapr.pb.go

// DaprClient is the client API for Dapr service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type DaprClient interface {
	// Invokes a method on a remote Dapr app.
	InvokeService(ctx context.Context, in *InvokeServiceRequest, opts ...grpc.CallOption) (*v1.InvokeResponse, error)
	......s
}

DaprClient 的实现:

type daprClient struct {
	cc *grpc.ClientConn
}

func NewDaprClient(cc *grpc.ClientConn) DaprClient {
	return &daprClient{cc}
}

func (c *daprClient) InvokeService(ctx context.Context, in *InvokeServiceRequest, opts ...grpc.CallOption) (*v1.InvokeResponse, error) {
	out := new(v1.InvokeResponse)
	err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/InvokeService", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

AppCallbackClient

https://github.com/dapr/dapr/blob/11741c6cd697e08b2e776943e61bb2e3388c85a8/pkg/proto/runtime/v1/appcallback.pb.go

// AppCallbackClient is the client API for AppCallback service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type AppCallbackClient interface {
	// Invokes service method with InvokeRequest.
	OnInvoke(ctx context.Context, in *v1.InvokeRequest, opts ...grpc.CallOption) (*v1.InvokeResponse, error)
	......
}

AppCallbackClient 的实现:

type appCallbackClient struct {
	cc *grpc.ClientConn
}

func NewAppCallbackClient(cc *grpc.ClientConn) AppCallbackClient {
	return &appCallbackClient{cc}
}

func (c *appCallbackClient) OnInvoke(ctx context.Context, in *v1.InvokeRequest, opts ...grpc.CallOption) (*v1.InvokeResponse, error) {
	out := new(v1.InvokeResponse)
	err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.AppCallback/OnInvoke", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

2.2.4 - Dapr服务调用的go sdk定义

Dapr服务调用的go sdk定义

go sdk使用案例

https://github.com/dapr/go-sdk

要在另一个使用Dapr sidecar运行的服务上调用特定的方法,Dapr客户端提供了两个选项。

调用一个没有任何数据的服务:

resp, err = client.InvokeService(ctx, "service-name", "method-name") 

还有带数据调用服务:

content := &DataContent{
    ContentType: "application/json",
    Data:        []byte(`{ "id": "a123", "value": "demo", "valid": true }`)
}

resp, err := client.InvokeServiceWithContent(ctx, "service-name", "method-name", content)

go sdk提供的API

https://github.com/dapr/go-sdk/blob/d6de57c71a1d3c7ce3a3b81385609dfba18a1a18/client/invoke.go

go sdk在 client 上封装了两个方法用于服务调用,InvokeService方法用来发送不带数据的请求:

// InvokeService invokes service without raw data ([]byte).
func (c *GRPCClient) InvokeService(ctx context.Context, serviceID, method string) (out []byte, err error) {
...
}

InvokeServiceWithContent方法用来发现带数据的请求:

// InvokeServiceWithContent invokes service without content (data + content type).
func (c *GRPCClient) InvokeServiceWithContent(ctx context.Context, serviceID, method string, content *DataContent) (out []byte, err error) {
......
}

DataContent 的定义:

// DataContent the service invocation content
type DataContent struct {
	// Data is the input data
	Data []byte
	// ContentType is the type of the data content
	ContentType string
}

2.3 - 服务注册与服务发现

Service Invoke 中的服务注册与服务发现

2.3.1 - 概述

Dapr 服务调用的服务注册与服务发现概述

Service Invoke 中的服务注册与服务发现

2.3.2 - go-sdk中的服务注册

Dapr go-sdk中的服务注册

AppCallback Server启动时没有注册

go-sdk 中的 service/grpc/service.go gRPC 版本的 AppCallback Server启动时没有做服务注册。

func (s *Server) Start() error {
	gs := grpc.NewServer()
	pb.RegisterAppCallbackServer(gs, s) // 仅仅是注册 AppCallback Service 到 grpc server,不是服务注册
	return gs.Serve(s.listener)
}

service/http/service.go HTTP 版本的 AppCallback Server启动时没有做服务注册。

func (s *Server) Start() error {
	s.registerSubscribeHandler() // 仅仅是注册 SubscribeHandler,不是服务注册
	server := http.Server{
		Addr:    s.address,
		Handler: s.mux,
	}
	return server.ListenAndServe()
}

3 - 发布订阅

Dapr的发布订阅(pub-sub)模块

3.1 - 发布订阅的概述

Dapr的发布订阅(pub-sub)模块的概述

发布订阅消息/Publish 构建块

官方资料:

3.2 - 发布订阅的概念

Dapr的发布订阅(pub-sub)的概念

内容摘选自:https://github.com/dapr/docs/blob/master/concepts/publish-subscribe-messaging/README.md

Dapr使开发人员可以用发布/订阅模式设计其应用,使用消息代理,将发布者/生产者彼此解耦,并通过发送和接收与名称空间关联的消息(通常以 topic 的形式)进行通信。

这使事件生产者可以将消息发送给未运行的消费者,并且消费者可以根据对 topic 的订阅来接收消息。

Dapr 提供“至少一次”消息传递保证,并与各种消息代理实现集成。这些实现是可插拔的,并在Dapr运行时之外的 components-contrib 中开发。

行为与保证

Dapr保证消息传递的至少一次(At-Least-Once)语义。也就是说,当应用程序使用 Publish/Subscribe API将消息发布到 topic 时,当来自该端点的响应状态代码200,或者使用gRPC而不返回错误时,它可以假定消息至少传递给任何订阅者一次,。

Dapr 承担了处理诸如消费者分组和消费者分组内部的多个实例之类概念的重担。

App ID

Dapr有一个 app id 的概念。在Kubernetes中是使用dapr.io/id注释指定的,而 app-id 在Dapr CLI中使用 flag 指定的。Dapr要求为每个应用分配一个ID。

当同一应用ID的多个实例订阅同一个 topic 时,Dapr将确保仅将消息传递给一个实例。如果两个具有不同ID的不同应用订阅了一个 topic ,则每个应用中的至少一个实例将收到同一消息的副本。

Cloud Events

Dapr遵循 Cloud Events 0.3 规范 ,并将发送到 topic 的所有有效负载包装在 Cloud Events 封套内。

Dapr实现了 Cloud Events规范中的以下字段:

  • id
  • source
  • specversion
  • type
  • datacontenttype (可选的)

3.3 - 发布订阅的参考文档

Dapr的发布订阅(pub-sub)的参考文档

详细见:

https://github.com/dapr/docs/blob/master/reference/api/pubsub.md

发布消息到topic,只要发一个HTTP POST 请求到 dapr sidecar:

POST http://localhost:<daprPort>/v1.0/publish/<topic>

要接受消息,首先应用在这个地址接受 sidecar 请求,告知 dapr 自己要订阅的 topic:

GET http://localhost:<appPort>/dapr/subscribe

应答如:

"["TopicA","TopicB"]"

然后应用在这个地址接受sidecar 转发的订阅消息:

POST http://localhost:<appPort>/TopicA

消息封套

Dapr Pub/Sub 遵守 Cloud Events 0.3 版本。

4 - 资源绑定

Dapr的资源绑定(resource-binding)模块

4.1 - 资源绑定的概述

Dapr的资源绑定的概述

资源绑定/Resource Binding 构建块

官方资料:

4.2 - 资源绑定的API

Dapr的资源绑定的API

4.2.1 - 资源绑定的API概述

Dapr的资源绑定的API概述

4.2.2 - 资源绑定API的Proto定义

Dapr的资源绑定API的Proto定义

Output Binding的定义

Output Binding API 定义在 proto文件 dapr/proto/runtime/v1/dapr.proto 中:

service Dapr {
  // Invokes binding data to specific output bindings
  rpc InvokeBinding(InvokeBindingRequest) returns (InvokeBindingResponse) {}
  ...
}

InvokeBindingRequest 包含一个被绑定资源的name,数据data,元数据metadata和绑定资源的操作类型operaion:

// InvokeBindingRequest is the message to send data to output bindings
message InvokeBindingRequest {
  // The name of the output binding to invoke.
  string name = 1;

  // The data which will be sent to output binding.
  bytes data = 2;

  // The metadata passing to output binding components
  // 
  // Common metadata property:
  // - ttlInSeconds : the time to live in seconds for the message. 
  // If set in the binding definition will cause all messages to 
  // have a default time to live. The message ttl overrides any value
  // in the binding definition.
  map<string,string> metadata = 3;

  // The name of the operation type for the binding to invoke
  string operation = 4;
}

InvokeBindingResponse 的定义:

// InvokeBindingResponse is the message returned from an output binding invocation
message InvokeBindingResponse {
  // The data which will be sent to output binding.
  bytes data = 1;

  // The metadata returned from an external system
  map<string,string> metadata = 2;
}

Input Binding的定义

Input Binding API 定义在 proto文件 dapr/proto/runtime/v1/appcallback.proto 中:

service AppCallback {
  // Lists all input bindings subscribed by this app.
  rpc ListInputBindings(google.protobuf.Empty) returns (ListInputBindingsResponse) {}
  
  // Listens events from the input bindings
  //
  // User application can save the states or send the events to the output
  // bindings optionally by returning BindingEventResponse.
  rpc OnBindingEvent(BindingEventRequest) returns (BindingEventResponse) {}
  ...
}

ListInputBindingsResponse 的定义:

// ListInputBindingsResponse is the message including the list of input bindings.
message ListInputBindingsResponse {
  // The list of input bindings.
  repeated string bindings = 1;
}

BindingEventRequest的定义:

// BindingEventRequest represents input bindings event.
message BindingEventRequest {
  // Requried. The name of the input binding component.
  string name = 1;

  // Required. The payload that the input bindings sent
  bytes data = 2;

  // The metadata set by the input binging components.
  map<string,string> metadata = 3;
}

BindingEventResponse 的定义:

// BindingEventResponse includes operations to save state or
// send data to output bindings optionally.
message BindingEventResponse {
  // The name of state store where states are saved.
  string store_name = 1;

  // The state key values which will be stored in store_name.
  repeated common.v1.StateItem states = 2;

  // BindingEventConcurrency is the kind of concurrency 
  enum BindingEventConcurrency {
    // SEQUENTIAL sends data to output bindings specified in "to" sequentially.
    SEQUENTIAL = 0;
    // PARALLEL sends data to output bindings specified in "to" in parallel.
    PARALLEL = 1;
  }

  // The list of output bindings.
  repeated string to = 3;

  // The content which will be sent to "to" output bindings.
  bytes data = 4;

  // The concurrency of output bindings to send data to
  // "to" output bindings list. The default is SEQUENTIAL.
  BindingEventConcurrency concurrency = 5;
}

4.2.3 - 资源绑定API的Golang生成代码

Dapr的资源绑定API的Golang生成代码

从proto api定义文件生成的golang代码,被存放在dapr项目的 pkg/proto/ 目录下。

grpc服务定义

DaprServer 是 dapr 服务的服务器端API定义,包含 InvokeBinding 方法:

// DaprServer is the server API for Dapr service.
type DaprServer interface {
	// Invokes binding data to specific output bindings
	InvokeBinding(context.Context, *InvokeBindingRequest) (*InvokeBindingResponse, error)
   ......
}

AppCallbackServer 是 AppCallback 服务的服务器端API定义,包含 ListInputBindings 方法和 OnBindingEvent 方法:

// AppCallbackServer is the server API for AppCallback service.
type AppCallbackServer interface {
	// Lists all input bindings subscribed by this app.
	ListInputBindings(context.Context, *empty.Empty) (*ListInputBindingsResponse, error)
	// Listens events from the input bindings
	//
	// User application can save the states or send the events to the output
	// bindings optionally by returning BindingEventResponse.
	OnBindingEvent(context.Context, *BindingEventRequest) (*BindingEventResponse, error)
	......
}

InvokeBindingRequest的定义

pkg/proto/runtime/v1/dapr.pb.go:

// InvokeBindingResponse is the message returned from an output binding invocation
type InvokeBindingResponse struct {
	// The data which will be sent to output binding.
	Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
	// The metadata returned from an external system
	Metadata             map[string]string `protobuf:"bytes,2,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
	XXX_NoUnkeyedLiteral struct{}          `json:"-"`
	XXX_unrecognized     []byte            `json:"-"`
	XXX_sizecache        int32             `json:"-"`
}

InvokeBindingResponse的定义

pkg/proto/runtime/v1/dapr.pb.go:

// InvokeBindingResponse is the message returned from an output binding invocation
type InvokeBindingResponse struct {
	// The data which will be sent to output binding.
	Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
	// The metadata returned from an external system
	Metadata             map[string]string `protobuf:"bytes,2,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
	XXX_NoUnkeyedLiteral struct{}          `json:"-"`
	XXX_unrecognized     []byte            `json:"-"`
	XXX_sizecache        int32             `json:"-"`
}

ListInputBindingsResponse的定义

pkg/proto/runtime/v1/appcallback.pb.go:

// ListInputBindingsResponse is the message including the list of input bindings.
type ListInputBindingsResponse struct {
	// The list of input bindings.
	Bindings             []string `protobuf:"bytes,1,rep,name=bindings,proto3" json:"bindings,omitempty"`
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}

BindingEventRequest的定义

pkg/proto/runtime/v1/appcallback.pb.go:

// BindingEventRequest represents input bindings event.
type BindingEventRequest struct {
	// Requried. The name of the input binding component.
	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	// Required. The payload that the input bindings sent
	Data []byte `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"`
	// The metadata set by the input binging components.
	Metadata             map[string]string `protobuf:"bytes,3,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
	XXX_NoUnkeyedLiteral struct{}          `json:"-"`
	XXX_unrecognized     []byte            `json:"-"`
	XXX_sizecache        int32             `json:"-"`
}

BindingEventResponse的定义

pkg/proto/runtime/v1/appcallback.pb.go:

// BindingEventResponse includes operations to save state or
// send data to output bindings optionally.
type BindingEventResponse struct {
	// The name of state store where states are saved.
	StoreName string `protobuf:"bytes,1,opt,name=store_name,json=storeName,proto3" json:"store_name,omitempty"`
	// The state key values which will be stored in store_name.
	States []*v1.StateItem `protobuf:"bytes,2,rep,name=states,proto3" json:"states,omitempty"`
	// The list of output bindings.
	To []string `protobuf:"bytes,3,rep,name=to,proto3" json:"to,omitempty"`
	// The content which will be sent to "to" output bindings.
	Data []byte `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"`
	// The concurrency of output bindings to send data to
	// "to" output bindings list. The default is SEQUENTIAL.
	Concurrency          BindingEventResponse_BindingEventConcurrency `protobuf:"varint,5,opt,name=concurrency,proto3,enum=dapr.proto.runtime.v1.BindingEventResponse_BindingEventConcurrency" json:"concurrency,omitempty"`
	XXX_NoUnkeyedLiteral struct{}                                     `json:"-"`
	XXX_unrecognized     []byte                                       `json:"-"`
	XXX_sizecache        int32                                        `json:"-"`
}

备注:只是在proto定义的字段上增加了一些 XXX_ 字段。

4.2.4 - 资源绑定API的go client定义

Dapr的资源绑定API的go client定义

DaprClient

/pkg/proto/runtime/v1/dapr.pb.go

// DaprClient is the client API for Dapr service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type DaprClient interface {
	// Invokes binding data to specific output bindings
	InvokeBinding(ctx context.Context, in *InvokeBindingRequest, opts ...grpc.CallOption) (*InvokeBindingResponse, error)
	......
}

DaprClient 的实现:

type daprClient struct {
	cc *grpc.ClientConn
}

func NewDaprClient(cc *grpc.ClientConn) DaprClient {
	return &daprClient{cc}
}

func (c *daprClient) InvokeBinding(ctx context.Context, in *InvokeBindingRequest, opts ...grpc.CallOption) (*InvokeBindingResponse, error) {
	out := new(InvokeBindingResponse)
  // 调用固定的grpc方法 `/dapr.proto.runtime.v1.Dapr/InvokeBinding`
	err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/InvokeBinding", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

AppCallbackClient

/pkg/proto/runtime/v1/appcallback.pb.go

// AppCallbackClient is the client API for AppCallback service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type AppCallbackClient interface {
  // Lists all input bindings subscribed by this app.
	ListInputBindings(ctx context.Context, in *empty.Empty, opts ...grpc.CallOption) (*ListInputBindingsResponse, error)
	// Listens events from the input bindings
	//
	// User application can save the states or send the events to the output
	// bindings optionally by returning BindingEventResponse.
	OnBindingEvent(ctx context.Context, in *BindingEventRequest, opts ...grpc.CallOption) (*BindingEventResponse, error)
	......
}

AppCallbackClient 的实现:

type appCallbackClient struct {
	cc *grpc.ClientConn
}

func NewAppCallbackClient(cc *grpc.ClientConn) AppCallbackClient {
	return &appCallbackClient{cc}
}

func (c *appCallbackClient) OnInvoke(ctx context.Context, in *v1.InvokeRequest, opts ...grpc.CallOption) (*v1.InvokeResponse, error) {
	out := new(v1.InvokeResponse)
	err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.AppCallback/OnInvoke", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

4.2.5 - 资源绑定API的go sdk

Dapr的资源绑定API的go sdk

Output Binding

go sdk使用案例

https://github.com/dapr/go-sdk

与Service类似,Dapr客户端提供了两种方法来调用Dapr定义的绑定上的操作。Dapr支持输入、输出和双向绑定。

对于简单的,只输出的绑定。

in := &dapr.BindingInvocation{ Name: "binding-name", Operation: "operation-name" }
err = client.InvokeOutputBinding(ctx, in)

调用带有内容和元数据的方法:

in := &dapr.BindingInvocation{
    Name:      "binding-name",
    Operation: "operation-name",
    Data: []byte("hello"),
    Metadata: map[string]string{"k1": "v1", "k2": "v2"},
}

out, err := client.InvokeBinding(ctx, in)

go sdk提供的API

/client/invoke.go

go sdk在 client 上封装了 InvokeOutputBinding 方法用于发起 output binding 调用:

// InvokeOutputBinding invokes configured Dapr binding with data (allows nil).InvokeOutputBinding
// This method differs from InvokeBinding in that it doesn't expect any content being returned from the invoked method.
func (c *GRPCClient) InvokeOutputBinding(ctx context.Context, in *BindingInvocation) error {
	if _, err := c.InvokeBinding(ctx, in); err != nil {
		return errors.Wrap(err, "error invoking output binding")
	}
	return nil
}

InvokeServiceWithContent方法用来发现带数据的请求:

// InvokeBinding invokes specific operation on the configured Dapr binding.
// This method covers input, output, and bi-directional bindings.
func (c *GRPCClient) InvokeBinding(ctx context.Context, in *BindingInvocation) (out *BindingEvent, err error) {
	if in == nil {
		return nil, errors.New("binding invocation required")
	}
	if in.Name == "" {
		return nil, errors.New("binding invocation name required")
	}
	if in.Operation == "" {
		return nil, errors.New("binding invocation operation required")
	}

	req := &pb.InvokeBindingRequest{
		Name:      in.Name,
		Operation: in.Operation,
		Data:      in.Data,
		Metadata:  in.Metadata,
	}

	resp, err := c.protoClient.InvokeBinding(authContext(ctx), req)
	if err != nil {
		return nil, errors.Wrapf(err, "error invoking binding %s/%s", in.Name, in.Operation)
	}

	out = &BindingEvent{}

	if resp != nil {
		out.Data = resp.Data
		out.Metadata = resp.Metadata
	}

	return
}

BindingInvocation 的定义:

// BindingInvocation represents binding invocation request
type BindingInvocation struct {
	// Name is name of binding to invoke.
	Name string
	// Operation is the name of the operation type for the binding to invoke
	Operation string
	// Data is the input bindings sent
	Data []byte
	// Metadata is the input binding metadata
	Metadata map[string]string
}

和根据 proto 生成的 InvokeBindingRequest 是完全一样的,除了去除了生成的 state/sizeCache/unknownFields 等字段。

Input Binding

TODO

5 - 状态管理

Dapr的状态管理模块

5.1 - 状态管理概述

Dapr状态管理概述

状态管理 / State Management

5.2 - 状态管理的API

Dapr状态管理的API

5.2.1 - 状态管理API的概述

Dapr状态管理API的概述

5.2.2 - 状态管理API的Proto定义

Dapr状态管理API的Proto定义

State API的定义

State Management API 定义在 proto文件 dapr/proto/runtime/v1/dapr.proto 中:

service Dapr {
  // Gets the state for a specific key.
  rpc GetState(GetStateRequest) returns (GetStateResponse) {}

  // Gets a bulk of state items for a list of keys
  rpc GetBulkState(GetBulkStateRequest) returns (GetBulkStateResponse) {}

  // Saves the state for a specific key.
  rpc SaveState(SaveStateRequest) returns (google.protobuf.Empty) {}

  // Deletes the state for a specific key.
  rpc DeleteState(DeleteStateRequest) returns (google.protobuf.Empty) {}
  
  // Executes transactions for a specified store
  rpc ExecuteStateTransaction(ExecuteStateTransactionRequest) returns (google.protobuf.Empty) {}
  ...
}

另外的common.proto中定义了和state相关的消息和枚举:

// StateOptions configures concurrency and consistency for state operations
message StateOptions {
  // Enum describing the supported concurrency for state.
  enum StateConcurrency {
    CONCURRENCY_UNSPECIFIED = 0;
    CONCURRENCY_FIRST_WRITE = 1;
    CONCURRENCY_LAST_WRITE = 2;
  }

  // Enum describing the supported consistency for state.
  enum StateConsistency {
    CONSISTENCY_UNSPECIFIED = 0;
    CONSISTENCY_EVENTUAL = 1;
    CONSISTENCY_STRONG = 2;
  }

  StateConcurrency concurrency = 1;
  StateConsistency consistency = 2;
}

get state

GetStateRequest 包含store_name/key,还有并发要求和请求级别的metadata:

// GetStateRequest is the message to get key-value states from specific state store.
message GetStateRequest {
  // The name of state store.
  string store_name = 1;

  // The key of the desired state
  string key = 2;

  // The read consistency of the state store.
  common.v1.StateOptions.StateConsistency consistency = 3;

  // The metadata which will be sent to state store components.
  map<string,string> metadata = 4;
}

GetStateResponse 包含byte[] 形式的 state 数据 data,和特殊表示数据特定版本的etag:

// GetStateResponse is the response conveying the state value and etag.
message GetStateResponse {
  // The byte array data
  bytes data = 1;

  // The entity tag which represents the specific version of data.
  // ETag format is defined by the corresponding data store.
  string etag = 2;
}

Get Bulk State

GetBulkStateRequest 是批量接口,一次性获取多个key的数据:

// GetBulkStateRequest is the message to get a list of key-value states from specific state store.
message GetBulkStateRequest {
  // The name of state store.
  string store_name = 1;

  // The keys to get.
  repeated string keys = 2;

  // The number of parallel operations executed on the state store for a get operation.
  // 在状态存储上用于get操作的并行操作执行的数量:也就是并发数,同时执行的请求数量
  int32 parallelism = 3;

  // The metadata which will be sent to state store components.
  // 请求级别,意味着所有的key都是使用同样的metadata
  map<string,string> metadata = 4;
}

GetBulkStateResponse:

// GetBulkStateResponse is the response conveying the list of state values.
message GetBulkStateResponse {
  // The list of items containing the keys to get values for.
  // 为啥不用map?
  repeated BulkStateItem items = 1;
}

// BulkStateItem is the response item for a bulk get operation.
// Return values include the item key, data and etag.
message BulkStateItem {
  // state item key
  string key = 1;

  // The byte array data
  bytes data = 2;

  // The entity tag which represents the specific version of data.
  // ETag format is defined by the corresponding data store.
  string etag = 3;

  // The error that was returned from the state store in case of a failed get operation.
  // 这里考虑了出错的可能,有机会给出错误信息
  // 但是,单个get state 操作怎么没有定义错误信息?
  // 只能在http/grpc协议层上报错?TODO:看看代码实现
  string error = 4;
}

Save State

SaveStateRequest 支持多个状态的保存:

// SaveStateRequest is the message to save multiple states into state store.
message SaveStateRequest {
  // The name of state store.
  string store_name = 1;

  // The array of the state key values.
  repeated common.v1.StateItem states = 2;
}

// StateItem represents state key, value, and additional options to save state.
message StateItem {
  // Required. The state key
  string key = 1;

  // Required. The state data for key
  bytes value = 2;

  // The entity tag which represents the specific version of data.
  // The exact ETag format is defined by the corresponding data store.
  string etag = 3;

  // The metadata which will be passed to state store component.
  map<string,string> metadata = 4;

  // Options for concurrency and consistency to save the state.
  StateOptions options = 5;
}

response为 google.protobuf.Empty。

Delete State

DeleteStateRequest:

// DeleteStateRequest is the message to delete key-value states in the specific state store.
message DeleteStateRequest {
  // The name of state store.
  string store_name = 1;

  // The key of the desired state
  string key = 2;

  // The entity tag which represents the specific version of data.
  // The exact ETag format is defined by the corresponding data store.
  string etag = 3;

  // State operation options which includes concurrency/
  // consistency/retry_policy.
  common.v1.StateOptions options = 4;

  // The metadata which will be sent to state store components.
  map<string,string> metadata = 5;
}

response为 google.protobuf.Empty。

Execute State Transaction

ExecuteStateTransactionRequest

// ExecuteStateTransactionRequest is the message to execute multiple operations on a specified store.
message ExecuteStateTransactionRequest {
  // Required. name of state store.
  string storeName = 1;

  // Required. transactional operation list.
  repeated TransactionalStateOperation operations = 2;

  // The metadata used for transactional operations.
  map<string,string> metadata = 3;
}

// TransactionalStateOperation is the message to execute a specified operation with a key-value pair.
message TransactionalStateOperation {
  // The type of operation to be executed
  // 具体有哪些操作?
  string operationType = 1;

  // State values to be operated on 
  common.v1.StateItem request = 2;
}

response为 google.protobuf.Empty。

5.2.3 - 状态管理API的golang生成代码

Dapr状态管理API的golang生成代码

从proto api定义文件生成的golang代码,被存放在dapr项目的 pkg/proto/ 目录下。

grpc服务定义

DaprServer 是 dapr 服务的服务器端API定义,包含多个 state 相关的方法:

// DaprServer is the server API for Dapr service.
type DaprServer interface {
	// Gets the state for a specific key.
	GetState(context.Context, *GetStateRequest) (*GetStateResponse, error)
	// Gets a bulk of state items for a list of keys
	GetBulkState(context.Context, *GetBulkStateRequest) (*GetBulkStateResponse, error)
	// Saves the state for a specific key.
	SaveState(context.Context, *SaveStateRequest) (*empty.Empty, error)
	// Deletes the state for a specific key.
	DeleteState(context.Context, *DeleteStateRequest) (*empty.Empty, error)
	// Executes transactions for a specified store
	ExecuteStateTransaction(context.Context, *ExecuteStateTransactionRequest) (*empty.Empty, error)
   ......
}

5.2.4 - 状态管理API的go client定义

Dapr状态管理API的golang生成代码

DaprClient

https://github.com/dapr/dapr/blob/11741c6cd697e08b2e776943e61bb2e3388c85a8/pkg/proto/runtime/v1/dapr.pb.go

这是根据proto生成的go代码

type DaprClient interface {
	// Gets the state for a specific key.
	GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error)
	// Gets a bulk of state items for a list of keys
	GetBulkState(ctx context.Context, in *GetBulkStateRequest, opts ...grpc.CallOption) (*GetBulkStateResponse, error)
	// Saves the state for a specific key.
	SaveState(ctx context.Context, in *SaveStateRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	// Deletes the state for a specific key.
	DeleteState(ctx context.Context, in *DeleteStateRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	// Executes transactions for a specified store
	ExecuteStateTransaction(ctx context.Context, in *ExecuteStateTransactionRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	......
}

Get State

以 Get State 为例看 DaprClient 的实现:

func (c *daprClient) GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error) {
	out := new(GetStateResponse)
	err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/GetState", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

只是简单调用远程方法。

5.2.5 - 状态管理API的go sdk封装

Dapr状态管理API的go sdk封装

go sdk使用案例

https://github.com/dapr/go-sdk

对于简单场景,只要给出 store name / key / data 就好了:

ctx := context.Background()
data := []byte("hello")
store := "my-store" // defined in the component YAML 

// save state with the key key1
if err := client.SaveState(ctx, store, "key1", data); err != nil {
    panic(err)
}

// get state for key key1
item, err := client.GetState(ctx, store, "key1")
if err != nil {
    panic(err)
}
fmt.Printf("data [key:%s etag:%s]: %s", item.Key, item.Etag, string(item.Value))

// delete state for key key1
if err := client.DeleteState(ctx, store, "key1"); err != nil {
    panic(err)
}

get state

简单get方法,使用默认的并发选项:

// GetState retreaves state from specific store using default consistency option.
func (c *GRPCClient) GetState(ctx context.Context, store, key string) (item *StateItem, err error) {
	return c.GetStateWithConsistency(ctx, store, key, StateConsistencyStrong)
}

但,默认并发选项是 StateConsistencyStrong,强一致性。

完整的get 方法:

// GetStateWithConsistency retreaves state from specific store using provided state consistency.
func (c *GRPCClient) GetStateWithConsistency(ctx context.Context, store, key string, sc StateConsistency) (item *StateItem, err error) {
	if store == "" {
		return nil, errors.New("nil store")
	}
	if key == "" {
		return nil, errors.New("nil key")
	}

	req := &pb.GetStateRequest{
		StoreName:   store,
		Key:         key,
		Consistency: (v1.StateOptions_StateConsistency(sc)),
	}

	result, err := c.protoClient.GetState(authContext(ctx), req)
	if err != nil {
		return nil, errors.Wrap(err, "error getting state")
	}

	return &StateItem{
		Etag:  result.Etag,
		Key:   key,
		Value: result.Data,
	}, nil
}

基本上也没做什么。

5.3 - 状态管理的高级特性

Dapr状态管理的高级特性

5.3.1 - 状态管理高级特性的概述

Dapr状态管理高级特性的概述

Dapr 状态管理的高级特性有:

  • 并发
  • 一致性
  • 事务性

5.3.2 - 状态管理高级特性之并发控制

Dapr状态管理高级特性之并发控制

设计分析

dapr state 目前要求操作的并发控制有两个: FirstWrite 和 LastWrite。

const (
	FirstWrite = "first-write"
	LastWrite  = "last-write"
)

LastWrite (Last Write Win模式)就简单了,每个写操作都只需要简单的执行即可,无需考虑是否并发。事实上就是不做并发控制。

FirstWrite (First Write Win模式)复杂一些,当有多个操作进行并发写时,只有第一个能成功。因此,必须有机制能够在执行写操作时判断从上次读到这次写,期间 state 数据没有被修改。也就是需要实现 CAS操作:CAS = Compare And Set。

Dapr state 的设计是引入一个名为 ETag 的机制:

  • ETag 是一个整型,每个状态都会关联一个 ETag
  • 每次创建或修改 state 时,ETag都会递增
  • 进行写操作时:先读取现有state,拿到当前的ETag;在提交写操作时,传入之前的ETag。底层 state store的实现应该在执行写操作之前检查ETag是否匹配。

具体到各个操作:

  1. Save state
    • grpc API:在请求的SaveStateRequest中通过 etag 字段提供
    • HTTP API:在请求的json内容中通过etag字段提供
  2. Get state
    • grpc API:在应答的 GetStateResponse 中通过 etag 字段提供
    • HTTP API:在应答的 ETag header中提供
  3. Get Bulk
    • grpc API:在应答的 GetBulkStateResponse 中通过 etag 字段提供
    • HTTP API:在应答的json中通过 etag 字段提供
  4. Delete State
    • grpc API:在请求的 DeleteStateRequest 中通过 etag 字段提供
    • HTTP API:通过请求的 If-Match header提供

实现分析

Redis实现

redis 为了实现 state 要求的 etag,就必须在常规的key/value存储模型上增加 key/etag 的存储,实现方式就是 key / map as value,将一个 map 作为value(刚好redis本身也支持map结构)。然后在map中存储 data / version 等多个信息:

  • key=version,存储ETag需要的version
  • key=data,存储state的实际数据

读取state的时候将整个map as value读取,然后分别取data和version即可。

但写操作会比较麻烦, redis 本身不直接提供对多个字段的原子操作方式,因此在save和delete操作时需要通过LUA脚本来完成。

  • concurrency 设置为 first-write :需要通过 etag 实现 CAS (Compare And Set)
  • concurrency 设置为 last-write :忽略 etag,即使请求设置了也要重置

5.3.3 - 状态管理高级特性之一致性

Dapr状态管理高级特性之一致性

设计分析

dapr state 目前对操作的一致性要求有两个: strong 和 eventual。

const (
	Strong     = "strong"
	Eventual   = "eventual"
)

eventual 就简单了,每个写操作都只需要简单的执行即可,后续的同步等操作由底层实现自行保证。

FirstWrite (First Write Win模式)复杂一些,当有多个操作进行并发写时,只有第一个能成功。因此,必须有机制能够在执行写操作时判断从上次读到这次写,期间 state 数据没有被修改。也就是需要实现 CAS操作:CAS = Compare And Set。

Dapr state 的设计是引入一个名为 ETag 的机制:

  • ETag 是一个整型,每个状态都会关联一个 ETag
  • 每次创建或修改 state 时,ETag都会递增
  • 进行写操作时:先读取现有state,拿到当前的ETag;在提交写操作时,传入之前的ETag。底层 state store的实现应该在执行写操作之前检查ETag是否匹配。

具体到各个操作:

  1. Save state
    • grpc API:在请求的SaveStateRequest中通过 etag 字段提供
    • HTTP API:在请求的json内容中通过etag字段提供
  2. Get state
    • grpc API:在应答的 GetStateResponse 中通过 etag 字段提供
    • HTTP API:在应答的 ETag header中提供
  3. Get Bulk
    • grpc API:在应答的 GetBulkStateResponse 中通过 etag 字段提供
    • HTTP API:在应答的json中通过 etag 字段提供
  4. Delete State
    • grpc API:在请求的 DeleteStateRequest 中通过 etag 字段提供
    • HTTP API:通过请求的 If-Match header提供

实现分析

Redis实现

redis 为了实现 state 要求的 etag,就必须在常规的key/value存储模型上增加 key/etag 的存储,实现方式就是 key / map as value,将一个 map 作为value(刚好redis本身也支持map结构)。然后在map中存储 data / version 等多个信息:

  • key=version,存储ETag需要的version
  • key=data,存储state的实际数据

读取state的时候将整个map as value读取,然后分别取data和version即可。

但写操作会比较麻烦, redis 本身不直接提供对多个字段的原子操作方式,因此在save和delete操作时需要通过LUA脚本来完成。

  • concurrency 设置为 first-write :需要通过 etag 实现 CAS (Compare And Set)
  • concurrency 设置为 last-write :忽略 etag,即使请求设置了也要重置

5.3.4 - 状态管理高级特性之事务性

Dapr状态管理高级特性之事务性

设计分析

如果 state store 要支持事务,则要求实现 TransactionalStore 接口:

type TransactionalStore interface {
   // Init方法是和普通store接口一致的
   Init(metadata Metadata) error
   // 增加的是 Multi 方法
   Multi(request *TransactionalStateRequest) error
}

Runtime ExecuteStateTransaction 方法会调用 state store 的 multi 方法。

实现分析

Redis实现

dapr redis state store的事务实现,是通过 redis-go 封装的 TxPipeline 实现的。

TODO:

  • redis-go 如何实现的
  • redis如何实现事务?multi?

6 - 可观测性

Dapr的可观测性模块

6.1 - 可观测性概述

Dapr的可观测性概述

可观测性/Observability构建块

6.2 - 可观测性的文档

Dapr可观测性的文档

6.2.1 - 可观测性文档的文档

Dapr可观测性的文档概述

Observability

https://docs.dapr.io/developing-applications/building-blocks/observability/

查看和度量跨组件和网络服务的消息调用。

本节包括在可观测性方面对开发者的指导。关于Dapr中可观测性概念的总体概述和关于监测的操作指导,请参见其他章节。

相关资料:

6.2.2 - 可观测性的概念

Dapr可观测性的概念

内容节选自:https://docs.dapr.io/concepts/observability-concept/

How to monitor applications through tracing, metrics, logs and health

如何通过跟踪、指标、日志和健康状况来监控应用程序

可观测性是控制理论中的一个术语。可观测性意味着您可以通过观察系统外部来回答系统内部发生了什么问题,而无需发布新的代码来回答新的问题。在生产环境和服务中,可观测性对于调试、运维和监控Dapr系统服务、组件和用户应用至关重要。

可观测性能力使用户能够监控Dapr系统服务、它们与用户应用程序的交互,并了解这些被监控的服务的行为。可观测性的功能分为以下几个方面。

分布式跟踪

分布式跟踪用于描述和监控Dapr系统服务和用户应用程序。分布式跟踪有助于确定故障发生的位置和导致性能不佳的原因。分布式跟踪特别适合于调试和监控分布式软件架构,如微服务。

您可以使用分布式跟踪来帮助调试和优化应用程序代码。分布式跟踪包含Dapr运行时、Dapr系统服务和用户应用程序之间跨越进程、节点、网络和安全边界的跟踪span。它提供了对服务调用(调用流)和服务依赖的详细了解。

Dapr使用W3C追踪上下文进行分布式追踪。

一般建议在生产中运行Dapr时开启跟踪。

Open Telemetry

Dapr与OpenTelemetry集成,用于跟踪、度量和日志。通过OpenTelemetry,您可以根据您的环境配置各种导出器,用于跟踪和度量,无论它是在云端还是在内部运行。

Metrics

度量是指随着时间的推移收集和存储的一系列测量值和计数。Dapr指标可以监控和了解Dapr系统服务和用户应用的行为。

例如,Dapr sidecar和用户应用之间的服务指标显示调用延迟、流量故障、请求的错误率等。

Dapr系统服务指标显示sidecar注入故障、系统服务的健康度,包括CPU使用情况、做出的actor放置次数等。

Logs

日志是事件发生的记录,可以用来判断故障或其他状态。

日志事件包含Dapr系统服务产生的警告、错误、信息和调试消息。每个日志事件包括元数据,如消息类型、主机名、组件名、App ID、ip地址等。

Health

Dapr为托管平台提供了一种使用HTTP端点确定其健康状况的方法。有了这个端点,Dapr进程或sidecar可以被探测到,以确定它的准备度(readiness)和活力(liveness),并采取相应的行动。

6.2.3 - 可观测性文档的Tracing

Dapr可观测性文档中的Tracing

内容节选自:https://docs.dapr.io/developing-applications/building-blocks/observability/tracing/

Dapr 使用 OpenTelemetry(以前称为OpenCensus)进行分布式追踪和指标收集。OpenTelemetry支持各种后端,包括Azure Monitor、Datadog、Instana、Jaeger、SignalFX、Stackdriver、Zipkin等。

追踪设计

Dapr 向 Dapr sidecar 中添加了 HTTP/gRPC 中间件(middleware)。中间件拦截所有Dapr和应用流量,并自动注入相关ID以跟踪分布式事务。此设计具有以下优点:

  • 无需代码检测。将自动跟踪所有流量(追踪级别可配置)。
  • 跨微服务的一致追踪行为。追踪是在 Dapr Sidecar 上配置和管理的,因此它在服务中可能保持一致,这些服务是不同团队提供的,并且可能以不同的编程语言编写。
  • 可配置和可扩展。通过利用 OpenTelemetry,可以将 Dapr 追踪配置为与流行的追踪后端一起使用,包括客户可能拥有的自定义后端。
  • OpenTelemetry 导出器被定义为一等公民的 Dapr 组件。可以同时定义并启用多个导出器。

W3C Correlation ID

Dapr使用标准的W3C跟踪上下文头文件。对于HTTP请求,Dapr使用 traceparent header。对于gRPC请求,Dapr使用 grpc-trace-bin header。当请求到达时没有trace ID时,Dapr会创建新的ID。否则,它将沿着调用链传递跟踪ID。

阅读W3C分布式跟踪,了解更多关于W3C Trace Context的背景。

这里有很大变化,早期的文档是这样描述,实现方式和现在的差异很大:

对于HTTP请求,Dapr会向请求注入 X-Correlation-ID header。对于gRPC调用,Dapr插入 X-Correlation-ID 作为 header 元数据的字段。当没有 Correlation ID 的请求到达时,Dapr将创建一个新的 Correlation。否则,它将沿调用链传递 Correlation ID。

配置

Dapr使用OpenCensus定义的概率才样 (probabilistic sampling) 。采样率定义了tracing span被采样的概率,其值可以在0和1之间(包括)。默认采样率是0.0001(即每10,000个span中采样一个)。

要改变默认的跟踪行为,请使用配置文件(在自托管模式下)或Kubernetes配置对象(在Kubernetes模式下)。例如,以下配置对象将采样率改为1(即每个span都会采样)。

apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: tracing
  namespace: default
spec:
  tracing:
    samplingRate: "1"

同样,将 samplingRate 改为 0 将完全禁用跟踪。

有关如何在本地环境和Kubernetes环境中配置跟踪的更多细节,请参见参考文档部分。

Dapr支持可插拔的导出器,由配置文件(在自托管模式下)或Kubernetes自定义资源对象(在Kubernetes模式下)定义。例如,下面的清单(manifest)定义了一个Zipkin导出器。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: zipkin
  namespace: default
spec:
  type: exporters.zipkin
  metadata:
  - name: enabled
    value: "true"
  - name: exporterAddress
    value: "http://zipkin.default.svc.cluster.local:9411/api/v2/spans"

参考文档

6.2.4 - 可观测性文档的W3C追踪上下文

Dapr可观测性文档中的W3C追踪上下文

内容节选自:https://docs.dapr.io/developing-applications/building-blocks/observability/w3c-tracing/

使用Dapr进行W3C追踪的背景和场景。

6.2.5 - 可观测性文档的W3C追踪上下文概述

Dapr可观测性文档中的W3C追踪上下文概述

内容节选自:https://docs.dapr.io/developing-applications/building-blocks/observability/w3c-tracing/w3c-tracing-overview/

使用Dapr进行W3C追踪的背景和场景

介绍

Dapr使用W3C跟踪上下文对服务调用和pub/sub消息进行分布式跟踪。Dapr主要完成了生成和传播跟踪上下文信息的所有繁重工作,这些信息可以被发送到许多不同的诊断工具进行可视化和查询。只有极少数情况下,作为开发者,你需要传播或生成tracing header。

背景

分布式跟踪是一种由跟踪工具实现的方法,用于跟踪、分析和调试跨多个软件组件的事务。通常情况下,分布式跟踪会遍历一个以上的服务,这就要求它具有唯一的标识性。跟踪上下文传播将这种唯一标识传递出去。

在过去,跟踪上下文传播通常由每个不同的跟踪供应商单独实现。在多厂商的环境中,这会造成互操作性的问题,比如:

  • 由于没有共享的唯一标识符,不同追踪供应商收集的tracing无法相互关联。
  • 跨越不同追踪供应商边界的trace无法传播,因为没有统一协商的标识符集可以被转发。
  • 厂商特定的元数据可能会被中介机构放弃
  • 云平台厂商、中间商和服务商,由于没有标准可循,不能保证支持追踪上下文传播。

在过去,这些问题并没有产生重大影响,因为大多数应用程序由单一的跟踪供应商监控,并停留在单一平台供应商的边界内。今天,越来越多的应用是分布式的,并利用了多个中间件服务和云平台。

现代应用的这种转变呼唤一个分布式的跟踪上下文传播标准。W3C跟踪上下文规范为跟踪上下文传播数据的交换定义了一种普遍认同的格式–称为跟踪上下文。trace context通过以下方式解决了上述问题:

  • 为单个跟踪和请求提供独特的标识符,允许将多个供应商的跟踪数据链接在一起。
  • 提供一个约定俗成的机制,以转发特定供应商的跟踪数据,并避免在多个跟踪工具参与单一交易时出现跟踪中断的情况。
  • 提供一个中间商、平台和硬件提供商可以支持的行业标准。

传播跟踪数据的统一方法提高了分布式应用行为的可视性,便于问题和性能分析。

场景

有两种情况下,你需要了解跟踪是如何被使用的:

  1. Dapr生成并在服务之间传播跟踪上下文。
  2. Dapr生成跟踪上下文,你需要将跟踪上下文传播给另一个服务,或者你生成跟踪上下文,Dapr将跟踪上下文传播给一个服务。

Dapr在服务之间生成和传播跟踪上下文

在这些情况下,Dapr为你做了所有的工作。你不需要创建和传播任何跟踪头。Dapr会负责创建所有的跟踪头并传播它们。让我们通过实例来了解一下这些场景。

  1. 单个服务调用(service A -> service B

    Dapr在服务A中生成跟踪头,这些跟踪头从服务A传播到服务B。

  2. 多个顺序的服务调用 ( service A -> service B -> service C)

    Dapr在服务A请求开始时生成跟踪头,这些跟踪头从服务A->服务B->服务C,以此类推传播到更多的启用了Dapr的服务。

  3. 请求来自外部端点(例如从网关服务到启用Dapr的服务A的请求)

    Dapr 在服务 A 中生成跟踪头,这些跟踪头从服务 A 进一步传播到启用 Dapr 的服务服务 A->服务 B ->服务 C。和上面的场景2类似。

  4. Pub/sub消息 Dapr在发布的消息主题中生成跟踪头,这些跟踪头被传播到该主题上的任何监听服务。

你需要在服务之间传播或生成跟踪上下文

在这些情况下,Dapr为你做了一些工作,你需要创建或传播跟踪头。

  1. 从单个服务对不同服务的多次服务调用

    当你从一个服务中调用多个服务时,比如像这样从服务A中调用,你需要传播跟踪头。

     service A -> service B
     [ .. some code logic ..]
     service A -> service C
     [ .. some code logic ..]
     service A -> service D
     [ .. some code logic ..]
    

    在这种情况下,当服务A第一次调用服务B时,Dapr会在服务A中生成跟踪头,然后将这些跟踪头传播给服务B,这些跟踪头在服务B的响应中作为响应头的一部分返回。然而你需要将返回的跟踪上下文传播给下一个服务C和服务D,因为Dapr不知道你要重用同一个头。

    要了解如何从响应中提取跟踪头,并将跟踪头添加到请求中,请参阅 如何使用跟踪上下文 一文。

  2. 你选择了生成自己的跟踪上下文头。这是很少会遇到的。在某些情况下,您可能会特别选择将 W3C 跟踪头添加到服务调用中,例如,如果您有一个现有的应用程序目前没有使用 Dapr。在这种情况下,Dapr仍然会为您传播跟踪上下文头。如果你决定自己生成跟踪头,有三种方法可以实现:

    1. 您可以使用行业标准的 OpenCensus/OpenTelemetry SDK 来生成跟踪头,并将这些跟踪头传递给启用 Dapr 的服务。这是首选的建议。

    2. 你可以使用供应商SDK来生成W3C跟踪头,如DynaTrace SDK,并将这些跟踪头传递给启用Dapr的服务。

    3. 你可以按照W3C跟踪上下文规范手工制作一个跟踪上下文,并将这些跟踪头传递给启用Dapr的服务。

W3C trace headers

这些是由Dapr为HTTP和gRPC生成和传播的特定的跟踪上下文头。

跟踪上下文的HTTP header格式

当将HTTP响应的跟踪上下文头传播到HTTP请求时,你需要复制这些头。

Traceparent Header

traceparent header在跟踪系统中以通用的格式表示传入的请求,所有厂商都能理解。下面是一个traceparent header的例子:

traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01

traceparent字段在 这里 有详细说明

Tracestate Header

tracestate头 header包含了可能是特定于厂商的格式的parent。

tracestate: congo=t61rcWkgMzE

这里 是详细的 tracestate 字段的说明。

跟踪上下文的gRPC header格式

在gRPC API调用中,跟踪上下文是通过 grpc-trace-bin heaer传递的。

6.2.6 - 可观测性文档中的日志

Dapr可观测性文档中的日志

内容节选自:https://docs.dapr.io/developing-applications/building-blocks/observability/logs/

Dapr以纯文本或JSON格式生成结构化日志到stdout。默认情况下,所有的Dapr进程(运行时和系统服务)都以纯文本的形式写入控制台。要启用JSON格式的日志,需要在运行Dapr进程时添加-log-as-json命令标志。

如果要使用搜索引擎(如 Elastic Search 或 Azure Monitor)搜索日志,建议使用 JSON 格式的日志,日志收集器和搜索引擎可以使用内置的 JSON 分析器进行解析。

Log schema

Dapr根据以下模式生成日志:

字段 描述 例子
time ISO8601 Timestamp 2011-10-05T14:48:00.000Z
level Log Level (info/warn/debug/error) info
type Log Type log
msg Log Message hello dapr!
scope Logging Scope dapr.runtime
instance Container Name dapr-pod-xxxxx
app_id Dapr App ID dapr-app
ver Dapr Runtime Version 0.5.0

纯文本和JSON格式的日志

  • 纯文本日志示例
time="2020-03-11T17:08:48.303776-07:00" level=info msg="starting Dapr Runtime -- version 0.5.0-rc.2 -- commit v0.3.0-rc.0-155-g5dfcf2e" instance=dapr-pod-xxxx scope=dapr.runtime type=log ver=0.5.0-rc.2
time="2020-03-11T17:08:48.303913-07:00" level=info msg="log level set to: info" instance=dapr-pod-xxxx scope=dapr.runtime type=log ver=0.5.0-rc.2
  • JSON格式日志示例
{"instance":"dapr-pod-xxxx","level":"info","msg":"starting Dapr Runtime -- version 0.5.0-rc.2 -- commit v0.3.0-rc.0-155-g5dfcf2e","scope":"dapr.runtime","time":"2020-03-11T17:09:45.788005Z","type":"log","ver":"0.5.0-rc.2"}
{"instance":"dapr-pod-xxxx","level":"info","msg":"log level set to: info","scope":"dapr.runtime","time":"2020-03-11T17:09:45.788075Z","type":"log","ver":"0.5.0-rc.2"}

配置纯文本或JSON格式的日志

Dapr支持纯文本和JSON格式的日志。默认的格式是纯文本。如果您想使用纯文本与搜索引擎,您不需要更改任何配置选项。

要使用JSON格式的日志,您需要在安装Dapr和部署应用程序时添加额外的配置。建议使用JSON格式的日志,因为大多数日志收集器和搜索引擎可以通过内置的解析器更容易地解析JSON。

在Kubernetes中配置日志格式

以下步骤描述了如何为Kubernetes配置JSON格式的日志。

使用Helm chart将Dapr安装到集群中

您可以通过在Helm命令中添加--set global.logAsJson=true选项为Dapr系统服务启用JSON格式的日志。

helm install dapr dapr/dapr --namespace dapr-system --set global.logAsJson=true

为 Dapr Sidecar启用 JSON 格式的日志。

您可以通过在 deployment 中添加 dapr.io/log-as-json: "true " 注解,在Dapr sidecars-injector服务激活的Dapr sidecars中启用JSON格式的日志。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: pythonapp
  namespace: default
  labels:
    app: python
spec:
  replicas: 1
  selector:
    matchLabels:
      app: python
  template:
    metadata:
      labels:
        app: python
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "pythonapp"
        dapr.io/log-as-json: "true"
...

日志收集器

如果你在Kubernetes集群中运行Dapr,Fluentd是一个流行的容器日志收集器。你可以使用Fluentd和json解析器插件来解析Dapr JSON格式的日志。本攻略介绍了如何在集群中配置Fleuntd。

如果你使用Azure Kubernetes服务,你可以使用默认的OMS代理与Azure Monitor收集日志,而不需要安装Fluentd。

搜索引擎

如果你使用Fluentd,我们建议使用Elastic Search和Kibana。这篇攻略介绍了如何在Kubernetes集群中设置Elastic Search和Kibana。

如果你使用的是Azure Kubernetes服务,你可以使用Azure监控容器,而无需安装任何额外的监控工具。另请阅读如何为容器启用Azure Monitor。

6.2.7 - 如何在Dapr中使用的W3C跟踪上下文

如何在Dapr中使用的W3C跟踪上下文

内容节选自:https://docs.dapr.io/developing-applications/building-blocks/observability/w3c-tracing/w3c-tracing-howto/

使用Dapr的W3C追踪标准。

如何从响应中检索跟踪上下文。

注意:在Dapr SDK中没有暴露的帮助方法来传播和检索跟踪上下文。你需要使用http/gRPC客户端通过http头和gRPC元数据来传播和检索跟踪头。

在Go中检索跟踪上下文

对于HTTP调用

OpenCensus Go SDK提供ochttp包,它提供了从http响应中获取跟踪上下文的方法。

要从HTTP响应中获取跟踪上下文,你可以这样:

f := tracecontext.HTTPFormat{}
sc, ok := f.SpanContextFromRequest(req)

对于gRPC调用

要在gRPC调用返回时检索跟踪上下文头,可以将响应头引用作为gRPC调用选项传递,该选项包含响应头。

var responseHeader metadata.MD

// Call the InvokeService with call option
// grpc.Header(&responseHeader)

client.InvokeService(ctx, &pb.InvokeServiceRequest{
		Id: "client",
		Message: &commonv1pb.InvokeRequest{
			Method:      "MyMethod",
			ContentType: "text/plain; charset=UTF-8",
			Data:        &any.Any{Value: []byte("Hello")},
		},
	},
	grpc.Header(&responseHeader))

如何在请求中传播跟踪上下文

注意:在Dapr SDK中没有暴露的帮助方法来传播和检索跟踪上下文。你需要使用http/gRPC客户端通过http头和gRPC元数据来传播和检索跟踪头。

在Go中传递跟踪上下文

对于HTTP调用

OpenCensus Go SDK提供ochttp包,提供在http请求中附加跟踪上下文的方法。

f := tracecontext.HTTPFormat{}
req, _ := http.NewRequest("GET", "http://localhost:3500/v1.0/invoke/mathService/method/api/v1/add", nil)

traceContext := span.SpanContext()
f.SpanContextToRequest(traceContext, req)

对于gRPC调用

traceContext := span.SpanContext()
traceContextBinary := propagation.Binary(traceContext)

然后你可以通过gRPC元数据用 grpc-trace-bin 头传递跟踪上下文。

ctx = metadata.AppendToOutgoingContext(ctx, "grpc-trace-bin", string(traceContextBinary))

6.2.8 - 可观测性文档中的Metrics

Dapr可观测性文档中的Metrics

内容节选自:https://docs.dapr.io/developing-applications/building-blocks/observability/metrics/

Dapr公开了一个Prometheus指标端点(metrics endpoint),您可以通过刮削该端点来更好地了解Dapr的行为方式,并针对特定条件设置警报。

配置

Dapr系统进程默认启用metrics端点,您可以通过命令行参数--enable-metrics=false来禁用它。

默认的metrics端口是9090。可以通过向Daprd传递命令行参数--metrics-port来重写这个端口。

如果要禁用Dapr Sidecar中的度量,可以使用 metrics spec 配置,并设置 enabled: false来禁用Dapr运行时的度量。

apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: tracing
  namespace: default
spec:
  tracing:
    samplingRate: "1"
  metric:
    enabled: false

Metrics

每个Dapr系统进程都会默认发出Go运行时/进程指标,并有自己的指标

6.2.9 - 可观测性文档中的sidecar健康状态

Dapr可观测性文档中的sidecar健康状态

内容节选自:https://docs.dapr.io/developing-applications/building-blocks/observability/sidecar-health/

Dapr提供了一种使用HTTP /healthz 端点来确定其健康状况的方法。有了这个端点,Dapr进程或sidecar可以被探测到它的健康状况,从而确定它的准备度(readiness)和活力(liveness)。参见健康API

Dapr /healthz 端点可以被应用托管平台的健康探针使用。本主题介绍了Dapr如何与来自不同托管平台的探针集成。

作为用户,当将Dapr部署到托管平台(例如Kubernetes)时,Dapr健康端点会自动为您配置。你不需要配置任何东西。

注意:Dapr actor 也有一个健康 API 端点,Dapr 会探测应用程序,以响应 Dapr 发出的信号,即 actor 应用程序是健康且正在运行的。请参阅actor健康API

健康端点:与Kubernetes的整合

Kubernetes使用就绪(readiness)和活泼度(liveness)探针来确定容器的健康状况。

kubelet使用 liveness probes 来知道何时重启容器。例如,liveness探针可以捕捉到一个死锁,即应用程序正在运行,但无法取得进展。在这样的状态下重启容器,可以帮助使应用在有bug的情况下仍有更多的可用性。

kubelet使用 readiness probes 来了解一个容器何时准备好开始接受流量。当一个pod的所有容器都准备好时,就认为它已经准备好了。这个就绪信号的一个用途是控制哪些pod被用作Kubernetes服务的后端。当一个pod没有准备好时,它将从Kubernetes服务负载平衡器中移除。

当与Kubernetes集成时,Dapr sidecar被注入一个Kubernetes探针配置,告诉它使用Dapr healthz端点。这是由Sidecar Injector系统服务完成的。与kubelet的集成如下图所示。

如何在Kubernetes中配置Liveness探针?

在pod配置文件中,在容器规范部分添加了活泼度(Liveness)探针,如下图所示。

 livenessProbe:
      httpGet:
        path: /healthz
        port: 8080
      initialDelaySeconds: 3
      periodSeconds: 3

在上面的例子中,periodSeconds字段指定kubelet应该每3秒执行一次活度探测。initialDelaySeconds字段告诉kubelet在执行第一个探测之前应该等待3秒。要执行探测,kubelet会向在容器中运行并在本例中监听端口8080的服务器发送HTTP GET请求。如果服务器的 /healthz 路径的处理程序返回一个成功代码,那么kubelet认为容器还活着并且是健康的。如果处理程序返回失败代码,kubelet就会杀死容器并重新启动它。

任何大于或等于200且小于400的代码表示成功。任何其他代码表示失败。

如何在Kubernetes中配置readiness探针?

就绪(readiness)探针的配置与活度探针类似,唯一的区别是使用readinessProbe字段而不是活度探针字段。

readinessProbe:
			httpGet:
        path: /healthz
        port: 8080
      initialDelaySeconds: 3
      periodSeconds: 3

Dapr sidecar健康端点如何用Kubernetes配置?

如上所述,这个配置是由Sidecar Injector服务自动完成的。本节介绍了在活泼度和准备度探针上设置的具体数值。

Dapr在端口3500上有其HTTP健康端点 /v1.0/healthz,这可以和Kubernetes一起用于就绪和活泼度探针。当Dapr sidecar被注入时,在pod配置文件中用以下值配置就绪和活泼度探针。

livenessProbe:
      httpGet:
        path: v1.0/healthz
        port: 3500
      initialDelaySeconds: 5
      periodSeconds: 10
      timeoutSeconds : 5
      failureThreshold : 3
readinessProbe:
      httpGet:
        path: v1.0/healthz
        port: 3500
      initialDelaySeconds: 5
      periodSeconds: 10
      timeoutSeconds : 5
      failureThreshold: 3

7 - 安全

Dapr的可观测性模块

7.1 - 安全概述

Dapr的安全模块

安全/Secrets 构建块

官方资料:

7.2 - 安全的概念

Dapr的安全的概念

内容节选自:https://github.com/dapr/docs/blob/master/concepts/secrets/

Dapr秘密管理

Dapr为开发人员提供了一种一致的方式来提取应用 secrets,而无需了解所使用的 secrets store 的详细信息。 secrets store 是Dapr中的组件。Dapr允许用户编写新的 secrets store 组件实现,这些实现既可以用来保存其他Dapr组件的 secrets(例如,状态存储组件用来读取/写入状态的secrets),也可以为应用提供专用的 secrets 构建块API。使用 secrets 构造块API,您可以轻松地从命名的 secrets store 读取应用可以使用的 secrets。

Secrets store 的例子包括KubernetesHashicorp VaultAzure KeyVault

在Dapr Components中引用 secrets store

您可以将凭据放在 Dapr 支持的 secrets store 中,并在 Dapr 组件中引用该 secrets,而不是在Dapr组件中包括凭据。

学习心得:

https://github.com/dapr/docs/blob/master/howto/setup-secret-store/gcp-secret-manager.md

这个文档的最下面的用法,我觉得还是挺有参考价值的。 sidecar 搞定 secrets 的数据,放在自己内容,其他模块可直接引用,应用根本不需要真正拿到这些敏感信息。比如这个例子里面,应用连接 redis 去做 状态管理,redis 的连接密码就这么轻松搞定

下沉的能力越多,build block越多,类似的需要做安全管理的字段越多,这个玩法的价值就越大

检索Secrets

服务代码可以调用 secrets 构造块API,以从Dapr支持的 secret store 中检索 secrets。

8 - Actor

Dapr的Actor模块

8.1 - Actor概述

Dapr的Actor概述

Actors 构建块

官方资料:

8.2 - Actor的概念

Dapr中Actor的概念

内容节选自:https://github.com/dapr/docs/blob/master/concepts/actors/README.md

Dapr运行时提供了一个基于 Virtual Actor 模式的actor实现。Dapr actors API提供了一个单线程编程模型,该模型利用了运行Dapr的底层平台所提供的可伸缩性和可靠性保证。

Actor 介绍

Actor 是具有单线程执行的隔离,独立的计算和状态单元。

Actor 模型 是一种用于并发或者分布式系统的计算机模型,大量的 Actor 可以同时执行且彼此独立。Actor 可以相互交流,并且可以创建更多的 Actor。

何时使用Actor

Dapr actor 是 actor 设计模式的一种实现。与任何软件设计模式一样,是否使用特定模式取决于软件设计问题是否适合该模式。

尽管 Actor 设计模式可以很好地适配许多分布式系统的问题和场景,但是必须仔细考虑模式的约束以及实现该模式的框架。作为一般指导,在以下情况下,请考虑使用 Actor 模式来为您的问题或场景建模:

  • 问题空间涉及大量(成千上万个)小型,独立且隔离的状态和逻辑单元。
  • 想要使用单线程对象,无需与外部组件进行大量交互(包括跨参与者查询状态)
  • actor实例不会通过发出I/O操作而以不可预测的延迟阻塞调用者。

Dapr Actor

每个 Actor 都被定义为 Actor 类型的实例,类比对象是类的实例。例如,可能存在实现计算器功能的 Actor 类型,并且可能有许多这种类型的 Actor 分布在整个集群的各个节点上。每个此类 Actor 都由 Actor ID 唯一标识。

Actor生命周期

Dapr Actor 是虚拟的,这意味着他们的生命周期不依赖于其在内存中的表示。结果就是,不需要显式创建或销毁它们。Dapr actor 运行时在其第一次收到对该 actor ID 的请求时会自动激活 actor。如果一段时间未使用actor,则Dapr Actors 运行时会垃圾收集内存中的对象。如果以后需要重新激活,它还将保留有关 Actor 存在的知识。

对 actor 方法和提醒的调用会重置空闲时间,例如提醒触发将使actor保持活动状态。Actor 提醒触发 Actor 是活跃还是不活跃,如果被激发为不活跃的演员,它将首先激活该 Actor。Actor timer不会重置空闲时间,因此 timer 触发不会使Actor保持活动状态。计时器仅在actor处于活动状态时触发。

Dapr运行时用于查看是否可以垃圾收集actor而使用的空闲超时和扫描间隔是可配置的。当Dapr运行时调用actor服务以获取受支持的actor类型时,可以传递此信息。

由于虚拟 Actor 模型,这种虚拟角色生命周期的抽象带来了一些警告,实际上,Dapr Actors的实现有时会偏离该模型。

第一次将消息发送到其 Actor ID 时,会自动激活一个 Actor(导致要构造一个 Actor 对象)。一段时间后,actor对象将被垃圾回收。将来,再次使用actor ID,将导致构造新的actor对象。由于状态存储在已配置的状态提供程序中以供Dapr运行时使用,因此 Actor 的状态比对象的生存期更长。

分发和故障转移

为了提供可伸缩性和可靠性,actor实例分布在整个群集中,Dapr会根据需要自动将其从故障节点迁移到正常节点。

Actor分布在actor服务的各个实例上,而这些实例分布在集群中的各个节点上。每个服务实例都包含一组给定类型的 Actor。

Actor Placement 服务

Dapr actor运行时为您管理分发方案和密钥范围设置。这是由actor Placement服务完成的。创建服务的新实例时,相应的Dapr运行时会注册它可以创建的参与者类型,并且Placement服务会针对给定的参与者类型计算所有实例之间的分区。每种Actor类型的分区信息表都会更新并存储在环境中运行的每个Dapr实例中,并且可以在创建和破坏Actor服务的新实例时动态更改。如下图所示。

当客户端调用具有特定ID(例如,Actor ID 123)的actor时,客户端的 Dapr 实例会对actor类型和ID进行哈希处理,并使用该信息调用相应的Dapr实例,该实例可以为这个特定的Actor ID的请求提供服务。结果就是,总是为任何给定的Actor ID调用相同的分区(或服务实例)。如下图所示。

Actor ID创建和调用

这简化了一些选择,但也需要考虑一些问题:

  • 默认情况下,Actor 被随机放置在 pod 中,以实现均匀分布。
  • 因为 Actor 是随机放置的,所以期望 Actor 的操作总是需要网络通信,包括方法调用数据的序列化和反序列化,这会导致延迟和开销。

注意:Dapr Actor Placement 服务仅用于演员放置,因此,如果服务未使用Dapr Actor,则不需要。Placement 服务可以在所有托管环境中运行,例如,自托管,Kubernetes

Actor通讯

可以通过调用 HTTP/gRPC 端点与 Dapr 交互以调用 actor 方法:

POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/<method/state/timers/reminders>

您可以在请求体中为actor方法提供任何数据,并且请求的响应将在响应体中,这是来自actor调用的数据。

并发

Dapr Actors运行时提供了一个简单的基于回合的访问模型,用于访问actor方法。这意味着在任何时候,调用一个actor对象的代码的时,最多只能有一个线程处于活动状态。基于回合的访问极大地简化了并发系统,因为不需要数据访问的同步机制。这也意味着,在设计系统时,必须为每个 Actor 实例的单线程访问性质做特殊考虑。

一个actor实例不能同时处理多个请求。如果期望 actor 实例处理并发请求,则可能导致吞吐量瓶颈。

如果两个 Actor 之间存在循环请求,而同时向其中一个 Actor 发出外部请求,则 Actor 之间可能会彼此死锁。Dapr actor运行时会在 actor 调用上自动超时,并向调用方抛出异常以中断可能的死锁情况。

基于回合的访问

回合包括响应其他 Actor 或客户端的请求而完全执行 Actor方法,或完全执行 timer/reminder 回调。即使这些方法和回调是异步的,Dapr Actors运行时也不会交错它们。在允许新的回合之前,必须将回合完全完成。换句话说,在允许对方法或回调的新调用之前,当前正在执行的actor方法或计时器/提醒回调必须完全完成。如果方法或回调已从该方法或回调返回执行,并且该方法或回调返回的任务已完成,则认为该方法或回调已完成。值得强调的是,即使在不同的方法,计时器和回调中,也要尊重基于回合的并发。

Dapr actors运行时强制执行基于回合的并发,方式是通过在回合开始时获取每个 Actor 的锁并在回合结束时释放该锁。因此,基于回合的并发是基于每个 Actor 而不是跨 Actor 实现的。Actor方法和计时器/提醒回调可以代表不同的actor同时执行。

以下示例说明了上述概念。考虑一个实现两个异步方法(例如Method1和Method2),一个计时器和一个提醒的actor类型。下图显示了代表属于此 Actor 类型的两个 Actor(ActorId1和ActorId2)执行这些方法和回调的时间线示例。

其他内容:

9 - Daprd

Dapr的Daprd

9.1 - Daprd概述

Dapr的Daprd概述

Daprd

10 - Injector

Dapr的Injector

10.1 - Injector概述

Dapr的Injector概述

Injector

11 - Placement

Dapr的Placement

11.1 - Placement概述

Dapr Placement概述

Placement

12 - Sentry

Dapr的Sentry

12.1 - Sentry概述

Dapr Sentry概述

Sentry

13 - Operator

Dapr Operator

13.1 - Operator概述

Dapr Operator概述

Operator

14 - SDK

Dapr的SDK

14.1 - SDK概述

Dapr的SDK概述

SDK概述

14.2 - Dapr Go SDK

Dapr的Go SDK

14.2.1 - Go SDK概述

Dapr的Go SDK概述

Go SDK

14.2.2 - Go SDK的proto代码生成

Dapr的Go SDK中从protos生成代码

准备工作

准备protoc

安装Protoc,目前 daprd 要求的版本是 v3.14.0

具体见 Daprd Proto代码生成

sdk 和 daprd保持一致,但实际目前go sdk采用的是 protoc 3.11.2。

将proto生成源码

现在 go sdk的make file 提供了 protos 命令来从proto生成go代码:

$ make protos
# 下载安装gogoreplace
go install github.com/gogo/protobuf/gogoreplace
go: finding module for package github.com/gogo/protobuf/gogoreplace
go: found github.com/gogo/protobuf/gogoreplace in github.com/gogo/protobuf v1.3.2
# 删除本地的proto文件
rm -f ./dapr/proto/common/v1/*
rm -f ./dapr/proto/runtime/v1/*
# 从 dapr/dapr 仓库下载 common.proto 文件到本地
wget -q https://raw.githubusercontent.com/dapr/dapr/master/dapr/proto//common/v1/common.proto -O ./dapr/proto/common/v1/common.proto
# 使用 gogoreplace 工具替换 common.proto 文件中的 go_package
gogoreplace 'option go_package = "github.com/dapr/dapr/pkg/proto/common/v1;common";' \
        'option go_package = "github.com/dapr/go-sdk/dapr/proto/common/v1;common";' \
        ./dapr/proto/common/v1/common.proto
wget -q https://raw.githubusercontent.com/dapr/dapr/master/dapr/proto//runtime/v1/appcallback.proto -O ./dapr/proto/runtime/v1/appcallback.proto
gogoreplace 'option go_package = "github.com/dapr/dapr/pkg/proto/runtime/v1;runtime";' \
        'option go_package = "github.com/dapr/go-sdk/dapr/proto/runtime/v1;runtime";' \
        ./dapr/proto/runtime/v1/appcallback.proto
wget -q https://raw.githubusercontent.com/dapr/dapr/master/dapr/proto//runtime/v1/dapr.proto -O ./dapr/proto/runtime/v1/dapr.proto
gogoreplace 'option go_package = "github.com/dapr/dapr/pkg/proto/runtime/v1;runtime";' \
        'option go_package = "github.com/dapr/go-sdk/dapr/proto/runtime/v1;runtime";' \
        ./dapr/proto/runtime/v1/dapr.proto
# 从proto文件生成go代码
protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
           dapr/proto/common/v1/common.proto
protoc --go_out=. --go_opt=paths=source_relative \
           --go-grpc_out=. --go-grpc_opt=paths=source_relative \
           dapr/proto/runtime/v1/*.proto
# 删除刚才下载到本地的proto
rm -f ./dapr/proto/common/v1/*.proto
rm -f ./dapr/proto/runtime/v1/*.proto

遇到的问题

gogoreplace的安装

不知为何我遇到下面的报错:

$ make protos
go install github.com/gogo/protobuf/gogoreplace
cannot find package "." in:
        /home/sky/work/code/skyao/go-sdk/vendor/github.com/gogo/protobuf/gogoreplace
make: *** [Makefile:48:protos] 错误 1

单独执行这个 go install 命令也是同样错误:

$ go install github.com/gogo/protobuf/gogoreplace
cannot find package "." in:
        /home/sky/work/code/skyao/go-sdk/vendor/github.com/gogo/protobuf/gogoreplace

解决的方法是进入上一级目录(不要在go-sdk目录下)进行安装:

$ cd ..
$ go install github.com/gogo/protobuf/gogoreplace
go install: version is required when current directory is not in a module
        Try 'go install github.com/gogo/protobuf/gogoreplace@latest' to install the latest version

$ go install github.com/gogo/protobuf/gogoreplace@latest

然后临时注释掉这一行:

#go install github.com/gogo/protobuf/gogoreplace

方法有点恶心,主要是没有找到问题所在。

cgo报错stdlib.h找不到

在ubuntu 20.04新系统上遇到这个错误:

$ make test
go mod tidy
go test -count=1 \
                -race \
                -coverprofile=coverage.txt \
                -covermode=atomic \
                ./...
# runtime/cgo
_cgo_export.c:3:10: fatal error: stdlib.h: 没有那个文件或目录
    3 | #include <stdlib.h>
      |          ^~~~~~~~~~
compilation terminated.

这是因为 ubuntu 默认没有C和C++编译环境,执行以下命令安装即可:

$ sudo aptitude install build-essential

后记:更新protoc版本

发现 go sdk 使用的protoc 版本是 v3.11.2,而之前我为了满足 dapr/dapr 仓库的要求安装的是 protoc v3.14.0。两个版本生成的代码会有一些细微的差异,也就造成了生成的代码会合git 仓库中的现有代码不同。

在不同仓库之间切换使用不同的 protoc 版本实在是太不方便了,最简单的方法还是统一到同一个版本。

提交了issue和PR 给到社区,希望可以升级 go sdk 的 protoc 版本到 v3.14.0:

https://github.com/dapr/go-sdk/pull/141

14.3 - Java SDK

Dapr的Java SDK

14.3.1 - Java SDK概述

Dapr的Java SDK概述

Java SDK