状态管理的runtime处理源码分析
Dapr状态管理的runtime处理源码分析
runtime 处理 state 请求的代码在 pkg/grpc/api.go
中。
get state
func (a *api) GetState(ctx context.Context, in *runtimev1pb.GetStateRequest) (*runtimev1pb.GetStateResponse, error) {
// 找 store name 对应的 state store
// 所以请求里面的 store name,必须对应 yaml 文件里面的 name
store, err := a.getStateStore(in.StoreName)
if err != nil {
apiServerLogger.Debug(err)
return &runtimev1pb.GetStateResponse{}, err
}
req := state.GetRequest{
Key: a.getModifiedStateKey(in.Key),
Metadata: in.Metadata,
Options: state.GetStateOption{
Consistency: stateConsistencyToString(in.Consistency),
},
}
// 执行查询
// 里面实际上会先执行 HGETALL 命令,失败后再执行 GET 命令
getResponse, err := store.Get(&req)
if err != nil {
err = fmt.Errorf("ERR_STATE_GET: %s", err)
apiServerLogger.Debug(err)
return &runtimev1pb.GetStateResponse{}, err
}
response := &runtimev1pb.GetStateResponse{}
if getResponse != nil {
response.Etag = getResponse.ETag
response.Data = getResponse.Data
}
return response, nil
}
get bulk state
get bulk 方法的实现是有 runtime 封装 get 方法而成,底层 state store 只需要实现单个查询的 get 即可。
func (a *api) GetBulkState(ctx context.Context, in *runtimev1pb.GetBulkStateRequest) (*runtimev1pb.GetBulkStateResponse, error) {
store, err := a.getStateStore(in.StoreName)
if err != nil {
apiServerLogger.Debug(err)
return &runtimev1pb.GetBulkStateResponse{}, err
}
resp := &runtimev1pb.GetBulkStateResponse{}
// 如果 Parallelism <= 0,则取默认值100
limiter := concurrency.NewLimiter(int(in.Parallelism))
for _, k := range in.Keys {
fn := func(param interface{}) {
req := state.GetRequest{
Key: a.getModifiedStateKey(param.(string)),
Metadata: in.Metadata,
}
r, err := store.Get(&req)
item := &runtimev1pb.BulkStateItem{
Key: param.(string),
}
if err != nil {
item.Error = err.Error()
} else if r != nil {
item.Data = r.Data
item.Etag = r.ETag
}
resp.Items = append(resp.Items, item)
}
limiter.Execute(fn, k)
}
limiter.Wait()
return resp, nil
}
save state
func (a *api) SaveState(ctx context.Context, in *runtimev1pb.SaveStateRequest) (*empty.Empty, error) {
store, err := a.getStateStore(in.StoreName)
if err != nil {
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
reqs := []state.SetRequest{}
for _, s := range in.States {
req := state.SetRequest{
Key: a.getModifiedStateKey(s.Key),
Metadata: s.Metadata,
Value: s.Value,
ETag: s.Etag,
}
if s.Options != nil {
req.Options = state.SetStateOption{
Consistency: stateConsistencyToString(s.Options.Consistency),
Concurrency: stateConcurrencyToString(s.Options.Concurrency),
}
}
reqs = append(reqs, req)
}
// 调用 store 的 BulkSet 方法
// 事实上store的Set方法根本没有被 runtime 调用???
err = store.BulkSet(reqs)
if err != nil {
err = fmt.Errorf("ERR_STATE_SAVE: %s", err)
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
return &empty.Empty{}, nil
}
delete state
func (a *api) DeleteState(ctx context.Context, in *runtimev1pb.DeleteStateRequest) (*empty.Empty, error) {
store, err := a.getStateStore(in.StoreName)
if err != nil {
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
req := state.DeleteRequest{
Key: a.getModifiedStateKey(in.Key),
Metadata: in.Metadata,
ETag: in.Etag,
}
if in.Options != nil {
req.Options = state.DeleteStateOption{
Concurrency: stateConcurrencyToString(in.Options.Concurrency),
Consistency: stateConsistencyToString(in.Options.Consistency),
}
}
// 调用 store 的delete方法
// store 的 BulkDelete 方法没有调用
// runtime 也没有对外暴露 BulkDelete 方法
err = store.Delete(&req)
if err != nil {
err = fmt.Errorf("ERR_STATE_DELETE: failed deleting state with key %s: %s", in.Key, err)
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
return &empty.Empty{}, nil
}
Execute State Transaction
如果要支持事务,则要求实现 TransactionalStore 接口:
type TransactionalStore interface {
// Init方法是和普通store接口一致的
Init(metadata Metadata) error
// 增加的是 Multi 方法
Multi(request *TransactionalStateRequest) error
}
runtime 的 ExecuteStateTransaction 方法的实现:
func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.ExecuteStateTransactionRequest) (*empty.Empty, error) {
if a.stateStores == nil || len(a.stateStores) == 0 {
err := errors.New("ERR_STATE_STORE_NOT_CONFIGURED")
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
storeName := in.StoreName
if a.stateStores[storeName] == nil {
err := errors.New("ERR_STATE_STORE_NOT_FOUND")
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
// 检测是否是 TransactionalStore
transactionalStore, ok := a.stateStores[storeName].(state.TransactionalStore)
if !ok {
err := errors.New("ERR_STATE_STORE_NOT_SUPPORTED")
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
// 构造请求
operations := []state.TransactionalStateOperation{}
for _, inputReq := range in.Operations {
var operation state.TransactionalStateOperation
var req = inputReq.Request
switch state.OperationType(inputReq.OperationType) {
case state.Upsert:
setReq := state.SetRequest{
Key: a.getModifiedStateKey(req.Key),
// Limitation:
// components that cannot handle byte array need to deserialize/serialize in
// component sepcific way in components-contrib repo.
Value: req.Value,
Metadata: req.Metadata,
ETag: req.Etag,
}
if req.Options != nil {
setReq.Options = state.SetStateOption{
Concurrency: stateConcurrencyToString(req.Options.Concurrency),
Consistency: stateConsistencyToString(req.Options.Consistency),
}
}
operation = state.TransactionalStateOperation{
Operation: state.Upsert,
Request: setReq,
}
case state.Delete:
delReq := state.DeleteRequest{
Key: a.getModifiedStateKey(req.Key),
Metadata: req.Metadata,
ETag: req.Etag,
}
if req.Options != nil {
delReq.Options = state.DeleteStateOption{
Concurrency: stateConcurrencyToString(req.Options.Concurrency),
Consistency: stateConsistencyToString(req.Options.Consistency),
}
}
operation = state.TransactionalStateOperation{
Operation: state.Delete,
Request: delReq,
}
default:
err := fmt.Errorf("ERR_OPERATION_NOT_SUPPORTED: operation type %s not supported", inputReq.OperationType)
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
operations = append(operations, operation)
}
// 调用 state store 的 Multi 方法执行有事务性的多个操作
err := transactionalStore.Multi(&state.TransactionalStateRequest{
Operations: operations,
Metadata: in.Metadata,
})
if err != nil {
err = fmt.Errorf("ERR_STATE_TRANSACTION: %s", err)
apiServerLogger.Debug(err)
return &empty.Empty{}, err
}
return &empty.Empty{}, nil
}