1 - State Management Overview

Overview of Dapr state management

State Management

2 - State Management API

The Dapr state management API

2.1 - State Management API Overview

Overview of the Dapr state management API

2.2 - State Management API Proto Definitions

Proto definitions of the Dapr state management API

State API definition

The State Management API is defined in the proto file dapr/proto/runtime/v1/dapr.proto:

service Dapr {
  // Gets the state for a specific key.
  rpc GetState(GetStateRequest) returns (GetStateResponse) {}

  // Gets a bulk of state items for a list of keys
  rpc GetBulkState(GetBulkStateRequest) returns (GetBulkStateResponse) {}

  // Saves the state for a specific key.
  rpc SaveState(SaveStateRequest) returns (google.protobuf.Empty) {}

  // Deletes the state for a specific key.
  rpc DeleteState(DeleteStateRequest) returns (google.protobuf.Empty) {}
  
  // Executes transactions for a specified store
  rpc ExecuteStateTransaction(ExecuteStateTransactionRequest) returns (google.protobuf.Empty) {}
  ...
}

In addition, common.proto defines the state-related messages and enums:

// StateOptions configures concurrency and consistency for state operations
message StateOptions {
  // Enum describing the supported concurrency for state.
  enum StateConcurrency {
    CONCURRENCY_UNSPECIFIED = 0;
    CONCURRENCY_FIRST_WRITE = 1;
    CONCURRENCY_LAST_WRITE = 2;
  }

  // Enum describing the supported consistency for state.
  enum StateConsistency {
    CONSISTENCY_UNSPECIFIED = 0;
    CONSISTENCY_EVENTUAL = 1;
    CONSISTENCY_STRONG = 2;
  }

  StateConcurrency concurrency = 1;
  StateConsistency consistency = 2;
}

Get State

GetStateRequest contains store_name and key, plus the read consistency requirement and request-level metadata:

// GetStateRequest is the message to get key-value states from specific state store.
message GetStateRequest {
  // The name of state store.
  string store_name = 1;

  // The key of the desired state
  string key = 2;

  // The read consistency of the state store.
  common.v1.StateOptions.StateConsistency consistency = 3;

  // The metadata which will be sent to state store components.
  map<string,string> metadata = 4;
}

GetStateResponse contains the state value as a byte array (data) and an etag that identifies the specific version of that data:

// GetStateResponse is the response conveying the state value and etag.
message GetStateResponse {
  // The byte array data
  bytes data = 1;

  // The entity tag which represents the specific version of data.
  // ETag format is defined by the corresponding data store.
  string etag = 2;
}

Get Bulk State

GetBulkStateRequest is the bulk variant: it fetches the data for multiple keys in a single call:

// GetBulkStateRequest is the message to get a list of key-value states from specific state store.
message GetBulkStateRequest {
  // The name of state store.
  string store_name = 1;

  // The keys to get.
  repeated string keys = 2;

  // The number of parallel operations executed on the state store for a get operation.
  // That is, the degree of concurrency: how many requests run against the store at the same time.
  int32 parallelism = 3;

  // The metadata which will be sent to state store components.
  // Request-level: the same metadata applies to every key.
  map<string,string> metadata = 4;
}

GetBulkStateResponse:

// GetBulkStateResponse is the response conveying the list of state values.
message GetBulkStateResponse {
  // The list of items containing the keys to get values for.
  // Note: why a repeated list rather than a map?
  repeated BulkStateItem items = 1;
}

// BulkStateItem is the response item for a bulk get operation.
// Return values include the item key, data and etag.
message BulkStateItem {
  // state item key
  string key = 1;

  // The byte array data
  bytes data = 2;

  // The entity tag which represents the specific version of data.
  // ETag format is defined by the corresponding data store.
  string etag = 3;

  // The error that was returned from the state store in case of a failed get operation.
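  // Failure is anticipated here: a per-item error message can be returned.
  // But why does the single GetState operation define no error field?
  // Can errors only be reported at the HTTP/gRPC protocol layer? TODO: check the implementation.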
  string error = 4;
}

Save State

SaveStateRequest can save multiple states in one request:

// SaveStateRequest is the message to save multiple states into state store.
message SaveStateRequest {
  // The name of state store.
  string store_name = 1;

  // The array of the state key values.
  repeated common.v1.StateItem states = 2;
}

// StateItem represents state key, value, and additional options to save state.
message StateItem {
  // Required. The state key
  string key = 1;

  // Required. The state data for key
  bytes value = 2;

  // The entity tag which represents the specific version of data.
  // The exact ETag format is defined by the corresponding data store.
  string etag = 3;

  // The metadata which will be passed to state store component.
  map<string,string> metadata = 4;

  // Options for concurrency and consistency to save the state.
  StateOptions options = 5;
}

The response is google.protobuf.Empty.

Delete State

DeleteStateRequest:

// DeleteStateRequest is the message to delete key-value states in the specific state store.
message DeleteStateRequest {
  // The name of state store.
  string store_name = 1;

  // The key of the desired state
  string key = 2;

  // The entity tag which represents the specific version of data.
  // The exact ETag format is defined by the corresponding data store.
  string etag = 3;

  // State operation options which includes concurrency/
  // consistency/retry_policy.
  common.v1.StateOptions options = 4;

  // The metadata which will be sent to state store components.
  map<string,string> metadata = 5;
}

The response is google.protobuf.Empty.

Execute State Transaction

ExecuteStateTransactionRequest

// ExecuteStateTransactionRequest is the message to execute multiple operations on a specified store.
message ExecuteStateTransactionRequest {
  // Required. name of state store.
  string storeName = 1;

  // Required. transactional operation list.
  repeated TransactionalStateOperation operations = 2;

  // The metadata used for transactional operations.
  map<string,string> metadata = 3;
}

// TransactionalStateOperation is the message to execute a specified operation with a key-value pair.
message TransactionalStateOperation {
  // The type of operation to be executed
  // Which operations are supported? Dapr uses "upsert" and "delete".
  string operationType = 1;

  // State values to be operated on 
  common.v1.StateItem request = 2;
}

The response is google.protobuf.Empty.
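The operation types Dapr uses here are "upsert" and "delete". The following is a minimal sketch of the all-or-nothing semantics that a transaction request asks for. It assumes an in-memory map as the store. txOp and applyTransaction are hypothetical names, and txOp flattens TransactionalStateOperation and its StateItem into one struct. Every operation is validated before any is applied. The changes are made on a copy, which replaces the original only when all of them succeed.

```go
package main

import "fmt"

// txOp flattens TransactionalStateOperation and its StateItem (hypothetical type).
type txOp struct {
	OperationType string // "upsert" or "delete"
	Key           string
	Value         []byte
}

// applyTransaction applies all ops or none of them. On error the original
// map is returned unchanged.
func applyTransaction(store map[string][]byte, ops []txOp) (map[string][]byte, error) {
	// Validate everything first so a bad op cannot leave a partial result.
	for i, op := range ops {
		if op.Key == "" {
			return store, fmt.Errorf("operation %d: empty key", i)
		}
		if op.OperationType != "upsert" && op.OperationType != "delete" {
			return store, fmt.Errorf("operation %d: unsupported operation type %q", i, op.OperationType)
		}
	}
	// Work on a copy; the caller's map is never modified.
	next := make(map[string][]byte, len(store))
	for k, v := range store {
		next[k] = v
	}
	for _, op := range ops {
		if op.OperationType == "upsert" {
			next[op.Key] = op.Value
		} else {
			delete(next, op.Key)
		}
	}
	return next, nil
}

func main() {
	store := map[string][]byte{"order-1": []byte("pending")}
	store, err := applyTransaction(store, []txOp{
		{OperationType: "upsert", Key: "order-1", Value: []byte("paid")},
		{OperationType: "upsert", Key: "invoice-1", Value: []byte("issued")},
	})
	fmt.Println(len(store), string(store["order-1"]), err)

	// The second op is invalid, so the delete in the first op is not applied either.
	_, err = applyTransaction(store, []txOp{
		{OperationType: "delete", Key: "order-1"},
		{OperationType: "merge", Key: "invoice-1"},
	})
	fmt.Println(err)
}
```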

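2.3 - Generated Go Code for the State Management API

Go code generated from the Dapr state management API

The Go code generated from the proto API definitions is stored in the pkg/proto/ directory of the dapr project.

gRPC service definition

DaprServer is the server-side API of the Dapr service and contains several state-related methods: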

// DaprServer is the server API for Dapr service.
type DaprServer interface {
	// Gets the state for a specific key.
	GetState(context.Context, *GetStateRequest) (*GetStateResponse, error)
	// Gets a bulk of state items for a list of keys
	GetBulkState(context.Context, *GetBulkStateRequest) (*GetBulkStateResponse, error)
	// Saves the state for a specific key.
	SaveState(context.Context, *SaveStateRequest) (*empty.Empty, error)
	// Deletes the state for a specific key.
	DeleteState(context.Context, *DeleteStateRequest) (*empty.Empty, error)
	// Executes transactions for a specified store
	ExecuteStateTransaction(context.Context, *ExecuteStateTransactionRequest) (*empty.Empty, error)
   ......
}

2.4 - Go Client Definition of the State Management API

The Go client definition of the Dapr state management API

DaprClient

https://github.com/dapr/dapr/blob/11741c6cd697e08b2e776943e61bb2e3388c85a8/pkg/proto/runtime/v1/dapr.pb.go

This is the Go code generated from the proto:

type DaprClient interface {
	// Gets the state for a specific key.
	GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error)
	// Gets a bulk of state items for a list of keys
	GetBulkState(ctx context.Context, in *GetBulkStateRequest, opts ...grpc.CallOption) (*GetBulkStateResponse, error)
	// Saves the state for a specific key.
	SaveState(ctx context.Context, in *SaveStateRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	// Deletes the state for a specific key.
	DeleteState(ctx context.Context, in *DeleteStateRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	// Executes transactions for a specified store
	ExecuteStateTransaction(ctx context.Context, in *ExecuteStateTransactionRequest, opts ...grpc.CallOption) (*empty.Empty, error)
	......
}

Get State

Taking Get State as an example, here is the DaprClient implementation:

func (c *daprClient) GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error) {
	out := new(GetStateResponse)
	err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/GetState", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

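It simply invokes the remote method. c.cc.Invoke sends the request to the fully qualified gRPC method /dapr.proto.runtime.v1.Dapr/GetState and decodes the reply into out.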

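2.5 - Go SDK Wrapper for the State Management API

The Go SDK wrapper around the Dapr state management API

Go SDK usage example

https://github.com/dapr/go-sdk

For simple scenarios, the store name, key, and data are all you need to provide: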

ctx := context.Background()
data := []byte("hello")
store := "my-store" // defined in the component YAML

// dapr is the package "github.com/dapr/go-sdk/client"
client, err := dapr.NewClient()
if err != nil {
    panic(err)
}
defer client.Close()

// save state with the key key1
if err := client.SaveState(ctx, store, "key1", data); err != nil {
    panic(err)
}

// get state for key key1
item, err := client.GetState(ctx, store, "key1")
if err != nil {
    panic(err)
}
fmt.Printf("data [key:%s etag:%s]: %s\n", item.Key, item.Etag, string(item.Value))

// delete state for key key1
if err := client.DeleteState(ctx, store, "key1"); err != nil {
    panic(err)
}

Get State

The simple get method uses the default consistency option:

// GetState retrieves state from specific store using default consistency option.
func (c *GRPCClient) GetState(ctx context.Context, store, key string) (item *StateItem, err error) {
	return c.GetStateWithConsistency(ctx, store, key, StateConsistencyStrong)
}

Note, however, that this default is StateConsistencyStrong, i.e. strong consistency.

The full get method:

// GetStateWithConsistency retrieves state from specific store using provided state consistency.
func (c *GRPCClient) GetStateWithConsistency(ctx context.Context, store, key string, sc StateConsistency) (item *StateItem, err error) {
	if store == "" {
		return nil, errors.New("nil store")
	}
	if key == "" {
		return nil, errors.New("nil key")
	}

	req := &pb.GetStateRequest{
		StoreName:   store,
		Key:         key,
		Consistency: (v1.StateOptions_StateConsistency(sc)),
	}

	result, err := c.protoClient.GetState(authContext(ctx), req)
	if err != nil {
		return nil, errors.Wrap(err, "error getting state")
	}

	return &StateItem{
		Etag:  result.Etag,
		Key:   key,
		Value: result.Data,
	}, nil
}

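In essence it does very little. It validates the arguments, builds a GetStateRequest, calls the generated client with authContext(ctx), and wraps the result in a StateItem. Any error is wrapped as "error getting state".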
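The line Consistency: (v1.StateOptions_StateConsistency(sc)) converts the SDK's consistency value to the proto enum with a plain numeric cast. This is only correct as long as both enums use the numbering from common.proto (0 unspecified, 1 eventual, 2 strong). The sketch below uses hypothetical mirror types (sdkConsistency, protoConsistency) to contrast that cast with an explicit mapping that rejects unknown values.

```go
package main

import "fmt"

// sdkConsistency stands in for the SDK's StateConsistency type (hypothetical mirror).
type sdkConsistency int32

const (
	sdkUndefined sdkConsistency = iota // 0
	sdkEventual                        // 1
	sdkStrong                          // 2
)

// protoConsistency stands in for the generated StateOptions_StateConsistency
// enum, using the values declared in common.proto.
type protoConsistency int32

const (
	consistencyUnspecified protoConsistency = 0
	consistencyEventual    protoConsistency = 1
	consistencyStrong      protoConsistency = 2
)

// castConsistency does what the SDK code does: a plain numeric conversion.
// It is only correct because both enums share the same numbering.
func castConsistency(sc sdkConsistency) protoConsistency {
	return protoConsistency(sc)
}

// mapConsistency is a stricter alternative that rejects unknown values.
func mapConsistency(sc sdkConsistency) (protoConsistency, error) {
	switch sc {
	case sdkUndefined:
		return consistencyUnspecified, nil
	case sdkEventual:
		return consistencyEventual, nil
	case sdkStrong:
		return consistencyStrong, nil
	}
	return 0, fmt.Errorf("unknown consistency %d", sc)
}

func main() {
	fmt.Println(castConsistency(sdkStrong) == consistencyStrong)
	_, err := mapConsistency(sdkConsistency(7))
	fmt.Println(err)
}
```

The cast is cheaper and needs no maintenance while the numbers agree. The switch fails loudly if the two enums ever drift apart.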

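3 - Advanced State Management Features

Advanced features of Dapr state management

3.1 - Overview of Advanced Features

Overview of the advanced features of Dapr state management

The advanced features of Dapr state management are:

  • Concurrency
  • Consistency
  • Transactions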

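3.2 - Advanced Features: Concurrency Control

Concurrency control in Dapr state management

Design analysis

Dapr state currently supports two concurrency-control modes for operations: FirstWrite and LastWrite.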

const (
	FirstWrite = "first-write"
	LastWrite  = "last-write"
)

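LastWrite (last-write-wins) is simple. Each write is just executed, with no regard for concurrent writers, so in effect there is no concurrency control at all.

FirstWrite (first-write-wins) is more involved: when several writers race, only the first one succeeds. A write therefore needs a way to verify that the state has not been modified between the last read and this write. In other words, it needs a CAS (Compare And Set) operation.

Dapr state addresses this with a mechanism called ETag:

  • Every state item is associated with an ETag, an opaque version tag. In the API it is a string whose format is defined by the state store; the Redis store, for example, uses an integer version
  • The ETag changes every time the state is created or modified (an integer version is incremented)
  • To write: first read the existing state to obtain its current ETag, then pass that ETag back when submitting the write. The underlying state store must check that the ETag still matches before performing the write.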

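Per operation:

  1. Save state
    • gRPC API: supplied in the etag field of each StateItem in the SaveStateRequest
    • HTTP API: supplied in the etag field of the request's JSON body
  2. Get state
    • gRPC API: returned in the etag field of the GetStateResponse
    • HTTP API: returned in the ETag response header
  3. Get Bulk
    • gRPC API: returned in the etag field of each BulkStateItem in the GetBulkStateResponse
    • HTTP API: returned in the etag field of the response JSON
  4. Delete State
    • gRPC API: supplied in the etag field of the DeleteStateRequest
    • HTTP API: supplied in the If-Match request header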

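Implementation analysis

Redis implementation

To support the etag that Dapr state requires, the Redis store must keep a version next to each value, which the plain key/value model cannot do. It therefore stores a map (a Redis hash, which Redis supports natively) as the value of each key. The hash holds several fields, such as data and version:

  • field version: the version used as the ETag
  • field data: the actual state data

To read state, fetch the whole hash and take the data and version fields from it.

Writes are harder. Redis has no single command that compares the stored version and updates the hash in one atomic step. Save and delete are therefore implemented as Lua scripts, which Redis executes atomically.

  • concurrency set to first-write: the etag is used to perform CAS (Compare And Set)
  • concurrency set to last-write: the etag is ignored; even if the request sets one, it is reset before the write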
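The following is a sketch of the Redis layout and the compare-and-set step. The Lua script is a hypothetical, simplified version written for illustration. It is not the script Dapr ships and is not executed here. setWithEtag replays the same logic on an in-memory map of hashes, so the behavior can be checked without a Redis server. In real Redis the whole script runs atomically, which is what makes the check-then-write safe.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// casScript is an illustrative Lua script (hypothetical, not Dapr's):
// KEYS[1] = state key, ARGV[1] = expected version ("" = no check), ARGV[2] = data.
const casScript = `
local ver = redis.call("HGET", KEYS[1], "version")
if ARGV[1] ~= "" and ver ~= ARGV[1] then
  return redis.error_reply("etag mismatch")
end
redis.call("HSET", KEYS[1], "data", ARGV[2])
return redis.call("HINCRBY", KEYS[1], "version", 1)
`

// hash models a Redis hash with the fields "data" and "version".
type hash map[string]string

// setWithEtag replays casScript in Go. Under last-write the etag is reset first.
func setWithEtag(db map[string]hash, key, etag, data, concurrency string) (int, error) {
	if concurrency == "last-write" {
		etag = "" // last-write: discard any etag the request carried
	}
	h := db[key] // nil if the key does not exist; reading a nil map yields ""
	if etag != "" && h["version"] != etag {
		return 0, errors.New("etag mismatch")
	}
	if h == nil {
		h = hash{}
		db[key] = h
	}
	v, _ := strconv.Atoi(h["version"]) // a missing field counts as 0, like HINCRBY
	v++
	h["data"] = data
	h["version"] = strconv.Itoa(v)
	return v, nil
}

// getState reads the whole hash and returns data and version (used as ETag).
func getState(db map[string]hash, key string) (data, etag string, ok bool) {
	h, ok := db[key]
	if !ok {
		return "", "", false
	}
	return h["data"], h["version"], true
}

func main() {
	db := map[string]hash{}
	v, err := setWithEtag(db, "key1", "", "hello", "first-write")
	fmt.Println(v, err) // 1 <nil>
	_, err = setWithEtag(db, "key1", "7", "stale", "first-write")
	fmt.Println(err) // etag mismatch
	v, err = setWithEtag(db, "key1", "7", "forced", "last-write")
	fmt.Println(v, err) // 2 <nil>
	data, etag, _ := getState(db, "key1")
	fmt.Println(data, etag) // forced 2
}
```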

3.3 - Advanced Features: Consistency

Consistency in Dapr state management

Design analysis

Dapr state currently supports two consistency levels for operations: strong and eventual.

const (
	Strong   = "strong"
	Eventual = "eventual"
)

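Eventual is simple: each write is just executed, and any subsequent replication or synchronization is left to the underlying store. Strong is stricter: the write should be acknowledged only after the store has confirmed it on all replicas (or a designated quorum), so that subsequent reads observe it.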

3.4 - Advanced Features: Transactions

Transactions in Dapr state management

Design analysis

A state store that supports transactions must implement the TransactionalStore interface:

type TransactionalStore interface {
	// Init is the same as in the regular Store interface
	Init(metadata Metadata) error
	// Multi is the added method: it executes a transactional request
	Multi(request *TransactionalStateRequest) error
}

The runtime's ExecuteStateTransaction method calls the state store's Multi method.

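Implementation analysis

Redis implementation

The Dapr Redis state store implements transactions with the TxPipeline provided by the go-redis client library.

TODO:

  • How go-redis implements it
  • How Redis implements transactions: MULTI?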