temporal集成

temporal集成的实现

workflow 定义

TemporalWF 结构体包含 Temporal 的 client 和一个 logger:

// TemporalWF is the type NewTemporalWorkflow returns as a workflows.Workflow.
// It bundles the Temporal client used by the workflow operations with the
// logger used for debug output.
type TemporalWF struct {
	// client is the Temporal client; it is assigned by Init after a
	// successful client.Dial.
	client client.Client
	// logger is the logger handed to NewTemporalWorkflow.
	logger logger.Logger
}

temporalMetadata 结构体定义 metadata:

// temporalMetadata is the decoded form of the component metadata properties.
// Init forwards each field to the same-named field of the Temporal
// client.Options.
type temporalMetadata struct {
	// Identity is read from the "identity" property.
	Identity  string `json:"identity" mapstructure:"identity"`
	// HostPort is read from the "hostport" property.
	HostPort  string `json:"hostport" mapstructure:"hostport"`
	// Namespace is read from the "namespace" property.
	Namespace string `json:"namespace" mapstructure:"namespace"`
}

创建workflow

NewTemporalWorkflow()方法

// NewTemporalWorkflow builds a Temporal-backed workflows.Workflow that logs
// through the given logger. The Temporal client is not set here; it is
// assigned later by Init.
func NewTemporalWorkflow(logger logger.Logger) workflows.Workflow {
	return &TemporalWF{logger: logger}
}

Init()方法

// Init decodes the component metadata, dials a Temporal client with the
// resulting options and stores that client on the receiver. An error from
// metadata decoding or from client.Dial is returned unchanged.
func (c *TemporalWF) Init(metadata workflows.Metadata) error {
	c.logger.Debugf("Temporal init start")

	settings, err := c.parseMetadata(metadata)
	if err != nil {
		return err
	}

	// An empty metadata value is the same as the option's zero value, so the
	// fields can be copied across without checking for emptiness first.
	temporalClient, err := client.Dial(client.Options{
		HostPort:  settings.HostPort,
		Identity:  settings.Identity,
		Namespace: settings.Namespace,
	})
	if err != nil {
		return err
	}

	c.client = temporalClient
	return nil
}

// parseMetadata decodes meta.Properties into a temporalMetadata value.
// The returned pointer is always non-nil; callers must check the error before
// relying on its contents.
func (c *TemporalWF) parseMetadata(meta workflows.Metadata) (*temporalMetadata, error) {
	parsed := &temporalMetadata{}
	decodeErr := metadata.DecodeMetadata(meta.Properties, parsed)
	return parsed, decodeErr
}

workflow操作

Start

// Start launches the workflow named req.WorkflowName on Temporal, using
// req.InstanceID as the workflow ID, and returns the ID reported by the
// resulting run.
//
// req.Options must contain a "task_queue" entry; its value is used as the
// Temporal task queue. req.WorkflowInput is decoded with decodeInputData and
// passed to the workflow as its single argument.
//
// An error is returned when no options are given, when "task_queue" is
// missing, when the input cannot be decoded, or when ExecuteWorkflow fails.
func (c *TemporalWF) Start(ctx context.Context, req *workflows.StartRequest) (*workflows.StartResponse, error) {
	c.logger.Debugf("starting workflow")

	if len(req.Options) == 0 {
		c.logger.Debugf("no options provided")
		return nil, errors.New("no options provided. At the very least, a task queue is needed")
	}

	queue, found := req.Options["task_queue"]
	if !found {
		c.logger.Debugf("no task queue provided")
		return nil, errors.New("no task queue provided")
	}

	var payload interface{}
	if err := decodeInputData(req.WorkflowInput, &payload); err != nil {
		return nil, fmt.Errorf("error decoding workflow input data: %w", err)
	}

	startOpts := client.StartWorkflowOptions{
		ID:        req.InstanceID,
		TaskQueue: queue,
	}
	handle, err := c.client.ExecuteWorkflow(ctx, startOpts, req.WorkflowName, payload)
	if err != nil {
		return nil, fmt.Errorf("error executing workflow: %w", err)
	}

	return &workflows.StartResponse{InstanceID: handle.GetID()}, nil
}

代码和 Temporal 的耦合还是很重的:WorkflowInput 相当于直接透传给了 Temporal,Dapr 对此没有做任何的抽象和封装,只是简单透传。

Terminate

// Terminate asks Temporal to terminate the workflow whose ID is
// req.InstanceID. The two string arguments after the workflow ID are passed
// as empty strings. A failure from the client is wrapped and returned.
func (c *TemporalWF) Terminate(ctx context.Context, req *workflows.TerminateRequest) error {
	c.logger.Debugf("terminating workflow")

	if termErr := c.client.TerminateWorkflow(ctx, req.InstanceID, "", ""); termErr != nil {
		return fmt.Errorf("error terminating workflow: %w", termErr)
	}
	return nil
}