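1 - Source code study: options.go

Customizing the components included in the runtime

A walkthrough of options.go in the Dapr runtime package, which is used to customize the components included in the runtime.

runtimeOpts struct definition

runtimeOpts encapsulates the components to be included in the runtime: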

type (
	// runtimeOpts encapsulates the components to include in the runtime.
	runtimeOpts struct {
		secretStores    []secretstores.SecretStore
		states          []state.State
		pubsubs         []pubsub.PubSub
		nameResolutions []nameresolution.NameResolution
		inputBindings   []bindings.InputBinding
		outputBindings  []bindings.OutputBinding
		httpMiddleware  []http.Middleware
	}
)

The Option type

Option is a function type used to customize the runtime:

type (
	// Option is a function that customizes the runtime.
	Option func(o *runtimeOpts)
)

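The With family of functions for customizing the runtime

Several WithXxx() functions are provided, one per component type, to customize the runtime's components:
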
// WithSecretStores adds secret store components to the runtime.
func WithSecretStores(secretStores ...secretstores.SecretStore) Option {
	return func(o *runtimeOpts) {
		o.secretStores = append(o.secretStores, secretStores...)
	}
}

// WithStates adds state store components to the runtime.
func WithStates(states ...state.State) Option {
	return func(o *runtimeOpts) {
		o.states = append(o.states, states...)
	}
}

// WithPubSubs adds pubsub store components to the runtime.
func WithPubSubs(pubsubs ...pubsub.PubSub) Option {
	return func(o *runtimeOpts) {
		o.pubsubs = append(o.pubsubs, pubsubs...)
	}
}

// WithNameResolutions adds name resolution components to the runtime.
func WithNameResolutions(nameResolutions ...nameresolution.NameResolution) Option {
	return func(o *runtimeOpts) {
		o.nameResolutions = append(o.nameResolutions, nameResolutions...)
	}
}

// WithInputBindings adds input binding components to the runtime.
func WithInputBindings(inputBindings ...bindings.InputBinding) Option {
	return func(o *runtimeOpts) {
		o.inputBindings = append(o.inputBindings, inputBindings...)
	}
}

// WithOutputBindings adds output binding components to the runtime.
func WithOutputBindings(outputBindings ...bindings.OutputBinding) Option {
	return func(o *runtimeOpts) {
		o.outputBindings = append(o.outputBindings, outputBindings...)
	}
}

// WithHTTPMiddleware adds HTTP middleware components to the runtime.
func WithHTTPMiddleware(httpMiddleware ...http.Middleware) Option {
	return func(o *runtimeOpts) {
		o.httpMiddleware = append(o.httpMiddleware, httpMiddleware...)
	}
}

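Each of these functions simply appends the given components to the matching field of the runtimeOpts struct; the stored values are used later when the runtime is initialized.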
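The functional-options pattern used here can be sketched in isolation. The example below is a minimal, self-contained approximation: SecretStore and StateStore are stand-in types rather than Dapr's component interfaces, and applyOptions is a hypothetical helper showing how a caller might run the options; the real runtime's call site is not shown in this section.

```go
package main

import "fmt"

// Stand-in component types. The real ones live in Dapr's component packages.
type SecretStore struct{ Name string }
type StateStore struct{ Name string }

// runtimeOpts mirrors the shape of the struct above, reduced to two fields.
type runtimeOpts struct {
	secretStores []SecretStore
	states       []StateStore
}

// Option is a function that customizes runtimeOpts.
type Option func(o *runtimeOpts)

// WithSecretStores appends secret stores to the options.
func WithSecretStores(s ...SecretStore) Option {
	return func(o *runtimeOpts) { o.secretStores = append(o.secretStores, s...) }
}

// WithStates appends state stores to the options.
func WithStates(s ...StateStore) Option {
	return func(o *runtimeOpts) { o.states = append(o.states, s...) }
}

// applyOptions is a hypothetical helper: it runs every option, in order,
// against a fresh runtimeOpts and returns the result.
func applyOptions(opts ...Option) runtimeOpts {
	var o runtimeOpts
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

func main() {
	o := applyOptions(
		WithSecretStores(SecretStore{Name: "local-file"}),
		WithStates(StateStore{Name: "redis"}, StateStore{Name: "memory"}),
		WithStates(StateStore{Name: "etcd"}),
	)
	fmt.Println(len(o.secretStores), len(o.states)) // prints "1 3"
}
```

Because every option appends rather than overwrites, the same WithXxx() function can be passed more than once and the results accumulate.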

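2 - Source code study: config.go

Defining the Dapr runtime configuration

A walkthrough of config.go in the Dapr runtime package, which defines the protocol and port constants, the Config struct, and the NewRuntimeConfig() constructor.

Constant definitions

Protocol: currently only http and grpc are supported: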

// Protocol is a communications protocol
type Protocol string

const (
	// GRPCProtocol is a gRPC communication protocol
	GRPCProtocol Protocol = "grpc"
	// HTTPProtocol is a HTTP communication protocol
	HTTPProtocol Protocol = "http"
)
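Protocol is a plain string type, and the NewRuntimeConfig() constructor shown later converts the flag value with Protocol(appProtocol) without checking it. If a caller wanted to reject anything other than the two supported values, a check might look like the sketch below. isSupported is a hypothetical helper and is not part of config.go.

```go
package main

import "fmt"

// Protocol is a communications protocol.
type Protocol string

const (
	GRPCProtocol Protocol = "grpc"
	HTTPProtocol Protocol = "http"
)

// isSupported is a hypothetical helper (not in config.go): it reports
// whether p is one of the two protocol constants defined above.
func isSupported(p Protocol) bool {
	switch p {
	case GRPCProtocol, HTTPProtocol:
		return true
	default:
		return false
	}
}

func main() {
	for _, s := range []string{"http", "grpc", "tcp"} {
		fmt.Printf("%s supported: %v\n", s, isSupported(Protocol(s)))
	}
}
```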

Default values for the various ports:

const (
	// DefaultDaprHTTPPort is the default http port for Dapr
	DefaultDaprHTTPPort = 3500
	// DefaultDaprAPIGRPCPort is the default API gRPC port for Dapr
	DefaultDaprAPIGRPCPort = 50001
	// DefaultProfilePort is the default port for profiling endpoints
	DefaultProfilePort = 7777
	// DefaultMetricsPort is the default port for metrics endpoints
	DefaultMetricsPort = 9090
)

Default HTTP settings; currently there is only one, MaxRequestBodySize:

const (
	// DefaultMaxRequestBodySize is the default option for the maximum body size in MB for Dapr HTTP servers
	DefaultMaxRequestBodySize = 4
)

Config struct

// Config holds the Dapr Runtime configuration
type Config struct {
	ID                   string
	HTTPPort             int
	ProfilePort          int
	EnableProfiling      bool
	APIGRPCPort          int
	InternalGRPCPort     int
	ApplicationPort      int
	ApplicationProtocol  Protocol
	Mode                 modes.DaprMode
	PlacementAddresses   []string
	GlobalConfig         string
	AllowedOrigins       string
	Standalone           config.StandaloneConfig
	Kubernetes           config.KubernetesConfig
	MaxConcurrency       int
	mtlsEnabled          bool
	SentryServiceAddress string
	CertChain            *credentials.CertChain
	AppSSL               bool
	MaxRequestBodySize   int
}

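This is a little messy: every field sits flat in a single struct, and the list will only keep growing.

Building the Config

The constructor builds the Config struct by plain assignment; the parameter list is really rather long: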

// NewRuntimeConfig returns a new runtime config
func NewRuntimeConfig(
   id string, placementAddresses []string,
   controlPlaneAddress, allowedOrigins, globalConfig, componentsPath, appProtocol, mode string,
   httpPort, internalGRPCPort, apiGRPCPort, appPort, profilePort int,
   enableProfiling bool, maxConcurrency int, mtlsEnabled bool, sentryAddress string, appSSL bool, maxRequestBodySize int) *Config {
   return &Config{
      ID:                  id,
      HTTPPort:            httpPort,
      InternalGRPCPort:    internalGRPCPort,
      APIGRPCPort:         apiGRPCPort,
      ApplicationPort:     appPort,
      ProfilePort:         profilePort,
      ApplicationProtocol: Protocol(appProtocol),
      Mode:                modes.DaprMode(mode),
      PlacementAddresses:  placementAddresses,
      GlobalConfig:        globalConfig,
      AllowedOrigins:      allowedOrigins,
      Standalone: config.StandaloneConfig{
         ComponentsPath: componentsPath,
      },
      Kubernetes: config.KubernetesConfig{
         ControlPlaneAddress: controlPlaneAddress,
      },
      EnableProfiling:      enableProfiling,
      MaxConcurrency:       maxConcurrency,
      mtlsEnabled:          mtlsEnabled,
      SentryServiceAddress: sentryAddress,
      AppSSL:               appSSL,
      MaxRequestBodySize:   maxRequestBodySize,
   }
}

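3 - Source code study: cli.go

Parsing command-line flags and returning a DaprRuntime instance

A walkthrough of cli.go in the Dapr runtime package, which parses the command-line flags and returns a DaprRuntime instance.

cli.go essentially consists of a single function, FromFlags().

FromFlags() overview

FromFlags() parses the command-line flags and returns a DaprRuntime instance: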

// FromFlags parses command flags and returns DaprRuntime instance
func FromFlags() (*DaprRuntime, error) {
   ......
   return NewDaprRuntime(runtimeConfig, globalConfig, accessControlList), nil
}

Parsing command-line flags

General flags

The code is as follows:

mode := flag.String("mode", string(modes.StandaloneMode), "Runtime mode for Dapr")
daprHTTPPort := flag.String("dapr-http-port", fmt.Sprintf("%v", DefaultDaprHTTPPort), "HTTP port for Dapr API to listen on")
daprAPIGRPCPort := flag.String("dapr-grpc-port", fmt.Sprintf("%v", DefaultDaprAPIGRPCPort), "gRPC port for the Dapr API to listen on")
daprInternalGRPCPort := flag.String("dapr-internal-grpc-port", "", "gRPC port for the Dapr Internal API to listen on")
appPort := flag.String("app-port", "", "The port the application is listening on")
profilePort := flag.String("profile-port", fmt.Sprintf("%v", DefaultProfilePort), "The port for the profile server")
appProtocol := flag.String("app-protocol", string(HTTPProtocol), "Protocol for the application: grpc or http")
componentsPath := flag.String("components-path", "", "Path for components directory. If empty, components will not be loaded. Self-hosted mode only")
config := flag.String("config", "", "Path to config file, or name of a configuration object")
appID := flag.String("app-id", "", "A unique ID for Dapr. Used for Service Discovery and state")
controlPlaneAddress := flag.String("control-plane-address", "", "Address for a Dapr control plane")
sentryAddress := flag.String("sentry-address", "", "Address for the Sentry CA service")
placementServiceHostAddr := flag.String("placement-host-address", "", "Addresses for Dapr Actor Placement servers")
allowedOrigins := flag.String("allowed-origins", cors.DefaultAllowedOrigins, "Allowed HTTP origins")
enableProfiling := flag.Bool("enable-profiling", false, "Enable profiling")
runtimeVersion := flag.Bool("version", false, "Prints the runtime version")
appMaxConcurrency := flag.Int("app-max-concurrency", -1, "Controls the concurrency level when forwarding requests to user code")
enableMTLS := flag.Bool("enable-mtls", false, "Enables automatic mTLS for daprd to daprd communication channels")
appSSL := flag.Bool("app-ssl", false, "Sets the URI scheme of the app to https and attempts an SSL connection")
daprHTTPMaxRequestSize := flag.Int("dapr-http-max-request-size", -1, "Increasing max size of request body in MB to handle uploading of big files. By default 4 MB.")

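TODO: there should be documentation for the command-line arguments; go through the flags against that documentation.

Logging-related flags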

loggerOptions := logger.DefaultOptions()
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)

Metrics-related flags

metricsExporter := metrics.NewExporter(metrics.DefaultMetricNamespace)

// attaching only metrics-port option
metricsExporter.Options().AttachCmdFlag(flag.StringVar)

Then the flags are parsed:

flag.Parse()

Handling the version flag

If only the version flag was given, the process prints the version information and exits:

runtimeVersion := flag.Bool("version", false, "Prints the runtime version")

if *runtimeVersion {
   fmt.Println(version.Version())
   os.Exit(0)
}

Initializing logging and metrics

Logging initialization

The loggers are initialized from the logger options:

loggerOptions := logger.DefaultOptions()
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)

if *appID == "" {
   return nil, errors.New("app-id parameter cannot be empty")
}

// Apply options to all loggers
loggerOptions.SetAppID(*appID)
if err := logger.ApplyOptionsToLoggers(&loggerOptions); err != nil {
   return nil, err
}

Once logging has been initialized, log messages can be emitted:

log.Infof("starting Dapr Runtime -- version %s -- commit %s", version.Version(), version.Commit())
log.Infof("log level set to: %s", loggerOptions.OutputLevel)

Metrics initialization

Initialize the Dapr metrics exporter:

// Initialize dapr metrics exporter
if err := metricsExporter.Init(); err != nil {
   log.Fatal(err)
}

Parsing configuration

Parsing the Dapr port settings

dapr-http-port / dapr-grpc-port / profile-port / dapr-internal-grpc-port / app-port :

daprHTTP, err := strconv.Atoi(*daprHTTPPort)
if err != nil {
   return nil, errors.Wrap(err, "error parsing dapr-http-port flag")
}

daprAPIGRPC, err := strconv.Atoi(*daprAPIGRPCPort)
if err != nil {
   return nil, errors.Wrap(err, "error parsing dapr-grpc-port flag")
}

profPort, err := strconv.Atoi(*profilePort)
if err != nil {
   return nil, errors.Wrap(err, "error parsing profile-port flag")
}

var daprInternalGRPC int
if *daprInternalGRPCPort != "" {
   daprInternalGRPC, err = strconv.Atoi(*daprInternalGRPCPort)
   if err != nil {
      return nil, errors.Wrap(err, "error parsing dapr-internal-grpc-port")
   }
} else {
   daprInternalGRPC, err = grpc.GetFreePort()
   if err != nil {
      return nil, errors.Wrap(err, "failed to get free port for internal grpc server")
   }
}

var applicationPort int
if *appPort != "" {
   applicationPort, err = strconv.Atoi(*appPort)
   if err != nil {
      return nil, errors.Wrap(err, "error parsing app-port")
   }
}
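The port handling above follows one pattern: ports are registered as string flags so that an empty string can mean "not set", converted with strconv.Atoi, and, for the internal gRPC port, replaced by a free port when empty. The sketch below reproduces that pattern under stated assumptions: parsePorts and freePort are hypothetical names, freePort is only a stand-in for Dapr's grpc.GetFreePort (whose implementation is not shown here), and errors are wrapped with fmt.Errorf and %w instead of errors.Wrap so that only the standard library is needed.

```go
package main

import (
	"flag"
	"fmt"
	"net"
	"strconv"
)

// freePort asks the OS for an unused TCP port by listening on port 0.
// Stand-in for grpc.GetFreePort; the real implementation may differ.
func freePort() (int, error) {
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return 0, err
	}
	defer l.Close()
	return l.Addr().(*net.TCPAddr).Port, nil
}

// parsePorts is a hypothetical helper that parses two of the port flags
// from args using a private FlagSet, so it can be called repeatedly.
func parsePorts(args []string) (httpPort, internalPort int, err error) {
	fs := flag.NewFlagSet("sketch", flag.ContinueOnError)
	httpFlag := fs.String("dapr-http-port", "3500", "HTTP port for the Dapr API")
	internalFlag := fs.String("dapr-internal-grpc-port", "", "internal gRPC port")
	if err = fs.Parse(args); err != nil {
		return 0, 0, err
	}

	httpPort, err = strconv.Atoi(*httpFlag)
	if err != nil {
		return 0, 0, fmt.Errorf("error parsing dapr-http-port flag: %w", err)
	}

	if *internalFlag != "" {
		// An explicit value was given: it must be a valid integer.
		internalPort, err = strconv.Atoi(*internalFlag)
		if err != nil {
			return 0, 0, fmt.Errorf("error parsing dapr-internal-grpc-port: %w", err)
		}
		return httpPort, internalPort, nil
	}

	// No value given: fall back to any free port.
	internalPort, err = freePort()
	if err != nil {
		return 0, 0, fmt.Errorf("failed to get free port for internal grpc server: %w", err)
	}
	return httpPort, internalPort, nil
}

func main() {
	h, i, err := parsePorts([]string{"--dapr-http-port", "3501"})
	fmt.Println(h, i > 0, err)
}
```

Using a string flag rather than flag.Int is what makes "flag omitted" distinguishable from "flag set to 0".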

Parsing other settings

Next, maxRequestBodySize, placementAddresses, concurrency, appProtocol and other settings are parsed:

var maxRequestBodySize int
if *daprHTTPMaxRequestSize != -1 {
   maxRequestBodySize = *daprHTTPMaxRequestSize
} else {
   maxRequestBodySize = DefaultMaxRequestBodySize
}

placementAddresses := []string{}
if *placementServiceHostAddr != "" {
   placementAddresses = parsePlacementAddr(*placementServiceHostAddr)
}

var concurrency int
if *appMaxConcurrency != -1 {
   concurrency = *appMaxConcurrency
}

appPrtcl := string(HTTPProtocol)
if *appProtocol != string(HTTPProtocol) {
   appPrtcl = *appProtocol
}
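Two small techniques appear in this step: -1 is used as a sentinel meaning "flag not provided", and the placement flag holds several addresses in one string. A minimal sketch of both follows. orDefault and splitAddrs are hypothetical names; splitAddrs assumes the addresses are comma-separated, which is an assumption about parsePlacementAddr rather than something shown in this section.

```go
package main

import (
	"fmt"
	"strings"
)

// Same value as DefaultMaxRequestBodySize above (in MB).
const defaultMaxRequestBodySize = 4

// orDefault returns def when the flag still holds the -1 sentinel,
// otherwise the value the user supplied.
func orDefault(flagValue, def int) int {
	if flagValue == -1 {
		return def
	}
	return flagValue
}

// splitAddrs is a hypothetical stand-in for parsePlacementAddr.
// Assumption: addresses are comma-separated; surrounding spaces and
// empty entries are dropped.
func splitAddrs(val string) []string {
	addrs := []string{}
	for _, a := range strings.Split(val, ",") {
		if a = strings.TrimSpace(a); a != "" {
			addrs = append(addrs, a)
		}
	}
	return addrs
}

func main() {
	fmt.Println(orDefault(-1, defaultMaxRequestBodySize)) // prints "4"
	fmt.Println(orDefault(16, defaultMaxRequestBodySize)) // prints "16"
	fmt.Println(len(splitAddrs("10.0.0.1:50005, 10.0.0.2:50005"))) // prints "2"
}
```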

Building the three runtime configurations

Building runtimeConfig

runtimeConfig := NewRuntimeConfig(*appID, placementAddresses, *controlPlaneAddress, *allowedOrigins, *config, *componentsPath,
   appPrtcl, *mode, daprHTTP, daprInternalGRPC, daprAPIGRPC, applicationPort, profPort, *enableProfiling, concurrency, *enableMTLS, *sentryAddress, *appSSL, maxRequestBodySize)

mTLS-related configuration:

if *enableMTLS {
   runtimeConfig.CertChain, err = security.GetCertChain()
   if err != nil {
      return nil, err
   }
}

Building globalConfig

var globalConfig *global_config.Configuration

Depending on the value of the config flag and on the Dapr mode, the corresponding configuration is loaded:

config := flag.String("config", "", "Path to config file, or name of a configuration object")

if *config != "" {
   switch modes.DaprMode(*mode) {
   case modes.KubernetesMode:
      client, conn, clientErr := client.GetOperatorClient(*controlPlaneAddress, security.TLSServerName, runtimeConfig.CertChain)
      if clientErr != nil {
         return nil, clientErr
      }
      defer conn.Close()
      namespace = os.Getenv("NAMESPACE")
      globalConfig, configErr = global_config.LoadKubernetesConfiguration(*config, namespace, client)
   case modes.StandaloneMode:
      globalConfig, _, configErr = global_config.LoadStandaloneConfiguration(*config)
   }

   if configErr != nil {
      log.Debugf("Config error: %v", configErr)
   }
}

if configErr != nil {
   log.Fatalf("error loading configuration: %s", configErr)
}

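In short: in Kubernetes mode the configuration is read from a CRD, and in standalone mode it is read from a local configuration file.

If config is not set, the default global configuration is used: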

if globalConfig == nil {
   log.Info("loading default configuration")
   globalConfig = global_config.LoadDefaultConfiguration()
}
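The control flow above (load by mode when a name is given, fail on a load error, otherwise fall back to the default) can be condensed into one function. The sketch below is only an approximation: Configuration, loadFromCluster and loadFromFile are stand-ins for the global_config types and loaders, the mode strings are assumed values, and the function returns an error where FromFlags() calls log.Fatalf.

```go
package main

import (
	"errors"
	"fmt"
)

// Configuration is a stand-in for global_config.Configuration.
type Configuration struct{ Source string }

// loadFromCluster stands in for the Kubernetes loader.
func loadFromCluster(name string) (*Configuration, error) {
	return &Configuration{Source: "crd:" + name}, nil
}

// loadFromFile stands in for the standalone loader; one file name is
// treated as missing so the error path can be exercised.
func loadFromFile(name string) (*Configuration, error) {
	if name == "missing.yaml" {
		return nil, errors.New("file not found")
	}
	return &Configuration{Source: "file:" + name}, nil
}

// loadGlobalConfig is a hypothetical condensation of the steps above.
// The mode strings "kubernetes" and "standalone" are assumptions.
func loadGlobalConfig(name, mode string) (*Configuration, error) {
	var cfg *Configuration
	var err error
	if name != "" {
		switch mode {
		case "kubernetes":
			cfg, err = loadFromCluster(name)
		case "standalone":
			cfg, err = loadFromFile(name)
		}
	}
	if err != nil {
		return nil, fmt.Errorf("error loading configuration: %w", err)
	}
	if cfg == nil {
		// Nothing was loaded: use the default configuration.
		cfg = &Configuration{Source: "default"}
	}
	return cfg, nil
}

func main() {
	for _, name := range []string{"", "cfg.yaml", "missing.yaml"} {
		cfg, err := loadGlobalConfig(name, "standalone")
		fmt.Println(cfg, err)
	}
}
```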

Building accessControlList

var accessControlList *global_config.AccessControlList

accessControlList, err = global_config.ParseAccessControlSpec(globalConfig.Spec.AccessControlSpec, string(runtimeConfig.ApplicationProtocol))
if err != nil {
   log.Fatalf(err.Error())
}

Constructing the DaprRuntime instance

Finally, the DaprRuntime instance is constructed:

return NewDaprRuntime(runtimeConfig, globalConfig, accessControlList), nil

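4 - Source code study: Runtime App Channel

A walkthrough of the App Channel source code in the Dapr runtime

4.1 - Source code study: channel.go

Defining the AppChannel interface and its methods

A walkthrough of channel.go in the Dapr channel package, which defines the AppChannel interface and its methods.

AppChannel is the abstraction for communicating with user code.

The constant DefaultChannelAddress: because Dapr is usually deployed as a sidecar, the default channel address is 127.0.0.1: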

const (
   // DefaultChannelAddress is the address that user application listen to
   DefaultChannelAddress = "127.0.0.1"
)

Method definitions:

// AppChannel is an abstraction over communications with user code
type AppChannel interface {
   GetBaseAddress() string
   InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error)
}

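4.2 - Source code study: grpc_channel.go

The gRPC implementation of AppChannel.

A walkthrough of grpc_channel.go in the Dapr channel package, the gRPC implementation of AppChannel.

Channel struct definition

Channel is a concrete AppChannel implementation for interacting with gRPC-based user code.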

// Channel is a concrete AppChannel implementation for interacting with gRPC based user code
type Channel struct {
	// gRPC client connection
	client           *grpc.ClientConn
	// address of the user code (the application)
	baseAddress      string
	// Go channel used to limit concurrency
	ch               chan int
	tracingSpec      config.TracingSpec
	appMetadataToken string
}

Creating the Channel struct

// CreateLocalChannel creates a gRPC connection with user code
func CreateLocalChannel(port, maxConcurrency int, conn *grpc.ClientConn, spec config.TracingSpec) *Channel {
	c := &Channel{
		client:           conn,
		// baseAddress is simply "ip:port"
		baseAddress:      fmt.Sprintf("%s:%d", channel.DefaultChannelAddress, port),
		tracingSpec:      spec,
		appMetadataToken: auth.GetAppToken(),
	}
	if maxConcurrency > 0 {
		// if a concurrency limit is requested, create the Go channel used to enforce it
		c.ch = make(chan int, maxConcurrency)
	}
	return c
}

GetBaseAddress method

// GetBaseAddress returns the application base address
func (g *Channel) GetBaseAddress() string {
   return g.baseAddress
}

This method returns the application's base address, to which sub-paths can be appended, for example:

func (a *actorsRuntime) startAppHealthCheck(opts ...health.Option) {
	healthAddress := fmt.Sprintf("%s/healthz", a.appChannel.GetBaseAddress())
	ch := health.StartEndpointHealthCheck(healthAddress, opts...)
	......
}

Note: the actor runtime is the only place where this method is used.

InvokeMethod method

InvokeMethod invokes user code via gRPC:

// InvokeMethod invokes user code via gRPC
func (g *Channel) InvokeMethod(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
   var rsp *invokev1.InvokeMethodResponse
   var err error

   switch req.APIVersion() {
   case internalv1pb.APIVersion_V1:
      // only the v1 version is supported at the moment
      rsp, err = g.invokeMethodV1(ctx, req)

   default:
      // Reject unsupported version: any other version is refused
      rsp = nil
      err = status.Error(codes.Unimplemented, fmt.Sprintf("Unsupported spec version: %d", req.APIVersion()))
   }

   return rsp, err
}

Implementation of invokeMethodV1()

// invokeMethodV1 calls user applications using daprclient v1
func (g *Channel) invokeMethodV1(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
   if g.ch != nil {
      // send an int into ch, equivalent to incrementing the current concurrency count
      g.ch <- 1
   }

   // create an app callback client
   clientV1 := runtimev1pb.NewAppCallbackClient(g.client)
   // convert the internal metadata to gRPC metadata
   grpcMetadata := invokev1.InternalMetadataToGrpcMetadata(ctx, req.Metadata(), true)

   if g.appMetadataToken != "" {
      grpcMetadata.Set(auth.APITokenHeader, g.appMetadataToken)
   }

   // Prepare gRPC Metadata
   ctx = metadata.NewOutgoingContext(context.Background(), grpcMetadata)

   var header, trailer metadata.MD
   // call the user code
   resp, err := clientV1.OnInvoke(ctx, req.Message(), grpc.Header(&header), grpc.Trailer(&trailer))

   if g.ch != nil {
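      // receive an int from ch, equivalent to decrementing the current concurrency count
      // Note: this release is not protected. If the code above panics, the slot is
      // never released and the counter becomes wrong.
      // Doing this in a defer would be safer.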
      <-g.ch
   }

   var rsp *invokev1.InvokeMethodResponse
   if err != nil {
      // Convert status code
      respStatus := status.Convert(err)
      // Prepare response
      rsp = invokev1.NewInvokeMethodResponse(int32(respStatus.Code()), respStatus.Message(), respStatus.Proto().Details)
   } else {
      rsp = invokev1.NewInvokeMethodResponse(int32(codes.OK), "", nil)
   }

   rsp.WithHeaders(header).WithTrailers(trailer)

   return rsp.WithMessage(resp), nil
}

InvokeMethod is called from the following places:

  • callLocalActor() and deactivateActor() in actor
  • CallLocal() in the gRPC API
  • invokeLocal() in direct_message under messaging
  • in runtime:
    • getConfigurationHTTP()
    • isAppSubscribedToBinding()
    • publishMessageHTTP()
    • sendBindingEventToApp()
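The comment in invokeMethodV1() suggests releasing the concurrency slot in a defer so that a panic cannot leak it. The sketch below illustrates that idea in isolation and is not Dapr's code: limiter, do, maxObserved and safeDo are hypothetical names, and chan struct{} is used instead of chan int because the value sent carries no information.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// limiter bounds concurrent calls with a buffered channel, like Channel.ch.
type limiter struct {
	ch chan struct{}
}

// newLimiter creates the channel only when a positive limit is requested.
func newLimiter(maxConcurrency int) *limiter {
	l := &limiter{}
	if maxConcurrency > 0 {
		l.ch = make(chan struct{}, maxConcurrency)
	}
	return l
}

// do runs fn while holding a slot. The slot is released in a defer,
// so it is returned even if fn panics.
func (l *limiter) do(fn func()) {
	if l.ch != nil {
		l.ch <- struct{}{} // blocks while maxConcurrency calls are in flight
		defer func() { <-l.ch }()
	}
	fn()
}

// maxObserved starts n goroutines through the limiter and returns the
// highest number of calls that were ever running at the same time.
func maxObserved(l *limiter, n int) int32 {
	var current, peak int32
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.do(func() {
				c := atomic.AddInt32(&current, 1)
				for {
					p := atomic.LoadInt32(&peak)
					if c <= p || atomic.CompareAndSwapInt32(&peak, p, c) {
						break
					}
				}
				time.Sleep(time.Millisecond)
				atomic.AddInt32(&current, -1)
			})
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

// safeDo runs a panicking function through the limiter and reports
// whether a panic was recovered.
func safeDo(l *limiter) (recovered bool) {
	defer func() { recovered = recover() != nil }()
	l.do(func() { panic("boom") })
	return false
}

func main() {
	l := newLimiter(2)
	fmt.Println(maxObserved(l, 10) <= 2) // prints "true"
	fmt.Println(safeDo(l), len(l.ch))    // prints "true 0"
}
```

The second line of output shows the point of the defer: after a panic inside the guarded call, no slot remains occupied.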