这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

dapr仓库的源码学习

Dapr源码学习之dapr仓库

dapr仓库中的代码:

https://github.com/dapr/dapr

1 - 工具类代码的源码学习

Dapr 工具类代码的源码学习

工具类代码指完全作为工具使用的代码,这些代码往往是在代码调用链的最底层,自身没有任何特定逻辑,只专注于完成某个特定的功能,作为上层代码的工具使用。

工具类代码处于代码依赖关系的最底层。

1.1 - concurrency的源码学习

Dapr concurrency package的源码学习

concurrency packge的代码不多,暂时只有一个 limiter.go。

1.1.1 - limiter.go的源码学习

rating limiter的代码实现和使用场景

Dapr concurrency package中的 limiter.go 文件的源码学习,rating limiter的代码实现和使用场景。

重点:充分利用 golang chan 的特性

代码实现

Limiter 结构体定义

// Limiter object
type Limiter struct {
   limit         int
   tickets       chan int
   numInProgress int32
}

字段说明:

  • limit:最大并发数的限制,这是一个配置项,默认100,初始化后不再修改。
  • tickets:用 go 的 chan 来保存和分发 tickets
  • numInProgress:当前正在执行中的数量,这是一个实时状态

构建Limiter

const (
   // DefaultLimit is the default concurrency limit
   DefaultLimit = 100
)

// NewLimiter allocates a new ConcurrencyLimiter
func NewLimiter(limit int) *Limiter {
   if limit <= 0 {
      limit = DefaultLimit
   }

   // allocate a limiter instance
   c := &Limiter{
      limit:   limit,
      // tickets chan 的 size 设置为 limit
      tickets: make(chan int, limit),
   }

   // allocate the tickets:
   // 开始时先准备和limit数量相当的可用 tickets
   for i := 0; i < c.limit; i++ {
      c.tickets <- i
   }

   return c
}

Limiter的实现

// Execute adds a function to the execution queue.
// if num of go routines allocated by this instance is < limit
// launch a new go routine to execute job
// else wait until a go routine becomes available
func (c *Limiter) Execute(job func(param interface{}), param interface{}) int {
   // 从 chan 中拿一个有效票据
   // 如果当前 chan 中有票据,则说明 go routines 的数量还没有达到 limit 的最大限制,还可以继续启动go routine执行job
   // 如果当前 chan 中没有票据,则说明 go routines 的数量已经达到 limit 的最大限制,需要限速了。execute方法会阻塞在这里,等待有job执行完成释放票据
   ticket := <-c.tickets
   // 拿到之后更新numInProgress,数量加一,要求是原子操作
   atomic.AddInt32(&c.numInProgress, 1)
   // 启动 go routine 执行 job
   go func(param interface{}) {
      // 通过defer来做 job 完成后的清理
      defer func() {
         // 将票据释放给 chan,这样后续的 job 有机会申请到
         c.tickets <- ticket
         // 更新numInProgress,数量减一,要求是原子操作
         atomic.AddInt32(&c.numInProgress, -1)
      }()

      // 执行job
      job(param)
   }(param)
   
   // 返回当前的票据号
   return ticket
}

wait方法

wait方法会阻塞并等待所有的已经通过 execute() 方法拿到票据的 go routine 执行完毕。

// Wait will block all the previously Executed jobs completed running.
//
// IMPORTANT: calling the Wait function while keep calling Execute leads to
//            un-desired race conditions
func (c *Limiter) Wait() {
   // 这是从 chan 中读取所有的票据,只要有任何票据被 job 释放都会去争抢
   // 最后wait()方法获取到所有的票据,其他 job 自然就无法获取票据从而阻塞住所有job的工作
   // 但这并不能保证一定能第一时间抢的到,如果还有其他的 job 也在调用 execute() 方法申请票据,那只有等这个 job 完成工作释放票据时再次争抢
   for i := 0; i < c.limit; i++ {
      <-c.tickets
   }
}

使用场景

并行执行批量操作时限速

pkg/grpc/api.gopkg/http/api.go 的 GetBulkState()方法中,通过 limiter 来限制批量操作的并发数量:

// 构建limiter,limit参数由 请求参数中的 Parallelism 制定
limiter := concurrency.NewLimiter(int(in.Parallelism))
n := len(reqs)
for i := 0; i < n; i++ {
   fn := func(param interface{}) {
		......
   }
    // 提交 job 给 limiter
   limiter.Execute(fn, &reqs[i])
}

// 等待所有的 job 执行完成
limiter.Wait()

在 actor 中也有类似的代码:

limiter := concurrency.NewLimiter(actorMetadata.RemindersMetadata.PartitionCount)
for i := range getRequests {
    fn := func(param interface{}) {
    	......
    }
    limiter.Execute(fn, &bulkResponse[i])
}
limiter.Wait()

2 - 类库类代码的源码学习

Dapr 类库类代码的源码学习

类库类代码指为了更方便的使用第三方类库而封装的辅助类代码,这些代码也通常是在代码调用链的底层,专注于完成某方面特定的功能,可能会带有一点点 dapr 的逻辑。

工具类代码处于代码依赖关系的倒数第二层底层,仅仅比工具类代码高一层。

2.1 - grcp的源码学习

Dapr grpc package的源码学习

2.1.1 - util.go的源码学习

目前只有用于转换state参数类型的两个方法

Dapr grpc package中的 util.go文件的源码分析,目前只有用于转换state参数类型的两个方法。

stateConsistencyToString 方法

stateConsistencyToString 方法将 StateOptions_StateConsistency 转为 string:

func stateConsistencyToString(c commonv1pb.StateOptions_StateConsistency) string {
	switch c {
	case commonv1pb.StateOptions_CONSISTENCY_EVENTUAL:
		return "eventual"
	case commonv1pb.StateOptions_CONSISTENCY_STRONG:
		return "strong"
	}

	return ""
}

stateConcurrencyToString 方法

方法 方法将 StateOptions_StateConsistency 转为 string:

func stateConcurrencyToString(c commonv1pb.StateOptions_StateConcurrency) string {
	switch c {
	case commonv1pb.StateOptions_CONCURRENCY_FIRST_WRITE:
		return "first-write"
	case commonv1pb.StateOptions_CONCURRENCY_LAST_WRITE:
		return "last-write"
	}

	return ""
}

2.1.2 - port.go的源码学习

只有一个 GetFreePort 方法用于获取一个空闲的端口。

Dapr grpc package中的 port.go文件的源码分析,只有一个 GetFreePort 方法用于获取一个空闲的端口。

GetFreePort 方法

GetFreePort 方法从操作系统获取一个空闲可用的端口:

// GetFreePort returns a free port from the OS
func GetFreePort() (int, error) {
	addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
	if err != nil {
		return 0, err
	}

	l, err := net.ListenTCP("tcp", addr)
	if err != nil {
		return 0, err
	}
	defer l.Close()
	return l.Addr().(*net.TCPAddr).Port, nil
}

通过将端口设置为0, 来让操作系统自动分配一个可用的端口。注意返回时一定要关闭这个连接。

2.1.3 - dial.go的源码学习

目前只有用于建连获取地址前缀的一个方法

Dapr grpc package中的 dial.go文件的源码分析,目前只有用于建连获取地址前缀的一个方法。

GetDialAddressPrefix 方法

GetDialAddressPrefix 为给定的 DaprMode 返回 dial 前缀,用于gPRC 客户端连接:

// GetDialAddressPrefix returns a dial prefix for a gRPC client connections
// For a given DaprMode.
func GetDialAddressPrefix(mode modes.DaprMode) string {
	if runtime.GOOS == "windows" {
		return ""
	}

	switch mode {
	case modes.KubernetesMode:
		return "dns:///"
	default:
		return ""
	}
}

注意:Kubernetes 模式下 返回 “dns:///”

调用场景,只在 grpc.go 的 GetGRPCConnection() 方法中被调用:

// GetGRPCConnection returns a new grpc connection for a given address and inits one if doesn't exist
func (g *Manager) GetGRPCConnection(address, id string, namespace string, skipTLS, recreateIfExists, sslEnabled bool) (*grpc.ClientConn, error) {
    dialPrefix := GetDialAddressPrefix(g.mode)
    ......
    conn, err := grpc.DialContext(ctx, dialPrefix+address, opts...)
    ......
}

3 - 基础代码的源码学习

Dapr 基础代码的源码学习

基础代码是 Dapr 代码中最基础的部分,这些代码已经是 dapr 自身逻辑的组成部分,但处于比较偏底层,也不是 dapr 的主要链路,通常代码量也不大。

基础代码在依赖关系中位于工具类代码和类库类代码之上。

3.1 - version的源码学习

Dapr version package的源码学习

代码实现

version 的代码超级简单,就一个 version.go,内容也只有一点点:

// Values for these are injected by the build.
var (
   version = "edge"
   commit  string
)

// Version returns the Dapr version. This is either a semantic version
// number or else, in the case of unreleased code, the string "edge".
func Version() string {
   return version
}

// Commit returns the git commit SHA for the code that Dapr was built from.
func Commit() string {
   return commit
}
  • version:要不就是语义话版本,比如 1.0.0 这种,要不就是 edge 表示未发布的代码
  • commit:build的时候的 git commit

如何注入

Values for these are injected by the build.

那是怎么注入的呢? Build 总不能调用代码,而且这两个值也是private。

Dapr 下的 Makefile 文件中:

# git rev-list -1 HEAD 得到的 git commit 的 hash 值
# 如:63147334aa246d76f9f65708c257460567a1cff4
GIT_COMMIT  = $(shell git rev-list -1 HEAD)
# git describe --always --abbrev=7 --dirty 得到的是版本信息
# 如:v1.0.0-rc.4-5-g6314733
GIT_VERSION = $(shell git describe --always --abbrev=7 --dirty)

ifdef REL_VERSION
   DAPR_VERSION := $(REL_VERSION)
else
   DAPR_VERSION := edge
endif

BASE_PACKAGE_NAME := github.com/dapr/dapr

DEFAULT_LDFLAGS:=-X $(BASE_PACKAGE_NAME)/pkg/version.commit=$(GIT_VERSION) -X $(BASE_PACKAGE_NAME)/pkg/version.version=$(DAPR_VERSION)

ifeq ($(origin DEBUG), undefined)
  BUILDTYPE_DIR:=release
  LDFLAGS:="$(DEFAULT_LDFLAGS) -s -w"
else ifeq ($(DEBUG),0)
  BUILDTYPE_DIR:=release
  LDFLAGS:="$(DEFAULT_LDFLAGS) -s -w"
else
  BUILDTYPE_DIR:=debug
  GCFLAGS:=-gcflags="all=-N -l"
  LDFLAGS:="$(DEFAULT_LDFLAGS)"
  $(info Build with debugger information)
endif

define genBinariesForTarget
.PHONY: $(5)/$(1)
$(5)/$(1):
	CGO_ENABLED=$(CGO) GOOS=$(3) GOARCH=$(4) go build $(GCFLAGS) -ldflags=$(LDFLAGS) \
	-o $(5)/$(1) $(2)/;
endef

TODO:没看懂,有时间详细研究一下这个makefile。

3.2 - modes的源码学习

Dapr modes package的源码学习

代码实现

modes 的代码超级简单,就一个 modes.go,内容也只有一点点:

// DaprMode is the runtime mode for Dapr.
type DaprMode string

const (
	// KubernetesMode is a Kubernetes Dapr mode
	KubernetesMode DaprMode = "kubernetes"
	// StandaloneMode is a Standalone Dapr mode
	StandaloneMode DaprMode = "standalone"
)

Dapr有两种运行模式

  • kubernetes 模式
  • standalone 模式

运行模式的总结

两种模式的差异:

  1. 配置文件读取的方式:

    • standalone 模式下读取本地文件,文件路径由命令行参数 config 指定。
    • kubernetes 模式下读取k8s中存储的CRD,CRD的名称由命令行参数 config 指定。
    config := flag.String("config", "", "Path to config file, or name of a configuration object")
    
  2. TODO

3.3 - cors的源码学习

Dapr cors package的源码学习

代码实现

cors 的代码超级简单,就一个 cors.go,内容也只有一点点:

// DefaultAllowedOrigins is the default origins allowed for the Dapr HTTP servers
const DefaultAllowedOrigins = "*"

AllowedOrigins配置的读取

AllowedOrigins 配置在启动时通过命令行参数 allowed-origins 传入,默认值为 DefaultAllowedOrigins ("*")。然后传入给 NewRuntimeConfig()方法:

func FromFlags() (*DaprRuntime, error) {
allowedOrigins := flag.String("allowed-origins", cors.DefaultAllowedOrigins, "Allowed HTTP origins")

	runtimeConfig := NewRuntimeConfig(*appID, placementAddresses, *controlPlaneAddress, *allowedOrigins ......)
}

之后保存在 NewRuntimeConfig 的 AllowedOrigins 字段中:

func NewRuntimeConfig(
   id string, placementAddresses []string,
   controlPlaneAddress, allowedOrigins ......) *Config {
   return &Config{
   	AllowedOrigins:      allowedOrigins,
   	......
   }

AllowedOrigins配置的使用

pkg/http/server.go 的 useCors() 方法:

func (s *server) useCors(next fasthttp.RequestHandler) fasthttp.RequestHandler {
   if s.config.AllowedOrigins == cors_dapr.DefaultAllowedOrigins {
      return next
   }

   log.Infof("enabled cors http middleware")
   origins := strings.Split(s.config.AllowedOrigins, ",")
   corsHandler := s.getCorsHandler(origins)
   return corsHandler.CorsMiddleware(next)
}

3.4 - proto的源码学习

Dapr proto package的源码学习

3.5 - config的源码学习

Dapr config package的源码学习

3.6 - credentials的源码学习

Dapr credentials package的源码学习

3.6.1 - certchain.go的源码学习

credentials 结构体持有证书相关的各种 path

Dapr credentials package中的 certchain.go 文件的源码学习,credentials 结构体持有证书相关的各种 path。

CertChain 结构体定义

CertChain 结构体持有证书信任链的PEM值:

// CertChain holds the certificate trust chain PEM values
type CertChain struct {
	RootCA []byte
	Cert   []byte
	Key    []byte
}

装载证书的LoadFromDisk 方法

LoadFromDisk 方法从给定目录中读取 CertChain:

// LoadFromDisk retruns a CertChain from a given directory
func LoadFromDisk(rootCertPath, issuerCertPath, issuerKeyPath string) (*CertChain, error) {
   rootCert, err := ioutil.ReadFile(rootCertPath)
   if err != nil {
      return nil, err
   }
   cert, err := ioutil.ReadFile(issuerCertPath)
   if err != nil {
      return nil, err
   }
   key, err := ioutil.ReadFile(issuerKeyPath)
   if err != nil {
      return nil, err
   }
   return &CertChain{
      RootCA: rootCert,
      Cert:   cert,
      Key:    key,
   }, nil
}

使用场景

placement 的 main.go 中,如果 mTLS 开启了,则会读取 tls 证书:

func loadCertChains(certChainPath string) *credentials.CertChain {
   tlsCreds := credentials.NewTLSCredentials(certChainPath)

   log.Info("mTLS enabled, getting tls certificates")
   // try to load certs from disk, if not yet there, start a watch on the local filesystem
   chain, err := credentials.LoadFromDisk(tlsCreds.RootCertPath(), tlsCreds.CertPath(), tlsCreds.KeyPath())
	......
}

operator 的 operator.go 中,也会判断,如果 MTLSEnabled :

var certChain *credentials.CertChain
if o.config.MTLSEnabled {
   log.Info("mTLS enabled, getting tls certificates")
   // try to load certs from disk, if not yet there, start a watch on the local filesystem
   chain, err := credentials.LoadFromDisk(o.config.Credentials.RootCertPath(), o.config.Credentials.CertPath(), o.config.Credentials.KeyPath())
   ......
}

备注:上面两段代码重复度极高,最好能重构一下。

sentry 中也有调用:

func (c *defaultCA) validateAndBuildTrustBundle() (*trustRootBundle, error) {
	var (
		issuerCreds     *certs.Credentials
		rootCertBytes   []byte
		issuerCertBytes []byte
	)

	// certs exist on disk or getting created, load them when ready
	if !shouldCreateCerts(c.config) {
		err := detectCertificates(c.config.RootCertPath)
		if err != nil {
			return nil, err
		}

		certChain, err := credentials.LoadFromDisk(c.config.RootCertPath, c.config.IssuerCertPath, c.config.IssuerKeyPath)
		if err != nil {
			return nil, errors.Wrap(err, "error loading cert chain from disk")
		}

TODO: 证书相关的细节后面单独细看。

3.6.2 - credentials.go的源码学习

credentials 结构体持有证书相关的各种 path

Dapr credentials package中的 credentials.go文件的源码学习,credentials 结构体持有证书相关的各种 path。

TLSCredentials 结构体定义

只有一个字段 credentialsPath:

// TLSCredentials holds paths for credentials
type TLSCredentials struct {
   credentialsPath string
}

构造方法很简单:

// NewTLSCredentials returns a new TLSCredentials
func NewTLSCredentials(path string) TLSCredentials {
   return TLSCredentials{
      credentialsPath: path,
   }
}

获取相关 path 的方法

获取 credentialsPath,这个path中保存有 TLS 证书:

// Path returns the directory holding the TLS credentials
func (t *TLSCredentials) Path() string {
   return t.credentialsPath
}

分别获取 root cert / cert / cert key 的 path:

// RootCertPath returns the file path for the root cert
func (t *TLSCredentials) RootCertPath() string {
   return filepath.Join(t.credentialsPath, RootCertFilename)
}

// CertPath returns the file path for the cert
func (t *TLSCredentials) CertPath() string {
   return filepath.Join(t.credentialsPath, IssuerCertFilename)
}

// KeyPath returns the file path for the cert key
func (t *TLSCredentials) KeyPath() string {
   return filepath.Join(t.credentialsPath, IssuerKeyFilename)
}

3.6.3 - tls.go的源码学习

从 cert/key 中装载 tls.config 对象

Dapr credentials package中的 tls.go文件的源码学习,从 cert/key 中装载 tls.config 对象。

TLSConfigFromCertAndKey() 方法

TLSConfigFromCertAndKey() 方法从 PEM 格式中有效的 cert/key 对中返回 tls.config 对象:

// TLSConfigFromCertAndKey return a tls.config object from valid cert/key pair in PEM format.
func TLSConfigFromCertAndKey(certPem, keyPem []byte, serverName string, rootCA *x509.CertPool) (*tls.Config, error) {
	cert, err := tls.X509KeyPair(certPem, keyPem)
	if err != nil {
		return nil, err
	}

	// nolint:gosec
	config := &tls.Config{
		InsecureSkipVerify: false,
		RootCAs:            rootCA,
		ServerName:         serverName,
		Certificates:       []tls.Certificate{cert},
	}

	return config, nil
}

3.6.4 - grpc.go的源码学习

获取服务器端选项和客户端选项

Dapr credentials package中的 grpc.go文件的源码学习,获取服务器端选项和客户端选项。

GetServerOptions() 方法

func GetServerOptions(certChain *CertChain) ([]grpc.ServerOption, error) {
	opts := []grpc.ServerOption{}
	if certChain == nil {
		return opts, nil
	}

	cp := x509.NewCertPool()
	cp.AppendCertsFromPEM(certChain.RootCA)

	cert, err := tls.X509KeyPair(certChain.Cert, certChain.Key)
	if err != nil {
		return opts, nil
	}

	// nolint:gosec
	config := &tls.Config{
		ClientCAs: cp,
		// Require cert verification
		ClientAuth:   tls.RequireAndVerifyClientCert,
		Certificates: []tls.Certificate{cert},
	}
	opts = append(opts, grpc.Creds(credentials.NewTLS(config)))

	return opts, nil
}

GetClientOptions() 方法

func GetClientOptions(certChain *CertChain, serverName string) ([]grpc.DialOption, error) {
	opts := []grpc.DialOption{}
	if certChain != nil {
		cp := x509.NewCertPool()
		ok := cp.AppendCertsFromPEM(certChain.RootCA)
		if !ok {
			return nil, errors.New("failed to append PEM root cert to x509 CertPool")
		}
		config, err := TLSConfigFromCertAndKey(certChain.Cert, certChain.Key, serverName, cp)
		if err != nil {
			return nil, errors.Wrap(err, "failed to create tls config from cert and key")
		}
		opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(config)))
	} else {
		opts = append(opts, grpc.WithInsecure())
	}
	return opts, nil
}

TODO: 好吧,细节后面看,加密我不熟。

4 - Runtime的源码学习

Dapr runtime的源码学习

4.1 - options.go的源码学习

用于定制 runtime 中包含的组件

Dapr runtime package中的 options.go 文件的源码学习,用于定制 runtime 中包含的组件。

runtimeOpts 结构体定义

runtimeOpts封装了需要包含在 runtime 中的 component:

type (
	// runtimeOpts encapsulates the components to include in the runtime.
	runtimeOpts struct {
		secretStores    []secretstores.SecretStore
		states          []state.State
		pubsubs         []pubsub.PubSub
		nameResolutions []nameresolution.NameResolution
		inputBindings   []bindings.InputBinding
		outputBindings  []bindings.OutputBinding
		httpMiddleware  []http.Middleware
	}
)

Option 方法

Option 方法用于定制 runtime:

type (
	// Option is a function that customizes the runtime.
	Option func(o *runtimeOpts)
)

定制runtime的With系列方法

提供多个 WithXxx() 方法,用于定制 runtime 的组件:


// WithSecretStores adds secret store components to the runtime.
func WithSecretStores(secretStores ...secretstores.SecretStore) Option {
	return func(o *runtimeOpts) {
		o.secretStores = append(o.secretStores, secretStores...)
	}
}

// WithStates adds state store components to the runtime.
func WithStates(states ...state.State) Option {
	return func(o *runtimeOpts) {
		o.states = append(o.states, states...)
	}
}

// WithPubSubs adds pubsub store components to the runtime.
func WithPubSubs(pubsubs ...pubsub.PubSub) Option {
	return func(o *runtimeOpts) {
		o.pubsubs = append(o.pubsubs, pubsubs...)
	}
}

// WithNameResolutions adds name resolution components to the runtime.
func WithNameResolutions(nameResolutions ...nameresolution.NameResolution) Option {
	return func(o *runtimeOpts) {
		o.nameResolutions = append(o.nameResolutions, nameResolutions...)
	}
}

// WithInputBindings adds input binding components to the runtime.
func WithInputBindings(inputBindings ...bindings.InputBinding) Option {
	return func(o *runtimeOpts) {
		o.inputBindings = append(o.inputBindings, inputBindings...)
	}
}

// WithOutputBindings adds output binding components to the runtime.
func WithOutputBindings(outputBindings ...bindings.OutputBinding) Option {
	return func(o *runtimeOpts) {
		o.outputBindings = append(o.outputBindings, outputBindings...)
	}
}

// WithHTTPMiddleware adds HTTP middleware components to the runtime.
func WithHTTPMiddleware(httpMiddleware ...http.Middleware) Option {
	return func(o *runtimeOpts) {
		o.httpMiddleware = append(o.httpMiddleware, httpMiddleware...)
	}
}

这些方法都只是简单的往 runtimeOpts 结构体的各个组件字段里面保存信息,用于后续 runtime 的初始化。

4.2 - config.go的源码学习

解析命令行标记并返回 DaprRuntime 实例

Dapr runtime package中的 cli.go 文件的源码学习,解析命令行标记并返回 DaprRuntime 实例。

cli.go 基本上就一个 FromFlags() 方法。

常量定义

protocol,目前只支持 http 和 grpc :

// Protocol is a communications protocol
type Protocol string

const (
	// GRPCProtocol is a gRPC communication protocol
	GRPCProtocol Protocol = "grpc"
	// HTTPProtocol is a HTTP communication protocol
	HTTPProtocol Protocol = "http"
)

各种端口的默认值:

const (
	// DefaultDaprHTTPPort is the default http port for Dapr
	DefaultDaprHTTPPort = 3500
	// DefaultDaprAPIGRPCPort is the default API gRPC port for Dapr
	DefaultDaprAPIGRPCPort = 50001
	// DefaultProfilePort is the default port for profiling endpoints
	DefaultProfilePort = 7777
	// DefaultMetricsPort is the default port for metrics endpoints
	DefaultMetricsPort = 9090
)

http默认配置,目前只有一个 MaxRequestBodySize :

const (
	// DefaultMaxRequestBodySize is the default option for the maximum body size in MB for Dapr HTTP servers
	DefaultMaxRequestBodySize = 4
)

Config 结构体

// Config holds the Dapr Runtime configuration
type Config struct {
	ID                   string
	HTTPPort             int
	ProfilePort          int
	EnableProfiling      bool
	APIGRPCPort          int
	InternalGRPCPort     int
	ApplicationPort      int
	ApplicationProtocol  Protocol
	Mode                 modes.DaprMode
	PlacementAddresses   []string
	GlobalConfig         string
	AllowedOrigins       string
	Standalone           config.StandaloneConfig
	Kubernetes           config.KubernetesConfig
	MaxConcurrency       int
	mtlsEnabled          bool
	SentryServiceAddress string
	CertChain            *credentials.CertChain
	AppSSL               bool
	MaxRequestBodySize   int
}

有点乱,所有的字段都是扁平的,以后越加越多。。。

构建Config

简单赋值构建 config 结构体,这个参数是在太多了一点:

// NewRuntimeConfig returns a new runtime config
func NewRuntimeConfig(
   id string, placementAddresses []string,
   controlPlaneAddress, allowedOrigins, globalConfig, componentsPath, appProtocol, mode string,
   httpPort, internalGRPCPort, apiGRPCPort, appPort, profilePort int,
   enableProfiling bool, maxConcurrency int, mtlsEnabled bool, sentryAddress string, appSSL bool, maxRequestBodySize int) *Config {
   return &Config{
      ID:                  id,
      HTTPPort:            httpPort,
      InternalGRPCPort:    internalGRPCPort,
      APIGRPCPort:         apiGRPCPort,
      ApplicationPort:     appPort,
      ProfilePort:         profilePort,
      ApplicationProtocol: Protocol(appProtocol),
      Mode:                modes.DaprMode(mode),
      PlacementAddresses:  placementAddresses,
      GlobalConfig:        globalConfig,
      AllowedOrigins:      allowedOrigins,
      Standalone: config.StandaloneConfig{
         ComponentsPath: componentsPath,
      },
      Kubernetes: config.KubernetesConfig{
         ControlPlaneAddress: controlPlaneAddress,
      },
      EnableProfiling:      enableProfiling,
      MaxConcurrency:       maxConcurrency,
      mtlsEnabled:          mtlsEnabled,
      SentryServiceAddress: sentryAddress,
      AppSSL:               appSSL,
      MaxRequestBodySize:   maxRequestBodySize,
   }
}

4.3 - cli.go的源码学习

解析命令行标记并返回 DaprRuntime 实例

Dapr runtime package中的 cli.go 文件的源码学习,解析命令行标记并返回 DaprRuntime 实例。

cli.go 基本上就一个 FromFlags() 方法。

FromFlags()概述

FromFlags() 方法解析命令行标记并返回 DaprRuntime 实例:

// FromFlags parses command flags and returns DaprRuntime instance
func FromFlags() (*DaprRuntime, error) {
   ......
   return NewDaprRuntime(runtimeConfig, globalConfig, accessControlList), nil
}

解析命令行标记

通用标记

代码如下:

mode := flag.String("mode", string(modes.StandaloneMode), "Runtime mode for Dapr")
daprHTTPPort := flag.String("dapr-http-port", fmt.Sprintf("%v", DefaultDaprHTTPPort), "HTTP port for Dapr API to listen on")
daprAPIGRPCPort := flag.String("dapr-grpc-port", fmt.Sprintf("%v", DefaultDaprAPIGRPCPort), "gRPC port for the Dapr API to listen on")
daprInternalGRPCPort := flag.String("dapr-internal-grpc-port", "", "gRPC port for the Dapr Internal API to listen on")
appPort := flag.String("app-port", "", "The port the application is listening on")
profilePort := flag.String("profile-port", fmt.Sprintf("%v", DefaultProfilePort), "The port for the profile server")
appProtocol := flag.String("app-protocol", string(HTTPProtocol), "Protocol for the application: grpc or http")
componentsPath := flag.String("components-path", "", "Path for components directory. If empty, components will not be loaded. Self-hosted mode only")
config := flag.String("config", "", "Path to config file, or name of a configuration object")
appID := flag.String("app-id", "", "A unique ID for Dapr. Used for Service Discovery and state")
controlPlaneAddress := flag.String("control-plane-address", "", "Address for a Dapr control plane")
sentryAddress := flag.String("sentry-address", "", "Address for the Sentry CA service")
placementServiceHostAddr := flag.String("placement-host-address", "", "Addresses for Dapr Actor Placement servers")
allowedOrigins := flag.String("allowed-origins", cors.DefaultAllowedOrigins, "Allowed HTTP origins")
enableProfiling := flag.Bool("enable-profiling", false, "Enable profiling")
runtimeVersion := flag.Bool("version", false, "Prints the runtime version")
appMaxConcurrency := flag.Int("app-max-concurrency", -1, "Controls the concurrency level when forwarding requests to user code")
enableMTLS := flag.Bool("enable-mtls", false, "Enables automatic mTLS for daprd to daprd communication channels")
appSSL := flag.Bool("app-ssl", false, "Sets the URI scheme of the app to https and attempts an SSL connection")
daprHTTPMaxRequestSize := flag.Int("dapr-http-max-request-size", -1, "Increasing max size of request body in MB to handle uploading of big files. By default 4 MB.")

TODO:应该有命令行参数的文档,对照文档学习一遍。

解析日志相关的标记

loggerOptions := logger.DefaultOptions()
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)

解析metrics相关的标记

metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)

// attaching only metrics-port option
metricsExporter.Options().AttachCmdFlag(flag.StringVar)

然后执行解析:

flag.Parse()

执行version命令

如果只是version命令,则打印版本信息之后就可以退出进程了:

runtimeVersion := flag.Bool("version", false, "Prints the runtime version")

if *runtimeVersion {
   fmt.Println(version.Version())
   os.Exit(0)
}

初始化日志和metrics

日志初始化

根据日志属性初始化logger:

loggerOptions := logger.DefaultOptions()
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)

if *appID == "" {
   return nil, errors.New("app-id parameter cannot be empty")
}

// Apply options to all loggers
loggerOptions.SetAppID(*appID)
if err := logger.ApplyOptionsToLoggers(&loggerOptions); err != nil {
   return nil, err
}

完成日志初始化之后就可以愉快的打印日志了:

log.Infof("starting Dapr Runtime -- version %s -- commit %s", version.Version(), version.Commit())
log.Infof("log level set to: %s", loggerOptions.OutputLevel)

metrics初始化

初始化dapr metrics exporter:

// Initialize dapr metrics exporter
if err := metricsExporter.Init(); err != nil {
   log.Fatal(err)
}

解析配置

解析dapr各种端口设置

dapr-http-port / dapr-grpc-port / profile-port / dapr-internal-grpc-port / app-port :

daprHTTP, err := strconv.Atoi(*daprHTTPPort)
if err != nil {
   return nil, errors.Wrap(err, "error parsing dapr-http-port flag")
}

daprAPIGRPC, err := strconv.Atoi(*daprAPIGRPCPort)
if err != nil {
   return nil, errors.Wrap(err, "error parsing dapr-grpc-port flag")
}

profPort, err := strconv.Atoi(*profilePort)
if err != nil {
   return nil, errors.Wrap(err, "error parsing profile-port flag")
}

var daprInternalGRPC int
if *daprInternalGRPCPort != "" {
   daprInternalGRPC, err = strconv.Atoi(*daprInternalGRPCPort)
   if err != nil {
      return nil, errors.Wrap(err, "error parsing dapr-internal-grpc-port")
   }
} else {
   daprInternalGRPC, err = grpc.GetFreePort()
   if err != nil {
      return nil, errors.Wrap(err, "failed to get free port for internal grpc server")
   }
}

var applicationPort int
if *appPort != "" {
   applicationPort, err = strconv.Atoi(*appPort)
   if err != nil {
      return nil, errors.Wrap(err, "error parsing app-port")
   }
}

解析其他配置

继续解析 maxRequestBodySize / placementAddresses / concurrency / appProtocol 等 配置:

var maxRequestBodySize int
if *daprHTTPMaxRequestSize != -1 {
   maxRequestBodySize = *daprHTTPMaxRequestSize
} else {
   maxRequestBodySize = DefaultMaxRequestBodySize
}

placementAddresses := []string{}
if *placementServiceHostAddr != "" {
   placementAddresses = parsePlacementAddr(*placementServiceHostAddr)
}

var concurrency int
if *appMaxConcurrency != -1 {
   concurrency = *appMaxConcurrency
}

appPrtcl := string(HTTPProtocol)
if *appProtocol != string(HTTPProtocol) {
   appPrtcl = *appProtocol
}

构建Runtime的三大配置

构建runtimeConfig

runtimeConfig := NewRuntimeConfig(*appID, placementAddresses, *controlPlaneAddress, *allowedOrigins, *config, *componentsPath,
   appPrtcl, *mode, daprHTTP, daprInternalGRPC, daprAPIGRPC, applicationPort, profPort, *enableProfiling, concurrency, *enableMTLS, *sentryAddress, *appSSL, maxRequestBodySize)

MTLS相关的配置:

if *enableMTLS {
   runtimeConfig.CertChain, err = security.GetCertChain()
   if err != nil {
      return nil, err
   }
}

构建globalConfig

var globalConfig *global_config.Configuration

根据 config 配置文件的配置,还有 dapr 模式的配置,读取相应的配置文件:

config := flag.String("config", "", "Path to config file, or name of a configuration object")

if *config != "" {
   switch modes.DaprMode(*mode) {
      case modes.KubernetesMode:
      client, conn, clientErr := client.GetOperatorClient(*controlPlaneAddress, security.TLSServerName, runtimeConfig.CertChain)
      if clientErr != nil {
         return nil, clientErr
      }
      defer conn.Close()
      namespace = os.Getenv("NAMESPACE")
      globalConfig, configErr = global_config.LoadKubernetesConfiguration(*config, namespace, client)
      case modes.StandaloneMode:
      globalConfig, _, configErr = global_config.LoadStandaloneConfiguration(*config)
   }

   if configErr != nil {
      log.Debugf("Config error: %v", configErr)
   }
}

if configErr != nil {
   log.Fatalf("error loading configuration: %s", configErr)
}

简单说:kubernetes 模式下读取CRD,standalone 模式下读取本地配置文件。

如果 config 没有配置,则使用默认的 global 配置:

if globalConfig == nil {
   log.Info("loading default configuration")
   globalConfig = global_config.LoadDefaultConfiguration()
}

构建accessControlList

var accessControlList *global_config.AccessControlList

accessControlList, err = global_config.ParseAccessControlSpec(globalConfig.Spec.AccessControlSpec, string(runtimeConfig.ApplicationProtocol))
if err != nil {
   log.Fatalf(err.Error())
}

构造 DaprRuntime 实例

最后构造 DaprRuntime 实例:

return NewDaprRuntime(runtimeConfig, globalConfig, accessControlList), nil

4.4 - Runtime App Channel的源码学习

Dapr runtime 中 App Channel的源码学习

4.4.1 - channel.go的源码学习

定义 AppChannel 接口和方法

Dapr channel package中的 channel.go 文件的源码学习,定义 AppChannel 接口和方法。

AppChannel 是和用户代码进行通讯的抽象。

常量定义 DefaultChannelAddress,考虑到 dapr 通常是以 sidecar 模式部署的,因此默认channel 地址是 127.0.0.1

const (
   // DefaultChannelAddress is the address that user application listen to
   DefaultChannelAddress = "127.0.0.1"
)

方法定义:

// AppChannel is an abstraction over communications with user code
type AppChannel interface {
   GetBaseAddress() string
   InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}

4.4.2 - grpc_channel.go的源码学习

AppChannel 的 gRPC 实现。

Dapr channel package中的 grpc_channel.go 文件的源码学习,AppChannel 的 gRPC 实现。

Channel 结构体定义

Channel是一个具体的AppChannel实现,用于与基于gRPC的用户代码进行交互。

// Channel is a concrete AppChannel implementation for interacting with gRPC based user code
type Channel struct {
  // grpc 客户端连接
	client           *grpc.ClientConn
  // user code(应用)的地址
	baseAddress      string
  // 限流用的 go chan
	ch               chan int
	tracingSpec      config.TracingSpec
	appMetadataToken string
}

创建 Channel 结构体

// CreateLocalChannel creates a gRPC connection with user code
func CreateLocalChannel(port, maxConcurrency int, conn *grpc.ClientConn, spec config.TracingSpec) *Channel {
	c := &Channel{
		client:           conn,
    // baseAddress 就是 "ip:port"
		baseAddress:      fmt.Sprintf("%s:%d", channel.DefaultChannelAddress, port),
		tracingSpec:      spec,
		appMetadataToken: auth.GetAppToken(),
	}
	if maxConcurrency > 0 {
    // 如果有并发控制要求,则创建用于并发控制的go channel
		c.ch = make(chan int, maxConcurrency)
	}
	return c
}

GetBaseAddress 方法

// GetBaseAddress returns the application base address
func (g *Channel) GetBaseAddress() string {
   return g.baseAddress
}

这个方法用来获取app的基础路径,可以拼接其他的字路径,如:

func (a *actorsRuntime) startAppHealthCheck(opts ...health.Option) {
	healthAddress := fmt.Sprintf("%s/healthz", a.appChannel.GetBaseAddress())
	ch := health.StartEndpointHealthCheck(healthAddress, opts...)
	......
}

备注:只有 actor 这一个地方用到了这个方法

InvokeMethod 方法

InvokeMethod 方法通过 gRPC 调用 user code:

// InvokeMethod invokes user code via gRPC
func (g *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
   var rsp *invokev1.InvokeMethodResponse
   var err error

   switch req.APIVersion() {
   case internalv1pb.APIVersion_V1:
      // 目前只支持 v1 版本
      rsp, err = g.invokeMethodV1(ctx, req)

   default:
      // Reject unsupported version
      // 其他版本会被拒绝
      rsp = nil
      err = status.Error(codes.Unimplemented, fmt.Sprintf("Unsupported spec version: %d", req.APIVersion()))
   }

   return rsp, err
}

invokeMethodV1() 的实现

// invokeMethodV1 calls user applications using daprclient v1
func (g *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
   if g.ch != nil {
      // 往 ch 里面发一个int,等价于当前并发数量 + 1
      g.ch <- 1
   }

   // 创建一个 app callback 的 client
   clientV1 := runtimev1pb.NewAppCallbackClient(g.client)
   // 将内部 metadata 转为 grpc 的 metadata
   grpcMetadata := invokev1.InternalMetadataToGrpcMetadata(ctx, req.Metadata(), true)

   if g.appMetadataToken != "" {
      grpcMetadata.Set(auth.APITokenHeader, g.appMetadataToken)
   }

   // Prepare gRPC Metadata
   ctx = metadata.NewOutgoingContext(context.Background(), grpcMetadata)

   var header, trailer metadata.MD
   // 调用user code
   resp, err := clientV1.OnInvoke(ctx, req.Message(), grpc.Header(&header), grpc.Trailer(&trailer))

   if g.ch != nil {
      // 从 ch 中读取一个int,等价于当前并发数量 - 1
      // 但这个操作并没有额外保护,如果上面的代码发生 panic,岂不是这个计数器就出错了?
      // 考虑把这个操作放在 deffer 中进行会比较安全
      <-g.ch
   }

   var rsp *invokev1.InvokeMethodResponse
   if err != nil {
      // Convert status code
      respStatus := status.Convert(err)
      // Prepare response
      rsp = invokev1.NewInvokeMethodResponse(int32(respStatus.Code()), respStatus.Message(), respStatus.Proto().Details)
   } else {
      rsp = invokev1.NewInvokeMethodResponse(int32(codes.OK), "", nil)
   }

   rsp.WithHeaders(header).WithTrailers(trailer)

   return rsp.WithMessage(resp), nil
}

使用这个方法的地方有:

  • actor 的 callLocalActor() 和 deactivateActor()
  • Grpc api 中的 CallLocal()
  • messaging 中 direct_message 的 invokeLocal()
  • runtime中
    • getConfigurationHTTP()
    • isAppSubscribedToBinding()
    • publishMessageHTTP()
    • sendBindingEventToApp()

5 - Components的源码学习

Dapr Components的源码学习

5.1 - Binding组件的源码学习

Dapr Binding组件的源码学习

5.2 - Middleware组件的源码学习

Dapr Middleware组件的源码学习

5.3 - NameResolution组件的源码学习

Dapr NameResolution组件的源码学习

5.4 - PubSub组件的源码学习

Dapr PubSub组件的源码学习

5.5 - SecretStores组件的源码学习

Dapr SecretStores组件的源码学习

5.6 - Store组件的源码学习

Dapr Store组件的源码学习

5.7 - workflow组件的源码学习

Dapr workflow组件的源码学习

5.7.1 - registry.go的源码学习

注册 workflow 组件

结构体定义

Registry 结构体

Registry 结构体是用来注册返回工作流实现的组件接口

import (
	wfs "github.com/dapr/components-contrib/workflows"
)
// Registry is an interface for a component that returns registered state store implementations.
type Registry struct {
	Logger             logger.Logger
	workflowComponents map[string]func(logger.Logger) wfs.Workflow
}

这里的 Workflow 在 components-contrib 中定义。

默认Registry

默认Registry的创建

package 中定义了一个 默认Registry, singleton, 还是 public的:

// DefaultRegistry is the singleton with the registry .
var DefaultRegistry *Registry = NewRegistry()

// NewRegistry is used to create workflow registry.
func NewRegistry() *Registry {
	return &Registry{
		workflowComponents: map[string]func(logger.Logger) wfs.Workflow{},
	}
}

RegisterComponent() 方法

RegisterComponent() 方法在在 register 结构体的 workflowComponents 字段中加入一条或多条记录

func (s *Registry) RegisterComponent(componentFactory func(logger.Logger) wfs.Workflow, names ...string) {
	for _, name := range names {
		s.workflowComponents[createFullName(name)] = componentFactory
	}
}

func createFullName(name string) string {
	return strings.ToLower("workflow." + name)
}

key 是 "workflow." + name 转小写, value 是传入的 componentFactory,这是一个函数,只要传入一个 logger,就能返回 Workflow 实现。

create() 方法

create() 方法根据指定的 name ,version 来构建对应的 workflow 实现:

func (s *Registry) Create(name, version, logName string) (wfs.Workflow, error) {
	if method, ok := s.getWorkflowComponent(name, version, logName); ok {
		return method(), nil
	}
	return nil, fmt.Errorf("couldn't find wokflow %s/%s", name, version)
}

关键实现代码在 getWorkflowComponent() 方法中:

func (s *Registry) getWorkflowComponent(name, version, logName string) (func() wfs.Workflow, bool) {
	nameLower := strings.ToLower(name)
	versionLower := strings.ToLower(version)
    // 用 nameLower+"/"+versionLower 拼接出 key
    // 然后在 register 结构体的 workflowComponents 字段中查找
    // TODO: 保存的时候是 key 是 `"workflow." + name` 转小写
	workflowFn, ok := s.workflowComponents[nameLower+"/"+versionLower]
	if ok {
		return s.wrapFn(workflowFn, logName), true
	}
    // 如果没有找到,看看是不是 InitialVersion
	if components.IsInitialVersion(versionLower) {
        // 如果是 InitialVersion,则不需要拼接 version 内容,直接通过 name 来查找
        // TODO:这要求 name 必须是 "workflow." 开头?
		workflowFn, ok = s.workflowComponents[nameLower]
		if ok {
			return s.wrapFn(workflowFn, logName), true
		}
	}
	return nil, false
}

如果有在 workflowComponents 字段中找到注册的 workflow 实现的 factory, 则用这个 factory 生成 workflow 的实现:

func (s *Registry) wrapFn(componentFactory func(logger.Logger) wfs.Workflow, logName string) func() wfs.Workflow {
	return func() wfs.Workflow {
        // registey 的 logger 会被用来做 workflow 实现的 logger
		l := s.Logger
		if logName != "" && l != nil {
            // 在 logger 中增加 component 字段,值为 logName
			l = l.WithFields(map[string]any{
				"component": logName,
			})
		}
        // 最后调用 factory 的方法来构建 workflow 实现
		return componentFactory(l)
	}
}

总结

需要小心核对 key 的内容:

  1. 是否带 “workflow.” 前缀
  2. 是否带version 或者 是否是 InitialVersion

6 - Healthz的源码学习

Dapr Healthz的源码学习

6.1 - health.go的源码学习

health checking的客户端实现

Dapr health package中的 health.go 文件的源码分析,health checking的客户端实现

代码实现

Option 方法定义

// Option is an a function that applies a health check option
type Option func(o *healthCheckOptions)

healthCheckOptions 结构体定义

healthCheckOptions 结构体

type healthCheckOptions struct {
	initialDelay      time.Duration
	requestTimeout    time.Duration
	failureThreshold  int
	interval          time.Duration
	successStatusCode int
}

With系列方法

WithXxx 方法用来设置上述5个健康检查的选项,每个方法都返回一个 Option 函数:

// WithInitialDelay sets the initial delay for the health check
func WithInitialDelay(delay time.Duration) Option {
	return func(o *healthCheckOptions) {
		o.initialDelay = delay
	}
}

// WithFailureThreshold sets the failure threshold for the health check
func WithFailureThreshold(threshold int) Option {
	return func(o *healthCheckOptions) {
		o.failureThreshold = threshold
	}
}

// WithRequestTimeout sets the request timeout for the health check
func WithRequestTimeout(timeout time.Duration) Option {
	return func(o *healthCheckOptions) {
		o.requestTimeout = timeout
	}
}

// WithSuccessStatusCode sets the status code for the health check
func WithSuccessStatusCode(code int) Option {
	return func(o *healthCheckOptions) {
		o.successStatusCode = code
	}
}

// WithInterval sets the interval for the health check
func WithInterval(interval time.Duration) Option {
	return func(o *healthCheckOptions) {
		o.interval = interval
	}
}

StartEndpointHealthCheck 方法

StartEndpointHealthCheck 方法用给定的选项在指定的地址上启动健康检查。它返回一个通道,如果端点是健康的则发出true,如果满足失败条件则发出false。

// StartEndpointHealthCheck starts a health check on the specified address with the given options.
// It returns a channel that will emit true if the endpoint is healthy and false if the failure conditions
// Have been met.
func StartEndpointHealthCheck(endpointAddress string, opts ...Option) chan bool {
	options := &healthCheckOptions{}
	applyDefaults(options)

   // 执行每个 Option 函数来设置健康检查的选项
	for _, o := range opts {
		o(options)
	}
	signalChan := make(chan bool, 1)

	go func(ch chan<- bool, endpointAddress string, options *healthCheckOptions) {
      // 设置健康检查的间隔时间 interval,默认5秒一次
		ticker := time.NewTicker(options.interval)
		failureCount := 0
      // 先 sleep initialDelay 时间再开始健康检查
		time.Sleep(options.initialDelay)

      // 创建 http client,设置请求超时时间为 requestTimeout
		client := &fasthttp.Client{
			MaxConnsPerHost:           5, // Limit Keep-Alive connections
			ReadTimeout:               options.requestTimeout,
			MaxIdemponentCallAttempts: 1,
		}

		req := fasthttp.AcquireRequest()
		req.SetRequestURI(endpointAddress)
		req.Header.SetMethod(fasthttp.MethodGet)
		defer fasthttp.ReleaseRequest(req)

		for range ticker.C {
			resp := fasthttp.AcquireResponse()
			err := client.DoTimeout(req, resp, options.requestTimeout)
         // 通过检查应答的状态码来判断健康检查是否成功: successStatusCode
			if err != nil || resp.StatusCode() != options.successStatusCode {
            // 健康检查失败,错误计数器加一
				failureCount++
            // 如果连续错误次数达到阈值 failureThreshold,则视为健康检查失败,发送false到channel
				if failureCount == options.failureThreshold {
					ch <- false
				}
			} else {
            // 健康检查成功,发送 true 到 channel
				ch <- true
            // 同时重制 failureCount
				failureCount = 0
			}
			fasthttp.ReleaseResponse(resp)
		}
	}(signalChan, endpointAddress, options)
	return signalChan
}

applyDefaults() 方法设置默认属性:

const (
	initialDelay      = time.Second * 1
	failureThreshold  = 2
	requestTimeout    = time.Second * 2
	interval          = time.Second * 5
	successStatusCode = 200
)

func applyDefaults(o *healthCheckOptions) {
   o.failureThreshold = failureThreshold
   o.initialDelay = initialDelay
   o.requestTimeout = requestTimeout
   o.successStatusCode = successStatusCode
   o.interval = interval
}

健康检查方式总结

对某一个给定地址 endpointAddress 进行健康检查的步骤和方式为:

  1. 先 sleep initialDelay 时间再开始健康检查:可能对方还在初始化过程中
  2. 每隔间隔时间 interval 时间发起一次健康检查
  3. 每次健康检查是向目标地址 endpointAddress 发起一个 HTTP GET 请求,超时时间为 requestTimeout
  4. 检查应答判断是否健康
    • 返回应答并且应答的状态码是 successStatusCode 则视为本地健康检查成功
    • 超时或者应答的状态码不是 successStatusCode 则视为本地健康检查失败
  5. 如果失败则开始累加计数器,然后间隔 interval 时间之后再次进行健康检查
  6. 如果多次失败,累计达到阈值 failureThreshold,报告为健康检查失败
  7. 只要单次成功,则清理之前的错误累计次数,报告为健康检查成功。

6.2 - server.go的源码学习

healthz server的实现

Dapr health package中的 server.go 文件的源码分析,healthz server的实现

代码实现

Health server

healthz server 的接口定义:

// Server is the interface for the healthz server
type Server interface {
	Run(context.Context, int) error
	Ready()
	NotReady()
}

server 结构体,ready 字段保存状态:

type server struct {
	ready bool
	log   logger.Logger
}

创建 healthz server的方法:

// NewServer returns a new healthz server
func NewServer(log logger.Logger) Server {
   return &server{
      log: log,
   }
}

设置 ready 状态的两个方法:

// Ready sets a ready state for the endpoint handlers
func (s *server) Ready() {
   s.ready = true
}

// NotReady sets a not ready state for the endpoint handlers
func (s *server) NotReady() {
   s.ready = false
}

运行healthz server

Run 方法启动一个带有 healthz 端点的 http 服务器,端口通过参数 port 指定:

// Run starts a net/http server with a healthz endpoint
func (s *server) Run(ctx context.Context, port int) error {
   router := http.NewServeMux()
   router.Handle("/healthz", s.healthz())

   srv := &http.Server{
      Addr:    fmt.Sprintf(":%d", port),
      Handler: router,
   }
   ...
}

启动之后:

   doneCh := make(chan struct{})

   go func() {
      select {
      case <-ctx.Done():
         s.log.Info("Healthz server is shutting down")
         shutdownCtx, cancel := context.WithTimeout(
            context.Background(),
            time.Second*5,
         )
         defer cancel()
         srv.Shutdown(shutdownCtx) // nolint: errcheck
      case <-doneCh:
      }
   }()

   s.log.Infof("Healthz server is listening on %s", srv.Addr)
   err := srv.ListenAndServe()
   if err != http.ErrServerClosed {
      s.log.Errorf("Healthz server error: %s", err)
   }
   close(doneCh)
   return err
}

healthz server 处理请求

healthz() 方法是 health endpoint 的 handler,根据当前 healthz server 的 ready 字段的状态值返回 HTTP 状态码:

// healthz is a health endpoint handler
func (s *server) healthz() http.Handler {
   return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
      var status int
      if s.ready {
      	// ready 返回 200
         status = http.StatusOK
      } else {
         // 不 ready 则返回 503
         status = http.StatusServiceUnavailable
      }
      w.WriteHeader(status)
   })
}

使用场景

healthz server 在 injector / placement / sentry / operator 中都有使用,这些进程都是在 main 方法中启动 healthz server。

injector

injector 启动在 8080 端口:

const (
	healthzPort = 8080
)

func main() {
   ......
	go func() {
		healthzServer := health.NewServer(log)
		healthzServer.Ready()

		healthzErr := healthzServer.Run(ctx, healthzPort)
		if healthzErr != nil {
			log.Fatalf("failed to start healthz server: %s", healthzErr)
		}
	}()
	......
}

placement

placement 默认启动在 8080 端口(也可以通过命令行参数修改端口):

const (
	defaultHealthzPort       = 8080
)

func main() {
	flag.IntVar(&cfg.healthzPort, "healthz-port", cfg.healthzPort, "sets the HTTP port for the healthz server")
   ......
	go startHealthzServer(cfg.healthzPort)
	......
}

func startHealthzServer(healthzPort int) {
	healthzServer := health.NewServer(log)
	healthzServer.Ready()

	if err := healthzServer.Run(context.Background(), healthzPort); err != nil {
		log.Fatalf("failed to start healthz server: %s", err)
	}
}

sentry

sentry 启动在 8080 端口:

const (
	healthzPort = 8080
)

func main() {
   ......
	go func() {
		healthzServer := health.NewServer(log)
		healthzServer.Ready()

		err := healthzServer.Run(ctx, healthzPort)
		if err != nil {
			log.Fatalf("failed to start healthz server: %s", err)
		}
	}()
	......
}

operator

operator 启动在 8080 端口:

const (
	healthzPort = 8080
)

func main() {
   ......
	go func() {
		healthzServer := health.NewServer(log)
		healthzServer.Ready()

		err := healthzServer.Run(ctx, healthzPort)
		if err != nil {
			log.Fatalf("failed to start healthz server: %s", err)
		}
	}()
	......
}

darpd

特别指出:daprd 没有使用 healthz server,daprd 是直接在 dapr HTTP api 的基础上增加了 healthz 的功能。

具体代码在 http/api.go 中:

func NewAPI(......
   api.endpoints = append(api.endpoints, api.constructHealthzEndpoints()...)
	return api
}

func (a *api) constructHealthzEndpoints() []Endpoint {
   return []Endpoint{
      {
         Methods: []string{fasthttp.MethodGet},
         Route:   "healthz",
         Version: apiVersionV1,
         Handler: a.onGetHealthz,
      },
   }
}

onGetHealthz() 方法处理请求:

func (a *api) onGetHealthz(reqCtx *fasthttp.RequestCtx) {
   if !a.readyStatus {
      msg := NewErrorResponse("ERR_HEALTH_NOT_READY", messages.ErrHealthNotReady)
      respondWithError(reqCtx, fasthttp.StatusInternalServerError, msg)
      log.Debug(msg)
   } else {
      respondEmpty(reqCtx)
   }
}

func respondEmpty(ctx *fasthttp.RequestCtx) {
	ctx.Response.SetBody(nil)
	ctx.Response.SetStatusCode(fasthttp.StatusNoContent)
}

注意:这里成功时返回的状态码是 204 StatusNoContent,而不是通常的 200 OK。

7 - Metrics的源码学习

Dapr Metrics的源码学习

7.1 - exporter.go的源码学习

Exporter 是用于 metrics 导出器的接口,当前只支持 Prometheus

Dapr metrics package中的 exporter.go文件的源码分析,包括结构体定义、方法实现。当前只支持 Prometheus。

Exporter定义和实现

Exporter 接口定义

Exporter 接口定义:

// Exporter is the interface for metrics exporters
type Exporter interface {
	// Init initializes metrics exporter
	Init() error
	// Options returns Exporter options
	Options() *Options
}

exporter 结构体定义

exporter 结构体定义:

// exporter is the base struct
type exporter struct {
	namespace string
	options   *Options
	logger    logger.Logger
}

构建 exporter

// NewExporter creates new MetricsExporter instance
func NewExporter(namespace string) Exporter {
	// TODO: support multiple exporters
	return &promMetricsExporter{
		&exporter{
			namespace: namespace,
			options:   defaultMetricOptions(),
			logger:    logger.NewLogger("dapr.metrics"),
		},
		nil,
	}
}

当前只支持 promMetrics 的 Exporter。

接口方法Options()的实现

Options() 方法简单返回 m.options:

// Options returns current metric exporter options
func (m *exporter) Options() *Options {
	return m.options
}

具体的赋值在 defaultMetricOptions().

Prometheus Exporter的实现

promMetricsExporter 结构体定义

// promMetricsExporter is prometheus metric exporter
type promMetricsExporter struct {
	*exporter
	ocExporter *ocprom.Exporter
}

内嵌 exporter (相当于继承),还有一个 ocprom.Exporter 字段。

接口方法 Init() 的实现

初始化 opencensus 的 exporter:


// Init initializes opencensus exporter
func (m *promMetricsExporter) Init() error {
	if !m.exporter.Options().MetricsEnabled {
		return nil
	}

	// Add default health metrics for process
	
	// 添加默认的 health metrics: 进程信息,和 go 信息
	registry := prom.NewRegistry()
	registry.MustRegister(prom.NewProcessCollector(prom.ProcessCollectorOpts{}))
	registry.MustRegister(prom.NewGoCollector())

	var err error
	m.ocExporter, err = ocprom.NewExporter(ocprom.Options{
		Namespace: m.namespace,
		Registry:  registry,
	})

	if err != nil {
		return errors.Errorf("failed to create Prometheus exporter: %v", err)
	}

	// register exporter to view
	view.RegisterExporter(m.ocExporter)

	// start metrics server
	return m.startMetricServer()
}

startMetricServer() 方法的实现

启动 MetricServer, 监听端口来自 options 的 MetricsPort,监听路径为 defaultMetricsPath:


const (
	defaultMetricsPath     = "/"
)

// startMetricServer starts metrics server
func (m *promMetricsExporter) startMetricServer() error {
	if !m.exporter.Options().MetricsEnabled {
		// skip if metrics is not enabled
		return nil
	}

	addr := fmt.Sprintf(":%d", m.options.MetricsPort())

	if m.ocExporter == nil {
		return errors.New("exporter was not initialized")
	}

	m.exporter.logger.Infof("metrics server started on %s%s", addr, defaultMetricsPath)
	go func() {
		mux := http.NewServeMux()
		mux.Handle(defaultMetricsPath, m.ocExporter)

		if err := http.ListenAndServe(addr, mux); err != nil {
			m.exporter.logger.Fatalf("failed to start metrics server: %v", err)
		}
	}()

	return nil
}

7.2 - options.go的源码学习

metrics 相关的配置选项

Dapr metrics package中的 options.go文件的源码学习

代码实现

Options 结构体定义

// Options defines the sets of options for Dapr logging
type Options struct {
	// OutputLevel is the level of logging
	MetricsEnabled bool

	metricsPort string
}

默认值

metrics 默认端口 9090, 默认启用 metrics:

const (
	defaultMetricsPort    = "9090"
	defaultMetricsEnabled = true
)

func defaultMetricOptions() *Options {
	return &Options{
		metricsPort:    defaultMetricsPort,
		MetricsEnabled: defaultMetricsEnabled,
	}
}

MetricsPort() 方法实现

MetricsPort() 方法用于获取 metrics 端口,如果配置错误,则使用默认端口 9090:

// MetricsPort gets metrics port.
func (o *Options) MetricsPort() uint64 {
	port, err := strconv.ParseUint(o.metricsPort, 10, 64)
	if err != nil {
		// Use default metrics port as a fallback
		port, _ = strconv.ParseUint(defaultMetricsPort, 10, 64)
	}

	return port
}

解析命令行标记的方法

AttachCmdFlags() 方法

AttachCmdFlags() 方法解析 metrics-port 和 enable-metrics 两个命令行标记:

// AttachCmdFlags attaches metrics options to command flags
func (o *Options) AttachCmdFlags(
	stringVar func(p *string, name string, value string, usage string),
	boolVar func(p *bool, name string, value bool, usage string)) {
	stringVar(
		&o.metricsPort,
		"metrics-port",
		defaultMetricsPort,
		"The port for the metrics server")
	boolVar(
		&o.MetricsEnabled,
		"enable-metrics",
		defaultMetricsEnabled,
		"Enable prometheus metric")
}

AttachCmdFlag() 方法

AttachCmdFlag() 方法只解析 metrics-port 命令行标记(不解析 enable-metrics ) :

// AttachCmdFlag attaches single metrics option to command flags
func (o *Options) AttachCmdFlag(
	stringVar func(p *string, name string, value string, usage string)) {
	stringVar(
		&o.metricsPort,
		"metrics-port",
		defaultMetricsPort,
		"The port for the metrics server")
}

使用场景

只解析 metrics-port 命令行标记 的 AttachCmdFlag() 方法在 dapr runtime 启动时被调用(也只被这一个地方调用):

metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)

// attaching only metrics-port option
metricsExporter.Options().AttachCmdFlag(flag.StringVar)

而解析 metrics-port 和 enable-metrics 两个命令行标记的 AttachCmdFlags() 方法被 injector / operator / placement / sentry 调用:

func init() {
	metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)
	metricsExporter.Options().AttachCmdFlags(flag.StringVar, flag.BoolVar)
}

8 - workflow的源码

Dapr workflow的源码

8.1 - workflow API

Dapr workflow的API定义

proto 定义

dapr/proto/runtime/v1/dapr.proto

service Dapr {
  // Starts a new instance of a workflow
  rpc StartWorkflowAlpha1 (StartWorkflowRequest) returns (StartWorkflowResponse) {}

  // Gets details about a started workflow instance
  rpc GetWorkflowAlpha1 (GetWorkflowRequest) returns (GetWorkflowResponse) {}

  // Purge Workflow
  rpc PurgeWorkflowAlpha1 (PurgeWorkflowRequest) returns (google.protobuf.Empty) {}

  // Terminates a running workflow instance
  rpc TerminateWorkflowAlpha1 (TerminateWorkflowRequest) returns (google.protobuf.Empty) {}

  // Pauses a running workflow instance
  rpc PauseWorkflowAlpha1 (PauseWorkflowRequest) returns (google.protobuf.Empty) {}

  // Resumes a paused workflow instance
  rpc ResumeWorkflowAlpha1 (ResumeWorkflowRequest) returns (google.protobuf.Empty) {}

  // Raise an event to a running workflow instance
  rpc RaiseEventWorkflowAlpha1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {}
}

workflow 没有 sidecar 往应用方向发请求的场景,也就是没有 appcallback 。

生成的 go 代码

pkg/proto/runtime/v1 下存放的是根据 proto 生成的 go 代码

比如 pkg/proto/runtime/v1/dapr_grpc.pb.go

8.2 - workflow HTTP API

Dapr workflow的HTTP API实现

pkg/http/api.go

构建workflow的endpoint

const (
    workflowComponent        = "workflowComponent"
	workflowName             = "workflowName"
)

func NewAPI(opts APIOpts) API {
	api := &api{
        ......
	api.endpoints = append(api.endpoints, api.constructWorkflowEndpoints()...)
	return api
}
    

constructWorkflowEndpoints() 方法的实现在 pkg/http/api_workflow.go 中:

func (a *api) constructWorkflowEndpoints() []Endpoint {
	return []Endpoint{
		{
			Methods: []string{http.MethodGet},
			Route:   "workflows/{workflowComponent}/{instanceID}",
			Version: apiVersionV1alpha1,
			Handler: a.onGetWorkflowHandler(),
		},
		{
			Methods: []string{http.MethodPost},
			Route:   "workflows/{workflowComponent}/{instanceID}/raiseEvent/{eventName}",
			Version: apiVersionV1alpha1,
			Handler: a.onRaiseEventWorkflowHandler(),
		},
		{
			Methods: []string{http.MethodPost},
			Route:   "workflows/{workflowComponent}/{workflowName}/start",
			Version: apiVersionV1alpha1,
			Handler: a.onStartWorkflowHandler(),
		},
		{
			Methods: []string{http.MethodPost},
			Route:   "workflows/{workflowComponent}/{instanceID}/pause",
			Version: apiVersionV1alpha1,
			Handler: a.onPauseWorkflowHandler(),
		},
		{
			Methods: []string{http.MethodPost},
			Route:   "workflows/{workflowComponent}/{instanceID}/resume",
			Version: apiVersionV1alpha1,
			Handler: a.onResumeWorkflowHandler(),
		},
		{
			Methods: []string{http.MethodPost},
			Route:   "workflows/{workflowComponent}/{instanceID}/terminate",
			Version: apiVersionV1alpha1,
			Handler: a.onTerminateWorkflowHandler(),
		},
		{
			Methods: []string{http.MethodPost},
			Route:   "workflows/{workflowComponent}/{instanceID}/purge",
			Version: apiVersionV1alpha1,
			Handler: a.onPurgeWorkflowHandler(),
		},
	}
}

handler 实现

pkg/http/api_workflow.go

onStartWorkflowHandler()

// Route:   "workflows/{workflowComponent}/{workflowName}/start?instanceID={instanceID}",
// Workflow Component: Component specified in yaml
// Workflow Name: Name of the workflow to run
// Instance ID: Identifier of the specific run
func (a *api) onStartWorkflowHandler() http.HandlerFunc {
	return UniversalHTTPHandler(
		a.universal.StartWorkflowAlpha1,
        // UniversalHTTPHandlerOpts 是范型结构体
		UniversalHTTPHandlerOpts[*runtimev1pb.StartWorkflowRequest, *runtimev1pb.StartWorkflowResponse]{
			// We pass the input body manually rather than parsing it using protojson
			SkipInputBody: true,
			InModifier: func(r *http.Request, in *runtimev1pb.StartWorkflowRequest) (*runtimev1pb.StartWorkflowRequest, error) {
				in.WorkflowName = chi.URLParam(r, workflowName)
				in.WorkflowComponent = chi.URLParam(r, workflowComponent)

                // instance id 是可选的,如果没有指定则生成一个随机的
				// The instance ID is optional. If not specified, we generate a random one.
				in.InstanceId = r.URL.Query().Get(instanceID)
				if in.InstanceId == "" {
					randomID, err := uuid.NewRandom()
					if err != nil {
						return nil, err
					}
					in.InstanceId = randomID.String()
				}

                // HTTP request body 直接用来做 workflow 的 Input
				// We accept the HTTP request body as the input to the workflow
				// without making any assumptions about its format.
				var err error
				in.Input, err = io.ReadAll(r.Body)
				if err != nil {
					return nil, messages.ErrBodyRead.WithFormat(err)
				}
				return in, nil
			},
			SuccessStatusCode: http.StatusAccepted,
		})
}

onGetWorkflowHandler()

// Route: POST "workflows/{workflowComponent}/{instanceID}"
func (a *api) onGetWorkflowHandler() http.HandlerFunc {
	return UniversalHTTPHandler(
		a.universal.GetWorkflowAlpha1,
		UniversalHTTPHandlerOpts[*runtimev1pb.GetWorkflowRequest, *runtimev1pb.GetWorkflowResponse]{
			InModifier: workflowInModifier[*runtimev1pb.GetWorkflowRequest],
		})
}

workflowInModifier() 方法是通用方法,读取 WorkflowComponent 和 InstanceId 两个参数:

// Shared InModifier method for all universal handlers for workflows that adds the "WorkflowComponent" and "InstanceId" properties
func workflowInModifier[T runtimev1pb.WorkflowRequests](r *http.Request, in T) (T, error) {
	in.SetWorkflowComponent(chi.URLParam(r, workflowComponent))
	in.SetInstanceId(chi.URLParam(r, instanceID))
	return in, nil
}

8.3 - workflow gRPC API

Dapr workflow的gRPC API实现

proto 定义

dapr/proto/runtime/v1/dapr.proto

service Dapr {
  // Starts a new instance of a workflow
  rpc StartWorkflowAlpha1 (StartWorkflowRequest) returns (StartWorkflowResponse) {}

  // Gets details about a started workflow instance
  rpc GetWorkflowAlpha1 (GetWorkflowRequest) returns (GetWorkflowResponse) {}

  // Purge Workflow
  rpc PurgeWorkflowAlpha1 (PurgeWorkflowRequest) returns (google.protobuf.Empty) {}

  // Terminates a running workflow instance
  rpc TerminateWorkflowAlpha1 (TerminateWorkflowRequest) returns (google.protobuf.Empty) {}

  // Pauses a running workflow instance
  rpc PauseWorkflowAlpha1 (PauseWorkflowRequest) returns (google.protobuf.Empty) {}

  // Resumes a paused workflow instance
  rpc ResumeWorkflowAlpha1 (ResumeWorkflowRequest) returns (google.protobuf.Empty) {}

  // Raise an event to a running workflow instance
  rpc RaiseEventWorkflowAlpha1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {}
}

workflow 没有 sidecar 往应用方向发请求的场景,也就是没有 appcallback 。

生成的 go 代码

pkg/proto/runtime/v1 下存放的是根据 proto 生成的 go 代码

比如 pkg/proto/runtime/v1/dapr_grpc.pb.go

9 - 状态管理的源码

Dapr状态管理的源码

9.1 - 状态管理源码的概述

Dapr状态管理源码的概述

状态管理的源码

9.2 - 状态管理的初始化源码分析

Dapr状态管理的初始化源码分析

State Store Registry

stateStoreRegistry的初始化准备

stateStoreRegistry Registry 的初始化在 runtime 初始化时进行:

func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration) *DaprRuntime {
  ......
  stateStoreRegistry:     state_loader.NewRegistry(),
}

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {	
  ......
  a.stateStoreRegistry.Register(opts.states...)
  ......
}

这些 opts 来自 runtime 启动时的配置,如 cmd/daprd/main.go 下:

func main() {
	rt, err := runtime.FromFlags()
	if err != nil {
		log.Fatal(err)
	}

	err = rt.Run(
    ......
    runtime.WithStates(
			state_loader.New("redis", func() state.Store {
				return state_redis.NewRedisStateStore(logContrib)
			}),
			state_loader.New("consul", func() state.Store {
				return consul.NewConsulStateStore(logContrib)
			}),
			state_loader.New("azure.blobstorage", func() state.Store {
				return state_azure_blobstorage.NewAzureBlobStorageStore(logContrib)
			}),
			state_loader.New("azure.cosmosdb", func() state.Store {
				return state_cosmosdb.NewCosmosDBStateStore(logContrib)
			}),
			state_loader.New("azure.tablestorage", func() state.Store {
				return state_azure_tablestorage.NewAzureTablesStateStore(logContrib)
			}),
			//state_loader.New("etcd", func() state.Store {
			//	return etcd.NewETCD(logContrib)
			//}),
			state_loader.New("cassandra", func() state.Store {
				return cassandra.NewCassandraStateStore(logContrib)
			}),
			state_loader.New("memcached", func() state.Store {
				return memcached.NewMemCacheStateStore(logContrib)
			}),
			state_loader.New("mongodb", func() state.Store {
				return mongodb.NewMongoDB(logContrib)
			}),
			state_loader.New("zookeeper", func() state.Store {
				return zookeeper.NewZookeeperStateStore(logContrib)
			}),
			state_loader.New("gcp.firestore", func() state.Store {
				return firestore.NewFirestoreStateStore(logContrib)
			}),
			state_loader.New("postgresql", func() state.Store {
				return postgresql.NewPostgreSQLStateStore(logContrib)
			}),
			state_loader.New("sqlserver", func() state.Store {
				return sqlserver.NewSQLServerStateStore(logContrib)
			}),
			state_loader.New("hazelcast", func() state.Store {
				return hazelcast.NewHazelcastStore(logContrib)
			}),
			state_loader.New("cloudstate.crdt", func() state.Store {
				return cloudstate.NewCRDT(logContrib)
			}),
			state_loader.New("couchbase", func() state.Store {
				return couchbase.NewCouchbaseStateStore(logContrib)
			}),
			state_loader.New("aerospike", func() state.Store {
				return aerospike.NewAerospikeStateStore(logContrib)
			}),
		),
    ......
}

在这里配置各种 state store 的实现。

State Store Registry的实现方式

pkg/components/state/registry.go,定义了registry的接口和数据结构:

// Registry is an interface for a component that returns registered state store implementations
type Registry interface {
	Register(components ...State)
	CreateStateStore(name string) (state.Store, error)
}

type stateStoreRegistry struct {
	stateStores map[string]func() state.Store
}

state.Store 是 dapr 定义的标准 state store的接口,所有的实现都要遵循这个接口。定义在 github.com/dapr/components-contrib/state/store.go 文件中:

// Store is an interface to perform operations on store
type Store interface {
	Init(metadata Metadata) error
	Delete(req *DeleteRequest) error
	BulkDelete(req []DeleteRequest) error
	Get(req *GetRequest) (*GetResponse, error)
	Set(req *SetRequest) error
	BulkSet(req []SetRequest) error
}

前面 runtime 初始化时,每个实现都通过 New 方法将 name 和对应的 state store 关联起来:

type State struct {
	Name          string
	FactoryMethod func() state.Store
}

func New(name string, factoryMethod func() state.Store) State {
	return State{
		Name:          name,
		FactoryMethod: factoryMethod,
	}
}

State Store的初始化流程

pkg/runtime/runtime.go :

State 的初始化在 runtime 初始化时进行:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
	......
	go a.processComponents()
	......
}
func (a *DaprRuntime) processComponents() {
   for {
      comp, more := <-a.pendingComponents
      if !more {
         a.pendingComponentsDone <- true
         return
      }
      if err := a.processOneComponent(comp); err != nil {
         log.Errorf("process component %s error, %s", comp.Name, err)
      }
   }
}

processOneComponent:

func (a *DaprRuntime) processOneComponent(comp components_v1alpha1.Component) error {
	res := a.preprocessOneComponent(&comp)
  
	compCategory := a.figureOutComponentCategory(comp)

	......
	return nil
}

doProcessOneComponent:

func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
	switch category {
	case stateComponent:
		return a.initState(comp)
	}
		......
	return nil
}

initState方法的实现:

// Refer for state store api decision  https://github.com/dapr/dapr/blob/master/docs/decision_records/api/API-008-multi-state-store-api-design.md
func (a *DaprRuntime) initState(s components_v1alpha1.Component) error {
	// 构建 state store(这里才开始集成components的代码)
	store, err := a.stateStoreRegistry.CreateStateStore(s.Spec.Type)
	if err != nil {
		log.Warnf("error creating state store %s: %s", s.Spec.Type, err)
		diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "creation")
		return err
	}
	if store != nil {
		props := a.convertMetadataItemsToProperties(s.Spec.Metadata)
		// components的store实现在这里做初始化,如建连
		err := store.Init(state.Metadata{
			Properties: props,
		})
		if err != nil {
			diag.DefaultMonitoring.ComponentInitFailed(s.Spec.Type, "init")
			log.Warnf("error initializing state store %s: %s", s.Spec.Type, err)
			return err
		}

		// 将初始化完成的store实现存放在runtime中
		a.stateStores[s.ObjectMeta.Name] = store

		// set specified actor store if "actorStateStore" is true in the spec.
		actorStoreSpecified := props[actorStateStore]
		if actorStoreSpecified == "true" {
			if a.actorStateStoreCount++; a.actorStateStoreCount == 1 {
				a.actorStateStoreName = s.ObjectMeta.Name
			}
		}
		diag.DefaultMonitoring.ComponentInitialized(s.Spec.Type)
	}

	if a.actorStateStoreName == "" || a.actorStateStoreCount != 1 {
		log.Warnf("either no actor state store or multiple actor state stores are specified in the configuration, actor stores specified: %d", a.actorStateStoreCount)
	}

	return nil
}

其中 CreateStateStore 方法的实现在 pkg/components/state/registry.go 中:

func (s *stateStoreRegistry) CreateStateStore(name string) (state.Store, error) {
	if method, ok := s.stateStores[name]; ok {
		return method(), nil
	}
	return nil, errors.Errorf("couldn't find state store %s", name)
}

9.3 - 状态管理的runtime处理源码分析

Dapr状态管理的runtime处理源码分析

runtime 处理 state 请求的代码在 pkg/grpc/api.go 中。

get state

func (a *api) GetState(ctx context.Context, in *runtimev1pb.GetStateRequest) (*runtimev1pb.GetStateResponse, error) {
  // 找 store name 对应的 state store
  // 所以请求里面的 store name,必须对应 yaml 文件里面的 name
	store, err := a.getStateStore(in.StoreName)
	if err != nil {
		apiServerLogger.Debug(err)
		return &runtimev1pb.GetStateResponse{}, err
	}

	req := state.GetRequest{
		Key:      a.getModifiedStateKey(in.Key),
		Metadata: in.Metadata,
		Options: state.GetStateOption{
			Consistency: stateConsistencyToString(in.Consistency),
		},
	}

  // 执行查询
  // 里面实际上会先执行 HGETALL 命令,失败后再执行 GET 命令
	getResponse, err := store.Get(&req)
	if err != nil {
		err = fmt.Errorf("ERR_STATE_GET: %s", err)
		apiServerLogger.Debug(err)
		return &runtimev1pb.GetStateResponse{}, err
	}

	response := &runtimev1pb.GetStateResponse{}
	if getResponse != nil {
		response.Etag = getResponse.ETag
		response.Data = getResponse.Data
	}
	return response, nil
}

get bulk state

get bulk 方法的实现是有 runtime 封装 get 方法而成,底层 state store 只需要实现单个查询的 get 即可。

func (a *api) GetBulkState(ctx context.Context, in *runtimev1pb.GetBulkStateRequest) (*runtimev1pb.GetBulkStateResponse, error) {
   store, err := a.getStateStore(in.StoreName)
   if err != nil {
      apiServerLogger.Debug(err)
      return &runtimev1pb.GetBulkStateResponse{}, err
   }

   resp := &runtimev1pb.GetBulkStateResponse{}
   // 如果 Parallelism <= 0,则取默认值100
   limiter := concurrency.NewLimiter(int(in.Parallelism))

   for _, k := range in.Keys {
      fn := func(param interface{}) {
         req := state.GetRequest{
            Key:      a.getModifiedStateKey(param.(string)),
            Metadata: in.Metadata,
         }

         r, err := store.Get(&req)
         item := &runtimev1pb.BulkStateItem{
            Key: param.(string),
         }
         if err != nil {
            item.Error = err.Error()
         } else if r != nil {
            item.Data = r.Data
            item.Etag = r.ETag
         }
         resp.Items = append(resp.Items, item)
      }

      limiter.Execute(fn, k)
   }
   limiter.Wait()

   return resp, nil
}

save state

func (a *api) SaveState(ctx context.Context, in *runtimev1pb.SaveStateRequest) (*empty.Empty, error) {
   store, err := a.getStateStore(in.StoreName)
   if err != nil {
      apiServerLogger.Debug(err)
      return &empty.Empty{}, err
   }

   reqs := []state.SetRequest{}
   for _, s := range in.States {
      req := state.SetRequest{
         Key:      a.getModifiedStateKey(s.Key),
         Metadata: s.Metadata,
         Value:    s.Value,
         ETag:     s.Etag,
      }
      if s.Options != nil {
         req.Options = state.SetStateOption{
            Consistency: stateConsistencyToString(s.Options.Consistency),
            Concurrency: stateConcurrencyToString(s.Options.Concurrency),
         }
      }
      reqs = append(reqs, req)
   }

   // 调用 store 的 BulkSet 方法
   // 事实上store的Set方法根本没有被 runtime 调用???
   err = store.BulkSet(reqs)
   if err != nil {
      err = fmt.Errorf("ERR_STATE_SAVE: %s", err)
      apiServerLogger.Debug(err)
      return &empty.Empty{}, err
   }
   return &empty.Empty{}, nil
}

delete state

func (a *api) DeleteState(ctx context.Context, in *runtimev1pb.DeleteStateRequest) (*empty.Empty, error) {
   store, err := a.getStateStore(in.StoreName)
   if err != nil {
      apiServerLogger.Debug(err)
      return &empty.Empty{}, err
   }

   req := state.DeleteRequest{
      Key:      a.getModifiedStateKey(in.Key),
      Metadata: in.Metadata,
      ETag:     in.Etag,
   }
   if in.Options != nil {
      req.Options = state.DeleteStateOption{
         Concurrency: stateConcurrencyToString(in.Options.Concurrency),
         Consistency: stateConsistencyToString(in.Options.Consistency),
      }
   }

   // 调用 store 的delete方法
   // store 的 BulkDelete 方法没有调用
   // runtime 也没有对外暴露 BulkDelete 方法
   err = store.Delete(&req)
   if err != nil {
      err = fmt.Errorf("ERR_STATE_DELETE: failed deleting state with key %s: %s", in.Key, err)
      apiServerLogger.Debug(err)
      return &empty.Empty{}, err
   }
   return &empty.Empty{}, nil
}

Execute State Transaction

如果要支持事务,则要求实现 TransactionalStore 接口:

type TransactionalStore interface {
   // Init方法是和普通store接口一致的
   Init(metadata Metadata) error
   // 增加的是 Multi 方法
   Multi(request *TransactionalStateRequest) error
}

runtime 的 ExecuteStateTransaction 方法的实现:

func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.ExecuteStateTransactionRequest) (*empty.Empty, error) {
   if a.stateStores == nil || len(a.stateStores) == 0 {
      err := errors.New("ERR_STATE_STORE_NOT_CONFIGURED")
      apiServerLogger.Debug(err)
      return &empty.Empty{}, err
   }

   storeName := in.StoreName

   if a.stateStores[storeName] == nil {
      err := errors.New("ERR_STATE_STORE_NOT_FOUND")
      apiServerLogger.Debug(err)
      return &empty.Empty{}, err
   }

   // 检测是否是 TransactionalStore
   transactionalStore, ok := a.stateStores[storeName].(state.TransactionalStore)
   if !ok {
      err := errors.New("ERR_STATE_STORE_NOT_SUPPORTED")
      apiServerLogger.Debug(err)
      return &empty.Empty{}, err
   }

   // 构造请求
   operations := []state.TransactionalStateOperation{}
   for _, inputReq := range in.Operations {
      var operation state.TransactionalStateOperation
      var req = inputReq.Request
      switch state.OperationType(inputReq.OperationType) {
      case state.Upsert:
         setReq := state.SetRequest{
            Key: a.getModifiedStateKey(req.Key),
            // Limitation:
            // components that cannot handle byte array need to deserialize/serialize in
            // component sepcific way in components-contrib repo.
            Value:    req.Value,
            Metadata: req.Metadata,
            ETag:     req.Etag,
         }

         if req.Options != nil {
            setReq.Options = state.SetStateOption{
               Concurrency: stateConcurrencyToString(req.Options.Concurrency),
               Consistency: stateConsistencyToString(req.Options.Consistency),
            }
         }

         operation = state.TransactionalStateOperation{
            Operation: state.Upsert,
            Request:   setReq,
         }

      case state.Delete:
         delReq := state.DeleteRequest{
            Key:      a.getModifiedStateKey(req.Key),
            Metadata: req.Metadata,
            ETag:     req.Etag,
         }

         if req.Options != nil {
            delReq.Options = state.DeleteStateOption{
               Concurrency: stateConcurrencyToString(req.Options.Concurrency),
               Consistency: stateConsistencyToString(req.Options.Consistency),
            }
         }

         operation = state.TransactionalStateOperation{
            Operation: state.Delete,
            Request:   delReq,
         }

      default:
         err := fmt.Errorf("ERR_OPERATION_NOT_SUPPORTED: operation type %s not supported", inputReq.OperationType)
         apiServerLogger.Debug(err)
         return &empty.Empty{}, err
      }

      operations = append(operations, operation)
   }
 
   // 调用 state store 的 Multi 方法执行有事务性的多个操作
   err := transactionalStore.Multi(&state.TransactionalStateRequest{
      Operations: operations,
      Metadata:   in.Metadata,
   })

   if err != nil {
      err = fmt.Errorf("ERR_STATE_TRANSACTION: %s", err)
      apiServerLogger.Debug(err)
      return &empty.Empty{}, err
   }
   return &empty.Empty{}, nil
}

9.4 - 状态管理中Redis实现的处理源码分析

Dapr状态管理中Redis实现的处理源码分析

状态管理的redis实现

Redis的实现在 dapr/components-contrib 下,/state/redis/redis.go 中:

// StateStore is a Redis state store
type StateStore struct {
	client   *redis.Client
	json     jsoniter.API
	metadata metadata
	replicas int

	logger logger.Logger
}

// NewRedisStateStore returns a new redis state store
func NewRedisStateStore(logger logger.Logger) *StateStore {
	return &StateStore{
		json:   jsoniter.ConfigFastest,
		logger: logger,
	}
}

初始化

在 dapr runtime 初始化时,关联 redis 的 state 实现:

state_loader.New("redis", func() state.Store {
    return state_redis.NewRedisStateStore(logContrib)
}),

然后 Init 方法会在 state 初始化时被 dapr runtime 调用,Redis的实现内容为:

// Init does metadata and connection parsing
func (r *StateStore) Init(metadata state.Metadata) error {
	m, err := parseRedisMetadata(metadata)
	if err != nil {
		return err
	}
	r.metadata = m

	if r.metadata.failover {
		r.client = r.newFailoverClient(m)
	} else {
		r.client = r.newClient(m)
	}

	if _, err = r.client.Ping().Result(); err != nil {
		return fmt.Errorf("redis store: error connecting to redis at %s: %s", m.host, err)
	}

	r.replicas, err = r.getConnectedSlaves()

	return err
}

get state

get的实现方式:

// Get retrieves state from redis with a key
func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
   res, err := r.client.DoContext(context.Background(), "HGETALL", req.Key).Result() // Prefer values with ETags
   if err != nil {
      return r.directGet(req) //Falls back to original get
   }
   if res == nil {
      // 结果为空的处理1
      return &state.GetResponse{}, nil
   }
   vals := res.([]interface{})
   if len(vals) == 0 {
      // 结果为空的处理2
      // 所以如果没有找到对应key的值,是给空应答,而不是报错
      return &state.GetResponse{}, nil
   }

   data, version, err := r.getKeyVersion(vals)
   if err != nil {
      return nil, err
   }
   return &state.GetResponse{
      Data: []byte(data),
      ETag: version,
   }, nil
}

支持ETag的实现方式

要支持ETag,就不能简单用 redis 的 key / value 方式直接在value中存放state的数据(data字段,byte[]格式),这个“value”需要包含出data之外的其他Etag字段,比如 version。

redis state实现的设计方式方式是:对于每个存储在 redis 中的 state item中,其value是一个hashmap,在这个value hashmap中通过不同的key存放多个信息:

  • data:state的数据
  • version:ETag需要的version

所以前面要用 HGETALL 命令把这个hashamap的所有key/value都取出来,然后现在要通过getKeyVersion方法来从这些key/value中读取data和version:

func (r *StateStore) getKeyVersion(vals []interface{}) (data string, version string, err error) {
   seenData := false
   seenVersion := false
   for i := 0; i < len(vals); i += 2 {
      field, _ := strconv.Unquote(fmt.Sprintf("%q", vals[i]))
      switch field {
      case "data":
         data, _ = strconv.Unquote(fmt.Sprintf("%q", vals[i+1]))
         seenData = true
      case "version":
         version, _ = strconv.Unquote(fmt.Sprintf("%q", vals[i+1]))
         seenVersion = true
      }
   }
   if !seenData || !seenVersion {
      return "", "", errors.New("required hash field 'data' or 'version' was not found")
   }
   return data, version, nil
}

返回的时候,带上ETag:

return &state.GetResponse{
      Data: []byte(data),
      ETag: version,
   }, nil

不支持ETag的实现方式

如果 HGETALL 命令执行失败,则fall back到普通场景:redis中只简单保存数据,没有etag。此时保存方式就是简单的key/value,用简单的 GET 命令直接读取:

func (r *StateStore) directGet(req *state.GetRequest) (*state.GetResponse, error) {
   res, err := r.client.DoContext(context.Background(), "GET", req.Key).Result()
   if err != nil {
      return nil, err
   }

   if res == nil {
      return &state.GetResponse{}, nil
   }

   s, _ := strconv.Unquote(fmt.Sprintf("%q", res))
   return &state.GetResponse{
      Data: []byte(s),
   }, nil
}

备注:这个设计有个性能问题,如果redis中的数据是用简单key/value存储,没有etag,则每次读取都要进行两个:第一次 HGETALL 命令失败,然后 fall back 用 GET 命令再读第二次。

save state

redis的实现,有 set 方法和 BulkSet

// Set saves state into redis
func (r *StateStore) Set(req *state.SetRequest) error {
   return state.SetWithOptions(r.setValue, req)
}

// BulkSet performs a bulks save operation
func (r *StateStore) BulkSet(req []state.SetRequest) error {
   for i := range req {
      err := r.Set(&req[i])
      if err != nil {
         // 这个地方有异议
         // 按照代码逻辑,只要有一个save操作失败,就直接return而放弃后续的操作
         return err
      }
   }

   return nil
}

实际实现在 r.setValue 方法中:

func (r *StateStore) setValue(req *state.SetRequest) error {
   err := state.CheckRequestOptions(req.Options)
   if err != nil {
      return err
   }
   
   // 解析etag,要求etag必须是可以转为整型
   ver, err := r.parseETag(req.ETag)
   if err != nil {
      return err
   }

   // LastWrite win意味着无视ETag的异同,强制写入
   // 所以这里重置 ver 为 0
   if req.Options.Concurrency == state.LastWrite {
      ver = 0
   }

   bt, _ := utils.Marshal(req.Value, r.json.Marshal)

	 // 用 EVAL 命令执行一段 LUA 脚本,脚本内容为 setQuery
   _, err = r.client.DoContext(context.Background(), "EVAL", setQuery, 1, req.Key, ver, bt).Result()
   if err != nil {
      return fmt.Errorf("failed to set key %s: %s", req.Key, err)
   }

	 // 如果要求强一致性,而且副本数量大于0
   if req.Options.Consistency == state.Strong && r.replicas > 0 {
     // 则需要等待所有副本数都写入成功
      _, err = r.client.DoContext(context.Background(), "WAIT", r.replicas, 1000).Result()
      if err != nil {
         return fmt.Errorf("timed out while waiting for %v replicas to acknowledge write", r.replicas)
      }
   }

   return nil
}

更多redis细节:

  • setQuery 脚本
setQuery                 = "local var1 = redis.pcall(\"HGET\", KEYS[1], \"version\"); if type(var1) == \"table\" then redis.call(\"DEL\", KEYS[1]); end; if not var1 or type(var1)==\"table\" or var1 == \"\" or var1 == ARGV[1] or ARGV[1] == \"0\" then redis.call(\"HSET\", KEYS[1], \"data\", ARGV[2]) return redis.call(\"HINCRBY\", KEYS[1], \"version\", 1) else return error(\"failed to set key \" .. KEYS[1]) end"
  • WAIT numreplicas timeout 命令:https://redis.io/commands/wait

delete state

// Delete performs a delete operation
func (r *StateStore) Delete(req *state.DeleteRequest) error {
   err := state.CheckRequestOptions(req.Options)
   if err != nil {
      return err
   }
   return state.DeleteWithOptions(r.deleteValue, req)
}

// 内部循环调用 Delete
// BulkDelete 方法没有暴露给 dapr runtime
// BulkDelete performs a bulk delete operation
func (r *StateStore) BulkDelete(req []state.DeleteRequest) error {
   for i := range req {
      err := r.Delete(&req[i])
      if err != nil {
         return err
      }
   }

   return nil
}

实际实现在 r.deleteValue 方法中:

func (r *StateStore) deleteValue(req *state.DeleteRequest) error {
   if req.ETag == "" {
      // ETag的空值则改为 “0” / 零值
      req.ETag = "0"
   }
   _, err := r.client.DoContext(context.Background(), "EVAL", delQuery, 1, req.Key, req.ETag).Result()

   if err != nil {
      return fmt.Errorf("failed to delete key '%s' due to ETag mismatch", req.Key)
   }

   return nil
}

更多redis细节:

  • delQuery 脚本
delQuery                 = "local var1 = redis.pcall(\"HGET\", KEYS[1], \"version\"); if not var1 or type(var1)==\"table\" or var1 == ARGV[1] or var1 == \"\" or ARGV[1] == \"0\" then return redis.call(\"DEL\", KEYS[1]) else return error(\"failed to delete \" .. KEYS[1]) end"

State Transaction

redis state store 实现了 TransactionalStore,它的 Multi方式:

// Multi performs a transactional operation. succeeds only if all operations succeed, and fails if one or more operations fail
func (r *StateStore) Multi(request *state.TransactionalStateRequest) error {
   // 用的是 redis-go 封装的 TxPipeline
   pipe := r.client.TxPipeline()
   for _, o := range request.Operations {
      if o.Operation == state.Upsert {
         req := o.Request.(state.SetRequest)

         bt, _ := utils.Marshal(req.Value, r.json.Marshal)

         pipe.Set(req.Key, bt, defaultExpirationTime)
      } else if o.Operation == state.Delete {
         req := o.Request.(state.DeleteRequest)
         pipe.Del(req.Key)
      }
   }

   _, err := pipe.Exec()
   return err
}

10 - 资源绑定的源码

Dapr的资源绑定的源码

10.1 - 资源绑定的源码概述

Dapr的资源绑定的源码概述

10.2 - 资源绑定的初始化源码分析

Dapr资源绑定的初始化源码分析

Binding Registry

Binding Registry的初始化准备

Binding Registry 的初始化在 runtime 初始化时进行:

func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration) *DaprRuntime {
  ......
  bindingsRegistry:       bindings_loader.NewRegistry(),
}

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {	
  ......
  a.bindingsRegistry.RegisterInputBindings(opts.inputBindings...)
	a.bindingsRegistry.RegisterOutputBindings(opts.outputBindings...)
  ......
}

这些 opts 来自 runtime 启动时的配置,如 cmd/daprd/main.go 下:

func main() {
	rt, err := runtime.FromFlags()
	if err != nil {
		log.Fatal(err)
	}

	err = rt.Run(
    ......
    runtime.WithInputBindings(
			bindings_loader.NewInput("aws.sqs", func() bindings.InputBinding {
				return sqs.NewAWSSQS(logContrib)
			}),
			bindings_loader.NewInput("aws.kinesis", func() bindings.InputBinding {
				return kinesis.NewAWSKinesis(logContrib)
			}),
			bindings_loader.NewInput("azure.eventhubs", func() bindings.InputBinding {
				return eventhubs.NewAzureEventHubs(logContrib)
			}),
			bindings_loader.NewInput("kafka", func() bindings.InputBinding {
				return kafka.NewKafka(logContrib)
			}),
			bindings_loader.NewInput("mqtt", func() bindings.InputBinding {
				return mqtt.NewMQTT(logContrib)
			}),
			bindings_loader.NewInput("rabbitmq", func() bindings.InputBinding {
				return bindings_rabbitmq.NewRabbitMQ(logContrib)
			}),
			bindings_loader.NewInput("azure.servicebusqueues", func() bindings.InputBinding {
				return servicebusqueues.NewAzureServiceBusQueues(logContrib)
			}),
			bindings_loader.NewInput("azure.storagequeues", func() bindings.InputBinding {
				return storagequeues.NewAzureStorageQueues(logContrib)
			}),
			bindings_loader.NewInput("gcp.pubsub", func() bindings.InputBinding {
				return pubsub.NewGCPPubSub(logContrib)
			}),
			bindings_loader.NewInput("kubernetes", func() bindings.InputBinding {
				return kubernetes.NewKubernetes(logContrib)
			}),
			bindings_loader.NewInput("azure.eventgrid", func() bindings.InputBinding {
				return eventgrid.NewAzureEventGrid(logContrib)
			}),
			bindings_loader.NewInput("twitter", func() bindings.InputBinding {
				return twitter.NewTwitter(logContrib)
			}),
			bindings_loader.NewInput("cron", func() bindings.InputBinding {
				return cron.NewCron(logContrib)
			}),
		),
    runtime.WithOutputBindings(
			bindings_loader.NewOutput("aws.sqs", func() bindings.OutputBinding {
				return sqs.NewAWSSQS(logContrib)
			}),
			bindings_loader.NewOutput("aws.sns", func() bindings.OutputBinding {
				return sns.NewAWSSNS(logContrib)
			}),
			bindings_loader.NewOutput("aws.kinesis", func() bindings.OutputBinding {
				return kinesis.NewAWSKinesis(logContrib)
			}),
			bindings_loader.NewOutput("azure.eventhubs", func() bindings.OutputBinding {
				return eventhubs.NewAzureEventHubs(logContrib)
			}),
			bindings_loader.NewOutput("aws.dynamodb", func() bindings.OutputBinding {
				return dynamodb.NewDynamoDB(logContrib)
			}),
			bindings_loader.NewOutput("azure.cosmosdb", func() bindings.OutputBinding {
				return bindings_cosmosdb.NewCosmosDB(logContrib)
			}),
			bindings_loader.NewOutput("gcp.bucket", func() bindings.OutputBinding {
				return bucket.NewGCPStorage(logContrib)
			}),
			bindings_loader.NewOutput("http", func() bindings.OutputBinding {
				return http.NewHTTP(logContrib)
			}),
			bindings_loader.NewOutput("kafka", func() bindings.OutputBinding {
				return kafka.NewKafka(logContrib)
			}),
			bindings_loader.NewOutput("mqtt", func() bindings.OutputBinding {
				return mqtt.NewMQTT(logContrib)
			}),
			bindings_loader.NewOutput("rabbitmq", func() bindings.OutputBinding {
				return bindings_rabbitmq.NewRabbitMQ(logContrib)
			}),
			bindings_loader.NewOutput("redis", func() bindings.OutputBinding {
				return redis.NewRedis(logContrib)
			}),
			bindings_loader.NewOutput("aws.s3", func() bindings.OutputBinding {
				return s3.NewAWSS3(logContrib)
			}),
			bindings_loader.NewOutput("azure.blobstorage", func() bindings.OutputBinding {
				return blobstorage.NewAzureBlobStorage(logContrib)
			}),
			bindings_loader.NewOutput("azure.servicebusqueues", func() bindings.OutputBinding {
				return servicebusqueues.NewAzureServiceBusQueues(logContrib)
			}),
			bindings_loader.NewOutput("azure.storagequeues", func() bindings.OutputBinding {
				return storagequeues.NewAzureStorageQueues(logContrib)
			}),
			bindings_loader.NewOutput("gcp.pubsub", func() bindings.OutputBinding {
				return pubsub.NewGCPPubSub(logContrib)
			}),
			bindings_loader.NewOutput("azure.signalr", func() bindings.OutputBinding {
				return signalr.NewSignalR(logContrib)
			}),
			bindings_loader.NewOutput("twilio.sms", func() bindings.OutputBinding {
				return sms.NewSMS(logContrib)
			}),
			bindings_loader.NewOutput("twilio.sendgrid", func() bindings.OutputBinding {
				return sendgrid.NewSendGrid(logContrib)
			}),
			bindings_loader.NewOutput("azure.eventgrid", func() bindings.OutputBinding {
				return eventgrid.NewAzureEventGrid(logContrib)
			}),
			bindings_loader.NewOutput("cron", func() bindings.OutputBinding {
				return cron.NewCron(logContrib)
			}),
			bindings_loader.NewOutput("twitter", func() bindings.OutputBinding {
				return twitter.NewTwitter(logContrib)
			}),
			bindings_loader.NewOutput("influx", func() bindings.OutputBinding {
				return influx.NewInflux(logContrib)
			}),
		),
    ......
}

在这里配置各种 inputbinding 和 output binding的实现。

Binding Registry的实现方式

pkg/components/bindings/registry.go,定义了多个数据结构:

type (
	// InputBinding is an input binding component definition.
	InputBinding struct {
		Name          string
		FactoryMethod func() bindings.InputBinding
	}

	// OutputBinding is an output binding component definition.
	OutputBinding struct {
		Name          string
		FactoryMethod func() bindings.OutputBinding
	}

	// Registry is the interface of a components that allows callers to get registered instances of input and output bindings
	Registry interface {
		RegisterInputBindings(components ...InputBinding)
		RegisterOutputBindings(components ...OutputBinding)
		CreateInputBinding(name string) (bindings.InputBinding, error)
		CreateOutputBinding(name string) (bindings.OutputBinding, error)
	}

	bindingsRegistry struct {
		inputBindings  map[string]func() bindings.InputBinding
		outputBindings map[string]func() bindings.OutputBinding
	}
)

前面 runtime 初始化时,每个实现都通过 NewInput 方法和 NewOutput方法,将 name 和对应的InputBinding/OutputBinding关联起来:

// NewInput creates a InputBinding.
func NewInput(name string, factoryMethod func() bindings.InputBinding) InputBinding {
	return InputBinding{
		Name:          name,
		FactoryMethod: factoryMethod,
	}
}

// NewOutput creates a OutputBinding.
func NewOutput(name string, factoryMethod func() bindings.OutputBinding) OutputBinding {
	return OutputBinding{
		Name:          name,
		FactoryMethod: factoryMethod,
	}
}

RegisterInputBindings 和 RegisterOutputBindings 方法用来注册 input binding 和 output binding

的实现,在runtime 初始化时被调用:

// RegisterInputBindings registers one or more new input bindings.
func (b *bindingsRegistry) RegisterInputBindings(components ...InputBinding) {
	for _, component := range components {
		b.inputBindings[createFullName(component.Name)] = component.FactoryMethod
	}
}

// RegisterOutputBindings registers one or more new output bindings.
func (b *bindingsRegistry) RegisterOutputBindings(components ...OutputBinding) {
	for _, component := range components {
		b.outputBindings[createFullName(component.Name)] = component.FactoryMethod
	}
}

func createFullName(name string) string {
  // createFullName统一增加前缀 bindings.
	return fmt.Sprintf("bindings.%s", name)
}

binding的初始化流程

pkg/runtime/runtime.go :

Binding 的初始化在 runtime 初始化时进行:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
	......
	go a.processComponents()
	......
}
func (a *DaprRuntime) processComponents() {
   for {
      comp, more := <-a.pendingComponents
      if !more {
         a.pendingComponentsDone <- true
         return
      }
      if err := a.processOneComponent(comp); err != nil {
         log.Errorf("process component %s error, %s", comp.Name, err)
      }
   }
}

processOneComponent:

func (a *DaprRuntime) processOneComponent(comp components_v1alpha1.Component) error {
	res := a.preprocessOneComponent(&comp)
  
	compCategory := a.figureOutComponentCategory(comp)

	......
	return nil
}

doProcessOneComponent:

func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
	switch category {
	case bindingsComponent:
		return a.initBinding(comp)
		......
	}
	return nil
}

initBinding:

func (a *DaprRuntime) initBinding(c components_v1alpha1.Component) error {
	if err := a.initOutputBinding(c); err != nil {
		log.Errorf("failed to init output bindings: %s", err)
		return err
	}

	if err := a.initInputBinding(c); err != nil {
		log.Errorf("failed to init input bindings: %s", err)
		return err
	}
	return nil
}

在这里进行 input binding 和 output binding 的初始化。

Output Binding的初始化

pkg/runtime/runtime.go:

func (a *DaprRuntime) initOutputBinding(c components_v1alpha1.Component) error {
  // 成功
	binding, err := a.bindingsRegistry.CreateOutputBinding(c.Spec.Type)
	if err != nil {
		log.Warnf("failed to create output binding %s (%s): %s", c.ObjectMeta.Name, c.Spec.Type, err)
		diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "creation")
		return err
	}

	if binding != nil {
		err := binding.Init(bindings.Metadata{
			Properties: a.convertMetadataItemsToProperties(c.Spec.Metadata),
			Name:       c.ObjectMeta.Name,
		})
		if err != nil {
			log.Errorf("failed to init output binding %s (%s): %s", c.ObjectMeta.Name, c.Spec.Type, err)
			diag.DefaultMonitoring.ComponentInitFailed(c.Spec.Type, "init")
			return err
		}
		log.Infof("successful init for output binding %s (%s)", c.ObjectMeta.Name, c.Spec.Type)
		a.outputBindings[c.ObjectMeta.Name] = binding
		diag.DefaultMonitoring.ComponentInitialized(c.Spec.Type)
	}
	return nil
}

其中 CreateOutputBinding 方法的实现在 pkg/components/bindings/registry.go 中:

// Create instantiates an output binding based on `name`.
func (b *bindingsRegistry) CreateOutputBinding(name string) (bindings.OutputBinding, error) {
	if method, ok := b.outputBindings[name]; ok {
    // 调用 factory 方法生成具体实现的 outputBinding
		return method(), nil
	}
	return nil, errors.Errorf("couldn't find output binding %s", name)
}

Input Binding的初始化

TODO

10.3 - 资源绑定的Redis output实现源码分析

Dapr资源绑定的Redis output实现源码分析

备注:根据 https://github.com/dapr/docs/blob/master/concepts/bindings/README.md 的描述,redis 只实现了 output binding。

output binding 的实现

Redis的实现在 dapr/components-contrib 下,/bindings/redis/redis.go 中:

func (r *Redis) Operations() []bindings.OperationKind {
  // 只支持create
	return []bindings.OperationKind{bindings.CreateOperation}
}

func (r *Redis) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
  // 通过 metadata 传递 key
	if val, ok := req.Metadata["key"]; ok && val != "" {
		key := val
    // 调用标准 redis 客户端,执行 SET 命令
		_, err := r.client.DoContext(context.Background(), "SET", key, req.Data).Result()
		if err != nil {
			return nil, err
		}
		return nil, nil
	}
	return nil, errors.New("redis binding: missing key on write request metadata")
}

完整分析

初始化:

在 dapr runtime 初始化时,关联 redis 的 output binding实现:

bindings_loader.NewOutput("redis", func() bindings.OutputBinding {
   return redis.NewRedis(logContrib)
}),

然后 Init 方法会在 output binding初始化时被 dapr runtime 调用,Redis的实现内容为:

// Init performs metadata parsing and connection creation
func (r *Redis) Init(meta bindings.Metadata) error {
  // 解析metadata
	m, err := r.parseMetadata(meta)
	if err != nil {
		return err
	}

  // redis 连接属性
	opts := &redis.Options{
		Addr:            m.host,
		Password:        m.password,
		DB:              defaultDB,
		MaxRetries:      m.maxRetries,
		MaxRetryBackoff: m.maxRetryBackoff,
	}

	/* #nosec */
	if m.enableTLS {
		opts.TLSConfig = &tls.Config{
			InsecureSkipVerify: m.enableTLS,
		}
	}

  // 建立redis连接
	r.client = redis.NewClient(opts)
	_, err = r.client.Ping().Result()
	if err != nil {
		return fmt.Errorf("redis binding: error connecting to redis at %s: %s", m.host, err)
	}

	return err
}

10.4 - 资源绑定的output处理源码分析

Dapr资源绑定的output处理源码分析

pkc/grpc/api.go 中的 InvokeBinding 方法:

func (a *api) InvokeBinding(ctx context.Context, in *runtimev1pb.InvokeBindingRequest) (*runtimev1pb.InvokeBindingResponse, error) {
	req := &bindings.InvokeRequest{
		Metadata:  in.Metadata,
		Operation: bindings.OperationKind(in.Operation),
	}
	if in.Data != nil {
		req.Data = in.Data
	}

	r := &runtimev1pb.InvokeBindingResponse{}
  // 关键实现在这里
	resp, err := a.sendToOutputBindingFn(in.Name, req)
	if err != nil {
		err = fmt.Errorf("ERR_INVOKE_OUTPUT_BINDING: %s", err)
		apiServerLogger.Debug(err)
		return r, err
	}

	if resp != nil {
		r.Data = resp.Data
		r.Metadata = resp.Metadata
	}
	return r, nil
}

sendToOutputBindingFn 方法的初始化在这里:

func (a *DaprRuntime) getGRPCAPI() grpc.API {
	return grpc.NewAPI(a.runtimeConfig.ID, a.appChannel, a.stateStores, a.secretStores, a.getPublishAdapter(), a.directMessaging, a.actor, a.sendToOutputBinding, a.globalConfig.Spec.TracingSpec)
}

sendToOutputBinding 方法的实现在 pkg/runtime/runtime.go:

func (a *DaprRuntime) sendToOutputBinding(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
   if req.Operation == "" {
      return nil, errors.New("operation field is missing from request")
   }

   // 根据 name 找已经注册好的 binding
   if binding, ok := a.outputBindings[name]; ok {
      ops := binding.Operations()
      for _, o := range ops {
      	 // 找到改 binding 下支持的 operation
         if o == req.Operation {
         		// 关键代码,需要转到具体的实现了
            return binding.Invoke(req)
         }
      }
      supported := make([]string, len(ops))
      for _, o := range ops {
         supported = append(supported, string(o))
      }
      return nil, errors.Errorf("binding %s does not support operation %s. supported operations:%s", name, req.Operation, strings.Join(supported, " "))
   }
   return nil, errors.Errorf("couldn't find output binding %s", name)
}

10.5 - 资源绑定的Metadata总结

Dapr资源绑定的Metadata总结

总结一下各种binding实现中 metadata 的设计和使用:

实现 配置级别的metadata 请求级别的metadata
alicloud oss key
HTTP url / method
cron schedule
MQTT url / topic
RabbitMQ host / queueName / durable
deleteWhenUnused / prefetchCount
ttlInSeconds
Redis host / password / enableTLS /
maxRetries / maxRetryBackoff
key
Influx url / token / org / bucket
Kafka brokers / topics / publishTopic
consumerGroup / authRequried
saslUsername / saslPassword
key
Kubernetes namespace / resyncPeriodInSec /
twilio-sendgrid apiKey / emailFrom / emailTo
subject / emailCc / emailBcc
emailFrom / emailTo / subject
emailCc / emailBcc
twilio-sms toNumber / fromNumber / accountSid
authToken / timeout
toNumber
twitter consumerKey / consumerSecret / accessToken
accessSecret / query
query / lang / result / since_id
gcp-bucket bucket / type / project_id / private_key_id
private_key / client_email / client_id
auth_uri / token_uri
auth_provider_x509_cert_url / client_x509_cert_url
name
gcp-pubsub topic / subscription / type /
project_id / private_key_id / private_key
client_email / client_id / auth_uri / token_uri
auth_provider_x509_cert_url / client_x509_cert_url
topic
Azure-blobstorage storageAccount / storageAccessKey / container blobName / ContentType / ContentMD5
ContentEncoding / ContentLanguage
ContentDisposition / CacheControl
Azure-cosmosDB url / masterKey / database /
collection / partitionKey
Azure-EventGrid tenantId / subscriptionId / clientId
clientSecret / subscriberEndpoint
handshakePort / scope
eventSubscriptionName / accessKey
topicEndpoint
Azure-EventHubs connection / consumerGroup / storageAccountName /
storageAccountKey / storageContainerName
partitionID / partitionKey
partitionKey
Azure-ServiceBusQueues connectionString / queueName / ttl id / correlationID / ttlInSeconds
Azure-SignalR connectionString / hub hub / group / user
Azure-storagequeue ttlInSeconds
Aws-dynamodb region / endpoint / accessKey
secretKey / table
Aws-kinesis streamName / consumerName / region
endpoint / accessKey
secretKey / mode
partitionKey
Aws-s3 region / endpoint / accessKey
secretKey / bucket
key
Aws-sns topicArn / region / endpoint
accessKey / secretKey
Aws-sqs queueName / region / endpoint
accessKey / secretKey

11 - Injector的源码分析

Dapr Injector的源码分析

11.1 - Injector的代码实现

Dapr Injector的代码实现

Inject的流程

以e2e中的 stateapp 为例。

应用的原始Deployment

tests/apps/stateapp/service.yaml 中是 stateapp 的 Service 定义和 Deployment定义。

Service的定义没有什么特殊:

kind: Service
apiVersion: v1
metadata:
  name: stateapp
  labels:
    testapp: stateapp
spec:
  selector:
    testapp: stateapp
  ports:
  - protocol: TCP
    port: 80
    targetPort: 3000
  type: LoadBalancer

deployment的定义:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: stateapp
  labels:
    testapp: stateapp
spec:
  replicas: 1
  selector:
    matchLabels:
      testapp: stateapp
  template: # stateapp的pod定义
    metadata:
      labels:
        testapp: stateapp
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "stateapp"
        dapr.io/app-port: "3000"
    spec:   #stateapp的container定义,暂时pod中只定义了这个一个container
      containers:
      - name: stateapp
        image: docker.io/YOUR_DOCKER_ALIAS/e2e-stateapp:dev
        ports:
        - containerPort: 3000
        imagePullPolicy: Always

单独看 stateapp 的 pod 定义的 annotations ,

      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "stateapp"
        dapr.io/app-port: "3000"

源码

getPodPatchOperations:

func (i *injector) getPodPatchOperations(ar *v1beta1.AdmissionReview,
	namespace, image string, kubeClient *kubernetes.Clientset, daprClient scheme.Interface) ([]PatchOperation, error) {
	req := ar.Request
	var pod corev1.Pod
	if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
		errors.Wrap(err, "could not unmarshal raw object")
		return nil, err
	}

	log.Infof(
		"AdmissionReview for Kind=%v, Namespace=%v Name=%v (%v) UID=%v "+
			"patchOperation=%v UserInfo=%v",
		req.Kind,
		req.Namespace,
		req.Name,
		pod.Name,
		req.UID,
		req.Operation,
		req.UserInfo,
	)

	if !isResourceDaprEnabled(pod.Annotations) || podContainsSidecarContainer(&pod) {
		return nil, nil
	}
  ...

这个info日志打印的例子如下:

{"instance":"dapr-sidecar-injector-5f6f4bb6df-n5dsk","level":"info","msg":"AdmissionReview for Kind=/v1, Kind=Pod, Namespace=dapr-tests Name= () UID=d0126a13-9efd-432e-894a-5ddbee55898c patchOperation=CREATE UserInfo={system:serviceaccount:kube-system:replicaset-controller 3e5de149-07a3-434e-a8de-209abee69760 [system:serviceaccounts system:serviceaccounts:kube-system system:authenticated] map[]}","scope":"dapr.injector","time":"2020-09-25T07:07:07.6482457Z","type":"log","ver":"edge"}

可以看到在 namespace dapr-tests 下 pod 有 CREATE operation时Injector有开始工作。

isResourceDaprEnabled(pod.Annotations) 检查是否是 dapr,判断的方式是看 pod 是否有名为dapr.io/enabled 的 annotation并且设置为true,缺省为false:

const (
	daprEnabledKey                    = "dapr.io/enabled"
)
func isResourceDaprEnabled(annotations map[string]string) bool {
	return getBoolAnnotationOrDefault(annotations, daprEnabledKey, false)
}

podContainsSidecarContainer 检查 pod 是不是已经包含 dapr的sidecar,判断的方式是看 container 的名字是不是 daprd

const (
	sidecarContainerName              = "daprd"
)
func podContainsSidecarContainer(pod *corev1.Pod) bool {
	for _, c := range pod.Spec.Containers {
		if c.Name == sidecarContainerName {
			return true
		}
	}
	return false
}

继续getPodPatchOperations():

	id := getAppID(pod)
	// Keep DNS resolution outside of getSidecarContainer for unit testing.
	placementAddress := fmt.Sprintf("%s:80", getKubernetesDNS(placementService, namespace))
	sentryAddress := fmt.Sprintf("%s:80", getKubernetesDNS(sentryService, namespace))
	apiSrvAddress := fmt.Sprintf("%s:80", getKubernetesDNS(apiAddress, namespace))

getAppID(pod) 通过读取 annotation 来获取应用id,注意 “dapr.io/id” 已经废弃,1.0 之后将被删除,替换为dapr.io/app-id":

const (
	appIDKey                          = "dapr.io/app-id"
  	// Deprecated, remove in v1.0
	idKey                 = "dapr.io/id"
)
func getAppID(pod corev1.Pod) string {
	id := getStringAnnotationOrDefault(pod.Annotations, appIDKey, "")
	if id != "" {
		return id
	}

	return getStringAnnotationOrDefault(pod.Annotations, idKey, pod.GetName())
}

mtlsEnabled的判断

	var trustAnchors string
	var certChain string
	var certKey string
	var identity string

	mtlsEnabled := mTLSEnabled(daprClient)
	if mtlsEnabled {
		trustAnchors, certChain, certKey = getTrustAnchorsAndCertChain(kubeClient, namespace)
		identity = fmt.Sprintf("%s:%s", req.Namespace, pod.Spec.ServiceAccountName)
	}

mTLSEnabled判断的方式,居然是读取所有的namespace下的dapr configuration:

const (
	// NamespaceAll is the default argument to specify on a context when you want to list or filter resources across all namespaces
	NamespaceAll string = ""
)
func mTLSEnabled(daprClient scheme.Interface) bool {
	resp, err := daprClient.ConfigurationV1alpha1().Configurations(meta_v1.NamespaceAll).List(meta_v1.ListOptions{})
	if err != nil {
		return defaultMtlsEnabled
	}

	for _, c := range resp.Items {
		if c.GetName() == defaultConfig {  // "daprsystem"
			return c.Spec.MTLSSpec.Enabled
		}
	}
	return defaultMtlsEnabled
}

通过读取k8s的资源来判断是否要开启 mtls,tests/config/dapr_mtls_off_config.yaml 有example内容:

apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
  name: daprsystem # 名字一定要是 daprsystem
spec:
  mtls:
    enabled: "false"  # 在这里配置要不要开启 mtls
    workloadCertTTL: "1h"
    allowedClockSkew: "20m"

但这个坑货

E0925 09:37:53.480772       1 reflector.go:153] sigs.k8s.io/controller-runtime/pkg/cache/internal/informers_map.go:224: Failed to list *v1alpha1.Configuration: v1alpha1.ConfigurationList.Items: []v1alpha1.Configuration: v1alpha1.Configuration.Spec: v1alpha1.ConfigurationSpec.MTLSSpec: v1alpha1.MTLSSpec.Enabled: ReadBool: expect t or f, but found ", error found in #10 byte of ...|enabled":"false","wo|..., bigger context ...|pec":{"mtls":{"allowedClockSkew":"20m","enabled":"false","workloadCertTTL":"1h"}}},{"apiVersion":"da|...

生效的应用pod定义

apiVersion: v1
kind: Pod
metadata:
  annotations:
    dapr.io/app-id: stateapp
    dapr.io/app-port: "3000"
    dapr.io/enabled: "true"
    dapr.io/sidecar-cpu-limit: "4.0"
    dapr.io/sidecar-cpu-request: "0.5"
    dapr.io/sidecar-memory-limit: 512Mi
    dapr.io/sidecar-memory-request: 250Mi
  creationTimestamp: "2020-09-25T07:07:07Z"
  generateName: stateapp-567b6b9c6f-
  labels:
    pod-template-hash: 567b6b9c6f
    testapp: stateapp
  name: stateapp-567b6b9c6f-84kzb
  namespace: dapr-tests
  ownerReferences:
  - apiVersion: apps/v1
    blockOwnerDeletion: true
    controller: true
    kind: ReplicaSet
    name: stateapp-567b6b9c6f
    uid: 25a34367-79ed-4e19-868a-5b063a45b1f4
  resourceVersion: "146616"
  selfLink: /api/v1/namespaces/dapr-tests/pods/stateapp-567b6b9c6f-84kzb
  uid: 0f4060df-0312-4d73-91c1-6f085462b33d
  spec:
  affinity:
    nodeAffinity:
      requiredDuringSchedulingIgnoredDuringExecution:
        nodeSelectorTerms:
        - matchExpressions:
          - key: kubernetes.io/os
            operator: In
            values:
            - linux
          - key: kubernetes.io/arch
            operator: In
            values:
            - amd64
  containers:
  - env:
    - name: DAPR_HTTP_PORT
      value: "3500"
    - name: DAPR_GRPC_PORT
      value: "50001"
    image: docker.io/skyao/e2e-stateapp:dev-linux-amd64
    imagePullPolicy: Always
    name: stateapp
    ports:
    - containerPort: 3000
      name: http
      protocol: TCP
    resources: {}
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: default-token-qncjc
      readOnly: true
  - args:
    - --mode
    - kubernetes
    - --dapr-http-port
    - "3500"
    - --dapr-grpc-port
    - "50001"
    - --dapr-internal-grpc-port
    - "50002"
    - --app-port
    - "3000"
    - --app-id
    - stateapp
    - --control-plane-address
    - dapr-api.dapr-system.svc.cluster.local:80
    - --app-protocol
    - http
    - --placement-host-address
    - dapr-placement.dapr-system.svc.cluster.local:80
    - --config
    - ""
    - --log-level
    - info
    - --app-max-concurrency
    - "-1"
    - --sentry-address
    - dapr-sentry.dapr-system.svc.cluster.local:80
    - --metrics-port
    - "9090"
    - --enable-mtls
    command:
    - /daprd
    env:
    - name: DAPR_HOST_IP
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: status.podIP
    - name: NAMESPACE
      value: dapr-tests
    - name: DAPR_TRUST_ANCHORS
      value: |
        -----BEGIN CERTIFICATE-----
        MIIB3TCCAYKgAwIBAgIRAMra+wjgMY6ABDtu3/vJ0NcwCgYIKoZIzj0EAwIwMTEX
        MBUGA1UEChMOZGFwci5pby9zZW50cnkxFjAUBgNVBAMTDWNsdXN0ZXIubG9jYWww
        HhcNMjAwOTI1MDU1ODAzWhcNMjEwOTI1MDU1ODAzWjAxMRcwFQYDVQQKEw5kYXBy
        LmlvL3NlbnRyeTEWMBQGA1UEAxMNY2x1c3Rlci5sb2NhbDBZMBMGByqGSM49AgEG
        CCqGSM49AwEHA0IABE/w/8YBtRJPYNJkcDM05e9PhrbGjBU/RQd09J909OJebDe8
        rthysygWrcGYHYKziKK2Pyc1j4ua2xklLC5DFEWjezB5MA4GA1UdDwEB/wQEAwIC
        BDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDwYDVR0TAQH/BAUwAwEB
        /zAdBgNVHQ4EFgQUQ2v6OiayM9V4DPAU6UZHGe/nc1swGAYDVR0RBBEwD4INY2x1
        c3Rlci5sb2NhbDAKBggqhkjOPQQDAgNJADBGAiEAtVBx9vDXiRE3fXJTU2yK11W5
        eo+Ce4+U6/vXDtzw4PUCIQDlLOB45ihOAhhLVLG9akhgwJOrgZLEW3FZjRabpSsb
        og==
        -----END CERTIFICATE-----        
    - name: DAPR_CERT_CHAIN
      value: |
        -----BEGIN CERTIFICATE-----
        MIIBxDCCAWqgAwIBAgIQQ1sfEH4aYacFZwBau+aOozAKBggqhkjOPQQDAjAxMRcw
        FQYDVQQKEw5kYXByLmlvL3NlbnRyeTEWMBQGA1UEAxMNY2x1c3Rlci5sb2NhbDAe
        Fw0yMDA5MjUwNTU4MDNaFw0yMTA5MjUwNTU4MDNaMBgxFjAUBgNVBAMTDWNsdXN0
        ZXIubG9jYWwwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAARhj7MQ1uiOkZvJ0AYV
        uiFca/Iu9D5O98E5JN1mjCohRawk+QT1PjW05YtmyVji4Tt6ckIMvOXwG3aoTsGO
        UbRio30wezAOBgNVHQ8BAf8EBAMCAQYwDwYDVR0TAQH/BAUwAwEB/zAdBgNVHQ4E
        FgQUTPUh0WWBB5baKs3aJjMzInVLX/EwHwYDVR0jBBgwFoAUQ2v6OiayM9V4DPAU
        6UZHGe/nc1swGAYDVR0RBBEwD4INY2x1c3Rlci5sb2NhbDAKBggqhkjOPQQDAgNI
        ADBFAiBO0oCadeYyLM+RkSAYPSTtjMyEZ0wv1/BsWuUMg+KZ6AIhALHnT0pxiqlj
        miYT4WZWvaBc17AbUh1efgV2DAaNKm54
        -----END CERTIFICATE-----
                
    - name: DAPR_CERT_KEY
      value: |
        -----BEGIN EC PRIVATE KEY-----
        MHcCAQEEIDj6niLJ5ep+fDdY71bKyWl9RZHrXyRjND6pWySL2Q4UoAoGCCqGSM49
        AwEHoUQDQgAEYY+zENbojpGbydAGFbohXGvyLvQ+TvfBOSTdZowqIUWsJPkE9T41
        tOWLZslY4uE7enJCDLzl8Bt2qE7BjlG0Yg==
        -----END EC PRIVATE KEY-----        
    - name: SENTRY_LOCAL_IDENTITY
      value: default:dapr-tests
    image: docker.io/skyao/daprd:dev-linux-amd64
    imagePullPolicy: Always
    livenessProbe:
      failureThreshold: 3
      httpGet:
        path: /v1.0/healthz
        port: 3500
        scheme: HTTP
      initialDelaySeconds: 3
      periodSeconds: 6
      successThreshold: 1
      timeoutSeconds: 3
    name: daprd
    ports:
    - containerPort: 3500
      name: dapr-http
      protocol: TCP
    - containerPort: 50001
      name: dapr-grpc
      protocol: TCP
    - containerPort: 50002
      name: dapr-internal
      protocol: TCP
    - containerPort: 9090
      name: dapr-metrics
      protocol: TCP
    readinessProbe:
      failureThreshold: 3
      httpGet:
        path: /v1.0/healthz
        port: 3500
        scheme: HTTP
      initialDelaySeconds: 3
      periodSeconds: 6
      successThreshold: 1
      timeoutSeconds: 3
    resources:
      limits:
        cpu: "4"
        memory: 512Mi
      requests:
        cpu: 500m
        memory: 250Mi
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: default-token-qncjc
      readOnly: true
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  nodeName: docker-desktop
  priority: 0
  restartPolicy: Always
  schedulerName: default-scheduler
  securityContext: {}
  serviceAccount: default
  serviceAccountName: default
  terminationGracePeriodSeconds: 30
  tolerations:
  - effect: NoExecute
    key: node.kubernetes.io/not-ready
    operator: Exists
    tolerationSeconds: 300
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    operator: Exists
    tolerationSeconds: 300
  volumes:
  - name: default-token-qncjc
    secret:
      defaultMode: 420
      secretName: default-token-qncjc
status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T07:07:07Z"
    status: "True"
    type: Initialized
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T07:07:07Z"
    message: 'containers with unready status: [daprd]'
    reason: ContainersNotReady
    status: "False"
    type: Ready
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T07:07:07Z"
    message: 'containers with unready status: [daprd]'
    reason: ContainersNotReady
    status: "False"
    type: ContainersReady
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T07:07:07Z"
    status: "True"
    type: PodScheduled
  containerStatuses:
  - containerID: docker://26a1d85ac6e2accd833832681b8dc2aa809e3c0fcfa293398bd5e7c2e8bf3e2b
    image: skyao/daprd:dev-linux-amd64
    imageID: docker-pullable://skyao/daprd@sha256:387f3bf4e7397c43dca9ac2d248a9ce790b1c1888aa0d6de3b07107ce124752f
    lastState:
      terminated:
        containerID: docker://26a1d85ac6e2accd833832681b8dc2aa809e3c0fcfa293398bd5e7c2e8bf3e2b
        exitCode: 1
        finishedAt: "2020-09-25T08:03:14Z"
        reason: Error
        startedAt: "2020-09-25T08:03:04Z"
    name: daprd
    ready: false
    restartCount: 21
    started: false
    state:
      waiting:
        message: back-off 5m0s restarting failed container=daprd pod=stateapp-567b6b9c6f-84kzb_dapr-tests(0f4060df-0312-4d73-91c1-6f085462b33d)
        reason: CrashLoopBackOff
  - containerID: docker://737745ace04213c9519ad1f91e248015c89a80e2b3d61081c3c530d1c89bdbae
    image: skyao/e2e-stateapp:dev-linux-amd64
    imageID: docker-pullable://skyao/e2e-stateapp@sha256:16351b331f1338a61348c9a87fce43728369f1bf18ee69d9d45fb13db0283644
    lastState: {}
    name: stateapp
    ready: true
    restartCount: 0
    started: true
    state:
      running:
        startedAt: "2020-09-25T07:07:24Z"
  hostIP: 192.168.65.3
  phase: Running
  podIP: 10.1.0.194
  podIPs:
  - ip: 10.1.0.194
  qosClass: Burstable
  startTime: "2020-09-25T07:07:07Z"

其他

injector自身的pod定义

dapr-sidecar-injector

apiVersion: v1
kind: Pod
metadata:
  annotations:
    prometheus.io/path: /
    prometheus.io/port: "9090"
    prometheus.io/scrape: "true"
  creationTimestamp: "2020-09-25T05:57:37Z"
  generateName: dapr-sidecar-injector-5f6f4bb6df-
  labels:
    app: dapr-sidecar-injector
    app.kubernetes.io/component: sidecar-injector
    app.kubernetes.io/managed-by: helm
    app.kubernetes.io/name: dapr
    app.kubernetes.io/part-of: dapr
    app.kubernetes.io/version: dev-linux-amd64
    pod-template-hash: 5f6f4bb6df
  name: dapr-sidecar-injector-5f6f4bb6df-n5dsk
  namespace: dapr-system
  ownerReferences:
  - apiVersion: apps/v1
    blockOwnerDeletion: true
    controller: true
    kind: ReplicaSet
    name: dapr-sidecar-injector-5f6f4bb6df
    uid: ff47b1df-6da7-4a19-b99d-15622ca3a485
  resourceVersion: "133143"
  selfLink: /api/v1/namespaces/dapr-system/pods/dapr-sidecar-injector-5f6f4bb6df-n5dsk
  uid: 40df3834-4df2-495a-aa26-5b2a22de7639
  spec:
  affinity:
    nodeAffinity:
      preferredDuringSchedulingIgnoredDuringExecution:
      - preference:
          matchExpressions:
          - key: kubernetes.io/os
            operator: In
            values:
            - linux
        weight: 1
  containers:
  - args:
    - --log-level
    - info
    - --log-as-json
    - --metrics-port
    - "9090"
    command:
    - /injector
    env:
    - name: TLS_CERT_FILE
      value: /dapr/cert/tls.crt
    - name: TLS_KEY_FILE
      value: /dapr/cert/tls.key
    - name: SIDECAR_IMAGE
      value: docker.io/skyao/daprd:dev-linux-amd64
    - name: NAMESPACE
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: metadata.namespace
    image: docker.io/skyao/dapr:dev-linux-amd64
    imagePullPolicy: Always
        livenessProbe:
      failureThreshold: 5
      httpGet:
        path: /healthz
        port: 8080
        scheme: HTTP
      initialDelaySeconds: 3
      periodSeconds: 3
      successThreshold: 1
      timeoutSeconds: 1
    name: dapr-sidecar-injector
    ports:
    - containerPort: 4000
      name: https
      protocol: TCP
    - containerPort: 9090
      name: metrics
      protocol: TCP
    readinessProbe:
      failureThreshold: 5
      httpGet:
        path: /healthz
        port: 8080
        scheme: HTTP
      initialDelaySeconds: 3
      periodSeconds: 3
      successThreshold: 1
      timeoutSeconds: 1
    resources: {}
        securityContext:
      runAsUser: 1000
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /dapr/cert
      name: cert
      readOnly: true
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: dapr-operator-token-lgpvc
      readOnly: true
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  nodeName: docker-desktop
  priority: 0
  restartPolicy: Always
  schedulerName: default-scheduler
  securityContext: {}
  serviceAccount: dapr-operator
  serviceAccountName: dapr-operator
  terminationGracePeriodSeconds: 30
  tolerations:
    - effect: NoExecute
    key: node.kubernetes.io/not-ready
    operator: Exists
    tolerationSeconds: 300
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    operator: Exists
    tolerationSeconds: 300
  volumes:
  - name: cert
    secret:
      defaultMode: 420
      secretName: dapr-sidecar-injector-cert
  - name: dapr-operator-token-lgpvc
    secret:
      defaultMode: 420
      secretName: dapr-operator-token-lgpvc
status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T05:57:37Z"
    status: "True"
    type: Initialized
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T05:58:10Z"
    status: "True"
    type: Ready
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T05:58:10Z"
    status: "True"
    type: ContainersReady
  - lastProbeTime: null
    lastTransitionTime: "2020-09-25T05:57:37Z"
    status: "True"
    type: PodScheduled
  containerStatuses:
  - containerID: docker://a820646b468a07eabdd89ca133f062a93e85256afc6c19c1bdf13b56980ec5e9
    image: skyao/dapr:dev-linux-amd64
    imageID: docker-pullable://skyao/dapr@sha256:77003eee9fd02d9fc24c2e9f385a6c86223bc35915cede98a8897c0dfc51ee61
    lastState: {}
    name: dapr-sidecar-injector
    ready: true
    restartCount: 0
    started: true
    state:
      running:
        startedAt: "2020-09-25T05:58:06Z"
  hostIP: 192.168.65.3
  phase: Running
  podIP: 10.1.0.188
  podIPs:
  - ip: 10.1.0.188
  qosClass: BestEffort
  startTime: "2020-09-25T05:57:37Z"

11.2 - main.go的源码学习

Dapr Injector 的 main 代码

Dapr injector 中的 main.go 文件的源码分析。

init() 方法

init() 进行初始化,包括 flag (logger, metric),

flag 设定和读取

func init() {
	loggerOptions := logger.DefaultOptions()
	// 这里设定了 `log-level` 和 `log-as-json`
	loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)

	metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)
	
	// 这里设定了 `metrics-port` 和 `enable-metrics`
metricsExporter.Options().AttachCmdFlags(flag.StringVar, flag.BoolVar)

	flag.Parse()

参考 injector pod yaml文件中 Command 段:

    Command:
      /injector
    Args:
      --log-level
      info
      --log-as-json
      --enable-metrics
      --metrics-port
      9090

初始化 logger

	// Apply options to all loggers
	if err := logger.ApplyOptionsToLoggers(&loggerOptions); err != nil {
		log.Fatal(err)
	} else {
		log.Infof("log level set to: %s", loggerOptions.OutputLevel)
	}

初始化 metrics

	// Initialize dapr metrics exporter
	if err := metricsExporter.Init(); err != nil {
		log.Fatal(err)
	}

	// Initialize injector service metrics
	if err := monitoring.InitMetrics(); err != nil {
		log.Fatal(err)
	}

main() 方法

获取配置

从环境变量中读取配置:

func main() {
	logger.DaprVersion = version.Version()
	log.Infof("starting Dapr Sidecar Injector -- version %s -- commit %s", version.Version(), version.Commit())

	ctx := signals.Context()
	cfg, err := injector.GetConfigFromEnvironment()
	if err != nil {
		log.Fatalf("error getting config: %s", err)
	}
	......
}

获取daprClient

	kubeClient := utils.GetKubeClient()
	conf := utils.GetConfig()
	daprClient, _ := scheme.NewForConfig(conf)

启动 healthz server

	go func() {
		healthzServer := health.NewServer(log)
		healthzServer.Ready()

		healthzErr := healthzServer.Run(ctx, healthzPort)
		if healthzErr != nil {
			log.Fatalf("failed to start healthz server: %s", healthzErr)
		}
	}()

service account

	uids, err := injector.AllowedControllersServiceAccountUID(ctx, kubeClient)
	if err != nil {
		log.Fatalf("failed to get authentication uids from services accounts: %s", err)
	}

创建 injector

	injector.NewInjector(uids, cfg, daprClient, kubeClient).Run(ctx)

graceful shutdown

简单的sleep 5秒作为 graceful shutdown :

	shutdownDuration := 5 * time.Second
	log.Infof("allowing %s for graceful shutdown to complete", shutdownDuration)
	<-time.After(shutdownDuration)

11.3 - config.go的源码学习

Dapr Injector 的 config 代码

Dapr injector package中的 config.go 文件的源码分析。

代码实现

Config 结构体定义

Injector 相关的配置项定义:

// Config represents configuration options for the Dapr Sidecar Injector webhook server
type Config struct {
	TLSCertFile            string `envconfig:"TLS_CERT_FILE" required:"true"`
	TLSKeyFile             string `envconfig:"TLS_KEY_FILE" required:"true"`
	SidecarImage           string `envconfig:"SIDECAR_IMAGE" required:"true"`
	SidecarImagePullPolicy string `envconfig:"SIDECAR_IMAGE_PULL_POLICY"`
	Namespace              string `envconfig:"NAMESPACE" required:"true"`
}

NewConfigWithDefaults() 方法

只设置了一个 SidecarImagePullPolicy 的默认值:

func NewConfigWithDefaults() Config {
	return Config{
		SidecarImagePullPolicy: "Always",
	}
}

这个方法只被下面的 GetConfigFromEnvironment() 方法调用。

GetConfigFromEnvironment() 方法

从环境中获取配置

func GetConfigFromEnvironment() (Config, error) {
	c := NewConfigWithDefaults()
	err := envconfig.Process("", &c)
	return c, err
}

envconfig.Process() 的代码实现会通过反射读取到 Config 结构体的信息,然后根据设定的环境变量名来读取。

这个方法的调用只有一个地方,在injector main 函数的开始位置:

func main() {
   log.Infof("starting Dapr Sidecar Injector -- version %s -- commit %s", version.Version(), version.Commit())

   ctx := signals.Context()
   cfg, err := injector.GetConfigFromEnvironment()
   if err != nil {
      log.Fatalf("error getting config: %s", err)
   }
   ......  
}

通过命令如 k describe pod dapr-sidecar-injector-6f656b7dd-sg87p -n dapr-system 拿到 injector pod 的yaml 文件,可以看到 Environment 的这一段:

    Environment:
      TLS_CERT_FILE:              /dapr/cert/tls.crt
      TLS_KEY_FILE:               /dapr/cert/tls.key
      SIDECAR_IMAGE:              docker.io/skyao/daprd:dev-linux-amd64
      SIDECAR_IMAGE_PULL_POLICY:  IfNotPresent
      NAMESPACE:                  dapr-system (v1:metadata.namespace)

injector yaml 备用

以下是完整的 injector pod yaml,留着备用:

Name:         dapr-sidecar-injector-6f656b7dd-sg87p
Namespace:    dapr-system
Priority:     0
Node:         docker-desktop/192.168.65.3
Start Time:   Mon, 19 Apr 2021 15:04:07 +0800
Labels:       app=dapr-sidecar-injector
              app.kubernetes.io/component=sidecar-injector
              app.kubernetes.io/managed-by=helm
              app.kubernetes.io/name=dapr
              app.kubernetes.io/part-of=dapr
              app.kubernetes.io/version=dev-linux-amd64
              pod-template-hash=6f656b7dd
Annotations:  prometheus.io/path: /
              prometheus.io/port: 9090
              prometheus.io/scrape: true
Status:       Running
IP:           10.1.2.162
IPs:
  IP:           10.1.2.162
Controlled By:  ReplicaSet/dapr-sidecar-injector-6f656b7dd
Containers:
  dapr-sidecar-injector:
    Container ID:  docker://544dabf00bdaba9cf8f320218dd0b7e6d2ebce7fbf5184ce162d58bc693162d9
    Image:         docker.io/skyao/dapr:dev-linux-amd64
    Image ID:      docker-pullable://skyao/dapr@sha256:b4843ee78eabf014e15749bc4daa5c249ce3d33f796a89aaba9d117dd3dc76c9
    Ports:         4000/TCP, 9090/TCP
    Host Ports:    0/TCP, 0/TCP
    Command:
      /injector
    Args:
      --log-level
      info
      --log-as-json
      --enable-metrics
      --metrics-port
      9090
    State:          Running
      Started:      Mon, 19 Apr 2021 15:04:08 +0800
    Ready:          True
    Restart Count:  0
    Liveness:       http-get http://:8080/healthz delay=3s timeout=1s period=3s #success=1 #failure=5
    Readiness:      http-get http://:8080/healthz delay=3s timeout=1s period=3s #success=1 #failure=5
    Environment:
      TLS_CERT_FILE:              /dapr/cert/tls.crt
      TLS_KEY_FILE:               /dapr/cert/tls.key
      SIDECAR_IMAGE:              docker.io/skyao/daprd:dev-linux-amd64
      SIDECAR_IMAGE_PULL_POLICY:  IfNotPresent
      NAMESPACE:                  dapr-system (v1:metadata.namespace)
    Mounts:
      /dapr/cert from cert (ro)
      /var/run/secrets/kubernetes.io/serviceaccount from dapr-operator-token-cjpnd (ro)
Conditions:
  Type              Status
  Initialized       True 
  Ready             True 
  ContainersReady   True 
  PodScheduled      True 
Volumes:
  cert:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  dapr-sidecar-injector-cert
    Optional:    false
  dapr-operator-token-cjpnd:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  dapr-operator-token-cjpnd
    Optional:    false
QoS Class:       BestEffort
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                 node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type    Reason     Age   From               Message
  ----    ------     ----  ----               -------
  Normal  Scheduled  17m   default-scheduler  Successfully assigned dapr-system/dapr-sidecar-injector-6f656b7dd-sg87p to docker-desktop
  Normal  Pulled     17m   kubelet            Container image "docker.io/skyao/dapr:dev-linux-amd64" already present on machine
  Normal  Created    17m   kubelet            Created container dapr-sidecar-injector
  Normal  Started    17m   kubelet            Started container dapr-sidecar-injector

11.4 - injector.go的源码学习

Dapr Injector 中的 injector.go 的 代码

主流程代码

接口和结构体定义和创建

Injector 是Dapr运行时 sidecar 注入组件的接口。

// Injector is the interface for the Dapr runtime sidecar injection component
type Injector interface {
   Run(ctx context.Context)
}

injector 结构体定义:

type injector struct {
   config       Config
   deserializer runtime.Decoder
   server       *http.Server
   kubeClient   *kubernetes.Clientset
   daprClient   scheme.Interface
   authUIDs     []string
}

创建新的 injector 结构体(这个方法在injecot的main方法中被调用):

// NewInjector returns a new Injector instance with the given config
func NewInjector(authUIDs []string, config Config, daprClient scheme.Interface, kubeClient *kubernetes.Clientset) Injector {
   mux := http.NewServeMux()

   i := &injector{
      config: config,
      deserializer: serializer.NewCodecFactory(
         runtime.NewScheme(),
      ).UniversalDeserializer(),
      // 启动http server
      server: &http.Server{
         Addr:    fmt.Sprintf(":%d", port),
         Handler: mux,
      },
      kubeClient: kubeClient,
      daprClient: daprClient,
      authUIDs:   authUIDs,
   }

   // 给 k8s 调用的 mutate 端点
   mux.HandleFunc("/mutate", i.handleRequest)
   return i
}

Run()方法

最核心的run方法,

func (i *injector) Run(ctx context.Context) {
   doneCh := make(chan struct{})

   // 启动go routing,监听 ctx 和 doneCh 的信号
   go func() {
      select {
      case <-ctx.Done():
         log.Info("Sidecar injector is shutting down")
         shutdownCtx, cancel := context.WithTimeout(
            context.Background(),
            time.Second*5,
         )
         defer cancel()
         i.server.Shutdown(shutdownCtx) // nolint: errcheck
      case <-doneCh:
      }
   }()

   // 打印启动时的日志,这行日志可以通过 
   log.Infof("Sidecar injector is listening on %s, patching Dapr-enabled pods", i.server.Addr)
   // TODO:这里有时会报错,证书有问题,导致injector无法正常工作,后面再来检查
   err := i.server.ListenAndServeTLS(i.config.TLSCertFile, i.config.TLSKeyFile)
   if err != http.ErrServerClosed {
      log.Errorf("Sidecar injector error: %s", err)
   }
   close(doneCh)
}

可以对比通过 k logs dapr-sidecar-injector-86b8dc4dcd-bkbgw -n dapr-system 命令查看到的injecot 日志内容:

{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"log level set to: info","scope":"dapr.injector","time":"2021-05-11T01:13:20.1904136Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"metrics server started on :9090/","scope":"dapr.metrics","time":"2021-05-11T01:13:20.1907347Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"starting Dapr Sidecar Injector -- version edge -- commit v1.0.0-rc.4-163-g9a4210a-dirty","scope":"dapr.injector","time":"2021-05-11T01:13:20.191669Z","type":"log","ver":"unknown"}
{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"Healthz server is listening on :8080","scope":"dapr.injector","time":"2021-05-11T01:13:20.1928941Z","type":"log","ver":"unknown"}

{"instance":"dapr-sidecar-injector-86b8dc4dcd-bkbgw","level":"info","msg":"Sidecar injector is listening on :4000, patching Dapr-enabled pods","scope":"dapr.injector","time":"2021-05-11T01:13:20.208587Z","type":"log","ver":"unknown"}

handleRequest方法

handleRequest方法用来处理来自 k8s api server的 mutate 调用:

mux.HandleFunc("/mutate", i.handleRequest)

func (i *injector) handleRequest(w http.ResponseWriter, r *http.Request) {
  ......
}

代码比较长,忽略部分细节代码。

读取请求的body,验证长度和content-type:

defer r.Body.Close()

var body []byte
if r.Body != nil {
   if data, err := ioutil.ReadAll(r.Body); err == nil {
      body = data
   }
}
if len(body) == 0 {
   log.Error("empty body")
   http.Error(w, "empty body", http.StatusBadRequest)
   return
}

contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
  log.Errorf("Content-Type=%s, expect application/json", contentType)
  http.Error(
    w,
    "invalid Content-Type, expect `application/json`",
    http.StatusUnsupportedMediaType,
  )

  return
}

反序列化body,并做一些基本的验证:

ar := v1.AdmissionReview{}
_, gvk, err := i.deserializer.Decode(body, nil, &ar)
if err != nil {
   log.Errorf("Can't decode body: %v", err)
} else {
   if !utils.StringSliceContains(ar.Request.UserInfo.UID, i.authUIDs) {
      err = errors.Wrapf(err, "unauthorized request")
      log.Error(err)
   } else if ar.Request.Kind.Kind != "Pod" {
      err = errors.Wrapf(err, "invalid kind for review: %s", ar.Kind)
      log.Error(err)
   } else {
      patchOps, err = i.getPodPatchOperations(&ar, i.config.Namespace, i.config.SidecarImage, i.config.SidecarImagePullPolicy, i.kubeClient, i.daprClient)
   }
}

getPodPatchOperations 是核心代码,后面细看。

统一处理前面可能产生的错误,以及 getPodPatchOperations() 的处理结果:

diagAppID := getAppIDFromRequest(ar.Request)

if err != nil {
   admissionResponse = toAdmissionResponse(err)
   log.Errorf("Sidecar injector failed to inject for app '%s'. Error: %s", diagAppID, err)
   monitoring.RecordFailedSidecarInjectionCount(diagAppID, "patch")
} else if len(patchOps) == 0 {
   // len(patchOps) == 0 表示什么都没改,返回  Allowed: true
   admissionResponse = &v1.AdmissionResponse{
      Allowed: true,
   }
} else {
   var patchBytes []byte
   // 将 patchOps 序列化为json
   patchBytes, err = json.Marshal(patchOps)
   if err != nil {
      admissionResponse = toAdmissionResponse(err)
   } else {
      // 返回AdmissionResponse
      admissionResponse = &v1.AdmissionResponse{
         Allowed: true,
         Patch:   patchBytes,
         PatchType: func() *v1.PatchType {
            pt := v1.PatchTypeJSONPatch
            return &pt
         }(),
      }
   }
}

组装 AdmissionReview:

admissionReview := v1.AdmissionReview{}
if admissionResponse != nil {
   admissionReview.Response = admissionResponse
   if ar.Request != nil {
      admissionReview.Response.UID = ar.Request.UID
      admissionReview.SetGroupVersionKind(*gvk)
   }
}

将应答序列化并返回:

log.Infof("ready to write response ...")
respBytes, err := json.Marshal(admissionReview)
if err != nil {
   http.Error(
      w,
      err.Error(),
      http.StatusInternalServerError,
   )

   log.Errorf("Sidecar injector failed to inject for app '%s'. Can't deserialize response: %s", diagAppID, err)
   monitoring.RecordFailedSidecarInjectionCount(diagAppID, "response")
}
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(respBytes); err != nil {
   log.Error(err)
} else {
   log.Infof("Sidecar injector succeeded injection for app '%s'", diagAppID)
   monitoring.RecordSuccessfulSidecarInjectionCount(diagAppID)
}

帮助类代码

toAdmissionResponse方法

toAdmissionResponse 方法用于从一个 error 创建 k8s 的 AdmissionResponse :

// toAdmissionResponse is a helper function to create an AdmissionResponse
// with an embedded error
func toAdmissionResponse(err error) *v1.AdmissionResponse {
   return &v1.AdmissionResponse{
      Result: &metav1.Status{
         Message: err.Error(),
      },
   }
}

获取AppID

getAppIDFromRequest() 方法从 AdmissionRequest 中获取AppID:

func getAppIDFromRequest(req *v1.AdmissionRequest) string {
   // default App ID
   appID := ""

   // if req is not given
   if req == nil {
      return appID
   }

   var pod corev1.Pod
   // 解析pod的raw数据为json
   if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
      log.Warnf("could not unmarshal raw object: %v", err)
   } else {
      // 然后从pod信息中获取appID
      appID = getAppID(pod)
   }

   return appID
}

getAppID()方法的实现如下,首先读取 “dapr.io/app-id” 的 Annotation,如果没有,则取 pod 的 name 作为默认AppID:

const	appIDKey                          = "dapr.io/app-id"
func getAppID(pod corev1.Pod) string {
	return getStringAnnotationOrDefault(pod.Annotations, appIDKey, pod.GetName())
}

分支代码

ServiceAccount 相关代码

AllowedControllersServiceAccountUID()方法返回UID数组,这些是 webhook handler 上容许的 service account 列表:

var allowedControllersServiceAccounts = []string{
	"replicaset-controller",
	"deployment-controller",
	"cronjob-controller",
	"job-controller",
	"statefulset-controller",
}

// AllowedControllersServiceAccountUID returns an array of UID, list of allowed service account on the webhook handler
func AllowedControllersServiceAccountUID(ctx context.Context, kubeClient *kubernetes.Clientset) ([]string, error) {
   allowedUids := []string{}
   for i, allowedControllersServiceAccount := range allowedControllersServiceAccounts {
      saUUID, err := getServiceAccount(ctx, kubeClient, allowedControllersServiceAccount)
      // i == 0 => "replicaset-controller" is the only one mandatory
      if err != nil && i == 0 {
         return nil, err
      } else if err != nil {
         log.Warnf("Unable to get SA %s UID (%s)", allowedControllersServiceAccount, err)
         continue
      }
      allowedUids = append(allowedUids, saUUID)
   }

   return allowedUids, nil
}

func getServiceAccount(ctx context.Context, kubeClient *kubernetes.Clientset, allowedControllersServiceAccount string) (string, error) {
	ctxWithTimeout, cancel := context.WithTimeout(ctx, getKubernetesServiceAccountTimeoutSeconds*time.Second)
	defer cancel()

	sa, err := kubeClient.CoreV1().ServiceAccounts(metav1.NamespaceSystem).Get(ctxWithTimeout, allowedControllersServiceAccount, metav1.GetOptions{})
	if err != nil {
		return "", err
	}

	return string(sa.ObjectMeta.UID), nil
}

11.5 - patch_operation.go的源码学习

Dapr Injector 中的 patch_operation.go 的 代码

代码非常简单,只定义了一个结构体 PatchOperation,用来表示要应用于Kubernetes资源的一个单独的变化。

// PatchOperation represents a discreet change to be applied to a Kubernetes resource
type PatchOperation struct {
	Op    string      `json:"op"`
	Path  string      `json:"path"`
	Value interface{} `json:"value,omitempty"`
}

11.6 - pod_patch.go的源码学习

Dapr Injector 中的 pod_patch.go 的 代码

主流程

getPodPatchOperations() 是最重要的方法,injector 对 pod 的修改就在这里进行:


func (i *injector) getPodPatchOperations(ar *v1.AdmissionReview,
	namespace, image, imagePullPolicy string, kubeClient *kubernetes.Clientset, daprClient scheme.Interface) ([]PatchOperation, error) {
    ......
  	return patchOps, nil
}

解析request,得到 pod 对象 (这里和前面重复了?):

req := ar.Request
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
   errors.Wrap(err, "could not unmarshal raw object")
   return nil, err
}

判断是否需要 injector 做处理:

if !isResourceDaprEnabled(pod.Annotations) || podContainsSidecarContainer(&pod) {
   return nil, nil
}

// 判断是否启动了dapr,依据是是否设置 annotation "dapr.io/enabled" 为 true,默认为false
const daprEnabledKey                    = "dapr.io/enabled"
func isResourceDaprEnabled(annotations map[string]string) bool {
	return getBoolAnnotationOrDefault(annotations, daprEnabledKey, false)
}

// 判断是否包含了 dapr 的 sidecar container
const 	sidecarContainerName              = "daprd"
func podContainsSidecarContainer(pod *corev1.Pod) bool {
	for _, c := range pod.Spec.Containers {
    // 检测方式是循环pod中的所有container,检查是否有container的名字为 "daprd"
		if c.Name == sidecarContainerName {
			return true
		}
	}
	return false
}

创建 daprd sidecar container:

sidecarContainer, err := getSidecarContainer(pod.Annotations, id, image, imagePullPolicy, req.Namespace, apiSrvAddress, placementAddress, tokenMount, trustAnchors, certChain, certKey, sentryAddress, mtlsEnabled, identity)

getSidecarContainer()的细节后面看,先走完主流程。

patchOps := []PatchOperation{}
envPatchOps := []PatchOperation{}
var path string
var value interface{}
if len(pod.Spec.Containers) == 0 {
   // 如果pod的container数量为0(什么情况下会有这种没有container的pod?)
   path = containersPath
   value = []corev1.Container{*sidecarContainer}
} else {
   // 将 daprd 的sidecar 加入
   envPatchOps = addDaprEnvVarsToContainers(pod.Spec.Containers)
   // TODO:path 的设值有什么规范或者要求?
   path = "/spec/containers/-"
   value = sidecarContainer
}

	patchOps = append(
		patchOps,
		PatchOperation{
			Op:    "add",
			Path:  path,
			Value: value,
		},
	)
	patchOps = append(patchOps, envPatchOps...)

addDaprEnvVarsToContainers

// This function add Dapr environment variables to all the containers in any Dapr enabled pod.
// The containers can be injected or user defined.
func addDaprEnvVarsToContainers(containers []corev1.Container) []PatchOperation {
   portEnv := []corev1.EnvVar{
      {
         Name:  userContainerDaprHTTPPortName,
         Value: strconv.Itoa(sidecarHTTPPort),
      },
      {
         Name:  userContainerDaprGRPCPortName,
         Value: strconv.Itoa(sidecarAPIGRPCPort),
      },
   }
   envPatchOps := make([]PatchOperation, 0, len(containers))
   for i, container := range containers {
      path := fmt.Sprintf("%s/%d/env", containersPath, i)
      patchOps := getEnvPatchOperations(container.Env, portEnv, path)
      envPatchOps = append(envPatchOps, patchOps...)
   }
   return envPatchOps
}

分支流程:mTLS的处理

mtlsEnabled := mTLSEnabled(daprClient)
if mtlsEnabled {
   trustAnchors, certChain, certKey = getTrustAnchorsAndCertChain(kubeClient, namespace)
   identity = fmt.Sprintf("%s:%s", req.Namespace, pod.Spec.ServiceAccountName)
}
func mTLSEnabled(daprClient scheme.Interface) bool {
   resp, err := daprClient.ConfigurationV1alpha1().Configurations(meta_v1.NamespaceAll).List(meta_v1.ListOptions{})
   if err != nil {
      log.Errorf("Failed to load dapr configuration from k8s, use default value %t for mTLSEnabled: %s", defaultMtlsEnabled, err)
      return defaultMtlsEnabled
   }

   for _, c := range resp.Items {
      if c.GetName() == defaultConfig {
         return c.Spec.MTLSSpec.Enabled
      }
   }
   log.Infof("Dapr system configuration (%s) is not found, use default value %t for mTLSEnabled", defaultConfig, defaultMtlsEnabled)
   return defaultMtlsEnabled
}

分支处理:serviceaccount