状态管理的初始化源码分析

Dapr状态管理的初始化源码分析

State Store Registry

stateStoreRegistry的初始化准备

stateStoreRegistry 的初始化在 runtime 初始化时进行:

// NewDaprRuntime (excerpt) builds a DaprRuntime from the runtime and global
// configuration. Only the state-store-related field is shown: the
// stateStoreRegistry field is set to a fresh registry obtained from
// state_loader.NewRegistry(). The remaining initialization is elided.
func NewDaprRuntime(runtimeConfig *Config, globalConfig *config.Configuration) *DaprRuntime {
  ......
  stateStoreRegistry:     state_loader.NewRegistry(),
}

// initRuntime (excerpt) passes every entry of opts.states to the state store
// registry via Register. The surrounding steps are elided.
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {	
  ......
  // Hand all configured state entries to the registry in one variadic call.
  a.stateStoreRegistry.Register(opts.states...)
  ......
}

这些 opts 来自 runtime 启动时的配置,如 cmd/daprd/main.go 下:

// main (excerpt) creates the runtime from command-line flags, aborts via
// log.Fatal if that fails, and then calls rt.Run with a list of options.
// The option shown here, runtime.WithStates, receives one state_loader.New
// entry per state store: a type name paired with a factory function that
// returns a new state.Store. Other options passed to Run are elided.
func main() {
	rt, err := runtime.FromFlags()
	if err != nil {
		log.Fatal(err)
	}

	err = rt.Run(
    ......
    runtime.WithStates(
			// Each entry pairs a state store type name with a factory that
			// constructs the implementation, passing it logContrib.
			state_loader.New("redis", func() state.Store {
				return state_redis.NewRedisStateStore(logContrib)
			}),
			state_loader.New("consul", func() state.Store {
				return consul.NewConsulStateStore(logContrib)
			}),
			state_loader.New("azure.blobstorage", func() state.Store {
				return state_azure_blobstorage.NewAzureBlobStorageStore(logContrib)
			}),
			state_loader.New("azure.cosmosdb", func() state.Store {
				return state_cosmosdb.NewCosmosDBStateStore(logContrib)
			}),
			state_loader.New("azure.tablestorage", func() state.Store {
				return state_azure_tablestorage.NewAzureTablesStateStore(logContrib)
			}),
			// NOTE(review): the etcd entry below is commented out in the
			// source; the reason is not visible here.
			//state_loader.New("etcd", func() state.Store {
			//	return etcd.NewETCD(logContrib)
			//}),
			state_loader.New("cassandra", func() state.Store {
				return cassandra.NewCassandraStateStore(logContrib)
			}),
			state_loader.New("memcached", func() state.Store {
				return memcached.NewMemCacheStateStore(logContrib)
			}),
			state_loader.New("mongodb", func() state.Store {
				return mongodb.NewMongoDB(logContrib)
			}),
			state_loader.New("zookeeper", func() state.Store {
				return zookeeper.NewZookeeperStateStore(logContrib)
			}),
			state_loader.New("gcp.firestore", func() state.Store {
				return firestore.NewFirestoreStateStore(logContrib)
			}),
			state_loader.New("postgresql", func() state.Store {
				return postgresql.NewPostgreSQLStateStore(logContrib)
			}),
			state_loader.New("sqlserver", func() state.Store {
				return sqlserver.NewSQLServerStateStore(logContrib)
			}),
			state_loader.New("hazelcast", func() state.Store {
				return hazelcast.NewHazelcastStore(logContrib)
			}),
			state_loader.New("cloudstate.crdt", func() state.Store {
				return cloudstate.NewCRDT(logContrib)
			}),
			state_loader.New("couchbase", func() state.Store {
				return couchbase.NewCouchbaseStateStore(logContrib)
			}),
			state_loader.New("aerospike", func() state.Store {
				return aerospike.NewAerospikeStateStore(logContrib)
			}),
		),
    ......
}

在这里配置各种 state store 的实现。

State Store Registry的实现方式

pkg/components/state/registry.go,定义了registry的接口和数据结构:

// Registry is an interface for a component that returns registered state store implementations
type Registry interface {
	// Register accepts zero or more State entries (a name plus a factory
	// method). Presumably they are stored for later lookup by
	// CreateStateStore; the implementation is not shown here.
	Register(components ...State)
	// CreateStateStore returns a state.Store for the given registered name,
	// or an error when no store is registered under that name.
	CreateStateStore(name string) (state.Store, error)
}

// stateStoreRegistry holds the registered state store factories; its
// CreateStateStore method looks a factory up by name and invokes it.
type stateStoreRegistry struct {
	// stateStores maps a state store name to the factory function that
	// produces a state.Store for that name.
	stateStores map[string]func() state.Store
}

state.Store 是 dapr 定义的标准 state store的接口,所有的实现都要遵循这个接口。定义在 github.com/dapr/components-contrib/state/store.go 文件中:

// Store is an interface to perform operations on store
type Store interface {
	// Init receives the store's Metadata; the runtime calls it once after
	// creating the store, passing the component's metadata as Properties.
	Init(metadata Metadata) error
	// Delete handles a single DeleteRequest.
	Delete(req *DeleteRequest) error
	// BulkDelete handles a slice of DeleteRequest values in one call.
	BulkDelete(req []DeleteRequest) error
	// Get handles a single GetRequest and returns a GetResponse.
	Get(req *GetRequest) (*GetResponse, error)
	// Set handles a single SetRequest.
	Set(req *SetRequest) error
	// BulkSet handles a slice of SetRequest values in one call.
	BulkSet(req []SetRequest) error
}

前面 runtime 初始化时,每个实现都通过 New 方法将 name 和对应的 state store 工厂方法(FactoryMethod)关联起来:

// State pairs a state store name with the factory that creates it.
type State struct {
	// Name is the identifier the state store is registered under.
	Name          string
	// FactoryMethod returns a new state.Store each time it is called.
	FactoryMethod func() state.Store
}

// New returns a State that binds the given name to factoryMethod, the
// function used to construct the corresponding state.Store.
func New(name string, factoryMethod func() state.Store) State {
	var entry State
	entry.Name = name
	entry.FactoryMethod = factoryMethod
	return entry
}

State Store的初始化流程

pkg/runtime/runtime.go :

State 的初始化在 runtime 初始化时进行:

// initRuntime (excerpt) starts processComponents in its own goroutine. The
// surrounding steps are elided.
func (a *DaprRuntime) initRuntime(opts *runtimeOpts) error {
	......
	// Run the component-processing loop concurrently with the rest of init.
	go a.processComponents()
	......
}
// processComponents drains the pendingComponents channel, handing each
// received component to processOneComponent and logging any error it
// returns. Once the channel has been closed and emptied, it sends true on
// pendingComponentsDone and returns.
func (a *DaprRuntime) processComponents() {
	// Ranging over the channel ends exactly when it is closed and drained.
	for pending := range a.pendingComponents {
		err := a.processOneComponent(pending)
		if err == nil {
			continue
		}
		log.Errorf("process component %s error, %s", pending.Name, err)
	}
	a.pendingComponentsDone <- true
}

processOneComponent:

// processOneComponent (excerpt) preprocesses a single component and
// determines its category. The steps that consume res and compCategory are
// elided in this excerpt.
func (a *DaprRuntime) processOneComponent(comp components_v1alpha1.Component) error {
	// Preprocessing receives a pointer, so it may modify comp in place.
	res := a.preprocessOneComponent(&comp)
  
	compCategory := a.figureOutComponentCategory(comp)

	......
	return nil
}

doProcessOneComponent:

// doProcessOneComponent (excerpt) dispatches a component to the initializer
// for its category. Only the state store case is shown: stateComponent is
// handled by initState, whose error is returned directly. Other categories
// are elided.
func (a *DaprRuntime) doProcessOneComponent(category ComponentCategory, comp components_v1alpha1.Component) error {
	switch category {
	case stateComponent:
		return a.initState(comp)
	}
		......
	return nil
}

initState方法的实现:

// initState creates, initializes and records the state store described by
// the component s.
//
// The store is built through the registry using s.Spec.Type, initialized
// with the component's metadata converted to properties, and then saved in
// a.stateStores under the component's name. If the "actorStateStore"
// property is "true", the actor store counter is incremented and the name of
// the first such store is remembered. A creation or Init failure is logged,
// reported to monitoring, and returned; otherwise nil is returned.
//
// Refer for state store api decision  https://github.com/dapr/dapr/blob/master/docs/decision_records/api/API-008-multi-state-store-api-design.md
func (a *DaprRuntime) initState(s components_v1alpha1.Component) error {
	storeType := s.Spec.Type

	// Build the state store; this is where the components code is first
	// brought in.
	store, err := a.stateStoreRegistry.CreateStateStore(storeType)
	if err != nil {
		log.Warnf("error creating state store %s: %s", storeType, err)
		diag.DefaultMonitoring.ComponentInitFailed(storeType, "creation")
		return err
	}

	if store != nil {
		properties := a.convertMetadataItemsToProperties(s.Spec.Metadata)

		// The component's store implementation initializes itself here,
		// e.g. by establishing connections.
		if initErr := store.Init(state.Metadata{Properties: properties}); initErr != nil {
			diag.DefaultMonitoring.ComponentInitFailed(storeType, "init")
			log.Warnf("error initializing state store %s: %s", storeType, initErr)
			return initErr
		}

		// Keep the fully initialized store in the runtime, keyed by the
		// component name.
		storeName := s.ObjectMeta.Name
		a.stateStores[storeName] = store

		// Set the actor store if "actorStateStore" is true in the spec;
		// only the first store flagged this way has its name recorded.
		if properties[actorStateStore] == "true" {
			a.actorStateStoreCount++
			if a.actorStateStoreCount == 1 {
				a.actorStateStoreName = storeName
			}
		}
		diag.DefaultMonitoring.ComponentInitialized(storeType)
	}

	// Warn unless exactly one actor state store has been configured so far.
	if a.actorStateStoreCount != 1 || a.actorStateStoreName == "" {
		log.Warnf("either no actor state store or multiple actor state stores are specified in the configuration, actor stores specified: %d", a.actorStateStoreCount)
	}

	return nil
}

其中 CreateStateStore 方法的实现在 pkg/components/state/registry.go 中:

// CreateStateStore looks up the factory registered under name and returns
// the state.Store it produces. If no factory is registered for name, it
// returns a nil store and an error naming the missing state store.
func (s *stateStoreRegistry) CreateStateStore(name string) (state.Store, error) {
	factory, found := s.stateStores[name]
	if !found {
		return nil, errors.Errorf("couldn't find state store %s", name)
	}
	return factory(), nil
}