状态管理
- 1: 状态管理概述
- 2: 状态管理的API
- 2.1: 状态管理API的概述
- 2.2: 状态管理API的Proto定义
- 2.3: 状态管理API的golang生成代码
- 2.4: 状态管理API的go client定义
- 2.5: 状态管理API的go sdk封装
- 3: 状态管理的高级特性
- 3.1: 状态管理高级特性的概述
- 3.2: 状态管理高级特性之并发控制
- 3.3: 状态管理高级特性之一致性
- 3.4: 状态管理高级特性之事务性
1 - 状态管理概述
状态管理 / State Management
2 - 状态管理的API
2.1 - 状态管理API的概述
2.2 - 状态管理API的Proto定义
State API的定义
State Management API 定义在 proto文件 dapr/proto/runtime/v1/dapr.proto 中:
service Dapr {
// Gets the state for a specific key.
rpc GetState(GetStateRequest) returns (GetStateResponse) {}
// Gets a bulk of state items for a list of keys
rpc GetBulkState(GetBulkStateRequest) returns (GetBulkStateResponse) {}
// Saves the state for a specific key.
rpc SaveState(SaveStateRequest) returns (google.protobuf.Empty) {}
// Deletes the state for a specific key.
rpc DeleteState(DeleteStateRequest) returns (google.protobuf.Empty) {}
// Executes transactions for a specified store
rpc ExecuteStateTransaction(ExecuteStateTransactionRequest) returns (google.protobuf.Empty) {}
...
}
另外的common.proto中定义了和state相关的消息和枚举:
// StateOptions configures concurrency and consistency for state operations
message StateOptions {
// Enum describing the supported concurrency for state.
enum StateConcurrency {
CONCURRENCY_UNSPECIFIED = 0;
CONCURRENCY_FIRST_WRITE = 1;
CONCURRENCY_LAST_WRITE = 2;
}
// Enum describing the supported consistency for state.
enum StateConsistency {
CONSISTENCY_UNSPECIFIED = 0;
CONSISTENCY_EVENTUAL = 1;
CONSISTENCY_STRONG = 2;
}
StateConcurrency concurrency = 1;
StateConsistency consistency = 2;
}
get state
GetStateRequest 包含store_name/key,还有并发要求和请求级别的metadata:
// GetStateRequest is the message to get key-value states from specific state store.
message GetStateRequest {
// The name of state store.
string store_name = 1;
// The key of the desired state
string key = 2;
// The read consistency of the state store.
common.v1.StateOptions.StateConsistency consistency = 3;
// The metadata which will be sent to state store components.
map<string,string> metadata = 4;
}
GetStateResponse 包含byte[] 形式的 state 数据 data,和特殊表示数据特定版本的etag:
// GetStateResponse is the response conveying the state value and etag.
message GetStateResponse {
// The byte array data
bytes data = 1;
// The entity tag which represents the specific version of data.
// ETag format is defined by the corresponding data store.
string etag = 2;
}
Get Bulk State
GetBulkStateRequest 是批量接口,一次性获取多个key的数据:
// GetBulkStateRequest is the message to get a list of key-value states from specific state store.
message GetBulkStateRequest {
// The name of state store.
string store_name = 1;
// The keys to get.
repeated string keys = 2;
// The number of parallel operations executed on the state store for a get operation.
// 在状态存储上用于get操作的并行操作执行的数量:也就是并发数,同时执行的请求数量
int32 parallelism = 3;
// The metadata which will be sent to state store components.
// 请求级别,意味着所有的key都是使用同样的metadata
map<string,string> metadata = 4;
}
GetBulkStateResponse:
// GetBulkStateResponse is the response conveying the list of state values.
message GetBulkStateResponse {
// The list of items containing the keys to get values for.
// 为啥不用map?
repeated BulkStateItem items = 1;
}
// BulkStateItem is the response item for a bulk get operation.
// Return values include the item key, data and etag.
message BulkStateItem {
// state item key
string key = 1;
// The byte array data
bytes data = 2;
// The entity tag which represents the specific version of data.
// ETag format is defined by the corresponding data store.
string etag = 3;
// The error that was returned from the state store in case of a failed get operation.
// 这里考虑了出错的可能,有机会给出错误信息
// 但是,单个get state 操作怎么没有定义错误信息?
// 只能在http/grpc协议层上报错?TODO:看看代码实现
string error = 4;
}
Save State
SaveStateRequest 支持多个状态的保存:
// SaveStateRequest is the message to save multiple states into state store.
message SaveStateRequest {
// The name of state store.
string store_name = 1;
// The array of the state key values.
repeated common.v1.StateItem states = 2;
}
// StateItem represents state key, value, and additional options to save state.
message StateItem {
// Required. The state key
string key = 1;
// Required. The state data for key
bytes value = 2;
// The entity tag which represents the specific version of data.
// The exact ETag format is defined by the corresponding data store.
string etag = 3;
// The metadata which will be passed to state store component.
map<string,string> metadata = 4;
// Options for concurrency and consistency to save the state.
StateOptions options = 5;
}
response为 google.protobuf.Empty。
Delete State
DeleteStateRequest:
// DeleteStateRequest is the message to delete key-value states in the specific state store.
message DeleteStateRequest {
// The name of state store.
string store_name = 1;
// The key of the desired state
string key = 2;
// The entity tag which represents the specific version of data.
// The exact ETag format is defined by the corresponding data store.
string etag = 3;
// State operation options which includes concurrency/
// consistency/retry_policy.
common.v1.StateOptions options = 4;
// The metadata which will be sent to state store components.
map<string,string> metadata = 5;
}
response为 google.protobuf.Empty。
Execute State Transaction
ExecuteStateTransactionRequest
// ExecuteStateTransactionRequest is the message to execute multiple operations on a specified store.
message ExecuteStateTransactionRequest {
// Required. name of state store.
string storeName = 1;
// Required. transactional operation list.
repeated TransactionalStateOperation operations = 2;
// The metadata used for transactional operations.
map<string,string> metadata = 3;
}
// TransactionalStateOperation is the message to execute a specified operation with a key-value pair.
message TransactionalStateOperation {
// The type of operation to be executed
// 具体有哪些操作?
string operationType = 1;
// State values to be operated on
common.v1.StateItem request = 2;
}
response为 google.protobuf.Empty。
2.3 - 状态管理API的golang生成代码
从proto api定义文件生成的golang代码,被存放在dapr项目的 pkg/proto/
目录下。
grpc服务定义
DaprServer 是 dapr 服务的服务器端API定义,包含多个 state 相关的方法:
// DaprServer is the server API for Dapr service.
type DaprServer interface {
// Gets the state for a specific key.
GetState(context.Context, *GetStateRequest) (*GetStateResponse, error)
// Gets a bulk of state items for a list of keys
GetBulkState(context.Context, *GetBulkStateRequest) (*GetBulkStateResponse, error)
// Saves the state for a specific key.
SaveState(context.Context, *SaveStateRequest) (*empty.Empty, error)
// Deletes the state for a specific key.
DeleteState(context.Context, *DeleteStateRequest) (*empty.Empty, error)
// Executes transactions for a specified store
ExecuteStateTransaction(context.Context, *ExecuteStateTransactionRequest) (*empty.Empty, error)
......
}
2.4 - 状态管理API的go client定义
DaprClient
这是根据proto生成的go代码
type DaprClient interface {
// Gets the state for a specific key.
GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error)
// Gets a bulk of state items for a list of keys
GetBulkState(ctx context.Context, in *GetBulkStateRequest, opts ...grpc.CallOption) (*GetBulkStateResponse, error)
// Saves the state for a specific key.
SaveState(ctx context.Context, in *SaveStateRequest, opts ...grpc.CallOption) (*empty.Empty, error)
// Deletes the state for a specific key.
DeleteState(ctx context.Context, in *DeleteStateRequest, opts ...grpc.CallOption) (*empty.Empty, error)
// Executes transactions for a specified store
ExecuteStateTransaction(ctx context.Context, in *ExecuteStateTransactionRequest, opts ...grpc.CallOption) (*empty.Empty, error)
......
}
Get State
以 Get State 为例看 DaprClient 的实现:
func (c *daprClient) GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error) {
out := new(GetStateResponse)
err := c.cc.Invoke(ctx, "/dapr.proto.runtime.v1.Dapr/GetState", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
只是简单调用远程方法。
2.5 - 状态管理API的go sdk封装
go sdk使用案例
https://github.com/dapr/go-sdk
对于简单场景,只要给出 store name / key / data 就好了:
ctx := context.Background()
data := []byte("hello")
store := "my-store" // defined in the component YAML
// save state with the key key1
if err := client.SaveState(ctx, store, "key1", data); err != nil {
panic(err)
}
// get state for key key1
item, err := client.GetState(ctx, store, "key1")
if err != nil {
panic(err)
}
fmt.Printf("data [key:%s etag:%s]: %s", item.Key, item.Etag, string(item.Value))
// delete state for key key1
if err := client.DeleteState(ctx, store, "key1"); err != nil {
panic(err)
}
get state
简单get方法,使用默认的并发选项:
// GetState retreaves state from specific store using default consistency option.
func (c *GRPCClient) GetState(ctx context.Context, store, key string) (item *StateItem, err error) {
return c.GetStateWithConsistency(ctx, store, key, StateConsistencyStrong)
}
但,默认并发选项是 StateConsistencyStrong,强一致性。
完整的get 方法:
// GetStateWithConsistency retreaves state from specific store using provided state consistency.
func (c *GRPCClient) GetStateWithConsistency(ctx context.Context, store, key string, sc StateConsistency) (item *StateItem, err error) {
if store == "" {
return nil, errors.New("nil store")
}
if key == "" {
return nil, errors.New("nil key")
}
req := &pb.GetStateRequest{
StoreName: store,
Key: key,
Consistency: (v1.StateOptions_StateConsistency(sc)),
}
result, err := c.protoClient.GetState(authContext(ctx), req)
if err != nil {
return nil, errors.Wrap(err, "error getting state")
}
return &StateItem{
Etag: result.Etag,
Key: key,
Value: result.Data,
}, nil
}
基本上也没做什么。
3 - 状态管理的高级特性
3.1 - 状态管理高级特性的概述
Dapr 状态管理的高级特性有:
- 并发
- 一致性
- 事务性
3.2 - 状态管理高级特性之并发控制
设计分析
dapr state 目前要求操作的并发控制有两个: FirstWrite 和 LastWrite。
const (
FirstWrite = "first-write"
LastWrite = "last-write"
)
LastWrite (Last Write Win模式)就简单了,每个写操作都只需要简单的执行即可,无需考虑是否并发。事实上就是不做并发控制。
FirstWrite (First Write Win模式)复杂一些,当有多个操作进行并发写时,只有第一个能成功。因此,必须有机制能够在执行写操作时判断从上次读到这次写,期间 state 数据没有被修改。也就是需要实现 CAS操作:CAS = Compare And Set。
Dapr state 的设计是引入一个名为 ETag 的机制:
- ETag 是一个整型,每个状态都会关联一个 ETag
- 每次创建或修改 state 时,ETag都会递增
- 进行写操作时:先读取现有state,拿到当前的ETag;在提交写操作时,传入之前的ETag。底层 state store的实现应该在执行写操作之前检查ETag是否匹配。
具体到各个操作:
- Save state
- grpc API:在请求的SaveStateRequest中通过 etag 字段提供
- HTTP API:在请求的json内容中通过etag字段提供
- Get state
- grpc API:在应答的 GetStateResponse 中通过 etag 字段提供
- HTTP API:在应答的 ETag header中提供
- Get Bulk
- grpc API:在应答的 GetBulkStateResponse 中通过 etag 字段提供
- HTTP API:在应答的json中通过 etag 字段提供
- Delete State
- grpc API:在请求的 DeleteStateRequest 中通过 etag 字段提供
- HTTP API:通过请求的 If-Match header提供
实现分析
Redis实现
redis 为了实现 state 要求的 etag,就必须在常规的key/value存储模型上增加 key/etag 的存储,实现方式就是 key / map as value,将一个 map 作为value(刚好redis本身也支持map结构)。然后在map中存储 data / version 等多个信息:
- key=version,存储ETag需要的version
- key=data,存储state的实际数据
读取state的时候将整个map as value读取,然后分别取data和version即可。
但写操作会比较麻烦, redis 本身不直接提供对多个字段的原子操作方式,因此在save和delete操作时需要通过LUA脚本来完成。
- concurrency 设置为 first-write :需要通过 etag 实现 CAS (Compare And Set)
- concurrency 设置为 last-write :忽略 etag,即使请求设置了也要重置
3.3 - 状态管理高级特性之一致性
设计分析
dapr state 目前对操作的一致性要求有两个: strong 和 eventual。
const (
Strong = "strong"
Eventual = "eventual"
)
eventual 就简单了,每个写操作都只需要简单的执行即可,后续的同步等操作由底层实现自行保证。
FirstWrite (First Write Win模式)复杂一些,当有多个操作进行并发写时,只有第一个能成功。因此,必须有机制能够在执行写操作时判断从上次读到这次写,期间 state 数据没有被修改。也就是需要实现 CAS操作:CAS = Compare And Set。
Dapr state 的设计是引入一个名为 ETag 的机制:
- ETag 是一个整型,每个状态都会关联一个 ETag
- 每次创建或修改 state 时,ETag都会递增
- 进行写操作时:先读取现有state,拿到当前的ETag;在提交写操作时,传入之前的ETag。底层 state store的实现应该在执行写操作之前检查ETag是否匹配。
具体到各个操作:
- Save state
- grpc API:在请求的SaveStateRequest中通过 etag 字段提供
- HTTP API:在请求的json内容中通过etag字段提供
- Get state
- grpc API:在应答的 GetStateResponse 中通过 etag 字段提供
- HTTP API:在应答的 ETag header中提供
- Get Bulk
- grpc API:在应答的 GetBulkStateResponse 中通过 etag 字段提供
- HTTP API:在应答的json中通过 etag 字段提供
- Delete State
- grpc API:在请求的 DeleteStateRequest 中通过 etag 字段提供
- HTTP API:通过请求的 If-Match header提供
实现分析
Redis实现
redis 为了实现 state 要求的 etag,就必须在常规的key/value存储模型上增加 key/etag 的存储,实现方式就是 key / map as value,将一个 map 作为value(刚好redis本身也支持map结构)。然后在map中存储 data / version 等多个信息:
- key=version,存储ETag需要的version
- key=data,存储state的实际数据
读取state的时候将整个map as value读取,然后分别取data和version即可。
但写操作会比较麻烦, redis 本身不直接提供对多个字段的原子操作方式,因此在save和delete操作时需要通过LUA脚本来完成。
- concurrency 设置为 first-write :需要通过 etag 实现 CAS (Compare And Set)
- concurrency 设置为 last-write :忽略 etag,即使请求设置了也要重置
3.4 - 状态管理高级特性之事务性
设计分析
如果 state store 要支持事务,则要求实现 TransactionalStore 接口:
type TransactionalStore interface {
// Init方法是和普通store接口一致的
Init(metadata Metadata) error
// 增加的是 Multi 方法
Multi(request *TransactionalStateRequest) error
}
Runtime ExecuteStateTransaction 方法会调用 state store 的 multi 方法。
实现分析
Redis实现
dapr redis state store的事务实现,是通过 redis-go 封装的 TxPipeline 实现的。
TODO:
- redis-go 如何实现的
- redis如何实现事务?multi?