kit仓库的源码学习
- 1: kit仓库简介
- 2: logger的源码学习
- 2.1: logger.go的源码学习
- 2.2: dapr_logger.go的源码学习
- 2.3: options.go的源码学习
- 3: config的源码学习
- 3.1: decode.go的源码学习
- 3.2: normalize.go的源码学习
- 3.3: prefix.go的源码学习
- 4: retry的源码学习
- 4.1: retry.go的源码学习
1 - kit仓库简介
kit仓库的介绍
Shared utility code for Dapr runtime.
目前内容很少,只有 logger/config/retry 三个package。
kit仓库的背景
kit 仓库是后来提取出来的仓库,原来的代码存放在 dapr 仓库中,被 dapr 仓库中的其他代码使用。后来 components-contrib 仓库的代码也使用了这些基础代码,这导致了一个循环依赖:
- dapr 仓库依赖 components-contrib 仓库: 使用 components-contrib 仓库 仓库中的各种 components 实现
- components-contrib 仓库依赖dapr 仓库: 使用dapr 仓库中的基础代码。
participant dapr
participant "components-contrib" as components
dapr -> components : for component impl
components -> dapr : for common code
为了让依赖关系更加的清晰,避免循环依赖,因此将这些基础代码从 dapr 仓库中移出来存放在单独的 kit仓库中,之后的依赖关系就是这样:
- dapr 仓库依赖 components-contrib 仓库: 使用 components-contrib 仓库 仓库中的各种 components 实现
- dapr 仓库依赖 kit 仓库: 使用 kit 仓库中的基础代码。
- components-contrib 仓库依赖 kit 仓库: 使用 kit 仓库中的基础代码。
participant dapr
participant "components-contrib" as components
participant kit
dapr -> kit : for common code
components -> kit : for common code
dapr -> components : for component impl
2 - logger的源码学习
2.1 - logger.go的源码学习
Dapr Logger package中的logger.go文件的源码学习,定义logger相关`的日志类型、schema、日志级别、接口以及保存全局logger列表。
logger的相关定义
log type
log类型分为 普通 log 和 request 两种:
const (
// LogTypeLog is normal log type
LogTypeLog = "log"
// LogTypeRequest is Request log type
LogTypeRequest = "request"
......
}
log schema
const (
......
// Field names that defines Dapr log schema
logFieldTimeStamp = "time"
logFieldLevel = "level"
logFieldType = "type"
logFieldScope = "scope"
logFieldMessage = "msg"
logFieldInstance = "instance"
logFieldDaprVer = "ver"
logFieldAppID = "app_id"
)
log level
log level 没啥特别,很传统的定义:
const (
// DebugLevel has verbose message
DebugLevel LogLevel = "debug"
// InfoLevel is default log level
InfoLevel LogLevel = "info"
// WarnLevel is for logging messages about possible issues
WarnLevel LogLevel = "warn"
// ErrorLevel is for logging errors
ErrorLevel LogLevel = "error"
// FatalLevel is for logging fatal messages. The system shuts down after logging the message.
FatalLevel LogLevel = "fatal"
// UndefinedLevel is for undefined log level
UndefinedLevel LogLevel = "undefined"
)
注意: FatalLevel 有特别的意义,”The system shuts down after logging the message“. 所以这个不能随便用。
toLogLevel() 方法将字符串转为 LogLevel,大小写不敏感:
// toLogLevel converts to LogLevel
func toLogLevel(level string) LogLevel {
switch strings.ToLower(level) {
case "debug":
return DebugLevel
case "info":
return InfoLevel
case "warn":
return WarnLevel
case "error":
return ErrorLevel
case "fatal":
return FatalLevel
}
// unsupported log level by Dapr
return UndefinedLevel
}
Logger 接口定义
// Logger includes the logging api sets
type Logger interface {
// EnableJSONOutput enables JSON formatted output log
EnableJSONOutput(enabled bool)
// SetAppID sets dapr_id field in log. Default value is empty string
SetAppID(id string)
// SetOutputLevel sets log output level
SetOutputLevel(outputLevel LogLevel)
// WithLogType specify the log_type field in log. Default value is LogTypeLog
WithLogType(logType string) Logger
// Info logs a message at level Info.
Info(args ...interface{})
// Infof logs a message at level Info.
Infof(format string, args ...interface{})
// Debug logs a message at level Debug.
Debug(args ...interface{})
// Debugf logs a message at level Debug.
Debugf(format string, args ...interface{})
// Warn logs a message at level Warn.
Warn(args ...interface{})
// Warnf logs a message at level Warn.
Warnf(format string, args ...interface{})
// Error logs a message at level Error.
Error(args ...interface{})
// Errorf logs a message at level Error.
Errorf(format string, args ...interface{})
// Fatal logs a message at level Fatal then the process will exit with status set to 1.
Fatal(args ...interface{})
// Fatalf logs a message at level Fatal then the process will exit with status set to 1.
Fatalf(format string, args ...interface{})
}
logger的创建和获取
全局 logger 列表
// globalLoggers is the collection of Dapr Logger that is shared globally.
// TODO: User will disable or enable logger on demand.
var globalLoggers = map[string]Logger{} // map保存所有的logger实例
var globalLoggersLock = sync.RWMutex{} // 用读写锁对map进行保护
创建新logger或获取已经保存的logger
logger创建之后会保存在 global loggers 中,这意味着每个 name 的logger只会创建一个实例。
// NewLogger creates new Logger instance.
func NewLogger(name string) Logger {
globalLoggersLock.Lock() // 加写锁
defer globalLoggersLock.Unlock()
logger, ok := globalLoggers[name]
if !ok {
logger = newDaprLogger(name)
globalLoggers[name] = logger
}
return logger
}
newDaprLogger() 方法的细节见 dapr_logger.go。
获取所有已经创建的logger列表
func getLoggers() map[string]Logger {
globalLoggersLock.RLock() // 加读锁
defer globalLoggersLock.RUnlock()
l := map[string]Logger{}
for k, v := range globalLoggers {
l[k] = v
}
return l
}
2.2 - dapr_logger.go的源码学习
Dapr logger package中的dapr_logger.go文件的源码分析,daprLogger 是实际的日志实现。
daprLogger 结构体定义
daprLogger 结构体,底层实现是 logrus :
// daprLogger is the implemention for logrus
type daprLogger struct {
// name is the name of logger that is published to log as a scope
name string
// loger is the instance of logrus logger
logger *logrus.Entry
}
创建Dapr logger
创建Dapr logger的逻辑:
func newDaprLogger(name string) *daprLogger {
// 底层是 logrus
newLogger := logrus.New()
// 输出到 stdout
newLogger.SetOutput(os.Stdout)
dl := &daprLogger{
name: name,
logger: newLogger.WithFields(logrus.Fields{
logFieldScope: name,
// 默认是普通log类型
logFieldType: LogTypeLog,
}),
}
// 设置是否启用json输出,defaultJSONOutput默认是false
dl.EnableJSONOutput(defaultJSONOutput)
return dl
}
启用json输出
函数名有点小问题,实际是初始化logger,是否enables JSON只是部分逻辑:
// EnableJSONOutput enables JSON formatted output log
func (l *daprLogger) EnableJSONOutput(enabled bool) {
var formatter logrus.Formatter
fieldMap := logrus.FieldMap{
// If time field name is conflicted, logrus adds "fields." prefix.
// So rename to unused field @time to avoid the confliction.
logrus.FieldKeyTime: logFieldTimeStamp,
logrus.FieldKeyLevel: logFieldLevel,
logrus.FieldKeyMsg: logFieldMessage,
}
hostname, _ := os.Hostname()
l.logger.Data = logrus.Fields{
logFieldScope: l.logger.Data[logFieldScope],
logFieldType: LogTypeLog,
logFieldInstance: hostname,
logFieldDaprVer: DaprVersion,
}
if enabled {
formatter = &logrus.JSONFormatter{
TimestampFormat: time.RFC3339Nano,
FieldMap: fieldMap,
}
} else {
formatter = &logrus.TextFormatter{
TimestampFormat: time.RFC3339Nano,
FieldMap: fieldMap,
}
}
l.logger.Logger.SetFormatter(formatter)
}
logger的设置
设置DaprVersion
var DaprVersion string = "unknown"
func (l *daprLogger) EnableJSONOutput(enabled bool) {
l.logger.Data = logrus.Fields{
......
logFieldDaprVer: DaprVersion,
}
}
DaprVersion的值来自于 makefile (dapr/Makefile
):
LOGGER_PACKAGE_NAME := github.com/dapr/kit/logger
DEFAULT_LDFLAGS:=-X $(BASE_PACKAGE_NAME)/pkg/version.gitcommit=$(GIT_COMMIT) \
-X $(BASE_PACKAGE_NAME)/pkg/version.gitversion=$(GIT_VERSION) \
-X $(BASE_PACKAGE_NAME)/pkg/version.version=$(DAPR_VERSION) \
-X $(LOGGER_PACKAGE_NAME).DaprVersion=$(DAPR_VERSION)
设置appid
设置日志的 app_id 字段,默认为空。
// SetAppID sets app_id field in log. Default value is empty string
func (l *daprLogger) SetAppID(id string) {
l.logger = l.logger.WithField(logFieldAppID, id)
}
这个方法在logger被初始化时调用进行设置,见 options.go 方法:
func ApplyOptionsToLoggers(options *Options) error {
......
if options.appID != undefinedAppID {
v.SetAppID(options.appID)
}
}
设置日志级别
// SetOutputLevel sets log output level
func (l *daprLogger) SetOutputLevel(outputLevel LogLevel) {
l.logger.Logger.SetLevel(toLogrusLevel(outputLevel))
}
func toLogrusLevel(lvl LogLevel) logrus.Level {
// ignore error because it will never happens
l, _ := logrus.ParseLevel(string(lvl))
return l
}
这个是在原有的 daprLogger 实例上进行设置,没啥特殊。
设置日志类型
默认是普通 log 类型,如果要设置log类型:
// WithLogType specify the log_type field in log. Default value is LogTypeLog
func (l *daprLogger) WithLogType(logType string) Logger {
// 这里重新构造了一个新的 daprLogger 结构体,然后返回
return &daprLogger{
name: l.name,
logger: l.logger.WithField(logFieldType, logType),
}
}
疑问和TODO:
- 为什么不是直接设置 l.logger,而是构造一个新的结构体,然后返回还是 Logger ?
- 会不会有隐患?前面logger创建之后是存放在global logger map中的,key是简单的 name 而不是 name + logtype,这岂不是无法保存一个 name 两个类型的两个 logger 对象?
logger的实现
所有的写log的方法都简单代理给了 l.logger (*logrus.Entry):
// Info logs a message at level Info.
func (l *daprLogger) Info(args ...interface{}) {
l.logger.Log(logrus.InfoLevel, args...)
}
// Infof logs a message at level Info.
func (l *daprLogger) Infof(format string, args ...interface{}) {
l.logger.Logf(logrus.InfoLevel, format, args...)
}
// Debug logs a message at level Debug.
func (l *daprLogger) Debug(args ...interface{}) {
l.logger.Log(logrus.DebugLevel, args...)
}
// Debugf logs a message at level Debug.
func (l *daprLogger) Debugf(format string, args ...interface{}) {
l.logger.Logf(logrus.DebugLevel, format, args...)
}
// Warn logs a message at level Warn.
func (l *daprLogger) Warn(args ...interface{}) {
l.logger.Log(logrus.WarnLevel, args...)
}
// Warnf logs a message at level Warn.
func (l *daprLogger) Warnf(format string, args ...interface{}) {
l.logger.Logf(logrus.WarnLevel, format, args...)
}
// Error logs a message at level Error.
func (l *daprLogger) Error(args ...interface{}) {
l.logger.Log(logrus.ErrorLevel, args...)
}
// Errorf logs a message at level Error.
func (l *daprLogger) Errorf(format string, args ...interface{}) {
l.logger.Logf(logrus.ErrorLevel, format, args...)
}
// Fatal logs a message at level Fatal then the process will exit with status set to 1.
func (l *daprLogger) Fatal(args ...interface{}) {
l.logger.Fatal(args...)
}
// Fatalf logs a message at level Fatal then the process will exit with status set to 1.
func (l *daprLogger) Fatalf(format string, args ...interface{}) {
l.logger.Fatalf(format, args...)
}
注意 logrus 的 Fatalf() 方法的实现,在输出日志之后会调用ExitFunc(如果没设置则默认是 os.Exit
)
func (entry *Entry) Fatalf(format string, args ...interface{}) {
entry.Logf(FatalLevel, format, args...)
entry.Logger.Exit(1)
}
func (logger *Logger) Exit(code int) {
runHandlers()
if logger.ExitFunc == nil {
logger.ExitFunc = os.Exit
}
logger.ExitFunc(code)
}
这会导致进程退出。因此要慎用。
2.3 - options.go的源码学习
Dapr logger package中的 options.go 文件的源码学习,设置logger相关的属性,包括从命令行参数中解析标记。
默认属性
const (
defaultJSONOutput = false
defaultOutputLevel = "info"
undefinedAppID = ""
)
Options 结构体定义
Options 结构体,就三个字段:
// Options defines the sets of options for Dapr logging
type Options struct {
// appID is the unique id of Dapr Application
// 默认为空
appID string
// JSONFormatEnabled is the flag to enable JSON formatted log
// 默认为fasle
JSONFormatEnabled bool
// OutputLevel is the level of logging
// 默认为 info
OutputLevel string
}
设值方法
// SetOutputLevel sets the log output level
func (o *Options) SetOutputLevel(outputLevel string) error {
// 疑问:这里检查和赋值存在不一致:如果 outputLevel 中有大写字母
// TODO:改进一下
if toLogLevel(outputLevel) == UndefinedLevel {
return errors.Errorf("undefined Log Output Level: %s", outputLevel)
}
o.OutputLevel = outputLevel
return nil
}
// SetAppID sets Dapr ID
func (o *Options) SetAppID(id string) {
o.appID = id
}
疑问 :为什么字段和设置方法不统一?
- JSONFormatEnabled 是 public 字段,没有Set方法
- OutputLevel 是 public 字段,有 Set 方法,Set 方法做了输入值的检测。
- 问题来了:既然是 public 字段,那么绕开 Set 方法直接赋值岂不是就绕开了输入值检测的逻辑?
- appID 是 private 字段,有 Set 方法,而 Set 方法什么都没有做,只是简单赋值,那么为什么不直接用 public 字段呢?
检查发现:
- SetOutputLevel 在dapr/dapr 项目中没有任何人调用
默认构造
返回每个字段的默认值,没啥特殊:
// DefaultOptions returns default values of Options
func DefaultOptions() Options {
return Options{
JSONFormatEnabled: defaultJSONOutput,
appID: undefinedAppID,
OutputLevel: defaultOutputLevel,
}
}
备注:go 不像 java 可以在字段定义时直接赋值一个默认值,有时还真不方便。
从命令行标记中读取日志属性
在命令行参数中读取 log-level
和 log-as-json
两个标记并设置 OutputLevel 和 JSONFormatEnabled:
// AttachCmdFlags attaches log options to command flags
func (o *Options) AttachCmdFlags(
stringVar func(p *string, name string, value string, usage string),
boolVar func(p *bool, name string, value bool, usage string)) {
if stringVar != nil {
stringVar(
&o.OutputLevel,
"log-level",
defaultOutputLevel,
"Options are debug, info, warn, error, or fatal (default info)")
}
if boolVar != nil {
boolVar(
&o.JSONFormatEnabled,
"log-as-json",
defaultJSONOutput,
"print log as JSON (default false)")
}
}
备注:这大概就是 OutputLevel 和 JSONFormatEnabled 两个字段是 public 的原因?
这个方法会在每个二进制文件(runtime(也就是daprd) / injector / operator / placement / sentry) 的初始化代码中调用:
loggerOptions := logger.DefaultOptions()
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)
注意:这个时候 OutputLevel 的值是没有经过检查而直接设值的,绕开了 SetOutputLevel 方法的检查。
将属性应用到所有的logger
// ApplyOptionsToLoggers applys options to all registered loggers
func ApplyOptionsToLoggers(options *Options) error {
// 所有的 logger 指的是保存在全局 logger map 中所有 logger
internalLoggers := getLoggers()
// Apply formatting options first
for _, v := range internalLoggers {
v.EnableJSONOutput(options.JSONFormatEnabled)
if options.appID != undefinedAppID {
v.SetAppID(options.appID)
}
}
daprLogLevel := toLogLevel(options.OutputLevel)
if daprLogLevel == UndefinedLevel {
// 在这里做了 OutputLevel 值的有效性检查
return errors.Errorf("invalid value for --log-level: %s", options.OutputLevel)
}
for _, v := range internalLoggers {
v.SetOutputLevel(daprLogLevel)
}
return nil
}
TODO:OutputLevel 赋值有效性检查的地方现在发现有两个,其中一个还没有被使用。准备PR修订。
查了一下这个方法的确是在每个二进制文件(runtime(也就是daprd) / injector / operator / placement / sentry) 的初始化代码中调用:
loggerOptions := logger.DefaultOptions()
loggerOptions.AttachCmdFlags(flag.StringVar, flag.BoolVar)
......
// Apply options to all loggers
loggerOptions.SetAppID(*appID)
if err := logger.ApplyOptionsToLoggers(&loggerOptions); err != nil {
return nil, err
}
TODO: ApplyOptionsToLoggers这个方法名最好修改增加“来自命令行的options”语义,否则报错 “invalid value for –log-level“ 就会很奇怪。
3 - config的源码学习
3.1 - decode.go的源码学习
Dapr config package中的 decode.go 文件的源码学习。
Decoder的相关定义
StringDecoder
// StringDecoder被用作自定义类型(或别名类型)来覆盖 `decodeString` DecodeHook中的基本解码功能的一种方式。
// `encoding.TextMashaller`没有被使用,是因为它与许多Go类型相匹配,并且会有潜在的意外结果。
// 指定一个自定义的解码func应该是非常有意的。
type StringDecoder interface {
DecodeString(value string) error
}
Decode()方法
// Decode()将通用map值从 `input` 解码到 `output`,同时提供有用的错误信息。
// `output`必须是一个指向Go结构体的指针,该结构体包含应被解码的字段的 `mapstructure` 结构体标签。
// 这个函数在解码被解析为 `map[string]interface{}` 的配置文件或被解析为`map[string]string` 的组件元数据的值时很有用。
//
// 大部分繁重的工作都由 mapstructure 库处理。自定义的解码器被用来处理将字符串值解码为支持的原生类型。
func Decode(input interface{}, output interface{}) error {
// 构建mapstructure的decoder
decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ // nolint:exhaustivestruct
Result: output,
DecodeHook: decodeString, // 这里植入我们的hook
})
if err != nil {
return err
}
// 委托给mapstructure的decoder进行解码
return decoder.Decode(input)
}
DecodeHookFunc 的定义:
type DecodeHookFunc interface{}
DecodeHookFunc() 要求必须是下面的三个方法之一:
// DecodeHookFuncType is a DecodeHookFunc which has complete information about
// the source and target types.
type DecodeHookFuncType func(reflect.Type, reflect.Type, interface{}) (interface{}, error)
// DecodeHookFuncKind is a DecodeHookFunc which knows only the Kinds of the
// source and target types.
type DecodeHookFuncKind func(reflect.Kind, reflect.Kind, interface{}) (interface{}, error)
// DecodeHookFuncRaw is a DecodeHookFunc which has complete access to both the source and target
// values.
type DecodeHookFuncValue func(from reflect.Value, to reflect.Value) (interface{}, error)
config实现中采用的是第一种: 有 source 和 target 类型的完整信息。
decodeString()方法
decodeString()方法的实现:
func decodeString(
f reflect.Type,
t reflect.Type,
data interface{}) (interface{}, error) {
if t.Kind() == reflect.String && f.Kind() != reflect.String {
return fmt.Sprintf("%v", data), nil
}
if f.Kind() == reflect.Ptr {
f = f.Elem()
data = reflect.ValueOf(data).Elem().Interface()
}
if f.Kind() != reflect.String {
return data, nil
}
dataString, ok := data.(string)
if !ok {
return nil, errors.Errorf("expected string: got %s", reflect.TypeOf(data))
}
var result interface{}
var decoder StringDecoder
if t.Implements(typeStringDecoder) {
result = reflect.New(t.Elem()).Interface()
decoder = result.(StringDecoder)
} else if reflect.PtrTo(t).Implements(typeStringDecoder) {
result = reflect.New(t).Interface()
decoder = result.(StringDecoder)
}
if decoder != nil {
if err := decoder.DecodeString(dataString); err != nil {
if t.Kind() == reflect.Ptr {
t = t.Elem()
}
return nil, errors.Errorf("invalid %s %q: %v", t.Name(), dataString, err)
}
return result, nil
}
switch t {
case typeDuration:
// Check for simple integer values and treat them
// as milliseconds
if val, err := strconv.Atoi(dataString); err == nil {
return time.Duration(val) * time.Millisecond, nil
}
// Convert it by parsing
d, err := time.ParseDuration(dataString)
return d, invalidError(err, "duration", dataString)
case typeTime:
// Convert it by parsing
t, err := time.Parse(time.RFC3339Nano, dataString)
if err == nil {
return t, nil
}
t, err = time.Parse(time.RFC3339, dataString)
return t, invalidError(err, "time", dataString)
}
switch t.Kind() { // nolint: exhaustive
case reflect.Uint:
val, err := strconv.ParseUint(dataString, 10, 64)
return uint(val), invalidError(err, "uint", dataString)
case reflect.Uint64:
val, err := strconv.ParseUint(dataString, 10, 64)
return val, invalidError(err, "uint64", dataString)
case reflect.Uint32:
val, err := strconv.ParseUint(dataString, 10, 32)
return uint32(val), invalidError(err, "uint32", dataString)
case reflect.Uint16:
val, err := strconv.ParseUint(dataString, 10, 16)
return uint16(val), invalidError(err, "uint16", dataString)
case reflect.Uint8:
val, err := strconv.ParseUint(dataString, 10, 8)
return uint8(val), invalidError(err, "uint8", dataString)
case reflect.Int:
val, err := strconv.ParseInt(dataString, 10, 64)
return int(val), invalidError(err, "int", dataString)
case reflect.Int64:
val, err := strconv.ParseInt(dataString, 10, 64)
return val, invalidError(err, "int64", dataString)
case reflect.Int32:
val, err := strconv.ParseInt(dataString, 10, 32)
return int32(val), invalidError(err, "int32", dataString)
case reflect.Int16:
val, err := strconv.ParseInt(dataString, 10, 16)
return int16(val), invalidError(err, "int16", dataString)
case reflect.Int8:
val, err := strconv.ParseInt(dataString, 10, 8)
return int8(val), invalidError(err, "int8", dataString)
case reflect.Float32:
val, err := strconv.ParseFloat(dataString, 32)
return float32(val), invalidError(err, "float32", dataString)
case reflect.Float64:
val, err := strconv.ParseFloat(dataString, 64)
return val, invalidError(err, "float64", dataString)
case reflect.Bool:
val, err := strconv.ParseBool(dataString)
return val, invalidError(err, "bool", dataString)
default:
return data, nil
}
}
3.2 - normalize.go的源码学习
Dapr config package中的 normalize.go 文件的源码学习。
将 map[interface{}]interface{}
转换为 map[string]interface{}
,以便对JSON进行标准化处理,并在组件初始化时使用。
代码实现:
func Normalize(i interface{}) (interface{}, error) {
var err error
switch x := i.(type) { // 只标准化三种类型,其他类型直接返回
case map[interface{}]interface{}: // 1. 对于map[interface{}]interface{},key和value都要做正常化
m2 := map[string]interface{}{}
for k, v := range x {
if strKey, ok := k.(string); ok {
// 将key的类型改成string,value继续做正常化
if m2[strKey], err = Normalize(v); err != nil {
return nil, err
}
} else {
// 要求key一定是string,否则报错
return nil, fmt.Errorf("error parsing config field: %v", k)
}
}
return m2, nil
case map[string]interface{}: // 2. 对于map[string{}]interface{},只需要对value做正常化
m2 := map[string]interface{}{}
for k, v := range x {
if m2[k], err = Normalize(v); err != nil {
return nil, err
}
}
return m2, nil
case []interface{}: // 3. 对于[]interface{}这样的数组,每个数组元素都做正常化
for i, v := range x {
if x[i], err = Normalize(v); err != nil {
return nil, err
}
}
}
return i, nil
}
3.3 - prefix.go的源码学习
Dapr config package中的 prefix.go 文件的源码学习。
代码实现
func PrefixedBy(input interface{}, prefix string) (interface{}, error) {
normalized, err := Normalize(input)
if err != nil {
// 唯一可能来自normalize的错误是: 输入是map[interface{}]interface{},而某个key不是字符串
return input, err
}
input = normalized
if inputMap, ok := input.(map[string]interface{}); ok {
converted := make(map[string]interface{}, len(inputMap))
for k, v := range inputMap {
if strings.HasPrefix(k, prefix) {
key := uncapitalize(strings.TrimPrefix(k, prefix)) // 去掉key的前缀
converted[key] = v
}
}
return converted, nil
} else if inputMap, ok := input.(map[string]string); ok {
converted := make(map[string]string, len(inputMap))
for k, v := range inputMap {
if strings.HasPrefix(k, prefix) {
key := uncapitalize(strings.TrimPrefix(k, prefix)) // 去掉key的前缀
converted[key] = v
}
}
return converted, nil
}
return input, nil
}
uncapitalize()方法将字符串转为小写:
func uncapitalize(str string) string {
if len(str) == 0 {
return str
}
vv := []rune(str) // Introduced later
vv[0] = unicode.ToLower(vv[0])
return string(vv)
}
使用场景
被 retry.go 的 DecodeConfigWithPrefix() 方法调用
func DecodeConfigWithPrefix(c *Config, input interface{}, prefix string) error {
input, err := config.PrefixedBy(input, prefix)
if err != nil {
return err
}
return DecodeConfig(c, input)
}
4 - retry的源码学习
4.1 - retry.go的源码学习
Dapr retry package中的 retry.go 文件的源码学习。
重试策略
多次重试之间的间隔策略,有两种:PolicyConstant 是固定值,PolicyExponential是指数增长。
// PolicyType 表示后退延迟(back off delay)应该是固定值还是指数增长。
// PolicyType denotes if the back off delay should be constant or exponential.
type PolicyType int
const (
// PolicyConstant is a backoff policy that always returns the same backoff delay.
// PolicyConstant是一个总是返回相同退避延迟的退避策略。
PolicyConstant PolicyType = iota
// PolicyExponential is a backoff implementation that increases the backoff period
// for each retry attempt using a randomization function that grows exponentially.
// PolicyExponential是一个退避实现,它使用一个以指数增长的随机化函数来增加每次重试的退避周期。
PolicyExponential
)
重试配置
// Config 封装了退避策略的配置。
type Config struct {
Policy PolicyType `mapstructure:"policy"`
// Constant back off
Duration time.Duration `mapstructure:"duration"`
// Exponential back off
InitialInterval time.Duration `mapstructure:"initialInterval"`
RandomizationFactor float32 `mapstructure:"randomizationFactor"`
Multiplier float32 `mapstructure:"multiplier"`
MaxInterval time.Duration `mapstructure:"maxInterval"`
MaxElapsedTime time.Duration `mapstructure:"maxElapsedTime"`
// Additional options
MaxRetries int64 `mapstructure:"maxRetries"`
}
注意: 每个字段都标记了
mapstructure
,这是为了使用 mapstructure 进行解码。
默认配置为:
func DefaultConfig() Config {
return Config{
Policy: PolicyConstant, // 默认为固定间隔
Duration: 5 * time.Second, // 间隔时间默认是5秒钟
InitialInterval: backoff.DefaultInitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: backoff.DefaultMaxInterval,
MaxElapsedTime: backoff.DefaultMaxElapsedTime,
MaxRetries: -1, // 默认一直进行重试
}
}
不带重试的默认配置:
// 这对那些可以自行处理重试的broker来说可能很有用。
func DefaultConfigWithNoRetry() Config {
c := DefaultConfig()
c.MaxRetries = 0 // MaxRetries 设置为0
return c
}
解码配置
DecodeConfig() 方法将 go 结构体解析为 Config
:
func DecodeConfig(c *Config, input interface{}) error {
// Use the default config if `c` is empty/zero value.
var emptyConfig Config
if *c == emptyConfig { // 如果c是一个初始化之后没有进行赋值的Config结构体,则改用默认配置的Config
*c = DefaultConfig()
}
return config.Decode(input, c)
}
DecodeConfigWithPrefix() 方法在将 go 结构体解析为 Config
之前,先去除前缀,并进行key和value的正常化:
func DecodeConfigWithPrefix(c *Config, input interface{}, prefix string) error {
input, err := config.PrefixedBy(input, prefix) // 去除前缀,并进行key和value的正常化
if err != nil {
return err
}
return DecodeConfig(c, input)
}
DecodeString()方法解析重试策略:
func (p *PolicyType) DecodeString(value string) error {
switch strings.ToLower(value) {
case "constant":
*p = PolicyConstant
case "exponential":
*p = PolicyExponential
default:
return errors.Errorf("unexpected back off policy type: %s", value)
}
return nil
}
重试退避时间的生成
NewBackOff() 方法 返回一个 BackOff
实例,可直接与NotifyRecover
或backoff.RetryNotify
一起使用。该实例不会因为上下文取消而停止。要支持取消(推荐),请使用NewBackOffWithContext
。 由于底层的回退实现并不总是线程安全的,所以每次使用RetryNotifyRecover
或backoff.RetryNotify
时都应该调用NewBackOff
或NewBackOffWithContext
。
func (c *Config) NewBackOff() backoff.BackOff {
var b backoff.BackOff
switch c.Policy {
case PolicyConstant: // 1. 对于固定周期只需要返回配置项中设定的时间间隔,默认5秒钟
b = backoff.NewConstantBackOff(c.Duration)
case PolicyExponential: // 2. 对于指数周期,通过 backoff 类库来实现,简单透传配置参数
eb := backoff.NewExponentialBackOff()
eb.InitialInterval = c.InitialInterval
eb.RandomizationFactor = float64(c.RandomizationFactor)
eb.Multiplier = float64(c.Multiplier)
eb.MaxInterval = c.MaxInterval
eb.MaxElapsedTime = c.MaxElapsedTime
b = eb
}
if c.MaxRetries >= 0 {
b = backoff.WithMaxRetries(b, uint64(c.MaxRetries))
}
return b
}
NewBackOffWithContext() 方法返回一个BackOff实例,以便与RetryNotifyRecover
或backoff.RetryNotify
直接使用。如果提供的上下文被取消,则用于取消重试。
由于底层的回退实现并不总是线程安全的,NewBackOff
或NewBackOffWithContext
应该在每次使用RetryNotifyRecover
或backoff.RetryNotify
时被调用。
func (c *Config) NewBackOffWithContext(ctx context.Context) backoff.BackOff {
b := c.NewBackOff()
return backoff.WithContext(b, ctx)
}
恢复通知
标准 backoff.RetryNotify
的用法:
func RetryNotify(operation Operation, b BackOff, notify Notify) error {
return RetryNotifyWithTimer(operation, b, notify, nil)
}
// Operation 是由Retry()或RetryNotify()执行的。
// 如果该操作返回错误,将使用退避策略重试。
type Operation func() error
// Notify是一个出错通知的函数。
// 如果操作失败(有错误),它会收到一个操作错误和回退延迟。
// 注意,如果退避政策要求停止重试。通知函数不会被调用。
type Notify func(error, time.Duration)
如果出现问题,需要多次重试才恢复,会存在几个问题:
- Notify()方法会被调用多次
- 不好判断是否恢复:理论上"恢复"的概念是先有出错(一次或者连续多次出错),然后成功(出错之后的第一次不出错)
NotifyRecover() 方法是 backoff.RetryNotify
的封装器,它为之前操作失败但后来恢复的情况增加了另一个回调。这个包装器的主要目的是只在操作第一次失败时调用 “notify”,在最后成功时调用 “recovered”。这有助于将日志信息限制在操作者需要被提醒的事件上。
这里的NotifyRecover() 方法包装了 Operation()
和 Notify()
函数:
func NotifyRecover(operation backoff.Operation, b backoff.BackOff, notify backoff.Notify, recovered func()) error {
var notified bool
return backoff.RetryNotify(func() error {
err := operation()
// notified为true说明之前执行过notify,即出现了一次或者多次连续错误。
// err为空说明operation不再出错
// 这才可以成为"恢复"
if err == nil && notified {
notified = false // 重置 notified ,下一次 operation() 再成功也不会再出发recovered()
recovered() // 满足逻辑,可以触发一次 recovered() 方法
}
return err
}, b, func(err error, d time.Duration) {
if !notified { // 只在第一次时调用真正的notify()函数,其他情况下忽略
notify(err, d)
notified = true
}
})
}
备注:感觉 notified 这个变量的取名不够清晰,它的语义不应该是"是否触发了通知",而是"是否发生了错误而一直没有恢复"。应该改为类似 errorNotRecoverd 之类的,语义更清晰一些。