1 - kit仓库简介

存放共享的工具代码

kit仓库的介绍

Shared utility code for Dapr runtime.

https://github.com/dapr/kit

目前内容很少,只有 logger/config/retry 三个package。

kit仓库的背景

kit 仓库是后来提取出来的仓库,原来的代码存放在 dapr 仓库中,被 dapr 仓库中的其他代码使用。后来 components-contrib 仓库的代码也使用了这些基础代码,这导致了一个循环依赖:

  • dapr 仓库依赖 components-contrib 仓库: 使用 components-contrib 仓库 仓库中的各种 components 实现
  • components-contrib 仓库依赖dapr 仓库: 使用dapr 仓库中的基础代码。
participant dapr
participant       "components-contrib"       as components
dapr -> components : for component impl 
components -> dapr : for common code

为了让依赖关系更加的清晰,避免循环依赖,因此将这些基础代码从 dapr 仓库中移出来存放在单独的 kit仓库中,之后的依赖关系就是这样:

  • dapr 仓库依赖 components-contrib 仓库: 使用 components-contrib 仓库 仓库中的各种 components 实现
  • dapr 仓库依赖 kit 仓库: 使用 kit 仓库中的基础代码。
  • components-contrib 仓库依赖 kit 仓库: 使用 kit 仓库中的基础代码。
participant dapr
participant       "components-contrib"       as components
participant kit

dapr -> kit : for common code
components -> kit : for common code
dapr -> components : for component impl 

2 - logger的源码学习

Dapr Logger package的源码学习

2.1 - logger.go的源码学习

定义logger相关的日志类型、schema、日志级别、接口以及保存全局logger列表

Dapr Logger package中的logger.go文件的源码学习,定义logger相关`的日志类型、schema、日志级别、接口以及保存全局logger列表。

logger的相关定义

log type

log类型分为 普通 log 和 request 两种:

const (
	// LogTypeLog is normal log type
	LogTypeLog = "log"
	// LogTypeRequest is Request log type
	LogTypeRequest = "request"
    ......
}

log schema

const (
    ......
	// Field names that defines Dapr log schema
	logFieldTimeStamp = "time"
	logFieldLevel     = "level"
	logFieldType      = "type"
	logFieldScope     = "scope"
	logFieldMessage   = "msg"
	logFieldInstance  = "instance"
	logFieldDaprVer   = "ver"
	logFieldAppID     = "app_id"
)

log level

log level 没啥特别,很传统的定义:

const (
	// DebugLevel has verbose message
	DebugLevel LogLevel = "debug"
	// InfoLevel is default log level
	InfoLevel LogLevel = "info"
	// WarnLevel is for logging messages about possible issues
	WarnLevel LogLevel = "warn"
	// ErrorLevel is for logging errors
	ErrorLevel LogLevel = "error"
	// FatalLevel is for logging fatal messages. The system shuts down after logging the message.
	FatalLevel LogLevel = "fatal"

	// UndefinedLevel is for undefined log level
	UndefinedLevel LogLevel = "undefined"
)

注意: FatalLevel 有特别的意义,”The system shuts down after logging the message“. 所以这个不能随便用。

toLogLevel() 方法将字符串转为 LogLevel,大小写不敏感:

// toLogLevel converts to LogLevel
func toLogLevel(level string) LogLevel {
	switch strings.ToLower(level) {
	case "debug":
		return DebugLevel
	case "info":
		return InfoLevel
	case "warn":
		return WarnLevel
	case "error":
		return ErrorLevel
	case "fatal":
		return FatalLevel
	}

	// unsupported log level by Dapr
	return UndefinedLevel
}

Logger 接口定义

// Logger includes the logging api sets
type Logger interface {
	// EnableJSONOutput enables JSON formatted output log
	EnableJSONOutput(enabled bool)

	// SetAppID sets dapr_id field in log. Default value is empty string
	SetAppID(id string)
	// SetOutputLevel sets log output level
	SetOutputLevel(outputLevel LogLevel)

	// WithLogType specify the log_type field in log. Default value is LogTypeLog
	WithLogType(logType string) Logger

	// Info logs a message at level Info.
	Info(args ...interface{})
	// Infof logs a message at level Info.
	Infof(format string, args ...interface{})
	// Debug logs a message at level Debug.
	Debug(args ...interface{})
	// Debugf logs a message at level Debug.
	Debugf(format string, args ...interface{})
	// Warn logs a message at level Warn.
	Warn(args ...interface{})
	// Warnf logs a message at level Warn.
	Warnf(format string, args ...interface{})
	// Error logs a message at level Error.
	Error(args ...interface{})
	// Errorf logs a message at level Error.
	Errorf(format string, args ...interface{})
	// Fatal logs a message at level Fatal then the process will exit with status set to 1.
	Fatal(args ...interface{})
	// Fatalf logs a message at level Fatal then the process will exit with status set to 1.
	Fatalf(format string, args ...interface{})
}

logger的创建和获取

全局 logger 列表

// globalLoggers is the collection of Dapr Logger that is shared globally.
// TODO: User will disable or enable logger on demand.
var globalLoggers = map[string]Logger{}  // map保存所有的logger实例
var globalLoggersLock = sync.RWMutex{}   // 用读写锁对map进行保护

创建新logger或获取已经保存的logger

logger创建之后会保存在 global loggers 中,这意味着每个 name 的logger只会创建一个实例。

// NewLogger creates new Logger instance.
func NewLogger(name string) Logger {
	globalLoggersLock.Lock()	// 加写锁
	defer globalLoggersLock.Unlock()

	logger, ok := globalLoggers[name]
	if !ok {
		logger = newDaprLogger(name)
		globalLoggers[name] = logger
	}

	return logger
}

newDaprLogger() 方法的细节见 dapr_logger.go。

获取所有已经创建的logger列表

func getLoggers() map[string]Logger {
	globalLoggersLock.RLock()		// 加读锁
	defer globalLoggersLock.RUnlock()

	l := map[string]Logger{}
	for k, v := range globalLoggers {
		l[k] = v
	}

	return l
}

2.2 - dapr_logger.go的源码学习

daprLogger 是实际的日志实现

Dapr logger package中的dapr_logger.go文件的源码分析,daprLogger 是实际的日志实现。

daprLogger 结构体定义

daprLogger 结构体,底层实现是 logrus :

// daprLogger is the implemention for logrus
type daprLogger struct {
	// name is the name of logger that is published to log as a scope
	name string
	// loger is the instance of logrus logger
	logger *logrus.Entry
}

创建Dapr logger

创建Dapr logger的逻辑:

func newDaprLogger(name string) *daprLogger {
   // 底层是 logrus
	newLogger := logrus.New()
   // 输出到 stdout
	newLogger.SetOutput(os.Stdout)

	dl := &daprLogger{
		name: name,
		logger: newLogger.WithFields(logrus.Fields{
			logFieldScope: name,
         // 默认是普通log类型
			logFieldType:  LogTypeLog,
		}),
	}

   // 设置是否启用json输出,defaultJSONOutput默认是false
	dl.EnableJSONOutput(defaultJSONOutput)

	return dl
}

启用json输出

函数名有点小问题,实际是初始化logger,是否enables JSON只是部分逻辑:

// EnableJSONOutput enables JSON formatted output log
func (l *daprLogger) EnableJSONOutput(enabled bool) {
	var formatter logrus.Formatter

	fieldMap := logrus.FieldMap{
		// If time field name is conflicted, logrus adds "fields." prefix.
		// So rename to unused field @time to avoid the confliction.
		logrus.FieldKeyTime:  logFieldTimeStamp,
		logrus.FieldKeyLevel: logFieldLevel,
		logrus.FieldKeyMsg:   logFieldMessage,
	}

	hostname, _ := os.Hostname()
	l.logger.Data = logrus.Fields{
		logFieldScope:    l.logger.Data[logFieldScope],
		logFieldType:     LogTypeLog,
		logFieldInstance: hostname,
		logFieldDaprVer:  DaprVersion,
	}

	if enabled {
		formatter = &logrus.JSONFormatter{
			TimestampFormat: time.RFC3339Nano,
			FieldMap:        fieldMap,
		}
	} else {
		formatter = &logrus.TextFormatter{
			TimestampFormat: time.RFC3339Nano,
			FieldMap:        fieldMap,
		}
	}

	l.logger.Logger.SetFormatter(formatter)
}

logger的设置

设置DaprVersion

var DaprVersion string = "unknown"

func (l *daprLogger) EnableJSONOutput(enabled bool) {
	l.logger.Data = logrus.Fields{
        ......
		logFieldDaprVer:  DaprVersion,
	}
}

DaprVersion的值来自于 makefile (dapr/Makefile):

LOGGER_PACKAGE_NAME := github.com/dapr/kit/logger

DEFAULT_LDFLAGS:=-X $(BASE_PACKAGE_NAME)/pkg/version.gitcommit=$(GIT_COMMIT) \
  -X $(BASE_PACKAGE_NAME)/pkg/version.gitversion=$(GIT_VERSION) \
  -X $(BASE_PACKAGE_NAME)/pkg/version.version=$(DAPR_VERSION) \
  -X $(LOGGER_PACKAGE_NAME).DaprVersion=$(DAPR_VERSION)

设置appid

设置日志的 app_id 字段,默认为空。

// SetAppID sets app_id field in log. Default value is empty string
func (l *daprLogger) SetAppID(id string) {
	l.logger = l.logger.WithField(logFieldAppID, id)
}

这个方法在logger被初始化时调用进行设置,见 options.go 方法:

func ApplyOptionsToLoggers(options *Options) error {
    	......
 		if options.appID != undefinedAppID {
			v.SetAppID(options.appID)
		}   
}

设置日志级别

// SetOutputLevel sets log output level
func (l *daprLogger) SetOutputLevel(outputLevel LogLevel) {
   l.logger.Logger.SetLevel(toLogrusLevel(outputLevel))
}

func toLogrusLevel(lvl LogLevel) logrus.Level {
	// ignore error because it will never happens
	l, _ := logrus.ParseLevel(string(lvl))
	return l
}

这个是在原有的 daprLogger 实例上进行设置,没啥特殊。

设置日志类型

默认是普通 log 类型,如果要设置log类型:

// WithLogType specify the log_type field in log. Default value is LogTypeLog
func (l *daprLogger) WithLogType(logType string) Logger {
   // 这里重新构造了一个新的 daprLogger 结构体,然后返回
   return &daprLogger{
      name:   l.name,
      logger: l.logger.WithField(logFieldType, logType),
   }
}

疑问和TODO:

  1. 为什么不是直接设置 l.logger,而是构造一个新的结构体,然后返回还是 Logger ?
  2. 会不会有隐患?前面logger创建之后是存放在global logger map中的,key是简单的 name 而不是 name + logtype,这岂不是无法保存一个 name 两个类型的两个 logger 对象?

logger的实现

所有的写log的方法都简单代理给了 l.logger (*logrus.Entry):

// Info logs a message at level Info.
func (l *daprLogger) Info(args ...interface{}) {
	l.logger.Log(logrus.InfoLevel, args...)
}

// Infof logs a message at level Info.
func (l *daprLogger) Infof(format string, args ...interface{}) {
	l.logger.Logf(logrus.InfoLevel, format, args...)
}

// Debug logs a message at level Debug.
func (l *daprLogger) Debug(args ...interface{}) {
	l.logger.Log(logrus.DebugLevel, args...)
}

// Debugf logs a message at level Debug.
func (l *daprLogger) Debugf(format string, args ...interface{}) {
	l.logger.Logf(logrus.DebugLevel, format, args...)
}

// Warn logs a message at level Warn.
func (l *daprLogger) Warn(args ...interface{}) {
	l.logger.Log(logrus.WarnLevel, args...)
}

// Warnf logs a message at level Warn.
func (l *daprLogger) Warnf(format string, args ...interface{}) {
	l.logger.Logf(logrus.WarnLevel, format, args...)
}

// Error logs a message at level Error.
func (l *daprLogger) Error(args ...interface{}) {
	l.logger.Log(logrus.ErrorLevel, args...)
}

// Errorf logs a message at level Error.
func (l *daprLogger) Errorf(format string, args ...interface{}) {
	l.logger.Logf(logrus.ErrorLevel, format, args...)
}

// Fatal logs a message at level Fatal then the process will exit with status set to 1.
func (l *daprLogger) Fatal(args ...interface{}) {
	l.logger.Fatal(args...)
}

// Fatalf logs a message at level Fatal then the process will exit with status set to 1.
func (l *daprLogger) Fatalf(format string, args ...interface{}) {
	l.logger.Fatalf(format, args...)
}

注意 logrus 的 Fatalf() 方法的实现,在输出日志之后会调用ExitFunc(如果没设置则默认是 os.Exit

func (entry *Entry) Fatalf(format string, args ...interface{}) {
	entry.Logf(FatalLevel, format, args...)
	entry.Logger.Exit(1)
}
func (logger *Logger) Exit(code int) {
	runHandlers()
	if logger.ExitFunc == nil {
		logger.ExitFunc = os.Exit
	}
	logger.ExitFunc(code)
}

这会导致进程退出。因此要慎用。

2.3 - options.go的源码学习

设置 logger 相关的属性,包括从命令行参数中解析标记

Dapr logger package中的 options.go 文件的源码学习,设置logger相关的属性,包括从命令行参数中解析标记。

默认属性

const (
	defaultJSONOutput  = false
	defaultOutputLevel = "info"
	undefinedAppID     = ""
)

Options 结构体定义

Options 结构体,就三个字段:

// Options defines the sets of options for Dapr logging
type Options struct {
   // appID is the unique id of Dapr Application
   // 默认为空
   appID string

   // JSONFormatEnabled is the flag to enable JSON formatted log
   // 默认为fasle
   JSONFormatEnabled bool

   // OutputLevel is the level of logging
   // 默认为 info
   OutputLevel string
}

设值方法

// SetOutputLevel sets the log output level
func (o *Options) SetOutputLevel(outputLevel string) error {
   // 疑问:这里检查和赋值存在不一致:如果 outputLevel 中有大写字母
   // TODO:改进一下
   if toLogLevel(outputLevel) == UndefinedLevel {
      return errors.Errorf("undefined Log Output Level: %s", outputLevel)
   }
   o.OutputLevel = outputLevel
   return nil
}

// SetAppID sets Dapr ID
func (o *Options) SetAppID(id string) {
   o.appID = id
}

疑问 :为什么字段和设置方法不统一?

  1. JSONFormatEnabled 是 public 字段,没有Set方法
  2. OutputLevel 是 public 字段,有 Set 方法,Set 方法做了输入值的检测。
    • 问题来了:既然是 public 字段,那么绕开 Set 方法直接赋值岂不是就绕开了输入值检测的逻辑?
  3. appID 是 private 字段,有 Set 方法,而 Set 方法什么都没有做,只是简单赋值,那么为什么不直接用 public 字段呢?

检查发现:

  • SetOutputLevel 在dapr/dapr 项目中没有任何人调用

默认构造

返回每个字段的默认值,没啥特殊:

// DefaultOptions returns default values of Options
func DefaultOptions() Options {
   return Options{
      JSONFormatEnabled: defaultJSONOutput,
      appID:             undefinedAppID,
      OutputLevel:       defaultOutputLevel,
   }
}

备注:go 不像 java 可以在字段定义时直接赋值一个默认值,有时还真不方便。

从命令行标记中读取日志属性

在命令行参数中读取 log-levellog-as-json 两个标记并设置 OutputLevel 和 JSONFormatEnabled:

// AttachCmdFlags attaches log options to command flags
func (o *Options) AttachCmdFlags(
   stringVar func(p *string, name string, value string, usage string),
   boolVar func(p *bool, name string, value bool, usage string)) {
	if stringVar != nil {
		stringVar(
			&o.OutputLevel,
			"log-level",
			defaultOutputLevel,
			"Options are debug, info, warn, error, or fatal (default info)")
	}
	if boolVar != nil {
		boolVar(
			&o.JSONFormatEnabled,
			"log-as-json",
			defaultJSONOutput,
			"print log as JSON (default false)")
	}
}

备注:这大概就是 OutputLevel 和 JSONFormatEnabled 两个字段是 public 的原因?

这个方法会在每个二进制文件(runtime(也就是daprd) / injector / operator / placement / sentry) 的初始化代码中调用:

loggerOptions := logger.DefaultOptions()
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)

注意:这个时候 OutputLevel 的值是没有经过检查而直接设值的,绕开了 SetOutputLevel 方法的检查。

将属性应用到所有的logger

// ApplyOptionsToLoggers applys options to all registered loggers
func ApplyOptionsToLoggers(options *Options) error {
   // 所有的 logger 指的是保存在全局 logger map 中所有 logger
   internalLoggers := getLoggers()

   // Apply formatting options first
   for _, v := range internalLoggers {
      v.EnableJSONOutput(options.JSONFormatEnabled)

      if options.appID != undefinedAppID {
         v.SetAppID(options.appID)
      }
   }

   daprLogLevel := toLogLevel(options.OutputLevel)
   if daprLogLevel == UndefinedLevel {
      // 在这里做了 OutputLevel 值的有效性检查
      return errors.Errorf("invalid value for --log-level: %s", options.OutputLevel)
   }

   for _, v := range internalLoggers {
      v.SetOutputLevel(daprLogLevel)
   }
   return nil
}

TODO:OutputLevel 赋值有效性检查的地方现在发现有两个,其中一个还没有被使用。准备PR修订。

查了一下这个方法的确是在每个二进制文件(runtime(也就是daprd) / injector / operator / placement / sentry) 的初始化代码中调用:

loggerOptions := logger.DefaultOptions()
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)
......
// Apply options to all loggers
loggerOptions.SetAppID(*appID)
if err := logger.ApplyOptionsToLoggers(&loggerOptions); err != nil {
   return nil, err
}

TODO: ApplyOptionsToLoggers这个方法名最好修改增加“来自命令行的options”语义,否则报错 “invalid value for –log-level“ 就会很奇怪。

3 - config的源码学习

Dapr config package的源码学习

3.1 - decode.go的源码学习

从config中解析出配置信息。

Dapr config package中的 decode.go 文件的源码学习。

Decoder的相关定义

StringDecoder

// StringDecoder被用作自定义类型(或别名类型)来覆盖 `decodeString` DecodeHook中的基本解码功能的一种方式。 
// `encoding.TextMashaller`没有被使用,是因为它与许多Go类型相匹配,并且会有潜在的意外结果。
// 指定一个自定义的解码func应该是非常有意的。
type StringDecoder interface {
	DecodeString(value string) error
}

Decode()方法

// Decode()将通用map值从 `input` 解码到 `output`,同时提供有用的错误信息。
// `output`必须是一个指向Go结构体的指针,该结构体包含应被解码的字段的 `mapstructure` 结构体标签。
// 这个函数在解码被解析为 `map[string]interface{}` 的配置文件或被解析为`map[string]string` 的组件元数据的值时很有用。
// 
// 大部分繁重的工作都由 mapstructure 库处理。自定义的解码器被用来处理将字符串值解码为支持的原生类型。
func Decode(input interface{}, output interface{}) error {
	// 构建mapstructure的decoder
	decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ // nolint:exhaustivestruct
		Result:     output,
		DecodeHook: decodeString,	// 这里植入我们的hook
	})
	if err != nil {
		return err
	}
	// 委托给mapstructure的decoder进行解码
	return decoder.Decode(input)
}

DecodeHookFunc 的定义:

type DecodeHookFunc interface{}

DecodeHookFunc() 要求必须是下面的三个方法之一:

// DecodeHookFuncType is a DecodeHookFunc which has complete information about
// the source and target types.
type DecodeHookFuncType func(reflect.Type, reflect.Type, interface{}) (interface{}, error)

// DecodeHookFuncKind is a DecodeHookFunc which knows only the Kinds of the
// source and target types.
type DecodeHookFuncKind func(reflect.Kind, reflect.Kind, interface{}) (interface{}, error)

// DecodeHookFuncRaw is a DecodeHookFunc which has complete access to both the source and target
// values.
type DecodeHookFuncValue func(from reflect.Value, to reflect.Value) (interface{}, error)

config实现中采用的是第一种: 有 source 和 target 类型的完整信息。

decodeString()方法

decodeString()方法的实现:

func decodeString(
	f reflect.Type,
	t reflect.Type,
	data interface{}) (interface{}, error) {
	if t.Kind() == reflect.String && f.Kind() != reflect.String {
		return fmt.Sprintf("%v", data), nil
	}
	if f.Kind() == reflect.Ptr {
		f = f.Elem()
		data = reflect.ValueOf(data).Elem().Interface()
	}
	if f.Kind() != reflect.String {
		return data, nil
	}

	dataString, ok := data.(string)
	if !ok {
		return nil, errors.Errorf("expected string: got %s", reflect.TypeOf(data))
	}

	var result interface{}
	var decoder StringDecoder

	if t.Implements(typeStringDecoder) {
		result = reflect.New(t.Elem()).Interface()
		decoder = result.(StringDecoder)
	} else if reflect.PtrTo(t).Implements(typeStringDecoder) {
		result = reflect.New(t).Interface()
		decoder = result.(StringDecoder)
	}

	if decoder != nil {
		if err := decoder.DecodeString(dataString); err != nil {
			if t.Kind() == reflect.Ptr {
				t = t.Elem()
			}

			return nil, errors.Errorf("invalid %s %q: %v", t.Name(), dataString, err)
		}

		return result, nil
	}

	switch t {
	case typeDuration:
		// Check for simple integer values and treat them
		// as milliseconds
		if val, err := strconv.Atoi(dataString); err == nil {
			return time.Duration(val) * time.Millisecond, nil
		}

		// Convert it by parsing
		d, err := time.ParseDuration(dataString)

		return d, invalidError(err, "duration", dataString)
	case typeTime:
		// Convert it by parsing
		t, err := time.Parse(time.RFC3339Nano, dataString)
		if err == nil {
			return t, nil
		}
		t, err = time.Parse(time.RFC3339, dataString)

		return t, invalidError(err, "time", dataString)
	}

	switch t.Kind() { // nolint: exhaustive
	case reflect.Uint:
		val, err := strconv.ParseUint(dataString, 10, 64)

		return uint(val), invalidError(err, "uint", dataString)
	case reflect.Uint64:
		val, err := strconv.ParseUint(dataString, 10, 64)

		return val, invalidError(err, "uint64", dataString)
	case reflect.Uint32:
		val, err := strconv.ParseUint(dataString, 10, 32)

		return uint32(val), invalidError(err, "uint32", dataString)
	case reflect.Uint16:
		val, err := strconv.ParseUint(dataString, 10, 16)

		return uint16(val), invalidError(err, "uint16", dataString)
	case reflect.Uint8:
		val, err := strconv.ParseUint(dataString, 10, 8)

		return uint8(val), invalidError(err, "uint8", dataString)

	case reflect.Int:
		val, err := strconv.ParseInt(dataString, 10, 64)

		return int(val), invalidError(err, "int", dataString)
	case reflect.Int64:
		val, err := strconv.ParseInt(dataString, 10, 64)

		return val, invalidError(err, "int64", dataString)
	case reflect.Int32:
		val, err := strconv.ParseInt(dataString, 10, 32)

		return int32(val), invalidError(err, "int32", dataString)
	case reflect.Int16:
		val, err := strconv.ParseInt(dataString, 10, 16)

		return int16(val), invalidError(err, "int16", dataString)
	case reflect.Int8:
		val, err := strconv.ParseInt(dataString, 10, 8)

		return int8(val), invalidError(err, "int8", dataString)

	case reflect.Float32:
		val, err := strconv.ParseFloat(dataString, 32)

		return float32(val), invalidError(err, "float32", dataString)
	case reflect.Float64:
		val, err := strconv.ParseFloat(dataString, 64)

		return val, invalidError(err, "float64", dataString)

	case reflect.Bool:
		val, err := strconv.ParseBool(dataString)

		return val, invalidError(err, "bool", dataString)

	default:
		return data, nil
	}
}

3.2 - normalize.go的源码学习

对JSON进行标准化处理

Dapr config package中的 normalize.go 文件的源码学习。

map[interface{}]interface{} 转换为 map[string]interface{},以便对JSON进行标准化处理,并在组件初始化时使用。

代码实现:

func Normalize(i interface{}) (interface{}, error) {
	var err error
	switch x := i.(type) {				// 只标准化三种类型,其他类型直接返回
	case map[interface{}]interface{}:	// 1. 对于map[interface{}]interface{},key和value都要做正常化
		m2 := map[string]interface{}{}
		for k, v := range x {
			if strKey, ok := k.(string); ok {
                // 将key的类型改成string,value继续做正常化
				if m2[strKey], err = Normalize(v); err != nil {
					return nil, err
				}
			} else {
                // 要求key一定是string,否则报错
				return nil, fmt.Errorf("error parsing config field: %v", k)
			}
		}

		return m2, nil
	case map[string]interface{}:		// 2. 对于map[string{}]interface{},只需要对value做正常化
		m2 := map[string]interface{}{}
		for k, v := range x {
			if m2[k], err = Normalize(v); err != nil {
				return nil, err
			}
		}

		return m2, nil
	case []interface{}:					// 3. 对于[]interface{}这样的数组,每个数组元素都做正常化
		for i, v := range x {
			if x[i], err = Normalize(v); err != nil {
				return nil, err
			}
		}
	}

	return i, nil
}

3.3 - prefix.go的源码学习

去除key的前缀

Dapr config package中的 prefix.go 文件的源码学习。

代码实现

func PrefixedBy(input interface{}, prefix string) (interface{}, error) {
	normalized, err := Normalize(input)
	if err != nil {
        // 唯一可能来自normalize的错误是: 输入是map[interface{}]interface{},而某个key不是字符串
		return input, err
	}
	input = normalized

	if inputMap, ok := input.(map[string]interface{}); ok {
		converted := make(map[string]interface{}, len(inputMap))
		for k, v := range inputMap {
			if strings.HasPrefix(k, prefix) {
				key := uncapitalize(strings.TrimPrefix(k, prefix)) // 去掉key的前缀
				converted[key] = v
			}
		}

		return converted, nil
	} else if inputMap, ok := input.(map[string]string); ok {
		converted := make(map[string]string, len(inputMap))
		for k, v := range inputMap {
			if strings.HasPrefix(k, prefix) {
				key := uncapitalize(strings.TrimPrefix(k, prefix)) // 去掉key的前缀
				converted[key] = v
			}
		}

		return converted, nil
	}

	return input, nil
}

uncapitalize()方法将字符串转为小写:

func uncapitalize(str string) string {
	if len(str) == 0 {
		return str
	}

	vv := []rune(str) // Introduced later
	vv[0] = unicode.ToLower(vv[0])

	return string(vv)
}

使用场景

被 retry.go 的 DecodeConfigWithPrefix() 方法调用

func DecodeConfigWithPrefix(c *Config, input interface{}, prefix string) error {
	input, err := config.PrefixedBy(input, prefix)
	if err != nil {
		return err
	}

	return DecodeConfig(c, input)
}

4 - retry的源码学习

Dapr retry package的源码学习

4.1 - retry.go的源码学习

对JSON进行标准化处理

Dapr retry package中的 retry.go 文件的源码学习。

重试策略

多次重试之间的间隔策略,有两种:PolicyConstant 是固定值,PolicyExponential是指数增长。

// PolicyType 表示后退延迟(back off delay)应该是固定值还是指数增长。
// PolicyType denotes if the back off delay should be constant or exponential.
type PolicyType int

const (
	// PolicyConstant is a backoff policy that always returns the same backoff delay.
    // PolicyConstant是一个总是返回相同退避延迟的退避策略。
	PolicyConstant PolicyType = iota
	// PolicyExponential is a backoff implementation that increases the backoff period
	// for each retry attempt using a randomization function that grows exponentially.
    // PolicyExponential是一个退避实现,它使用一个以指数增长的随机化函数来增加每次重试的退避周期。
	PolicyExponential
)

重试配置

// Config 封装了退避策略的配置。
type Config struct {
	Policy PolicyType `mapstructure:"policy"`

	// Constant back off
	Duration time.Duration `mapstructure:"duration"`

	// Exponential back off
	InitialInterval     time.Duration `mapstructure:"initialInterval"`
	RandomizationFactor float32       `mapstructure:"randomizationFactor"`
	Multiplier          float32       `mapstructure:"multiplier"`
	MaxInterval         time.Duration `mapstructure:"maxInterval"`
	MaxElapsedTime      time.Duration `mapstructure:"maxElapsedTime"`

	// Additional options
	MaxRetries int64 `mapstructure:"maxRetries"`
}

注意: 每个字段都标记了 mapstructure ,这是为了使用 mapstructure 进行解码。

默认配置为:

func DefaultConfig() Config {
	return Config{
		Policy:              PolicyConstant,		// 默认为固定间隔
		Duration:            5 * time.Second,		// 间隔时间默认是5秒钟
		InitialInterval:     backoff.DefaultInitialInterval,
		RandomizationFactor: backoff.DefaultRandomizationFactor,
		Multiplier:          backoff.DefaultMultiplier,
		MaxInterval:         backoff.DefaultMaxInterval,
		MaxElapsedTime:      backoff.DefaultMaxElapsedTime,
		MaxRetries:          -1,					// 默认一直进行重试
	}
}

不带重试的默认配置:

// 这对那些可以自行处理重试的broker来说可能很有用。
func DefaultConfigWithNoRetry() Config {
	c := DefaultConfig()
	c.MaxRetries = 0		// MaxRetries 设置为0

	return c
}

解码配置

DecodeConfig() 方法将 go 结构体解析为 Config :

func DecodeConfig(c *Config, input interface{}) error {
	// Use the default config if `c` is empty/zero value.
	var emptyConfig Config
	if *c == emptyConfig {		// 如果c是一个初始化之后没有进行赋值的Config结构体,则改用默认配置的Config
		*c = DefaultConfig()
	}

	return config.Decode(input, c)
}

DecodeConfigWithPrefix() 方法在将 go 结构体解析为 Config 之前,先去除前缀,并进行key和value的正常化:

func DecodeConfigWithPrefix(c *Config, input interface{}, prefix string) error {
	input, err := config.PrefixedBy(input, prefix)		// 去除前缀,并进行key和value的正常化
	if err != nil {
		return err
	}

	return DecodeConfig(c, input)
}

DecodeString()方法解析重试策略:

func (p *PolicyType) DecodeString(value string) error {
	switch strings.ToLower(value) {
	case "constant":
		*p = PolicyConstant
	case "exponential":
		*p = PolicyExponential
	default:
		return errors.Errorf("unexpected back off policy type: %s", value)
	}

	return nil
}

重试退避时间的生成

NewBackOff() 方法 返回一个 BackOff 实例,可直接与NotifyRecoverbackoff.RetryNotify一起使用。该实例不会因为上下文取消而停止。要支持取消(推荐),请使用NewBackOffWithContext。 由于底层的回退实现并不总是线程安全的,所以每次使用RetryNotifyRecoverbackoff.RetryNotify时都应该调用NewBackOffNewBackOffWithContext

func (c *Config) NewBackOff() backoff.BackOff {
	var b backoff.BackOff
	switch c.Policy {
	case PolicyConstant:							// 1. 对于固定周期只需要返回配置项中设定的时间间隔,默认5秒钟
		b = backoff.NewConstantBackOff(c.Duration) 
	case PolicyExponential:							// 2. 对于指数周期,通过 backoff 类库来实现,简单透传配置参数
		eb := backoff.NewExponentialBackOff()
		eb.InitialInterval = c.InitialInterval
		eb.RandomizationFactor = float64(c.RandomizationFactor)
		eb.Multiplier = float64(c.Multiplier)
		eb.MaxInterval = c.MaxInterval
		eb.MaxElapsedTime = c.MaxElapsedTime
		b = eb
	}

	if c.MaxRetries >= 0 {
		b = backoff.WithMaxRetries(b, uint64(c.MaxRetries))
	}

	return b
}

NewBackOffWithContext() 方法返回一个BackOff实例,以便与RetryNotifyRecoverbackoff.RetryNotify直接使用。如果提供的上下文被取消,则用于取消重试。

由于底层的回退实现并不总是线程安全的,NewBackOffNewBackOffWithContext应该在每次使用RetryNotifyRecoverbackoff.RetryNotify时被调用。

func (c *Config) NewBackOffWithContext(ctx context.Context) backoff.BackOff {
	b := c.NewBackOff()

	return backoff.WithContext(b, ctx)
}

恢复通知

标准 backoff.RetryNotify的用法:

func RetryNotify(operation Operation, b BackOff, notify Notify) error {
   return RetryNotifyWithTimer(operation, b, notify, nil)
}

// Operation 是由Retry()或RetryNotify()执行的。
// 如果该操作返回错误,将使用退避策略重试。
type Operation func() error
// Notify是一个出错通知的函数。
// 如果操作失败(有错误),它会收到一个操作错误和回退延迟。
// 注意,如果退避政策要求停止重试。通知函数不会被调用。
type Notify func(error, time.Duration)

如果出现问题,需要多次重试才恢复,会存在几个问题:

  1. Notify()方法会被调用多次
  2. 不好判断是否恢复:理论上"恢复"的概念是先有出错(一次或者连续多次出错),然后成功(出错之后的第一次不出错)

NotifyRecover() 方法是 backoff.RetryNotify 的封装器,它为之前操作失败但后来恢复的情况增加了另一个回调。这个包装器的主要目的是只在操作第一次失败时调用 “notify”,在最后成功时调用 “recovered”。这有助于将日志信息限制在操作者需要被提醒的事件上。

这里的NotifyRecover() 方法包装了 Operation() Notify() 函数:

func NotifyRecover(operation backoff.Operation, b backoff.BackOff, notify backoff.Notify, recovered func()) error {
	var notified bool

	return backoff.RetryNotify(func() error {
		err := operation()

        // notified为true说明之前执行过notify,即出现了一次或者多次连续错误。
        // err为空说明operation不再出错
        // 这才可以成为"恢复"
		if err == nil && notified {	
            notified = false	// 重置 notified ,下一次 operation() 再成功也不会再出发recovered()
            recovered()			// 满足逻辑,可以触发一次 recovered() 方法
		}

		return err
	}, b, func(err error, d time.Duration) {
        if !notified {		// 只在第一次时调用真正的notify()函数,其他情况下忽略
			notify(err, d)
			notified = true
		}
	})
}

备注:感觉 notified 这个变量的取名不够清晰,它的语义不应该是"是否触发了通知",而是"是否发生了错误而一直没有恢复"。应该改为类似 errorNotRecoverd 之类的,语义更清晰一些。