components-contrib仓库中的代码:
components-contrib仓库的源码学习
Dapr源码学习之components-contrib仓库
- 1: workflow组件的源码学习
- 1.1: workflow定义和操作方法
- 1.2: temporal集成
1 - workflow组件的源码学习
components-contrib仓库中的workflow组件代码实现
1.1 - workflow定义和操作方法
workflow的定义和操作方法的具体内容
代码量比较少,就放在一起看吧。
接口定义
workflow 接口
workflow 接口定义了 workflow 上要履行的操作:
var ErrNotImplemented = errors.New("this component doesn't implement the current API operation")
type Workflow interface {
Init(metadata Metadata) error
Start(ctx context.Context, req *StartRequest) (*StartResponse, error)
Terminate(ctx context.Context, req *TerminateRequest) error
Get(ctx context.Context, req *GetRequest) (*StateResponse, error)
RaiseEvent(ctx context.Context, req *RaiseEventRequest) error
Purge(ctx context.Context, req *PurgeRequest) error
Pause(ctx context.Context, req *PauseRequest) error
Resume(ctx context.Context, req *ResumeRequest) error
}
其中 Init 是初始化 workflow 实现。
Start / Terminate / Pause / Resume 是 workflow 的生命周期管理。
如果没有实现上述操作,则需要返回错误,而错误信息在 ErrNotImplemented 中有统一给出。
操作
init 操作
通过 metadata 进行初始化,和其他组件类似:
type Workflow interface {
Init(metadata Metadata) error
......
}
type Metadata struct {
metadata.Base `json:",inline"`
}
Start 操作
start 操作用来开始一个工作流:
type Workflow interface {
Start(ctx context.Context, req *StartRequest) (*StartResponse, error)
......
}
// StartRequest is the struct describing a start workflow request.
type StartRequest struct {
InstanceID string `json:"instanceID"`
Options map[string]string `json:"options"`
WorkflowName string `json:"workflowName"`
WorkflowInput []byte `json:"workflowInput"`
}
type StartResponse struct {
InstanceID string `json:"instanceID"`
}
start 操作的请求参数是:
- InstanceID:
- Options:map[string]string
- WorkflowName:
- WorkflowInput: []byte
start 操作的响应参数是:
- InstanceID:
Terminate 操作
Terminate 操作用来终止一个 workflow:
type Workflow interface {
Terminate(ctx context.Context, req *TerminateRequest) error
}
type TerminateRequest struct {
InstanceID string `json:"instanceID"`
}
start 操作的请求只需要传递一个 InstanceID 参数。
Get 操作
Get 操作用来或者一个工作流实例的状态:
type Workflow interface {
Get(ctx context.Context, req *GetRequest) (*StateResponse, error)
......
}
type GetRequest struct {
InstanceID string `json:"instanceID"`
}
type StateResponse struct {
Workflow *WorkflowState `json:"workflow"`
}
type WorkflowState struct {
InstanceID string `json:"instanceID"`
WorkflowName string `json:"workflowName"`
CreatedAt time.Time `json:"startedAt"`
LastUpdatedAt time.Time `json:"lastUpdatedAt"`
RuntimeStatus string `json:"runtimeStatus"`
Properties map[string]string `json:"properties"`
}
Get 操作的请求只需要传递一个 InstanceID 参数。
Get 操作的响应参数是 WorkflowState,字段有:
- InstanceID:
- WorkflowName:
- CreatedAt
- LastUpdatedAt
- RuntimeStatus
- Properties
Purge 操作
Purge 操作用来终止一个 workflow:
type Workflow interface {
Purge(ctx context.Context, req *PurgeRequest) error
}
type PurgeRequest struct {
InstanceID string `json:"instanceID"`
}
Purge 操作的请求只需要传递一个 InstanceID 参数。
Pause 操作
Pause 操作用来暂停一个 workflow:
type Workflow interface {
Pause(ctx context.Context, req *PauseRequest) error
}
type PauseRequest struct {
InstanceID string `json:"instanceID"`
}
Pause 操作的请求只需要传递一个 InstanceID 参数。
Resume 操作
Resume 操作用来继续一个 workflow:
type Workflow interface {
Resume(ctx context.Context, req *ResumeRequest) error
}
type ResumeRequest struct {
InstanceID string `json:"instanceID"`
}
Resume 操作的请求只需要传递一个 InstanceID 参数。
1.2 - temporal集成
temporal集成的实现
workflow 定义
TemporalWF 结构体包含 temporal 的 client:
type TemporalWF struct {
client client.Client
logger logger.Logger
}
temporalMetadata 结构体定义 metadata:
type temporalMetadata struct {
Identity string `json:"identity" mapstructure:"identity"`
HostPort string `json:"hostport" mapstructure:"hostport"`
Namespace string `json:"namespace" mapstructure:"namespace"`
}
创建workflow
NewTemporalWorkflow()方法
// NewTemporalWorkflow returns a new workflow.
func NewTemporalWorkflow(logger logger.Logger) workflows.Workflow {
s := &TemporalWF{
logger: logger,
}
return s
}
Init()方法
func (c *TemporalWF) Init(metadata workflows.Metadata) error {
c.logger.Debugf("Temporal init start")
m, err := c.parseMetadata(metadata)
if err != nil {
return err
}
cOpt := client.Options{}
if m.HostPort != "" {
cOpt.HostPort = m.HostPort
}
if m.Identity != "" {
cOpt.Identity = m.Identity
}
if m.Namespace != "" {
cOpt.Namespace = m.Namespace
}
// Create the workflow client
newClient, err := client.Dial(cOpt)
if err != nil {
return err
}
c.client = newClient
return nil
}
func (c *TemporalWF) parseMetadata(meta workflows.Metadata) (*temporalMetadata, error) {
var m temporalMetadata
err := metadata.DecodeMetadata(meta.Properties, &m)
return &m, err
}
workflow操作
Start
func (c *TemporalWF) Start(ctx context.Context, req *workflows.StartRequest) (*workflows.StartResponse, error) {
c.logger.Debugf("starting workflow")
if len(req.Options) == 0 {
c.logger.Debugf("no options provided")
return nil, errors.New("no options provided. At the very least, a task queue is needed")
}
if _, ok := req.Options["task_queue"]; !ok {
c.logger.Debugf("no task queue provided")
return nil, errors.New("no task queue provided")
}
taskQ := req.Options["task_queue"]
opt := client.StartWorkflowOptions{ID: req.InstanceID, TaskQueue: taskQ}
var inputArgs interface{}
if err := decodeInputData(req.WorkflowInput, &inputArgs); err != nil {
return nil, fmt.Errorf("error decoding workflow input data: %w", err)
}
run, err := c.client.ExecuteWorkflow(ctx, opt, req.WorkflowName, inputArgs)
if err != nil {
return nil, fmt.Errorf("error executing workflow: %w", err)
}
wfStruct := workflows.StartResponse{InstanceID: run.GetID()}
return &wfStruct, nil
}
代码和 temporal 的牵连还是很重的,WorkflowInput 相当于透传给了 temporal ,dapr 对此没有做任何的抽象和封装,只是简单透传。
Terminate
func (c *TemporalWF) Terminate(ctx context.Context, req *workflows.TerminateRequest) error {
c.logger.Debugf("terminating workflow")
err := c.client.TerminateWorkflow(ctx, req.InstanceID, "", "")
if err != nil {
return fmt.Errorf("error terminating workflow: %w", err)
}
return nil
}