1 - 状态管理源码的概述

Dapr状态管理源码的概述

状态管理的源码

2 - 状态管理的初始化源码分析

Dapr状态管理的初始化源码分析

State Store Registry

stateStoreRegistry的初始化准备

stateStoreRegistry Registry 的初始化在 runtime 初始化时进行:

func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration) *DaprRuntime {
  ......
  stateStoreRegistry:     state_loader.NewRegistry(),
}

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {	
  ......
  a.stateStoreRegistry.Register(opts.states...)
  ......
}

这些 opts 来自 runtime 启动时的配置,如 cmd/daprd/main.go 下:

func main() {
	rt, err := runtime.FromFlags()
	if err != nil {
		log.Fatal(err)
	}

	err = rt.Run(
    ......
    runtime.WithStates(
			state_loader.New("redis", func() state.Store {
				return state_redis.NewRedisStateStore(logContrib)
			}),
			state_loader.New("consul", func() state.Store {
				return consul.NewConsulStateStore(logContrib)
			}),
			state_loader.New("azure.blobstorage", func() state.Store {
				return state_azure_blobstorage.NewAzureBlobStorageStore(logContrib)
			}),
			state_loader.New("azure.cosmosdb", func() state.Store {
				return state_cosmosdb.NewCosmosDBStateStore(logContrib)
			}),
			state_loader.New("azure.tablestorage", func() state.Store {
				return state_azure_tablestorage.NewAzureTablesStateStore(logContrib)
			}),
			//state_loader.New("etcd", func() state.Store {
			//	return etcd.NewETCD(logContrib)
			//}),
			state_loader.New("cassandra", func() state.Store {
				return cassandra.NewCassandraStateStore(logContrib)
			}),
			state_loader.New("memcached", func() state.Store {
				return memcached.NewMemCacheStateStore(logContrib)
			}),
			state_loader.New("mongodb", func() state.Store {
				return mongodb.NewMongoDB(logContrib)
			}),
			state_loader.New("zookeeper", func() state.Store {
				return zookeeper.NewZookeeperStateStore(logContrib)
			}),
			state_loader.New("gcp.firestore", func() state.Store {
				return firestore.NewFirestoreStateStore(logContrib)
			}),
			state_loader.New("postgresql", func() state.Store {
				return postgresql.NewPostgreSQLStateStore(logContrib)
			}),
			state_loader.New("sqlserver", func() state.Store {
				return sqlserver.NewSQLServerStateStore(logContrib)
			}),
			state_loader.New("hazelcast", func() state.Store {
				return hazelcast.NewHazelcastStore(logContrib)
			}),
			state_loader.New("cloudstate.crdt", func() state.Store {
				return cloudstate.NewCRDT(logContrib)
			}),
			state_loader.New("couchbase", func() state.Store {
				return couchbase.NewCouchbaseStateStore(logContrib)
			}),
			state_loader.New("aerospike", func() state.Store {
				return aerospike.NewAerospikeStateStore(logContrib)
			}),
		),
    ......
}

在这里配置各种 state store 的实现。

State Store Registry的实现方式

pkg/components/state/registry.go,定义了registry的接口和数据结构:

// Registry is an interface for a component that returns registered state store implementations
type Registry interface {
	// Register adds the given State entries (name plus factory) to the registry.
	Register(components ...State)
	// CreateStateStore returns a store instance for the registered name, or an
	// error when no factory is registered under that name.
	CreateStateStore(name string) (state.Store, error)
}

// stateStoreRegistry is the Registry implementation backed by a plain map.
type stateStoreRegistry struct {
	// stateStores maps a registered store name to the factory that builds it.
	stateStores map[string]func() state.Store
}

state.Store 是 dapr 定义的标准 state store的接口,所有的实现都要遵循这个接口。定义在 github.com/dapr/components-contrib/state/store.go 文件中:

// Store is an interface to perform operations on store
type Store interface {
	// Init prepares the store from the component metadata; the runtime calls it
	// once per configured state component before the store is used.
	Init(metadata Metadata) error
	// Delete handles a single delete request.
	Delete(req *DeleteRequest) error
	// BulkDelete handles a batch of delete requests.
	BulkDelete(req []DeleteRequest) error
	// Get handles a single get request and returns the store's response.
	Get(req *GetRequest) (*GetResponse, error)
	// Set handles a single set request.
	Set(req *SetRequest) error
	// BulkSet handles a batch of set requests.
	BulkSet(req []SetRequest) error
}

前面 runtime 初始化时,每个实现都通过 New 方法将 name 和对应的 state store 关联起来:

// State pairs a state store name with the factory that creates the store.
type State struct {
	// Name is the identifier the store is registered under (e.g. "redis").
	Name          string
	// FactoryMethod builds a new, not yet initialized, store instance.
	FactoryMethod func() state.Store
}

// New builds a State entry that associates name with factoryMethod, ready to be
// handed to a registry.
func New(name string, factoryMethod func() state.Store) State {
	var entry State
	entry.Name = name
	entry.FactoryMethod = factoryMethod
	return entry
}

State Store的初始化流程

pkg/runtime/runtime.go :

State 的初始化在 runtime 初始化时进行:

func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
	......
	go a.processComponents()
	......
}
// processComponents consumes components from the pendingComponents channel and
// hands each one to processOneComponent. A failure is logged and does not stop
// the loop. When the channel is closed, completion is signalled on
// pendingComponentsDone and the method returns.
func (a *DaprRuntime) processComponents() {
	for comp := range a.pendingComponents {
		err := a.processOneComponent(comp)
		if err != nil {
			log.Errorf("process component %s error, %s", comp.Name, err)
		}
	}

	// The channel was closed: nothing more will arrive.
	a.pendingComponentsDone <- true
}

processOneComponent:

func (a *DaprRuntime) processOneComponent(comp components_v1alpha1.Component) error {
	res := a.preprocessOneComponent(&comp)
  
	compCategory := a.figureOutComponentCategory(comp)

	......
	return nil
}

doProcessOneComponent:

func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
	switch category {
	case stateComponent:
		return a.initState(comp)
	}
		......
	return nil
}

initState方法的实现:

// initState creates and initializes the state store described by component s
// and records it in the runtime under the component's name.
//
// Refer for state store api decision  https://github.com/dapr/dapr/blob/master/docs/decision_records/api/API-008-multi-state-store-api-design.md
//
// Returns the creation or initialization error, if any. After a successful
// pass it also warns when the number of stores flagged as actor state store
// is not exactly one.
func (a *DaprRuntime) initState(s components_v1alpha1.Component) error {
	storeType := s.Spec.Type

	// Build the store via the registry; this is where components-contrib code
	// is first pulled in.
	store, err := a.stateStoreRegistry.CreateStateStore(storeType)
	if err != nil {
		log.Warnf("error creating state store %s: %s", storeType, err)
		diag.DefaultMonitoring.ComponentInitFailed(storeType, "creation")
		return err
	}

	if store != nil {
		props := a.convertMetadataItemsToProperties(s.Spec.Metadata)

		// Let the concrete store initialize itself (e.g. open connections).
		if initErr := store.Init(state.Metadata{Properties: props}); initErr != nil {
			diag.DefaultMonitoring.ComponentInitFailed(storeType, "init")
			log.Warnf("error initializing state store %s: %s", storeType, initErr)
			return initErr
		}

		// Keep the initialized store in the runtime, keyed by component name.
		componentName := s.ObjectMeta.Name
		a.stateStores[componentName] = store

		// set specified actor store if "actorStateStore" is true in the spec.
		if props[actorStateStore] == "true" {
			a.actorStateStoreCount++
			// Only the first store flagged as actor store wins the name.
			if a.actorStateStoreCount == 1 {
				a.actorStateStoreName = componentName
			}
		}
		diag.DefaultMonitoring.ComponentInitialized(storeType)
	}

	if a.actorStateStoreName == "" || a.actorStateStoreCount != 1 {
		log.Warnf("either no actor state store or multiple actor state stores are specified in the configuration, actor stores specified: %d", a.actorStateStoreCount)
	}

	return nil
}

其中 CreateStateStore 方法的实现在 pkg/components/state/registry.go 中:

// CreateStateStore looks up the factory registered under name and returns a
// freshly created store. An error is returned when no factory is registered
// for that name.
func (s *stateStoreRegistry) CreateStateStore(name string) (state.Store, error) {
	factory, found := s.stateStores[name]
	if !found {
		return nil, errors.Errorf("couldn't find state store %s", name)
	}
	return factory(), nil
}

3 - 状态管理的runtime处理源码分析

Dapr状态管理的runtime处理源码分析

runtime 处理 state 请求的代码在 pkg/grpc/api.go 中。

get state

// GetState serves a single-key read against the state store named in the
// request. On any failure it returns an empty response together with the
// error; a store error is prefixed with ERR_STATE_GET.
func (a *api) GetState(ctx context.Context, in *runtimev1pb.GetStateRequest) (*runtimev1pb.GetStateResponse, error) {
	// Resolve the store by the name carried in the request.
	// NOTE(review): presumably this must match the component name from the
	// component YAML — confirm against getStateStore.
	store, err := a.getStateStore(in.StoreName)
	if err != nil {
		apiServerLogger.Debug(err)
		return &runtimev1pb.GetStateResponse{}, err
	}

	// Run the query. With the Redis store this issues HGETALL first and falls
	// back to GET when that fails.
	getResponse, err := store.Get(&state.GetRequest{
		Key:      a.getModifiedStateKey(in.Key),
		Metadata: in.Metadata,
		Options: state.GetStateOption{
			Consistency: stateConsistencyToString(in.Consistency),
		},
	})
	if err != nil {
		wrapped := fmt.Errorf("ERR_STATE_GET: %s", err)
		apiServerLogger.Debug(wrapped)
		return &runtimev1pb.GetStateResponse{}, wrapped
	}

	// A nil store response is reported as an empty (not missing) reply.
	if getResponse == nil {
		return &runtimev1pb.GetStateResponse{}, nil
	}
	return &runtimev1pb.GetStateResponse{
		Etag: getResponse.ETag,
		Data: getResponse.Data,
	}, nil
}

get bulk state

get bulk 方法的实现是由 runtime 封装 get 方法而成,底层 state store 只需要实现单个查询的 get 即可。

// GetBulkState reads several keys from one state store by issuing one Get per
// key through a concurrency limiter; the store itself only needs single-key Get.
//
// Per-key failures do not fail the call: they are reported in the Error field
// of the corresponding item. Items are returned in the same order as in.Keys.
// An error is returned only when the store cannot be resolved.
func (a *api) GetBulkState(ctx context.Context, in *runtimev1pb.GetBulkStateRequest) (*runtimev1pb.GetBulkStateResponse, error) {
	store, err := a.getStateStore(in.StoreName)
	if err != nil {
		apiServerLogger.Debug(err)
		return &runtimev1pb.GetBulkStateResponse{}, err
	}

	// Pre-size the result so each job writes only its own slot. Jobs handed to
	// the limiter may run concurrently, and appending to a shared slice from
	// several goroutines is a data race that can drop items.
	resp := &runtimev1pb.GetBulkStateResponse{
		Items: make([]*runtimev1pb.BulkStateItem, len(in.Keys)),
	}

	// When Parallelism <= 0 the limiter falls back to its default (100).
	limiter := concurrency.NewLimiter(int(in.Parallelism))

	// fetch loads the key at index param and stores the outcome at that index.
	fetch := func(param interface{}) {
		idx := param.(int)
		key := in.Keys[idx]

		req := state.GetRequest{
			Key:      a.getModifiedStateKey(key),
			Metadata: in.Metadata,
		}

		r, err := store.Get(&req)
		item := &runtimev1pb.BulkStateItem{
			Key: key,
		}
		if err != nil {
			item.Error = err.Error()
		} else if r != nil {
			item.Data = r.Data
			item.Etag = r.ETag
		}
		resp.Items[idx] = item
	}

	for i := range in.Keys {
		limiter.Execute(fetch, i)
	}
	// Wait for every job before the response is read.
	limiter.Wait()

	return resp, nil
}

save state

// SaveState converts every state item of the request into a state.SetRequest
// and writes them to the named store with a single BulkSet call. A store
// failure is returned prefixed with ERR_STATE_SAVE.
func (a *api) SaveState(ctx context.Context, in *runtimev1pb.SaveStateRequest) (*empty.Empty, error) {
	store, err := a.getStateStore(in.StoreName)
	if err != nil {
		apiServerLogger.Debug(err)
		return &empty.Empty{}, err
	}

	reqs := make([]state.SetRequest, 0, len(in.States))
	for _, item := range in.States {
		setReq := state.SetRequest{
			Key:      a.getModifiedStateKey(item.Key),
			Metadata: item.Metadata,
			Value:    item.Value,
			ETag:     item.Etag,
		}
		// Options are optional; leave the zero value when none were sent.
		if opts := item.Options; opts != nil {
			setReq.Options = state.SetStateOption{
				Consistency: stateConsistencyToString(opts.Consistency),
				Concurrency: stateConcurrencyToString(opts.Concurrency),
			}
		}
		reqs = append(reqs, setReq)
	}

	// Everything goes through the store's BulkSet.
	// NOTE(review): this handler never calls the store's single-item Set.
	if err = store.BulkSet(reqs); err != nil {
		err = fmt.Errorf("ERR_STATE_SAVE: %s", err)
		apiServerLogger.Debug(err)
		return &empty.Empty{}, err
	}
	return &empty.Empty{}, nil
}

delete state

// DeleteState removes a single key from the named state store. A store failure
// is returned prefixed with ERR_STATE_DELETE and mentions the requested key.
func (a *api) DeleteState(ctx context.Context, in *runtimev1pb.DeleteStateRequest) (*empty.Empty, error) {
	store, err := a.getStateStore(in.StoreName)
	if err != nil {
		apiServerLogger.Debug(err)
		return &empty.Empty{}, err
	}

	delReq := state.DeleteRequest{
		Key:      a.getModifiedStateKey(in.Key),
		Metadata: in.Metadata,
		ETag:     in.Etag,
	}
	// Options are optional; leave the zero value when none were sent.
	if opts := in.Options; opts != nil {
		delReq.Options = state.DeleteStateOption{
			Concurrency: stateConcurrencyToString(opts.Concurrency),
			Consistency: stateConsistencyToString(opts.Consistency),
		}
	}

	// Only the single-key Delete is used; this handler does not call the
	// store's BulkDelete.
	if err = store.Delete(&delReq); err != nil {
		err = fmt.Errorf("ERR_STATE_DELETE: failed deleting state with key %s: %s", in.Key, err)
		apiServerLogger.Debug(err)
		return &empty.Empty{}, err
	}
	return &empty.Empty{}, nil
}

Execute State Transaction

如果要支持事务,则要求实现 TransactionalStore 接口:

// TransactionalStore is the interface a state store implements to support
// transactional (multi-operation) requests.
type TransactionalStore interface {
   // Init has the same signature as Init on the regular Store interface.
   Init(metadata Metadata) error
   // Multi is the method added on top of Init: it takes a transactional request.
   Multi(request *TransactionalStateRequest) error
}

runtime 的 ExecuteStateTransaction 方法的实现:

// ExecuteStateTransaction translates the upsert/delete operations of the
// request into store-level operations and submits them in one Multi call on
// the named store.
//
// Errors (each returned with an empty reply and logged at debug level):
//   - ERR_STATE_STORE_NOT_CONFIGURED when no state store is configured,
//   - ERR_STATE_STORE_NOT_FOUND when the named store is unknown,
//   - ERR_STATE_STORE_NOT_SUPPORTED when the store is not a TransactionalStore,
//   - ERR_OPERATION_NOT_SUPPORTED for an unknown operation type,
//   - ERR_STATE_TRANSACTION when the store's Multi call fails.
func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.ExecuteStateTransactionRequest) (*empty.Empty, error) {
	// fail logs err and returns it alongside an empty reply.
	fail := func(err error) (*empty.Empty, error) {
		apiServerLogger.Debug(err)
		return &empty.Empty{}, err
	}

	// len of a nil map is 0, so this also covers the nil case.
	if len(a.stateStores) == 0 {
		return fail(errors.New("ERR_STATE_STORE_NOT_CONFIGURED"))
	}

	store := a.stateStores[in.StoreName]
	if store == nil {
		return fail(errors.New("ERR_STATE_STORE_NOT_FOUND"))
	}

	// The store must implement the transactional interface.
	transactionalStore, ok := store.(state.TransactionalStore)
	if !ok {
		return fail(errors.New("ERR_STATE_STORE_NOT_SUPPORTED"))
	}

	// Build the store-level operations.
	operations := make([]state.TransactionalStateOperation, 0, len(in.Operations))
	for _, inputReq := range in.Operations {
		req := inputReq.Request

		switch state.OperationType(inputReq.OperationType) {
		case state.Upsert:
			setReq := state.SetRequest{
				Key: a.getModifiedStateKey(req.Key),
				// Limitation:
				// components that cannot handle byte array need to deserialize/serialize in
				// component specific way in components-contrib repo.
				Value:    req.Value,
				Metadata: req.Metadata,
				ETag:     req.Etag,
			}
			if opts := req.Options; opts != nil {
				setReq.Options = state.SetStateOption{
					Concurrency: stateConcurrencyToString(opts.Concurrency),
					Consistency: stateConsistencyToString(opts.Consistency),
				}
			}
			operations = append(operations, state.TransactionalStateOperation{
				Operation: state.Upsert,
				Request:   setReq,
			})

		case state.Delete:
			delReq := state.DeleteRequest{
				Key:      a.getModifiedStateKey(req.Key),
				Metadata: req.Metadata,
				ETag:     req.Etag,
			}
			if opts := req.Options; opts != nil {
				delReq.Options = state.DeleteStateOption{
					Concurrency: stateConcurrencyToString(opts.Concurrency),
					Consistency: stateConsistencyToString(opts.Consistency),
				}
			}
			operations = append(operations, state.TransactionalStateOperation{
				Operation: state.Delete,
				Request:   delReq,
			})

		default:
			return fail(fmt.Errorf("ERR_OPERATION_NOT_SUPPORTED: operation type %s not supported", inputReq.OperationType))
		}
	}

	// Hand all operations to the store's Multi method in a single request.
	err := transactionalStore.Multi(&state.TransactionalStateRequest{
		Operations: operations,
		Metadata:   in.Metadata,
	})
	if err != nil {
		return fail(fmt.Errorf("ERR_STATE_TRANSACTION: %s", err))
	}
	return &empty.Empty{}, nil
}

4 - 状态管理中Redis实现的处理源码分析

Dapr状态管理中Redis实现的处理源码分析

状态管理的redis实现

Redis的实现在 dapr/components-contrib 下,/state/redis/redis.go 中:

// StateStore is a Redis state store
type StateStore struct {
	// client is the Redis client; it is created in Init.
	client   *redis.Client
	// json is the JSON API used to marshal values before they are written.
	json     jsoniter.API
	// metadata holds the parsed component configuration; it is set in Init.
	metadata metadata
	// replicas is the value obtained from getConnectedSlaves in Init; when it
	// is > 0, strongly consistent writes issue a WAIT for that many replicas.
	replicas int

	logger logger.Logger
}

// NewRedisStateStore returns a new redis state store that uses the given
// logger and the jsoniter "fastest" configuration. The Redis client is not
// created here; that happens in Init.
func NewRedisStateStore(logger logger.Logger) *StateStore {
	store := new(StateStore)
	store.json = jsoniter.ConfigFastest
	store.logger = logger
	return store
}

初始化

在 dapr runtime 初始化时,关联 redis 的 state 实现:

state_loader.New("redis", func() state.Store {
    return state_redis.NewRedisStateStore(logContrib)
}),

然后 Init 方法会在 state 初始化时被 dapr runtime 调用,Redis的实现内容为:

// Init parses the component metadata, creates the Redis client (failover or
// regular, depending on the metadata), verifies connectivity with PING and
// finally records the number of connected replicas.
//
// It returns the metadata parsing error, a connection error mentioning the
// host, or the error from querying the replicas.
func (r *StateStore) Init(metadata state.Metadata) error {
	m, err := parseRedisMetadata(metadata)
	if err != nil {
		return err
	}
	r.metadata = m

	switch {
	case r.metadata.failover:
		r.client = r.newFailoverClient(m)
	default:
		r.client = r.newClient(m)
	}

	// Fail fast when the server cannot be reached.
	if _, pingErr := r.client.Ping().Result(); pingErr != nil {
		return fmt.Errorf("redis store: error connecting to redis at %s: %s", m.host, pingErr)
	}

	replicas, err := r.getConnectedSlaves()
	r.replicas = replicas
	return err
}

get state

get的实现方式:

// Get retrieves state from redis with a key.
//
// It first tries HGETALL, expecting a hash with "data" and "version" fields
// (the ETag-aware layout). If HGETALL returns an error it falls back to
// directGet, which reads the key with a plain GET. A missing key yields an
// empty response rather than an error.
func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
	// Prefer values with ETags.
	reply, err := r.client.DoContext(context.Background(), "HGETALL", req.Key).Result()
	if err != nil {
		// Falls back to original get.
		return r.directGet(req)
	}

	// Empty result, case 1: no reply at all.
	if reply == nil {
		return &state.GetResponse{}, nil
	}

	// Empty result, case 2: an empty field list. Either way "not found" is
	// answered with an empty response, not an error.
	fields := reply.([]interface{})
	if len(fields) == 0 {
		return &state.GetResponse{}, nil
	}

	data, version, err := r.getKeyVersion(fields)
	if err != nil {
		return nil, err
	}

	resp := &state.GetResponse{
		Data: []byte(data),
		ETag: version,
	}
	return resp, nil
}

支持ETag的实现方式

要支持ETag,就不能简单用 redis 的 key / value 方式直接在value中存放state的数据(data字段,byte[]格式),这个“value”需要包含除data之外的其他ETag相关字段,比如 version。

redis state实现的设计方式是:对于每个存储在 redis 中的 state item,其value是一个hashmap,在这个value hashmap中通过不同的key存放多个信息:

  • data:state的数据
  • version:ETag需要的version

所以前面要用 HGETALL 命令把这个hashmap的所有key/value都取出来,然后现在要通过getKeyVersion方法来从这些key/value中读取data和version:

// getKeyVersion extracts the "data" and "version" fields from a flat
// field/value list such as the reply of HGETALL ([f1, v1, f2, v2, ...]).
//
// It returns an error when either field is missing. A trailing field without
// a value (odd-length list) is ignored instead of indexing past the end of
// the slice.
func (r *StateStore) getKeyVersion(vals []interface{}) (data string, version string, err error) {
	seenData := false
	seenVersion := false
	// Require a full field/value pair on every step so vals[i+1] is always valid.
	for i := 0; i+1 < len(vals); i += 2 {
		// Quote and unquote to turn the reply element into a plain string.
		field, _ := strconv.Unquote(fmt.Sprintf("%q", vals[i]))
		switch field {
		case "data":
			data, _ = strconv.Unquote(fmt.Sprintf("%q", vals[i+1]))
			seenData = true
		case "version":
			version, _ = strconv.Unquote(fmt.Sprintf("%q", vals[i+1]))
			seenVersion = true
		}
	}
	if !seenData || !seenVersion {
		return "", "", errors.New("required hash field 'data' or 'version' was not found")
	}
	return data, version, nil
}

返回的时候,带上ETag:

return &state.GetResponse{
      Data: []byte(data),
      ETag: version,
   }, nil

不支持ETag的实现方式

如果 HGETALL 命令执行失败,则fall back到普通场景:redis中只简单保存数据,没有etag。此时保存方式就是简单的key/value,用简单的 GET 命令直接读取:

// directGet reads the key with a plain GET. It is the fallback used when the
// value is not stored as a hash, so no ETag is returned. A nil reply yields an
// empty response.
func (r *StateStore) directGet(req *state.GetRequest) (*state.GetResponse, error) {
	raw, err := r.client.DoContext(context.Background(), "GET", req.Key).Result()
	switch {
	case err != nil:
		return nil, err
	case raw == nil:
		return &state.GetResponse{}, nil
	}

	// Quote and unquote to turn the reply into a plain string.
	value, _ := strconv.Unquote(fmt.Sprintf("%q", raw))
	resp := &state.GetResponse{Data: []byte(value)}
	return resp, nil
}

备注:这个设计有个性能问题,如果redis中的数据是用简单key/value存储,没有etag,则每次读取都要进行两次:第一次 HGETALL 命令失败,然后 fall back 用 GET 命令再读第二次。

save state

redis的实现,有 set 方法和 BulkSet

// Set saves state into redis. The write itself is done by setValue, invoked
// through state.SetWithOptions.
func (r *StateStore) Set(req *state.SetRequest) error {
	err := state.SetWithOptions(r.setValue, req)
	return err
}

// BulkSet performs a bulk save operation by calling Set for each request in
// order.
//
// NOTE(review): debatable behavior — the first failing Set aborts the loop, so
// the remaining requests are never attempted.
func (r *StateStore) BulkSet(req []state.SetRequest) error {
	for idx := 0; idx < len(req); idx++ {
		if err := r.Set(&req[idx]); err != nil {
			return err
		}
	}
	return nil
}

实际实现在 r.setValue 方法中:

// setValue writes one state item to Redis.
//
// Steps: validate the request options, parse the ETag, marshal the value, run
// the setQuery Lua script through EVAL and, for strongly consistent writes on
// a server with replicas, WAIT for the replicas to acknowledge.
//
// It returns an error when the options or the ETag are invalid, when the value
// cannot be marshaled, when the script fails, or when the WAIT call fails.
func (r *StateStore) setValue(req *state.SetRequest) error {
	err := state.CheckRequestOptions(req.Options)
	if err != nil {
		return err
	}

	// Parse the ETag; it has to be convertible to an integer.
	ver, err := r.parseETag(req.ETag)
	if err != nil {
		return err
	}

	// Last-write-wins means the ETag is not compared: reset the version to 0
	// so the write is forced.
	if req.Options.Concurrency == state.LastWrite {
		ver = 0
	}

	// Do not write anything when the value cannot be serialized.
	bt, err := utils.Marshal(req.Value, r.json.Marshal)
	if err != nil {
		return fmt.Errorf("failed to marshal value for key %s: %s", req.Key, err)
	}

	// Run the setQuery Lua script with EVAL.
	_, err = r.client.DoContext(context.Background(), "EVAL", setQuery, 1, req.Key, ver, bt).Result()
	if err != nil {
		return fmt.Errorf("failed to set key %s: %s", req.Key, err)
	}

	// Strong consistency with at least one replica: wait (up to 1000 ms, per
	// the WAIT arguments) for the replicas to acknowledge the write.
	if req.Options.Consistency == state.Strong && r.replicas > 0 {
		_, err = r.client.DoContext(context.Background(), "WAIT", r.replicas, 1000).Result()
		if err != nil {
			return fmt.Errorf("timed out while waiting for %v replicas to acknowledge write", r.replicas)
		}
	}

	return nil
}

更多redis细节:

  • setQuery 脚本
setQuery                 = "local var1 = redis.pcall(\"HGET\", KEYS[1], \"version\"); if type(var1) == \"table\" then redis.call(\"DEL\", KEYS[1]); end; if not var1 or type(var1)==\"table\" or var1 == \"\" or var1 == ARGV[1] or ARGV[1] == \"0\" then redis.call(\"HSET\", KEYS[1], \"data\", ARGV[2]) return redis.call(\"HINCRBY\", KEYS[1], \"version\", 1) else return error(\"failed to set key \" .. KEYS[1]) end"
  • WAIT numreplicas timeout 命令:https://redis.io/commands/wait

delete state

// Delete performs a delete operation. The request options are validated
// first; the deletion itself is done by deleteValue, invoked through
// state.DeleteWithOptions.
func (r *StateStore) Delete(req *state.DeleteRequest) error {
	if err := state.CheckRequestOptions(req.Options); err != nil {
		return err
	}
	return state.DeleteWithOptions(r.deleteValue, req)
}

// BulkDelete performs a bulk delete operation by calling Delete for each
// request in order; the first failure aborts the loop and is returned.
//
// NOTE(review): per the accompanying analysis, the dapr runtime does not
// expose BulkDelete.
func (r *StateStore) BulkDelete(req []state.DeleteRequest) error {
	for idx := 0; idx < len(req); idx++ {
		if err := r.Delete(&req[idx]); err != nil {
			return err
		}
	}
	return nil
}

实际实现在 r.deleteValue 方法中:

// deleteValue deletes one key by running the delQuery Lua script through EVAL.
//
// An empty ETag is sent as "0". The caller's request is left untouched. Any
// failure of the EVAL call is returned with the underlying cause included,
// since it may be an ETag mismatch but can also be another Redis error.
func (r *StateStore) deleteValue(req *state.DeleteRequest) error {
	// Use a local copy so the caller's request is not modified.
	etag := req.ETag
	if etag == "" {
		etag = "0"
	}

	_, err := r.client.DoContext(context.Background(), "EVAL", delQuery, 1, req.Key, etag).Result()
	if err != nil {
		return fmt.Errorf("failed to delete key '%s' (possible ETag mismatch): %s", req.Key, err)
	}

	return nil
}

更多redis细节:

  • delQuery 脚本
delQuery                 = "local var1 = redis.pcall(\"HGET\", KEYS[1], \"version\"); if not var1 or type(var1)==\"table\" or var1 == ARGV[1] or var1 == \"\" or ARGV[1] == \"0\" then return redis.call(\"DEL\", KEYS[1]) else return error(\"failed to delete \" .. KEYS[1]) end"

State Transaction

redis state store 实现了 TransactionalStore,它的 Multi 方法:

// Multi performs a transactional operation. succeeds only if all operations succeed, and fails if one or more operations fail
// Multi performs a transactional operation. succeeds only if all operations succeed, and fails if one or more operations fail
//
// Upserts and deletes are queued on a go-redis TxPipeline and submitted with
// one Exec call. If an operation carries a request of an unexpected type, or a
// value cannot be marshaled, an error is returned before Exec is called.
func (r *StateStore) Multi(request *state.TransactionalStateRequest) error {
	// TxPipeline is the transactional pipeline provided by go-redis.
	pipe := r.client.TxPipeline()
	for _, o := range request.Operations {
		switch o.Operation {
		case state.Upsert:
			req, ok := o.Request.(state.SetRequest)
			if !ok {
				return fmt.Errorf("redis store: upsert operation has unexpected request type %T", o.Request)
			}

			bt, err := utils.Marshal(req.Value, r.json.Marshal)
			if err != nil {
				return fmt.Errorf("redis store: failed to marshal value for key %s: %s", req.Key, err)
			}

			pipe.Set(req.Key, bt, defaultExpirationTime)
		case state.Delete:
			req, ok := o.Request.(state.DeleteRequest)
			if !ok {
				return fmt.Errorf("redis store: delete operation has unexpected request type %T", o.Request)
			}
			pipe.Del(req.Key)
		}
	}

	_, err := pipe.Exec()
	return err
}