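Code in the dapr repository:
Source code study of the dapr repository
- 1: Utility code source code study
- 1.1: concurrency source code study
- 1.1.1: limiter.go source code study
- 2: Library-wrapper code source code study
- 2.1: grpc source code study
- 2.1.1: util.go source code study
- 2.1.2: port.go source code study
- 2.1.3: dial.go source code study
- 3: Base code source code study
- 3.1: version source code study
- 3.2: modes source code study
- 3.3: cors source code study
- 3.4: proto source code study
- 3.5: config source code study
- 3.6: credentials source code study
- 3.6.1: certchain.go source code study
- 3.6.2: credentials.go source code study
- 3.6.3: tls.go source code study
- 3.6.4: grpc.go source code study
- 4: Runtime source code study
- 4.1: options.go source code study
- 4.2: config.go source code study
- 4.3: cli.go source code study
- 4.4: Runtime App Channel source code study
- 4.4.1: channel.go source code study
- 4.4.2: grpc_channel.go source code study
- 5: Components source code study
- 5.1: Binding component source code study
- 5.2: Middleware component source code study
- 5.3: NameResolution component source code study
- 5.4: PubSub component source code study
- 5.5: SecretStores component source code study
- 5.6: Store component source code study
- 5.7: workflow component source code study
- 5.7.1: registry.go source code study
- 6: Healthz source code study
- 6.1: health.go source code study
- 6.2: server.go source code study
- 7: Metrics source code study
- 7.1: exporter.go source code study
- 7.2: options.go source code study
- 8: workflow source code
- 8.1: workflow API
- 8.2: workflow HTTP API
- 8.3: workflow gRPC API
- 9: State management source code
- 9.1: Overview of the state management source code
- 9.2: Source analysis of state management initialization
- 9.3: Source analysis of state management handling in the runtime
- 9.4: Source analysis of the Redis implementation of state management
- 10: Resource bindings source code
- 10.1: Overview of the resource bindings source code
- 10.2: Source analysis of resource bindings initialization
- 10.3: Source analysis of the Redis output binding implementation
- 10.4: Source analysis of output binding handling
- 10.5: Summary of resource bindings Metadata
- 11: Injector source code analysis
- 11.1: Injector code implementation
- 11.2: main.go source code study
- 11.3: config.go source code study
- 11.4: injector.go source code study
- 11.5: patch_operation.go source code study
- 11.6: pod_patch.go source code study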
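1 - Utility code source code study
Utility code is code used purely as a tool. It usually sits at the very bottom of the call chain, carries no Dapr-specific logic, and focuses on one well-defined function that the layers above it rely on.
Utility code is the lowest layer of the dependency graph.
1.1 - concurrency source code study
The concurrency package is small; for now it contains only limiter.go.
1.1.1 - limiter.go source code study
A study of limiter.go in the Dapr concurrency package: how the concurrency limiter is implemented and where it is used.
Key point: it makes full use of Go channel semantics.
Implementation
Limiter struct definition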
// Limiter object
type Limiter struct {
limit int
tickets chan int
numInProgress int32
}
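Fields:
- limit: the maximum number of concurrent jobs. It is a configuration value, defaults to 100, and is not modified after initialization.
- tickets: a Go chan used to store and hand out tickets.
- numInProgress: the number of jobs currently running; this is live state.
Constructing a Limiter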
const (
// DefaultLimit is the default concurrency limit
DefaultLimit = 100
)
// NewLimiter allocates a new ConcurrencyLimiter
func NewLimiter(limit int) *Limiter {
if limit <= 0 {
limit = DefaultLimit
}
// allocate a limiter instance
c := &Limiter{
limit: limit,
// the tickets chan is buffered with capacity limit
tickets: make(chan int, limit),
}
// allocate the tickets:
// pre-fill the chan with limit tickets so that they are all available at the start
for i := 0; i < c.limit; i++ {
c.tickets <- i
}
return c
}
Limiter implementation
// Execute adds a function to the execution queue.
// if num of go routines allocated by this instance is < limit
// launch a new go routine to execute job
// else wait until a go routine becomes available
func (c *Limiter) Execute(job func(param interface{}), param interface{}) int {
// Take a ticket from the chan.
// If a ticket is available, the number of goroutines is still below limit and a new goroutine can be started for the job.
// If no ticket is available, the limit has been reached: Execute blocks here until a running job finishes and releases its ticket.
ticket := <-c.tickets
// Ticket acquired: atomically increment numInProgress.
atomic.AddInt32(&c.numInProgress, 1)
// Start a goroutine to run the job.
go func(param interface{}) {
// Use defer for the cleanup after the job completes.
defer func() {
// Return the ticket to the chan so that later jobs can acquire it.
c.tickets <- ticket
// Atomically decrement numInProgress.
atomic.AddInt32(&c.numInProgress, -1)
}()
// Run the job.
job(param)
}(param)
// Return the ticket number used for this job.
return ticket
}
The Wait method
Wait blocks until every goroutine that obtained a ticket through Execute() has finished.
// Wait will block all the previously Executed jobs completed running.
//
// IMPORTANT: calling the Wait function while keep calling Execute leads to
// un-desired race conditions
func (c *Limiter) Wait() {
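// Wait reads limit tickets from the chan, competing for each ticket as soon as a job releases it.
// Once Wait holds every ticket, all previously submitted jobs must have finished, because a job returns its ticket only on completion.
// Wait is not guaranteed to win a released ticket: if Execute() is still being called concurrently, a new job may take it first, and Wait has to compete again when that job finishes.
// Wait never puts the tickets back, so any Execute() call made after Wait returns would block.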
for i := 0; i < c.limit; i++ {
<-c.tickets
}
}
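Usage
Throttling batch operations that run in parallel
The GetBulkState() methods in pkg/grpc/api.go and pkg/http/api.go use the limiter to cap the concurrency of a bulk operation:
// Build the limiter; limit comes from the Parallelism field of the request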
limiter := concurrency.NewLimiter(int(in.Parallelism))
n := len(reqs)
for i := 0; i < n; i++ {
fn := func(param interface{}) {
......
}
// Submit the job to the limiter
limiter.Execute(fn, &reqs[i])
}
// Wait for all jobs to finish
limiter.Wait()
The actor code contains a similar pattern:
limiter := concurrency.NewLimiter(actorMetadata.RemindersMetadata.PartitionCount)
for i := range getRequests {
fn := func(param interface{}) {
......
}
limiter.Execute(fn, &bulkResponse[i])
}
limiter.Wait()
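The Execute/Wait pattern above can be tried out in isolation. The following is a minimal sketch, not the Dapr type itself: ticketLimiter, execute, wait, and peakConcurrency are hypothetical names, and the job signature is simplified to func(). It measures the highest number of jobs that were running at the same time, which should never exceed the limit.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// ticketLimiter is a simplified, hypothetical version of the ticket-channel idea.
type ticketLimiter struct {
	tickets chan int
}

func newTicketLimiter(limit int) *ticketLimiter {
	l := &ticketLimiter{tickets: make(chan int, limit)}
	for i := 0; i < limit; i++ {
		l.tickets <- i
	}
	return l
}

// execute blocks until a ticket is free, then runs job in a new goroutine.
func (l *ticketLimiter) execute(job func()) {
	ticket := <-l.tickets
	go func() {
		defer func() { l.tickets <- ticket }()
		job()
	}()
}

// wait drains every ticket, which is only possible once all jobs are done.
func (l *ticketLimiter) wait() {
	for i := 0; i < cap(l.tickets); i++ {
		<-l.tickets
	}
}

// peakConcurrency pushes `jobs` jobs through a limiter of size `limit` and
// returns the highest number of jobs observed running at the same time.
func peakConcurrency(limit, jobs int) int32 {
	l := newTicketLimiter(limit)
	var running, peak int32
	var mu sync.Mutex
	for i := 0; i < jobs; i++ {
		l.execute(func() {
			n := atomic.AddInt32(&running, 1)
			mu.Lock()
			if n > peak {
				peak = n
			}
			mu.Unlock()
			atomic.AddInt32(&running, -1)
		})
	}
	l.wait()
	return peak
}

func main() {
	fmt.Println(peakConcurrency(3, 50) <= 3) // prints: true
}
```

As in the callers shown above, all execute calls happen before wait, which avoids the race that the Wait comment warns about.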
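2 - Library-wrapper code source code study
Library-wrapper code is helper code that wraps third-party libraries to make them easier to use. It also tends to sit near the bottom of the call chain and focuses on one specific area, though it may carry a small amount of Dapr logic.
Library-wrapper code is the second-lowest layer of the dependency graph, just one level above utility code.
2.1 - grpc source code study
2.1.1 - util.go source code study
An analysis of util.go in the Dapr grpc package. It currently holds only two functions, which convert state option types to strings.
The stateConsistencyToString function
stateConsistencyToString converts a StateOptions_StateConsistency to a string: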
func stateConsistencyToString(c commonv1pb.StateOptions_StateConsistency) string {
switch c {
case commonv1pb.StateOptions_CONSISTENCY_EVENTUAL:
return "eventual"
case commonv1pb.StateOptions_CONSISTENCY_STRONG:
return "strong"
}
return ""
}
The stateConcurrencyToString function
stateConcurrencyToString converts a StateOptions_StateConcurrency to a string:
func stateConcurrencyToString(c commonv1pb.StateOptions_StateConcurrency) string {
switch c {
case commonv1pb.StateOptions_CONCURRENCY_FIRST_WRITE:
return "first-write"
case commonv1pb.StateOptions_CONCURRENCY_LAST_WRITE:
return "last-write"
}
return ""
}
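2.1.2 - port.go source code study
An analysis of port.go in the Dapr grpc package. It contains a single function, GetFreePort, which obtains a free port.
The GetFreePort function
GetFreePort asks the operating system for a free, usable port: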
// GetFreePort returns a free port from the OS
func GetFreePort() (int, error) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
return 0, err
}
l, err := net.ListenTCP("tcp", addr)
if err != nil {
return 0, err
}
defer l.Close()
return l.Addr().(*net.TCPAddr).Port, nil
}
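Setting the port to 0 lets the operating system pick an available port. The listener must be closed before returning (the defer l.Close() above) so that the port is released for the caller. Note that another process could claim the port between that close and the caller's own bind.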
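The port-0 technique can be exercised with the standard library alone. The sketch below uses a hypothetical freePort helper built on net.Listen rather than the Dapr function, and its main function shows how a caller would handle the case where the port has been taken in the meantime.

```go
package main

import (
	"fmt"
	"net"
)

// freePort asks the OS for an unused TCP port by binding to port 0,
// then closes the listener so the caller can bind the port itself.
func freePort() (int, error) {
	l, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		return 0, err
	}
	defer l.Close()
	return l.Addr().(*net.TCPAddr).Port, nil
}

func main() {
	port, err := freePort()
	if err != nil {
		panic(err)
	}
	// The port is only known to have been free at the moment freePort ran,
	// so the second bind still needs its own error handling.
	l, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
	if err != nil {
		fmt.Println("port was taken in the meantime:", err)
		return
	}
	defer l.Close()
	fmt.Println("listening on", l.Addr())
}
```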
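2.1.3 - dial.go source code study
An analysis of dial.go in the Dapr grpc package. It currently holds a single function, which returns the address prefix used when dialing.
The GetDialAddressPrefix function
GetDialAddressPrefix returns the dial prefix for a given DaprMode; the prefix is used for gRPC client connections: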
// GetDialAddressPrefix returns a dial prefix for a gRPC client connections
// For a given DaprMode.
func GetDialAddressPrefix(mode modes.DaprMode) string {
if runtime.GOOS == "windows" {
return ""
}
switch mode {
case modes.KubernetesMode:
return "dns:///"
default:
return ""
}
}
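Note: the function returns "dns:///" only in Kubernetes mode on non-Windows systems; on Windows and in every other mode it returns an empty string.
It is called from a single place, the GetGRPCConnection() method in grpc.go: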
// GetGRPCConnection returns a new grpc connection for a given address and inits one if doesn't exist
func (g *Manager) GetGRPCConnection(address, id string, namespace string, skipTLS, recreateIfExists, sslEnabled bool) (*grpc.ClientConn, error) {
dialPrefix := GetDialAddressPrefix(g.mode)
......
conn, err := grpc.DialContext(ctx, dialPrefix+address, opts...)
......
}
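The effect of the prefix on the dial target can be shown with plain strings. In this sketch, dialTarget is a hypothetical helper, goos and mode are passed in as strings only to keep the example testable, and both addresses are made up. In grpc-go, a target of the form dns:///host:port selects the DNS resolver.

```go
package main

import "fmt"

// dialTarget prepends the resolver prefix the same way GetGRPCConnection
// concatenates dialPrefix and address.
func dialTarget(goos, mode, address string) string {
	prefix := ""
	if goos != "windows" && mode == "kubernetes" {
		prefix = "dns:///"
	}
	return prefix + address
}

func main() {
	// prints: dns:///myapp-dapr.default.svc.cluster.local:50002
	fmt.Println(dialTarget("linux", "kubernetes", "myapp-dapr.default.svc.cluster.local:50002"))
	// prints: 127.0.0.1:50002
	fmt.Println(dialTarget("linux", "standalone", "127.0.0.1:50002"))
}
```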
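3 - Base code source code study
Base code is the most fundamental part of the Dapr code base. It is already part of Dapr's own logic, but it sits fairly low in the stack, is not on Dapr's main request path, and is usually small.
In the dependency graph, base code sits above utility code and library-wrapper code.
3.1 - version source code study
Implementation
The version code is very simple: a single version.go with only a few lines: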
// Values for these are injected by the build.
var (
version = "edge"
commit string
)
// Version returns the Dapr version. This is either a semantic version
// number or else, in the case of unreleased code, the string "edge".
func Version() string {
return version
}
// Commit returns the git commit SHA for the code that Dapr was built from.
func Commit() string {
return commit
}
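- version: either a semantic version such as 1.0.0, or edge for unreleased code.
- commit: the git commit at build time.
How the values are injected
The source comment says "Values for these are injected by the build."
How does that work? The build cannot call Go code, and both variables are unexported.
The answer is in the Makefile of the dapr repository: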
# git rev-list -1 HEAD returns the hash of the current git commit,
# e.g. 63147334aa246d76f9f65708c257460567a1cff4
GIT_COMMIT = $(shell git rev-list -1 HEAD)
# git describe --always --abbrev=7 --dirty returns version information,
# e.g. v1.0.0-rc.4-5-g6314733
GIT_VERSION = $(shell git describe --always --abbrev=7 --dirty)
ifdef REL_VERSION
DAPR_VERSION := $(REL_VERSION)
else
DAPR_VERSION := edge
endif
BASE_PACKAGE_NAME := github.com/dapr/dapr
DEFAULT_LDFLAGS:=-X $(BASE_PACKAGE_NAME)/pkg/version.commit=$(GIT_VERSION) -X $(BASE_PACKAGE_NAME)/pkg/version.version=$(DAPR_VERSION)
ifeq ($(origin DEBUG), undefined)
BUILDTYPE_DIR:=release
LDFLAGS:="$(DEFAULT_LDFLAGS) -s -w"
else ifeq ($(DEBUG),0)
BUILDTYPE_DIR:=release
LDFLAGS:="$(DEFAULT_LDFLAGS) -s -w"
else
BUILDTYPE_DIR:=debug
GCFLAGS:=-gcflags="all=-N -l"
LDFLAGS:="$(DEFAULT_LDFLAGS)"
$(info Build with debugger information)
endif
define genBinariesForTarget
.PHONY: $(5)/$(1)
$(5)/$(1):
CGO_ENABLED=$(CGO) GOOS=$(3) GOARCH=$(4) go build $(GCFLAGS) -ldflags=$(LDFLAGS) \
-o $(5)/$(1) $(2)/;
endef
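TODO: study this Makefile in more detail. The key mechanism is the Go linker's -X flag in DEFAULT_LDFLAGS, which overwrites the package-level string variables version and commit at link time.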
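The link-time injection can be reproduced with a single file. This is a minimal sketch in package main, so the -X paths are main.version and main.commit rather than the Dapr package path; buildInfo is a hypothetical helper, and the values in the build command are only examples.

```go
package main

import "fmt"

// These defaults are what the binary reports when built without -X flags.
// -X only works on package-level string variables such as these.
var (
	version = "edge"
	commit  string
)

// buildInfo formats both values so the effect of the linker flags is visible.
func buildInfo() string {
	return fmt.Sprintf("version=%s commit=%q", version, commit)
}

func main() {
	// A plain `go run .` prints: version=edge commit=""
	// Building with, for example,
	//   go build -ldflags "-X main.version=1.0.0 -X main.commit=6314733" .
	// makes the binary print the injected values instead.
	fmt.Println(buildInfo())
}
```

Because the values are written by the linker, the variables can stay unexported; only the accessor functions need to be public.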
3.2 - modes source code study
Implementation
The modes code is very simple: a single modes.go with only a few lines:
// DaprMode is the runtime mode for Dapr.
type DaprMode string
const (
// KubernetesMode is a Kubernetes Dapr mode
KubernetesMode DaprMode = "kubernetes"
// StandaloneMode is a Standalone Dapr mode
StandaloneMode DaprMode = "standalone"
)
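Dapr has two runtime modes:
- kubernetes mode
- standalone mode
Summary of the runtime modes
Differences between the two modes:
- How the configuration is read:
  - In standalone mode, Dapr reads a local file whose path is given by the config command-line flag.
  - In kubernetes mode, Dapr reads a CRD stored in Kubernetes whose name is given by the config command-line flag.
  config := flag.String("config", "", "Path to config file, or name of a configuration object")
- TODO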
3.3 - cors source code study
Implementation
The cors code is very simple: a single cors.go with only a few lines:
// DefaultAllowedOrigins is the default origins allowed for the Dapr HTTP servers
const DefaultAllowedOrigins = "*"
Reading the AllowedOrigins setting
AllowedOrigins is passed in at startup through the allowed-origins command-line flag, whose default is DefaultAllowedOrigins ("*"). The value is then handed to NewRuntimeConfig():
func FromFlags() (*DaprRuntime, error) {
allowedOrigins := flag.String("allowed-origins", cors.DefaultAllowedOrigins, "Allowed HTTP origins")
runtimeConfig := NewRuntimeConfig(*appID, placementAddresses, *controlPlaneAddress, *allowedOrigins ......)
}
It is then stored in the AllowedOrigins field of Config:
func NewRuntimeConfig(
id string, placementAddresses []string,
controlPlaneAddress, allowedOrigins ......) *Config {
return &Config{
AllowedOrigins: allowedOrigins,
......
}
Using the AllowedOrigins setting
The useCors() method in pkg/http/server.go:
func (s *server) useCors(next fasthttp.RequestHandler) fasthttp.RequestHandler {
if s.config.AllowedOrigins == cors_dapr.DefaultAllowedOrigins {
return next
}
log.Infof("enabled cors http middleware")
origins := strings.Split(s.config.AllowedOrigins, ",")
corsHandler := s.getCorsHandler(origins)
return corsHandler.CorsMiddleware(next)
}
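The decision made in useCors() comes down to two cases: with the default "*" the handler is returned unwrapped, otherwise the comma-separated value is split into an origin list. A small sketch of just that decision, where corsOrigins is a hypothetical helper and the example origins are made up:

```go
package main

import (
	"fmt"
	"strings"
)

const defaultAllowedOrigins = "*"

// corsOrigins returns the explicit origin list and whether a CORS
// middleware is needed at all. With the default "*" nothing is wrapped.
func corsOrigins(allowed string) ([]string, bool) {
	if allowed == defaultAllowedOrigins {
		return nil, false
	}
	return strings.Split(allowed, ","), true
}

func main() {
	origins, enabled := corsOrigins("*")
	fmt.Println(origins, enabled) // prints: [] false

	origins, enabled = corsOrigins("http://a.example,http://b.example")
	fmt.Println(origins, enabled) // prints: [http://a.example http://b.example] true
}
```

Note that strings.Split does not trim whitespace, so a value such as "a, b" would yield an origin with a leading space.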
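3.4 - proto source code study
3.5 - config source code study
3.6 - credentials source code study
3.6.1 - certchain.go source code study
A study of certchain.go in the Dapr credentials package: the CertChain struct and the function that loads it from disk.
CertChain struct definition
The CertChain struct holds the PEM values of the certificate trust chain: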
// CertChain holds the certificate trust chain PEM values
type CertChain struct {
RootCA []byte
Cert []byte
Key []byte
}
LoadFromDisk: loading the certificates
LoadFromDisk reads a CertChain from the three given file paths:
// LoadFromDisk returns a CertChain from a given directory
func LoadFromDisk(rootCertPath, issuerCertPath, issuerKeyPath string) (*CertChain, error) {
rootCert, err := ioutil.ReadFile(rootCertPath)
if err != nil {
return nil, err
}
cert, err := ioutil.ReadFile(issuerCertPath)
if err != nil {
return nil, err
}
key, err := ioutil.ReadFile(issuerKeyPath)
if err != nil {
return nil, err
}
return &CertChain{
RootCA: rootCert,
Cert: cert,
Key: key,
}, nil
}
Usage
In placement's main.go, the TLS certificates are loaded when mTLS is enabled:
func loadCertChains(certChainPath string) *credentials.CertChain {
tlsCreds := credentials.NewTLSCredentials(certChainPath)
log.Info("mTLS enabled, getting tls certificates")
// try to load certs from disk, if not yet there, start a watch on the local filesystem
chain, err := credentials.LoadFromDisk(tlsCreds.RootCertPath(), tlsCreds.CertPath(), tlsCreds.KeyPath())
......
}
The operator's operator.go performs the same check on MTLSEnabled:
var certChain *credentials.CertChain
if o.config.MTLSEnabled {
log.Info("mTLS enabled, getting tls certificates")
// try to load certs from disk, if not yet there, start a watch on the local filesystem
chain, err := credentials.LoadFromDisk(o.config.Credentials.RootCertPath(), o.config.Credentials.CertPath(), o.config.Credentials.KeyPath())
......
}
Note: the two snippets above are nearly identical and would benefit from being refactored into a shared helper.
sentry calls it as well:
func (c *defaultCA) validateAndBuildTrustBundle() (*trustRootBundle, error) {
var (
issuerCreds *certs.Credentials
rootCertBytes []byte
issuerCertBytes []byte
)
// certs exist on disk or getting created, load them when ready
if !shouldCreateCerts(c.config) {
err := detectCertificates(c.config.RootCertPath)
if err != nil {
return nil, err
}
certChain, err := credentials.LoadFromDisk(c.config.RootCertPath, c.config.IssuerCertPath, c.config.IssuerKeyPath)
if err != nil {
return nil, errors.Wrap(err, "error loading cert chain from disk")
}
TODO: look at the certificate details separately later.
3.6.2 - credentials.go source code study
A study of credentials.go in the Dapr credentials package: the TLSCredentials struct derives the various certificate-related paths.
TLSCredentials struct definition
It has a single field, credentialsPath:
// TLSCredentials holds paths for credentials
type TLSCredentials struct {
credentialsPath string
}
The constructor is trivial:
// NewTLSCredentials returns a new TLSCredentials
func NewTLSCredentials(path string) TLSCredentials {
return TLSCredentials{
credentialsPath: path,
}
}
Methods that return the paths
Path() returns credentialsPath, the directory that holds the TLS certificates:
// Path returns the directory holding the TLS credentials
func (t *TLSCredentials) Path() string {
return t.credentialsPath
}
RootCertPath(), CertPath(), and KeyPath() return the paths of the root cert, the cert, and the cert key:
// RootCertPath returns the file path for the root cert
func (t *TLSCredentials) RootCertPath() string {
return filepath.Join(t.credentialsPath, RootCertFilename)
}
// CertPath returns the file path for the cert
func (t *TLSCredentials) CertPath() string {
return filepath.Join(t.credentialsPath, IssuerCertFilename)
}
// KeyPath returns the file path for the cert key
func (t *TLSCredentials) KeyPath() string {
return filepath.Join(t.credentialsPath, IssuerKeyFilename)
}
3.6.3 - tls.go source code study
A study of tls.go in the Dapr credentials package: building a tls.Config object from a cert/key pair.
The TLSConfigFromCertAndKey() function
TLSConfigFromCertAndKey() returns a tls.Config object built from a valid cert/key pair in PEM format:
// TLSConfigFromCertAndKey return a tls.config object from valid cert/key pair in PEM format.
func TLSConfigFromCertAndKey(certPem, keyPem []byte, serverName string, rootCA *x509.CertPool) (*tls.Config, error) {
cert, err := tls.X509KeyPair(certPem, keyPem)
if err != nil {
return nil, err
}
// nolint:gosec
config := &tls.Config{
InsecureSkipVerify: false,
RootCAs: rootCA,
ServerName: serverName,
Certificates: []tls.Certificate{cert},
}
return config, nil
}
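To see the cert/key-to-tls.Config step run end to end, the sketch below generates a throwaway self-signed certificate in memory and feeds it through a function of the same shape. Everything here is an assumption made for illustration: selfSignedPEM and clientTLSConfig are hypothetical helpers, "example.internal" is a made-up server name, and using the self-signed cert as its own root is only done to keep the example self-contained.

```go
package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/pem"
	"fmt"
	"math/big"
	"time"
)

// selfSignedPEM creates a throwaway self-signed certificate and key in PEM form.
func selfSignedPEM(name string) (certPEM, keyPEM []byte, err error) {
	key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		return nil, nil, err
	}
	tmpl := &x509.Certificate{
		SerialNumber: big.NewInt(1),
		Subject:      pkix.Name{CommonName: name},
		DNSNames:     []string{name},
		NotBefore:    time.Now().Add(-time.Minute),
		NotAfter:     time.Now().Add(time.Hour),
	}
	der, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, &key.PublicKey, key)
	if err != nil {
		return nil, nil, err
	}
	keyDER, err := x509.MarshalECPrivateKey(key)
	if err != nil {
		return nil, nil, err
	}
	certPEM = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: der})
	keyPEM = pem.EncodeToMemory(&pem.Block{Type: "EC PRIVATE KEY", Bytes: keyDER})
	return certPEM, keyPEM, nil
}

// clientTLSConfig has the same shape as TLSConfigFromCertAndKey.
func clientTLSConfig(certPEM, keyPEM []byte, serverName string, roots *x509.CertPool) (*tls.Config, error) {
	cert, err := tls.X509KeyPair(certPEM, keyPEM)
	if err != nil {
		return nil, err
	}
	return &tls.Config{
		RootCAs:      roots,
		ServerName:   serverName,
		Certificates: []tls.Certificate{cert},
	}, nil
}

func main() {
	certPEM, keyPEM, err := selfSignedPEM("example.internal")
	if err != nil {
		panic(err)
	}
	roots := x509.NewCertPool()
	if !roots.AppendCertsFromPEM(certPEM) {
		panic("failed to append cert to pool")
	}
	cfg, err := clientTLSConfig(certPEM, keyPEM, "example.internal", roots)
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.ServerName, len(cfg.Certificates)) // prints: example.internal 1
}
```

tls.X509KeyPair fails when the PEM data is malformed or when the key does not match the certificate, which is why the function returns an error before any tls.Config is built.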
3.6.4 - grpc.go source code study
A study of grpc.go in the Dapr credentials package: building the gRPC server options and client options.
The GetServerOptions() function
func GetServerOptions(certChain *CertChain) ([]grpc.ServerOption, error) {
opts := []grpc.ServerOption{}
if certChain == nil {
return opts, nil
}
cp := x509.NewCertPool()
cp.AppendCertsFromPEM(certChain.RootCA)
cert, err := tls.X509KeyPair(certChain.Cert, certChain.Key)
if err != nil {
return opts, nil
}
// nolint:gosec
config := &tls.Config{
ClientCAs: cp,
// Require cert verification
ClientAuth: tls.RequireAndVerifyClientCert,
Certificates: []tls.Certificate{cert},
}
opts = append(opts, grpc.Creds(credentials.NewTLS(config)))
return opts, nil
}
The GetClientOptions() function
func GetClientOptions(certChain *CertChain, serverName string) ([]grpc.DialOption, error) {
opts := []grpc.DialOption{}
if certChain != nil {
cp := x509.NewCertPool()
ok := cp.AppendCertsFromPEM(certChain.RootCA)
if !ok {
return nil, errors.New("failed to append PEM root cert to x509 CertPool")
}
config, err := TLSConfigFromCertAndKey(certChain.Cert, certChain.Key, serverName, cp)
if err != nil {
return nil, errors.Wrap(err, "failed to create tls config from cert and key")
}
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(config)))
} else {
opts = append(opts, grpc.WithInsecure())
}
return opts, nil
}
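TODO: study the details later; I am not familiar with the cryptography involved. One thing is already visible in the code above: when tls.X509KeyPair fails, GetServerOptions() returns opts, nil, so the caller receives an empty option list and no error, whereas GetClientOptions() propagates its errors.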
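4 - Runtime source code study
4.1 - options.go source code study
A study of options.go in the Dapr runtime package, which is used to customize the components included in the runtime.
runtimeOpts struct definition
runtimeOpts encapsulates the components to include in the runtime: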
type (
// runtimeOpts encapsulates the components to include in the runtime.
runtimeOpts struct {
secretStores []secretstores.SecretStore
states []state.State
pubsubs []pubsub.PubSub
nameResolutions []nameresolution.NameResolution
inputBindings []bindings.InputBinding
outputBindings []bindings.OutputBinding
httpMiddleware []http.Middleware
}
)
The Option type
Option is a function type used to customize the runtime:
type (
// Option is a function that customizes the runtime.
Option func(o *runtimeOpts)
)
The With* functions that customize the runtime
A set of WithXxx() functions is provided to customize the runtime's components:
// WithSecretStores adds secret store components to the runtime.
func WithSecretStores(secretStores ...secretstores.SecretStore) Option {
return func(o *runtimeOpts) {
o.secretStores = append(o.secretStores, secretStores...)
}
}
// WithStates adds state store components to the runtime.
func WithStates(states ...state.State) Option {
return func(o *runtimeOpts) {
o.states = append(o.states, states...)
}
}
// WithPubSubs adds pubsub store components to the runtime.
func WithPubSubs(pubsubs ...pubsub.PubSub) Option {
return func(o *runtimeOpts) {
o.pubsubs = append(o.pubsubs, pubsubs...)
}
}
// WithNameResolutions adds name resolution components to the runtime.
func WithNameResolutions(nameResolutions ...nameresolution.NameResolution) Option {
return func(o *runtimeOpts) {
o.nameResolutions = append(o.nameResolutions, nameResolutions...)
}
}
// WithInputBindings adds input binding components to the runtime.
func WithInputBindings(inputBindings ...bindings.InputBinding) Option {
return func(o *runtimeOpts) {
o.inputBindings = append(o.inputBindings, inputBindings...)
}
}
// WithOutputBindings adds output binding components to the runtime.
func WithOutputBindings(outputBindings ...bindings.OutputBinding) Option {
return func(o *runtimeOpts) {
o.outputBindings = append(o.outputBindings, outputBindings...)
}
}
// WithHTTPMiddleware adds HTTP middleware components to the runtime.
func WithHTTPMiddleware(httpMiddleware ...http.Middleware) Option {
return func(o *runtimeOpts) {
o.httpMiddleware = append(o.httpMiddleware, httpMiddleware...)
}
}
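These functions simply append to the corresponding component field of the runtimeOpts struct; the collected values are used later when the runtime is initialized.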
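This is the functional options pattern. A minimal sketch of how such options are applied, using plain strings in place of the real component types; options, collect, and the component names are hypothetical stand-ins, not Dapr code:

```go
package main

import "fmt"

// options is a hypothetical stand-in for runtimeOpts, with strings
// replacing the real component types.
type options struct {
	states  []string
	pubsubs []string
}

// Option is a function that customizes options.
type Option func(o *options)

// WithStates appends state store names.
func WithStates(states ...string) Option {
	return func(o *options) { o.states = append(o.states, states...) }
}

// WithPubSubs appends pubsub names.
func WithPubSubs(pubsubs ...string) Option {
	return func(o *options) { o.pubsubs = append(o.pubsubs, pubsubs...) }
}

// collect applies every option in order to a zero-valued options struct.
func collect(opts ...Option) options {
	var o options
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := collect(WithStates("redis"), WithPubSubs("kafka"), WithStates("etcd"))
	fmt.Println(o.states, o.pubsubs) // prints: [redis etcd] [kafka]
}
```

Because each option appends rather than assigns, the same With function can be passed several times and the results accumulate in call order.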
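4.2 - config.go source code study
A study of config.go in the Dapr runtime package, which defines the runtime configuration: the protocol and port constants, the Config struct, and its constructor.
Constants
Protocol: currently only http and grpc are supported: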
// Protocol is a communications protocol
type Protocol string
const (
// GRPCProtocol is a gRPC communication protocol
GRPCProtocol Protocol = "grpc"
// HTTPProtocol is a HTTP communication protocol
HTTPProtocol Protocol = "http"
)
Default values for the ports:
const (
// DefaultDaprHTTPPort is the default http port for Dapr
DefaultDaprHTTPPort = 3500
// DefaultDaprAPIGRPCPort is the default API gRPC port for Dapr
DefaultDaprAPIGRPCPort = 50001
// DefaultProfilePort is the default port for profiling endpoints
DefaultProfilePort = 7777
// DefaultMetricsPort is the default port for metrics endpoints
DefaultMetricsPort = 9090
)
HTTP defaults; currently there is only MaxRequestBodySize:
const (
// DefaultMaxRequestBodySize is the default option for the maximum body size in MB for Dapr HTTP servers
DefaultMaxRequestBodySize = 4
)
Config struct
// Config holds the Dapr Runtime configuration
type Config struct {
ID string
HTTPPort int
ProfilePort int
EnableProfiling bool
APIGRPCPort int
InternalGRPCPort int
ApplicationPort int
ApplicationProtocol Protocol
Mode modes.DaprMode
PlacementAddresses []string
GlobalConfig string
AllowedOrigins string
Standalone config.StandaloneConfig
Kubernetes config.KubernetesConfig
MaxConcurrency int
mtlsEnabled bool
SentryServiceAddress string
CertChain *credentials.CertChain
AppSSL bool
MaxRequestBodySize int
}
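This is somewhat messy: every field is flat, and the list will only keep growing.
Constructing Config
The constructor builds the Config struct by plain assignment; the parameter list really is too long: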
// NewRuntimeConfig returns a new runtime config
func NewRuntimeConfig(
id string, placementAddresses []string,
controlPlaneAddress, allowedOrigins, globalConfig, componentsPath, appProtocol, mode string,
httpPort, internalGRPCPort, apiGRPCPort, appPort, profilePort int,
enableProfiling bool, maxConcurrency int, mtlsEnabled bool, sentryAddress string, appSSL bool, maxRequestBodySize int) *Config {
return &Config{
ID: id,
HTTPPort: httpPort,
InternalGRPCPort: internalGRPCPort,
APIGRPCPort: apiGRPCPort,
ApplicationPort: appPort,
ProfilePort: profilePort,
ApplicationProtocol: Protocol(appProtocol),
Mode: modes.DaprMode(mode),
PlacementAddresses: placementAddresses,
GlobalConfig: globalConfig,
AllowedOrigins: allowedOrigins,
Standalone: config.StandaloneConfig{
ComponentsPath: componentsPath,
},
Kubernetes: config.KubernetesConfig{
ControlPlaneAddress: controlPlaneAddress,
},
EnableProfiling: enableProfiling,
MaxConcurrency: maxConcurrency,
mtlsEnabled: mtlsEnabled,
SentryServiceAddress: sentryAddress,
AppSSL: appSSL,
MaxRequestBodySize: maxRequestBodySize,
}
}
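4.3 - cli.go source code study
A study of cli.go in the Dapr runtime package, which parses the command-line flags and returns a DaprRuntime instance.
cli.go contains essentially one function, FromFlags().
FromFlags() overview
FromFlags() parses the command-line flags and returns a DaprRuntime instance: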
// FromFlags parses command flags and returns DaprRuntime instance
func FromFlags() (*DaprRuntime, error) {
......
return NewDaprRuntime(runtimeConfig, globalConfig, accessControlList), nil
}
Parsing the command-line flags
General flags
The code is as follows:
mode := flag.String("mode", string(modes.StandaloneMode), "Runtime mode for Dapr")
daprHTTPPort := flag.String("dapr-http-port", fmt.Sprintf("%v", DefaultDaprHTTPPort), "HTTP port for Dapr API to listen on")
daprAPIGRPCPort := flag.String("dapr-grpc-port", fmt.Sprintf("%v", DefaultDaprAPIGRPCPort), "gRPC port for the Dapr API to listen on")
daprInternalGRPCPort := flag.String("dapr-internal-grpc-port", "", "gRPC port for the Dapr Internal API to listen on")
appPort := flag.String("app-port", "", "The port the application is listening on")
profilePort := flag.String("profile-port", fmt.Sprintf("%v", DefaultProfilePort), "The port for the profile server")
appProtocol := flag.String("app-protocol", string(HTTPProtocol), "Protocol for the application: grpc or http")
componentsPath := flag.String("components-path", "", "Path for components directory. If empty, components will not be loaded. Self-hosted mode only")
config := flag.String("config", "", "Path to config file, or name of a configuration object")
appID := flag.String("app-id", "", "A unique ID for Dapr. Used for Service Discovery and state")
controlPlaneAddress := flag.String("control-plane-address", "", "Address for a Dapr control plane")
sentryAddress := flag.String("sentry-address", "", "Address for the Sentry CA service")
placementServiceHostAddr := flag.String("placement-host-address", "", "Addresses for Dapr Actor Placement servers")
allowedOrigins := flag.String("allowed-origins", cors.DefaultAllowedOrigins, "Allowed HTTP origins")
enableProfiling := flag.Bool("enable-profiling", false, "Enable profiling")
runtimeVersion := flag.Bool("version", false, "Prints the runtime version")
appMaxConcurrency := flag.Int("app-max-concurrency", -1, "Controls the concurrency level when forwarding requests to user code")
enableMTLS := flag.Bool("enable-mtls", false, "Enables automatic mTLS for daprd to daprd communication channels")
appSSL := flag.Bool("app-ssl", false, "Sets the URI scheme of the app to https and attempts an SSL connection")
daprHTTPMaxRequestSize := flag.Int("dapr-http-max-request-size", -1, "Increasing max size of request body in MB to handle uploading of big files. By default 4 MB.")
TODO: there should be documentation for the command-line flags; go through it side by side with this code.
Logging flags
loggerOptions := logger.DefaultOptions()
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)
Metrics flags
metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)
// attaching only metrics-port option
metricsExporter.Options().AttachCmdFlag(flag.StringVar)
Then the flags are parsed:
flag.Parse()
Handling the version flag
If the version flag is set, the process prints the version and exits:
runtimeVersion := flag.Bool("version", false, "Prints the runtime version")
if *runtimeVersion {
fmt.Println(version.Version())
os.Exit(0)
}
Initializing logging and metrics
Logging initialization
The logger is initialized from the logging options:
loggerOptions := logger.DefaultOptions()
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)
if *appID == "" {
return nil, errors.New("app-id parameter cannot be empty")
}
// Apply options to all loggers
loggerOptions.SetAppID(*appID)
if err := logger.ApplyOptionsToLoggers(&loggerOptions); err != nil {
return nil, err
}
Once logging is initialized, log output can be used:
log.Infof("starting Dapr Runtime -- version %s -- commit %s", version.Version(), version.Commit())
log.Infof("log level set to: %s", loggerOptions.OutputLevel)
Metrics initialization
Initialize the dapr metrics exporter:
// Initialize dapr metrics exporter
if err := metricsExporter.Init(); err != nil {
log.Fatal(err)
}
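Parsing the configuration
Parsing the Dapr port settings
The flags dapr-http-port, dapr-grpc-port, profile-port, dapr-internal-grpc-port, and app-port are parsed as shown below. If dapr-internal-grpc-port is empty, a free port is obtained through grpc.GetFreePort(); if app-port is empty, applicationPort stays 0.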
daprHTTP, err := strconv.Atoi(*daprHTTPPort)
if err != nil {
return nil, errors.Wrap(err, "error parsing dapr-http-port flag")
}
daprAPIGRPC, err := strconv.Atoi(*daprAPIGRPCPort)
if err != nil {
return nil, errors.Wrap(err, "error parsing dapr-grpc-port flag")
}
profPort, err := strconv.Atoi(*profilePort)
if err != nil {
return nil, errors.Wrap(err, "error parsing profile-port flag")
}
var daprInternalGRPC int
if *daprInternalGRPCPort != "" {
daprInternalGRPC, err = strconv.Atoi(*daprInternalGRPCPort)
if err != nil {
return nil, errors.Wrap(err, "error parsing dapr-internal-grpc-port")
}
} else {
daprInternalGRPC, err = grpc.GetFreePort()
if err != nil {
return nil, errors.Wrap(err, "failed to get free port for internal grpc server")
}
}
var applicationPort int
if *appPort != "" {
applicationPort, err = strconv.Atoi(*appPort)
if err != nil {
return nil, errors.Wrap(err, "error parsing app-port")
}
}
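The three variants above (required port, optional port with a generated fallback, optional port that defaults to 0) follow one pattern. The sketch below folds them into a single hypothetical helper, parsePort; the fallback parameter stands in for a function such as GetFreePort and is an assumption of this example, not part of cli.go.

```go
package main

import (
	"fmt"
	"strconv"
)

// parsePort converts a flag value to a port number.
// An empty value yields fallback() when a fallback is given, otherwise 0.
func parsePort(flagName, value string, fallback func() (int, error)) (int, error) {
	if value == "" {
		if fallback == nil {
			return 0, nil
		}
		return fallback()
	}
	port, err := strconv.Atoi(value)
	if err != nil {
		return 0, fmt.Errorf("error parsing %s flag: %w", flagName, err)
	}
	return port, nil
}

func main() {
	for _, v := range []string{"3500", "", "abc"} {
		port, err := parsePort("app-port", v, nil)
		fmt.Println(port, err)
	}
	// An empty value with a fallback uses the generated port instead.
	port, err := parsePort("dapr-internal-grpc-port", "", func() (int, error) { return 50002, nil })
	fmt.Println(port, err)
}
```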
Parsing the other settings
Next come maxRequestBodySize, placementAddresses, concurrency, appProtocol, and so on:
var maxRequestBodySize int
if *daprHTTPMaxRequestSize != -1 {
maxRequestBodySize = *daprHTTPMaxRequestSize
} else {
maxRequestBodySize = DefaultMaxRequestBodySize
}
placementAddresses := []string{}
if *placementServiceHostAddr != "" {
placementAddresses = parsePlacementAddr(*placementServiceHostAddr)
}
var concurrency int
if *appMaxConcurrency != -1 {
concurrency = *appMaxConcurrency
}
appPrtcl := string(HTTPProtocol)
if *appProtocol != string(HTTPProtocol) {
appPrtcl = *appProtocol
}
Building the three runtime configurations
Building runtimeConfig
runtimeConfig := NewRuntimeConfig(*appID, placementAddresses, *controlPlaneAddress, *allowedOrigins, *config, *componentsPath,
appPrtcl, *mode, daprHTTP, daprInternalGRPC, daprAPIGRPC, applicationPort, profPort, *enableProfiling, concurrency, *enableMTLS, *sentryAddress, *appSSL, maxRequestBodySize)
mTLS-related configuration:
if *enableMTLS {
runtimeConfig.CertChain, err = security.GetCertChain()
if err != nil {
return nil, err
}
}
Building globalConfig
var globalConfig *global_config.Configuration
The configuration is loaded according to the config flag and the Dapr mode:
config := flag.String("config", "", "Path to config file, or name of a configuration object")
if *config != "" {
switch modes.DaprMode(*mode) {
case modes.KubernetesMode:
client, conn, clientErr := client.GetOperatorClient(*controlPlaneAddress, security.TLSServerName, runtimeConfig.CertChain)
if clientErr != nil {
return nil, clientErr
}
defer conn.Close()
namespace = os.Getenv("NAMESPACE")
globalConfig, configErr = global_config.LoadKubernetesConfiguration(*config, namespace, client)
case modes.StandaloneMode:
globalConfig, _, configErr = global_config.LoadStandaloneConfiguration(*config)
}
if configErr != nil {
log.Debugf("Config error: %v", configErr)
}
}
if configErr != nil {
log.Fatalf("error loading configuration: %s", configErr)
}
In short: kubernetes mode reads the CRD, standalone mode reads a local configuration file.
If config is not set, the default global configuration is used:
if globalConfig == nil {
log.Info("loading default configuration")
globalConfig = global_config.LoadDefaultConfiguration()
}
Building accessControlList
var accessControlList *global_config.AccessControlList
accessControlList, err = global_config.ParseAccessControlSpec(globalConfig.Spec.AccessControlSpec, string(runtimeConfig.ApplicationProtocol))
if err != nil {
log.Fatalf(err.Error())
}
Constructing the DaprRuntime instance
Finally the DaprRuntime instance is constructed:
return NewDaprRuntime(runtimeConfig, globalConfig, accessControlList), nil
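4.4 - Runtime App Channel source code study
4.4.1 - channel.go source code study
A study of channel.go in the Dapr channel package, which defines the AppChannel interface and its methods.
AppChannel is the abstraction for communicating with user code.
The constant DefaultChannelAddress: because Dapr is normally deployed as a sidecar, the default channel address is 127.0.0.1: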
const (
// DefaultChannelAddress is the address that user application listen to
DefaultChannelAddress = "127.0.0.1"
)
Interface definition:
// AppChannel is an abstraction over communications with user code
type AppChannel interface {
GetBaseAddress() string
InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}
4.4.2 - grpc_channel.go source code study
A study of grpc_channel.go in the Dapr channel package: the gRPC implementation of AppChannel.
Channel struct definition
Channel is a concrete AppChannel implementation for interacting with gRPC-based user code.
// Channel is a concrete AppChannel implementation for interacting with gRPC based user code
type Channel struct {
// gRPC client connection
client *grpc.ClientConn
// address of the user code (the application)
baseAddress string
// Go chan used to limit concurrency
ch chan int
tracingSpec config.TracingSpec
appMetadataToken string
}
Creating the Channel struct
// CreateLocalChannel creates a gRPC connection with user code
func CreateLocalChannel(port, maxConcurrency int, conn *grpc.ClientConn, spec config.TracingSpec) *Channel {
c := &Channel{
client: conn,
// baseAddress is "ip:port"
baseAddress: fmt.Sprintf("%s:%d", channel.DefaultChannelAddress, port),
tracingSpec: spec,
appMetadataToken: auth.GetAppToken(),
}
if maxConcurrency > 0 {
// When a concurrency limit is requested, create the Go channel used to enforce it
c.ch = make(chan int, maxConcurrency)
}
return c
}
The GetBaseAddress method
// GetBaseAddress returns the application base address
func (g *Channel) GetBaseAddress() string {
return g.baseAddress
}
This method returns the base address of the app, to which sub-paths can be appended, for example:
func (a *actorsRuntime) startAppHealthCheck(opts ...health.Option) {
healthAddress := fmt.Sprintf("%s/healthz", a.appChannel.GetBaseAddress())
ch := health.StartEndpointHealthCheck(healthAddress, opts...)
......
}
Note: the actor code is the only place that uses this method.
The InvokeMethod method
InvokeMethod calls user code via gRPC:
// InvokeMethod invokes user code via gRPC
func (g *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
var rsp *invokev1.InvokeMethodResponse
var err error
switch req.APIVersion() {
case internalv1pb.APIVersion_V1:
// Only v1 is currently supported
rsp, err = g.invokeMethodV1(ctx, req)
default:
// Reject any unsupported version
rsp = nil
err = status.Error(codes.Unimplemented, fmt.Sprintf("Unsupported spec version: %d", req.APIVersion()))
}
return rsp, err
}
Implementation of invokeMethodV1()
// invokeMethodV1 calls user applications using daprclient v1
func (g *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
if g.ch != nil {
// Send an int into ch, which is equivalent to incrementing the current concurrency count; this blocks while ch is full
g.ch <- 1
}
// Create an app callback client
clientV1 := runtimev1pb.NewAppCallbackClient(g.client)
// Convert the internal metadata to gRPC metadata
grpcMetadata := invokev1.InternalMetadataToGrpcMetadata(ctx, req.Metadata(), true)
if g.appMetadataToken != "" {
grpcMetadata.Set(auth.APITokenHeader, g.appMetadataToken)
}
// Prepare gRPC Metadata
ctx = metadata.NewOutgoingContext(context.Background(), grpcMetadata)
var header, trailer metadata.MD
// Call the user code
resp, err := clientV1.OnInvoke(ctx, req.Message(), grpc.Header(&header), grpc.Trailer(&trailer))
if g.ch != nil {
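// Receive an int from ch, which is equivalent to decrementing the current concurrency count.
// This release is not protected: if the code above panics, the slot is never freed and the count is wrong from then on.
// Moving the release into a defer would be safer.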
<-g.ch
}
var rsp *invokev1.InvokeMethodResponse
if err != nil {
// Convert status code
respStatus := status.Convert(err)
// Prepare response
rsp = invokev1.NewInvokeMethodResponse(int32(respStatus.Code()), respStatus.Message(), respStatus.Proto().Details)
} else {
rsp = invokev1.NewInvokeMethodResponse(int32(codes.OK), "", nil)
}
rsp.WithHeaders(header).WithTrailers(trailer)
return rsp.WithMessage(resp), nil
}
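The defer-based release suggested in the comments above could look like the following. This is a sketch of the idea only, not a patch to the Dapr code: gate, newGate, do, and inFlight are hypothetical names, and the recover step is an extra added here so that the example can show the slot being freed after a panic.

```go
package main

import (
	"errors"
	"fmt"
)

// gate limits concurrent calls with a buffered channel; a nil channel means no limit.
type gate struct {
	ch chan struct{}
}

func newGate(maxConcurrency int) *gate {
	g := &gate{}
	if maxConcurrency > 0 {
		g.ch = make(chan struct{}, maxConcurrency)
	}
	return g
}

// do runs call while holding one slot. The slot is released in a defer,
// so it is freed even when call panics.
func (g *gate) do(call func() error) (err error) {
	if g.ch != nil {
		g.ch <- struct{}{}
		defer func() { <-g.ch }()
	}
	// Not part of the original code: convert a panic into an error so the
	// example can keep running and inspect the gate afterwards.
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("call panicked: %v", r)
		}
	}()
	return call()
}

// inFlight reports how many slots are currently held.
func (g *gate) inFlight() int { return len(g.ch) }

func main() {
	g := newGate(1)
	err := g.do(func() error { panic("boom") })
	fmt.Println(err, g.inFlight()) // prints: call panicked: boom 0

	err = g.do(func() error { return errors.New("app error") })
	fmt.Println(err, g.inFlight()) // prints: app error 0
}
```

With a limit of 1, the second call would block forever if the first call had leaked its slot, so reaching the second print confirms the release.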
This method is used in the following places:
- actor: callLocalActor() and deactivateActor()
- gRPC API: CallLocal()
- messaging: invokeLocal() in direct_message
- runtime:
  - getConfigurationHTTP()
  - isAppSubscribedToBinding()
  - publishMessageHTTP()
  - sendBindingEventToApp()
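5 - Components source code study
5.1 - Binding component source code study
5.2 - Middleware component source code study
5.3 - NameResolution component source code study
5.4 - PubSub component source code study
5.5 - SecretStores component source code study
5.6 - Store component source code study
5.7 - workflow component source code study
5.7.1 - registry.go source code study
Struct definitions
The Registry struct
The Registry struct registers workflow component factories and returns workflow implementations on request.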
import (
wfs "github.com/dapr/components-contrib/workflows"
)
// Registry is an interface for a component that returns registered state store implementations.
type Registry struct {
Logger logger.Logger
workflowComponents map[string]func(logger.Logger) wfs.Workflow
}
The Workflow type here is defined in components-contrib.
Default Registry
Creating the default Registry
The package defines a default Registry, which is a singleton and is also public:
// DefaultRegistry is the singleton with the registry .
var DefaultRegistry *Registry = NewRegistry()
// NewRegistry is used to create workflow registry.
func NewRegistry() *Registry {
return &Registry{
workflowComponents: map[string]func(logger.Logger) wfs.Workflow{},
}
}
RegisterComponent() method
The RegisterComponent() method adds one or more entries to the workflowComponents field of the Registry struct:
func (s *Registry) RegisterComponent(componentFactory func(logger.Logger) wfs.Workflow, names ...string) {
for _, name := range names {
s.workflowComponents[createFullName(name)] = componentFactory
}
}
func createFullName(name string) string {
return strings.ToLower("workflow." + name)
}
The key is "workflow." + name, converted to lower case. The value is the componentFactory passed in: a function that returns a Workflow implementation when given a logger.
Create() method
The Create() method builds the workflow implementation for the given name and version:
func (s *Registry) Create(name, version, logName string) (wfs.Workflow, error) {
if method, ok := s.getWorkflowComponent(name, version, logName); ok {
return method(), nil
}
return nil, fmt.Errorf("couldn't find wokflow %s/%s", name, version)
}
The key implementation is in the getWorkflowComponent() method:
func (s *Registry) getWorkflowComponent(name, version, logName string) (func() wfs.Workflow, bool) {
nameLower := strings.ToLower(name)
versionLower := strings.ToLower(version)
// Build the key as nameLower+"/"+versionLower
// and look it up in the workflowComponents field of the Registry struct
// TODO: on registration the key is `"workflow." + name`, converted to lower case
workflowFn, ok := s.workflowComponents[nameLower+"/"+versionLower]
if ok {
return s.wrapFn(workflowFn, logName), true
}
// If not found, check whether the version is an InitialVersion
if components.IsInitialVersion(versionLower) {
// For an InitialVersion the version is not appended, and the lookup uses name alone
// TODO: does this require name to start with "workflow."?
workflowFn, ok = s.workflowComponents[nameLower]
if ok {
return s.wrapFn(workflowFn, logName), true
}
}
return nil, false
}
If a registered workflow factory is found in the workflowComponents field, that factory is used to build the workflow implementation:
func (s *Registry) wrapFn(componentFactory func(logger.Logger) wfs.Workflow, logName string) func() wfs.Workflow {
return func() wfs.Workflow {
// The registry's logger is used as the logger of the workflow implementation
l := s.Logger
if logName != "" && l != nil {
// Add a component field to the logger, with logName as its value
l = l.WithFields(map[string]any{
"component": logName,
})
}
// Finally call the factory to build the workflow implementation
return componentFactory(l)
}
}
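Summary
The content of the key needs to be checked carefully:
- whether it carries the "workflow." prefix
- whether it carries a version, or whether the version is an InitialVersion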
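The two key questions in the summary can be tried out with a small stand-in registry. The sketch below is not Dapr code: factories return a string instead of a wfs.Workflow, the component name "temporal" is only an illustration, and isInitialVersion is an assumption about what components.IsInitialVersion accepts (empty, "v0" or "v1").

```go
package main

import (
	"fmt"
	"strings"
)

// registry is a simplified stand-in for the workflow Registry.
type registry struct {
	factories map[string]func() string
}

// register mirrors RegisterComponent: the key is "workflow." + name, lower-cased.
func (r *registry) register(factory func() string, names ...string) {
	for _, name := range names {
		r.factories[strings.ToLower("workflow."+name)] = factory
	}
}

// isInitialVersion is an ASSUMPTION about components.IsInitialVersion.
func isInitialVersion(v string) bool {
	return v == "" || v == "v0" || v == "v1"
}

// lookup mirrors getWorkflowComponent: try name/version first, then name alone.
func (r *registry) lookup(name, version string) (func() string, bool) {
	nameLower := strings.ToLower(name)
	versionLower := strings.ToLower(version)
	if f, ok := r.factories[nameLower+"/"+versionLower]; ok {
		return f, true
	}
	if isInitialVersion(versionLower) {
		if f, ok := r.factories[nameLower]; ok {
			return f, true
		}
	}
	return nil, false
}

func main() {
	r := &registry{factories: map[string]func() string{}}
	r.register(func() string { return "temporal impl" }, "temporal")

	_, withPrefix := r.lookup("workflow.temporal", "v1") // found via the name-only fallback
	_, noPrefix := r.lookup("temporal", "v1")            // not found: prefix missing
	_, otherVer := r.lookup("workflow.temporal", "v2")   // not found: v2 is not an initial version
	fmt.Println(withPrefix, noPrefix, otherVer)
}
```

Under these assumptions a lookup succeeds only when the caller passes the full "workflow."-prefixed name together with an initial version, which matches the concern raised by the two TODO comments.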
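6 - Healthz source code study
6.1 - health.go source code study
Source code analysis of health.go in the Dapr health package: the client-side implementation of health checking.
Code implementation
Option function definition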
// Option is an a function that applies a health check option
type Option func(o *healthCheckOptions)
healthCheckOptions struct definition
type healthCheckOptions struct {
initialDelay time.Duration
requestTimeout time.Duration
failureThreshold int
interval time.Duration
successStatusCode int
}
WithXxx functions
The WithXxx functions set the five health check options above; each returns an Option function:
// WithInitialDelay sets the initial delay for the health check
func WithInitialDelay(delay time.Duration) Option {
return func(o *healthCheckOptions) {
o.initialDelay = delay
}
}
// WithFailureThreshold sets the failure threshold for the health check
func WithFailureThreshold(threshold int) Option {
return func(o *healthCheckOptions) {
o.failureThreshold = threshold
}
}
// WithRequestTimeout sets the request timeout for the health check
func WithRequestTimeout(timeout time.Duration) Option {
return func(o *healthCheckOptions) {
o.requestTimeout = timeout
}
}
// WithSuccessStatusCode sets the status code for the health check
func WithSuccessStatusCode(code int) Option {
return func(o *healthCheckOptions) {
o.successStatusCode = code
}
}
// WithInterval sets the interval for the health check
func WithInterval(interval time.Duration) Option {
return func(o *healthCheckOptions) {
o.interval = interval
}
}
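StartEndpointHealthCheck function
StartEndpointHealthCheck starts a health check on the specified address with the given options. It returns a channel that emits true if the endpoint is healthy and false once the failure conditions have been met.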
// StartEndpointHealthCheck starts a health check on the specified address with the given options.
// It returns a channel that will emit true if the endpoint is healthy and false if the failure conditions
// Have been met.
func StartEndpointHealthCheck(endpointAddress string, opts ...Option) chan bool {
options := &healthCheckOptions{}
applyDefaults(options)
// Apply each Option function to set the health check options
for _, o := range opts {
o(options)
}
signalChan := make(chan bool, 1)
go func(ch chan<- bool, endpointAddress string, options *healthCheckOptions) {
// Set the health check interval; the default is one check every 5 seconds
ticker := time.NewTicker(options.interval)
failureCount := 0
// Sleep for initialDelay before starting the health checks
time.Sleep(options.initialDelay)
// Create the HTTP client, with requestTimeout as the request timeout
client := &fasthttp.Client{
MaxConnsPerHost: 5, // Limit Keep-Alive connections
ReadTimeout: options.requestTimeout,
MaxIdemponentCallAttempts: 1,
}
req := fasthttp.AcquireRequest()
req.SetRequestURI(endpointAddress)
req.Header.SetMethod(fasthttp.MethodGet)
defer fasthttp.ReleaseRequest(req)
for range ticker.C {
resp := fasthttp.AcquireResponse()
err := client.DoTimeout(req, resp, options.requestTimeout)
// Decide whether the health check succeeded by comparing the response status code with successStatusCode
if err != nil || resp.StatusCode() != options.successStatusCode {
// The health check failed: increment the failure counter
failureCount++
// When consecutive failures reach failureThreshold, treat the endpoint as unhealthy and send false to the channel
if failureCount == options.failureThreshold {
ch <- false
}
} else {
// The health check succeeded: send true to the channel
ch <- true
// and reset failureCount
failureCount = 0
}
fasthttp.ReleaseResponse(resp)
}
}(signalChan, endpointAddress, options)
return signalChan
}
The applyDefaults() function sets the default values:
const (
initialDelay = time.Second * 1
failureThreshold = 2
requestTimeout = time.Second * 2
interval = time.Second * 5
successStatusCode = 200
)
func applyDefaults(o *healthCheckOptions) {
o.failureThreshold = failureThreshold
o.initialDelay = initialDelay
o.requestTimeout = requestTimeout
o.successStatusCode = successStatusCode
o.interval = interval
}
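Summary of the health check procedure
A health check against a given endpointAddress works as follows:
- Sleep for initialDelay before starting, since the target may still be initializing
- Run a health check every interval
- Each health check sends an HTTP GET request to endpointAddress, with requestTimeout as the timeout
- Inspect the response to decide whether the target is healthy
- If a response comes back and its status code is successStatusCode, this check counts as a success
- If the request fails (for example, it times out) or the status code is not successStatusCode, this check counts as a failure
- On failure, increment the counter and check again after interval
- If consecutive failures reach failureThreshold, report the health check as failed
- A single success clears the accumulated failure count and reports the health check as successful.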
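The counting rules in this summary can be replayed without any network access. The sketch below is a simplified model, not the Dapr implementation: simulate is a hypothetical function that takes a sequence of probe outcomes and returns the values that the loop in StartEndpointHealthCheck would send on its channel.

```go
package main

import "fmt"

// simulate replays probe outcomes (true = the probe succeeded) and returns
// the values that would be sent on the signal channel.
func simulate(outcomes []bool, failureThreshold int) []bool {
	var signals []bool
	failureCount := 0
	for _, ok := range outcomes {
		if !ok {
			failureCount++
			// false is sent only when the count equals the threshold exactly,
			// so further consecutive failures send nothing.
			if failureCount == failureThreshold {
				signals = append(signals, false)
			}
		} else {
			// Every success is reported and clears the failure count.
			signals = append(signals, true)
			failureCount = 0
		}
	}
	return signals
}

func main() {
	// success, three failures in a row, then success again
	fmt.Println(simulate([]bool{true, false, false, false, true}, 2))
	// prints [true false true]
}
```

One consequence visible in the model: because the comparison uses ==, a long outage produces a single false, while every successful probe produces a true.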
6.2 - server.go source code study
Source code analysis of server.go in the Dapr health package: the implementation of the healthz server.
Code implementation
Health server
Interface definition of the healthz server:
// Server is the interface for the healthz server
type Server interface {
Run(context.Context, int) error
Ready()
NotReady()
}
The server struct; the ready field holds the state:
type server struct {
ready bool
log logger.Logger
}
Function that creates the healthz server:
// NewServer returns a new healthz server
func NewServer(log logger.Logger) Server {
return &server{
log: log,
}
}
Two methods that set the ready state:
// Ready sets a ready state for the endpoint handlers
func (s *server) Ready() {
s.ready = true
}
// NotReady sets a not ready state for the endpoint handlers
func (s *server) NotReady() {
s.ready = false
}
Running the healthz server
The Run method starts an HTTP server with a healthz endpoint; the port is given by the port parameter:
// Run starts a net/http server with a healthz endpoint
func (s *server) Run(ctx context.Context, port int) error {
router := http.NewServeMux()
router.Handle("/healthz", s.healthz())
srv := &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: router,
}
...
}
The remainder of Run, after the server has been set up:
doneCh := make(chan struct{})
go func() {
select {
case <-ctx.Done():
s.log.Info("Healthz server is shutting down")
shutdownCtx, cancel := context.WithTimeout(
context.Background(),
time.Second*5,
)
defer cancel()
srv.Shutdown(shutdownCtx) // nolint: errcheck
case <-doneCh:
}
}()
s.log.Infof("Healthz server is listening on %s", srv.Addr)
err := srv.ListenAndServe()
if err != http.ErrServerClosed {
s.log.Errorf("Healthz server error: %s", err)
}
close(doneCh)
return err
}
Request handling in the healthz server
The healthz() method is the handler of the health endpoint; it returns an HTTP status code based on the current value of the server's ready field:
// healthz is a health endpoint handler
func (s *server) healthz() http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var status int
if s.ready {
// Ready: return 200
status = http.StatusOK
} else {
// Not ready: return 503
status = http.StatusServiceUnavailable
}
w.WriteHeader(status)
})
}
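In the code above, ready is a plain bool that Ready() and NotReady() write and that the handler reads on request goroutines. If the flag were toggled while requests are being served, an atomic flag would be one way to avoid a data race. The sketch below illustrates that variant; it is an assumption-labelled alternative, not how Dapr implements it. readyServer and probe are hypothetical names, and atomic.Bool requires Go 1.19 or later.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync/atomic"
)

// readyServer is a hypothetical variant of the healthz server that keeps
// the ready flag in an atomic.Bool.
type readyServer struct {
	ready atomic.Bool
}

func (s *readyServer) healthz() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if s.ready.Load() {
			w.WriteHeader(http.StatusOK)
			return
		}
		w.WriteHeader(http.StatusServiceUnavailable)
	})
}

// probe calls the handler in-process and returns the status code it wrote.
func probe(h http.Handler) int {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/healthz", nil))
	return rec.Code
}

func main() {
	s := &readyServer{}
	fmt.Println(probe(s.healthz())) // not ready yet
	s.ready.Store(true)
	fmt.Println(probe(s.healthz())) // ready
}
```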
Usage
The healthz server is used in injector, placement, sentry and operator; each of these processes starts the healthz server in its main function.
injector
injector starts it on port 8080:
const (
healthzPort = 8080
)
func main() {
......
go func() {
healthzServer := health.NewServer(log)
healthzServer.Ready()
healthzErr := healthzServer.Run(ctx, healthzPort)
if healthzErr != nil {
log.Fatalf("failed to start healthz server: %s", healthzErr)
}
}()
......
}
placement
placement starts it on port 8080 by default (the port can be changed with a command-line flag):
const (
defaultHealthzPort = 8080
)
func main() {
flag.IntVar(&cfg.healthzPort, "healthz-port", cfg.healthzPort, "sets the HTTP port for the healthz server")
......
go startHealthzServer(cfg.healthzPort)
......
}
func startHealthzServer(healthzPort int) {
healthzServer := health.NewServer(log)
healthzServer.Ready()
if err := healthzServer.Run(context.Background(), healthzPort); err != nil {
log.Fatalf("failed to start healthz server: %s", err)
}
}
sentry
sentry starts it on port 8080:
const (
healthzPort = 8080
)
func main() {
......
go func() {
healthzServer := health.NewServer(log)
healthzServer.Ready()
err := healthzServer.Run(ctx, healthzPort)
if err != nil {
log.Fatalf("failed to start healthz server: %s", err)
}
}()
......
}
operator
operator starts it on port 8080:
const (
healthzPort = 8080
)
func main() {
......
go func() {
healthzServer := health.NewServer(log)
healthzServer.Ready()
err := healthzServer.Run(ctx, healthzPort)
if err != nil {
log.Fatalf("failed to start healthz server: %s", err)
}
}()
......
}
daprd
Note that daprd does not use the healthz server; it adds the healthz functionality directly on top of the Dapr HTTP API.
The code is in http/api.go:
func NewAPI(......
api.endpoints = append(api.endpoints, api.constructHealthzEndpoints()...)
return api
}
func (a *api) constructHealthzEndpoints() []Endpoint {
return []Endpoint{
{
Methods: []string{fasthttp.MethodGet},
Route: "healthz",
Version: apiVersionV1,
Handler: a.onGetHealthz,
},
}
}
The onGetHealthz() method handles the request:
func (a *api) onGetHealthz(reqCtx *fasthttp.RequestCtx) {
if !a.readyStatus {
msg := NewErrorResponse("ERR_HEALTH_NOT_READY", messages.ErrHealthNotReady)
respondWithError(reqCtx, fasthttp.StatusInternalServerError, msg)
log.Debug(msg)
} else {
respondEmpty(reqCtx)
}
}
func respondEmpty(ctx *fasthttp.RequestCtx) {
ctx.Response.SetBody(nil)
ctx.Response.SetStatusCode(fasthttp.StatusNoContent)
}
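Note: on success the status code returned here is 204 StatusNoContent rather than the usual 200 OK, and when not ready it is 500 StatusInternalServerError rather than the 503 returned by the healthz server.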
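7 - Metrics source code study
7.1 - exporter.go source code study
Source code analysis of exporter.go in the Dapr metrics package, covering the struct definitions and method implementations. Only Prometheus is supported at present.
Exporter definition and implementation
Exporter interface definition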
// Exporter is the interface for metrics exporters
type Exporter interface {
// Init initializes metrics exporter
Init() error
// Options returns Exporter options
Options() *Options
}
exporter struct definition
// exporter is the base struct
type exporter struct {
namespace string
options *Options
logger logger.Logger
}
Building the exporter
// NewExporter creates new MetricsExporter instance
func NewExporter(namespace string) Exporter {
// TODO: support multiple exporters
return &promMetricsExporter{
&exporter{
namespace: namespace,
options: defaultMetricOptions(),
logger: logger.NewLogger("dapr.metrics"),
},
nil,
}
}
Only the promMetrics Exporter is supported at present.
Implementation of the Options() interface method
The Options() method simply returns m.options:
// Options returns current metric exporter options
func (m *exporter) Options() *Options {
return m.options
}
The values are assigned in defaultMetricOptions().
Implementation of the Prometheus Exporter
promMetricsExporter struct definition
// promMetricsExporter is prometheus metric exporter
type promMetricsExporter struct {
*exporter
ocExporter *ocprom.Exporter
}
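It embeds exporter (roughly equivalent to inheritance) and has an ocExporter field of type *ocprom.Exporter.
Implementation of the Init() interface method
Initializes the opencensus exporter: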
// Init initializes opencensus exporter
func (m *promMetricsExporter) Init() error {
if !m.exporter.Options().MetricsEnabled {
return nil
}
// Add default health metrics for process
// The default health metrics are the process collector and the Go collector
registry := prom.NewRegistry()
registry.MustRegister(prom.NewProcessCollector(prom.ProcessCollectorOpts{}))
registry.MustRegister(prom.NewGoCollector())
var err error
m.ocExporter, err = ocprom.NewExporter(ocprom.Options{
Namespace: m.namespace,
Registry: registry,
})
if err != nil {
return errors.Errorf("failed to create Prometheus exporter: %v", err)
}
// register exporter to view
view.RegisterExporter(m.ocExporter)
// start metrics server
return m.startMetricServer()
}
Implementation of the startMetricServer() method
Starts the metrics server. The listening port comes from MetricsPort in options, and the path is defaultMetricsPath:
const (
defaultMetricsPath = "/"
)
// startMetricServer starts metrics server
func (m *promMetricsExporter) startMetricServer() error {
if !m.exporter.Options().MetricsEnabled {
// skip if metrics is not enabled
return nil
}
addr := fmt.Sprintf(":%d", m.options.MetricsPort())
if m.ocExporter == nil {
return errors.New("exporter was not initialized")
}
m.exporter.logger.Infof("metrics server started on %s%s", addr, defaultMetricsPath)
go func() {
mux := http.NewServeMux()
mux.Handle(defaultMetricsPath, m.ocExporter)
if err := http.ListenAndServe(addr, mux); err != nil {
m.exporter.logger.Fatalf("failed to start metrics server: %v", err)
}
}()
return nil
}
7.2 - options.go source code study
Source code study of options.go in the Dapr metrics package
Code implementation
Options struct definition
// Options defines the sets of options for Dapr logging
type Options struct {
// OutputLevel is the level of logging
MetricsEnabled bool
metricsPort string
}
Default values
The default metrics port is 9090, and metrics are enabled by default:
const (
defaultMetricsPort = "9090"
defaultMetricsEnabled = true
)
func defaultMetricOptions() *Options {
return &Options{
metricsPort: defaultMetricsPort,
MetricsEnabled: defaultMetricsEnabled,
}
}
MetricsPort() method
The MetricsPort() method returns the metrics port; if the configured value cannot be parsed, it falls back to the default port 9090:
// MetricsPort gets metrics port.
func (o *Options) MetricsPort() uint64 {
port, err := strconv.ParseUint(o.metricsPort, 10, 64)
if err != nil {
// Use default metrics port as a fallback
port, _ = strconv.ParseUint(defaultMetricsPort, 10, 64)
}
return port
}
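The fallback behaviour of MetricsPort() can be checked in isolation. The sketch below copies the parsing logic into a hypothetical free function named metricsPort, so that different inputs can be tried; it is an illustration, not the Dapr method itself.

```go
package main

import (
	"fmt"
	"strconv"
)

const defaultPort = "9090"

// metricsPort is a hypothetical free-function copy of the MetricsPort logic.
func metricsPort(raw string) uint64 {
	port, err := strconv.ParseUint(raw, 10, 64)
	if err != nil {
		// Fall back to the default port when the value cannot be parsed.
		port, _ = strconv.ParseUint(defaultPort, 10, 64)
	}
	return port
}

func main() {
	fmt.Println(metricsPort("9100"))  // parsed as given
	fmt.Println(metricsPort("abc"))   // not a number: falls back to the default
	fmt.Println(metricsPort("-1"))    // ParseUint rejects a sign: falls back to the default
	fmt.Println(metricsPort("70000")) // parses successfully even though it is above 65535
}
```

The last case shows that the fallback covers only parse errors: with a bit size of 64, a value outside the valid TCP port range is returned unchanged, and the failure would surface later when the server tries to listen.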
Methods for command-line flags
AttachCmdFlags() method
The AttachCmdFlags() method registers the two command-line flags metrics-port and enable-metrics:
// AttachCmdFlags attaches metrics options to command flags
func (o *Options) AttachCmdFlags(
stringVar func(p *string, name string, value string, usage string),
boolVar func(p *bool, name string, value bool, usage string)) {
stringVar(
&o.metricsPort,
"metrics-port",
defaultMetricsPort,
"The port for the metrics server")
boolVar(
&o.MetricsEnabled,
"enable-metrics",
defaultMetricsEnabled,
"Enable prometheus metric")
}
AttachCmdFlag() method
The AttachCmdFlag() method registers only the metrics-port flag (not enable-metrics):
// AttachCmdFlag attaches single metrics option to command flags
func (o *Options) AttachCmdFlag(
stringVar func(p *string, name string, value string, usage string)) {
stringVar(
&o.metricsPort,
"metrics-port",
defaultMetricsPort,
"The port for the metrics server")
}
Usage
AttachCmdFlag(), which handles only the metrics-port flag, is called when the dapr runtime starts (and only there):
metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)
// attaching only metrics-port option
metricsExporter.Options().AttachCmdFlag(flag.StringVar)
AttachCmdFlags(), which handles both metrics-port and enable-metrics, is called by injector, operator, placement and sentry:
func init() {
metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)
metricsExporter.Options().AttachCmdFlags(flag.StringVar, flag.BoolVar)
}
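AttachCmdFlags() takes the registration functions as parameters instead of calling the flag package directly, which makes it possible to bind the options to any flag set. The sketch below illustrates this with a private flag.FlagSet; the options type, attachCmdFlags and parse are hypothetical stand-ins written for this note, with the flag names and defaults taken from the excerpt above.

```go
package main

import (
	"flag"
	"fmt"
)

// options is a hypothetical stand-in for the metrics Options struct.
type options struct {
	MetricsEnabled bool
	metricsPort    string
}

// attachCmdFlags mirrors the shape of AttachCmdFlags: the caller decides
// which flag set the options are bound to.
func (o *options) attachCmdFlags(
	stringVar func(p *string, name string, value string, usage string),
	boolVar func(p *bool, name string, value bool, usage string)) {
	stringVar(&o.metricsPort, "metrics-port", "9090", "The port for the metrics server")
	boolVar(&o.MetricsEnabled, "enable-metrics", true, "Enable prometheus metric")
}

// parse binds the options to a private FlagSet and parses args.
func parse(args []string) (*options, error) {
	fs := flag.NewFlagSet("demo", flag.ContinueOnError)
	o := &options{}
	// Method values of *flag.FlagSet have the required signatures.
	o.attachCmdFlags(fs.StringVar, fs.BoolVar)
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	return o, nil
}

func main() {
	o, err := parse([]string{"-metrics-port=9100", "-enable-metrics=false"})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(o.metricsPort, o.MetricsEnabled)
}
```

Passing flag.StringVar and flag.BoolVar, as the Dapr binaries do, binds the same options to the process-wide default flag set.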
8 - workflow source code
8.1 - workflow API
proto definition
dapr/proto/runtime/v1/dapr.proto
service Dapr {
// Starts a new instance of a workflow
rpc StartWorkflowAlpha1 (StartWorkflowRequest) returns (StartWorkflowResponse) {}
// Gets details about a started workflow instance
rpc GetWorkflowAlpha1 (GetWorkflowRequest) returns (GetWorkflowResponse) {}
// Purge Workflow
rpc PurgeWorkflowAlpha1 (PurgeWorkflowRequest) returns (google.protobuf.Empty) {}
// Terminates a running workflow instance
rpc TerminateWorkflowAlpha1 (TerminateWorkflowRequest) returns (google.protobuf.Empty) {}
// Pauses a running workflow instance
rpc PauseWorkflowAlpha1 (PauseWorkflowRequest) returns (google.protobuf.Empty) {}
// Resumes a paused workflow instance
rpc ResumeWorkflowAlpha1 (ResumeWorkflowRequest) returns (google.protobuf.Empty) {}
// Raise an event to a running workflow instance
rpc RaiseEventWorkflowAlpha1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {}
}
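workflow has no scenario in which the sidecar sends requests toward the application, so there is no appcallback.
Generated Go code
pkg/proto/runtime/v1 holds the Go code generated from the proto files,
for example pkg/proto/runtime/v1/dapr_grpc.pb.go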
8.2 - workflow HTTP API
pkg/http/api.go
Building the workflow endpoints
const (
workflowComponent = "workflowComponent"
workflowName = "workflowName"
)
func NewAPI(opts APIOpts) API {
api := &api{
......
api.endpoints = append(api.endpoints, api.constructWorkflowEndpoints()...)
return api
}
constructWorkflowEndpoints() is implemented in pkg/http/api_workflow.go:
func (a *api) constructWorkflowEndpoints() []Endpoint {
return []Endpoint{
{
Methods: []string{http.MethodGet},
Route: "workflows/{workflowComponent}/{instanceID}",
Version: apiVersionV1alpha1,
Handler: a.onGetWorkflowHandler(),
},
{
Methods: []string{http.MethodPost},
Route: "workflows/{workflowComponent}/{instanceID}/raiseEvent/{eventName}",
Version: apiVersionV1alpha1,
Handler: a.onRaiseEventWorkflowHandler(),
},
{
Methods: []string{http.MethodPost},
Route: "workflows/{workflowComponent}/{workflowName}/start",
Version: apiVersionV1alpha1,
Handler: a.onStartWorkflowHandler(),
},
{
Methods: []string{http.MethodPost},
Route: "workflows/{workflowComponent}/{instanceID}/pause",
Version: apiVersionV1alpha1,
Handler: a.onPauseWorkflowHandler(),
},
{
Methods: []string{http.MethodPost},
Route: "workflows/{workflowComponent}/{instanceID}/resume",
Version: apiVersionV1alpha1,
Handler: a.onResumeWorkflowHandler(),
},
{
Methods: []string{http.MethodPost},
Route: "workflows/{workflowComponent}/{instanceID}/terminate",
Version: apiVersionV1alpha1,
Handler: a.onTerminateWorkflowHandler(),
},
{
Methods: []string{http.MethodPost},
Route: "workflows/{workflowComponent}/{instanceID}/purge",
Version: apiVersionV1alpha1,
Handler: a.onPurgeWorkflowHandler(),
},
}
}
handler implementation
pkg/http/api_workflow.go
onStartWorkflowHandler()
// Route: "workflows/{workflowComponent}/{workflowName}/start?instanceID={instanceID}",
// Workflow Component: Component specified in yaml
// Workflow Name: Name of the workflow to run
// Instance ID: Identifier of the specific run
func (a *api) onStartWorkflowHandler() http.HandlerFunc {
return UniversalHTTPHandler(
a.universal.StartWorkflowAlpha1,
// UniversalHTTPHandlerOpts is a generic struct
UniversalHTTPHandlerOpts[*runtimev1pb.StartWorkflowRequest, *runtimev1pb.StartWorkflowResponse]{
// We pass the input body manually rather than parsing it using protojson
SkipInputBody: true,
InModifier: func(r *http.Request, in *runtimev1pb.StartWorkflowRequest) (*runtimev1pb.StartWorkflowRequest, error) {
in.WorkflowName = chi.URLParam(r, workflowName)
in.WorkflowComponent = chi.URLParam(r, workflowComponent)
// The instance ID is optional. If not specified, we generate a random one.
in.InstanceId = r.URL.Query().Get(instanceID)
if in.InstanceId == "" {
randomID, err := uuid.NewRandom()
if err != nil {
return nil, err
}
in.InstanceId = randomID.String()
}
// We accept the HTTP request body as the input to the workflow
// without making any assumptions about its format.
var err error
in.Input, err = io.ReadAll(r.Body)
if err != nil {
return nil, messages.ErrBodyRead.WithFormat(err)
}
return in, nil
},
SuccessStatusCode: http.StatusAccepted,
})
}
onGetWorkflowHandler()
// Route: GET "workflows/{workflowComponent}/{instanceID}"
func (a *api) onGetWorkflowHandler() http.HandlerFunc {
return UniversalHTTPHandler(
a.universal.GetWorkflowAlpha1,
UniversalHTTPHandlerOpts[*runtimev1pb.GetWorkflowRequest, *runtimev1pb.GetWorkflowResponse]{
InModifier: workflowInModifier[*runtimev1pb.GetWorkflowRequest],
})
}
workflowInModifier() is a shared function that reads the WorkflowComponent and InstanceId parameters from the URL and sets them on the request:
// Shared InModifier method for all universal handlers for workflows that adds the "WorkflowComponent" and "InstanceId" properties
func workflowInModifier[T runtimev1pb.WorkflowRequests](r *http.Request, in T) (T, error) {
in.SetWorkflowComponent(chi.URLParam(r, workflowComponent))
in.SetInstanceId(chi.URLParam(r, instanceID))
return in, nil
}
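workflowInModifier() relies on a generic type parameter whose constraint requires setter methods, so a single function can serve several request types. The sketch below illustrates the mechanism with hypothetical types: workflowRequest stands in for runtimev1pb.WorkflowRequests (whose exact definition is not shown here and is assumed to require these two setters), and getRequest and pauseRequest stand in for the generated request messages.

```go
package main

import "fmt"

// workflowRequest is a hypothetical constraint standing in for
// runtimev1pb.WorkflowRequests (assumed to require these setters).
type workflowRequest interface {
	SetWorkflowComponent(string)
	SetInstanceId(string)
}

// getRequest and pauseRequest are hypothetical request messages.
type getRequest struct{ Component, InstanceID string }

func (r *getRequest) SetWorkflowComponent(c string) { r.Component = c }
func (r *getRequest) SetInstanceId(id string)       { r.InstanceID = id }

type pauseRequest struct{ Component, InstanceID string }

func (r *pauseRequest) SetWorkflowComponent(c string) { r.Component = c }
func (r *pauseRequest) SetInstanceId(id string)       { r.InstanceID = id }

// fill works for any type that satisfies the constraint and returns the
// same concrete type, so callers keep full type information.
func fill[T workflowRequest](in T, component, instanceID string) T {
	in.SetWorkflowComponent(component)
	in.SetInstanceId(instanceID)
	return in
}

func main() {
	g := fill(&getRequest{}, "dapr", "abc123")
	p := fill(&pauseRequest{}, "dapr", "abc123")
	fmt.Println(g.Component, g.InstanceID, p.Component, p.InstanceID)
}
```

Compared with accepting a plain interface value, the generic form returns the concrete type T, which is what allows the function to be used as the InModifier of handlers with different request types.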
8.3 - workflow gRPC API
proto definition
dapr/proto/runtime/v1/dapr.proto
service Dapr {
// Starts a new instance of a workflow
rpc StartWorkflowAlpha1 (StartWorkflowRequest) returns (StartWorkflowResponse) {}
// Gets details about a started workflow instance
rpc GetWorkflowAlpha1 (GetWorkflowRequest) returns (GetWorkflowResponse) {}
// Purge Workflow
rpc PurgeWorkflowAlpha1 (PurgeWorkflowRequest) returns (google.protobuf.Empty) {}
// Terminates a running workflow instance
rpc TerminateWorkflowAlpha1 (TerminateWorkflowRequest) returns (google.protobuf.Empty) {}
// Pauses a running workflow instance
rpc PauseWorkflowAlpha1 (PauseWorkflowRequest) returns (google.protobuf.Empty) {}
// Resumes a paused workflow instance
rpc ResumeWorkflowAlpha1 (ResumeWorkflowRequest) returns (google.protobuf.Empty) {}
// Raise an event to a running workflow instance
rpc RaiseEventWorkflowAlpha1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {}
}
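workflow has no scenario in which the sidecar sends requests toward the application, so there is no appcallback.
Generated Go code
pkg/proto/runtime/v1 holds the Go code generated from the proto files,
for example pkg/proto/runtime/v1/dapr_grpc.pb.go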
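9 - State management source code
9.1 - Overview of the state management source code
State management source code
9.2 - Source code analysis of state management initialization
State Store Registry
Preparing the stateStoreRegistry
The stateStoreRegistry is initialized when the runtime is initialized: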
func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration) *DaprRuntime {
......
stateStoreRegistry: state_loader.NewRegistry(),
}
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
a.stateStoreRegistry.Register(opts.states...)
......
}
These opts come from the configuration supplied when the runtime starts, for example in cmd/daprd/main.go:
func main() {
rt, err := runtime.FromFlags()
if err != nil {
log.Fatal(err)
}
err = rt.Run(
......
runtime.WithStates(
state_loader.New("redis", func() state.Store {
return state_redis.NewRedisStateStore(logContrib)
}),
state_loader.New("consul", func() state.Store {
return consul.NewConsulStateStore(logContrib)
}),
state_loader.New("azure.blobstorage", func() state.Store {
return state_azure_blobstorage.NewAzureBlobStorageStore(logContrib)
}),
state_loader.New("azure.cosmosdb", func() state.Store {
return state_cosmosdb.NewCosmosDBStateStore(logContrib)
}),
state_loader.New("azure.tablestorage", func() state.Store {
return state_azure_tablestorage.NewAzureTablesStateStore(logContrib)
}),
//state_loader.New("etcd", func() state.Store {
// return etcd.NewETCD(logContrib)
//}),
state_loader.New("cassandra", func() state.Store {
return cassandra.NewCassandraStateStore(logContrib)
}),
state_loader.New("memcached", func() state.Store {
return memcached.NewMemCacheStateStore(logContrib)
}),
state_loader.New("mongodb", func() state.Store {
return mongodb.NewMongoDB(logContrib)
}),
state_loader.New("zookeeper", func() state.Store {
return zookeeper.NewZookeeperStateStore(logContrib)
}),
state_loader.New("gcp.firestore", func() state.Store {
return firestore.NewFirestoreStateStore(logContrib)
}),
state_loader.New("postgresql", func() state.Store {
return postgresql.NewPostgreSQLStateStore(logContrib)
}),
state_loader.New("sqlserver", func() state.Store {
return sqlserver.NewSQLServerStateStore(logContrib)
}),
state_loader.New("hazelcast", func() state.Store {
return hazelcast.NewHazelcastStore(logContrib)
}),
state_loader.New("cloudstate.crdt", func() state.Store {
return cloudstate.NewCRDT(logContrib)
}),
state_loader.New("couchbase", func() state.Store {
return couchbase.NewCouchbaseStateStore(logContrib)
}),
state_loader.New("aerospike", func() state.Store {
return aerospike.NewAerospikeStateStore(logContrib)
}),
),
......
}
This is where the various state store implementations are configured.
How the State Store Registry is implemented
pkg/components/state/registry.go defines the registry interface and data structure:
// Registry is an interface for a component that returns registered state store implementations
type Registry interface {
Register(components ...State)
CreateStateStore(name string) (state.Store, error)
}
type stateStoreRegistry struct {
stateStores map[string]func() state.Store
}
state.Store is the standard state store interface defined by dapr, and every implementation must conform to it. It is defined in github.com/dapr/components-contrib/state/store.go:
// Store is an interface to perform operations on store
type Store interface {
Init(metadata Metadata) error
Delete(req *DeleteRequest) error
BulkDelete(req []DeleteRequest) error
Get(req *GetRequest) (*GetResponse, error)
Set(req *SetRequest) error
BulkSet(req []SetRequest) error
}
During the runtime initialization shown earlier, each implementation is associated with its name through the New function:
type State struct {
Name string
FactoryMethod func() state.Store
}
func New(name string, factoryMethod func() state.Store) State {
return State{
Name: name,
FactoryMethod: factoryMethod,
}
}
State Store initialization flow
pkg/runtime/runtime.go:
State initialization happens during runtime initialization:
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
go a.processComponents()
......
}
func (a *DaprRuntime) processComponents() {
for {
comp, more := <-a.pendingComponents
if !more {
a.pendingComponentsDone <- true
return
}
if err := a.processOneComponent(comp); err != nil {
log.Errorf("process component %s error, %s", comp.Name, err)
}
}
}
processOneComponent:
func (a *DaprRuntime) processOneComponent(comp components_v1alpha1.Component) error {
res := a.preprocessOneComponent(&comp)
compCategory := a.figureOutComponentCategory(comp)
......
return nil
}
doProcessOneComponent:
func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
switch category {
case stateComponent:
return a.initState(comp)
}
......
return nil
}
Implementation of the initState method:
// Refer for state store api decision https://github.com/dapr/dapr/blob/master/docs/decision_records/api/API-008-multi-state-store-api-design.md
func (a *DaprRuntime) initState(s components_v1alpha1.Component) error {
// Build the state store (this is where the components code starts to be integrated)
store, err := a.stateStoreRegistry.CreateStateStore(s.Spec.Type)
if err != nil {
log.Warnf("error creating state store %s: %s", s.Spec.Type, err)
diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "creation")
return err
}
if store != nil {
props := a.convertMetadataItemsToProperties(s.Spec.Metadata)
// The component's store implementation is initialized here, for example by establishing connections
err := store.Init(state.Metadata{
Properties: props,
})
if err != nil {
diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "init")
log.Warnf("error initializing state store %s: %s", s.Spec.Type, err)
return err
}
// Keep the initialized store implementation in the runtime
a.stateStores[s.ObjectMeta.Name] = store
// set specified actor store if "actorStateStore" is true in the spec.
actorStoreSpecified := props[actorStateStore]
if actorStoreSpecified == "true" {
if a.actorStateStoreCount++; a.actorStateStoreCount == 1 {
a.actorStateStoreName = s.ObjectMeta.Name
}
}
diag.DefaultMonitoring.ComponentInitialized(s.Spec.Type)
}
if a.actorStateStoreName == "" || a.actorStateStoreCount != 1 {
log.Warnf("either no actor state store or multiple actor state stores are specified in the configuration, actor stores specified: %d", a.actorStateStoreCount)
}
return nil
}
The CreateStateStore method is implemented in pkg/components/state/registry.go:
func (s *stateStoreRegistry) CreateStateStore(name string) (state.Store, error) {
if method, ok := s.stateStores[name]; ok {
return method(), nil
}
return nil, errors.Errorf("couldn't find state store %s", name)
}
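Putting New, Register and CreateStateStore together, the registry is essentially a map from a name to a factory function. The sketch below is a simplified model of that flow, not the Dapr code: Store is reduced to a one-method interface, memStore is a hypothetical implementation, and register stores each factory under the plain name. The key that Dapr's Register actually uses is not shown in the excerpt above, so the plain name is an assumption.

```go
package main

import "fmt"

// Store is a reduced, hypothetical version of state.Store.
type Store interface {
	Name() string
}

// memStore is a hypothetical store implementation used for illustration.
type memStore struct{}

func (memStore) Name() string { return "in-memory" }

// State pairs a name with a factory, as in the excerpt above.
type State struct {
	Name          string
	FactoryMethod func() Store
}

type registry struct {
	stores map[string]func() Store
}

// register stores each factory under its plain name.
// ASSUMPTION: the real Register may build the key differently.
func (r *registry) register(components ...State) {
	for _, c := range components {
		r.stores[c.Name] = c.FactoryMethod
	}
}

// create looks up the factory and calls it, so every call yields a new store.
func (r *registry) create(name string) (Store, error) {
	if factory, ok := r.stores[name]; ok {
		return factory(), nil
	}
	return nil, fmt.Errorf("couldn't find state store %s", name)
}

func main() {
	r := &registry{stores: map[string]func() Store{}}
	r.register(State{Name: "memory", FactoryMethod: func() Store { return memStore{} }})

	s, err := r.create("memory")
	fmt.Println(s.Name(), err)
	_, err = r.create("unknown")
	fmt.Println(err)
}
```

Registering factories rather than instances means that a store is constructed only when a component of that type is actually configured.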
9.3 - Source code analysis of state handling in the runtime
The code with which the runtime handles state requests is in pkg/grpc/api.go.
get state
func (a *api) GetState(ctx context.Context, in *runtimev1pb.GetStateRequest) (*runtimev1pb.GetStateResponse, error) {
// Find the state store that corresponds to the store name,
// so the store name in the request must match the name in the yaml file
store, err := a.getStateStore(in.StoreName)
if err != nil {
apiServerLogger.Debug(err)
return &runtimev1pb.GetStateResponse{}, err
}
req := state.GetRequest{
Key: a.getModifiedStateKey(in.Key),
Metadata: in.Metadata,
Options: state.GetStateOption{
Consistency: stateConsistencyToString(in.Consistency),
},
}
// Run the query
// In the Redis implementation this first runs HGETALL and falls back to GET if that fails
getResponse, err := store.Get(&req)
if err != nil {
err = fmt.Errorf("ERR_STATE_GET: %s", err)
apiServerLogger.Debug(err)
return &runtimev1pb.GetStateResponse{}, err
}
response := &runtimev1pb.GetStateResponse{}
if getResponse != nil {
response.Etag = getResponse.ETag
response.Data = getResponse.Data
}
return response, nil
}
get bulk state
The get bulk method is built by the runtime on top of the get method; the underlying state store only needs to implement the single-key get.
func (a *api) GetBulkState(ctx context.Context, in *runtimev1pb.GetBulkStateRequest) (*runtimev1pb.GetBulkStateResponse, error) {
store, err := a.getStateStore(in.StoreName)
if err != nil {
apiServerLogger.Debug(err)
return &runtimev1pb.GetBulkStateResponse{}, err
}
resp := &runtimev1pb.GetBulkStateResponse{}
// If Parallelism <= 0, the default value of 100 is used
limiter := concurrency.NewLimiter(int(in.Parallelism))
for _, k := range in.Keys {
fn := func(param interface{}) {
req := state.GetRequest{
Key: a.getModifiedStateKey(param.(string)),
Metadata: in.Metadata,
}
r, err := store.Get(&req)
item := &runtimev1pb.BulkStateItem{
Key: param.(string),
}
if err != nil {
item.Error = err.Error()
} else if r != nil {
item.Data = r.Data
item.Etag = r.ETag
}
resp.Items = append(resp.Items, item)
}
limiter.Execute(fn, k)
}
limiter.Wait()
return resp, nil
}
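In GetBulkState, fn appends to resp.Items. Assuming limiter.Execute runs fn on separate goroutines (which is what a concurrency limiter implies), concurrent appends to the same slice would need synchronization. The sketch below shows one way to structure such a bounded parallel fetch with a mutex around the append. It is an illustration under that assumption, not the Dapr implementation; bulkGet, item and the get callback are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// item is a hypothetical, reduced version of BulkStateItem.
type item struct {
	Key, Data, Err string
}

// bulkGet fetches all keys with at most parallelism lookups in flight.
func bulkGet(keys []string, parallelism int, get func(key string) (string, error)) []item {
	if parallelism <= 0 {
		parallelism = 100 // same default as described above
	}
	sem := make(chan struct{}, parallelism) // bounds the number of goroutines
	var (
		mu    sync.Mutex
		wg    sync.WaitGroup
		items []item
	)
	for _, k := range keys {
		wg.Add(1)
		sem <- struct{}{}
		go func(key string) {
			defer wg.Done()
			defer func() { <-sem }()
			it := item{Key: key}
			if data, err := get(key); err != nil {
				it.Err = err.Error()
			} else {
				it.Data = data
			}
			// The slice is shared between goroutines, so guard the append.
			mu.Lock()
			items = append(items, it)
			mu.Unlock()
		}(k)
	}
	wg.Wait()
	return items
}

func main() {
	keys := []string{"a", "b", "c", "d"}
	items := bulkGet(keys, 2, func(key string) (string, error) {
		return "value-" + key, nil
	})
	// The order of items is not guaranteed; only the count is.
	fmt.Println(len(items))
}
```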
save state
func (a *api) SaveState(ctx context.Context, in *runtimev1pb.SaveStateRequest) (*empty.Empty, error) {
store, err := a.getStateStore(in.StoreName)
if err != nil {
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
reqs := []state.SetRequest{}
for _, s := range in.States {
req := state.SetRequest{
Key: a.getModifiedStateKey(s.Key),
Metadata: s.Metadata,
Value: s.Value,
ETag: s.Etag,
}
if s.Options != nil {
req.Options = state.SetStateOption{
Consistency: stateConsistencyToString(s.Options.Consistency),
Concurrency: stateConcurrencyToString(s.Options.Concurrency),
}
}
reqs = append(reqs, req)
}
// Call the store's BulkSet method
// So the store's Set method is never actually called by the runtime?
err = store.BulkSet(reqs)
if err != nil {
err = fmt.Errorf("ERR_STATE_SAVE: %s", err)
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
return &empty.Empty{}, nil
}
delete state
func (a *api) DeleteState(ctx context.Context, in *runtimev1pb.DeleteStateRequest) (*empty.Empty, error) {
store, err := a.getStateStore(in.StoreName)
if err != nil {
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
req := state.DeleteRequest{
Key: a.getModifiedStateKey(in.Key),
Metadata: in.Metadata,
ETag: in.Etag,
}
if in.Options != nil {
req.Options = state.DeleteStateOption{
Concurrency: stateConcurrencyToString(in.Options.Concurrency),
Consistency: stateConsistencyToString(in.Options.Consistency),
}
}
// Call the store's Delete method
// The store's BulkDelete method is not called,
// and the runtime does not expose a BulkDelete method either
err = store.Delete(&req)
if err != nil {
err = fmt.Errorf("ERR_STATE_DELETE: failed deleting state with key %s: %s", in.Key, err)
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
return &empty.Empty{}, nil
}
Execute State Transaction
To support transactions, the store must implement the TransactionalStore interface:
type TransactionalStore interface {
// Init is the same as in the regular Store interface
Init(metadata Metadata) error
// The addition is the Multi method
Multi(request *TransactionalStateRequest) error
}
Implementation of the runtime's ExecuteStateTransaction method:
func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.ExecuteStateTransactionRequest) (*empty.Empty, error) {
if a.stateStores == nil || len(a.stateStores) == 0 {
err := errors.New("ERR_STATE_STORE_NOT_CONFIGURED")
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
storeName := in.StoreName
if a.stateStores[storeName] == nil {
err := errors.New("ERR_STATE_STORE_NOT_FOUND")
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
// Check whether the store is a TransactionalStore
transactionalStore, ok := a.stateStores[storeName].(state.TransactionalStore)
if !ok {
err := errors.New("ERR_STATE_STORE_NOT_SUPPORTED")
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
// Build the request
operations := []state.TransactionalStateOperation{}
for _, inputReq := range in.Operations {
var operation state.TransactionalStateOperation
var req = inputReq.Request
switch state.OperationType(inputReq.OperationType) {
case state.Upsert:
setReq := state.SetRequest{
Key: a.getModifiedStateKey(req.Key),
// Limitation:
// components that cannot handle byte array need to deserialize/serialize in
// component sepcific way in components-contrib repo.
Value: req.Value,
Metadata: req.Metadata,
ETag: req.Etag,
}
if req.Options != nil {
setReq.Options = state.SetStateOption{
Concurrency: stateConcurrencyToString(req.Options.Concurrency),
Consistency: stateConsistencyToString(req.Options.Consistency),
}
}
operation = state.TransactionalStateOperation{
Operation: state.Upsert,
Request: setReq,
}
case state.Delete:
delReq := state.DeleteRequest{
Key: a.getModifiedStateKey(req.Key),
Metadata: req.Metadata,
ETag: req.Etag,
}
if req.Options != nil {
delReq.Options = state.DeleteStateOption{
Concurrency: stateConcurrencyToString(req.Options.Concurrency),
Consistency: stateConsistencyToString(req.Options.Consistency),
}
}
operation = state.TransactionalStateOperation{
Operation: state.Delete,
Request: delReq,
}
default:
err := fmt.Errorf("ERR_OPERATION_NOT_SUPPORTED: operation type %s not supported", inputReq.OperationType)
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
operations = append(operations, operation)
}
// Call the state store's Multi method to run the operations as one transaction
err := transactionalStore.Multi(&state.TransactionalStateRequest{
Operations: operations,
Metadata: in.Metadata,
})
if err != nil {
err = fmt.Errorf("ERR_STATE_TRANSACTION: %s", err)
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
return &empty.Empty{}, nil
}
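The capability check above relies on a Go type assertion against an optional interface. A minimal sketch of that pattern follows; the Store, TransactionalStore, plainStore and txStore types here are simplified, hypothetical stand-ins and not the real dapr types.

```go
package main

import (
	"errors"
	"fmt"
)

// Store is a simplified, hypothetical stand-in for state.Store.
type Store interface {
	Name() string
}

// TransactionalStore adds the optional transactional capability,
// in the same spirit as state.TransactionalStore.
type TransactionalStore interface {
	Store
	Multi(ops []string) error
}

// plainStore implements only Store.
type plainStore struct{}

func (plainStore) Name() string { return "plain" }

// txStore implements both Store and TransactionalStore.
type txStore struct{ applied []string }

func (*txStore) Name() string { return "tx" }

func (t *txStore) Multi(ops []string) error {
	t.applied = append(t.applied, ops...)
	return nil
}

var errNotSupported = errors.New("ERR_STATE_STORE_NOT_SUPPORTED")

// executeTransaction runs ops only if the store supports transactions.
func executeTransaction(s Store, ops []string) error {
	ts, ok := s.(TransactionalStore) // capability check via type assertion
	if !ok {
		return errNotSupported
	}
	return ts.Multi(ops)
}

func main() {
	fmt.Println(executeTransaction(plainStore{}, []string{"upsert k1"}))
	fmt.Println(executeTransaction(&txStore{}, []string{"upsert k1", "delete k2"}))
}
```

A store opts in to transactions simply by having a Multi method; no registration flag is needed.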
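9.4 - Source code analysis of the Redis state store implementation
Redis implementation of state management
The Redis implementation lives in the dapr/components-contrib repository, in /state/redis/redis.go: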
// StateStore is a Redis state store
type StateStore struct {
client *redis.Client
json jsoniter.API
metadata metadata
replicas int
logger logger.Logger
}
// NewRedisStateStore returns a new redis state store
func NewRedisStateStore(logger logger.Logger) *StateStore {
return &StateStore{
json: jsoniter.ConfigFastest,
logger: logger,
}
}
Initialization
When the dapr runtime initializes, it registers the Redis state store implementation:
state_loader.New("redis", func() state.Store {
return state_redis.NewRedisStateStore(logContrib)
}),
The dapr runtime then calls the Init method while initializing the state store. The Redis implementation is:
// Init does metadata and connection parsing
func (r *StateStore) Init(metadata state.Metadata) error {
m, err := parseRedisMetadata(metadata)
if err != nil {
return err
}
r.metadata = m
if r.metadata.failover {
r.client = r.newFailoverClient(m)
} else {
r.client = r.newClient(m)
}
if _, err = r.client.Ping().Result(); err != nil {
return fmt.Errorf("redis store: error connecting to redis at %s: %s", m.host, err)
}
r.replicas, err = r.getConnectedSlaves()
return err
}
get state
The implementation of Get:
// Get retrieves state from redis with a key
func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
res, err := r.client.DoContext(context.Background(), "HGETALL", req.Key).Result() // Prefer values with ETags
if err != nil {
return r.directGet(req) //Falls back to original get
}
if res == nil {
// Empty result, case 1: nil reply
return &state.GetResponse{}, nil
}
vals := res.([]interface{})
if len(vals) == 0 {
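// Empty result, case 2: the reply contains no fields.
// So when no value is found for the key, an empty response is returned rather than an error.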
return &state.GetResponse{}, nil
}
data, version, err := r.getKeyVersion(vals)
if err != nil {
return nil, err
}
return &state.GetResponse{
Data: []byte(data),
ETag: version,
}, nil
}
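Implementation with ETag support
To support ETags, the store cannot simply use a Redis key/value pair and put the state data (the data field, a byte array) directly in the value. The "value" has to carry the fields the ETag needs in addition to data, such as version.
The Redis state store is designed so that the value of each state item stored in Redis is a hash, which holds several pieces of information under different fields:
- data: the state data
- version: the version needed for the ETag
That is why Get uses the HGETALL command to fetch every field/value pair of the hash. The getKeyVersion method then reads data and version from those pairs: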
func (r *StateStore) getKeyVersion(vals []interface{}) (data string, version string, err error) {
seenData := false
seenVersion := false
for i := 0; i < len(vals); i += 2 {
field, _ := strconv.Unquote(fmt.Sprintf("%q", vals[i]))
switch field {
case "data":
data, _ = strconv.Unquote(fmt.Sprintf("%q", vals[i+1]))
seenData = true
case "version":
version, _ = strconv.Unquote(fmt.Sprintf("%q", vals[i+1]))
seenVersion = true
}
}
if !seenData || !seenVersion {
return "", "", errors.New("required hash field 'data' or 'version' was not found")
}
return data, version, nil
}
The response then carries the ETag:
return &state.GetResponse{
Data: []byte(data),
ETag: version,
}, nil
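HGETALL replies with a flat list in which fields and values alternate. The sketch below shows that pairing logic in isolation. It is a simplified illustration, not the component's code: parseHash is a hypothetical name, and fmt.Sprint replaces the Sprintf("%q") plus strconv.Unquote round trip used in getKeyVersion.

```go
package main

import (
	"errors"
	"fmt"
)

// parseHash walks a flat [field, value, field, value, ...] reply and
// extracts the "data" and "version" fields.
func parseHash(vals []interface{}) (data, version string, err error) {
	var seenData, seenVersion bool
	for i := 0; i+1 < len(vals); i += 2 {
		field := fmt.Sprint(vals[i])
		value := fmt.Sprint(vals[i+1])
		switch field {
		case "data":
			data, seenData = value, true
		case "version":
			version, seenVersion = value, true
		}
	}
	if !seenData || !seenVersion {
		return "", "", errors.New("required hash field 'data' or 'version' was not found")
	}
	return data, version, nil
}

func main() {
	reply := []interface{}{"data", `{"a":1}`, "version", "3"}
	data, version, err := parseHash(reply)
	fmt.Println(data, version, err)

	// A reply without the version field is rejected.
	_, _, err = parseHash([]interface{}{"data", "x"})
	fmt.Println(err)
}
```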
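Implementation without ETag support
If the HGETALL command fails, Get falls back to the plain case, where Redis holds only the data and no ETag. The data is then stored as a simple key/value pair and is read directly with a plain GET command: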
func (r *StateStore) directGet(req *state.GetRequest) (*state.GetResponse, error) {
res, err := r.client.DoContext(context.Background(), "GET", req.Key).Result()
if err != nil {
return nil, err
}
if res == nil {
return &state.GetResponse{}, nil
}
s, _ := strconv.Unquote(fmt.Sprintf("%q", res))
return &state.GetResponse{
Data: []byte(s),
}, nil
}
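Note: this design has a performance cost. If the data in Redis is stored as a plain key/value pair without an ETag, every read takes two round trips: the HGETALL command fails first, and the fallback then reads again with GET.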
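The extra round trip can be made visible with a small in-memory model. This is only a sketch under stated assumptions: fakeRedis is a hypothetical stand-in that counts commands, and it assumes HGETALL against a string key fails with a WRONGTYPE error, which is what triggers the fallback.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeRedis is a hypothetical in-memory stand-in for a Redis client.
type fakeRedis struct {
	strings map[string]string            // plain key/value entries
	hashes  map[string]map[string]string // hash entries (data + version)
	calls   int                          // number of commands issued
}

var errWrongType = errors.New("WRONGTYPE Operation against a key holding the wrong kind of value")

// hgetall fails when the key holds a plain string, like real Redis does.
func (f *fakeRedis) hgetall(key string) (map[string]string, error) {
	f.calls++
	if _, ok := f.strings[key]; ok {
		return nil, errWrongType
	}
	return f.hashes[key], nil
}

func (f *fakeRedis) get(key string) string {
	f.calls++
	return f.strings[key]
}

// read tries the hash layout first and falls back to a plain GET.
func read(f *fakeRedis, key string) string {
	h, err := f.hgetall(key)
	if err != nil {
		return f.get(key) // second round trip
	}
	return h["data"]
}

func main() {
	f := &fakeRedis{
		strings: map[string]string{"plain": "v1"},
		hashes:  map[string]map[string]string{"withETag": {"data": "v2", "version": "1"}},
	}
	fmt.Println(read(f, "plain"), f.calls)
	f.calls = 0
	fmt.Println(read(f, "withETag"), f.calls)
}
```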
save state
The Redis implementation provides both a Set method and a BulkSet method:
// Set saves state into redis
func (r *StateStore) Set(req *state.SetRequest) error {
return state.SetWithOptions(r.setValue, req)
}
// BulkSet performs a bulks save operation
func (r *StateStore) BulkSet(req []state.SetRequest) error {
for i := range req {
err := r.Set(&req[i])
if err != nil {
// This behavior is debatable:
// as written, a single failed save returns immediately and the remaining operations are abandoned
return err
}
}
return nil
}
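The consequence of this fail-fast loop is that a failed bulk save can leave the store partially updated. A minimal sketch of that effect, using a hypothetical in-memory memStore rather than Redis:

```go
package main

import (
	"errors"
	"fmt"
)

// kv is one item of a bulk save.
type kv struct{ key, value string }

// memStore is a hypothetical in-memory store used only for illustration.
type memStore struct{ data map[string]string }

func (m *memStore) set(item kv) error {
	if item.key == "" {
		return errors.New("empty key")
	}
	m.data[item.key] = item.value
	return nil
}

// bulkSet mirrors the loop shape of BulkSet: stop at the first error.
func (m *memStore) bulkSet(items []kv) error {
	for _, item := range items {
		if err := m.set(item); err != nil {
			return err // items after this one are never attempted
		}
	}
	return nil
}

func main() {
	m := &memStore{data: map[string]string{}}
	err := m.bulkSet([]kv{{"a", "1"}, {"", "2"}, {"c", "3"}})
	_, hasA := m.data["a"]
	_, hasC := m.data["c"]
	fmt.Println(err, hasA, hasC)
}
```

Callers that need all-or-nothing behavior would have to use the transactional path instead of relying on BulkSet.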
The actual work is done in the r.setValue method:
func (r *StateStore) setValue(req *state.SetRequest) error {
err := state.CheckRequestOptions(req.Options)
if err != nil {
return err
}
// Parse the ETag; it must be convertible to an integer
ver, err := r.parseETag(req.ETag)
if err != nil {
return err
}
// Last-write-wins means ignoring whether the ETag matches and forcing the write,
// so ver is reset to 0 here
if req.Options.Concurrency == state.LastWrite {
ver = 0
}
bt, _ := utils.Marshal(req.Value, r.json.Marshal)
// Use the EVAL command to run a Lua script; the script body is setQuery
_, err = r.client.DoContext(context.Background(), "EVAL", setQuery, 1, req.Key, ver, bt).Result()
if err != nil {
return fmt.Errorf("failed to set key %s: %s", req.Key, err)
}
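// If strong consistency is requested and there is at least one replica,
if req.Options.Consistency == state.Strong && r.replicas > 0 {
// wait (with a 1000 ms timeout) until all replicas have acknowledged the write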
_, err = r.client.DoContext(context.Background(), "WAIT", r.replicas, 1000).Result()
if err != nil {
return fmt.Errorf("timed out while waiting for %v replicas to acknowledge write", r.replicas)
}
}
return nil
}
More Redis details:
- The setQuery script:
setQuery = "local var1 = redis.pcall(\"HGET\", KEYS[1], \"version\"); if type(var1) == \"table\" then redis.call(\"DEL\", KEYS[1]); end; if not var1 or type(var1)==\"table\" or var1 == \"\" or var1 == ARGV[1] or ARGV[1] == \"0\" then redis.call(\"HSET\", KEYS[1], \"data\", ARGV[2]) return redis.call(\"HINCRBY\", KEYS[1], \"version\", 1) else return error(\"failed to set key \" .. KEYS[1]) end"
- The WAIT numreplicas timeout command: https://redis.io/commands/wait
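The decision made by the setQuery script can be restated in Go to make the optimistic concurrency rule easier to follow. This is an in-memory model only, with hypothetical item and store types; it leaves out the branch in which the script deletes a key of the wrong type before writing.

```go
package main

import (
	"fmt"
	"strconv"
)

// item models the Redis hash: a data field plus a version counter.
type item struct {
	data    string
	version int
}

// store is a hypothetical in-memory replacement for Redis.
type store map[string]*item

// set writes data when the key is new, the ETag equals the stored
// version, or the ETag is "0" (no check). It returns the new version.
func (s store) set(key, etag, data string) (int, error) {
	cur, exists := s[key]
	if exists && etag != "0" && etag != strconv.Itoa(cur.version) {
		return 0, fmt.Errorf("failed to set key %s", key)
	}
	if !exists {
		cur = &item{}
		s[key] = cur
	}
	cur.data = data
	cur.version++ // corresponds to HINCRBY version 1
	return cur.version, nil
}

func main() {
	s := store{}
	fmt.Println(s.set("k", "0", "a")) // first write
	fmt.Println(s.set("k", "1", "b")) // matching ETag
	fmt.Println(s.set("k", "1", "c")) // stale ETag is rejected
	fmt.Println(s.set("k", "0", "d")) // "0" skips the check
}
```

Resetting ver to 0 for last-write-wins, as setValue does, therefore always takes the unconditional path.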
delete state
// Delete performs a delete operation
func (r *StateStore) Delete(req *state.DeleteRequest) error {
err := state.CheckRequestOptions(req.Options)
if err != nil {
return err
}
return state.DeleteWithOptions(r.deleteValue, req)
}
// BulkDelete calls Delete in a loop internally.
// The BulkDelete method is not exposed through the dapr runtime.
// BulkDelete performs a bulk delete operation
func (r *StateStore) BulkDelete(req []state.DeleteRequest) error {
for i := range req {
err := r.Delete(&req[i])
if err != nil {
return err
}
}
return nil
}
The actual work is done in the r.deleteValue method:
func (r *StateStore) deleteValue(req *state.DeleteRequest) error {
if req.ETag == "" {
// An empty ETag is replaced with "0", the zero value
req.ETag = "0"
}
_, err := r.client.DoContext(context.Background(), "EVAL", delQuery, 1, req.Key, req.ETag).Result()
if err != nil {
return fmt.Errorf("failed to delete key '%s' due to ETag mismatch", req.Key)
}
return nil
}
More Redis details:
- The delQuery script:
delQuery = "local var1 = redis.pcall(\"HGET\", KEYS[1], \"version\"); if not var1 or type(var1)==\"table\" or var1 == ARGV[1] or var1 == \"\" or ARGV[1] == \"0\" then return redis.call(\"DEL\", KEYS[1]) else return error(\"failed to delete \" .. KEYS[1]) end"
State Transaction
The Redis state store implements TransactionalStore. Its Multi method:
// Multi performs a transactional operation. succeeds only if all operations succeed, and fails if one or more operations fail
func (r *StateStore) Multi(request *state.TransactionalStateRequest) error {
// Uses the TxPipeline wrapper provided by the go-redis client
pipe := r.client.TxPipeline()
for _, o := range request.Operations {
if o.Operation == state.Upsert {
req := o.Request.(state.SetRequest)
bt, _ := utils.Marshal(req.Value, r.json.Marshal)
pipe.Set(req.Key, bt, defaultExpirationTime)
} else if o.Operation == state.Delete {
req := o.Request.(state.DeleteRequest)
pipe.Del(req.Key)
}
}
_, err := pipe.Exec()
return err
}
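10 - Source code of resource bindings
10.1 - Overview of the resource binding source code
10.2 - Source code analysis of resource binding initialization
Binding Registry
Preparing the Binding Registry
The Binding Registry is initialized while the runtime initializes: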
func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration) *DaprRuntime {
......
bindingsRegistry: bindings_loader.NewRegistry(),
}
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
a.bindingsRegistry.RegisterInputBindings(opts.inputBindings...)
a.bindingsRegistry.RegisterOutputBindings(opts.outputBindings...)
......
}
These opts come from the configuration supplied when the runtime starts, for example in cmd/daprd/main.go:
func main() {
rt, err := runtime.FromFlags()
if err != nil {
log.Fatal(err)
}
err = rt.Run(
......
runtime.WithInputBindings(
bindings_loader.NewInput("aws.sqs", func() bindings.InputBinding {
return sqs.NewAWSSQS(logContrib)
}),
bindings_loader.NewInput("aws.kinesis", func() bindings.InputBinding {
return kinesis.NewAWSKinesis(logContrib)
}),
bindings_loader.NewInput("azure.eventhubs", func() bindings.InputBinding {
return eventhubs.NewAzureEventHubs(logContrib)
}),
bindings_loader.NewInput("kafka", func() bindings.InputBinding {
return kafka.NewKafka(logContrib)
}),
bindings_loader.NewInput("mqtt", func() bindings.InputBinding {
return mqtt.NewMQTT(logContrib)
}),
bindings_loader.NewInput("rabbitmq", func() bindings.InputBinding {
return bindings_rabbitmq.NewRabbitMQ(logContrib)
}),
bindings_loader.NewInput("azure.servicebusqueues", func() bindings.InputBinding {
return servicebusqueues.NewAzureServiceBusQueues(logContrib)
}),
bindings_loader.NewInput("azure.storagequeues", func() bindings.InputBinding {
return storagequeues.NewAzureStorageQueues(logContrib)
}),
bindings_loader.NewInput("gcp.pubsub", func() bindings.InputBinding {
return pubsub.NewGCPPubSub(logContrib)
}),
bindings_loader.NewInput("kubernetes", func() bindings.InputBinding {
return kubernetes.NewKubernetes(logContrib)
}),
bindings_loader.NewInput("azure.eventgrid", func() bindings.InputBinding {
return eventgrid.NewAzureEventGrid(logContrib)
}),
bindings_loader.NewInput("twitter", func() bindings.InputBinding {
return twitter.NewTwitter(logContrib)
}),
bindings_loader.NewInput("cron", func() bindings.InputBinding {
return cron.NewCron(logContrib)
}),
),
runtime.WithOutputBindings(
bindings_loader.NewOutput("aws.sqs", func() bindings.OutputBinding {
return sqs.NewAWSSQS(logContrib)
}),
bindings_loader.NewOutput("aws.sns", func() bindings.OutputBinding {
return sns.NewAWSSNS(logContrib)
}),
bindings_loader.NewOutput("aws.kinesis", func() bindings.OutputBinding {
return kinesis.NewAWSKinesis(logContrib)
}),
bindings_loader.NewOutput("azure.eventhubs", func() bindings.OutputBinding {
return eventhubs.NewAzureEventHubs(logContrib)
}),
bindings_loader.NewOutput("aws.dynamodb", func() bindings.OutputBinding {
return dynamodb.NewDynamoDB(logContrib)
}),
bindings_loader.NewOutput("azure.cosmosdb", func() bindings.OutputBinding {
return bindings_cosmosdb.NewCosmosDB(logContrib)
}),
bindings_loader.NewOutput("gcp.bucket", func() bindings.OutputBinding {
return bucket.NewGCPStorage(logContrib)
}),
bindings_loader.NewOutput("http", func() bindings.OutputBinding {
return http.NewHTTP(logContrib)
}),
bindings_loader.NewOutput("kafka", func() bindings.OutputBinding {
return kafka.NewKafka(logContrib)
}),
bindings_loader.NewOutput("mqtt", func() bindings.OutputBinding {
return mqtt.NewMQTT(logContrib)
}),
bindings_loader.NewOutput("rabbitmq", func() bindings.OutputBinding {
return bindings_rabbitmq.NewRabbitMQ(logContrib)
}),
bindings_loader.NewOutput("redis", func() bindings.OutputBinding {
return redis.NewRedis(logContrib)
}),
bindings_loader.NewOutput("aws.s3", func() bindings.OutputBinding {
return s3.NewAWSS3(logContrib)
}),
bindings_loader.NewOutput("azure.blobstorage", func() bindings.OutputBinding {
return blobstorage.NewAzureBlobStorage(logContrib)
}),
bindings_loader.NewOutput("azure.servicebusqueues", func() bindings.OutputBinding {
return servicebusqueues.NewAzureServiceBusQueues(logContrib)
}),
bindings_loader.NewOutput("azure.storagequeues", func() bindings.OutputBinding {
return storagequeues.NewAzureStorageQueues(logContrib)
}),
bindings_loader.NewOutput("gcp.pubsub", func() bindings.OutputBinding {
return pubsub.NewGCPPubSub(logContrib)
}),
bindings_loader.NewOutput("azure.signalr", func() bindings.OutputBinding {
return signalr.NewSignalR(logContrib)
}),
bindings_loader.NewOutput("twilio.sms", func() bindings.OutputBinding {
return sms.NewSMS(logContrib)
}),
bindings_loader.NewOutput("twilio.sendgrid", func() bindings.OutputBinding {
return sendgrid.NewSendGrid(logContrib)
}),
bindings_loader.NewOutput("azure.eventgrid", func() bindings.OutputBinding {
return eventgrid.NewAzureEventGrid(logContrib)
}),
bindings_loader.NewOutput("cron", func() bindings.OutputBinding {
return cron.NewCron(logContrib)
}),
bindings_loader.NewOutput("twitter", func() bindings.OutputBinding {
return twitter.NewTwitter(logContrib)
}),
bindings_loader.NewOutput("influx", func() bindings.OutputBinding {
return influx.NewInflux(logContrib)
}),
),
......
}
This is where the various input binding and output binding implementations are configured.
How the Binding Registry is implemented
pkg/components/bindings/registry.go defines several data structures:
type (
// InputBinding is an input binding component definition.
InputBinding struct {
Name string
FactoryMethod func() bindings.InputBinding
}
// OutputBinding is an output binding component definition.
OutputBinding struct {
Name string
FactoryMethod func() bindings.OutputBinding
}
// Registry is the interface of a components that allows callers to get registered instances of input and output bindings
Registry interface {
RegisterInputBindings(components ...InputBinding)
RegisterOutputBindings(components ...OutputBinding)
CreateInputBinding(name string) (bindings.InputBinding, error)
CreateOutputBinding(name string) (bindings.OutputBinding, error)
}
bindingsRegistry struct {
inputBindings map[string]func() bindings.InputBinding
outputBindings map[string]func() bindings.OutputBinding
}
)
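During the runtime initialization shown earlier, each implementation uses the NewInput and NewOutput functions to associate a name with the factory method of the corresponding InputBinding/OutputBinding: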
// NewInput creates a InputBinding.
func NewInput(name string, factoryMethod func() bindings.InputBinding) InputBinding {
return InputBinding{
Name: name,
FactoryMethod: factoryMethod,
}
}
// NewOutput creates a OutputBinding.
func NewOutput(name string, factoryMethod func() bindings.OutputBinding) OutputBinding {
return OutputBinding{
Name: name,
FactoryMethod: factoryMethod,
}
}
The RegisterInputBindings and RegisterOutputBindings methods register the input binding and output binding implementations. They are called during runtime initialization:
// RegisterInputBindings registers one or more new input bindings.
func (b *bindingsRegistry) RegisterInputBindings(components ...InputBinding) {
for _, component := range components {
b.inputBindings[createFullName(component.Name)] = component.FactoryMethod
}
}
// RegisterOutputBindings registers one or more new output bindings.
func (b *bindingsRegistry) RegisterOutputBindings(components ...OutputBinding) {
for _, component := range components {
b.outputBindings[createFullName(component.Name)] = component.FactoryMethod
}
}
func createFullName(name string) string {
// createFullName adds the common prefix "bindings." to every name
return fmt.Sprintf("bindings.%s", name)
}
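The registry is essentially a map from a prefixed name to a factory function. The following sketch condenses registration and creation into one runnable example; the OutputBinding interface, echoBinding type and method names are simplified, hypothetical versions of the real ones.

```go
package main

import "fmt"

// OutputBinding is a simplified, hypothetical binding interface.
type OutputBinding interface {
	Invoke(data string) string
}

type echoBinding struct{}

func (echoBinding) Invoke(data string) string { return "echo:" + data }

// registry maps a full name ("bindings.<name>") to a factory function.
type registry struct {
	outputs map[string]func() OutputBinding
}

func newRegistry() *registry {
	return &registry{outputs: map[string]func() OutputBinding{}}
}

// register stores the factory under the prefixed name.
func (r *registry) register(name string, factory func() OutputBinding) {
	r.outputs["bindings."+name] = factory
}

// create looks up the factory by full name and builds a new instance.
func (r *registry) create(fullName string) (OutputBinding, error) {
	if factory, ok := r.outputs[fullName]; ok {
		return factory(), nil
	}
	return nil, fmt.Errorf("couldn't find output binding %s", fullName)
}

func main() {
	r := newRegistry()
	r.register("echo", func() OutputBinding { return echoBinding{} })

	b, err := r.create("bindings.echo")
	fmt.Println(b.Invoke("hi"), err)

	_, err = r.create("echo") // lookup without the prefix fails
	fmt.Println(err)
}
```

Because the prefix is added at registration time, a lookup has to use the full name, which is why a component's type is written in the form bindings.redis.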
Binding initialization flow
pkg/runtime/runtime.go:
Bindings are initialized while the runtime initializes:
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
......
go a.processComponents()
......
}
func (a *DaprRuntime) processComponents() {
for {
comp, more := <-a.pendingComponents
if !more {
a.pendingComponentsDone <- true
return
}
if err := a.processOneComponent(comp); err != nil {
log.Errorf("process component %s error, %s", comp.Name, err)
}
}
}
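processComponents uses the two-value channel receive to detect that the channel has been closed, then signals completion on a second channel. A self-contained sketch of the same pattern, with strings standing in for components and hypothetical function names:

```go
package main

import "fmt"

// process drains pending until it is closed, then signals done.
func process(pending <-chan string, done chan<- bool, handle func(string) error) {
	for {
		comp, more := <-pending
		if !more { // channel closed and drained
			done <- true
			return
		}
		if err := handle(comp); err != nil {
			fmt.Printf("process component %s error, %s\n", comp, err)
		}
	}
}

// run feeds names to the worker and returns them in processing order.
func run(names []string) []string {
	pending := make(chan string)
	done := make(chan bool)
	var seen []string

	go process(pending, done, func(name string) error {
		seen = append(seen, name)
		return nil
	})

	for _, name := range names {
		pending <- name
	}
	close(pending) // no more components
	<-done         // wait until the worker has finished

	return seen
}

func main() {
	fmt.Println(run([]string{"statestore", "pubsub", "binding"}))
}
```

An error from one component is only logged, so the loop continues with the next component.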
processOneComponent:
func (a *DaprRuntime) processOneComponent(comp components_v1alpha1.Component) error {
res := a.preprocessOneComponent(&comp)
compCategory := a.figureOutComponentCategory(comp)
......
return nil
}
doProcessOneComponent:
func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
switch category {
case bindingsComponent:
return a.initBinding(comp)
......
}
return nil
}
initBinding:
func (a *DaprRuntime) initBinding(c components_v1alpha1.Component) error {
if err := a.initOutputBinding(c); err != nil {
log.Errorf("failed to init output bindings: %s", err)
return err
}
if err := a.initInputBinding(c); err != nil {
log.Errorf("failed to init input bindings: %s", err)
return err
}
return nil
}
This is where the input binding and output binding are initialized.
Output Binding initialization
pkg/runtime/runtime.go:
func (a *DaprRuntime) initOutputBinding(c components_v1alpha1.Component) error {
// Succeeds
binding, err := a.bindingsRegistry.CreateOutputBinding(c.Spec.Type)
if err != nil {
log.Warnf("failed to create output binding %s (%s): %s", c.ObjectMeta.Name, c.Spec.Type, err)
diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "creation")
return err
}
if binding != nil {
err := binding.Init(bindings.Metadata{
Properties: a.convertMetadataItemsToProperties(c.Spec.Metadata),
Name: c.ObjectMeta.Name,
})
if err != nil {
log.Errorf("failed to init output binding %s (%s): %s", c.ObjectMeta.Name, c.Spec.Type, err)
diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "init")
return err
}
log.Infof("successful init for output binding %s (%s)", c.ObjectMeta.Name, c.Spec.Type)
a.outputBindings[c.ObjectMeta.Name] = binding
diag.DefaultMonitoring.ComponentInitialized(c.Spec.Type)
}
return nil
}
The CreateOutputBinding method is implemented in pkg/components/bindings/registry.go:
// Create instantiates an output binding based on `name`.
func (b *bindingsRegistry) CreateOutputBinding(name string) (bindings.OutputBinding, error) {
if method, ok := b.outputBindings[name]; ok {
// Call the factory method to create the concrete outputBinding implementation
return method(), nil
}
return nil, errors.Errorf("couldn't find output binding %s", name)
}
Input Binding initialization
TODO
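10.3 - Source code analysis of the Redis output binding implementation
Note: according to https://github.com/dapr/docs/blob/master/concepts/bindings/README.md, Redis implements only the output binding.
Implementation of the output binding
The Redis implementation lives in the dapr/components-contrib repository, in /bindings/redis/redis.go: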
func (r *Redis) Operations() []bindings.OperationKind {
// Only the create operation is supported
return []bindings.OperationKind{bindings.CreateOperation}
}
func (r *Redis) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
// The key is passed through the request metadata
if val, ok := req.Metadata["key"]; ok && val != "" {
key := val
// Use the standard Redis client to run the SET command
_, err := r.client.DoContext(context.Background(), "SET", key, req.Data).Result()
if err != nil {
return nil, err
}
return nil, nil
}
return nil, errors.New("redis binding: missing key on write request metadata")
}
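From the caller's side, the important detail is that the Redis key travels in the request metadata while the payload travels in the data field. A minimal sketch with an in-memory map in place of Redis; invokeRequest and invoke are hypothetical simplifications of bindings.InvokeRequest and the Invoke method.

```go
package main

import (
	"errors"
	"fmt"
)

// invokeRequest is a simplified, hypothetical form of bindings.InvokeRequest.
type invokeRequest struct {
	Data     []byte
	Metadata map[string]string
}

// invoke writes req.Data under the key found in the request metadata.
// The map stands in for the Redis SET command.
func invoke(store map[string][]byte, req *invokeRequest) error {
	key, ok := req.Metadata["key"]
	if !ok || key == "" {
		return errors.New("redis binding: missing key on write request metadata")
	}
	store[key] = req.Data
	return nil
}

func main() {
	store := map[string][]byte{}

	err := invoke(store, &invokeRequest{
		Data:     []byte("hello"),
		Metadata: map[string]string{"key": "greeting"},
	})
	fmt.Println(string(store["greeting"]), err)

	// A request without the key metadata is rejected.
	fmt.Println(invoke(store, &invokeRequest{Data: []byte("x")}))
}
```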
Full analysis
Initialization:
When the dapr runtime initializes, it registers the Redis output binding implementation:
bindings_loader.NewOutput("redis", func() bindings.OutputBinding {
return redis.NewRedis(logContrib)
}),
The dapr runtime then calls the Init method while initializing the output binding. The Redis implementation is:
// Init performs metadata parsing and connection creation
func (r *Redis) Init(meta bindings.Metadata) error {
// Parse the metadata
m, err := r.parseMetadata(meta)
if err != nil {
return err
}
// Redis connection options
opts := &redis.Options{
Addr: m.host,
Password: m.password,
DB: defaultDB,
MaxRetries: m.maxRetries,
MaxRetryBackoff: m.maxRetryBackoff,
}
/* #nosec */
if m.enableTLS {
opts.TLSConfig = &tls.Config{
InsecureSkipVerify: m.enableTLS,
}
}
// Establish the Redis connection
r.client = redis.NewClient(opts)
_, err = r.client.Ping().Result()
if err != nil {
return fmt.Errorf("redis binding: error connecting to redis at %s: %s", m.host, err)
}
return err
}
10.4 - Source code analysis of output binding handling
The InvokeBinding method in pkg/grpc/api.go:
func (a *api) InvokeBinding(ctx context.Context, in *runtimev1pb.InvokeBindingRequest) (*runtimev1pb.InvokeBindingResponse, error) {
req := &bindings.InvokeRequest{
Metadata: in.Metadata,
Operation: bindings.OperationKind(in.Operation),
}
if in.Data != nil {
req.Data = in.Data
}
r := &runtimev1pb.InvokeBindingResponse{}
// The key part of the implementation is here
resp, err := a.sendToOutputBindingFn(in.Name, req)
if err != nil {
err = fmt.Errorf("ERR_INVOKE_OUTPUT_BINDING: %s", err)
apiServerLogger.Debug(err)
return r, err
}
if resp != nil {
r.Data = resp.Data
r.Metadata = resp.Metadata
}
return r, nil
}
sendToOutputBindingFn is wired up here:
func (a *DaprRuntime) getGRPCAPI() grpc.API {
return grpc.NewAPI(a.runtimeConfig.ID, a.appChannel, a.stateStores, a.secretStores, a.getPublishAdapter(), a.directMessaging, a.actor, a.sendToOutputBinding, a.globalConfig.Spec.TracingSpec)
}
The sendToOutputBinding method is implemented in pkg/runtime/runtime.go:
func (a *DaprRuntime) sendToOutputBinding(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
if req.Operation == "" {
return nil, errors.New("operation field is missing from request")
}
// Look up the already registered binding by name
if binding, ok := a.outputBindings[name]; ok {
ops := binding.Operations()
for _, o := range ops {
// Look for the requested operation among those this binding supports
if o == req.Operation {
// Key code: control passes to the concrete implementation here
return binding.Invoke(req)
}
}
supported := make([]string, len(ops))
for _, o := range ops {
supported = append(supported, string(o))
}
return nil, errors.Errorf("binding %s does not support operation %s. supported operations:%s", name, req.Operation, strings.Join(supported, " "))
}
return nil, errors.Errorf("couldn't find output binding %s", name)
}
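One detail in the error path is worth a closer look: make([]string, len(ops)) creates a slice that already holds len(ops) empty strings, so the following append calls add the operation names after them. The sketch below isolates that behavior and contrasts it with a zero-length slice that has the same capacity; joinBuggy and joinFixed are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// joinBuggy follows the shape used in sendToOutputBinding:
// the slice starts with len(ops) empty strings.
func joinBuggy(ops []string) string {
	supported := make([]string, len(ops))
	for _, o := range ops {
		supported = append(supported, o)
	}
	return strings.Join(supported, " ")
}

// joinFixed starts with length 0 and capacity len(ops).
func joinFixed(ops []string) string {
	supported := make([]string, 0, len(ops))
	for _, o := range ops {
		supported = append(supported, o)
	}
	return strings.Join(supported, " ")
}

func main() {
	ops := []string{"create", "get"}
	fmt.Printf("%q\n", joinBuggy(ops)) // "  create get" (two leading spaces)
	fmt.Printf("%q\n", joinFixed(ops)) // "create get"
}
```

The effect is cosmetic here, since it only adds leading spaces to the error message, but the same pattern would corrupt results wherever slice contents matter.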
10.5 - Summary of binding metadata
A summary of how metadata is designed and used across the binding implementations:
Implementation | Configuration-level metadata | Request-level metadata |
---|---|---|
alicloud oss | key | |
HTTP | url / method | none |
cron | schedule | none |
MQTT | url / topic | none |
RabbitMQ | host / queueName / durable / deleteWhenUnused / prefetchCount | ttlInSeconds |
Redis | host / password / enableTLS / maxRetries / maxRetryBackoff | key |
Influx | url / token / org / bucket | none |
Kafka | brokers / topics / publishTopic / consumerGroup / authRequired / saslUsername / saslPassword | key |
Kubernetes | namespace / resyncPeriodInSec | none |
twilio-sendgrid | apiKey / emailFrom / emailTo / subject / emailCc / emailBcc | emailFrom / emailTo / subject / emailCc / emailBcc |
twilio-sms | toNumber / fromNumber / accountSid / authToken / timeout | toNumber |
 | consumerKey / consumerSecret / accessToken / accessSecret / query | query / lang / result / since_id |
gcp-bucket | bucket / type / project_id / private_key_id / private_key / client_email / client_id / auth_uri / token_uri / auth_provider_x509_cert_url / client_x509_cert_url | name |
gcp-pubsub | topic / subscription / type / project_id / private_key_id / private_key / client_email / client_id / auth_uri / token_uri / auth_provider_x509_cert_url / client_x509_cert_url | topic |
Azure-blobstorage | storageAccount / storageAccessKey / container | blobName / ContentType / ContentMD5 / ContentEncoding / ContentLanguage / ContentDisposition / CacheControl |
Azure-cosmosDB | url / masterKey / database / collection / partitionKey | none |
Azure-EventGrid | tenantId / subscriptionId / clientId / clientSecret / subscriberEndpoint / handshakePort / scope / eventSubscriptionName / accessKey / topicEndpoint | none |
Azure-EventHubs | connection / consumerGroup / storageAccountName / storageAccountKey / storageContainerName / partitionID / partitionKey | partitionKey |
Azure-ServiceBusQueues | connectionString / queueName / ttl | id / correlationID / ttlInSeconds |
Azure-SignalR | connectionString / hub | hub / group / user |
Azure-storagequeue | ttlInSeconds | |
Aws-dynamodb | region / endpoint / accessKey / secretKey / table | none |
Aws-kinesis | streamName / consumerName / region / endpoint / accessKey / secretKey / mode | partitionKey |
Aws-s3 | region / endpoint / accessKey / secretKey / bucket | key |
Aws-sns | topicArn / region / endpoint / accessKey / secretKey | none |
Aws-sqs | queueName / region / endpoint / accessKey / secretKey | none |
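11 - Source code analysis of the Injector
11.1 - Injector code implementation
The injection flow
The stateapp from the e2e tests serves as the example.
The application's original Deployment
tests/apps/stateapp/service.yaml contains the Service and Deployment definitions for stateapp.
The Service definition is unremarkable: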
kind: Service
apiVersion: v1
metadata:
name: stateapp
labels:
testapp: stateapp
spec:
selector:
testapp: stateapp
ports:
- protocol: TCP
port: 80
targetPort: 3000
type: LoadBalancer
The Deployment definition:
apiVersion: apps/v1
kind: Deployment
metadata:
name: stateapp
labels:
testapp: stateapp
spec:
replicas: 1
selector:
matchLabels:
testapp: stateapp
template: # pod definition for stateapp
metadata:
labels:
testapp: stateapp
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "stateapp"
dapr.io/app-port: "3000"
spec: # container definition for stateapp; for now the pod defines only this one container
containers:
- name: stateapp
image: docker.io/YOUR_DOCKER_ALIAS/e2e-stateapp:dev
ports:
- containerPort: 3000
imagePullPolicy: Always
Looking at just the annotations in the stateapp pod definition:
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "stateapp"
dapr.io/app-port: "3000"
Source code
getPodPatchOperations:
func (i *injector) getPodPatchOperations(ar *v1beta1.AdmissionReview,
namespace, image string, kubeClient *kubernetes.Clientset, daprClient scheme.Interface) ([]PatchOperation, error) {
req := ar.Request
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
errors.Wrap(err, "could not unmarshal raw object")
return nil, err
}
log.Infof(
"AdmissionReview for Kind=%v, Namespace=%v Name=%v (%v) UID=%v "+
"patchOperation=%v UserInfo=%v",
req.Kind,
req.Namespace,
req.Name,
pod.Name,
req.UID,
req.Operation,
req.UserInfo,
)
if !isResourceDaprEnabled(pod.Annotations) || podContainsSidecarContainer(&pod) {
return nil, nil
}
...
An example of this info log entry:
{"instance":"dapr-sidecar-injector-5f6f4bb6df-n5dsk","level":"info","msg":"AdmissionReview for Kind=/v1, Kind=Pod, Namespace=dapr-tests Name= () UID=d0126a13-9efd-432e-894a-5ddbee55898c patchOperation=CREATE UserInfo={system:serviceaccount:kube-system:replicaset-controller 3e5de149-07a3-434e-a8de-209abee69760 [system:serviceaccounts system:serviceaccounts:kube-system system:authenticated] map[]}","scope":"dapr.injector","time":"2020-09-25T07:07:07.6482457Z","type":"log","ver":"edge"}
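The log shows that the injector starts working when a pod CREATE operation occurs in the dapr-tests namespace.
isResourceDaprEnabled(pod.Annotations) checks whether dapr is enabled for the pod. It looks for an annotation named dapr.io/enabled that is set to true; the default is false: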
const (
daprEnabledKey = "dapr.io/enabled"
)
func isResourceDaprEnabled(annotations map[string]string) bool {
return getBoolAnnotationOrDefault(annotations, daprEnabledKey, false)
}
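The helper getBoolAnnotationOrDefault is not shown in this section. The sketch below is one plausible shape for such a helper, written under the assumption that a missing or unparsable annotation yields the default; boolAnnotationOrDefault is a hypothetical name, and the real helper may accept a different set of truthy strings.

```go
package main

import (
	"fmt"
	"strconv"
)

const daprEnabledKey = "dapr.io/enabled"

// boolAnnotationOrDefault is a hypothetical helper: it returns the
// parsed annotation value, or def when the key is absent or invalid.
func boolAnnotationOrDefault(annotations map[string]string, key string, def bool) bool {
	raw, ok := annotations[key]
	if !ok {
		return def
	}
	value, err := strconv.ParseBool(raw)
	if err != nil {
		return def
	}
	return value
}

func main() {
	enabled := map[string]string{daprEnabledKey: "true"}
	fmt.Println(boolAnnotationOrDefault(enabled, daprEnabledKey, false))

	// A pod without the annotation is treated as not dapr-enabled.
	fmt.Println(boolAnnotationOrDefault(map[string]string{}, daprEnabledKey, false))
}
```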
podContainsSidecarContainer checks whether the pod already contains the dapr sidecar, by looking for a container named daprd:
const (
sidecarContainerName = "daprd"
)
func podContainsSidecarContainer(pod *corev1.Pod) bool {
for _, c := range pod.Spec.Containers {
if c.Name == sidecarContainerName {
return true
}
}
return false
}
Continuing with getPodPatchOperations():
id := getAppID(pod)
// Keep DNS resolution outside of getSidecarContainer for unit testing.
placementAddress := fmt.Sprintf("%s:80", getKubernetesDNS(placementService, namespace))
sentryAddress := fmt.Sprintf("%s:80", getKubernetesDNS(sentryService, namespace))
apiSrvAddress := fmt.Sprintf("%s:80", getKubernetesDNS(apiAddress, namespace))
getAppID(pod) reads an annotation to obtain the application ID. Note that "dapr.io/id" is deprecated and scheduled for removal in v1.0; it is replaced by "dapr.io/app-id":
const (
appIDKey = "dapr.io/app-id"
// Deprecated, remove in v1.0
idKey = "dapr.io/id"
)
func getAppID(pod corev1.Pod) string {
id := getStringAnnotationOrDefault(pod.Annotations, appIDKey, "")
if id != "" {
return id
}
return getStringAnnotationOrDefault(pod.Annotations, idKey, pod.GetName())
}
Determining mtlsEnabled
var trustAnchors string
var certChain string
var certKey string
var identity string
mtlsEnabled := mTLSEnabled(daprClient)
if mtlsEnabled {
trustAnchors, certChain, certKey = getTrustAnchorsAndCertChain(kubeClient, namespace)
identity = fmt.Sprintf("%s:%s", req.Namespace, pod.Spec.ServiceAccountName)
}
Surprisingly, mTLSEnabled makes this decision by listing the dapr Configuration resources across all namespaces:
const (
// NamespaceAll is the default argument to specify on a context when you want to list or filter resources across all namespaces
NamespaceAll string = ""
)
func mTLSEnabled(daprClient scheme.Interface) bool {
resp, err := daprClient.ConfigurationV1alpha1().Configurations(meta_v1.NamespaceAll).List(meta_v1.ListOptions{})
if err != nil {
return defaultMtlsEnabled
}
for _, c := range resp.Items {
if c.GetName() == defaultConfig { // "daprsystem"
return c.Spec.MTLSSpec.Enabled
}
}
return defaultMtlsEnabled
}
It decides whether to enable mTLS by reading a Kubernetes resource. tests/config/dapr_mtls_off_config.yaml contains an example:
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: daprsystem # the name must be daprsystem
spec:
mtls:
enabled: "false" # this is where mTLS is turned on or off
workloadCertTTL: "1h"
allowedClockSkew: "20m"
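But this example is a trap: the quoted "false" is a string rather than a boolean, so listing the Configuration fails with: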
E0925 09:37:53.480772 1 reflector.go:153] sigs.k8s.io/controller-runtime/pkg/cache/internal/informers_map.go:224: Failed to list *v1alpha1.Configuration: v1alpha1.ConfigurationList.Items: []v1alpha1.Configuration: v1alpha1.Configuration.Spec: v1alpha1.ConfigurationSpec.MTLSSpec: v1alpha1.MTLSSpec.Enabled: ReadBool: expect t or f, but found ", error found in #10 byte of ...|enabled":"false","wo|..., bigger context ...|pec":{"mtls":{"allowedClockSkew":"20m","enabled":"false","workloadCertTTL":"1h"}}},{"apiVersion":"da|...
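The ReadBool failure in this log comes from decoding a JSON string into a Go bool field. The sketch below reproduces the same class of error with the standard encoding/json package; the mtlsSpec struct is a hypothetical reduction of the real MTLSSpec type, and the actual log comes from a different JSON library, so the message text differs.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// mtlsSpec is a hypothetical, reduced version of the mTLS configuration.
type mtlsSpec struct {
	Enabled          bool   `json:"enabled"`
	WorkloadCertTTL  string `json:"workloadCertTTL"`
	AllowedClockSkew string `json:"allowedClockSkew"`
}

// decode parses the JSON form of the mtls section.
func decode(raw string) (mtlsSpec, error) {
	var spec mtlsSpec
	err := json.Unmarshal([]byte(raw), &spec)
	return spec, err
}

func main() {
	// enabled: "false" in YAML becomes the JSON string "false".
	_, err := decode(`{"enabled":"false","workloadCertTTL":"1h"}`)
	fmt.Println("quoted:", err)

	// enabled: false in YAML becomes the JSON boolean false.
	spec, err := decode(`{"enabled":false,"workloadCertTTL":"1h"}`)
	fmt.Println("unquoted:", spec.Enabled, err)
}
```

Writing enabled: false without quotes in the YAML avoids the problem.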
The resulting application pod definition
apiVersion: v1
kind: Pod
metadata:
annotations:
dapr.io/app-id: stateapp
dapr.io/app-port: "3000"
dapr.io/enabled: "true"
dapr.io/sidecar-cpu-limit: "4.0"
dapr.io/sidecar-cpu-request: "0.5"
dapr.io/sidecar-memory-limit: 512Mi
dapr.io/sidecar-memory-request: 250Mi
creationTimestamp: "2020-09-25T07:07:07Z"
generateName: stateapp-567b6b9c6f-
labels:
pod-template-hash: 567b6b9c6f
testapp: stateapp
name: stateapp-567b6b9c6f-84kzb
namespace: dapr-tests
ownerReferences:
- apiVersion: apps/v1
blockOwnerDeletion: true
controller: true
kind: ReplicaSet
name: stateapp-567b6b9c6f
uid: 25a34367-79ed-4e19-868a-5b063a45b1f4
resourceVersion: "146616"
selfLink: /api/v1/namespaces/dapr-tests/pods/stateapp-567b6b9c6f-84kzb
uid: 0f4060df-0312-4d73-91c1-6f085462b33d
spec:
affinity:
nodeAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
nodeSelectorTerms:
- matchExpressions:
- key: kubernetes.io/os
operator: In
values:
- linux
- key: kubernetes.io/arch
operator: In
values:
- amd64
containers:
- env:
- name: DAPR_HTTP_PORT
value: "3500"
- name: DAPR_GRPC_PORT
value: "50001"
image: docker.io/skyao/e2e-stateapp:dev-linux-amd64
imagePullPolicy: Always
name: stateapp
ports:
- containerPort: 3000
name: http
protocol: TCP
resources: {}
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
- mountPath: /var/run/secrets/kubernetes.io/serviceaccount
name: default-token-qncjc
readOnly: true
- args:
- --mode
- kubernetes
- --dapr-http-port
- "3500"
- --dapr-grpc-port
- "50001"
- --dapr-internal-grpc-port
- "50002"
- --app-port
- "3000"
- --app-id
- stateapp
- --control-plane-address
- dapr-api.dapr-system.svc.cluster.local:80
- --app-protocol
- http
- --placement-host-address
- dapr-placement.dapr-system.svc.cluster.local:80
- --config
- ""
- --log-level
- info
- --app-max-concurrency
- "-1"
- --sentry-address
- dapr-sentry.dapr-system.svc.cluster.local:80
- --metrics-port
- "9090"
- --enable-mtls
command:
- /daprd
env:
- name: DAPR_HOST_IP
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: status.podIP
- name: NAMESPACE
value: dapr-tests
- name: DAPR_TRUST_ANCHORS
value: |
-----BEGIN CERTIFICATE-----
MIIB3TCCAYKgAwIBAgIRAMra+wjgMY6ABDtu3/vJ0NcwCgYIKoZIzj0EAwIwMTEX
MBUGA1UEChMOZGFwci5pby9zZW50cnkxFjAUBgNVBAMTDWNsdXN0ZXIubG9jYWww
HhcNMjAwOTI1MDU1ODAzWhcNMjEwOTI1MDU1ODAzWjAxMRcwFQYDVQQKEw5kYXBy
LmlvL3NlbnRyeTEWMBQGA1UEAxMNY2x1c3Rlci5sb2NhbDBZMBMGByqGSM49AgEG
CCqGSM49AwEHA0IABE/w/8YBtRJPYNJkcDM05e9PhrbGjBU/RQd09J909OJebDe8
rthysygWrcGYHYKziKK2Pyc1j4ua2xklLC5DFEWjezB5MA4GA1UdDwEB/wQEAwIC
BDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDwYDVR0TAQH/BAUwAwEB
/zAdBgNVHQ4EFgQUQ2v6OiayM9V4DPAU6UZHGe/nc1swGAYDVR0RBBEwD4INY2x1
c3Rlci5sb2NhbDAKBggqhkjOPQQDAgNJADBGAiEAtVBx9vDXiRE3fXJTU2yK11W5
eo+Ce4+U6/vXDtzw4PUCIQDlLOB45ihOAhhLVLG9akhgwJOrgZLEW3FZjRabpSsb
og==
-----END CERTIFICATE-----
- name: DAPR_CERT_CHAIN
value: |
-----BEGIN CERTIFICATE-----
MIIBxDCCAWqgAwIBAgIQQ1sfEH4aYacFZwBau+aOozAKBggqhkjOPQQDAjAxMRcw
FQYDVQQKEw5kYXByLmlvL3NlbnRyeTEWMBQGA1UEAxMNY2x1c3Rlci5sb2NhbDAe
Fw0yMDA5MjUwNTU4MDNaFw0yMTA5MjUwNTU4MDNaMBgxFjAUBgNVBAMTDWNsdXN0
ZXIubG9jYWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARhj7MQ1uiOkZvJ0AYV
uiFca/Iu9D5O98E5JN1mjCohRawk+QT1PjW05YtmyVji4Tt6ckIMvOXwG3aoTsGO
UbRio30wezAOBgNVHQ8BAf8EBAMCAQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4E
FgQUTPUh0WWBB5baKs3aJjMzInVLX/EwHwYDVR0jBBgwFoAUQ2v6OiayM9V4DPAU
6UZHGe/nc1swGAYDVR0RBBEwD4INY2x1c3Rlci5sb2NhbDAKBggqhkjOPQQDAgNI
ADBFAiBO0oCadeYyLM+RkSAYPSTtjMyEZ0wv1/BsWuUMg+KZ6AIhALHnT0pxiqlj
miYT4WZWvaBc17AbUh1efgV2DAaNKm54
-----END CERTIFICATE-----
- name: DAPR_CERT_KEY
value: |
-----BEGIN EC PRIVATE KEY-----
MHcCAQEEIDj6niLJ5ep+fDdY71bKyWl9RZHrXyRjND6pWySL2Q4UoAoGCCqGSM49
AwEHoUQDQgAEYY+zENbojpGbydAGFbohXGvyLvQ+TvfBOSTdZowqIUWsJPkE9T41
tOWLZslY4uE7enJCDLzl8Bt2qE7BjlG0Yg==
-----END EC PRIVATE KEY-----
- name: SENTRY_LOCAL_IDENTITY
value: default:dapr-tests
image: docker.io/skyao/daprd:dev-linux-amd64
imagePullPolicy: Always
livenessProbe:
failureThreshold: 3
httpGet:
path: /v1.0/healthz
port: 3500
scheme: HTTP
initialDelaySeconds: 3
periodSeconds: 6
successThreshold: 1
timeoutSeconds: 3
name: daprd
ports:
- containerPort: 3500
name: dapr-http
protocol: TCP
- containerPort: 50001
name: dapr-grpc
protocol: TCP
- containerPort: 50002
name: dapr-internal
protocol: TCP
- containerPort: 9090
name: dapr-metrics
protocol: TCP
readinessProbe:
failureThreshold: 3
httpGet:
path: /v1.0/healthz
port: 3500
scheme: HTTP
initialDelaySeconds: 3
periodSeconds: 6
successThreshold: 1
timeoutSeconds: 3
resources:
limits:
cpu: "4"
memory: 512Mi
requests:
cpu: 500m
memory: 250Mi
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
- mountPath: /var/run/secrets/kubernetes.io/serviceaccount
name: default-token-qncjc
readOnly: true
dnsPolicy: ClusterFirst
enableServiceLinks: true
nodeName: docker-desktop
priority: 0
restartPolicy: Always
schedulerName: default-scheduler
securityContext: {}
serviceAccount: default
serviceAccountName: default
terminationGracePeriodSeconds: 30
tolerations:
- effect: NoExecute
key: node.kubernetes.io/not-ready
operator: Exists
tolerationSeconds: 300
- effect: NoExecute
key: node.kubernetes.io/unreachable
operator: Exists
tolerationSeconds: 300
volumes:
- name: default-token-qncjc
secret:
defaultMode: 420
secretName: default-token-qncjc
status:
conditions:
- lastProbeTime: null
lastTransitionTime: "2020-09-25T07:07:07Z"
status: "True"
type: Initialized
- lastProbeTime: null
lastTransitionTime: "2020-09-25T07:07:07Z"
message: 'containers with unready status: [daprd]'
reason: ContainersNotReady
status: "False"
type: Ready
- lastProbeTime: null
lastTransitionTime: "2020-09-25T07:07:07Z"
message: 'containers with unready status: [daprd]'
reason: ContainersNotReady
status: "False"
type: ContainersReady
- lastProbeTime: null
lastTransitionTime: "2020-09-25T07:07:07Z"
status: "True"
type: PodScheduled
containerStatuses:
- containerID: docker://26a1d85ac6e2accd833832681b8dc2aa809e3c0fcfa293398bd5e7c2e8bf3e2b
image: skyao/daprd:dev-linux-amd64
imageID: docker-pullable://skyao/daprd@sha256:387f3bf4e7397c43dca9ac2d248a9ce790b1c1888aa0d6de3b07107ce124752f
lastState:
terminated:
containerID: docker://26a1d85ac6e2accd833832681b8dc2aa809e3c0fcfa293398bd5e7c2e8bf3e2b
exitCode: 1
finishedAt: "2020-09-25T08:03:14Z"
reason: Error
startedAt: "2020-09-25T08:03:04Z"
name: daprd
ready: false
restartCount: 21
started: false
state:
waiting:
message: back-off 5m0s restarting failed container=daprd pod=stateapp-567b6b9c6f-84kzb_dapr-tests(0f4060df-0312-4d73-91c1-6f085462b33d)
reason: CrashLoopBackOff
- containerID: docker://737745ace04213c9519ad1f91e248015c89a80e2b3d61081c3c530d1c89bdbae
image: skyao/e2e-stateapp:dev-linux-amd64
imageID: docker-pullable://skyao/e2e-stateapp@sha256:16351b331f1338a61348c9a87fce43728369f1bf18ee69d9d45fb13db0283644
lastState: {}
name: stateapp
ready: true
restartCount: 0
started: true
state:
running:
startedAt: "2020-09-25T07:07:24Z"
hostIP: 192.168.65.3
phase: Running
podIP: 10.1.0.194
podIPs:
- ip: 10.1.0.194
qosClass: Burstable
startTime: "2020-09-25T07:07:07Z"
Other
Pod definition of the injector itself
dapr-sidecar-injector
apiVersion: v1
kind: Pod
metadata:
annotations:
prometheus.io/path: /
prometheus.io/port: "9090"
prometheus.io/scrape: "true"
creationTimestamp: "2020-09-25T05:57:37Z"
generateName: dapr-sidecar-injector-5f6f4bb6df-
labels:
app: dapr-sidecar-injector
app.kubernetes.io/component: sidecar-injector
app.kubernetes.io/managed-by: helm
app.kubernetes.io/name: dapr
app.kubernetes.io/part-of: dapr
app.kubernetes.io/version: dev-linux-amd64
pod-template-hash: 5f6f4bb6df
name: dapr-sidecar-injector-5f6f4bb6df-n5dsk
namespace: dapr-system
ownerReferences:
- apiVersion: apps/v1
blockOwnerDeletion: true
controller: true
kind: ReplicaSet
name: dapr-sidecar-injector-5f6f4bb6df
uid: ff47b1df-6da7-4a19-b99d-15622ca3a485
resourceVersion: "133143"
selfLink: /api/v1/namespaces/dapr-system/pods/dapr-sidecar-injector-5f6f4bb6df-n5dsk
uid: 40df3834-4df2-495a-aa26-5b2a22de7639
spec:
affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- preference:
matchExpressions:
- key: kubernetes.io/os
operator: In
values:
- linux
weight: 1
containers:
- args:
- --log-level
- info
- --log-as-json
- --metrics-port
- "9090"
command:
- /injector
env:
- name: TLS_CERT_FILE
value: /dapr/cert/tls.crt
- name: TLS_KEY_FILE
value: /dapr/cert/tls.key
- name: SIDECAR_IMAGE
value: docker.io/skyao/daprd:dev-linux-amd64
- name: NAMESPACE
valueFrom:
fieldRef:
apiVersion: v1
fieldPath: metadata.namespace
image: docker.io/skyao/dapr:dev-linux-amd64
imagePullPolicy: Always
livenessProbe:
failureThreshold: 5
httpGet:
path: /healthz
port: 8080
scheme: HTTP
initialDelaySeconds: 3
periodSeconds: 3
successThreshold: 1
timeoutSeconds: 1
name: dapr-sidecar-injector
ports:
- containerPort: 4000
name: https
protocol: TCP
- containerPort: 9090
name: metrics
protocol: TCP
readinessProbe:
failureThreshold: 5
httpGet:
path: /healthz
port: 8080
scheme: HTTP
initialDelaySeconds: 3
periodSeconds: 3
successThreshold: 1
timeoutSeconds: 1
resources: {}
securityContext:
runAsUser: 1000
terminationMessagePath: /dev/termination-log
terminationMessagePolicy: File
volumeMounts:
- mountPath: /dapr/cert
name: cert
readOnly: true
- mountPath: /var/run/secrets/kubernetes.io/serviceaccount
name: dapr-operator-token-lgpvc
readOnly: true
dnsPolicy: ClusterFirst
enableServiceLinks: true
nodeName: docker-desktop
priority: 0
restartPolicy: Always
schedulerName: default-scheduler
securityContext: {}
serviceAccount: dapr-operator
serviceAccountName: dapr-operator
terminationGracePeriodSeconds: 30
tolerations:
- effect: NoExecute
key: node.kubernetes.io/not-ready
operator: Exists
tolerationSeconds: 300
- effect: NoExecute
key: node.kubernetes.io/unreachable
operator: Exists
tolerationSeconds: 300
volumes:
- name: cert
secret:
defaultMode: 420
secretName: dapr-sidecar-injector-cert
- name: dapr-operator-token-lgpvc
secret:
defaultMode: 420
secretName: dapr-operator-token-lgpvc
status:
conditions:
- lastProbeTime: null
lastTransitionTime: "2020-09-25T05:57:37Z"
status: "True"
type: Initialized
- lastProbeTime: null
lastTransitionTime: "2020-09-25T05:58:10Z"
status: "True"
type: Ready
- lastProbeTime: null
lastTransitionTime: "2020-09-25T05:58:10Z"
status: "True"
type: ContainersReady
- lastProbeTime: null
lastTransitionTime: "2020-09-25T05:57:37Z"
status: "True"
type: PodScheduled
containerStatuses:
- containerID: docker://a820646b468a07eabdd89ca133f062a93e85256afc6c19c1bdf13b56980ec5e9
image: skyao/dapr:dev-linux-amd64
imageID: docker-pullable://skyao/dapr@sha256:77003eee9fd02d9fc24c2e9f385a6c86223bc35915cede98a8897c0dfc51ee61
lastState: {}
name: dapr-sidecar-injector
ready: true
restartCount: 0
started: true
state:
running:
startedAt: "2020-09-25T05:58:06Z"
hostIP: 192.168.65.3
phase: Running
podIP: 10.1.0.188
podIPs:
- ip: 10.1.0.188
qosClass: BestEffort
startTime: "2020-09-25T05:57:37Z"
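11.2 - Source code study of main.go
An analysis of the main.go file in the Dapr injector.
The init() function
init() performs the initialization: it registers and parses the command-line flags (for the logger and the metrics exporter), then initializes the logger and the metrics.
Setting and reading flags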
func init() {
loggerOptions := logger.DefaultOptions()
// This registers the `log-level` and `log-as-json` flags
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)
metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)
// This registers the `metrics-port` and `enable-metrics` flags
metricsExporter.Options().AttachCmdFlags(flag.StringVar, flag.BoolVar)
flag.Parse()
For reference, the Command section from the injector pod description:
Command:
/injector
Args:
--log-level
info
--log-as-json
--enable-metrics
--metrics-port
9090
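The flag registration above passes `flag.StringVar` and `flag.BoolVar` as function values, so the options struct decides which flags exist while the caller decides which flag set they land on. Below is a minimal sketch of that pattern using only the standard library. The `loggerOptions` type, its `attachCmdFlags` method and the `parseArgs` helper are hypothetical stand-ins, not Dapr's actual logger package.

```go
package main

import (
	"flag"
	"fmt"
)

// loggerOptions is a hypothetical stand-in for dapr's logger options.
type loggerOptions struct {
	OutputLevel string
	JSONFormat  bool
}

// attachCmdFlags registers the options on whatever flag set the caller
// supplies, by way of the StringVar/BoolVar function values.
func (o *loggerOptions) attachCmdFlags(
	stringVar func(p *string, name string, value string, usage string),
	boolVar func(p *bool, name string, value bool, usage string),
) {
	stringVar(&o.OutputLevel, "log-level", "info", "log output level")
	boolVar(&o.JSONFormat, "log-as-json", false, "emit logs as JSON")
}

// parseArgs uses a private FlagSet so the sketch does not touch the
// process-wide flag.CommandLine.
func parseArgs(args []string) (loggerOptions, error) {
	opts := loggerOptions{}
	fs := flag.NewFlagSet("injector", flag.ContinueOnError)
	opts.attachCmdFlags(fs.StringVar, fs.BoolVar)
	err := fs.Parse(args)
	return opts, err
}

func main() {
	opts, err := parseArgs([]string{"--log-level", "debug", "--log-as-json"})
	if err != nil {
		panic(err)
	}
	fmt.Printf("level=%s json=%t\n", opts.OutputLevel, opts.JSONFormat)
}
```

Passing the registration functions in, rather than calling the `flag` package directly, keeps the options type testable against a throwaway `FlagSet`.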
Initializing the logger
// Apply options to all loggers
if err := logger.ApplyOptionsToLoggers(&loggerOptions); err != nil {
log.Fatal(err)
} else {
log.Infof("log level set to: %s", loggerOptions.OutputLevel)
}
Initializing metrics
// Initialize dapr metrics exporter
if err := metricsExporter.Init(); err != nil {
log.Fatal(err)
}
// Initialize injector service metrics
if err := monitoring.InitMetrics(); err != nil {
log.Fatal(err)
}
The main() function
Getting the configuration
The configuration is read from environment variables:
func main() {
logger.DaprVersion = version.Version()
log.Infof("starting Dapr Sidecar Injector -- version %s -- commit %s", version.Version(), version.Commit())
ctx := signals.Context()
cfg, err := injector.GetConfigFromEnvironment()
if err != nil {
log.Fatalf("error getting config: %s", err)
}
......
}
Getting the daprClient
kubeClient := utils.GetKubeClient()
conf := utils.GetConfig()
daprClient, _ := scheme.NewForConfig(conf)
Starting the healthz server
go func() {
healthzServer := health.NewServer(log)
healthzServer.Ready()
healthzErr := healthzServer.Run(ctx, healthzPort)
if healthzErr != nil {
log.Fatalf("failed to start healthz server: %s", healthzErr)
}
}()
service account
uids, err := injector.AllowedControllersServiceAccountUID(ctx, kubeClient)
if err != nil {
log.Fatalf("failed to get authentication uids from services accounts: %s", err)
}
Creating the injector
injector.NewInjector(uids, cfg, daprClient, kubeClient).Run(ctx)
graceful shutdown
Graceful shutdown is implemented as a plain 5-second wait:
shutdownDuration := 5 * time.Second
log.Infof("allowing %s for graceful shutdown to complete", shutdownDuration)
<-time.After(shutdownDuration)
11.3 - Source code study of config.go
An analysis of the config.go file in the Dapr injector package.
Implementation
The Config struct definition
The configuration options for the injector:
// Config represents configuration options for the Dapr Sidecar Injector webhook server
type Config struct {
TLSCertFile string `envconfig:"TLS_CERT_FILE" required:"true"`
TLSKeyFile string `envconfig:"TLS_KEY_FILE" required:"true"`
SidecarImage string `envconfig:"SIDECAR_IMAGE" required:"true"`
SidecarImagePullPolicy string `envconfig:"SIDECAR_IMAGE_PULL_POLICY"`
Namespace string `envconfig:"NAMESPACE" required:"true"`
}
The NewConfigWithDefaults() function
It sets a default for only one field, SidecarImagePullPolicy:
func NewConfigWithDefaults() Config {
return Config{
SidecarImagePullPolicy: "Always",
}
}
This function is only called by GetConfigFromEnvironment() below.
The GetConfigFromEnvironment() function
It reads the configuration from the environment:
func GetConfigFromEnvironment() (Config, error) {
c := NewConfigWithDefaults()
err := envconfig.Process("", &c)
return c, err
}
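envconfig.Process() uses reflection to inspect the Config struct, then reads each field from the environment variable named in its `envconfig` tag.
This function is called in exactly one place, at the start of the injector's main function: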
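To make the reflection step concrete, here is a minimal sketch of tag-driven environment loading written against the standard library only. It is not the envconfig library's implementation: `processEnv` is a hypothetical helper that handles string fields only, and the `config` struct is a trimmed copy of the one above.

```go
package main

import (
	"fmt"
	"os"
	"reflect"
)

// config is a trimmed copy of the injector Config, for illustration only.
type config struct {
	TLSCertFile            string `envconfig:"TLS_CERT_FILE" required:"true"`
	SidecarImagePullPolicy string `envconfig:"SIDECAR_IMAGE_PULL_POLICY"`
}

// processEnv fills the string fields of the struct pointed to by target
// from the variables named in their `envconfig` tags. Fields whose
// variable is unset keep their current (default) value.
func processEnv(target interface{}, lookup func(string) (string, bool)) error {
	v := reflect.ValueOf(target)
	if v.Kind() != reflect.Ptr || v.Elem().Kind() != reflect.Struct {
		return fmt.Errorf("target must be a pointer to a struct")
	}
	v = v.Elem()
	t := v.Type()
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		name := field.Tag.Get("envconfig")
		if name == "" || field.Type.Kind() != reflect.String {
			continue
		}
		value, ok := lookup(name)
		if !ok {
			if field.Tag.Get("required") == "true" {
				return fmt.Errorf("required key %s missing value", name)
			}
			continue
		}
		v.Field(i).SetString(value)
	}
	return nil
}

func main() {
	os.Setenv("TLS_CERT_FILE", "/dapr/cert/tls.crt")
	c := config{SidecarImagePullPolicy: "Always"} // default, as in NewConfigWithDefaults()
	if err := processEnv(&c, os.LookupEnv); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c)
}
```

Because defaults are written into the struct before processing, an unset optional variable leaves the default in place, while a set one overrides it.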
func main() {
log.Infof("starting Dapr Sidecar Injector -- version %s -- commit %s", version.Version(), version.Commit())
ctx := signals.Context()
cfg, err := injector.GetConfigFromEnvironment()
if err != nil {
log.Fatalf("error getting config: %s", err)
}
......
}
Running a command such as k describe pod dapr-sidecar-injector-6f656b7dd-sg87p -n dapr-system
prints the description of the injector pod, which contains this Environment section:
Environment:
TLS_CERT_FILE: /dapr/cert/tls.crt
TLS_KEY_FILE: /dapr/cert/tls.key
SIDECAR_IMAGE: docker.io/skyao/daprd:dev-linux-amd64
SIDECAR_IMAGE_PULL_POLICY: IfNotPresent
NAMESPACE: dapr-system (v1:metadata.namespace)
Injector pod description, for reference
The full describe output for the injector pod is kept here for reference:
Name: dapr-sidecar-injector-6f656b7dd-sg87p
Namespace: dapr-system
Priority: 0
Node: docker-desktop/192.168.65.3
Start Time: Mon, 19 Apr 2021 15:04:07 +0800
Labels: app=dapr-sidecar-injector
app.kubernetes.io/component=sidecar-injector
app.kubernetes.io/managed-by=helm
app.kubernetes.io/name=dapr
app.kubernetes.io/part-of=dapr
app.kubernetes.io/version=dev-linux-amd64
pod-template-hash=6f656b7dd
Annotations: prometheus.io/path: /
prometheus.io/port: 9090
prometheus.io/scrape: true
Status: Running
IP: 10.1.2.162
IPs:
IP: 10.1.2.162
Controlled By: ReplicaSet/dapr-sidecar-injector-6f656b7dd
Containers:
dapr-sidecar-injector:
Container ID: docker://544dabf00bdaba9cf8f320218dd0b7e6d2ebce7fbf5184ce162d58bc693162d9
Image: docker.io/skyao/dapr:dev-linux-amd64
Image ID: docker-pullable://skyao/dapr@sha256:b4843ee78eabf014e15749bc4daa5c249ce3d33f796a89aaba9d117dd3dc76c9
Ports: 4000/TCP, 9090/TCP
Host Ports: 0/TCP, 0/TCP
Command:
/injector
Args:
--log-level
info
--log-as-json
--enable-metrics
--metrics-port
9090
State: Running
Started: Mon, 19 Apr 2021 15:04:08 +0800
Ready: True
Restart Count: 0
Liveness: http-get http://:8080/healthz delay=3s timeout=1s period=3s #success=1 #failure=5
Readiness: http-get http://:8080/healthz delay=3s timeout=1s period=3s #success=1 #failure=5
Environment:
TLS_CERT_FILE: /dapr/cert/tls.crt
TLS_KEY_FILE: /dapr/cert/tls.key
SIDECAR_IMAGE: docker.io/skyao/daprd:dev-linux-amd64
SIDECAR_IMAGE_PULL_POLICY: IfNotPresent
NAMESPACE: dapr-system (v1:metadata.namespace)
Mounts:
/dapr/cert from cert (ro)
/var/run/secrets/kubernetes.io/serviceaccount from dapr-operator-token-cjpnd (ro)
Conditions:
Type Status
Initialized True
Ready True
ContainersReady True
PodScheduled True
Volumes:
cert:
Type: Secret (a volume populated by a Secret)
SecretName: dapr-sidecar-injector-cert
Optional: false
dapr-operator-token-cjpnd:
Type: Secret (a volume populated by a Secret)
SecretName: dapr-operator-token-cjpnd
Optional: false
QoS Class: BestEffort
Node-Selectors: <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 17m default-scheduler Successfully assigned dapr-system/dapr-sidecar-injector-6f656b7dd-sg87p to docker-desktop
Normal Pulled 17m kubelet Container image "docker.io/skyao/dapr:dev-linux-amd64" already present on machine
Normal Created 17m kubelet Created container dapr-sidecar-injector
Normal Started 17m kubelet Started container dapr-sidecar-injector
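11.4 - Source code study of injector.go
Main flow
Interface and struct definitions, and construction
Injector is the interface for the Dapr runtime sidecar injection component.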
// Injector is the interface for the Dapr runtime sidecar injection component
type Injector interface {
Run(ctx context.Context)
}
The injector struct definition:
type injector struct {
config Config
deserializer runtime.Decoder
server *http.Server
kubeClient *kubernetes.Clientset
daprClient scheme.Interface
authUIDs []string
}
Creating a new injector struct (this function is called from the injector's main function):
// NewInjector returns a new Injector instance with the given config
func NewInjector(authUIDs []string, config Config, daprClient scheme.Interface, kubeClient *kubernetes.Clientset) Injector {
mux := http.NewServeMux()
i := &injector{
config: config,
deserializer: serializer.NewCodecFactory(
runtime.NewScheme(),
).UniversalDeserializer(),
// Configure the HTTP server (it is actually started later, in Run)
server: &http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: mux,
},
kubeClient: kubeClient,
daprClient: daprClient,
authUIDs: authUIDs,
}
// The mutate endpoint that k8s calls
mux.HandleFunc("/mutate", i.handleRequest)
return i
}
The Run() method
Run is the core method:
func (i *injector) Run(ctx context.Context) {
doneCh := make(chan struct{})
// Start a goroutine that waits for a signal on either ctx or doneCh
go func() {
select {
case <-ctx.Done():
log.Info("Sidecar injector is shutting down")
shutdownCtx, cancel := context.WithTimeout(
context.Background(),
time.Second*5,
)
defer cancel()
i.server.Shutdown(shutdownCtx) // nolint: errcheck
case <-doneCh:
}
}()
// Log a startup message; this line can be seen in the injector's log output
log.Infof("Sidecar injector is listening on %s, patching Dapr-enabled pods", i.server.Addr)
// TODO: this sometimes fails because of a certificate problem, which stops the injector from working; to be investigated later
err := i.server.ListenAndServeTLS(i.config.TLSCertFile, i.config.TLSKeyFile)
if err != http.ErrServerClosed {
log.Errorf("Sidecar injector error: %s", err)
}
close(doneCh)
}
Compare this with the injector log output shown by the command
k logs dapr-sidecar-injector-86b8dc4dcd-bkbgw -n dapr-system:
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"log level set to: info","scope":"dapr.injector","time":"2021-05-11T01:13:20.1904136Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"metrics server started on :9090/","scope":"dapr.metrics","time":"2021-05-11T01:13:20.1907347Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"starting Dapr Sidecar Injector -- version edge -- commit v1.0.0-rc.4-163-g9a4210a-dirty","scope":"dapr.injector","time":"2021-05-11T01:13:20.191669Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"Healthz server is listening on :8080","scope":"dapr.injector","time":"2021-05-11T01:13:20.1928941Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"Sidecar injector is listening on :4000, patching Dapr-enabled pods","scope":"dapr.injector","time":"2021-05-11T01:13:20.208587Z","type":"log","ver":"unknown"}
The handleRequest method
handleRequest handles the mutate calls that come from the k8s API server:
mux.HandleFunc("/mutate", i.handleRequest)
func (i *injector) handleRequest(w http.ResponseWriter, r *http.Request) {
......
}
The code is fairly long, so some details are omitted here.
Read the request body and validate its length and Content-Type:
defer r.Body.Close()
var body []byte
if r.Body != nil {
if data, err := ioutil.ReadAll(r.Body); err == nil {
body = data
}
}
if len(body) == 0 {
log.Error("empty body")
http.Error(w, "empty body", http.StatusBadRequest)
return
}
contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
log.Errorf("Content-Type=%s, expect application/json", contentType)
http.Error(
w,
"invalid Content-Type, expect `application/json`",
http.StatusUnsupportedMediaType,
)
return
}
Deserialize the body and perform some basic validation:
ar := v1.AdmissionReview{}
_, gvk, err := i.deserializer.Decode(body, nil, &ar)
if err != nil {
log.Errorf("Can't decode body: %v", err)
} else {
if !utils.StringSliceContains(ar.Request.UserInfo.UID, i.authUIDs) {
err = errors.Wrapf(err, "unauthorized request")
log.Error(err)
} else if ar.Request.Kind.Kind != "Pod" {
err = errors.Wrapf(err, "invalid kind for review: %s", ar.Kind)
log.Error(err)
} else {
patchOps, err = i.getPodPatchOperations(&ar, i.config.Namespace, i.config.SidecarImage, i.config.SidecarImagePullPolicy, i.kubeClient, i.daprClient)
}
}
getPodPatchOperations is the core of the logic and is examined in detail later.
Handle, in one place, any error produced so far together with the result of getPodPatchOperations():
diagAppID := getAppIDFromRequest(ar.Request)
if err != nil {
admissionResponse = toAdmissionResponse(err)
log.Errorf("Sidecar injector failed to inject for app '%s'. Error: %s", diagAppID, err)
monitoring.RecordFailedSidecarInjectionCount(diagAppID, "patch")
} else if len(patchOps) == 0 {
// len(patchOps) == 0 means nothing was changed, so return Allowed: true
admissionResponse = &v1.AdmissionResponse{
Allowed: true,
}
} else {
var patchBytes []byte
// Serialize patchOps to JSON
patchBytes, err = json.Marshal(patchOps)
if err != nil {
admissionResponse = toAdmissionResponse(err)
} else {
// Return the AdmissionResponse
admissionResponse = &v1.AdmissionResponse{
Allowed: true,
Patch: patchBytes,
PatchType: func() *v1.PatchType {
pt := v1.PatchTypeJSONPatch
return &pt
}(),
}
}
}
Assemble the AdmissionReview:
admissionReview := v1.AdmissionReview{}
if admissionResponse != nil {
admissionReview.Response = admissionResponse
if ar.Request != nil {
admissionReview.Response.UID = ar.Request.UID
admissionReview.SetGroupVersionKind(*gvk)
}
}
Serialize the response and return it:
log.Infof("ready to write response ...")
respBytes, err := json.Marshal(admissionReview)
if err != nil {
http.Error(
w,
err.Error(),
http.StatusInternalServerError,
)
log.Errorf("Sidecar injector failed to inject for app '%s'. Can't deserialize response: %s", diagAppID, err)
monitoring.RecordFailedSidecarInjectionCount(diagAppID, "response")
}
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(respBytes); err != nil {
log.Error(err)
} else {
log.Infof("Sidecar injector succeeded injection for app '%s'", diagAppID)
monitoring.RecordSuccessfulSidecarInjectionCount(diagAppID)
}
Helper code
The toAdmissionResponse function
toAdmissionResponse creates a k8s AdmissionResponse from an error:
// toAdmissionResponse is a helper function to create an AdmissionResponse
// with an embedded error
func toAdmissionResponse(err error) *v1.AdmissionResponse {
return &v1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}
Getting the AppID
getAppIDFromRequest() extracts the AppID from an AdmissionRequest:
func getAppIDFromRequest(req *v1.AdmissionRequest) string {
// default App ID
appID := ""
// if req is not given
if req == nil {
return appID
}
var pod corev1.Pod
// Unmarshal the raw JSON of the pod into a Pod object
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
log.Warnf("could not unmarshal raw object: %v", err)
} else {
// Then read the appID from the pod information
appID = getAppID(pod)
}
return appID
}
getAppID() is implemented as follows: it first reads the "dapr.io/app-id" annotation, and if that is missing it falls back to the pod's name as the default AppID:
const appIDKey = "dapr.io/app-id"
func getAppID(pod corev1.Pod) string {
return getStringAnnotationOrDefault(pod.Annotations, appIDKey, pod.GetName())
}
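The body of getStringAnnotationOrDefault() is not shown in this section. A minimal sketch of the behaviour described above might look like the following. The helper names are hypothetical, and treating an empty annotation value the same as a missing one is an assumption of this sketch.

```go
package main

import "fmt"

const appIDKey = "dapr.io/app-id"

// stringAnnotationOrDefault returns the annotation value for key, or def
// when the annotation is missing. Assumption: an empty value also falls
// back to def.
func stringAnnotationOrDefault(annotations map[string]string, key, def string) string {
	if v, ok := annotations[key]; ok && v != "" {
		return v
	}
	return def
}

// appID resolves the app ID from the pod's annotations, falling back to
// the pod name.
func appID(annotations map[string]string, podName string) string {
	return stringAnnotationOrDefault(annotations, appIDKey, podName)
}

func main() {
	fmt.Println(appID(map[string]string{appIDKey: "stateapp"}, "stateapp-567b6b9c6f-84kzb"))
	fmt.Println(appID(nil, "stateapp-567b6b9c6f-84kzb"))
}
```

Reading from a nil map is safe in Go, so pods without any annotations need no special handling.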
Supporting code
ServiceAccount-related code
AllowedControllersServiceAccountUID() returns an array of UIDs: the service accounts that are allowed on the webhook handler:
var allowedControllersServiceAccounts = []string{
"replicaset-controller",
"deployment-controller",
"cronjob-controller",
"job-controller",
"statefulset-controller",
}
// AllowedControllersServiceAccountUID returns an array of UID, list of allowed service account on the webhook handler
func AllowedControllersServiceAccountUID(ctx context.Context, kubeClient *kubernetes.Clientset) ([]string, error) {
allowedUids := []string{}
for i, allowedControllersServiceAccount := range allowedControllersServiceAccounts {
saUUID, err := getServiceAccount(ctx, kubeClient, allowedControllersServiceAccount)
// i == 0 => "replicaset-controller" is the only one mandatory
if err != nil && i == 0 {
return nil, err
} else if err != nil {
log.Warnf("Unable to get SA %s UID (%s)", allowedControllersServiceAccount, err)
continue
}
allowedUids = append(allowedUids, saUUID)
}
return allowedUids, nil
}
func getServiceAccount(ctx context.Context, kubeClient *kubernetes.Clientset, allowedControllersServiceAccount string) (string, error) {
ctxWithTimeout, cancel := context.WithTimeout(ctx, getKubernetesServiceAccountTimeoutSeconds*time.Second)
defer cancel()
sa, err := kubeClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Get(ctxWithTimeout, allowedControllersServiceAccount, metav1.GetOptions{})
if err != nil {
return "", err
}
return string(sa.ObjectMeta.UID), nil
}
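11.5 - Source code study of patch_operation.go
The code is very simple: it defines a single struct, PatchOperation, which represents one discrete change to be applied to a Kubernetes resource.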
// PatchOperation represents a discreet change to be applied to a Kubernetes resource
type PatchOperation struct {
Op string `json:"op"`
Path string `json:"path"`
Value interface{} `json:"value,omitempty"`
}
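With these JSON tags, a slice of PatchOperation serializes to the JSON Patch format (RFC 6902), which is the patch type that handleRequest declares in its AdmissionResponse. The short sketch below shows the resulting wire format. The two operations are made-up examples and `encodePatch` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PatchOperation mirrors the struct shown above.
type PatchOperation struct {
	Op    string      `json:"op"`
	Path  string      `json:"path"`
	Value interface{} `json:"value,omitempty"`
}

// encodePatch serializes the operations as a JSON Patch document.
func encodePatch(ops []PatchOperation) (string, error) {
	b, err := json.Marshal(ops)
	return string(b), err
}

func main() {
	ops := []PatchOperation{
		{Op: "add", Path: "/spec/containers/-", Value: map[string]string{"name": "daprd"}},
		{Op: "remove", Path: "/metadata/labels/stale"}, // no value: the field is omitted
	}
	out, err := encodePatch(ops)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

The `omitempty` option matters for operations such as `remove`, which take no value.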
11.6 - Source code study of pod_patch.go
Main flow
getPodPatchOperations() is the most important function; this is where the injector computes its changes to the pod:
func (i *injector) getPodPatchOperations(ar *v1.AdmissionReview,
namespace, image, imagePullPolicy string, kubeClient *kubernetes.Clientset, daprClient scheme.Interface) ([]PatchOperation, error) {
......
return patchOps, nil
}
Parse the request to get the pod object (the same unmarshalling is also done in getAppIDFromRequest()):
req := ar.Request
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
errors.Wrap(err, "could not unmarshal raw object")
return nil, err
}
Decide whether the injector needs to process this pod:
if !isResourceDaprEnabled(pod.Annotations) || podContainsSidecarContainer(&pod) {
return nil, nil
}
// Check whether dapr is enabled, i.e. whether the annotation "dapr.io/enabled" is set to true (the default is false)
const daprEnabledKey = "dapr.io/enabled"
func isResourceDaprEnabled(annotations map[string]string) bool {
return getBoolAnnotationOrDefault(annotations, daprEnabledKey, false)
}
// Check whether the pod already contains the dapr sidecar container
const sidecarContainerName = "daprd"
func podContainsSidecarContainer(pod *corev1.Pod) bool {
for _, c := range pod.Spec.Containers {
// Loop over all containers in the pod and look for one named "daprd"
if c.Name == sidecarContainerName {
return true
}
}
return false
}
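Taken together, the two checks say: inject only when the pod opts in and does not already carry a sidecar. Below is a self-contained sketch of that decision. The real getBoolAnnotationOrDefault() is not shown in this section, so parsing with `strconv.ParseBool` is an assumption of this sketch and the real helper may accept other spellings. The `container` type and `needsInjection` are hypothetical.

```go
package main

import (
	"fmt"
	"strconv"
)

// container is a minimal stand-in for corev1.Container.
type container struct{ Name string }

// boolAnnotationOrDefault parses the annotation as a boolean and falls
// back to def when it is missing or unparseable (assumed behaviour).
func boolAnnotationOrDefault(annotations map[string]string, key string, def bool) bool {
	raw, ok := annotations[key]
	if !ok {
		return def
	}
	parsed, err := strconv.ParseBool(raw)
	if err != nil {
		return def
	}
	return parsed
}

// needsInjection reports whether a sidecar should be added: the pod must
// be Dapr-enabled and must not already contain a "daprd" container.
func needsInjection(annotations map[string]string, containers []container) bool {
	if !boolAnnotationOrDefault(annotations, "dapr.io/enabled", false) {
		return false
	}
	for _, c := range containers {
		if c.Name == "daprd" {
			return false
		}
	}
	return true
}

func main() {
	enabled := map[string]string{"dapr.io/enabled": "true"}
	fmt.Println(needsInjection(enabled, []container{{Name: "stateapp"}}))
	fmt.Println(needsInjection(enabled, []container{{Name: "stateapp"}, {Name: "daprd"}}))
	fmt.Println(needsInjection(nil, []container{{Name: "stateapp"}}))
}
```

The second check makes the webhook idempotent: a pod that already has the sidecar is passed through unchanged.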
Create the daprd sidecar container:
sidecarContainer, err := getSidecarContainer(pod.Annotations, id, image, imagePullPolicy, req.Namespace, apiSrvAddress, placementAddress, tokenMount, trustAnchors, certChain, certKey, sentryAddress, mtlsEnabled, identity)
The details of getSidecarContainer() are covered later; first, the rest of the main flow.
patchOps := []PatchOperation{}
envPatchOps := []PatchOperation{}
var path string
var value interface{}
if len(pod.Spec.Containers) == 0 {
// If the pod has no containers (under what circumstances would a pod have no containers?)
path = containersPath
value = []corev1.Container{*sidecarContainer}
} else {
// Append the daprd sidecar, and add the Dapr env vars to the existing containers
envPatchOps = addDaprEnvVarsToContainers(pod.Spec.Containers)
// TODO: what rules or requirements apply to the path value?
path = "/spec/containers/-"
value = sidecarContainer
}
patchOps = append(
patchOps,
PatchOperation{
Op: "add",
Path: path,
Value: value,
},
)
patchOps = append(patchOps, envPatchOps...)
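On the path value: in JSON Patch (RFC 6902) the path is a JSON Pointer (RFC 6901). For an `add` on an array, a numeric last token inserts before that index, and the special token `-` refers to the position after the last element, so `/spec/containers/-` appends. The sketch below models just this rule on a slice of strings. `applyAdd` is a hypothetical helper, not a full JSON Patch implementation, and it is looser than the RFC in that `strconv.Atoi` accepts leading zeros.

```go
package main

import (
	"fmt"
	"strconv"
)

// applyAdd models the JSON Patch "add" rule for arrays. token is the last
// segment of the path: "-" appends, a number inserts before that index.
func applyAdd(arr []string, token, value string) ([]string, error) {
	idx := len(arr)
	if token != "-" {
		n, err := strconv.Atoi(token)
		if err != nil || n < 0 || n > len(arr) {
			return nil, fmt.Errorf("invalid array index %q", token)
		}
		idx = n
	}
	// Build a fresh slice so the input is left untouched.
	out := make([]string, 0, len(arr)+1)
	out = append(out, arr[:idx]...)
	out = append(out, value)
	out = append(out, arr[idx:]...)
	return out, nil
}

func main() {
	containers := []string{"stateapp"}

	appended, _ := applyAdd(containers, "-", "daprd")
	fmt.Println(appended) // sidecar goes last

	inserted, _ := applyAdd(containers, "0", "daprd")
	fmt.Println(inserted) // sidecar goes first

	_, err := applyAdd(containers, "5", "daprd")
	fmt.Println(err) // index past the end is rejected
}
```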
addDaprEnvVarsToContainers
// This function add Dapr environment variables to all the containers in any Dapr enabled pod.
// The containers can be injected or user defined.
func addDaprEnvVarsToContainers(containers []corev1.Container) []PatchOperation {
portEnv := []corev1.EnvVar{
{
Name: userContainerDaprHTTPPortName,
Value: strconv.Itoa(sidecarHTTPPort),
},
{
Name: userContainerDaprGRPCPortName,
Value: strconv.Itoa(sidecarAPIGRPCPort),
},
}
envPatchOps := make([]PatchOperation, 0, len(containers))
for i, container := range containers {
path := fmt.Sprintf("%s/%d/env", containersPath, i)
patchOps := getEnvPatchOperations(container.Env, portEnv, path)
envPatchOps = append(envPatchOps, patchOps...)
}
return envPatchOps
}
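getEnvPatchOperations() is not shown in this section. One plausible shape for it, offered as an assumption rather than the actual implementation, follows from a JSON Patch constraint: `add` fails when the parent of the target location does not exist, so a container without an `env` array needs the whole array added at `.../env`, while a container that already has one can receive individual entries at `.../env/-`. The sketch also assumes that variables the user already defined are left alone. All type and function names here are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type envVar struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

type patchOperation struct {
	Op    string      `json:"op"`
	Path  string      `json:"path"`
	Value interface{} `json:"value,omitempty"`
}

// Port values as they appear in the stateapp pod definition.
var daprPortEnv = []envVar{
	{Name: "DAPR_HTTP_PORT", Value: "3500"},
	{Name: "DAPR_GRPC_PORT", Value: "50001"},
}

// envPatchOps builds the operations that add the wanted variables to one
// container whose env array lives at path (assumed behaviour).
func envPatchOps(existing, wanted []envVar, path string) []patchOperation {
	if len(existing) == 0 {
		// No env array yet: create it in a single operation.
		return []patchOperation{{Op: "add", Path: path, Value: wanted}}
	}
	ops := []patchOperation{}
	for _, w := range wanted {
		found := false
		for _, e := range existing {
			if e.Name == w.Name {
				found = true
				break
			}
		}
		if !found {
			// Append to the existing array; user-defined values win.
			ops = append(ops, patchOperation{Op: "add", Path: path + "/-", Value: w})
		}
	}
	return ops
}

func main() {
	userEnv := []envVar{{Name: "DAPR_HTTP_PORT", Value: "9999"}}
	ops := append(
		envPatchOps(nil, daprPortEnv, "/spec/containers/0/env"),
		envPatchOps(userEnv, daprPortEnv, "/spec/containers/1/env")...,
	)
	out, err := json.MarshalIndent(ops, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out))
}
```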
Side flow: mTLS handling
mtlsEnabled := mTLSEnabled(daprClient)
if mtlsEnabled {
trustAnchors, certChain, certKey = getTrustAnchorsAndCertChain(kubeClient, namespace)
identity = fmt.Sprintf("%s:%s", req.Namespace, pod.Spec.ServiceAccountName)
}
func mTLSEnabled(daprClient scheme.Interface) bool {
resp, err := daprClient.ConfigurationV1alpha1().Configurations(meta_v1.NamespaceAll).List(meta_v1.ListOptions{})
if err != nil {
log.Errorf("Failed to load dapr configuration from k8s, use default value %t for mTLSEnabled: %s", defaultMtlsEnabled, err)
return defaultMtlsEnabled
}
for _, c := range resp.Items {
if c.GetName() == defaultConfig {
return c.Spec.MTLSSpec.Enabled
}
}
log.Infof("Dapr system configuration (%s) is not found, use default value %t for mTLSEnabled", defaultConfig, defaultMtlsEnabled)
return defaultMtlsEnabled
}