这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

components-contrib仓库的源码学习

Dapr源码学习之components-contrib仓库

components-contrib仓库中的代码:

https://github.com/dapr/components-contrib

1 - workflow组件的源码学习

components-contrib仓库中的workflow组件代码实现

1.1 - workflow定义和操作方法

workflow的定义和操作方法的具体内容

代码量比较少,就放在一起看吧。

接口定义

workflow 接口

workflow 接口定义了 workflow 上要履行的操作:

var ErrNotImplemented = errors.New("this component doesn't implement the current API operation")

type Workflow interface {
	Init(metadata Metadata) error
	Start(ctx context.Context, req *StartRequest) (*StartResponse, error)
	Terminate(ctx context.Context, req *TerminateRequest) error
	Get(ctx context.Context, req *GetRequest) (*StateResponse, error)
	RaiseEvent(ctx context.Context, req *RaiseEventRequest) error
	Purge(ctx context.Context, req *PurgeRequest) error
	Pause(ctx context.Context, req *PauseRequest) error
	Resume(ctx context.Context, req *ResumeRequest) error
}

其中 Init 是初始化 workflow 实现。

Start / Terminate / Pause / Resume 是 workflow 的生命周期管理。

如果没有实现上述操作,则需要返回错误,而错误信息在 ErrNotImplemented 中有统一给出。

操作

init 操作

通过 metadata 进行初始化,和其他组件类似:

type Workflow interface {
	Init(metadata Metadata) error
	......
}

type Metadata struct {
	metadata.Base `json:",inline"`
}

Start 操作

start 操作用来开始一个工作流:

type Workflow interface {
	Start(ctx context.Context, req *StartRequest) (*StartResponse, error)
	......
}

// StartRequest is the struct describing a start workflow request.
type StartRequest struct {
	InstanceID    string            `json:"instanceID"`
	Options       map[string]string `json:"options"`
	WorkflowName  string            `json:"workflowName"`
	WorkflowInput []byte            `json:"workflowInput"`
}

type StartResponse struct {
	InstanceID string `json:"instanceID"`
}

start 操作的请求参数是:

  • InstanceID:
  • Options:map[string]string
  • WorkflowName:
  • WorkflowInput: []byte

start 操作的响应参数是:

  • InstanceID:

Terminate 操作

Terminate 操作用来终止一个 workflow:

type Workflow interface {
	Terminate(ctx context.Context, req *TerminateRequest) error
}

type TerminateRequest struct {
	InstanceID string `json:"instanceID"`
}

start 操作的请求只需要传递一个 InstanceID 参数。

Get 操作

Get 操作用来或者一个工作流实例的状态:

type Workflow interface {
	Get(ctx context.Context, req *GetRequest) (*StateResponse, error)
	......
}

type GetRequest struct {
	InstanceID string `json:"instanceID"`
}

type StateResponse struct {
	Workflow *WorkflowState `json:"workflow"`
}

type WorkflowState struct {
	InstanceID    string            `json:"instanceID"`
	WorkflowName  string            `json:"workflowName"`
	CreatedAt     time.Time         `json:"startedAt"`
	LastUpdatedAt time.Time         `json:"lastUpdatedAt"`
	RuntimeStatus string            `json:"runtimeStatus"`
	Properties    map[string]string `json:"properties"`
}

Get 操作的请求只需要传递一个 InstanceID 参数。

Get 操作的响应参数是 WorkflowState,字段有:

  • InstanceID:
  • WorkflowName:
  • CreatedAt
  • LastUpdatedAt
  • RuntimeStatus
  • Properties

Purge 操作

Purge 操作用来终止一个 workflow:

type Workflow interface {
	Purge(ctx context.Context, req *PurgeRequest) error
}

type PurgeRequest struct {
	InstanceID string `json:"instanceID"`
}

Purge 操作的请求只需要传递一个 InstanceID 参数。

Pause 操作

Pause 操作用来暂停一个 workflow:

type Workflow interface {
	Pause(ctx context.Context, req *PauseRequest) error
}

type PauseRequest struct {
	InstanceID string `json:"instanceID"`
}

Pause 操作的请求只需要传递一个 InstanceID 参数。

Resume 操作

Resume 操作用来继续一个 workflow:

type Workflow interface {
	Resume(ctx context.Context, req *ResumeRequest) error
}

type ResumeRequest struct {
	InstanceID string `json:"instanceID"`
}

Resume 操作的请求只需要传递一个 InstanceID 参数。

1.2 - temporal集成

temporal集成的实现

workflow 定义

TemporalWF 结构体包含 temporal 的 client:

type TemporalWF struct {
	client client.Client
	logger logger.Logger
}

temporalMetadata 结构体定义 metadata:

type temporalMetadata struct {
	Identity  string `json:"identity" mapstructure:"identity"`
	HostPort  string `json:"hostport" mapstructure:"hostport"`
	Namespace string `json:"namespace" mapstructure:"namespace"`
}

创建workflow

NewTemporalWorkflow()方法

// NewTemporalWorkflow returns a new workflow.
func NewTemporalWorkflow(logger logger.Logger) workflows.Workflow {
	s := &TemporalWF{
		logger: logger,
	}
	return s
}

Init()方法

func (c *TemporalWF) Init(metadata workflows.Metadata) error {
	c.logger.Debugf("Temporal init start")
	m, err := c.parseMetadata(metadata)
	if err != nil {
		return err
	}
	cOpt := client.Options{}
	if m.HostPort != "" {
		cOpt.HostPort = m.HostPort
	}
	if m.Identity != "" {
		cOpt.Identity = m.Identity
	}
	if m.Namespace != "" {
		cOpt.Namespace = m.Namespace
	}
	// Create the workflow client
	newClient, err := client.Dial(cOpt)
	if err != nil {
		return err
	}
	c.client = newClient

	return nil
}

func (c *TemporalWF) parseMetadata(meta workflows.Metadata) (*temporalMetadata, error) {
	var m temporalMetadata
	err := metadata.DecodeMetadata(meta.Properties, &m)
	return &m, err
}

workflow操作

Start

func (c *TemporalWF) Start(ctx context.Context, req *workflows.StartRequest) (*workflows.StartResponse, error) {
	c.logger.Debugf("starting workflow")

	if len(req.Options) == 0 {
		c.logger.Debugf("no options provided")
		return nil, errors.New("no options provided. At the very least, a task queue is needed")
	}

	if _, ok := req.Options["task_queue"]; !ok {
		c.logger.Debugf("no task queue provided")
		return nil, errors.New("no task queue provided")
	}
	taskQ := req.Options["task_queue"]

	opt := client.StartWorkflowOptions{ID: req.InstanceID, TaskQueue: taskQ}

	var inputArgs interface{}
	if err := decodeInputData(req.WorkflowInput, &inputArgs); err != nil {
		return nil, fmt.Errorf("error decoding workflow input data: %w", err)
	}

	run, err := c.client.ExecuteWorkflow(ctx, opt, req.WorkflowName, inputArgs)
	if err != nil {
		return nil, fmt.Errorf("error executing workflow: %w", err)
	}
	wfStruct := workflows.StartResponse{InstanceID: run.GetID()}
	return &wfStruct, nil
}

代码和 temporal 的牵连还是很重的,WorkflowInput 相当于透传给了 temporal ,dapr 对此没有做任何的抽象和封装,只是简单透传。

Terminate

func (c *TemporalWF) Terminate(ctx context.Context, req *workflows.TerminateRequest) error {
	c.logger.Debugf("terminating workflow")

	err := c.client.TerminateWorkflow(ctx, req.InstanceID, "", "")
	if err != nil {
		return fmt.Errorf("error terminating workflow: %w", err)
	}
	return nil
}