这是本节的多页打印视图。 点击此处打印.

返回本页常规视图.

Temporal学习笔记

Temporal学习笔记

1 - 介绍

Temporal的介绍,以及Temporal的资料收集

1.1 - 资料收集

收集Temporal的各种资料

官方网站

社区

文档

文章&演讲

1.2 - Temporal介绍

Temporal介绍

首页介绍

Temporal是什么?

Temporal 网站首页的介绍是:

Code for more reliable systems

编写更可靠的系统代码

Temporal is an open source programming model that can simplify your code, make your applications more reliable, and help you deliver more features faster.

Temporal 是一种开源编程模型,可以简化代码,提高应用程序的可靠性,并帮助您更快地提供更多功能。

口号是:

Fail less - Fail better

更少失败 - 更好失败

提出要对以下内容 Just say no to( 拒绝) :

  • Unresponsive, overloaded services / 反应迟钝、超负荷的服务

  • Process crashes / 进程崩溃

  • Network outages / 网络中断

  • Race conditions / 竞赛条件

  • Duplicate or incomplete transactions / 重复或不完整的事务

  • Time lags or timeouts / 时间滞后或超时

智能代码

Temporal 简化了应用程序逻辑,使代码更易于维护,提高开发人员的工作效率。您可以通过这个汇款应用程序示例亲自体验一下,它将 241 行代码减少到了 98 行。

这是241 行的代码:

package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"math"
	"time"
)

type Status uint32

const (
	Started Status = iota
	Failed
	Succeeded
	Withdrawing
	Depositing
	Refunding
)

// ErrStorageConflict is returned by the storage API for the CompareAndSwap operation
var ErrStorageConflict = errors.New("storage conflict")

// ErrInsufficientFunds is returned by the bank API, considered a non-retryable business level error
var ErrInsufficientFunds = errors.New("insufficient funds")

// ErrAccountNotFound is returned by the bank API, considered a non-retryable business level error
var ErrAccountNotFound = errors.New("account not found")

type BankingService interface {
	Withdraw(accountNumber string, amount int, referenceID string) (string, error)
	Deposit(accountNumber string, amount int, referenceID string) (string, error)
}

type Persistence interface {
	Load(ctx context.Context, key string) (state interface{}, err error)
	CompareAndSwap(ctx context.Context, key string, state interface{}, expected interface{}) error
}

// Task pulled off a persistent task queue
type Task struct {
	QueueName string
	Payload   []byte
	Attempt   uint
}

type Consumer func(ctx context.Context, task Task)

// Queue represents a persistent task queue
type Queue interface {
	Enqueue(queue string, payload []byte) error
	Consume(queue string, consumer Consumer)
	Ack(task Task)
	RetryLater(task Task, duration time.Duration)
}

type TransactionInput struct {
	ReferenceID     string
	SourceAccountID string
	TargetAccountID string
	Amount          int
	// Used to report errors from activities
	ErrorMessage string
	// Used to verify process consistency
	LastStatus Status
}

type ActivityInput struct {
	Type      string
	AccountID string
	// Used to extract information about the transaction and forward it back to the transaction handler
	Transaction TransactionInput
}

type Worker struct {
	queue       Queue
	persistence Persistence
	bank        BankingService
}

func (w *Worker) ProcessMoneyTransferEvent(ctx context.Context, task Task) error {
	var input TransactionInput
	if err := json.Unmarshal(task.Payload, &input); err != nil {
		log.Printf("Failed to unmarshal payload: %v", err)
		// Enqueue in dead letter queue for human inspection
		return moveToDeadLetterQueue(w.queue, task)
	}
	anyStatus, err := w.persistence.Load(ctx, input.ReferenceID)
	if err != nil {
		return err
	}
	status, ok := anyStatus.(Status)
	if !ok {
		log.Printf("Failed to load status from DB, got: %v", anyStatus)
		return nil // discard this task, we don't know what to do with it
	}
	if status < input.LastStatus {
		// CompareAndSwap for the previous ProcessMoneyTransferEvent iteration has not completed before we got an activity completion.
		// Return an error and backoff retrying this task until our process is in consistent sate.
		return errors.New("got activity completion for uncommited workflow state")
	}
	if status > input.LastStatus {
		log.Printf("Invalid status in task, got: %v, expected: %v", input.LastStatus, status)
		return nil // discard this task, we probably generated duplicate activities due to a crash before committing previous status
	}

	prevStatus := status

	switch status {
	case Started:
		status = Withdrawing
		activityInput := ActivityInput{Type: "withdraw", AccountID: input.SourceAccountID, Transaction: input}
		if err := scheduleActivity(w.queue, activityInput); err != nil {
			return err
		}
	case Withdrawing:
		if input.ErrorMessage != "" {
			// Could not withdraw, abort
			status = Failed
		} else {
			status = Depositing
			activityInput := ActivityInput{Type: "deposit", AccountID: input.TargetAccountID, Transaction: input}
			if err := scheduleActivity(w.queue, activityInput); err != nil {
				return err
			}
		}
	case Depositing:
		if input.ErrorMessage != "" {
			status = Refunding
			// Reset the error message
			input.ErrorMessage = ""
			activityInput := ActivityInput{Type: "refund", AccountID: input.SourceAccountID, Transaction: input}
			if err := scheduleActivity(w.queue, activityInput); err != nil {
				return err
			}
		} else {
			status = Succeeded
		}
	case Refunding:
		if input.ErrorMessage != "" {
			// Critical transaction failure, cannot refund.
			// In a real world example a human operator would probably need to examine this transaction.
			status = Failed
		} else {
			status = Succeeded
		}
	default:
		return nil // discard this task, transaction has already completed. This shouldn't happen
	}

	// Since we already enqueued tasks before storing the state we might generate duplicate tasks in case storage returns an error.
	// This is okay for our case because we use a unique transaction ID as an idempotency token.
	// Temporal prevents this situation by committing workflow state and the request to schedule an activity in a single transaction.
	// This API will fail in case the status was incremented concurrently.
	err = w.persistence.CompareAndSwap(ctx, input.ReferenceID, status, prevStatus)
	if err != nil && !errors.Is(err, ErrStorageConflict) {
		return err
	}
	return nil
}

func (w *Worker) ProcessActivity(ctx context.Context, task Task) error {
	var input ActivityInput
	if err := json.Unmarshal(task.Payload, &input); err != nil {
		log.Printf("Failed to unmarshal payload: %v", err)
		return moveToDeadLetterQueue(w.queue, task)
	}
	tx := input.Transaction

	var operationErr error
	switch input.Type {
	case "deposit":
		_, operationErr = w.bank.Deposit(input.AccountID, tx.Amount, fmt.Sprintf("%s-deposit", tx.ReferenceID))
	case "withdraw":
		_, operationErr = w.bank.Withdraw(input.AccountID, tx.Amount, fmt.Sprintf("%s-withdraw", tx.ReferenceID))
	case "refund":
		_, operationErr = w.bank.Deposit(input.AccountID, tx.Amount, fmt.Sprintf("%s-refund", tx.ReferenceID))
	default:
		operationErr = fmt.Errorf("not implemented")
	}

	if operationErr != nil {
		if errors.Is(operationErr, ErrAccountNotFound) || errors.Is(operationErr, ErrInsufficientFunds) {
			// Business error, report back to transaction task
			tx.ErrorMessage = operationErr.Error()
		} else {
			// Transient error, retry later
			return operationErr
		}
	}
	payload, err := json.Marshal(tx)
	if err != nil {
		log.Printf("Failed to marshal payload: %v", err)
		// Enqueue in dead letter queue for human inspection
		return moveToDeadLetterQueue(w.queue, task)
	}
	if err := w.queue.Enqueue("transactions", payload); err != nil {
		return err
	}
	return nil
}

// Run is the entry point for our program
func Run(queue Queue, persistence Persistence, bank BankingService) {
	worker := Worker{queue, persistence, bank}
	handleErrors := func(consumer func(ctx context.Context, task Task) error) Consumer {
		return func(ctx context.Context, task Task) {
			err := consumer(ctx, task)
			if err != nil {
				log.Printf("Failed to process task: %v", err)
				queue.RetryLater(task, calcBackoff(task))
			} else {
				queue.Ack(task)
			}
		}
	}

	queue.Consume("money-transfer-events", handleErrors(worker.ProcessMoneyTransferEvent))
	queue.Consume("money-transfer-activities", handleErrors(worker.ProcessActivity))
}

func scheduleActivity(queue Queue, input ActivityInput) error {
	payload, err := json.Marshal(input)
	if err != nil {
		return err
	}
	return queue.Enqueue("money-transfer-activities", payload)
}

// calcBackoff calculates exponential backoff without jitter
func calcBackoff(task Task) time.Duration {
	initialInterval := float64(time.Millisecond) * 500
	return time.Duration(initialInterval * math.Pow(2, float64(task.Attempt)))
}

func moveToDeadLetterQueue(queue Queue, task Task) error {
	return queue.Enqueue(fmt.Sprintf("%s-DLQ", task.QueueName), task.Payload)
}

temporal的98 行代码:


package app

import (
	"context"
	"fmt"
	"time"

	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/workflow"
)

type BankingService interface {
	Withdraw(accountNumber string, amount int, referenceID string) (string, error)
	Deposit(accountNumber string, amount int, referenceID string) (string, error)
}

type Activities struct {
	bank BankingService
}

type PaymentDetails struct {
	ReferenceID   string
	SourceAccount string
	TargetAccount string
	Amount        int
}

func MoneyTransfer(ctx workflow.Context, input PaymentDetails) (string, error) {
	// RetryPolicy specifies how to automatically handle retries if an Activity fails.
	retrypolicy := &temporal.RetryPolicy{
		InitialInterval:        time.Second,
		BackoffCoefficient:     2.0,
		MaximumInterval:        100 * time.Second,
		MaximumAttempts:        0, // unlimited retries
		NonRetryableErrorTypes: []string{"ErrInvalidAccount", "ErrInsufficientFunds"},
	}

	options := workflow.ActivityOptions{
		// Timeout options specify when to automatically timeout Activity functions.
		StartToCloseTimeout: time.Minute,
		// Optionally provide a customized RetryPolicy.
		// Temporal retries failed Activities by default.
		RetryPolicy: retrypolicy,
	}

	// Apply the options.
	ctx = workflow.WithActivityOptions(ctx, options)

	// Withdraw money.
	var withdrawOutput string

	withdrawErr := workflow.ExecuteActivity(ctx, "Withdraw", input).Get(ctx, &withdrawOutput)

	if withdrawErr != nil {
		return "", withdrawErr
	}

	// Deposit money.
	var depositOutput string

	depositErr := workflow.ExecuteActivity(ctx, "Deposit", input).Get(ctx, &depositOutput)

	if depositErr != nil {
		// The deposit failed; put money back in original account.
		var result string
		refundErr := workflow.ExecuteActivity(ctx, "Refund", input).Get(ctx, &result)

		if refundErr != nil {
			return "",
				fmt.Errorf("Deposit: failed to deposit money into %v: %v. Money could not be returned to %v: %w",
					input.TargetAccount, depositErr, input.SourceAccount, refundErr)
		}

		return "", fmt.Errorf("Deposit: failed to deposit money into %v: Money returned to %v: %w",
			input.TargetAccount, input.SourceAccount, depositErr)
	}

	result := fmt.Sprintf("Transfer complete (transaction IDs: %s, %s)", withdrawOutput, depositOutput)
	return result, nil
}

func (a *Activities) Withdraw(ctx context.Context, data PaymentDetails) (string, error) {
	referenceID := fmt.Sprintf("%s-withdrawal", data.ReferenceID)
	confirmation, err := a.bank.Withdraw(data.SourceAccount, data.Amount, referenceID)
	return confirmation, err
}

func (a *Activities) Deposit(ctx context.Context, data PaymentDetails) (string, error) {
	referenceID := fmt.Sprintf("%s-deposit", data.ReferenceID)
	confirmation, err := a.bank.Deposit(data.TargetAccount, data.Amount, referenceID)
	return confirmation, err
}

func (a *Activities) Refund(ctx context.Context, data PaymentDetails) (string, error) {
	referenceID := fmt.Sprintf("%s-refund", data.ReferenceID)
	confirmation, err := a.bank.Deposit(data.SourceAccount, data.Amount, referenceID)
}

这个代码对比不太明朗啊,主要是代码太长了,不容易看出来。其实我个人的理解是对故障的处理。

官方文档介绍

参考: https://docs.temporal.io/temporal

探索 Temporal 平台,它是一种用于持久执行的运行时,由 Temporal Cluster 和 Worker Processes 以及多种语言的 SDK 组成。

Temporal 是一种可扩展、可靠的运行时,适用于称为 “时态工作流执行” 的可重入进程。

Temporal 系统:

Temporal 平台

Temporal 平台 由 Temporal 集群和工作进程组成。这些组件共同创建了工作流执行的运行时。

Temporal Cluster 是开放源码的,可以由您来操作。Temporal Cloud 是由我们运营的一组集群。

工作进程由你托管并执行你的代码。它们通过 gRPC 与 Temporal Cluster 通信。

Temporal 应用

Temporal 应用程序是一组 Temporal 工作流执行程序。每个 Temporal 工作流执行体都能独占访问其本地状态,与所有其他工作流执行体同时执行,并通过消息传递与其他工作流执行体和环境通信。

Temporal 应用程序可以由数百万到数十亿个工作流执行程序组成。工作流执行是轻量级组件。工作流执行消耗的计算资源很少;事实上,如果工作流执行处于暂停状态,例如处于等待状态,工作流执行根本不会消耗任何计算资源。

可重入流程

Temporal 工作流执行是一个可重入流程。可重入流程具有可恢复性、可复原性和反应性。

  • 可继续: 进程在等待暂停执行后继续执行的能力。
  • 可恢复: 进程在因故障而暂停执行后继续执行的能力。
  • 反应性:进程对外部事件做出反应的能力。

因此,Temporal 工作流执行会准确地执行一次 Temporal 工作流定义(也称作 Temporal 工作流函数),即您的应用程序代码,并直到执行完毕–无论您的代码执行了几秒钟还是几年,也无论是否存在任意负载和任意故障。

Temporal SDK

Temporal SDK 是一种特定语言的库,它提供的应用程序接口(API)可实现以下功能:

  1. 构建和使用 Temporal 客户端
  2. 开发工作流定义
  3. 开发工作程序

Temporal SDK 使你能够使用编程语言的全部功能来编写应用程序代码,而 Temporal Platform 则负责处理应用程序的耐用性、可靠性和可扩展性。

Temporal 客户端

每个 SDK 中都有一个 Temporal 客户端(Temporal Client),它提供了一套与 Temporal cluster 通信的 API。

通过 Temporal 客户端可以执行的最常见操作如下:

  • 获取工作流执行结果。
  • 列出工作流执行。
  • 查询工作流执行情况。
  • 向工作流执行发出信号。
  • 启动工作流执行

什么是故障

Temporal 故障(Temporal Failures)是系统中发生的各种类型错误的表示(在 SDK 和事件历史中)。

KB 文章: Temporal故障

故障处理是开发的重要组成部分。有关更多信息,包括应用程序级故障和平台级故障之间的区别,请参阅 “从第一原则处理故障”。有关这些概念在 Temporal 中的实际应用,请参阅实践中的故障处理。

对于抛出(或引发)错误(或异常)的语言,从工作流中抛出一个非时态故障的错误会导致工作流任务失败(任务会重试直到成功),而抛出一个时态故障(或让时态故障从时态调用中传播,就像从活动调用中产生活动故障一样)会导致工作流执行失败。更多信息,请参阅应用程序失败。

1.3 - Temporal概念

Temporal 概念

1.3.1 - workflow的概念

Temporal 中 workflow 的概念

https://docs.temporal.io/workflows

在日常对话中,工作流一词通常指工作流类型、工作流定义或工作流执行。Temporal文档旨在明确区分它们。

工作流定义

工作流定义是定义工作流执行约束的代码。

工作流定义(Workflow Definition)通常也被称为工作流函数(Workflow Function)。在 Temporal 的文档中,工作流定义(Workflow Definition)指的是工作流执行实例的源码,而工作流函数(Workflow Function)指的是工作流函数执行实例的源码。

工作流执行一次有效执行完成,而工作流函数执行则在工作流执行的生命周期内多次执行。

我们强烈建议您使用具有相应 Temporal SDK 的语言编写工作流定义。

确定性约束

开发工作流定义的一个重要方面是确保它们表现出一定的确定性特征,即在重新执行相应的工作流函数执行(函数定义的实例)时,确保以相同的顺序发出相同的命令。

工作流执行的执行语义包括工作流函数的重新执行,这被称为 “重播”。在函数中使用工作流 API 会生成命令。命令告诉群集要创建哪些事件并将其添加到工作流执行的事件历史记录中。工作流函数执行时,发出的命令会与现有的事件历史记录进行比较。如果在事件历史记录中已经存在对应的事件,该事件与以相同顺序生成的命令相对应,并且该命令的某些特定元数据与事件的某些特定元数据相匹配,那么函数执行就会继续进行。

例如,使用 SDK 的 “执行活动/Execute Activity” API 会生成 ScheduleActivityTask 命令。在重新执行时调用该 API 时,会将该命令与序列中同一位置的事件进行比较。序列中的事件必须是 ActivityTaskScheduled 事件,其中的活动名称与命令中的名称相同。

如果生成的命令与现有事件历史记录中的命令不匹配,工作流执行将返回一个非确定性错误。

以下两个原因可能会导致命令生成顺序错误或生成错误的命令:

  1. 对正在运行的工作流执行程序所使用的工作流定义进行了代码更改。
  2. 存在固有的非确定逻辑(如内联随机分支)。

代码更改会导致非确定性行为

一旦有工作流执行依赖于工作流定义,工作流定义的更改就会非常有限。为了减轻代码更改带来的非确定性问题,我们建议使用工作流版本控制。

例如,假设我们有一个工作流定义,它定义了以下序列:

  1. 启动并等待计时器/睡眠。
  2. 产生并等待活动执行。
  3. 完成。

我们启动一个 Worker 并生成一个使用该工作流定义的工作流执行。Worker 将发出 StartTimer 命令,工作流执行将暂停。

在计时器结束前,我们将工作流定义更改为以下序列:

  1. 生成并等待活动执行。
  2. 启动并等待计时器/休眠。
  3. 完成。

当计时器启动时,下一个工作流任务将导致工作流函数重新执行。Worker 看到的第一个命令将是 ScheduleActivityTask 命令,这与预期的 TimerStarted 事件不符。

工作流执行将失败并返回非确定性错误。

下面举例说明在重新执行已包含事件的历史记录时,不会导致非确定性错误的微小更改:

  • 更改计时器的持续时间,但以下情况除外:

    • 在 Java、Python 和 Go 中,将计时器的持续时间从或变为 0 是一种非确定性行为。
    • 在 .NET 中,将定时器的持续时间从或改为-1(表示 “无限”)是一种非确定行为。
  • 更改参数:

    • 调用催生活动执行(本地或非本地)时的活动选项。
    • 调用催生子工作流执行时的子工作流选项。
    • 调用外部工作流执行的信号。
    • 为尚未发送给本工作流执行的信号类型添加信号处理器。

内在非确定性逻辑

内在非确定性是指工作流函数执行在重新执行时可能会发出不同的命令序列,而不管所有输入参数是否相同。

例如,工作流定义不能有根据本地时间设置或随机数分支(发出不同的命令序列)的内联逻辑。在下面具有代表性的伪代码中,local_clock() 函数返回的是本地时间,而不是 Temporal 定义的时间:

fn your_workflow() {
  if local_clock().is_before("12pm") {
    await workflow.sleep(duration_until("12pm"))
  } else {
    await your_afternoon_activity()
  }
}

每个 Temporal SDK 都提供应用程序接口(API),使工作流定义的逻辑能够从不可靠的资源中获取和使用时间、随机数和数据。使用这些应用程序接口时,结果将作为事件历史的一部分存储,这意味着重新执行的工作流函数将发出相同的命令序列,即使涉及分支。

换句话说,所有不纯粹改变工作流执行状态的操作都应通过 Temporal SDK API 进行。

工作流版本控制

工作流版本功能可根据开发人员指定的版本标识符,在工作流定义内部创建逻辑分支。该功能适用于需要更新工作流程定义逻辑,但当前正在运行的工作流程执行依赖于该逻辑的情况。需要注意的是,在不使用版本控制应用程序接口的情况下,处理不同版本工作流定义的实用方法是在不同的任务队列上运行不同的版本。

处理不可靠的工作进程

在工作流定义中不处理工作进程故障或重新启动。

工作流函数执行完全无视工作进程的故障或停机。如果工作进程或时态集群本身发生故障,Temporal 平台会确保恢复工作流执行的状态,并恢复进度。工作流执行失败的唯一原因是代码出现错误或异常,而不是底层基础设施中断。

工作流类型

工作流类型是一个映射到工作流定义的名称。

  • 单个工作流类型可实例化为多个工作流执行。
  • 工作流类型的作用域是任务队列。如果使用完全不同的 Worker,可以将相同的工作流类型名称映射到不同的工作流定义。

这里没有太看懂。我自己的理解是:

“单个工作流类型可实例化为多个工作流执行” 这个容易理解,一个类型的工作流可以被反复执行多次,每次都是一个 workflow execution。

“工作流类型的作用域是任务队列。如果使用完全不同的 Worker,可以将相同的工作流类型名称映射到不同的工作流定义。“ —— 这是说如果在不通的工作背景下(也就是不同的worker),同一个名字的 工作流类型 是可以映射到不同的工作流定义。比如说 ”取款“ 这个 工作流类型在 柜台/取款机/VIP室 就可以映射为不同的工作流定义(有不同的实现。)

工作流执行

Temporal 工作流执行是一种持久、可靠和可扩展的函数执行。它是 Temporal 应用程序的主要执行单元。

每个 Temporal 工作流执行都能独占访问其本地状态。它与所有其他工作流执行同时执行,并通过信号与其他工作流执行通信,通过活动与环境通信。单个工作流执行的规模和吞吐量有限,而一个 Temporal 应用程序可以由数百万到数十亿个工作流执行组成。

持久性

持久性是指没有强加的时间限制。

工作流执行之所以是持久的,是因为它能一次有效地执行 Temporal 工作流定义(也称为 Temporal 工作流函数),也就是您的应用程序代码–无论您的代码执行了几秒还是几年。

可靠性

可靠性是指出现故障时的响应能力。

工作流执行之所以可靠,是因为它在发生故障后可以完全恢复。Temporal 平台可确保工作流执行的状态在发生故障和中断时仍然存在,并从最新状态恢复执行。

可扩展性

可扩展性是指在负载情况下的响应能力。

单个工作流执行的规模和吞吐量是有限的,但却具有可扩展性,因为它可以根据负载情况不断更新。Temporal 应用程序之所以具有可扩展性,是因为 Temporal 平台能够支持数百万到数十亿个工作流同时执行,而这是通过 Temporal 集群和工作进程的设计和性质实现的。

重放

重放是工作流执行重新开始的方法。在重放过程中,会根据现有的事件历史记录检查生成的命令。为了使工作流执行具有可恢复性、可靠性和持久性,重放是必要的,也是经常发生的。

更多信息,请参阅确定性约束。

如果发生故障,工作流执行将从事件历史记录中最后一次记录事件的位置重新开始。

命令和可等待项

工作流执行有两个功能:

  1. 发布命令
  2. 等待可等待对象(通常称为 Future)。

命令生成和等待:

通过使用工作流定义中的工作流应用程序接口(Workflow API),可以发出命令并提供可等待项。

每当执行工作流函数时,就会生成命令。工作进程监督命令的生成,并确保其与当前事件历史记录相对应。(更多信息,请参阅 “确定性约束”。)工作进程对命令进行批处理,然后每当工作流函数到达没有 “等待” 结果就无法继续执行的位置时,工作进程就会暂停执行,并将命令发送到群集。

工作流执行只能阻止通过 Temporal SDK API 提供的 “Awaitable/可等待” 的进程。在使用以下 API 时,会提供可等待项:

  • 等待:可以使用显式 “Await” API 阻止进程。
  • 请求取消另一个工作流执行: 在确认其他工作流执行已取消时,进程可以阻塞。
  • 发送信号: 进程可以在确认信号已发送时阻塞。
  • 生成子工作流执行: 当确认子工作流执行已开始以及子工作流执行的结果时,进程就会阻塞。
  • 生成活动执行: 进度会因活动执行的结果而受阻。
  • 启动定时器:在定时器启动之前,进度可能会受阻。

状态

工作流程执行可以是 “打开/Open” 或 “关闭/Closed"。

工作流程执行状态:

打开

打开状态表示工作流执行能够取得进展。

  • 运行中: 工作流执行的唯一打开状态。当工作流执行处于运行状态时,它要么正在积极推进,要么正在等待。

关闭

已关闭 状态表示由于以下原因之一,工作流执行无法取得进一步进展:

  1. Cancelled / 已取消: 工作流执行成功处理了取消请求。
  2. Completed / 已完成: 工作流执行已成功完成。
  3. Continued-As-New: 工作流执行以新的方式继续。
  4. Failed / 失败: 工作流执行返回错误并失败。
  5. Terminated / 已终止: 工作流执行已终止。
  6. Timed Out / 超时: 工作流执行达到超时限制。

工作流执行链

工作流执行链是共享相同工作流标识的工作流执行序列。链中的每个环节通常称为一个工作流运行。序列中的每个工作流运行由以下方式之一连接:

  • Continue-As-New
  • 重试
  • Temporal Cron 作业

工作流执行由其命名空间、工作流 Id 和运行 Id 唯一标识。

工作流执行超时适用于工作流执行链。工作流运行超时适用于单个工作流执行(工作流运行)。

事件循环

工作流执行由一系列称为 “事件历史” 的事件组成。事件由 Temporal cluster 创建,以响应 Temporal 客户端请求的命令或操作(如启动工作流执行的请求)。

工作流执行:

时间限制

工作流的运行时间有限制吗?

没有,工作流执行的运行时间没有限制。

但是,如果工作流执行打算无限期运行,那么在编写时就应该小心谨慎。Temporal 集群会存储工作流执行整个生命周期的完整事件历史记录。Temporal Cluster 会在 10Ki (10,240)个事件后记录警告,并在添加新事件时定期记录更多警告。如果事件历史记录超过 50Ki(51,200)个事件,工作流执行将被终止。

为防止工作流执行失控,可以使用工作流执行超时、工作流运行超时或两者兼用。工作流执行超时可用于限制工作流执行链的持续时间,工作流运行超时可用于限制单个工作流执行(运行)的持续时间。

您可以使用 Continue-As-New 功能关闭当前的工作流执行,并在一次原子操作中创建一个新的工作流执行。由 Continue-As-New 生成的工作流执行具有相同的工作流 ID、新的运行 ID 和新的事件历史记录,并传递所有适当的参数。例如,对于产生大量事件历史记录的长期运行工作流执行,每天使用一次 Continue-As-New 是合理的。

限制

并发工作流执行次数没有限制。

不过,工作流执行的事件历史记录的长度和大小有限制(默认情况下为 51,200 个事件和 50 MB)。

某些类型的未完成操作的数量也有限制。

每个进行中的活动都会在工作流执行的可变状态中生成一个元数据条目。单个工作流执行的可变状态中条目过多会导致持久性不稳定。为了保护系统,Temporal 为每个工作流执行设定了未完成活动、子工作流、信号或取消请求的最大数量限制(默认情况下,每种操作类型的限制数量为 2000 个)。一旦达到了某一操作类型的限制,如果工作流执行试图启动该类型的其他操作(通过生成 ScheduleActivityTask、StartChildWorkflowExecution、SignalExternalWorkflowExecution 或 RequestCancelExternalWorkflowExecution 命令),将无法启动(工作流任务执行将失败并重试)。

这些限制通过以下动态配置键设置:

  • NumPendingActivitiesLimit
  • NumPendingChildExecutionsLimit
  • NumPendingSignalsLimit
  • NumPendingCancelRequestsLimit

命令

命令是工作流任务执行完成后,由 Worker 向 Temporal cluster 发出的请求操作。j

集群所采取的行动将作为事件记录在工作流执行的事件历史中。工作流执行可以等待由某些命令产生的某些事件。

命令是通过在代码中使用工作流 API 生成的。在工作流任务执行期间,可能会生成多个命令。在工作流任务执行到工作流函数所能达到的程度后,这些命令会被分批发送到集群,作为工作流任务执行完成请求的一部分。当有工作流任务执行完成请求时,事件历史中总会出现 WorkflowTaskStarted 和 WorkflowTaskCompleted 事件。

通过在代码中使用工作流 API 生成命令:

命令在命令参考中进行了描述,并在 temporal gRPC API 中进行了定义。

事件

事件由 temporal 集群创建,以响应外部事件和工作流执行所产生的命令。每个事件都与服务器 API 中定义的枚举相对应。

所有事件都记录在事件历史记录中。

所有可能出现在工作流执行事件历史记录中的事件列表都在事件参考中提供。

活动事件

在活动执行的不同阶段,会有七个与活动相关的事件被添加到事件历史记录中:

  • 当工作流任务执行到启动/执行活动的代码行时,工作程序会将活动类型和参数发送到 temporal 集群,然后集群会将 ActivityTaskScheduled 事件添加到事件历史记录中。
  • 当 ActivityTaskScheduled 被添加到历史记录中时,temporal 集群会将相应的活动任务添加到任务队列中。
  • 轮询该任务队列的 Worker 会拾取活动任务并运行活动函数或方法。
  • 如果活动函数返回,则 Worker 会向集群报告完成情况,集群会将 ActivityTaskStarted 和 ActivityTaskCompleted 添加到事件历史记录中。
  • 如果活动函数抛出不可重试的失败,集群会将 ActivityTaskStarted 和 ActivityTaskFailed 添加到事件历史记录中。
  • 如果活动函数抛出错误或可重试失败,集群会调度活动任务重试以添加到任务队列(除非您已达到重试策略的最大尝试值,在这种情况下,群集会将 ActivityTaskStarted 和 ActivityTaskFailed 添加到事件历史记录)。
  • 如果在活动函数返回或抛出之前,活动的 开始到关闭(start-to-close) 超时已过,集群就会安排重试。
  • 如果在活动执行完成前活动的 “调度到关闭” 已经超时,或如果在 Worker 获得活动任务前 “调度到开始” 已经超时,集群将写入 ActivityTaskTimedOut。
  • 如果活动被取消,集群会将 ActivityTaskCancelRequested 写入事件历史记录;如果活动接受取消,集群会将 ActivityTaskCanceled 写入事件历史记录。

注意:

当活动正在运行和重试时,ActivityTaskScheduled 是历史记录中唯一与活动相关的事件: ActivityTaskStarted 会与 ActivityTaskCompleted 或 ActivityTaskFailed 等终端事件一起被写入。

事件历史记录

应用程序的仅附加的事件日志。

  • 事件历史记录由 Temporal 服务持久保存,可从崩溃或故障中无缝恢复应用程序状态。

  • 它还可作为审计日志用于调试。

事件历史限制

Temporal 群集会存储工作流执行整个生命周期的完整事件历史记录。

Temporal Cluster 会在 10Ki (10,240)个事件后记录警告,并在添加新事件时定期记录其他警告。如果事件历史记录超过 50Ki(51,200)个事件,工作流执行将被终止。

Continue-As-New

Continue-As-New 是一种机制,通过这种机制,最新的相关状态会传递给新的工作流执行,并带有新的事件历史记录。

作为一项预防措施,temporal 平台将事件历史记录总数限制为 51200 个事件或 50 MB,并在超过 10240 个事件或 10 MB 后发出警告。为防止工作流执行的事件历史记录超过此限制而失败,可使用 Continue-As-New 以新的事件历史记录启动新的工作流执行。

通过参数传递给工作流执行或通过结果值返回的所有值都会记录到事件历史记录中。在名称空间的保留期内,temporal 集群会存储工作流执行的完整事件历史记录。周期性执行许多活动的工作流执行有可能达到大小限制。

过大的事件历史记录可能会对工作流执行的性能产生不利影响。例如,在工作流工作者发生故障的情况下,必须从 temporal 集群中提取完整的事件历史记录,并通过工作流任务交给另一个工作者。如果事件历史记录非常大,加载可能需要一些时间。

通过 Continue-As-New 功能,开发人员可以完成当前的工作流执行,并以原子方式启动一个新的工作流。

新的工作流执行具有相同的工作流标识(Workflow Id),但运行标识(Run Id)不同,并且有自己的事件历史记录。

就 temporal Cron 作业而言,Continue-As-New 其实在内部也有同样的作用。

重置

重置(reset)会终止工作流执行,删除事件历史记录中截至重置点的进度,然后创建一个具有相同工作流类型和 Id 的新工作流执行来继续执行。

运行标识(run ID)

Run Id 是工作流执行的全局唯一平台级标识符。

当前运行标识(Run Id)是可变的,在工作流重试期间可能会发生变化。由于工作流重试会更改运行标识(Run Id),因此不应依赖于存储当前运行标识(Run Id)或将其用于任何逻辑选择,否则会导致非确定性问题。

Temporal 保证在任何给定时间内,只有一个具有给定工作流 Id 的工作流执行处于打开状态。但当一个工作流执行达到关闭状态时,有可能会有另一个具有相同工作流 Id 的工作流执行处于打开状态。例如,Temporal Cron Job 是一连串工作流执行,它们都具有相同的工作流 ID。链中的每个工作流执行都被视为一个运行(Run)。

运行标识(run ID)唯一标识工作流执行,即使它与其他工作流执行共享一个工作流标识(workflow ID)。

哪些操作会导致非确定性问题?

ContinueAsNew、Retry、Cron 和 Reset 等操作会创建一个由 first_execution_run_id 标识的工作流执行链。

每个操作都会在链运行中创建一个新的工作流执行,并将其信息保存为 first_execution_run_id。因此,每次对工作流执行进行操作时都会更新运行标识(run ID)。

  • first_execution_run_id 是链运行中第一个工作流执行的运行标识。
  • original_execution_run_id 是 WorkflowExecutionStarted 事件发生时的运行标识。

工作流重置(Workflow Reset)会更改第一个执行的运行标识(Run Id),但会保留原始执行的运行标识(Run Id)。例如,当链中的一个新工作流执行启动时,它会将其 Run Id 保存在 original_execution_run_id 中。重置不会更改该字段,但会更新当前的 Run Id。

工作流 ID

工作流标识(Workflow Id)是工作流执行的一个可定制的应用程序级标识符,对于命名空间内的开放的工作流执行来说是唯一的。

  • 如何设置工作流标识

工作流标识是指业务流程标识,如客户标识或订单标识。

工作流标识重用策略可用于管理是否可以重用工作流标识。基于工作流标识重用策略,temporal 平台可保证命名空间内工作流标识的唯一性。

工作流执行可通过其名称空间、工作流标识和运行标识在所有名称空间中唯一标识。

工作流标识重用策略

工作流标识重用策略(Workflow Id Reuse Policy)是指,如果某个工作流标识已被前一个工作流执行(现已关闭)使用,则该工作流执行是否允许以该工作流标识启动。

无论工作流标识重用策略如何,新的工作流执行都不可能使用与另一个开放式工作流执行相同的工作流标识。在某些情况下,如果尝试生成的工作流 Id 与当前打开的工作流 Id 相同,则会导致 “工作流执行已启动” 错误。

工作流 ID 重用策略可以有以下其中一个值:

  • 允许重复: 无论之前具有相同工作流标识的工作流执行是否处于关闭状态,都允许工作流执行存在。如果没有指定,这是默认策略。当工作流执行与之前已关闭的工作流执行具有相同的工作流 ID 时,可以使用此策略。
  • 只允许重复失败: 只有在具有相同工作流标识的前一个工作流执行没有已完成状态时,才允许工作流执行存在。当需要重新执行失败、超时、终止或取消的工作流执行并保证已完成的工作流执行不会被重新执行时,请使用此策略。
  • 拒绝重复: 如果之前的工作流执行具有相同的工作流 Id,则无论关闭状态如何,工作流执行都不能存在。在给定的保留期内,一个命名空间内每个工作流 Id 只能有一个工作流执行时,请使用此功能。

运行时终止: 指定如果具有相同工作流标识的工作流执行已在运行,则应终止该工作流执行,并启动具有相同工作流标识的新工作流执行。该策略允许在任何给定时间内只有一个具有特定工作流 ID 的工作流执行在运行。 工作流标识重复使用策略只有在具有相同工作流标识的已关闭工作流执行存在于相关 N

1.3.2 - activity的概念

Temporal 中 activity 的概念

2 - Saga 模式

Temporal 对 SAGA 模式的支持

2.1 - Saga 模式相关的博客

Temporal 对 SAGA 模式支持的相关博客文章

2.1.1 - 轻松实现Saga模式

轻松实现Saga模式

https://temporal.io/blog/saga-pattern-made-easy

使用 saga 进行旅行规划,但不带行李

上次你家人去当地公园玩时,每个人都在谈论 saga 设计模式,现在你想知道它是什么,你是否应该将它作为自己分布式系统设计的一部分,以及如何实现它。众所周知,软件设计是一种时尚趋势 。

Saga的理由

如果你想知道 saga 模式是否适合你的应用场景,请扪心自问:你的逻辑是否涉及多个步骤,其中一些步骤跨越机器、服务、分片或数据库,而部分执行是不可取的?事实证明,这正是 saga 的用武之地。也许您正在检查库存,向用户的信用卡收费,然后完成订单。也许你正在管理供应链。Saga 模式之所以有用,是因为它基本上就像一台状态机,可以存储程序进度、防止信用卡多次刷卡、在必要时进行还原,并清楚地知道在断电的情况下如何安全地恢复到一致的状态。

用来解释 saga 模式如何弥补失败的常见生活实例是旅行计划。假设你很想去西雅图的杜瓦米什地区淋雨。你需要购买机票、预订酒店,并获得一张雷尼尔山背包旅行的门票。这三件事都是一环扣一环的:如果您买不到机票,就没有理由去买其他的票。如果你买到了机票却没地方住,你就会想取消机票预订(或重新预订酒店或找其他地方住)。最后,如果你订不到背包旅行的门票,那就真的没有其他理由来西雅图了,所以还不如取消整个旅行。(开玩笑)

上图:一个面对旅行计划失败进行补偿的简单模型

现实世界中有很多 “要么全做,要么不做” 的软件应用:如果你成功地向用户收取了商品费用,但你的履行服务却报告商品缺货,如果你不退还费用,用户就会不高兴。如果你遇到了相反的问题,不小心 “免费” 交付了商品,你就会被淘汰出局。如果协调机器学习数据处理流水线的机器崩溃了,但后续机器仍在继续处理数据,而它们的数据却无处上报,那么你可能会面临一笔非常昂贵的计算资源账单。 在所有这些情况下,Saga 模式都提供了某种 “进度跟踪” 和补偿代码,以处理这些 “要么全做,要么全不做” 的任务。在 Saga 术语中,这类 “要么全做,要么全不做” 的任务被称为长期事务。这并不一定意味着这类事务要运行 “很长时间”,只是与本地运行的、与单个数据库交互的事务相比,它们在逻辑时间上需要更多的步骤。

如何创作 saga?

Saga 由两部分组成:

  1. 为 “倒退” 而定义的行为,如果你需要 “撤销” 某些事情(即补偿)
  2. 努力向前迈进的行为(即保存状态,以便在失败时知道从哪里恢复)

本博客的忠实读者一定还记得我最近写过一篇关于补偿行为的文章。从上文可以看出,补偿只是 saga 设计模式的一半。上文提到的另一半本质上是整个系统的状态管理。补偿动作模式可以帮助你了解在单个步骤(或者用 temporal 术语来说,一个活动/activity)失败时如何恢复。但如果整个系统瘫痪了呢?从哪里开始恢复?因为并不是每个步骤都附带有补偿,所以你只能根据存储的补偿进行最佳猜测。Saga 模式会记录你当前所处的位置,这样你就可以继续向前迈进。

那么,如何在自己的代码中实现saga呢?

很高兴你这么问。

向前倾

在耳边低语

这是个有点小技巧的问题,因为使用 Temporal 运行代码时,你会自动保存状态,并在任何级别失败时重试。这就意味着,使用 Temporal 的 saga 模式非常简单,只需在某个步骤(活动)失败时编写你希望采取的补偿措施即可。搞定。

这种魔力背后的_原因_是 Temporal 在设计上自动跟踪程序的进度,并能在发生灾难性故障时从中断的地方重新开始。此外,Temporal 还会在活动失败时重试,除了指定重试策略外,你不需要添加任何代码,例如:

RetryOptions retryoptions = RetryOptions.newBuilder()
       .setInitialInterval(Duration.ofSeconds(1))
       .setMaximumInterval(Duration.ofSeconds(100))
       .setBackoffCoefficient(2)
       .setMaximumAttempts(500).build();

要进一步了解这种自动机制是如何工作的,请继续关注我即将发表的关于编排和协调(实现 saga 的两种常用方法)的文章。

因此,要表达我的程序的高层逻辑,包括预订假期的步骤以及我希望在失败时采取的补偿措施,它看起来就像下面的伪代码:

try:
   registerCompensationInCaseOfFailure(cancelHotel)
   bookHotel
   registerCompensationInCaseOfFailure(cancelFlight)
   bookFlight
   registerCompensationInCaseOfFailure(cancelExcursion)
   bookExcursion
catch:
   run all compensation activities

在 Java 中,Saga 类会为您记录补偿信息:

@Override
public void bookVacation(BookingInfo info) {
   Saga saga = new Saga(new Saga.Options.Builder().build());
   try {
       saga.addCompensation(activities::cancelHotel, info.getClientId());
       activities.bookHotel(info);

       saga.addCompensation(activities::cancelFlight, info.getClientId());
       activities.bookFlight(info);

       saga.addCompensation(activities::cancelExcursion, 
                            info.getClientId());
       activities.bookExcursion(info);
   } catch (TemporalFailure e) {
       saga.compensate();
       throw e;
   }
}

在其他语言的 SDK 中,您可以自己轻松编写 addCompensation 和 compensate 函数。下面是 Go 语言的一个版本:

func (s *Compensations) AddCompensation(activity any, parameters ...any) {
	s.compensations = append(s.compensations, activity)
	s.arguments = append(s.arguments, parameters)
}

func (s Compensations) Compensate(ctx workflow.Context, inParallel bool) {
	if !inParallel {
		// Compensate in Last-In-First-Out order, to undo in the reverse order that activies were applied.
		for i := len(s.compensations) - 1; i >= 0; i-- {
			errCompensation := workflow.ExecuteActivity(ctx, s.compensations[i], s.arguments[i]...).Get(ctx, nil)
			if errCompensation != nil {
				workflow.GetLogger(ctx).Error("Executing compensation failed", "Error", errCompensation)
			}
		}
	} else {
		selector := workflow.NewSelector(ctx)
		for i := 0; i < len(s.compensations); i++ {
			execution := workflow.ExecuteActivity(ctx, s.compensations[i], s.arguments[i]...)
			selector.AddFuture(execution, func(f workflow.Future) {
				if errCompensation := f.Get(ctx, nil); errCompensation != nil {
					workflow.GetLogger(ctx).Error("Executing compensation failed", "Error", errCompensation)
				}
			})
		}
		for range s.compensations {
			selector.Select(ctx)
		}
	}
}

步骤和补偿的高级 Go 代码与 Java 版本非常相似:

func TripPlanningWorkflow(ctx workflow.Context, info BookingInfo) (err error) {
   options := workflow.ActivityOptions{
       StartToCloseTimeout: time.Second * 5,
       RetryPolicy:         &temporal.RetryPolicy{MaximumAttempts: 2},
   }

   ctx = workflow.WithActivityOptions(ctx, options)

   var compensations Compensations

   defer func() {
       if err != nil {
           // activity failed, and workflow context is canceled
           disconnectedCtx, _ := workflow.NewDisconnectedContext(ctx)
           compensations.Compensate(disconnectedCtx, true)
       }
   }()

   compensations.AddCompensation(CancelHotel)
   err = workflow.ExecuteActivity(ctx, BookHotel, info).Get(ctx, nil)
   if err != nil {
       return err
   }

   compensations.AddCompensation(CancelFlight)
   err = workflow.ExecuteActivity(ctx, BookFlight, info).Get(ctx, nil)
   if err != nil {
       return err
   }

   compensations.AddCompensation(CancelExcursion)
   err = workflow.ExecuteActivity(ctx, BookExcursion, info).Get(ctx, nil)
   if err != nil {
       return err
   }

   return err
}

上述高级代码序列被称为 temporal 工作流。而且,如前所述,通过使用 Temporal 运行,我们不必担心通过事件源或添加重试和重启逻辑来实现任何跟踪进度的 bookkeeping 功能,因为这些都是免费提供的。因此,在编写使用 Temporal 运行的代码时,你只需担心如何编写补偿,其余的都是免费提供的。

幂等

好吧,还有第二件事需要 “担心”。你可能还记得,saga 由两部分组成,第一部分是我们之前编码的补偿。第二部分是 “努力向前推进”,包括面对失败时可能重新尝试某项活动。让我们深入了解其中一个步骤,好吗?Temporal 承担了重试和跟踪整体进度的所有重任,但由于代码可以重试,程序员需要确保每个 Temporal 活动都是幂等的。这意味着无论调用一次还是多次,观察到的 bookFlight 结果都是一样的。说得具体一点,设置某个字段 foo=3 的函数是幂等的,因为无论调用多少次,之后 foo 都是 3。函数 foo += 3 不是幂等的,因为 foo 的值取决于函数被调用的次数。非幂等性有时看起来更微妙:如果数据库允许重复记录,那么调用 INSERT INTO foo (bar) VALUES (3) 的函数会轻率地在表中创建与调用次数相同的记录,因此不是幂等的。发送电子邮件或转账的函数的天然实现也不是默认幂等。

如果你正在慢慢退缩,因为你的真实世界应用程序要做的事情比 set foo=3 复杂得多,那么请振作起来。有一个解决方案。您可以使用一个独特的标识符(称为idempotency key,有时也称为 referenceId 或类似标识符)来唯一标识一个特定的事务,并确保酒店预订事务只有效发生一次。idempotency key 的定义方式可根据应用程序的需要而定。在旅行计划应用程序中,BookingInfo 中的一个字段 clientId 用于唯一标识事务。

type BookingInfo struct {
   Name     string
   ClientId string
   Address  string
   CcInfo   CreditCardInfo
   Start    date.Date
   End      date.Date
}

在上述 Java 工作流代码中,您可能还看到了用于注册补偿的 clientId:

saga.addCompensation(activities::cancelHotel, info.getClientId());

不过,使用 clientId 作为键可以限制某个人同时预订多个假期。这可能正是我们想要的。但是,一些业务应用程序可能会选择通过结合客户 ID 和工作流 ID 来建立一个唯一密钥,以允许每个客户同时预订多个假期。如果您想要一个真正唯一的幂等 key,您可以向工作流传递一个 UUID。您可以根据自己应用程序的需要做出选择。

许多处理资金的第三方应用程序接口已经接受了用于此目的的幂等键。如果需要自己实现这样的功能,可以使用原子写入来记录迄今为止已查看过的幂等键,如果某个操作的幂等键在 “已查看过” 集合中,就不要执行该操作。

优势与复杂性

saga 模式确实会增加代码的复杂性,因此不要因为有了微服务就在代码中实现它,这一点很重要。但是,如果您需要完成一项涉及多个服务的任务(如预订机票和酒店),而部分执行实际上并不成功,那么 saga 将是您的朋友。此外,如果你发现自己的 saga 变得特别笨重,可能就需要重新考虑如何划分微服务,并卷起袖子进行重构了。总的来说,Temporal 可以让你在代码中实现 saga 模式变得相对简单,因为你只需编写每一步所需的补偿即可。敬请期待我的下一篇文章,在这篇文章中,我将深入探讨 saga 和订阅场景,Temporal 在降低 saga 工作的复杂性方面尤为突出。

使用本文所述代码的完整版本库可在 GitHub 上找到:

如果您想了解其他使用 Temporal 的 saga 教程,请查看以下资源:

此外,我的一位同事 Dominik Tornow 还在 YouTube 上介绍了 saga。

在我们的课程、教程、文档和视频中了解更多有关 Temporal 的信息。

注释

  1. 显然,不要因为它是新的热点就重新设计你的系统。除非它是一个新的 JavaScript 框架。那就赶紧用 npm 安装那个新软件包吧。😜

  2. 别担心,saga 并不是一种趋势,自上世纪 80 年代以来,saga 就一直存在于数据库中。您可以放心,因为您的项目设计经典优雅。

  3. 这并不是说作者对这种情况完全没有经验。

  4. 逻辑时间是分布式计算中的一个概念,用于描述分布式计算中不同机器上发生事件的时间,因为机器可能没有物理同步的全局时钟。逻辑时间只是这些机器上发生的事件的因果排序。就长期运行的事务而言,它基本上可以归结为在不同机器上发生的许多 “步骤”。

2.2 - Saga 模式相关的教程

Temporal 对 SAGA 模式支持的相关教程

2.2.1 - PHP booking saga教程

Temporal 的 PHP booking saga

https://learn.temporal.io/tutorials/php/booking_saga/

用 Saga 模式和 Temporal 创建 PHP 旅行预订系统

简介

想象一下,我们提供的服务可以让人们预订旅行。预订定期旅行通常包括几个步骤:

  • 预订汽车。
  • 预订酒店。
  • 预订航班。

客户要么什么都想订,要么什么都不想订。不预订飞机就预订酒店是毫无意义的。另外,想象一下,在这个交易中,每个预订步骤都是通过专门的服务或微服务来实现的。

所有这些步骤共同构成了一个跨越多个服务和数据库的分布式事务。为确保预订成功,所有三个微服务都必须完成各个本地事务。如果其中任何一个步骤失败,前面完成的所有事务都应相应撤销。我们不能简单地 “删除” 之前的事务或 “回到过去”–尤其是在涉及到资金和预订的情况下,拥有不可更改的尝试和失败记录非常重要。因此,我们应该积累一份失败后的补偿行动清单。

回顾 Saga 架构模式

要做好分布式事务管理是非常困难的。对于长期运行的工作来说,Saga 是最久经考验的设计模式之一:

  • Saga 使用一系列本地事务来提供事务管理。
  • 本地事务是 saga 参与者(微服务)执行工作的单位。
  • 属于 Saga 的每个操作都可以通过补偿事务回滚。
  • Saga 模式保证所有操作都能成功完成,或者运行相应的补偿事务来撤销先前完成的工作。

实现 Saga 模式可能很复杂,但幸运的是,Temporal 提供了对 Saga 模式的原生支持。这意味着处理所有回滚和运行补偿事务都由 Temporal 内部完成。

上图显示了如何在前面讨论过的在线旅行预订场景中将 Saga 模式可视化。

工作流的实现

我们需要做的第一件事就是编写一个业务流程–旅行预订的高级流程。我们称之为 TripBookingWorkflow:

class TripBookingWorkflow implements TripBookingWorkflowInterface
{
    /** @var \Temporal\Internal\Workflow\ActivityProxy|TripBookingActivitiesInterface */
    private $activities;

    public function __construct()
    {
        $this->activities = Workflow::newActivityStub(
            TripBookingActivitiesInterface::class,
            ActivityOptions::new()->withStartToCloseTimeout(CarbonInterval::hour(1))
        );
    }

    public function bookTrip(string $name)
    {

    }
}

为简单起见,我们假设所有预订服务(汽车、酒店和航班)都由一个活动 TripBookingActivitiesInterface 管理。但这不是必要条件。好了,现在我们需要告诉 Temporal 我们要使用 Saga。