DurableTask .Net

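1 - Introduction to the Durable Task Framework

An introduction to the Durable Task Framework, with collected reference material

1.1 - DTFx overview

The introduction from the DTFx GitHub home page: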

https://github.com/Azure/durabletask

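The Durable Task Framework (DTFx) is a library that allows users to write long-running persistent workflows (referred to as orchestrations) in C# using simple async/await coding constructs. It is used heavily within various teams at Microsoft to reliably orchestrate long-running provisioning, monitoring, and management operations. The orchestrations scale out linearly by simply adding more worker machines. This framework is also used to power the serverless Durable Functions extension of Azure Functions.

By open sourcing this project we hope to give the community a very cost-effective alternative to heavy-duty workflow systems. We also hope to build an ecosystem of providers and activities around this simple yet powerful framework.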

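2 - Setting up the development environment

Setting up a development environment for the Durable Task Framework

2.1 - Setting up a Windows development environment

Setting up a Windows development environment for the Durable Task Framework

2.1.1 - Installing git

Installing git on Windows

Install git

Download the git installer for Windows and click Next through the wizard.

To avoid spaces in the path, I changed the install location from the default C:\Program Files\Git to C:\soft\git.

Besides git, the installation also provides a bash terminal, which is at least more usable than the terminals that ship with Windows.

zsh will be installed later to replace bash.

Configure passwordless login

Copy the local id_rsa.pub public key to the Windows machine (this requires the OpenSSH server from section 2.1.2 to be running):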

scp id_rsa.pub sky@192.168.0.103:~/.ssh/

Then append it to the authorized keys file. Because the login account is an administrator account, the file is "C:\ProgramData\ssh\administrators_authorized_keys" rather than "~/.ssh/authorized_keys".

Run bash as administrator, then execute:

 touch "C:\ProgramData\ssh\administrators_authorized_keys"
 
 cat ~/.ssh/id_rsa.pub >> "C:\ProgramData\ssh\administrators_authorized_keys"
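
Running the append command twice leaves a duplicate entry in the file. A minimal sketch of an idempotent variant follows; the helper name add_key_once is made up for this example and is not part of OpenSSH.

```shell
# Append a public key to an authorized-keys file only if it is not already there.
# add_key_once is a hypothetical helper name used only in this sketch.
add_key_once() {
  local pubkey_file="$1" auth_file="$2"
  # Make sure the target file exists before searching it.
  touch "$auth_file"
  # -q quiet, -x whole-line match, -F fixed string (no regex interpretation).
  if ! grep -qxF -- "$(cat "$pubkey_file")" "$auth_file"; then
    cat "$pubkey_file" >> "$auth_file"
  fi
}

# Example call with the paths used in the steps above:
# add_key_once ~/.ssh/id_rsa.pub "C:\ProgramData\ssh\administrators_authorized_keys"
```

The permission fix with icacls is still needed afterwards.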

Run PowerShell as administrator and restrict the file's permissions:

icacls.exe "C:\ProgramData\ssh\administrators_authorized_keys" /inheritance:r /grant "Administrators:F" /grant "SYSTEM:F"

After that, an OpenSSH client can connect directly without entering a password:

ssh sky@192.168.0.103

2.1.2 - Installing OpenSSH server

Installing OpenSSH server on Windows

Install with PowerShell

Note: PowerShell must be run as administrator.

Check whether it is installed

Check whether OpenSSH is installed:

Get-WindowsCapability -Online | Where-Object Name -like 'OpenSSH*'

If it is not installed, the output is:

Name  : OpenSSH.Client~~~~0.0.1.0
State : NotPresent

Name  : OpenSSH.Server~~~~0.0.1.0
State : NotPresent

If it is already installed, the output is:

Name  : OpenSSH.Client~~~~0.0.1.0
State : Installed

Name  : OpenSSH.Server~~~~0.0.1.0
State : Installed

Install the OpenSSH Client and Server

Run the install commands:

# Install the OpenSSH Client
Add-WindowsCapability -Online -Name OpenSSH.Client~~~~0.0.1.0

# Install the OpenSSH Server
Add-WindowsCapability -Online -Name OpenSSH.Server~~~~0.0.1.0

Start the OpenSSH Server

Start the OpenSSH Server and verify the firewall settings:

# Start the sshd service
Start-Service sshd

# Confirm the Firewall rule is configured. It should be created automatically by setup. Run the following to verify
if (!(Get-NetFirewallRule -Name "OpenSSH-Server-In-TCP" -ErrorAction SilentlyContinue | Select-Object Name, Enabled)) {
    Write-Output "Firewall Rule 'OpenSSH-Server-In-TCP' does not exist, creating it..."
    New-NetFirewallRule -Name 'OpenSSH-Server-In-TCP' -DisplayName 'OpenSSH Server (sshd)' -Enabled True -Direction Inbound -Protocol TCP -Action Allow -LocalPort 22
} else {
    Write-Output "Firewall rule 'OpenSSH-Server-In-TCP' has been created and exists."
}

Start automatically at boot

# OPTIONAL but recommended:
Set-Service -Name sshd -StartupType 'Automatic'

Configure the OpenSSH server

Configure the default shell

Do this only after git (with its bundled bash) has been installed. The bash path is assumed to be C:\soft\git\usr\bin\bash.exe

Run in PowerShell:

New-ItemProperty -Path "HKLM:\SOFTWARE\OpenSSH" -Name DefaultShell -Value "C:\soft\git\usr\bin\bash.exe" -PropertyType String -Force

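To make zsh the default shell, it is better to first set the default shell to git's bundled bash as described above, and then have bash launch zsh. That way the basic Linux commands shipped with git's bash remain available. If the default shell is set to zsh directly, ssh login fails with errors such as: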

/c/Users/sky/.oh-my-zsh/oh-my-zsh.sh:65: command not found: mkdir
/c/Users/sky/.oh-my-zsh/oh-my-zsh.sh:124: command not found: rm

Warning:

With this change, ssh from a terminal works normally, but connecting through VS Code Remote SSH fails:

[19:30:31.224] stderr> /usr/bin/bash: line 3: $'\202': command not found
[19:30:31.230] stderr> /usr/bin/bash: line 4: name: command not found
[19:30:31.236] stderr> /usr/bin/bash: line 6: $'\202': command not found
[19:30:31.242] stderr> /usr/bin/bash: line 8: $'\202': command not found
[19:30:31.248] stderr> /usr/bin/bash: line 10: $'\202': command not found
[19:30:31.253] stderr> /usr/bin/bash: line 12: $'\202': command not found
......
[19:31:05.042] stderr> /usr/bin/bash: line 5432: $'\202': command not found
[19:31:05.048] stderr> /usr/bin/bash: line 5434: $'\202': command not found
[19:31:05.054] stderr> /usr/bin/bash: line 5436: $'\202': command not found
[19:31:05.057] Terminating local server
[19:31:05.060] Exec server for ssh-remote+192.168.0.103 failed: Error: Connecting with SSH timed out

For now, roll this setting back:

Remove-ItemProperty -Path "HKLM:\SOFTWARE\OpenSSH" -Name DefaultShell -Force

2.1.3 - Installing zsh and Oh My Zsh

Installing zsh and Oh My Zsh on Windows

Install zsh

This assumes git, with its bundled bash, is already installed.

Download zsh

zsh download page:

https://packages.msys2.org/package/zsh?repo=msys&variant=x86_64

Download the .tar.zst file:

https://mirror.msys2.org/msys/x86_64/zsh-5.9-2-x86_64.pkg.tar.zst

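This file can be extracted with WinRAR, producing a zsh-5.9-2-x86_64.pkg directory with two subdirectories: etc and usr.

Install zsh

Copy the etc and usr directories into the git installation directory (C:\Program Files\Git\ by default, or C:\soft\git with the custom path chosen earlier). The git installation directory already contains etc and usr directories, so the files are merged into them.

Run zsh

To run it, start git's bundled bash terminal first and then run the zsh command. You can also check the zsh version: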

zsh --version

For convenience, and in particular to use zsh in place of bash, edit bash's config file ~/.bashrc (create it if it does not exist) and add:

/c/Windows/System32/chcp.com 65001 > /dev/null 2>&1

if [ -t 1 ]; then
  exec zsh
fi

This way zsh starts automatically whenever an interactive bash starts.
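
A slightly more defensive variant of that ~/.bashrc snippet, as a sketch: it hands over to zsh only when standard output is a terminal and zsh is actually found on PATH, so a missing or broken zsh install still leaves bash usable. The function name start_zsh_if_available is made up for this example.

```shell
# Hypothetical helper for ~/.bashrc: replace bash with zsh only when it is safe to do so.
start_zsh_if_available() {
  # -t 1: stdout is a terminal (skips scripted, non-interactive sessions)
  # command -v zsh: zsh can be found on PATH
  if [ -t 1 ] && command -v zsh >/dev/null 2>&1; then
    exec zsh
  fi
}

# In ~/.bashrc, call it as the last line:
# start_zsh_if_available
```

When either check fails the function simply returns and the bash session continues.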

The first run asks about creating the zsh startup files; choose:

Quit and do nothing.  The function will be run again next time.

Install Oh My Zsh

Install

Run in a zsh terminal:

sh -c "$(curl -fsSL https://raw.githubusercontent.com/ohmyzsh/ohmyzsh/master/tools/install.sh)"

Prepare fonts

Download and install the fonts listed here:

https://github.com/romkatv/powerlevel10k#meslo-nerd-font-patched-for-powerlevel10k

Install the Powerlevel10k theme

Download:

git clone --depth=1 https://github.com/romkatv/powerlevel10k.git ${ZSH_CUSTOM:-$HOME/.oh-my-zsh/custom}/themes/powerlevel10k

Edit:

vi ~/.zshrc

Add:

ZSH_THEME="powerlevel10k/powerlevel10k"
POWERLEVEL9K_RIGHT_PROMPT_ELEMENTS=(history)
POWERLEVEL9K_SHORTEN_DIR_LENGTH=1

# User configuration

export LS_COLORS="rs=0:no=00:mi=00:mh=00:ln=01;36:or=01;31:di=01;34:ow=04;01;34:st=34:tw=04;34:pi=01;33:so=01;33:do=01;33:bd=01;33:cd=01;33:su=01;35:sg=01;35:ca=01;35:ex=01;32:"

Configure plugins

Download the following plugins:

git clone https://github.com/zsh-users/zsh-autosuggestions.git ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/zsh-autosuggestions
git clone https://github.com/zsh-users/zsh-syntax-highlighting.git ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/zsh-syntax-highlighting
git clone https://github.com/Pilaton/OhMyZsh-full-autoupdate.git ${ZSH_CUSTOM:-~/.oh-my-zsh/custom}/plugins/ohmyzsh-full-autoupdate

Edit the zsh configuration:

vi ~/.zshrc

Change plugins to:

plugins=(
    adb
    command-not-found
    extract
    deno
    docker
    git
    github
    gitignore
    history-substring-search
    node
    npm
    nvm
    yarn
    volta
    vscode
    sudo
    web-search
    z
    zsh-autosuggestions
    zsh-syntax-highlighting
    ohmyzsh-full-autoupdate
)

# User configuration

ZSH_HIGHLIGHT_HIGHLIGHTERS=(main brackets pattern cursor root line)
ZSH_HIGHLIGHT_PATTERNS=('rm -rf *' 'fg=white,bold,bg=red')

Restart zsh.

Updating plugins and themes Oh My ZSH
--------------------------------------

Updating Plugin — ohmyzsh-full-autoupdate -> https://github.com/Pilaton/OhMyZsh-full-autoupdate
Already up to date.

Updating Plugin — zsh-autosuggestions -> https://github.com/zsh-users/zsh-autosuggestions
Already up to date.

Updating Plugin — zsh-syntax-highlighting -> https://github.com/zsh-users/zsh-syntax-highlighting
Already up to date.

Updating Theme — powerlevel10k -> https://github.com/romkatv/powerlevel10k
Already up to date.

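2.1.4 - Configuring VS Code Remote SSH

Configuring VS Code Remote SSH on Windows

The earlier steps installed openssh-server on Windows, so VS Code Remote SSH can now be used to develop on the Windows machine while the VS Code UI runs elsewhere, for example on my M1 laptop.

Connect to the Windows host

In VS Code, use "Connect to Host" to connect to the Windows machine prepared above.

On the first connection, VS Code automatically downloads and sets up its backend.

Open the project

Opening the project fails with: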

C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.SqlServer.Tests\DurableTask.SqlServer.Tests.csproj : error NU1903: Warning As Error: Package 'System.Data.SqlClient' 4.8.5 has a known high severity vulnerability, https://github.com/advisories/GHSA-98g6-xh36-x2p7 [C:\Users\sky\work\code\durabletask\durabletask\DurableTask.sln]
  Failed to restore C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.SqlServer.Tests\DurableTask.SqlServer.Tests.csproj (in 1.2 sec).
  23 of 24 projects are up-to-date for restore.

2.1.5 - Installing the .NET SDK

Installing the .NET SDK on Windows

.NET 6.0 SDK

https://dotnet.microsoft.com/en-us/download/dotnet/6.0

Download dotnet-sdk-6.0.420-win-x64.exe and run the installer. When it finishes, the installer reports:

下列项安装于: ""
    • .NET SDK 6.0.420
    • .NET Runtime 6.0.28
    • ASP.NET Core Runtime 6.0.28
    • .NET Windows Desktop Runtime 6.0.28

.NET 8.0 SDK

https://dotnet.microsoft.com/zh-cn/download/dotnet/sdk-for-vs-code?utm_source=vs-code&utm_medium=referral&utm_campaign=sdk-install

Download dotnet-sdk-8.0.203-win-x64.exe and run the installer. It is not obvious from the installer where the SDK ends up:

下列产品已安装: 
    • .NET SDK 8.0.203
    • .NET Runtime 8.0.3
    • ASP.NET Core Runtime 8.0.3
    • .NET Windows Desktop Runtime 8.0.3

Check from the terminal:

$ dotnet --version
8.0.203

$ which dotnet
/c/Program Files/dotnet/dotnet

$ echo $PATH
/c/Users/sky/bin:/mingw64/bin:/usr/local/bin:/usr/bin:/bin:/mingw64/bin:/usr/bin:/c/Users/sky/bin:/c/Windows/system32:/c/Windows:/c/Windows/System32/Wbem:/c/Windows/System32/WindowsPowerShell/v1.0:/c/Windows/System32/OpenSSH:/cmd:/c/Program Files/dotnet:/c/Users/sky/AppData/Local/Microsoft/WindowsApps:/bin:/c/Users/sky/.dotnet/tools:/usr/bin/vendor_perl:/usr/bin/core_perl

Two entries were added to PATH:

  • /c/Program Files/dotnet
  • /c/Users/sky/.dotnet/tools

.NET Core 3.1 Runtime

The 3.1 runtime must also be installed, even though it is marked as out of support. Without it, the tests fail with:

Testhost process for source(s) 'C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Core.Tests\bin\Debug\netcoreapp3.1\DurableTask.Core.Tests.dll' exited with error: You must install or update .NET to run this application.
App: C:\Users\sky\.nuget\packages\microsoft.testplatform.testhost\15.9.0\lib\netstandard1.5\testhost.dll
Architecture: x64
Framework: 'Microsoft.NETCore.App', version '3.1.0' (x64)
.NET location: C:\Program Files\dotnet\
The following frameworks were found:
  6.0.28 at [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
  8.0.3 at [C:\Program Files\dotnet\shared\Microsoft.NETCore.App]
Learn more:
https://aka.ms/dotnet/app-launch-failed
To install missing framework, download:
https://aka.ms/dotnet-core-applaunch?framework=Microsoft.NETCore.App&framework_version=3.1.0&arch=x64&rid=win-x64&os=win10
. Please check the diagnostic logs for more information.

Download URL:

https://aka.ms/dotnet-core-applaunch?framework=Microsoft.NETCore.App&framework_version=3.1.0&arch=x64&rid=win-x64&os=win10

This downloads dotnet-runtime-3.1.32-win-x64.exe; run it to install.
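
The installed runtimes can be listed with dotnet --list-runtimes, which prints one line per runtime in the form "Microsoft.NETCore.App 6.0.28 [path]". As a rough sketch, a small filter can check that output for a given major.minor version before running the tests; has_runtime is a name invented for this example.

```shell
# has_runtime MAJOR.MINOR: reads "dotnet --list-runtimes" output on stdin and
# succeeds if a Microsoft.NETCore.App runtime of that major.minor line is listed.
# (has_runtime is a hypothetical helper, not a dotnet command.)
has_runtime() {
  awk -v want="$1" '
    # Field 1 is the framework name, field 2 the full version such as 3.1.32.
    $1 == "Microsoft.NETCore.App" && index($2, want ".") == 1 { found = 1 }
    END { exit !found }
  '
}

# Example:
# dotnet --list-runtimes | has_runtime 3.1 || echo "the .NET Core 3.1 runtime is missing"
```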

.NET Core 2.1 Runtime

The 2.1 runtime must also be installed; the azure-functions-durable-extension project needs it:

https://dotnet.microsoft.com/zh-cn/download/dotnet/2.1/runtime

This downloads dotnet-runtime-2.1.30-win-x64.exe; run it to install.

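2.1.6 - Installing nuget

Installing nuget on Windows

Download

Download URL:

https://www.nuget.org/downloads

Install

Copy nuget.exe to the C:\soft\nuget directory, then add C:\soft\nuget to the PATH environment variable in the Windows system settings.

Verify

Check the version: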

$ nuget
NuGet 版本: 6.9.1.3

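2.1.7 - Publishing locally

Publishing packages locally and using them from other projects

Publish to a local feed

Walkthrough

The durabletask-dotnet project

Edit eng\targets\Release.props and change the version from 1.2.2: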

<VersionPrefix>1.2.2</VersionPrefix>
<VersionSuffix></VersionSuffix>

to 1.2.3-alpha:

    <VersionPrefix>1.2.3</VersionPrefix>
    <VersionSuffix>alpha</VersionSuffix>
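
MSBuild combines the two properties into the package version: the prefix alone when the suffix is empty, otherwise prefix and suffix joined by a hyphen. The sketch below only illustrates that rule; package_version is a made-up helper and not part of the build.

```shell
# package_version PREFIX [SUFFIX]
# Prints PREFIX when SUFFIX is empty, otherwise PREFIX-SUFFIX.
# (Hypothetical helper illustrating how VersionPrefix and VersionSuffix combine.)
package_version() {
  if [ -n "${2:-}" ]; then
    printf '%s-%s\n' "$1" "$2"
  else
    printf '%s\n' "$1"
  fi
}

package_version 1.2.2 ""       # prints 1.2.2
package_version 1.2.3 alpha    # prints 1.2.3-alpha
```

This is why the packages built later carry 1.2.3-alpha in their file names.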

After modifying the code, build the project:

# git submodule update --init --recursive
# git submodule update --remote: see https://stackoverflow.org.cn/questions/1777854
# cd ~/work/code/durabletask-fork/durabletask-dotnet
rm -rf out/bin
rm -rf out/pkg
dotnet build
dotnet pack

The output looks like this:

PS C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet> dotnet pack
用于 .NET MSBuild 版本 17.9.6+a4ecab324
  正在确定要还原的项目…
  所有项目均是最新的,无法还原。
  Generators -> C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out\bin\Release\Generators\netstandard2.0\Microsoft.DurableTask.Generators.dll
  Abstractions -> C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out\bin\Release\Abstractions\netstandard2.0\Microsoft.DurableTask.Abstractions.dll
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Generators.1.0.0-preview.1.nupkg”。
  Grpc -> C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out\bin\Release\Grpc\netstandard2.0\Microsoft.DurableTask.Grpc.dll
  Grpc -> C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out\bin\Release\Grpc\net6.0\Microsoft.DurableTask.Grpc.dll
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Abstractions.1.2.3-alpha.nupkg”。
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Abstractions.1.2.3-alpha.snupkg”。
  Client -> C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out\bin\Release\Client\netstandard2.0\Microsoft.DurableTask.Client.dll
  Worker -> C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out\bin\Release\Worker\netstandard2.0\Microsoft.DurableTask.Worker.dll
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Grpc.1.2.3-alpha.nupkg”。
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Grpc.1.2.3-alpha.snupkg”。
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Client.1.2.3-alpha.nupkg”。
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Client.1.2.3-alpha.snupkg”。
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Worker.1.2.3-alpha.nupkg”。
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Worker.1.2.3-alpha.snupkg”。
  Worker.Grpc -> C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out\bin\Release\Worker.Grpc\net6.0\Microsoft.DurableTask.Worker.Grpc.dll
  Worker.Grpc -> C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out\bin\Release\Worker.Grpc\netstandard2.0\Microsoft.DurableTask.Worker.Grpc.dll
  Client.OrchestrationServiceClientShim -> C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out\bin\Release\Client.OrchestrationServiceClientShim\netstandard2.0\Microsoft.DurableTask.Client.OrchestrationServiceClientShim.dll
  Client.Grpc -> C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out\bin\Release\Client.Grpc\netstandard2.0\Microsoft.DurableTask.Client.Grpc.dll
C:\Program Files\dotnet\sdk\8.0.203\Sdks\NuGet.Build.Tasks.Pack\build\NuGet.Build.Tasks.Pack.targets(221,5): warning NU5104: 包的稳定版本不应有预发布依赖项。请修改依赖项“Microsoft.DurableTask.Client [1.2.3-alpha, )”的版本规范,或更新 nuspec 中的版本字段。 [C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\src\Client\OrchestrationServiceClientShim\Client.OrchestrationServiceClientShim.csproj]
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Client.OrchestrationServiceClientShim.1.0.5.nupkg”。
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Client.OrchestrationServiceClientShim.1.0.5.snupkg”。
  Client.Grpc -> C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out\bin\Release\Client.Grpc\net6.0\Microsoft.DurableTask.Client.Grpc.dll
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Worker.Grpc.1.2.3-alpha.nupkg”。
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Worker.Grpc.1.2.3-alpha.snupkg”。
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Client.Grpc.1.2.3-alpha.nupkg”。
  已成功创建包“C:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\out/pkg/Microsoft.DurableTask.Client.Grpc.1.2.3-alpha.snupkg”。

Mapping between nupkg files and package names (files without an arrow have no package name recorded):

nupkg file -> package
Microsoft.DurableTask.Abstractions.1.2.3.nupkg
Microsoft.DurableTask.Client.1.2.3.nupkg
Microsoft.DurableTask.Client.Grpc.1.2.3.nupkg -> microsoft.durabletask.client.grpc
Microsoft.DurableTask.Client.OrchestrationServiceClientShim.1.0.5.nupkg
Microsoft.DurableTask.Generators.1.0.0-preview.1.nupkg
Microsoft.DurableTask.Grpc.1.2.3.nupkg -> microsoft.durabletask.grpc
Microsoft.DurableTask.Worker.Grpc.1.2.3.nupkg -> microsoft.durabletask.worker.grpc
Microsoft.DurableTask.Worker.1.2.3.nupkg -> microsoft.durabletask.worker

Remove the old copies from the local feed and the nuget cache, then publish the new files to the local nuget feed:

rm -rf "C:\soft\nuget-local-package\microsoft.durabletask.abstractions"
rm -rf "C:\soft\nuget-local-package\microsoft.durabletask.client"
rm -rf "C:\soft\nuget-local-package\microsoft.durabletask.client.grpc"
rm -rf "C:\soft\nuget-local-package\microsoft.durabletask.grpc"
rm -rf "C:\soft\nuget-local-package\microsoft.durabletask.worker"
rm -rf "C:\soft\nuget-local-package\microsoft.durabletask.worker.grpc"

rm -rf "C:\Users\sky\.nuget\packages\microsoft.durabletask.abstractions"
rm -rf "C:\Users\sky\.nuget\packages\microsoft.durabletask.client"
rm -rf "C:\Users\sky\.nuget\packages\microsoft.durabletask.client.grpc"
# rm -rf "C:\Users\sky\.nuget\packages\microsoft.durabletask.generators"
rm -rf "C:\Users\sky\.nuget\packages\microsoft.durabletask.grpc"
# rm -rf "C:\Users\sky\.nuget\packages\microsoft.durabletask.sidecar"
# rm -rf "C:\Users\sky\.nuget\packages\microsoft.durabletask.sidecar.protobuf"
rm -rf "C:\Users\sky\.nuget\packages\microsoft.durabletask.worker"
rm -rf "C:\Users\sky\.nuget\packages\microsoft.durabletask.worker.grpc"

nuget add out/pkg/Microsoft.DurableTask.Abstractions.1.2.3-alpha.nupkg -source "C:\soft\nuget-local-package"
nuget add out/pkg/Microsoft.DurableTask.Client.1.2.3-alpha.nupkg -source "C:\soft\nuget-local-package"
nuget add out/pkg/Microsoft.DurableTask.Worker.1.2.3-alpha.nupkg -source "C:\soft\nuget-local-package"
nuget add out/pkg/Microsoft.DurableTask.Grpc.1.2.3-alpha.nupkg -source "C:\soft\nuget-local-package"
nuget add out/pkg/Microsoft.DurableTask.Client.Grpc.1.2.3-alpha.nupkg -source "C:\soft\nuget-local-package"
nuget add out/pkg/Microsoft.DurableTask.Worker.Grpc.1.2.3-alpha.nupkg -source "C:\soft\nuget-local-package"
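
The six nuget add commands differ only in the package name, so they can be generated from a list. The following is a dry-run sketch that prints the commands rather than running them; the variable and function names are made up for this example, and the version and feed path are the ones used above.

```shell
# Dry run: print one "nuget add" command per package instead of executing it.
# version, feed and print_nuget_add_commands are names chosen for this sketch.
version="1.2.3-alpha"
feed='C:\soft\nuget-local-package'

print_nuget_add_commands() {
  for name in Abstractions Client Worker Grpc Client.Grpc Worker.Grpc; do
    # %s arguments are printed literally, so the backslashes in the feed path survive.
    printf 'nuget add out/pkg/Microsoft.DurableTask.%s.%s.nupkg -source "%s"\n' \
      "$name" "$version" "$feed"
  done
}

print_nuget_add_commands
```

Once the printed commands look right, piping the output to sh would execute them; only version needs changing for the next local release.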

The azure-functions-durable-extension project

Edit src\Worker.Extensions.DurableTask\Worker.Extensions.DurableTask.csproj, changing

  <ItemGroup>
    ......
    <PackageReference Include="Microsoft.DurableTask.Client.Grpc" Version="1.2.2" />
    <PackageReference Include="Microsoft.DurableTask.Worker.Grpc" Version="1.2.2" />
  </ItemGroup>

to

  <ItemGroup>
    ......
    <PackageReference Include="Microsoft.DurableTask.Client.Grpc" Version="1.2.3-alpha" />
    <PackageReference Include="Microsoft.DurableTask.Worker.Grpc" Version="1.2.3-alpha" />
  </ItemGroup>

Edit the nuget.config file to add the local feed:

<configuration>
  <packageSources>
    ......
    <add key="nugetlocal" value="C:\soft\nuget-local-package" />
  </packageSources>
</configuration>

This project also needs to be packaged for use by other projects, so its version has to be changed in the same way. Open src\Worker.Extensions.DurableTask\Worker.Extensions.DurableTask.csproj and change

    <!-- Version information -->
    <VersionPrefix>1.1.2</VersionPrefix>
    <VersionSuffix></VersionSuffix>

to

    <!-- Version information -->
    <VersionPrefix>1.1.3</VersionPrefix>
    <VersionSuffix>alpha</VersionSuffix>

Then run

dotnet build

to build and package the project, and publish the package to the local feed:

rm -rf "C:\soft\nuget-local-package\microsoft.azure.functions.worker.extensions.durabletask"
rm -rf "C:\Users\sky\.nuget\packages\microsoft.azure.functions.worker.extensions.durabletask"

nuget add "src\Worker.Extensions.DurableTask\bin\Debug\Microsoft.Azure.Functions.Worker.Extensions.DurableTask.1.1.3-alpha.nupkg" -source "C:\soft\nuget-local-package"

The quickstart project

Add a nuget.config file in the project root with the following content:

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <packageSources>
    <clear/>
    <add key="nugetlocal" value="C:\soft\nuget-local-package" />
  </packageSources>
</configuration>

Edit the project's csproj file, for example MyDurableFunction1.csproj, changing

    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.1.2" />

to

    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.1.3-alpha" />

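Before building, it is best to delete the affected dependencies from "C:\Users\sky\.nuget\packages" (using the commands above), so that a cached copy does not prevent the new version from being picked up.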
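
NuGet stores each package in the global packages folder under its lower-cased package id, which is why the rm paths above are all lowercase. As a small sketch, the cache directory for a package id can be derived like this; cache_dir_for is a hypothetical helper, and the default root of ~/.nuget/packages is an assumption that matches the paths used in this section.

```shell
# cache_dir_for PACKAGE_ID [CACHE_ROOT]
# Prints the directory where the nuget cache keeps the package:
# the cache root followed by the lower-cased package id.
# (Hypothetical helper; CACHE_ROOT defaults to ~/.nuget/packages.)
cache_dir_for() {
  printf '%s/%s\n' "${2:-$HOME/.nuget/packages}" \
    "$(printf '%s' "$1" | tr '[:upper:]' '[:lower:]')"
}

# Example: clear the cached copy before rebuilding.
# rm -rf "$(cache_dir_for Microsoft.Azure.Functions.Worker.Extensions.DurableTask)"
```

Deriving the path from the package id avoids hand-typed directory names that silently match nothing.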

2.1.8 - nuget 401

nuget 401

Error

The build suddenly started failing with a 401 (Unauthorized) error and could not continue:

dotnet build
适用于 .NET MSBuild 版本 17.9.6+a4ecab324
  正在确定要还原的项目…
C:\Users\sky\work\code\durabletask\durabletask-dotnet\test\Generators.Tests\Generators.Tests.csproj : error NU1301: 无法加载源 https://pkgs.dev.azure.com/azfunc/e6a70c92-4128-439f-8012-382fe78d6396/_packaging/AzureFunctionsTempStaging/nuget/v3/index.json 的服务索引。 [C:\Users\sky\work\code\durabletask\durabletask-dotnet\Microsoft.DurableTask.sln]
  未能还原 C:\Users\sky\work\code\durabletask\durabletask-dotnet\test\Generators.Tests\Generators.Tests.csproj (用时 15.28 sec)
C:\Program Files\dotnet\sdk\8.0.203\NuGet.targets(169,5): error : 无法加载源 https://pkgs.dev.azure.com/azfunc/e6a70c92-4128-439f-8012-382fe78d6396/_packaging/AzureFunctionsTempStaging/nuget/v3/index.json 的服务索引。 [C:\Users\sky\work\code\durabletask\durabletask-dotnet\Microsoft.DurableTask.sln]
C:\Program Files\dotnet\sdk\8.0.203\NuGet.targets(169,5): error :   Response status code does not indicate success: 401 (Unauthorized). [C:\Users\sky\work\code\durabletask\durabletask-dotnet\Microsoft.DurableTask.sln]

Accessing https://pkgs.dev.azure.com/azfunc/e6a70c92-4128-439f-8012-382fe78d6396/_packaging/AzureFunctionsTempStaging/nuget/v3/index.json returns 401:

Response status code does not indicate success: 401 (Unauthorized)

Running the build with --interactive triggers a device-flow sign-in prompt:

dotnet build --interactive
适用于 .NET MSBuild 版本 17.9.6+a4ecab324
  正在确定要还原的项目…
      [CredentialProvider]DeviceFlow: https://pkgs.dev.azure.com/azfunc/e6a70c92-4128-439f-8012-382fe78d6396/_packaging/AzureFunctionsTempStaging/nuget/v3/index.json
      [CredentialProvider]ATTENTION: User interaction required.

      **********************************************************************

      To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code IDJNNR9QS to authenticate.

      **********************************************************************

      [CredentialProvider]DeviceFlow: https://pkgs.dev.azure.com/azfunc/e6a70c92-4128-439f-8012-382fe78d6396/_packaging/AzureFunctionsTempStaging/nuget/v3/index.json
      [CredentialProvider]ATTENTION: User interaction required.

      **********************************************************************

      To sign in, use a web browser to open the page https://microsoft.com/devicelogin and enter the code IZGMCADU2 to authenticate.

      **********************************************************************

2.2 - The durabletask project

Setting up the development environment for the durabletask project

2.2.1 - Building the project

Building the Durable Task project

Get the source

cd ~/work/code/durabletask
git clone git@github.com:Azure/durabletask.git

Run the build

The problem

Run in a terminal:

cd ~/work/code/durabletask/durabletask
dotnet build

It fails with:

dotnet build
MSBuild version 17.9.6+a4ecab324 for .NET
  Determining projects to restore...
C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\sky\work\code\durabletask\durabletask\samples\Correlation.Samples\Correlation.Samples.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Stress.Tests\DurableTask.Stress.Tests.csproj::TargetFramework=netcoreapp3.1]
C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.SqlServer.Tests\DurableTask.SqlServer.Tests.csproj : error NU1903: Warning As Error: Package 'System.Data.SqlClient' 4.8.5 has a known high severity vulnerability, https://github.com/advisories/GHSA-98g6-xh36-x2p7 [C:\Users\sky\work\code\durabletask\durabletask\DurableTask.sln]
  Failed to restore C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.SqlServer.Tests\DurableTask.SqlServer.Tests.csproj (in 374 ms).
  23 of 24 projects are up-to-date for restore.

Build FAILED.

C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\sky\work\code\durabletask\durabletask\samples\Correlation.Samples\Correlation.Samples.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Stress.Tests\DurableTask.Stress.Tests.csproj::TargetFramework=netcoreapp3.1]
C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.SqlServer.Tests\DurableTask.SqlServer.Tests.csproj : error NU1903: Warning As Error: Package 'System.Data.SqlClient' 4.8.5 has a known high severity vulnerability, https://github.com/advisories/GHSA-98g6-xh36-x2p7 [C:\Users\sky\work\code\durabletask\durabletask\DurableTask.sln]
    2 Warning(s)
    1 Error(s)

VS Code also fails to load the project because of this error.

Solution

TBD

Temporary workaround

Edit DurableTask.sln and remove the DurableTask.SqlServer.Tests.csproj project:

Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DurableTask.SqlServer.Tests", "test\DurableTask.SqlServer.Tests\DurableTask.SqlServer.Tests.csproj", "{B835BFA6-D9BB-47C4-8594-38EAE0157BBA}"
EndProject

Then delete line 313, the nested-project entry for the same project GUID:

		{B835BFA6-D9BB-47C4-8594-38EAE0157BBA} = {95C69A06-7F62-4652-A480-207B614C2869}
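
The same edit can be scripted. The sketch below is an assumption-laden alternative to editing by hand: it drops the whole Project ... EndProject block of the given GUID and, unlike the manual steps above, also drops every other line that mentions that GUID (the nested-project entry and any build-configuration entries). remove_project is a made-up name; the result is written to standard output so the original file is left untouched.

```shell
# remove_project GUID SLN_FILE
# Prints the solution file without the project identified by GUID (given without braces).
# Hypothetical helper; review the result before replacing the real .sln.
remove_project() {
  awk -v guid="$1" '
    # Inside the removed project block: skip lines up to and including EndProject.
    in_block { if (/^EndProject/) in_block = 0; next }
    # Any line mentioning the GUID is dropped; a Project(...) line also opens a block.
    index($0, "{" guid "}") { if (/^Project\(/) in_block = 1; next }
    { print }
  ' "$2"
}

# Example:
# remove_project B835BFA6-D9BB-47C4-8594-38EAE0157BBA DurableTask.sln > DurableTask.sln.new
```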

Build result

dotnet build
MSBuild version 17.9.6+a4ecab324 for .NET
  Determining projects to restore...
C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\sky\work\code\durabletask\durabletask\samples\Correlation.Samples\Correlation.Samples.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Stress.Tests\DurableTask.Stress.Tests.csproj::TargetFramework=netcoreapp3.1]
  All projects are up-to-date for restore.
C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Stress.Tests\DurableTask.Stress.Tests.csproj::TargetFramework=netcoreapp3.1]
C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\sky\work\code\durabletask\durabletask\samples\Correlation.Samples\Correlation.Samples.csproj]
  DurableTask.Core -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\net462\DurableTask.Core.dll
  DurableTask.Core -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\netstandard2.0\DurableTask.Core.dll
  DurableTask.SqlServer -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.SqlServer\bin\Debug\net462\DurableTask.SqlServer.dll
  DurableTask.Emulator -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Emulator\bin\Debug\net462\DurableTask.Emulator.dll
  DurableTask.Test.Orchestrations -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Test.Orchestrations\bin\Debug\net462\DurableTask.Test.Orchestrations.dll
  The package Microsoft.Azure.DurableTask.Core.2.16.2 is missing a readme. Go to https://aka.ms/nuget/authoring-best-practices/readme to learn why package readmes are important.
  Successfully created package 'C:\Users\sky\work\code\durabletask\durabletask\build_output\packages\Microsoft.Azure.DurableTask.Core.2.16.2.nupkg'.
  Successfully created package 'C:\Users\sky\work\code\durabletask\durabletask\build_output\packages\Microsoft.Azure.DurableTask.Core.2.16.2.symbols.nupkg'.
  DurableTask.ServiceBus -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.ServiceBus\bin\Debug\net462\DurableTask.ServiceBus.dll
  DurableTask.ApplicationInsights -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.ApplicationInsights\bin\Debug\netstandard2.0\DurableTask.ApplicationInsights.dll
  DurableTask.Test.Orchestrations -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Test.Orchestrations\bin\Debug\netstandard2.0\DurableTask.Test.Orchestrations.dll
  DurableTask.Redis -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Redis\bin\Debug\netstandard2.0\DurableTask.Redis.dll
  DurableTask.SqlServer -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.SqlServer\bin\Debug\netstandard2.0\DurableTask.SqlServer.dll
  DurableTask.AzureServiceFabric -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.AzureServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll
  DurableTask.AzureServiceFabric -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.AzureServiceFabric\bin\x64\Debug\net472\DurableTask.AzureServiceFabric.dll
  DurableTask.Emulator -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Emulator\bin\Debug\netstandard2.0\DurableTask.Emulator.dll
  DurableTask.Core.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Core.Tests\bin\Debug\net462\DurableTask.Core.Tests.dll
  DurableTask.AzureStorage -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.AzureStorage\bin\Debug\netstandard2.0\DurableTask.AzureStorage.dll
  TestApplication.Common -> C:\Users\sky\work\code\durabletask\durabletask\test\TestFabricApplication\TestApplication.Common\bin\x64\Debug\net472\TestApplication.Common.dll
  DurableTask.Emulator.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Emulator.Tests\bin\Debug\net462\DurableTask.Emulator.Tests.dll
  DurableTask.ServiceBus -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.ServiceBus\bin\Debug\netstandard2.0\DurableTask.ServiceBus.dll
  DurableTask.AzureStorage -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.AzureStorage\bin\Debug\net462\DurableTask.AzureStorage.dll
  DurableTask.AzureServiceFabric.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Tests\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.Tests.dll
  TestApplication.Common -> C:\Users\sky\work\code\durabletask\durabletask\test\TestFabricApplication\TestApplication.Common\bin\x64\Debug\net462\TestApplication.Common.dll
  The package Microsoft.Azure.DurableTask.Redis.0.1.9-alpha is missing a readme. Go to https://aka.ms/nuget/authoring-best-practices/readme to learn why package readmes are important.
  Successfully created package 'C:\Users\sky\work\code\durabletask\durabletask\build_output\packages\Microsoft.Azure.DurableTask.Redis.0.1.9-alpha.nupkg'.
  DurableTask.ServiceBus.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.ServiceBus.Tests\bin\Debug\net462\DurableTask.ServiceBus.Tests.dll
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277: Found conflicts between different versions of "Newtonsoft.Json" that could
 not be resolved. [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj] 
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277: There was a conflict between "Newtonsoft.Json, Version=7.0.0.0, Culture=ne 
utral, PublicKeyToken=30ad4fe6b2a6aeed" and "Newtonsoft.Json, Version=12.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed". [C:\Users\sky\work\code\durabletask\durableta 
sk\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:     "Newtonsoft.Json, Version=7.0.0.0, Culture=neutral, PublicKeyToken=30a 
d4fe6b2a6aeed" was chosen because it was primary and "Newtonsoft.Json, Version=12.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed" was not. [C:\Users\sky\work\code\dura 
bletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:     References which depend on "Newtonsoft.Json, Version=7.0.0.0, Culture= 
neutral, PublicKeyToken=30ad4fe6b2a6aeed" [C:\Users\sky\.nuget\packages\newtonsoft.json\7.0.1\lib\net45\Newtonsoft.Json.dll]. [C:\Users\sky\work\code\durabletask\durabletask\te 
st\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:         C:\Users\sky\.nuget\packages\newtonsoft.json\7.0.1\lib\net45\Newto 
nsoft.Json.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]    
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:           Project file item includes which caused reference "C:\Users\sky\ 
.nuget\packages\newtonsoft.json\7.0.1\lib\net45\Newtonsoft.Json.dll". [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\Dura 
bleTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\.nuget\packages\newtonsoft.json\7.0.1\lib\net45\N 
ewtonsoft.Json.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj 
]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:     References which depend on or have been unified to "Newtonsoft.Json, V 
ersion=12.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed" []. [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\Dur 
ableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:         C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Cor 
e\bin\Debug\net462\DurableTask.Core.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Int 
egration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:           Project file item includes which caused reference "C:\Users\sky\ 
work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\net462\DurableTask.Core.dll". [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabr 
ic.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask 
.Core\bin\Debug\net462\DurableTask.Core.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric 
.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\test\DurableTas 
k.Test.Orchestrations\bin\Debug\net462\DurableTask.Test.Orchestrations.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests 
\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask 
.AzureServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Test 
s\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\test\TestFabric 
Application\TestApplication.Common\bin\x64\Debug\net462\TestApplication.Common.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integrati 
on.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:         C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Azu 
reServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\Du 
rableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:           Project file item includes which caused reference "C:\Users\sky\ 
work\code\durabletask\durabletask\src\DurableTask.AzureServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll". [C:\Users\sky\work\code\durabletask\durabletask\t 
est\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask 
.AzureServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Test 
s\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:         C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.client\5.2.6\ 
lib\net45\System.Net.Http.Formatting.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.In 
tegration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:           Project file item includes which caused reference "C:\Users\sky\ 
.nuget\packages\microsoft.aspnet.webapi.client\5.2.6\lib\net45\System.Net.Http.Formatting.dll". [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFab 
ric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.client\5. 
2.6\lib\net45\System.Net.Http.Formatting.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabri 
c.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask 
.AzureServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Test 
s\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.core\5.2. 
6\lib\net45\System.Web.Http.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration 
.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.owin\5.2. 
6\lib\net45\System.Web.Http.Owin.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integr 
ation.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:         C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.core\5.2.6\li 
b\net45\System.Web.Http.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tes 
ts.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:           Project file item includes which caused reference "C:\Users\sky\ 
.nuget\packages\microsoft.aspnet.webapi.core\5.2.6\lib\net45\System.Web.Http.dll". [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integrati 
on.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.core\5.2. 
6\lib\net45\System.Web.Http.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration 
.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask 
.AzureServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Test 
s\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.owin\5.2. 
6\lib\net45\System.Web.Http.Owin.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integr 
ation.Tests.csproj]
  TestApplication.StatefulService -> C:\Users\sky\work\code\durabletask\durabletask\test\TestFabricApplication\TestApplication.StatefulService\bin\Debug\net472\TestApplication.
  StatefulService.exe
  DurableTask.Emulator.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Emulator.Tests\bin\Debug\netcoreapp3.1\DurableTask.Emulator.Tests.dll
  Correlation.Samples -> C:\Users\sky\work\code\durabletask\durabletask\samples\Correlation.Samples\bin\Debug\netcoreapp3.1\Correlation.Samples.dll
  The package Microsoft.Azure.DurableTask.AzureServiceFabric.2.3.11 is missing a readme. Go to https://aka.ms/nuget/authoring-best-practices/readme to learn why package readmes
   are important.
  Successfully created package 'C:\Users\sky\work\code\durabletask\durabletask\build_output\packages\Microsoft.Azure.DurableTask.AzureServiceFabric.2.3.11.nupkg'.
  DurableTask.Redis.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Redis.Tests\bin\Debug\netcoreapp3.1\DurableTask.Redis.Tests.dll
  Successfully created package 'C:\Users\sky\work\code\durabletask\durabletask\build_output\packages\Microsoft.Azure.DurableTask.AzureServiceFabric.2.3.11.symbols.nupkg'.
  DurableTask.AzureServiceFabric.Integration.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\bin\x64\Debug\net462\
  DurableTask.AzureServiceFabric.Integration.Tests.dll
C:\Users\sky\work\code\durabletask\durabletask\samples\DistributedTraceSample\OpenTelemetry\Program.cs(100,48): warning CS1998: This async method lacks 'await' operators and wi
ll run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread. [C:\Users\sky\w
ork\code\durabletask\durabletask\samples\DistributedTraceSample\OpenTelemetry\OpenTelemetrySample.csproj]
  DurableTask.ServiceBus.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.ServiceBus.Tests\bin\Debug\netcoreapp3.1\DurableTask.ServiceBus.Tests.dll
  OpenTelemetrySample -> C:\Users\sky\work\code\durabletask\durabletask\samples\DistributedTraceSample\OpenTelemetry\bin\Debug\net6.0\OpenTelemetrySample.dll
  ApplicationInsightsSample -> C:\Users\sky\work\code\durabletask\durabletask\samples\DistributedTraceSample\ApplicationInsights\bin\Debug\net6.0\ApplicationInsightsSample.dll
  DurableTask.Stress.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Stress.Tests\bin\Debug\netcoreapp3.1\DurableTask.Stress.Tests.dll
  DurableTask.Core.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Core.Tests\bin\Debug\netcoreapp3.1\DurableTask.Core.Tests.dll
  DurableTask.Stress.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.Stress.Tests\bin\Debug\net462\DurableTask.Stress.Tests.exe
  DurableTask.Samples -> C:\Users\sky\work\code\durabletask\durabletask\samples\DurableTask.Samples\bin\Debug\net462\DurableTask.Samples.exe
  The package Microsoft.Azure.DurableTask.AzureStorage.1.17.1 is missing a readme. Go to https://aka.ms/nuget/authoring-best-practices/readme to learn why package readmes are i
  mportant.
  Successfully created package 'C:\Users\sky\work\code\durabletask\durabletask\build_output\packages\Microsoft.Azure.DurableTask.AzureStorage.1.17.1.nupkg'.
  DurableTask.AzureStorage.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureStorage.Tests\bin\Debug\netcoreapp3.1\DurableTask.AzureStorage.Tests.dl
  l
  DurableTask.AzureStorage.Tests -> C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureStorage.Tests\bin\Debug\net462\DurableTask.AzureStorage.Tests.dll

Build succeeded.

C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is  
out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\ 
sky\work\code\durabletask\durabletask\samples\Correlation.Samples\Correlation.Samples.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is  
out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\ 
sky\work\code\durabletask\durabletask\test\DurableTask.Stress.Tests\DurableTask.Stress.Tests.csproj::TargetFramework=netcoreapp3.1]
C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is  
out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\ 
sky\work\code\durabletask\durabletask\test\DurableTask.Stress.Tests\DurableTask.Stress.Tests.csproj::TargetFramework=netcoreapp3.1]
C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is  
out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\ 
sky\work\code\durabletask\durabletask\samples\Correlation.Samples\Correlation.Samples.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277: Found conflicts between different versions of "Newtonsoft.Json" that could 
 not be resolved. [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj] 
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277: There was a conflict between "Newtonsoft.Json, Version=7.0.0.0, Culture=ne 
utral, PublicKeyToken=30ad4fe6b2a6aeed" and "Newtonsoft.Json, Version=12.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed". [C:\Users\sky\work\code\durabletask\durableta 
sk\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:     "Newtonsoft.Json, Version=7.0.0.0, Culture=neutral, PublicKeyToken=30a 
d4fe6b2a6aeed" was chosen because it was primary and "Newtonsoft.Json, Version=12.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed" was not. [C:\Users\sky\work\code\dura 
bletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:     References which depend on "Newtonsoft.Json, Version=7.0.0.0, Culture= 
neutral, PublicKeyToken=30ad4fe6b2a6aeed" [C:\Users\sky\.nuget\packages\newtonsoft.json\7.0.1\lib\net45\Newtonsoft.Json.dll]. [C:\Users\sky\work\code\durabletask\durabletask\te 
st\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:         C:\Users\sky\.nuget\packages\newtonsoft.json\7.0.1\lib\net45\Newto 
nsoft.Json.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]    
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:           Project file item includes which caused reference "C:\Users\sky\ 
.nuget\packages\newtonsoft.json\7.0.1\lib\net45\Newtonsoft.Json.dll". [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\Dura 
bleTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\.nuget\packages\newtonsoft.json\7.0.1\lib\net45\N 
ewtonsoft.Json.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj 
]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:     References which depend on or have been unified to "Newtonsoft.Json, V 
ersion=12.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed" []. [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\Dur 
ableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:         C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Cor 
e\bin\Debug\net462\DurableTask.Core.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Int 
egration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:           Project file item includes which caused reference "C:\Users\sky\ 
work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\net462\DurableTask.Core.dll". [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabr 
ic.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask 
.Core\bin\Debug\net462\DurableTask.Core.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric 
.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\test\DurableTas 
k.Test.Orchestrations\bin\Debug\net462\DurableTask.Test.Orchestrations.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests 
\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask 
.AzureServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Test 
s\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\test\TestFabric 
Application\TestApplication.Common\bin\x64\Debug\net462\TestApplication.Common.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integrati 
on.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:         C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Azu 
reServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\Du 
rableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:           Project file item includes which caused reference "C:\Users\sky\ 
work\code\durabletask\durabletask\src\DurableTask.AzureServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll". [C:\Users\sky\work\code\durabletask\durabletask\t 
est\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask 
.AzureServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Test 
s\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:         C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.client\5.2.6\ 
lib\net45\System.Net.Http.Formatting.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.In 
tegration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:           Project file item includes which caused reference "C:\Users\sky\ 
.nuget\packages\microsoft.aspnet.webapi.client\5.2.6\lib\net45\System.Net.Http.Formatting.dll". [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFab 
ric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.client\5. 
2.6\lib\net45\System.Net.Http.Formatting.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabri
c.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask 
.AzureServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Test 
s\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.core\5.2. 
6\lib\net45\System.Web.Http.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration 
.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.owin\5.2. 
6\lib\net45\System.Web.Http.Owin.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integr 
ation.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:         C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.core\5.2.6\li 
b\net45\System.Web.Http.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration.Tes 
ts.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:           Project file item includes which caused reference "C:\Users\sky\ 
.nuget\packages\microsoft.aspnet.webapi.core\5.2.6\lib\net45\System.Web.Http.dll". [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integrati 
on.Tests\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.core\5.2. 
6\lib\net45\System.Web.Http.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integration 
.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask 
.AzureServiceFabric\bin\x64\Debug\net462\DurableTask.AzureServiceFabric.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Test 
s\DurableTask.AzureServiceFabric.Integration.Tests.csproj]
C:\Program Files\dotnet\sdk\8.0.203\Microsoft.Common.CurrentVersion.targets(2389,5): warning MSB3277:             C:\Users\sky\.nuget\packages\microsoft.aspnet.webapi.owin\5.2. 
6\lib\net45\System.Web.Http.Owin.dll [C:\Users\sky\work\code\durabletask\durabletask\test\DurableTask.AzureServiceFabric.Integration.Tests\DurableTask.AzureServiceFabric.Integr 
ation.Tests.csproj]
C:\Users\sky\work\code\durabletask\durabletask\samples\DistributedTraceSample\OpenTelemetry\Program.cs(100,48): warning CS1998: This async method lacks 'await' operators and wi 
ll run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread. [C:\Users\sky\w 
ork\code\durabletask\durabletask\samples\DistributedTraceSample\OpenTelemetry\OpenTelemetrySample.csproj]
    6 Warning(s)
    0 Error(s)

Time Elapsed 00:00:05.24

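2.2.2 - Running unit tests

Running the unit tests of the Durable Task project

2.2.2.1 - Preparing to run the unit tests

Preparation for running the Durable Task unit tests

Preparing to run the unit tests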

https://github.com/Azure/durabletask#development-notes

To run unit tests, you must specify your Service Bus connection string for the tests to use. You can do this via the ServiceBusConnectionString app.config value in the test project, or by defining a DurableTaskTestServiceBusConnectionString environment variable. The benefit of the environment variable is that no temporary source changes are required.

Unit tests also require Azure Storage Emulator, so make sure it’s installed and running.

Note: While it’s possible to use in tests a real Azure Storage account it is not recommended to do so because many tests will fail with a 409 Conflict error. This is because tests delete and quickly recreate the same storage tables, and Azure Storage doesn’t do well in these conditions. If you really want to change Azure Storage connection string you can do so via the StorageConnectionString app.config value in the test project, or by defining a DurableTaskTestStorageConnectionString environment variable.

Changing the configuration

StorageConnectionString is defined in two places, and its default value is empty in both:

  • Test\DurableTask.Core.Tests\app.config
  • Test\DurableTask.ServiceBus.Tests\app.config

Defining the environment variable is still the more convenient option, since no source file has to be touched:

export DurableTaskTestStorageConnectionString=""
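
As a rough sketch of how both variables from the README could be set in one bash session: the variable names come from the README quoted above, while the values are assumptions. "UseDevelopmentStorage=true" is the usual shortcut connection string for a local storage emulator (whether these test projects accept it has not been verified here), and the Service Bus value is only a placeholder. check_test_env is a hypothetical helper, not part of the repository.

```shell
# Variable names are taken from the README; the values below are assumptions.
# "UseDevelopmentStorage=true" is the shortcut for the local storage emulator.
export DurableTaskTestStorageConnectionString="UseDevelopmentStorage=true"
# Placeholder only: replace with a real Service Bus connection string.
export DurableTaskTestServiceBusConnectionString="<your-service-bus-connection-string>"

# Hypothetical helper: report which of the two variables are unset or empty.
# Returns 0 when both are set, 1 otherwise.
check_test_env() {
  missing=0
  for name in DurableTaskTestStorageConnectionString DurableTaskTestServiceBusConnectionString; do
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      echo "missing: $name"
      missing=1
    fi
  done
  return $missing
}

check_test_env && echo "test environment variables are set"
```

Running check_test_env before dotnet test makes it obvious when a variable was forgotten in a new terminal session.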

Azure Storage Emulator

Documentation: Use the Azure Storage Emulator for development and testing (deprecated)

Download and install

https://go.microsoft.com/fwlink/?linkid=717179&clcid=0x409

To start the Azure Storage Emulator:

  1. Select the Start button or press the Windows key.
  2. Start typing Azure Storage Emulator.
  3. Select the emulator from the list of applications shown.

Azurite

In Visual Studio Code, select the Extensions icon and search for "Azurite". Select the Install button to install the Azurite extension.
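
Before running tests that need storage, it can help to confirm that the emulator is actually listening. The following is a minimal sketch, assuming the default local ports (10000 for blob, 10001 for queue, 10002 for table) and a bash shell with /dev/tcp support; port_open is a hypothetical helper introduced only for this illustration.

```shell
#!/usr/bin/env bash
# Hypothetical helper: succeeds if a TCP connection to host $1, port $2 can be opened.
# Relies on bash's /dev/tcp pseudo-device; the subshell closes the socket on exit.
port_open() {
  (exec 3<>"/dev/tcp/$1/$2") 2>/dev/null
}

# Default local endpoints (assumed): blob 10000, queue 10001, table 10002.
for entry in blob:10000 queue:10001 table:10002; do
  service=${entry%%:*}
  port=${entry##*:}
  if port_open 127.0.0.1 "$port"; then
    echo "$service endpoint is listening on port $port"
  else
    echo "$service endpoint is NOT listening on port $port"
  fi
done
```

If any endpoint is reported as not listening, start the emulator first (for Azurite, from the Visual Studio Code command palette) and run the check again.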

2.2.2.2 - Running the unit tests

Running the Durable Task unit tests

DurableTask.Core.Tests

The project is located at Test/DurableTask.Core.Tests. It needs neither the emulator nor a Service Bus connection, so it can be run directly:

dotnet test Test/DurableTask.Core.Tests/DurableTask.Core.Tests.csproj

Output:

  Determining projects to restore...
  All projects are up-to-date for restore.
  DurableTask.Core -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\netstandard2.0\DurableTask.Core.dll
  DurableTask.Core -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\net462\DurableTask.Core.dll
  DurableTask.Emulator -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Emulator\bin\Debug\netstandard2.0\DurableTask.Emulator.dll
  DurableTask.Test.Orchestrations -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Test.Orchestrations\bin\Debug\netstandard2.0\DurableTask.Test.Orchestrati
  ons.dll
  DurableTask.Emulator -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Emulator\bin\Debug\net462\DurableTask.Emulator.dll
  DurableTask.Test.Orchestrations -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Test.Orchestrations\bin\Debug\net462\DurableTask.Test.Orchestrations.dll
  DurableTask.Core.Tests -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Core.Tests\bin\Debug\netcoreapp3.1\DurableTask.Core.Tests.dll
  DurableTask.Core.Tests -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Core.Tests\bin\Debug\net462\DurableTask.Core.Tests.dll
Test run for C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Core.Tests\bin\Debug\netcoreapp3.1\DurableTask.Core.Tests.dll (.NETCoreApp,Version=v3.1)
Microsoft (R) Test Execution Command Line Tool Version 17.9.0 (x64)
Copyright (c) Microsoft Corporation.  All rights reserved.

Starting test execution, please wait...
A total of 1 test files matched the specified pattern.

Passed!  - Failed:     0, Passed:    59, Skipped:     0, Total:    59, Duration: 21 s - DurableTask.Core.Tests.dll (netcoreapp3.1)
Test run for C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Core.Tests\bin\Debug\net462\DurableTask.Core.Tests.dll (.NETFramework,Version=v4.6.2)
Microsoft (R) Test Execution Command Line Tool Version 17.9.0 (x64)
Copyright (c) Microsoft Corporation.  All rights reserved.

Starting test execution, please wait...
A total of 1 test files matched the specified pattern.

Passed!  - Failed:     0, Passed:    59, Skipped:     0, Total:    59, Duration: 21 s - DurableTask.Core.Tests.dll (net462)

DurableTask.Redis.Tests

dotnet test Test/DurableTask.Redis.Tests/DurableTask.Redis.Tests.csproj

The tests fail because they cannot connect to the Redis server:

dotnet test Test/DurableTask.Redis.Tests/DurableTask.Redis.Tests.csproj
  Determining projects to restore...
  Restored C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Redis.Tests\DurableTask.Redis.Tests.csproj (in 329 ms).
  2 of 3 projects are up-to-date for restore.
  DurableTask.Core -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\netstandard2.0\DurableTask.Core.dll
  DurableTask.Redis -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Redis\bin\Debug\netstandard2.0\DurableTask.Redis.dll
  The package Microsoft.Azure.DurableTask.Redis.0.1.9-alpha is missing a readme. Go to https://aka.ms/nuget/authoring-best-practices/readme to learn why package readmes are important.
  Successfully created package 'C:\Users\sky\work\code\durabletask\durabletask\build_output\packages\Microsoft.Azure.DurableTask.Redis.0.1.9-alpha.nupkg'.
  DurableTask.Redis.Tests -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Redis.Tests\bin\Debug\netcoreapp3.1\DurableTask.Redis.Tests.dll
Test run for C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Redis.Tests\bin\Debug\netcoreapp3.1\DurableTask.Redis.Tests.dll (.NETCoreApp,Version=v3.1)
Microsoft (R) Test Execution Command Line Tool Version 17.9.0 (x64)
Copyright (c) Microsoft Corporation.  All rights reserved.

Starting test execution, please wait...
A total of 1 test files matched the specified pattern.
[xUnit.net 00:00:13.73]     DurableTask.Redis.Tests.EndToEndServiceScenarioTests.SimpleFanOutOrchestration [FAIL]
  Failed DurableTask.Redis.Tests.EndToEndServiceScenarioTests.SimpleFanOutOrchestration [13 s]
  Error Message:
   StackExchange.Redis.RedisConnectionException : It was not possible to connect to the redis server(s). UnableToConnect on localhost:6379/Interactive, Initializing, last: NONE, origin: BeginConnectAsync, outstanding: 0, last-read: 2s ago, last-write: 2s ago, keep-alive: 60s, state: Connecting, mgr: 10 of 10 available, last-heartbeat: never, global: 11s ago, v: 2.0.571.20511
  Stack Trace:
     at StackExchange.Redis.ConnectionMultiplexer.ConnectImplAsync(Object configuration, TextWriter log) in C:\projects\stackexchange-redis\src\StackExchange.Redis\ConnectionMultiplexer.cs:line 825
   at DurableTask.Redis.RedisOrchestrationService.DeleteAsync() in C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Redis\RedisOrchestrationService.cs:line 267  
   at DurableTask.Redis.Tests.EndToEndServiceScenarioTests.SimpleFanOutOrchestration() in C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Redis.Tests\EndToEndServiceScenarioTests.cs:line 148
--- End of stack trace from previous location where exception was thrown ---
[xUnit.net 00:00:21.90]     DurableTask.Redis.Tests.EndToEndServiceScenarioTests.ScheduledStart_NotSupported [FAIL]
  Failed DurableTask.Redis.Tests.EndToEndServiceScenarioTests.ScheduledStart_NotSupported [8 s]
  Error Message:
   StackExchange.Redis.RedisConnectionException : It was not possible to connect to the redis server(s). UnableToConnect on localhost:6379/Interactive, Initializing, last: NONE, origin: BeginConnectAsync, outstanding: 0, last-read: 2s ago, last-write: 2s ago, keep-alive: 60s, state: Connecting, mgr: 10 of 10 available, last-heartbeat: never, global: 19s ago, v: 2.0.571.20511
  Stack Trace:
     at StackExchange.Redis.ConnectionMultiplexer.ConnectImplAsync(Object configuration, TextWriter log) in C:\projects\stackexchange-redis\src\StackExchange.Redis\ConnectionMultiplexer.cs:line 825
   at DurableTask.Redis.RedisOrchestrationService.DeleteAsync() in C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Redis\RedisOrchestrationService.cs:line 267  
   at DurableTask.Redis.Tests.EndToEndServiceScenarioTests.ScheduledStart_NotSupported() in C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Redis.Tests\EndToEndServiceScenarioTests.cs:line 229
--- End of stack trace from previous location where exception was thrown ---
[xUnit.net 00:00:30.14]     DurableTask.Redis.Tests.EndToEndServiceScenarioTests.SimpleFanOutOrchestration_DurabilityTest [FAIL]
  Failed DurableTask.Redis.Tests.EndToEndServiceScenarioTests.SimpleFanOutOrchestration_DurabilityTest [8 s]
  Error Message:
   StackExchange.Redis.RedisConnectionException : It was not possible to connect to the redis server(s). UnableToConnect on localhost:6379/Interactive, Initializing, last: NONE, origin: BeginConnectAsync, outstanding: 0, last-read: 2s ago, last-write: 2s ago, keep-alive: 60s, state: Connecting, mgr: 10 of 10 available, last-heartbeat: never, global: 27s ago, v: 2.0.571.20511
  Stack Trace:
     at StackExchange.Redis.ConnectionMultiplexer.ConnectImplAsync(Object configuration, TextWriter log) in C:\projects\stackexchange-redis\src\StackExchange.Redis\ConnectionMultiplexer.cs:line 825
   at DurableTask.Redis.RedisOrchestrationService.DeleteAsync() in C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Redis\RedisOrchestrationService.cs:line 267  
   at DurableTask.Redis.Tests.EndToEndServiceScenarioTests.SimpleFanOutOrchestration_DurabilityTest() in C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Redis.Tests\EndToEndServiceScenarioTests.cs:line 202
--- End of stack trace from previous location where exception was thrown ---
[xUnit.net 00:00:34.23]     DurableTask.Redis.Tests.EndToEndServiceScenarioTests.DeleteTaskHub_DeletesAllKeysInRelevantNamespace [FAIL]
  Failed DurableTask.Redis.Tests.EndToEndServiceScenarioTests.DeleteTaskHub_DeletesAllKeysInRelevantNamespace [4 s]
  Error Message:
   StackExchange.Redis.RedisConnectionException : It was not possible to connect to the redis server(s). UnableToConnect on localhost:6379/Interactive, Initializing, last: NONE, origin: BeginConnectAsync, outstanding: 0, last-read: 2s ago, last-write: 2s ago, keep-alive: 60s, state: Connecting, mgr: 10 of 10 available, last-heartbeat: never, global: 31s ago, v: 2.0.571.20511
  Stack Trace:
     at StackExchange.Redis.ConnectionMultiplexer.ConnectImplAsync(Object configuration, TextWriter log) in C:\projects\stackexchange-redis\src\StackExchange.Redis\ConnectionMultiplexer.cs:line 825
   at DurableTask.Redis.Tests.TestHelpers.GetRedisConnection() in C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Redis.Tests\TestHelpers.cs:line 39
   at DurableTask.Redis.Tests.EndToEndServiceScenarioTests.DeleteTaskHub_DeletesAllKeysInRelevantNamespace() in C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Redis.Tests\EndToEndServiceScenarioTests.cs:line 31
--- End of stack trace from previous location where exception was thrown ---
[xUnit.net 00:00:42.37]     DurableTask.Redis.Tests.EndToEndServiceScenarioTests.SimpleGreetingOrchestration [FAIL]
  Failed DurableTask.Redis.Tests.EndToEndServiceScenarioTests.SimpleGreetingOrchestration [8 s]
  Error Message:
   StackExchange.Redis.RedisConnectionException : It was not possible to connect to the redis server(s). UnableToConnect on localhost:6379/Interactive, Initializing, last: NONE, origin: BeginConnectAsync, outstanding: 0, last-read: 2s ago, last-write: 2s ago, keep-alive: 60s, state: Connecting, mgr: 10 of 10 available, last-heartbeat: never, global: 39s ago, v: 2.0.571.20511
  Stack Trace:
     at StackExchange.Redis.ConnectionMultiplexer.ConnectImplAsync(Object configuration, TextWriter log) in C:\projects\stackexchange-redis\src\StackExchange.Redis\ConnectionMultiplexer.cs:line 825
   at DurableTask.Redis.RedisOrchestrationService.DeleteAsync() in C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Redis\RedisOrchestrationService.cs:line 267  
   at DurableTask.Redis.Tests.EndToEndServiceScenarioTests.SimpleGreetingOrchestration() in C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Redis.Tests\EndToEndServiceScenarioTests.cs:line 110
--- End of stack trace from previous location where exception was thrown ---

Failed!  - Failed:     5, Passed:     6, Skipped:     0, Total:    11, Duration: 41 s - DurableTask.Redis.Tests.dll (netcoreapp3.1)
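
All five failures above report that nothing answered on localhost:6379. A small preflight check can save the 40 seconds of timeouts. The sketch below assumes bash and the coreutils `timeout` command are available (as they typically are in Git Bash); `port_reachable` is a hypothetical helper name, not part of the project.

```shell
# Return 0 if a TCP connection to host:port can be opened within 2 seconds.
# Relies on bash's /dev/tcp pseudo-device, so bash must be installed.
port_reachable() {
    host="$1"
    port="$2"
    timeout 2 bash -c "exec 3<>/dev/tcp/$host/$port" 2>/dev/null
}

# localhost:6379 is the endpoint named in the error message above.
if port_reachable localhost 6379; then
    echo "Redis port is reachable, the tests can be run"
else
    echo "nothing is listening on localhost:6379, start Redis first"
fi
```

The check only proves that a TCP listener exists on the port; it does not verify that the listener is actually Redis.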

DurableTask.Emulator.Tests

This project needs neither the storage emulator nor a Service Bus connection, so it can be run directly:

dotnet test Test/DurableTask.Emulator.Tests/DurableTask.Emulator.Tests.csproj

Output:

dotnet test Test/DurableTask.Emulator.Tests/DurableTask.Emulator.Tests.csproj 
  Determining projects to restore...
  Restored C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Test.Orchestrations\DurableTask.Test.Orchestrations.csproj (in 192 ms).
  Restored C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Emulator.Tests\DurableTask.Emulator.Tests.csproj (in 192 ms).
  2 of 4 projects are up-to-date for restore.
  DurableTask.Core -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\netstandard2.0\DurableTask.Core.dll
  DurableTask.Core -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\net462\DurableTask.Core.dll
  DurableTask.Emulator -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Emulator\bin\Debug\net462\DurableTask.Emulator.dll
  DurableTask.Test.Orchestrations -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Test.Orchestrations\bin\Debug\net462\DurableTask.Test.Orchestrations.dll
  DurableTask.Emulator -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Emulator\bin\Debug\netstandard2.0\DurableTask.Emulator.dll
  DurableTask.Emulator.Tests -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Emulator.Tests\bin\Debug\net462\DurableTask.Emulator.Tests.dll
  DurableTask.Test.Orchestrations -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Test.Orchestrations\bin\Debug\netstandard2.0\DurableTask.Test.Orchestrations.dll
  DurableTask.Emulator.Tests -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Emulator.Tests\bin\Debug\netcoreapp3.1\DurableTask.Emulator.Tests.dll
Test run for C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Emulator.Tests\bin\Debug\net462\DurableTask.Emulator.Tests.dll (.NETFramework,Version=v4.6.2)
Microsoft (R) Test Execution Command Line Tool Version 17.9.0 (x64)
Copyright (c) Microsoft Corporation.  All rights reserved.

Starting test execution, please wait...
A total of 1 test files matched the specified pattern.

Passed!  - Failed:     0, Passed:     5, Skipped:     0, Total:     5, Duration: 34 s - DurableTask.Emulator.Tests.dll (net462)
Test run for C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Emulator.Tests\bin\Debug\netcoreapp3.1\DurableTask.Emulator.Tests.dll (.NETCoreApp,Version=v3.1)
Microsoft (R) Test Execution Command Line Tool Version 17.9.0 (x64)
Copyright (c) Microsoft Corporation.  All rights reserved.

Starting test execution, please wait...
A total of 1 test files matched the specified pattern.

Passed!  - Failed:     0, Passed:     5, Skipped:     0, Total:     5, Duration: 32 s - DurableTask.Emulator.Tests.dll (netcoreapp3.1)

DurableTask.Samples.Tests

dotnet test Test/DurableTask.Samples.Tests/DurableTask.Samples.Tests.csproj 
  Determining projects to restore...
C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Samples.Tests\DurableTask.Samples.Tests.csproj : error NU1201: Project DurableTask.Samples is not compatible with net451 (.NETFramework,Version=v4.5.1). Project DurableTask.Samples supports: net462 (.NETFramework,Version=v4.6.2)
  Failed to restore C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Samples.Tests\DurableTask.Samples.Tests.csproj (in 4.15 sec).
  3 of 4 projects are up-to-date for restore.

DurableTask.ServiceBus.Tests

dotnet test Test/DurableTask.ServiceBus.Tests/DurableTask.ServiceBus.Tests.csproj

A Service Bus connection must be configured; otherwise the tests fail with:

  Failed GenerationSubNoCompressionTest [< 1 ms]
  Error Message:
   Initialization method DurableTask.ServiceBus.Tests.FunctionalTests.TestInitialize threw exception. System.TypeInitializationException: The type initializer for 'DurableTask.ServiceBus.Tests.TestHelpers' threw an exception. ---> System.ArgumentException: A ServiceBus connection string must be defined in either an environment variable or in configuration..
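
The error says the connection string must come from an environment variable or from configuration. A guard like the one sketched below fails fast before `dotnet test` is invoked. The variable name DurableTaskTestServiceBusConnectionString is an assumption modelled on the DurableTaskTestStorageConnectionString name used for storage; check TestHelpers in the test project for the name it actually reads. `require_env` is a hypothetical helper.

```shell
# Return 0 if the named environment variable holds a non-empty value.
require_env() {
    name="$1"
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
        echo "missing environment variable: $name" >&2
        return 1
    fi
}

# Assumed variable name, see the note above.
if require_env DurableTaskTestServiceBusConnectionString; then
    dotnet test Test/DurableTask.ServiceBus.Tests/DurableTask.ServiceBus.Tests.csproj
fi
```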

DurableTask.SqlServer.Tests

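dotnet test Test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj

Restore fails because the NU1903 vulnerability warning for System.Data.SqlClient 4.8.5 is treated as an error:

dotnet test Test/DurableTask.SqlServer.Tests/DurableTask.SqlServer.Tests.csproj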
  Determining projects to restore...
C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.SqlServer.Tests\DurableTask.SqlServer.Tests.csproj : error NU1903: Warning As Error: Package 'System.Data.SqlClient' 4.8.5 has a known high severity vulnerability, https://github.com/advisories/GHSA-98g6-xh36-x2p7
  Failed to restore C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.SqlServer.Tests\DurableTask.SqlServer.Tests.csproj (in 280 ms).
  2 of 3 projects are up-to-date for restore.

DurableTask.Stress.Tests

dotnet test Test/DurableTask.Stress.Tests/DurableTask.Stress.Tests.csproj 

The project contains no test classes, so the command only builds it:

dotnet test Test/DurableTask.Stress.Tests/DurableTask.Stress.Tests.csproj 
  Determining projects to restore...
C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Stress.Tests\DurableTask.Stress.Tests.csproj::TargetFramework=netcoreapp3.1]
  Restored C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Stress.Tests\DurableTask.Stress.Tests.csproj (in 521 ms).
  3 of 4 projects are up-to-date for restore.
C:\Program Files\dotnet\sdk\8.0.203\Sdks\Microsoft.NET.Sdk\targets\Microsoft.NET.EolTargetFrameworks.targets(32,5): warning NETSDK1138: The target framework 'netcoreapp3.1' is out of support and will not receive security updates in the future. Please refer to https://aka.ms/dotnet-core-support for more information about the support policy. [C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Stress.Tests\DurableTask.Stress.Tests.csproj::TargetFramework=netcoreapp3.1]
  DurableTask.Core -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\netstandard2.0\DurableTask.Core.dll
  DurableTask.Core -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\net462\DurableTask.Core.dll
  DurableTask.Test.Orchestrations -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Test.Orchestrations\bin\Debug\netstandard2.0\DurableTask.Test.Orchestrations.dll
  DurableTask.AzureStorage -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.AzureStorage\bin\Debug\netstandard2.0\DurableTask.AzureStorage.dll
  DurableTask.Test.Orchestrations -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Test.Orchestrations\bin\Debug\net462\DurableTask.Test.Orchestrations.dll
  DurableTask.AzureStorage -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.AzureStorage\bin\Debug\net462\DurableTask.AzureStorage.dll
  DurableTask.Stress.Tests -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Stress.Tests\bin\Debug\net462\DurableTask.Stress.Tests.exe
  DurableTask.Stress.Tests -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Stress.Tests\bin\Debug\netcoreapp3.1\DurableTask.Stress.Tests.dll

DurableTask.Test.Orchestrations

dotnet test Test/DurableTask.Test.Orchestrations/DurableTask.Test.Orchestrations.csproj 
  Determining projects to restore...
  All projects are up-to-date for restore.
  DurableTask.Core -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\netstandard2.0\DurableTask.Core.dll
  DurableTask.Core -> C:\Users\sky\work\code\durabletask\durabletask\src\DurableTask.Core\bin\Debug\net462\DurableTask.Core.dll
  DurableTask.Test.Orchestrations -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Test.Orchestrations\bin\Debug\netstandard2.0\DurableTask.Test.Orchestrations.dll
  DurableTask.Test.Orchestrations -> C:\Users\sky\work\code\durabletask\durabletask\Test\DurableTask.Test.Orchestrations\bin\Debug\net462\DurableTask.Test.Orchestrations.dll
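
With this many test projects, the one-line result that `dotnet test` prints per target framework is the part worth keeping. The sketch below filters those lines out of the full output. It assumes the English summary format shown in the runs above; `summarize_results` is a hypothetical helper name.

```shell
# Read `dotnet test` output on stdin and print one compact line per
# "Passed!" / "Failed!" summary, dropping everything else.
summarize_results() {
    sed -nE 's/^(Passed|Failed)! +- Failed: +([0-9]+), Passed: +([0-9]+), Skipped: +([0-9]+), Total: +([0-9]+), Duration: .* - (.*)$/\6: \1, failed=\2 passed=\3 skipped=\4 total=\5/p'
}

# Demonstration with two summary lines copied from the runs above.
# Real usage: dotnet test <project>.csproj | summarize_results
summarize_results <<'EOF'
Starting test execution, please wait...
Passed!  - Failed:     0, Passed:    59, Skipped:     0, Total:    59, Duration: 21 s - DurableTask.Core.Tests.dll (net462)
Failed!  - Failed:     5, Passed:     6, Skipped:     0, Total:    11, Duration: 41 s - DurableTask.Redis.Tests.dll (netcoreapp3.1)
EOF
```

Projects that fail during restore, such as DurableTask.Samples.Tests, print no summary line at all, so they do not appear in the filtered output and need a separate check of the exit code.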

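2.2.3 - Running the samples

Running the samples of the Durable Task project

2.3 - The durabletask-dotnet project

Setting up a development environment for the durabletask-dotnet project

2.3.1 - Building the project

Building the durabletask-dotnet project

https://github.com/microsoft/durabletask-dotnet

Getting the source code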

# cd ~/work/code/durabletask
git clone git@github.com:microsoft/durabletask-dotnet.git
# the submodule commands must run inside the cloned repository
cd durabletask-dotnet
git submodule update --init --recursive
# git submodule update --remote

When the submodule has new content, that is, when the durabletask-protobuf project has been updated, the submodule must be updated as well by running:

git submodule update --remote

Output:

remote: Enumerating objects: 7, done.
remote: Counting objects: 100% (7/7), done.
remote: Compressing objects: 100% (2/2), done.
remote: Total 4 (delta 2), reused 4 (delta 2), pack-reused 0
Unpacking objects: 100% (4/4), 431 bytes | 86.00 KiB/s, done.
From https://github.com/skyao/durabletask-protobuf  
   5f49779..6466c8c  versioning -> origin/versioning
Submodule path 'eng/proto': checked out '6466c8c4a35a4bcdd15dabcb499b4c71d83e8dd1'

The last line shows that the latest commit of the submodule has been checked out.
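
To see which submodules still need attention, the prefix character that `git submodule status` prints in front of each commit hash can be inspected: "-" marks a submodule that is not initialized, and "+" marks one whose checked-out commit differs from the commit recorded in the superproject (the state left behind by `git submodule update --remote`). The sketch below reads that output from standard input; `submodules_needing_attention` is a hypothetical helper name.

```shell
# Read `git submodule status` output on stdin and print "<prefix> <path>"
# for every submodule that is uninitialized ("-") or off the recorded commit ("+").
submodules_needing_attention() {
    awk '/^[-+]/ { print substr($1, 1, 1), $2 }'
}

# Real usage, inside the durabletask-dotnet checkout:
#   git submodule status | submodules_needing_attention
# Demonstration with a sample line for the eng/proto submodule:
printf '%s\n' \
    '+6466c8c4a35a4bcdd15dabcb499b4c71d83e8dd1 eng/proto (heads/versioning)' \
    | submodules_needing_attention
```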

Running the build

Run the following command in a terminal:

# cd ~/work/code/durabletask/durabletask-dotnet
dotnet build

Output:

dotnet build
MSBuild version 17.9.6+a4ecab324 for .NET
  Determining projects to restore...
  All projects are up-to-date for restore.
  TestHelpers -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\TestHelpers\netstandard2.0\Microsoft.DurableTask.TestHelpers.dll
  Grpc -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Grpc\netstandard2.0\Microsoft.DurableTask.Grpc.dll
  ConsoleApp -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\samples\bin\Debug\ConsoleApp\net6.0\Samples.ConsoleApp.dll
  Generators -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Generators\netstandard2.0\Microsoft.DurableTask.Generators.dll
  Abstractions -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Abstractions\netstandard2.0\Microsoft.DurableTask.Abstractions.dll
  NetFxConsoleApp -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\samples\bin\Debug\NetFxConsoleApp\net48\Samples.NetFxConsoleApp.exe
  Grpc -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Grpc\net6.0\Microsoft.DurableTask.Grpc.dll
  AzureFunctionsApp -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\samples\bin\Debug\AzureFunctionsApp\net6.0\Samples.AzureFunctionsApp.dll
  Worker -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Worker\netstandard2.0\Microsoft.DurableTask.Worker.dll
  Client -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Client\netstandard2.0\Microsoft.DurableTask.Client.dll
  Generators.Tests -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Generators.Tests\net6.0\Microsoft.DurableTask.Generators.Tests.dll
  Abstractions.Tests -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Abstractions.Tests\net6.0\Microsoft.DurableTask.Abstractions.Tests.dll
  Worker.Grpc -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Worker.Grpc\netstandard2.0\Microsoft.DurableTask.Worker.Grpc.dll
  Worker.Grpc -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Worker.Grpc\net6.0\Microsoft.DurableTask.Worker.Grpc.dll
  Benchmarks -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Benchmarks\net6.0\Benchmarks.dll
  Client.Grpc -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Client.Grpc\netstandard2.0\Microsoft.DurableTask.Client.Grpc.dll
  Client.Grpc -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Client.Grpc\net6.0\Microsoft.DurableTask.Client.Grpc.dll
  Client.Tests -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Client.Tests\net6.0\Microsoft.DurableTask.Client.Tests.dll
  Client.OrchestrationServiceClientShim -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Client.OrchestrationServiceClientShim\netstandard2.0\Microsoft.DurableTask.Client.OrchestrationServiceClientShim.dll
  Worker.Tests -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Worker.Tests\net6.0\Microsoft.DurableTask.Worker.Tests.dll
  Client.OrchestrationServiceClientShim.Tests -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Client.OrchestrationServiceClientShim.Tests\net6.0\Microsoft.DurableTask.Client.OrchestrationServiceClientShim.Tests.dll
  Determining projects to restore...
  Grpc.IntegrationTests -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Grpc.IntegrationTests\net6.0\Microsoft.DurableTask.Grpc.IntegrationTests.dll
  Client.Grpc.Tests -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Client.Grpc.Tests\net6.0\Microsoft.DurableTask.Client.Grpc.Tests.dll
  Worker.Grpc.Tests -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\bin\Debug\Worker.Grpc.Tests\net6.0\Microsoft.DurableTask.Worker.Grpc.Tests.dll
  WebAPI -> C:\Users\sky\work\code\durabletask\durabletask-dotnet\out\samples\bin\Debug\WebAPI\net6.0\Samples.WebAPI.dll
  Restored C:\Users\sky\AppData\Local\Temp\n4zwr33f.gin\WorkerExtensions.csproj (in 300 ms).
  WorkerExtensions -> C:\Users\sky\AppData\Local\Temp\n4zwr33f.gin\buildout\Microsoft.Azure.Functions.Worker.Extensions.dll

Build succeeded.
    0 Warning(s)
    0 Error(s)

Time Elapsed 00:00:02.01

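2.4 - The azure-functions-durable-extension project

Setting up a development environment for the azure-functions-durable-extension project

2.4.1 - Building the project

Building the azure-functions-durable-extension project

https://github.com/Azure/azure-functions-durable-extension

Getting the source code

# cd ~/work/code/durabletask
git clone git@github.com:Azure/azure-functions-durable-extension.git

Running the build

Note: the azure-functions-durable-extension project requires the .NET Core 2.1 runtime (Microsoft.NETCore.App 2.1.0). Without it, the build fails with: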

C:\Users\sky\.nuget\packages\microsoft.net.sdk.functions\1.0.30\build\Microsoft.NET.Sdk.Functions.Build.targets(41,5): error : To install missing framework, download: [C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\DFPerfScenariosV1\DFPerfScenariosV1.csproj]
C:\Users\sky\.nuget\packages\microsoft.net.sdk.functions\1.0.30\build\Microsoft.NET.Sdk.Functions.Build.targets(41,5): error : https://aka.ms/dotnet-core-applaunch?framework=Microsoft.NETCore.App&framework_version=2.1.0&arch=x64&rid=win-x64&os=win10 [C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\DFPerfScenariosV1\DFPerfScenariosV1.csproj]
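
Before building, it can help to confirm that the runtime named in the error is present. `dotnet --list-runtimes` prints one line per installed runtime in the form "name version [path]". The sketch below checks that output for a matching entry; `has_runtime` is a hypothetical helper name.

```shell
# Read `dotnet --list-runtimes` output on stdin; exit 0 if a runtime with the
# given name and a version starting with the given prefix is listed.
has_runtime() {
    name="$1"
    prefix="$2"
    awk -v n="$name" -v p="$prefix" \
        '$1 == n && index($2, p) == 1 { found = 1 } END { exit !found }'
}

# Only query dotnet when it is actually on the PATH.
if command -v dotnet >/dev/null 2>&1; then
    if dotnet --list-runtimes | has_runtime Microsoft.NETCore.App 2.1.; then
        echo "Microsoft.NETCore.App 2.1.x is installed"
    else
        echo "Microsoft.NETCore.App 2.1.x is missing"
    fi
fi
```

The trailing dot in the `2.1.` prefix keeps a hypothetical 2.10.x version from matching.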

Run the following command in a terminal:

# cd ~/work/code/durabletask/azure-functions-durable-extension
dotnet build

Output:

dotnet build
MSBuild version 17.9.6+a4ecab324 for .NET
  Determining projects to restore...
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\SmokeTests\SmokeTestsV1\VSSampleV1.csproj : warning NU1903: Package 'Newtonsoft.Json' 9.0.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5crp-9r3c-p9vr [C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\WebJobs.Extensions.DurableTask.sln]
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\WebJobs.Extensions.DurableTask.sln]
  All projects are up-to-date for restore.
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\SmokeTests\SmokeTestsV1\VSSampleV1.csproj : warning NU1903: Package 'Newtonsoft.Json' 9.0.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5crp-9r3c-p9vr
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [TargetFramework=netcoreapp3.1]
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [TargetFramework=netstandard2.0]
  WebJobs.Extensions.DurableTask.Analyzers -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask.Analyzers\bin\Debug\netstandard2.0\Microsoft.Azure.WebJobs.Extensions.DurableTask.Analyzers.dll
  DurableFunctions.TypedInterfaces -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\DurableFunctions.TypedInterfaces\SourceGenerator\bin\Debug\netstandard2.0\DurableFunctions.TypedInterfaces.dll
  Worker.Extensions.DurableTask -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [TargetFramework=net462]
  Worker.Extensions.DurableTask -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\bin\Debug\net6.0\Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll
  WebJobs.Extensions.DurableTask -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\bin\Debug\netstandard2.0\Microsoft.Azure.WebJobs.Extensions.DurableTask.dll
  WebJobs.Extensions.DurableTask -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\bin\Debug\netcoreapp3.1\Microsoft.Azure.WebJobs.Extensions.DurableTask.dll
  WebJobs.Extensions.DurableTask -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\bin\Debug\net462\Microsoft.Azure.WebJobs.Extensions.DurableTask.dll
  WebJobs.Extensions.DurableTask.Analyzers.Test -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\WebJobs.Extensions.DurableTask.Analyzers.Test\bin\Debug\net6.0\Microsoft.Azure.WebJobs.Extensions.DurableTask.Analyzers.Test.dll
  extensions -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\TimeoutTests\Python\bin\Debug\netstandard2.0\extensions.dll
  DFPerfScenariosV4 -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\DFPerfScenarios\bin\Debug\net60\DFPerfScenariosV4.dll
  VSSampleV3 -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\SmokeTests\SmokeTestsV3\bin\Debug\netcoreapp3.1\VSSampleV3.dll
  WebJobs.Extensions.DurableTask.Tests.V2 -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\FunctionsV2\bin\Debug\net6.0\WebJobs.Extensions.DurableTask.Tests.V2.dll
  VSSampleV1 -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\SmokeTests\SmokeTestsV1\bin\Debug\net462\bin\VSSampleV1.dll
  DFPerfScenariosV1 -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\DFPerfScenariosV1\bin\Debug\net462\bin\DFPerfScenariosV1.dll
  WebJobs.Extensions.DurableTask.Tests.V1 -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\FunctionsV1\bin\Debug\net462\WebJobs.Extensions.DurableTask.Tests.V1.dll
  DurableFunctions.TypedInterfaces.SourceGenerator.Test -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\CodeGen.SourceGenerator.Test\bin\Debug\net6.0\DurableFunctions.TypedInterfaces.SourceGenerator.Test.dll
  DurableFunctions.TypedInterfaces.Example -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\DurableFunctions.TypedInterfaces\Example\bin\Debug\net6.0\DurableFunctions.TypedInterfaces.Example.dll
  VSSampleV2 -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\SmokeTests\SmokeTestsV2\bin\Debug\netstandard2.0\bin\VSSampleV2.dll
  TimeoutTests -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\TimeoutTests\CSharp\bin\Debug\netstandard2.0\TimeoutTests.dll
  extensions -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\SmokeTests\OOProcSmokeTests\durableJS\bin\Debug\netstandard2.0\extensions.dll
  extensions -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\SmokeTests\OOProcSmokeTests\durablePy\bin\Debug\netstandard2.0\extensions.dll
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [TargetFramework=net462]
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [TargetFramework=netcoreapp3.1]
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [TargetFramework=netstandard2.0]
  DurableFunctions.TypedInterfaces.Examples.Test -> C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\CodeGen.Example.Test\bin\Debug\net6.0\DurableFunctions.TypedInterfaces.Examples.Test.dll

Build succeeded.

C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\SmokeTests\SmokeTestsV1\VSSampleV1.csproj : warning NU1903: Package 'Newtonsoft.Json' 9.0.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5crp-9r3c-p9vr [C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\WebJobs.Extensions.DurableTask.sln]
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\WebJobs.Extensions.DurableTask.sln]
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\test\SmokeTests\SmokeTestsV1\VSSampleV1.csproj : warning NU1903: Package 'Newtonsoft.Json' 9.0.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5crp-9r3c-p9vr
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [TargetFramework=netcoreapp3.1]
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [TargetFramework=netstandard2.0]
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [TargetFramework=net462]
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [TargetFramework=net462]
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [TargetFramework=netcoreapp3.1]
C:\Users\sky\work\code\durabletask\azure-functions-durable-extension\src\WebJobs.Extensions.DurableTask\WebJobs.Extensions.DurableTask.csproj : warning NU1903: Package 'Azure.Identity' 1.1.1 has a known high severity vulnerability, https://github.com/advisories/GHSA-5mfx-4wcx-rv27 [TargetFramework=netstandard2.0]
    9 Warning(s)
    0 Error(s)

Time Elapsed 00:00:02.03

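2.5 - The azure-functions-dotnet-worker project

Setting up a development environment for the azure-functions-dotnet-worker project

2.5.1 - Building the project

Building the azure-functions-dotnet-worker project

https://github.com/Azure/azure-functions-dotnet-worker

Get the source code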

# cd ~/work/code/durabletask-fork
git clone git@github.com:Azure/azure-functions-dotnet-worker.git

Run the build

Run the following command in a terminal:

# cd ~/work/code/durabletask-fork/azure-functions-dotnet-worker
dotnet build

The output is as follows:

dotnet build
MSBuild version 17.9.6+a4ecab324 for .NET
  Determining projects to restore...
  All projects are up-to-date for restore.
  Sdk.Analyzers -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\sdk\Sdk.Analyzers\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker.Sdk.Analyzers.dll
  TestUtility -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\TestUtility\bin\Debug\net7.0\Microsoft.Azure.Functions.Tests.TestUtility.dll
  FunctionMetadataLoaderExtension -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\sdk\FunctionMetadataLoaderExtension\bin\Debug\netstandard2.0\Microsoft.Azure.WebJobs.Exten 
  sions.FunctionMetadataLoader.dll
  Worker.Extensions.Abstractions -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Abstractions\src\bin\Debug\netstandard2.0\Microsoft.Azure.Func
  tions.Worker.Extensions.Abstractions.dll
  Sdk.Generators -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\sdk\Sdk.Generators\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker.Sdk.Generators.dll
  Worker.Extensions.Http.AspNetCore.Analyzers -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Http.AspNetCore.Analyzers\bin\Debug\netstandard2.
  0\Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.Analyzers.dll
  DependentAssemblyWithFunctions -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\DependentAssemblyWithFunctions\bin\Debug\net7.0\DependentAssemblyWithFunctions.dll     
  DotNetWorker.Core -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Core\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker.Core.dll
  CustomMiddleware -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\CustomMiddleware\bin\Debug\net8.0\CustomMiddleware.dll
  DependentAssemblyWithFunctions.NetStandard -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\DependentAssemblyWithFunctions.NetStandard\bin\Debug\netstandard2.0\Depend
  entAssemblyWithFunctions.NetStandard.dll
  Worker.Extensions.Warmup -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Warmup\src\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker 
  .Extensions.Warmup.dll
  Worker.Extensions.Timer -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Timer\src\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker.E 
  xtensions.Timer.dll
  Worker.Extensions.RabbitMQ -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.RabbitMQ\src\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Wo
  rker.Extensions.RabbitMQ.dll
  Worker.Extensions.SendGrid -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.SendGrid\src\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Wo 
  rker.Extensions.SendGrid.dll
  Worker.Extensions.SignalRService -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.SignalRService\src\bin\Debug\netstandard2.0\Microsoft.Azure. 
  Functions.Worker.Extensions.SignalRService.dll
  Worker.Extensions.Shared.Tests -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\Worker.Extensions.Shared.Tests\bin\Debug\net7.0\Worker.Extensions.Shared.Tests.dll
  Worker.Extensions.Kafka -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Kafka\src\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker.E 
  xtensions.Kafka.dll
  Determining projects to restore...
  Determining projects to restore...
  DotNetWorker.Core -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Core\bin\Debug\net5.0\Microsoft.Azure.Functions.Worker.Core.dll
  E2ETests -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\E2ETests\E2ETests\bin\Debug\net7.0\Microsoft.Azure.Functions.Worker.E2ETests.dll
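  All projects are up-to-date for restore.
  Determining projects to restore...
  Determining projects to restore...
  Determining projects to restore...
  All projects are up-to-date for restore.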
  Worker.Extensions.EventHubs -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.EventHubs\src\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.
  Worker.Extensions.EventHubs.dll
  Worker.Extensions.Tables -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Tables\src\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker
  .Extensions.Tables.dll
  Worker.Extensions.Http -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Http\src\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker.Ext 
  ensions.Http.dll
  Worker.Extensions.Storage.Queues -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Storage.Queues\src\bin\Debug\netstandard2.0\Microsoft.Azure. 
  Functions.Worker.Extensions.Storage.Queues.dll
  Worker.Extensions.Sample-IncorrectImplementation -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\Worker.Extensions.Sample-IncorrectImplementation\bin\Debug\netstanda
  rd2.0\Worker.Extensions.Sample-IncorrectImplementation.dll
  Worker.Extensions.Sample -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\Worker.Extensions.Sample\bin\Debug\netstandard2.0\Worker.Extensions.Sample.dll
  DotNetWorker.Grpc -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Grpc\bin\Debug\net6.0\Microsoft.Azure.Functions.Worker.Grpc.dll
  Worker.Extensions.Rpc -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Rpc\src\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker.Exten
  sions.Rpc.dll
  WorkerExtensions -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\NetFxWorker\obj\Debug\net48\WorkerExtensions\buildout\Microsoft.Azure.Functions.Worker.Extensions
  .dll
  Worker.Extensions.EventGrid -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.EventGrid\src\bin\Debug\netstandard2.0\Microsoft.Azure.Functions. 
  Worker.Extensions.EventGrid.dll
  All projects are up-to-date for restore.
  Worker.Extensions.SignalRService.Tests -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\Worker.Extensions.SignalRService.Tests\bin\Debug\net7.0\Worker.Extensions.Sign
  alRService.Tests.dll
  Sdk -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\sdk\Sdk\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker.Sdk.dll
  WorkerExtensions -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\Configuration\obj\Debug\net8.0\WorkerExtensions\buildout\Microsoft.Azure.Functions.Worker.Extensi 
  ons.dll
  Sdk -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\sdk\Sdk\bin\Debug\net472\Microsoft.Azure.Functions.Worker.Sdk.dll
  All projects are up-to-date for restore.
  Determining projects to restore...
  Worker.Extensions.Rpc -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Rpc\src\bin\Debug\net6.0\Microsoft.Azure.Functions.Worker.Extensions.Rp 
  c.dll
  All projects are up-to-date for restore.
  Worker.Extensions.CosmosDB -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.CosmosDB\src\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Wo
  rker.Extensions.CosmosDB.dll
  DotNetWorker.Grpc -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Grpc\bin\Debug\net5.0\Microsoft.Azure.Functions.Worker.Grpc.dll
  DotNetWorker -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker\bin\Debug\net6.0\Microsoft.Azure.Functions.Worker.dll
  Worker.Extensions.ServiceBus -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.ServiceBus\src\bin\Debug\netstandard2.0\Microsoft.Azure.Function
  s.Worker.Extensions.ServiceBus.dll
  DotNetWorker.ApplicationInsights -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.ApplicationInsights\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.W 
  orker.ApplicationInsights.dll
  Worker.Extensions.Storage.Blobs -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Storage.Blobs\src\bin\Debug\netstandard2.0\Microsoft.Azure.Fu 
  nctions.Worker.Extensions.Storage.Blobs.dll
  DotNetWorker.Grpc -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Grpc\bin\Debug\net7.0\Microsoft.Azure.Functions.Worker.Grpc.dll
  Worker.Extensions.Rpc.Tests -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\Worker.Extensions.Rpc.Tests\bin\Debug\net7.0\Worker.Extensions.Rpc.Tests.dll
  DotNetWorker -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker\bin\Debug\net7.0\Microsoft.Azure.Functions.Worker.dll
  WorkerExtensions -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\Extensions\obj\Debug\net8.0\WorkerExtensions\buildout\Microsoft.Azure.Functions.Worker.Extensions
  .dll
  SdkE2ETests -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\SdkE2ETests\bin\Debug\net7.0\Microsoft.Azure.Functions.SdkE2ETests.dll
  Worker.Extensions.Rpc.Tests -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\Worker.Extensions.Rpc.Tests\bin\Debug\net48\Worker.Extensions.Rpc.Tests.dll
  WorkerExtensions -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\Net7Worker\obj\Debug\net7.0\WorkerExtensions\buildout\Microsoft.Azure.Functions.Worker.Extensions
  .dll
  Worker.Extensions.Storage -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Storage\src\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Work
  er.Extensions.Storage.dll
  E2EApp -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\E2ETests\E2EApps\E2EApp\bin\Debug\net8.0\Microsoft.Azure.Functions.Worker.E2EApp.dll
  Sdk.Analyzers.Tests -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\Sdk.Analyzers.Tests\bin\Debug\net7.0\Sdk.Analyzers.Tests.dll
  FunctionApp -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\FunctionApp\bin\Debug\net8.0\FunctionApp.dll
  NetFxWorker -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\NetFxWorker\bin\Debug\net48\NetFxWorker.exe
  Determining projects to restore...
  Determining projects to restore...
  DotNetWorker.Grpc -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Grpc\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker.Grpc.dll
  Configuration -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\Configuration\bin\Debug\net8.0\Configuration.dll
  DotNetWorker -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker\bin\Debug\netstandard2.0\Microsoft.Azure.Functions.Worker.dll
  Restored C:\Users\sky\AppData\Local\Temp\1tpad3lw.vns\WorkerExtensions.csproj (in 550 ms).
  WorkerExtensions -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\EntityFramework\obj\Debug\net8.0\WorkerExtensions\buildout\Microsoft.Azure.Functions.Worker.Extensions.dll
  Net7Worker -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\Net7Worker\bin\Debug\net7.0\Net7Worker.dll
  Extensions -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\Extensions\bin\Debug\net8.0\Extensions.dll
  Restored C:\Users\sky\AppData\Local\Temp\vwl4yy2k.1ac\WorkerExtensions.csproj (in 394 ms).
  Restored C:\Users\sky\AppData\Local\Temp\finw3nul.fi4\WorkerExtensions.csproj (in 459 ms).
  WorkerExtensions -> C:\Users\sky\AppData\Local\Temp\1tpad3lw.vns\buildout\Microsoft.Azure.Functions.Worker.Extensions.dll
  WorkerExtensions -> C:\Users\sky\AppData\Local\Temp\vwl4yy2k.1ac\buildout\Microsoft.Azure.Functions.Worker.Extensions.dll
  Worker.Extensions.Http.AspNetCore -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\extensions\Worker.Extensions.Http.AspNetCore\src\bin\Debug\net6.0\Microsoft.Azure.Functi
  ons.Worker.Extensions.Http.AspNetCore.dll
  EntityFramework -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\EntityFramework\bin\Debug\net8.0\EntityFramework.dll
  WorkerExtensions -> C:\Users\sky\AppData\Local\Temp\finw3nul.fi4\buildout\Microsoft.Azure.Functions.Worker.Extensions.dll
  DotNetWorker -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker\bin\Debug\net5.0\Microsoft.Azure.Functions.Worker.dll
  DotNetWorkerTests -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\DotNetWorkerTests\bin\Debug\net7.0\Microsoft.Azure.Functions.Worker.Tests.dll
  Determining projects to restore...
  SdkTests -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\FunctionMetadataGeneratorTests\bin\Debug\net7.0\Microsoft.Azure.Functions.SdkTests.dll
  All projects are up-to-date for restore.
  WorkerExtensions -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\AspNetIntegration\obj\Debug\net8.0\WorkerExtensions\buildout\Microsoft.Azure.Functions.Worker.Ext
  ensions.dll
  Worker.Extensions.Tests -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\Worker.Extensions.Tests\bin\Debug\net7.0\Microsoft.Azure.Functions.Worker.Extensions.Tests.dl
  l
  Worker.Extensions.Http.AspNetCore.Tests -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\extensions\Worker.Extensions.Http.AspNetCore.Tests\bin\Debug\net7.0\Microsoft
  .Azure.Functions.Worker.Extensions.Http.AspNetCore.Tests.dll
  AspNetIntegration -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\samples\AspNetIntegration\bin\Debug\net8.0\AspNetIntegration.dll
  Sdk.Generator.Tests -> C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\test\Sdk.Generator.Tests\bin\Debug\net8.0\Microsoft.Azure.Functions.SdkGeneratorTests.dll

Build succeeded.
    0 Warning(s)
    0 Error(s)

Time Elapsed 00:00:03.17

2.5.2 - Generating the protobuf files

Updating the protobuf definitions and generating code in the azure-functions-dotnet-worker project

Reference: https://github.com/Azure/azure-functions-dotnet-worker/tree/main/protos/azure-functions-language-worker-protobuf

Preparation

grpc.tools

nuget.exe install grpc.tools -Version 2.60.0

Google.Protobuf.Tools

nuget.exe install Google.Protobuf.Tools -Version 3.26.1

Set up the environment variables and directories

Run the following in Windows cmd:

cd C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\protos

set NUGET_PATH="%UserProfile%\.nuget\packages"
set GRPC_TOOLS_PATH=%NUGET_PATH%\grpc.tools\2.60.0\tools\windows_x86
set PROTO_PATH=.\azure-functions-language-worker-protobuf\src\proto
set PROTO=.\azure-functions-language-worker-protobuf\src\proto\FunctionRpc.proto
set PROTOBUF_TOOLS=%NUGET_PATH%\google.protobuf.tools\3.26.1\tools
set MSGDIR=.\Messages

if exist %MSGDIR% rmdir /s /q %MSGDIR%
mkdir %MSGDIR%

set OUTDIR=%MSGDIR%\DotNet
mkdir %OUTDIR%

Generate

Run the following in Windows cmd:

%GRPC_TOOLS_PATH%\protoc.exe %PROTO% --csharp_out %OUTDIR% --grpc_out=%OUTDIR% --plugin=protoc-gen-grpc=%GRPC_TOOLS_PATH%\grpc_csharp_plugin.exe --proto_path=%PROTO_PATH% --proto_path=%PROTOBUF_TOOLS%

This produces two generated files:

  • protos\Messages\DotNet\FunctionRpc.cs

  • protos\Messages\DotNet\FunctionRpcGrpc.cs

Note: it turns out that generating the files by hand like this is unnecessary; running dotnet build or dotnet pack updates them automatically.

3 - DTFx WIKI

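The Durable Task Framework wiki

https://github.com/Azure/durabletask/wiki/

3.1 - wiki Home

Home page of the Durable Task Framework wiki

https://github.com/Azure/durabletask/wiki/

Overview

The Durable Task Framework lets developers write code orchestrations in C# using the .NET Task framework and the async/await keywords added in .NET 4.5.

The key features of the Durable Task Framework are:

  • Definition of code orchestrations in simple C# code
  • Automatic persistence and checkpointing of program state
  • Versioning of orchestrations and activities
  • Async timers, orchestration composition, and user-aided checkpointing

The framework itself is very lightweight and only requires an Azure Service Bus namespace and, optionally, an Azure Storage account. Running instances of the orchestrations and worker nodes are hosted entirely by the user. No user code executes "inside" Service Bus.

Problem statement

Many scenarios involve updating state or executing actions in multiple places in a transactional manner. For example, debiting money from an account in database A and crediting it to another account in database B must be done atomically. This consistency can be achieved with a distributed transaction that spans the debit operation against database A and the credit operation against database B.

However, for strict consistency, transactions imply locks, and locks hurt scale, because subsequent operations that need the same lock block until it is released. For cloud services designed for high availability and consistency, this becomes a major scale bottleneck. Moreover, even if we decided we could take the hit of a distributed transaction, we would find that almost no cloud service actually supports distributed transactions (or even simple locking).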

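An alternative model for achieving consistency is to execute the debit and credit business logic in a durable workflow. In that case the workflow's pseudocode looks like this:

  1. Debit from an account in database A
  2. If the debit succeeded, then:
     a. Credit to an account in database B
     b. If that fails, keep retrying until some threshold is reached
     c. If the credit still fails, undo the debit from database A and send a notification email

On the happy path this gives us "eventual" consistency: after (1) the overall system state is inconsistent, but it becomes consistent once the workflow completes. On the unhappy path, however, many things can go wrong: the node executing the pseudocode can crash at an arbitrary point, the debit from database A can fail, or the credit to database B can fail. To stay consistent in those cases we must ensure the following:

  1. The debit and credit operations are idempotent, i.e. re-executing the same debit or credit operation becomes a no-op.
  2. If the executing node crashes, it restarts from the place where we last successfully performed a durable operation (e.g. #1 or #2a above).

Of these two items, (1) can only be provided by the implementation of the debit/credit activities. Item (2) can also be done in code, by tracking the current position in some database. But this state management becomes cumbersome, especially as the number of durable operations grows. This is where a framework that manages state automatically greatly simplifies building code-based workflows.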
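To make item (2) concrete, here is a minimal sketch of what hand-rolled position tracking might look like without a framework. Everything in it is hypothetical and not part of DTFx: `TransferStep`, `InMemoryCheckpointStore` (a dictionary standing in for a durable database table) and `ManualTransferWorkflow` are names invented for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical step markers for the debit/credit workflow; not part of DTFx.
public enum TransferStep { NotStarted, Debited, Credited }

// Hypothetical checkpoint store; a real one would be a durable database table.
public sealed class InMemoryCheckpointStore
{
    private readonly Dictionary<string, TransferStep> steps =
        new Dictionary<string, TransferStep>();

    public TransferStep Load(string transferId) =>
        steps.TryGetValue(transferId, out var step) ? step : TransferStep.NotStarted;

    public void Save(string transferId, TransferStep step) => steps[transferId] = step;
}

public static class ManualTransferWorkflow
{
    // Runs (or resumes) a transfer. Calling Run again with the same transferId
    // after a failure skips every step already recorded in the store.
    public static void Run(string transferId, InMemoryCheckpointStore store,
                           Action debit, Action credit)
    {
        TransferStep step = store.Load(transferId);

        if (step == TransferStep.NotStarted)
        {
            debit();                                      // must itself be idempotent
            store.Save(transferId, TransferStep.Debited); // checkpoint after step 1
            step = TransferStep.Debited;
        }

        if (step == TransferStep.Debited)
        {
            credit();                                      // must itself be idempotent
            store.Save(transferId, TransferStep.Credited); // checkpoint after step 2a
        }
    }
}
```

Even in this two-step case, a crash between `debit()` and `Save(...)` re-runs the debit on resume, which is why idempotency (item 1) is still required. Every additional durable operation adds another checkpoint branch, which is the bookkeeping the text describes as cumbersome.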

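3.2 - Core concepts

Core concepts of the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Core-Concepts

Task Hub

A Task Hub is a logical container for Service Bus entities within a namespace. The Task Hub worker uses these entities to pass messages reliably between code orchestrations and the activities they orchestrate.

Task Activities

Task activities are pieces of code that perform specific steps of an orchestration. A task activity can be "scheduled" from within task orchestration code. Scheduling yields a plain .NET Task that can be (asynchronously) awaited and composed with other similar Tasks to build complex orchestrations.

Task Orchestrations

Task orchestrations schedule task activities and build code orchestrations around the Tasks that represent those activities.

Task Hub Worker

The worker is the host for task orchestrations and activities. It also contains APIs for performing CRUD operations on the Task Hub itself.

Task Hub Client

The Task Hub client provides:

  • APIs for creating and managing task orchestration instances
  • APIs for querying the state of task orchestration instances from an Azure Table

Both the Task Hub worker and the Task Hub client are configured with a connection string for Service Bus and, optionally, a connection string for a storage account. Service Bus stores the control-flow state of execution and of the messages passed between task orchestration instances and task activities. Service Bus is not a database, though, so once a code orchestration completes, its state is removed from Service Bus. If an Azure Table storage account is configured, the state remains available for querying for as long as the user keeps it there.

The framework provides the TaskOrchestration and TaskActivity base classes, from which users derive their own orchestrations and activities. They then use the TaskHub APIs to load these orchestrations and activities into the process and start the worker, which begins processing requests to create new orchestration instances.

The TaskHubClient APIs are used to create new orchestration instances, query existing instances, and terminate them if required.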

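3.3 - Video encoding example

A video encoding example for the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Example---Video-Encoding

Suppose a user wants to build a code orchestration that encodes a video and then emails the user once encoding is done.

To implement this with the Service Bus Durable Task Framework, the user writes two task activities, one to encode the video and one to send the email, plus one task orchestration that coordinates the two.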

public class EncodeVideoOrchestration : TaskOrchestration<string, string>
{
    public override async Task<string> RunTask(OrchestrationContext context, 
                                                 string input)
    {
        string encodedUrl = 
              await context.ScheduleTask<string>(typeof (EncodeActivity), input);
        await context.ScheduleTask<object>(typeof (EmailActivity), input);
        return encodedUrl;
    }
}

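In this orchestration the user schedules the encode-video activity, waits for its response, and then schedules the send-email activity. The framework ensures that the execution state is durably persisted. For example, if the node hosting the orchestration above crashes before scheduling the encode-video activity, on restart it knows to schedule that activity. If the node crashes after scheduling the activity but before the response arrives, on restart it knows the activity was already scheduled and goes straight to waiting for the response from EncodeActivity.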

public class EncodeActivity : TaskActivity<string, string>
{
    protected override string Execute(TaskContext context, string input)
    {
        Console.WriteLine("Encoding video " + input);
        // TODO : actually encode the video to a destination
        return "http://<azurebloblocation>/encoded_video.avi";
    }
}

public class EmailActivity : TaskActivity<string, object>
{
    protected override object Execute(TaskContext context, string input)
    {
        // TODO : actually send email to user
        return null;
    }
}

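The user code above (EncodeVideoOrchestration, EncodeActivity, and EmailActivity) has to be hosted and available somewhere to be useful.

This is how the user loads these orchestration and activity classes into a worker and starts processing requests to create new orchestration instances: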

string serviceBusConnString = "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedSecretIssuer=[issuer];SharedSecretValue=[value]";

TaskHubWorker hubWorker = new TaskHubWorker("myvideohub", serviceBusConnString)
    .AddTaskOrchestrations(typeof (EncodeVideoOrchestration))
    .AddTaskActivities(typeof (EncodeActivity), typeof (EmailActivity))
    .Start();

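Multiple instances of these workers can run concurrently against the same task hub to provide load balancing as required. The framework guarantees that the code of a given orchestration instance executes on only a single worker at any one time.

TaskHubWorker also exposes methods to stop the worker instance.

The last remaining piece is creating and managing orchestration instances, i.e. how to actually trigger the code orchestrations the user has loaded, and how to monitor or terminate them.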

string serviceBusConnString = "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedSecretIssuer=[issuer];SharedSecretValue=[value]";

TaskHubClient client = new TaskHubClient("myvideohub", serviceBusConnString);
client.CreateOrchestrationInstance(typeof (EncodeVideoOrchestration), "http://<azurebloblocation>/MyVideo.mpg");

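3.4 - Writing task orchestrations

Writing task orchestrations in the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Writing-Task-Orchestrations

A task orchestration essentially invokes task activities and defines how control flows from one activity to the next. The code you can write in an orchestration is plain C#, with some constraints. These constraints exist because of how the framework replays orchestration code, which is briefly described below.

Every time an orchestration has new work to process (for example, a task activity finished or a timer fired), the framework replays the user's task orchestration code from scratch. Whenever that code tries to schedule a task activity, the framework intercepts the call and consults the orchestration's "execution history". If it finds that the particular task activity has already executed and produced a result, it replays that result immediately and the orchestration continues. This goes on until the user code either runs to completion or schedules a new activity. In the latter case the framework actually schedules and executes the specified activity. Once the activity completes, its result also becomes part of the execution history, and that value is used in subsequent replays.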
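The replay loop described above can be illustrated with a toy model. This is only a sketch under simplifying assumptions, not how DTFx is implemented: `ReplayContext`, `PendingActivity` and `ReplayDriver` are hypothetical names, activities return strings, and an exception stands in for the suspension that the real framework achieves with Tasks that have not completed yet.

```csharp
using System;
using System.Collections.Generic;

// Toy model of history-based replay; NOT the DTFx implementation.
public sealed class ReplayContext
{
    private readonly List<string> history; // results of activities completed so far
    private int position;                  // index of the next ScheduleTask call

    public ReplayContext(List<string> history) { this.history = history; }

    // Thrown to suspend the orchestrator when a result is not yet in the history.
    public sealed class PendingActivity : Exception
    {
        public string Name { get; }
        public PendingActivity(string name) : base(name) { Name = name; }
    }

    public string ScheduleTask(string activityName)
    {
        if (position < history.Count)
        {
            return history[position++];          // replay the recorded result
        }
        throw new PendingActivity(activityName); // new work: must really execute
    }
}

public static class ReplayDriver
{
    // Re-runs the orchestrator from the top until it finishes, executing one
    // new activity per pass and appending its result to the history.
    public static string Run(Func<ReplayContext, string> orchestrator,
                             Func<string, string> executeActivity,
                             List<string> history)
    {
        while (true)
        {
            try
            {
                return orchestrator(new ReplayContext(history));
            }
            catch (ReplayContext.PendingActivity pending)
            {
                history.Add(executeActivity(pending.Name));
            }
        }
    }
}
```

With an orchestrator that schedules two activities in sequence, the orchestrator body runs three times while each activity executes only once. Because earlier results are matched to calls purely by position, the sketch also shows why the orchestrator must make the same calls in the same order on every pass.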

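With this in mind, the following constraints apply to the code we can write in a task orchestration:

  • Code must be deterministic, because it is replayed many times and must produce the same result each time. For example, it must not directly call anything that gets the current date/time, random numbers, GUIDs, or remote services.
  • The OrchestrationContext object passed to TaskOrchestration.RunTask() has a helper API that provides a deterministic way to get the current date/time. Use it instead of System.DateTime.
  • Users can make non-deterministic operations deterministic by wrapping them in TaskActivities, e.g. GenerateGuidActivity or GenerateRandomNumberActivity. Because the framework replays task activity results, the non-deterministic value is generated once on first execution and the same value is replayed on subsequent executions.
  • In the future, other helper APIs will be added to OrchestrationContext.
  • Code should be non-blocking, i.e. no thread sleeps and no Task.WaitXXX() methods. The framework provides helper methods for setting up async timers, which should be used instead.
  • The execution history of an orchestration contains all scheduled task activities and their results. The history is also bounded by Service Bus size limits (when the Service Bus provider is used), so infinite loops are not possible without user-aided checkpointing, described in Eternal Orchestrations (https://github.com/Azure/durabletask/wiki/Feature---Eternal-Orchestrations-(aka-infinite-loops)).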

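3.5 - Writing task activities

Writing task activities in the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Writing-Task-Activities

Task activities are the "leaf" nodes of an orchestration. This is the code that actually performs a unit of work within the orchestration. It is plain C# code with no constraints.

Task activity code is guaranteed to be called at least once. In error cases, however, it may be called multiple times, so it is best to make it idempotent.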
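One common way to make an activity safe under at-least-once delivery is to key each operation by a caller-supplied ID and ignore repeats. The sketch below illustrates the idea only; `IdempotentLedger` and its in-memory collections are hypothetical stand-ins for a real database, where recording the ID and applying the change would need to happen in one atomic transaction.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical ledger; in-memory collections stand in for a database.
public sealed class IdempotentLedger
{
    private readonly HashSet<string> appliedOperations = new HashSet<string>();
    private readonly Dictionary<string, decimal> balances =
        new Dictionary<string, decimal>();

    public decimal BalanceOf(string account) =>
        balances.TryGetValue(account, out var balance) ? balance : 0m;

    // Applies a debit once per operationId; returns false on a duplicate call.
    public bool Debit(string operationId, string account, decimal amount)
    {
        if (!appliedOperations.Add(operationId))
        {
            return false; // already applied: the repeat becomes a no-op
        }

        balances[account] = BalanceOf(account) - amount;
        return true;
    }
}
```

If the same `operationId` is delivered twice, the second call leaves the balance unchanged, which is the no-op behaviour that at-least-once execution calls for.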

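Note: in a future version of the framework, users will be able to switch the guarantee to at-most-once instead of at-least-once, giving them more control from the orchestration code.

Remarks

This raises the at-least-once versus at-most-once question. Getting execution guarantees for activities right is still tricky.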

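3.6 - Orchestration instance management

Managing orchestration instances in the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Orchestration-Instance-Management

The TaskHubClient API lets users create new orchestration instances, query the state of created instances, and terminate them.

The API that creates an orchestration instance returns the instance information, which subsequent API calls can use to query the instance's state.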

OrchestrationInstance instance = client.CreateOrchestrationInstance(typeof (EncodeVideoOrchestration), "http://<azurebloblocation>/MyVideo.mpg");

OrchestrationState state = client.GetOrchestrationState(instance);
Console.WriteLine(state.Name + " " + state.OrchestrationStatus + " " + state.Output);

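The returned instance can also be used to terminate the orchestration:

OrchestrationInstance instance = client.CreateOrchestrationInstance(typeof (EncodeVideoOrchestration),"http://<azurebloblocation>/MyVideo.mpg");
// something bad happened
client.TerminateInstance(instance);

Note that the instance query methods require the task hub to have been created with an Azure Storage connection string. If none was supplied, all instance query methods throw an InvalidOperationException.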

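3.7 - Error handling and compensation

Error handling and compensation in the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Error-Handling-&-Compensation

Any exception thrown in TaskActivity code is marshalled back and thrown as a TaskFailedException in the TaskOrchestration code. Users can write whatever error handling and compensation code fits their needs.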

public class DebitCreditOrchestration : 
    TaskOrchestration<object, DebitCreditOperation>
{
    public override async Task<object> RunTask(OrchestrationContext context, 
        DebitCreditOperation operation)
    {
        bool failed = false;
        bool debited = false;
        try
        {
            await context.ScheduleTask<object>(typeof (DebitAccount),
                            new Tuple<string, float>(operation.SourceAccount, operation.Amount));
            debited = true;

            await context.ScheduleTask<object>(typeof(CreditAccount),
                            new Tuple<string, float>(operation.TargetAccount, operation.Amount));
        }
        catch (TaskFailedException exception)
        {
            if (debited)
            {
                // can build a try-catch around this as well, in which case the 
                // orchestration may either retry a few times or log the inconsistency for review
                await context.ScheduleTask<object>(typeof(CreditAccount),
                            new Tuple<string, float>(operation.SourceAccount, operation.Amount));
            }
        }

        return null;
    }
}

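Note that before C# 6.0 the await keyword could not be used inside a catch block. In that case we can modify the code above to set a failure flag and perform the compensation just before returning:

// ... orchestration code here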
        catch (TaskFailedException exception)
        {
            failed = true;
        }

        if (failed)
        {
            if (debited)
            {
                // can build a try-catch around this as well, in which case the 
                // orchestration may either retry a few times or log the inconsistency for review
                await context.ScheduleTask<object>(typeof(CreditAccount),
                            new Tuple<string, float>(operation.SourceAccount, operation.Amount));
            }
        }

        return null;
    }
}

3.8 - Task Hub management

Managing the Task Hub in the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Task-Hub-Management

The TaskHubWorker APIs can be used to perform CRUD operations on the TaskHub itself.

string serviceBusConnString = "Endpoint=sb://<namespace>.servicebus.windows.net/;SharedSecretIssuer=[issuer];SharedSecretValue=[value]";

string tableConnectionString = "UseDevelopmentStorage=true;DevelopmentStorageProxyUri=http://127.0.0.1:10002/";

TaskHubWorker hubWorker = new TaskHubWorker("mytesthub", serviceBusConnString, tableConnectionString);

// creates the required underlying entities in Service Bus and Azure Storage for the task hub
hubWorker.CreateHub();

// creates the required underlying entities in Service Bus and Azure Storage for the task hub
// only if they don't already exist
hubWorker.CreateHubIfNotExists();

// deletes the underlying entities in Service Bus and Azure Storage for the task hub
hubWorker.DeleteHub();

// existence check
bool hubExists = hubWorker.HubExists();

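The Azure Storage connection string is optional. If it is not supplied, the instance store is not created, and consequently instance data cannot be queried.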

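3.9 - Orchestration versioning

Versioning orchestrations in the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Orchestration-Versioning

This section outlines the process for upgrading long-running orchestrations and provides sample code.

When orchestrations are used for scheduling and workflow management, the following challenges are unavoidable:

  1. Orchestrations and activities need to be updated, versioned, or removed.
  2. A new version of an orchestration may not be backward compatible. In fact, we strictly assume it never is.
  3. An orchestration may be in the middle of some work at upgrade time. We must let it run to completion to avoid leaving the system in a potentially inconsistent state. The alternative is to design orchestrations so that deleting the hub (and thereby terminating orchestrations midway) does not leave the system inconsistent.

There is a process for handling these challenges, outlined below.

Process outline

  1. When scheduling an orchestration, we use a string name and a version instead of the class name. This lets us treat two different orchestration classes as two versions of the same orchestration.

  2. When updating orchestrations and/or activities, we deploy both the old and the new code to the worker. This is needed so that previously started orchestrations can run to completion and then switch gracefully to the new version at runtime.

  3. We register the old and new orchestration classes in the worker under the same name but with different versions. The sample covers this; the idea is to use an ObjectCreator factory-like class.

  4. A small change is made to the old orchestration code: the ContinueAsNew method (used to start a new generation) is given the new version. For example, when you first shipped version "1" of the orchestration, the code contained something like:

    Context.ContinueAsNew("1", input);

    We now change this to:

    Context.ContinueAsNew("2", input);

    That way, when the old orchestration completes, it reschedules itself using the new version. In practice, I do not recommend editing the code directly; instead, keep the versions as constants in one file and use those constants in the code. Naturally, the orchestration code has to be rebuilt to pick up the new constant values, unless properties are used instead.

  5. Clients should be updated to schedule the new version of the orchestration (for consistency).

  6. Stopping and removing an orchestration follows the same process as an update, except that the new version is a no-op orchestration that exits immediately. Both versions can then be safely removed in the next deployment.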
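Step 4's advice to keep versions in a constants file, and step 3's lookup by (name, version), can be sketched as follows. Both types are hypothetical illustrations rather than DTFx APIs: `OrchestrationVersions` is an assumed constants file and `VersionedRegistry<T>` is a toy stand-in for the worker's registration table.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical constants file: the one place a deployment has to touch.
public static class OrchestrationVersions
{
    public const string InfiniteName = "Infinite";
    public const string InfiniteCurrent = "2"; // was "1" before the upgrade
}

// Toy registry keyed by (name, version); stands in for the worker's lookup.
public sealed class VersionedRegistry<T>
{
    private readonly Dictionary<(string Name, string Version), Func<T>> creators =
        new Dictionary<(string Name, string Version), Func<T>>();

    public void Register(string name, string version, Func<T> creator) =>
        creators[(name, version)] = creator;

    public T Create(string name, string version)
    {
        if (!creators.TryGetValue((name, version), out var creator))
        {
            throw new InvalidOperationException(
                $"No orchestration registered for {name} version {version}");
        }
        return creator();
    }
}
```

Old code would then call `Context.ContinueAsNew(OrchestrationVersions.InfiniteCurrent, input)` instead of a literal. Since C# `const` values are copied into referencing assemblies at compile time, those assemblies must be rebuilt after the constant changes, which matches the text's remark about rebuilding or using properties.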

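Code

This section shows which method overloads we need to implement the process outlined above.

Suppose we have two classes, both deployed: InfiniteDemoOrchestrationV1 and InfiniteDemoOrchestrationV2.

Let's also give them the string name "Infinite" and the versions "1" and "2" respectively. Versions should be strings that are parsable as int.

First, we register both versions with the worker.

We start by creating two instances of TaskHubObjectCreator, a subtype of ObjectCreator<TaskOrchestration>, whose constructor takes three parameters:

  • The logical name of the orchestration (in our example both orchestrations are named "Infinite")
  • The version ("1" and "2" respectively)
  • A lambda that actually creates the correct orchestration instance for the (name, version) pair

var InfiniteV1 = new TaskHubObjectCreator("Infinite", "1", () => { return new InfiniteDemoOrchestrationV1(); });

var InfiniteV2 = new TaskHubObjectCreator("Infinite", "2", () => { return new InfiniteDemoOrchestrationV2(); });

The definition of the TaskHubObjectCreator class is given further below. Next, we put the instances into an array:

var InfiniteOrchestrations = new[] {InfiniteV1, InfiniteV2};

Finally, we use the TaskHubWorker.AddTaskOrchestrations overload that accepts a list of ObjectCreator instances: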

var taskWorkerHub2 = new TaskHubWorker(Constants.HubName, Constants.ServiceBusConnString)
    .AddTaskOrchestrations(InfiniteOrchestrations)
    .AddTaskActivities(typeof(DemoActivityV1), typeof(DemoActivityV2));

The worker now knows about both versions of the orchestration and treats them as the same orchestration name with different versions.

Here is the definition of the TaskHubObjectCreator class:

using System;
using DurableTask;

/// <summary>
/// A factory class which allows creation of an orchestration instance based on the string ID and version
/// </summary>
public class TaskHubObjectCreator : ObjectCreator<TaskOrchestration>
{
    private readonly Func<TaskOrchestration> objectCreatorFunc;

    /// <summary>
    /// Constructs an instance of TaskHubObjectCreator
    /// </summary>
    /// <param name="name">The string name of the orchestration which this factory creates. Several different classes which are conceptually related can have the same string ID but different versions.</param>
    /// <param name="version">The version of the orchestration that this object creates</param>
    /// <param name="objectCreatorFunc">Creator function. This function must create the correct object for the (name, version) pair provided</param>
    public TaskHubObjectCreator(string name, string version, Func<TaskOrchestration> objectCreatorFunc)
    {
        if (string.IsNullOrEmpty(name))
        {
            throw new ArgumentNullException("name");
        }

        if (string.IsNullOrEmpty(version))
        {
            throw new ArgumentNullException("version");
        }

        if (objectCreatorFunc == null)
        {
            throw new ArgumentNullException("objectCreatorFunc");
        }

        this.Name = name;
        this.Version = version;
        this.objectCreatorFunc = objectCreatorFunc;
    }

    /// <summary>
    /// Invokes the creator function, thus creating the correct instance for (name, version) provided.
    /// </summary>
    /// <returns>An instance of an orchestration</returns>
    public override TaskOrchestration Create()
    {
        return this.objectCreatorFunc();
    }
}

With the worker configured, the client must also be written differently from the default for this to work:

orchestrationInstance = client.CreateOrchestrationInstance("Infinite", "1", "1", input);

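The first parameter is the name of the orchestration. Normally this would be the type, or the type's .ToString value, but because we registered the orchestration under the logical name "Infinite", that name is used instead.

The second parameter is the version, "1" here, which must be one of the versions the worker understands. The third parameter (also "1") is the instance ID; for idempotency, the ID should stay the same across calls. The last parameter is the orchestration input, whose type is determined by the generic argument.

Finally, do not forget to update the version constant during deployment: once the deployment is done, old orchestrations must carry on with the new version.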
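The idea of resolving a factory from a (name, version) pair can be sketched outside the framework. The class below is a hypothetical illustration only: VersionedRegistry and its members are invented names, and nothing here claims to mirror how TaskHubWorker stores its ObjectCreator instances internally.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: maps a (name, version) pair to a factory function.
public sealed class VersionedRegistry<T>
{
    private readonly Dictionary<(string Name, string Version), Func<T>> creators =
        new Dictionary<(string Name, string Version), Func<T>>();

    // Registers one factory per (name, version); duplicates throw ArgumentException.
    public void Add(string name, string version, Func<T> creator)
    {
        if (creator == null) throw new ArgumentNullException(nameof(creator));
        creators.Add((name, version), creator);
    }

    // Looks up the factory for the exact (name, version) pair and invokes it.
    public T Create(string name, string version)
    {
        if (!creators.TryGetValue((name, version), out Func<T> creator))
        {
            throw new KeyNotFoundException($"No registration for {name} version {version}.");
        }
        return creator();
    }
}
```

Registering "Infinite" twice, once under version "1" and once under "2", lets a caller pick either implementation by version while using the same logical name.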

3.10 - Feature: Automatic Retries

Automatic retries in the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Feature---Automatic-Retries

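Any application that uses cloud services should be resilient to failures to some degree, so client-side retries become an important part of the implementation.

The framework provides alternative scheduling methods that retry a task activity on failure according to a supplied policy. This is useful when you want automatic retries for, say, a task activity that reads data from a web service or performs an idempotent write to a database.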

public class GetQuoteOrchestration : TaskOrchestration<string, string>
{
    public override async Task<string> RunTask(OrchestrationContext context, string input)
    {
        // retry every 10 seconds up to 5 times before giving up and bubbling up the exception
        RetryOptions retryOptions = new RetryOptions(TimeSpan.FromSeconds(10), 5);
        await context.ScheduleWithRetry<object>(typeof (GetQuote), retryOptions, null);
        return null;
    }
}
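To make the "fixed interval, maximum attempts" policy concrete, here is a minimal sketch in plain C#. RetrySketch.RetryAsync is a hypothetical helper, not part of the framework, and it waits with Task.Delay, which is acceptable in ordinary client code but should not be used inside orchestration code, where waiting is done with durable timers.

```csharp
using System;
using System.Threading.Tasks;

public static class RetrySketch
{
    // Hypothetical helper: runs `operation` up to `maxAttempts` times,
    // waiting `interval` after each failed attempt except the last one,
    // whose exception is allowed to bubble up to the caller.
    public static async Task<T> RetryAsync<T>(
        Func<Task<T>> operation, TimeSpan interval, int maxAttempts)
    {
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Attempts remain: wait, then loop and try again.
                await Task.Delay(interval);
            }
        }
    }
}
```

Because the exception filter is false on the final attempt, the last failure propagates unchanged instead of being wrapped.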

3.11 - Feature: Durable Timers

Durable timers in the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Feature---Durable-Timers

Orchestration code can wait on asynchronous timer events.

public class EncodeVideoOrchestration : TaskOrchestration<string, string>
{
    public override async Task<string> RunTask(OrchestrationContext context, string input)
    {
        string encodedUrl = await context.ScheduleTask<string>(typeof (EncodeActivity), input);
        await context.CreateTimer(context.CurrentUtcDateTime.Add(TimeSpan.FromDays(1)), "timer1");
        await context.ScheduleTask<object>(typeof (EmailActivity), input);
                
        return encodedUrl;
    }
}

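The line that awaits the CreateTimer task makes the orchestration sleep for one day between the video-encoding activity and the email activity.

Timers can be used for periodic work as well as for timeouts.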

public class BillingOrchestration : TaskOrchestration<string, string>
{
    public override async Task<string> RunTask(OrchestrationContext context, string input)
    {
        for (int i = 0; i < 10; i++)
        {
            await context.CreateTimer(context.CurrentUtcDateTime.Add(TimeSpan.FromDays(1)), "timer1");
            await context.ScheduleTask<object>(typeof (BillingActivity));
        }
        return null;
    }
}

In the snippet above, the billing orchestration is signaled once a day and invokes a billing activity each time it wakes up.

public class GetQuoteOrchestration : TaskOrchestration<string, string>
{
    public override async Task<string> RunTask(OrchestrationContext context, string input)
    {
        CancellationTokenSource cancellationTokenForTimer = new CancellationTokenSource();
        Task timer = context.CreateTimer(
            context.CurrentUtcDateTime.Add(TimeSpan.FromSeconds(5)), "timer1", cancellationTokenForTimer.Token);
        Task getQuote = context.ScheduleTask<object>(typeof(GetQuote));
        Task winner = await Task.WhenAny(timer, getQuote);
        if (timer.IsCompleted)
        {
            // request timed out, do some compensating action
        }
        else
        {
            // without this, timer will still block
            // orchestration completion
            cancellationTokenForTimer.Cancel();

            // use getQuote task result
        }
        return null;
    }
}

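In this snippet, we schedule the GetQuote activity and also create a timer that fires after 5 seconds. If the timer fires before the activity returns, we run some compensating action; otherwise we use the returned quote.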
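The same race-against-a-timer shape can be tried outside an orchestration with ordinary tasks. The sketch below is an analogue only: TimeoutSketch is a hypothetical name, and Task.Delay stands in for the durable timer, so it is not suitable for use inside orchestration code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutSketch
{
    // Returns the result of `work`, or `fallback` if `timeout` elapses first.
    public static async Task<string> WithTimeoutAsync(
        Task<string> work, TimeSpan timeout, string fallback)
    {
        using (var timerCts = new CancellationTokenSource())
        {
            Task timer = Task.Delay(timeout, timerCts.Token);
            Task winner = await Task.WhenAny(timer, work);

            if (winner == timer)
            {
                // Timed out: this is where a compensating action would run.
                return fallback;
            }

            // The work finished first, so the pending timer is cancelled.
            timerCts.Cancel();
            return await work;
        }
    }
}
```

Comparing the winner against the timer task, rather than checking whether the timer has completed, avoids misreading the case where both tasks finish at nearly the same moment.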

3.12 - Feature: Waiting on External Events

Waiting on external events in the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Feature---Waiting-on-External-Events-%28Human-Interaction%29

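Orchestrations often need to wait for external events, such as a person entering some input or another external trigger. The framework provides a mechanism for an orchestration to wait asynchronously for an external event.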

public class GetQuoteOrchestration : TaskOrchestration<string, string>
{
    TaskCompletionSource<object> getPermission = new TaskCompletionSource<object>(); 

    public override async Task<string> RunTask(OrchestrationContext context, string input)
    {
        await getPermission.Task;
        await context.ScheduleTask<object>(typeof (GetQuote), null);
        return null;
    }

    public override void OnEvent(OrchestrationContext context, string name, string input)
    {
        getPermission.SetResult(null);
    }
}

To raise the event from outside, call the TaskHubClient.RaiseEvent method.

TaskHubClient client = new TaskHubClient("test", serviceBusConnString);
OrchestrationInstance instance = client.CreateOrchestrationInstance(typeof (GetQuoteOrchestration),  "quote");
client.RaiseEvent(instance, "dummyEvent", "dummyData");
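The TaskCompletionSource pattern behind this feature can be exercised without the framework. In the sketch below, ApprovalWorkflow and the event name "Approve" are hypothetical; the class only mimics the shape of RunTask and OnEvent. Unlike the snippet above, it filters on the event name and uses TrySetResult, so an unrelated or repeated event does not throw.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for an orchestration that waits on an external event.
public class ApprovalWorkflow
{
    private readonly TaskCompletionSource<string> approval =
        new TaskCompletionSource<string>();

    // Suspends until the "Approve" event arrives, then continues.
    public async Task<string> RunAsync()
    {
        string approver = await approval.Task;
        return "approved by " + approver;
    }

    // Called when an event is delivered; only "Approve" releases the waiter.
    public void OnEvent(string name, string input)
    {
        if (name == "Approve")
        {
            // TrySetResult ignores a second delivery instead of throwing.
            approval.TrySetResult(input);
        }
    }
}
```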

3.13 - Feature: Eternal Orchestrations

Eternal orchestrations (aka infinite loops) in the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Feature---Eternal-Orchestrations-%28aka-infinite-loops%29

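As explained in the article on writing task orchestrations, the framework replays the execution history to recreate program state for the user's task orchestration instance. Because the size of the history is bounded, a task orchestration class cannot contain an infinite loop.

With the generation feature, users can "checkpoint" an orchestration instance and create a new one.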

public class CronOrchestration : TaskOrchestration<string, int>
{
    public override async Task<string> RunTask(OrchestrationContext context, int intervalHours)
    {
        // bounded loop
        for (int i = 0; i < 10; i++)
        {
            await context.CreateTimer<object>(
                context.CurrentUtcDateTime.Add(TimeSpan.FromHours(intervalHours)), null);
            // TODO : do something interesting 
        }

        // create a new instance of self with the same input (or different if needed)
        context.ContinueAsNew(intervalHours);
        return null;
    }
}

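In this snippet, the user tells the framework to create a brand-new instance of the orchestration (that is, a new generation or execution) and forwards the input it received to the new instance. Such an orchestration can run indefinitely without hitting the history size limit.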
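A toy model shows why starting a new generation keeps the history bounded. It is a simplification under stated assumptions: it records exactly two events per loop iteration and treats the new generation as a history reset, whereas a real history contains further event types. EternalSketch is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

public static class EternalSketch
{
    // Simulates `totalIterations` timer waits, starting a fresh history
    // every `iterationsPerGeneration` iterations, and returns the largest
    // history length observed at any point.
    public static int PeakHistoryLength(int totalIterations, int iterationsPerGeneration)
    {
        if (iterationsPerGeneration < 1)
            throw new ArgumentOutOfRangeException(nameof(iterationsPerGeneration));

        var history = new List<string>();
        int peak = 0;
        for (int i = 1; i <= totalIterations; i++)
        {
            history.Add("TimerCreated");
            history.Add("TimerFired");
            peak = Math.Max(peak, history.Count);

            if (i % iterationsPerGeneration == 0)
            {
                // Analogue of ContinueAsNew: the next generation starts empty.
                history.Clear();
            }
        }
        return peak;
    }
}
```

With 1000 iterations and a new generation every 10, the peak stays at 20 entries; with a single generation covering all 1000 iterations it grows to 2000.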

3.14 - Feature: Sub-orchestrations

Sub-orchestrations (aka nested orchestrations) in the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Feature---Sub-orchestrations-%28aka-nested-orchestrations%29

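An orchestration can also start and wait on other orchestrations using the sub-orchestration feature. This is useful when you have a library of orchestrations and want to build a larger orchestration around them.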

public class PeriodicBillingJob : TaskOrchestration<string, int>
{
    // hardcoded list of apps to run billing orchestrations on
    static readonly string[] ApplicationList = new string[] { "app1", "app2" };

    public override async Task<string> RunTask(OrchestrationContext context, int intervalHours)
    {
        // bounded loop
        for (int i = 0; i < 10; i++)
        {
            await context.CreateTimer<object>(
                context.CurrentUtcDateTime.Add(TimeSpan.FromHours(intervalHours)), null);

            List<Task> billingTasks = new List<Task>();

            foreach (string appName in PeriodicBillingJob.ApplicationList)
            {
                billingTasks.Add(
                    context.CreateSubOrchestrationInstance<bool>(typeof (BillingOrchestration), appName));
            }
            await Task.WhenAll(billingTasks);
        }

        // create a new instance of self with the same input (or different if needed)
        context.ContinueAsNew(intervalHours);
        return null;
    }
}

// a reusable orchestration which can either be triggered directly by the admin or via 
// some master recurring periodic billing orchestration
public class BillingOrchestration : TaskOrchestration<bool, string>
{
    public override async Task<bool> RunTask(OrchestrationContext context, string applicationName)
    {
        // TODO : process billing information for 'applicationName'
        return true;
    }
}
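The fan-out in PeriodicBillingJob collects the sub-orchestration tasks as untyped Task objects, so their bool results are not read. As an illustration of the same fan-out and fan-in shape with typed results, here is a framework-free sketch; FanOutSketch and the `bill` delegate are hypothetical stand-ins for the sub-orchestration call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutSketch
{
    // Starts one billing task per application, waits for all of them,
    // and returns how many reported success.
    public static async Task<int> CountSuccessesAsync(
        IEnumerable<string> applications, Func<string, Task<bool>> bill)
    {
        // ToList forces every task to start before any is awaited.
        List<Task<bool>> tasks = applications.Select(bill).ToList();
        bool[] results = await Task.WhenAll(tasks);
        return results.Count(ok => ok);
    }
}
```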

3.15 - Feature: Framework Diagnostics

Framework diagnostics in the Durable Task Framework

https://github.com/Azure/durabletask/wiki/Framework-Diagnostics

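Important: this information is outdated and applies only to the deprecated v1 of the Durable Task Framework. For all recent versions, see the information here.

All components of the framework log to the trace source "DurableTask". A listener can be attached to this trace source to capture the framework traces.

The framework ships with a TraceListener that logs to the console and to the debug stream. The class name of the listener is DurableTask.Tracing.OrchestrationConsoleTraceListener.

The following app.config snippet shows how to load the console trace listener: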

  <system.diagnostics>
    <trace autoflush="true"/>
    <sources>
      <source name="DurableTask"
              switchName="traceSwitch"
              switchType="System.Diagnostics.SourceSwitch" >
        <listeners>
          <clear/>
          <add name="configConsoleListener" 
            type=" DurableTask.Tracing.OrchestrationConsoleTraceListener, DurableTask"
               traceOutputOptions="DateTime" />
        </listeners>
      </source>
    </sources>
    <switches>
      <add name="traceSwitch" value="Verbose" />
    </switches>
  </system.diagnostics>

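4 - Persistence Stores

Backend persistence stores for Durable Task

4.1 - Persistence Stores Overview

Overview of the backend persistence stores for Durable Task

Introduction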

https://github.com/Azure/durabletask#supported-persistance-stores

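Supported persistence stores

Starting in v2.x, the Durable Task Framework supports an extensible set of backend persistence stores. Each store can be enabled using a different NuGet package. The latest version of every package is signed and available for download at nuget.org.

| Package | Details | Development status |
|---|---|---|
| DurableTask.ServiceBus | Orchestration messages and runtime state are stored in Service Bus queues, while tracking state is stored in Azure Storage. The strength of this provider is its maturity and transactional consistency. However, it is no longer in active development at Microsoft. | Production ready but not actively maintained |
| DurableTask.AzureStorage | All orchestration state is stored in Azure Storage queues, tables, and blobs. The strength of this provider is its minimal service dependencies, high efficiency, and rich feature set. It is the only backend available for Durable Functions. | Production ready and actively maintained |
| DurableTask.AzureServiceFabric | All orchestration state is stored in Azure Service Fabric Reliable Collections. This is an ideal choice if you host your application in Azure Service Fabric and do not want to depend on external resources for storing state. | Production ready and actively maintained |
| DurableTask.Netherite | An ultra-high-performance backend developed by Microsoft Research, in which state is stored in Azure Event Hubs and Azure Page Blobs using the FASTER database technology from Microsoft Research. Hosted in a separate GitHub repo. | Production ready and actively maintained |
| DurableTask.SqlServer | All orchestration state is stored in a Microsoft SQL Server or Azure SQL database, with indexed tables and stored procedures for direct interaction. Hosted in a separate GitHub repo. | Production ready and actively maintained |
| DurableTask.Emulator | An in-memory store intended for testing purposes only. Not recommended for any production workloads. | Not actively maintained |

The core programming model of the Durable Task Framework is contained in the DurableTask.Core package, which is also under active development.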

Code implementation

| Package | Source location |
|---|---|
| DurableTask.ServiceBus | src\DurableTask.ServiceBus |
| DurableTask.AzureStorage | src\DurableTask.AzureStorage |
| DurableTask.AzureServiceFabric | src\DurableTask.AzureServiceFabric |
| DurableTask.Netherite | https://github.com/microsoft/durabletask-netherite |
| DurableTask.SqlServer | src\DurableTask.SqlServer and https://github.com/microsoft/durabletask-mssql |
| DurableTask.Emulator | src\DurableTask.Emulator |

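4.2 - Core Programming Model

The core programming model behind the Durable Task backend persistence stores

4.2.1 - Orchestration

Orchestration definitions in the core programming model

4.2.1.1 - TaskOrchestration

TaskOrchestration in the core programming model

src\DurableTask.Core\TaskOrchestration.cs

TaskOrchestration.cs defines three methods:

  • Execute()
  • RaiseEvent()
  • GetStatus()

Execute() method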

public abstract Task<string> Execute(OrchestrationContext context, string input);

It is implemented as:

public override async Task<string> Execute(OrchestrationContext context, string input)
        {
            var parameter = DataConverter.Deserialize<TInput>(input);
            TResult result;
            try
            {
                result = await RunTask(context, parameter);
            }
            catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
            {
                string details = null;
                FailureDetails failureDetails = null;
                if (context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
                {
                    details = Utils.SerializeCause(e, DataConverter);
                }
                else
                {
                    failureDetails = new FailureDetails(e);
                }

                throw new OrchestrationFailureException(e.Message, details)
                {
                    FailureDetails = failureDetails,
                };
            }

            return DataConverter.Serialize(result);
        }

RunTask() is an abstract method:

public abstract Task<TResult> RunTask(OrchestrationContext context, TInput input);

The default DataConverter is JSON:

    public abstract class TaskOrchestration<TResult, TInput, TEvent, TStatus> : TaskOrchestration
    {
        /// <summary>
        /// Creates a new TaskOrchestration with the default DataConverter
        /// </summary>
        protected TaskOrchestration()
        {
            DataConverter = JsonDataConverter.Default;
        }

        /// <summary>
        /// The DataConverter to use for input and output serialization/deserialization
        /// </summary>
        public DataConverter DataConverter { get; protected set; }
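The Execute() wrapper above follows a "string in, string out" pattern: deserialize the input, run the typed method, serialize the result. A minimal sketch of that pattern follows, with System.Text.Json standing in for the framework's DataConverter; StringBoundary and AddOne are hypothetical names, and the error-wrapping branch of the real method is omitted.

```csharp
using System;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical base class: exposes a string-based entry point and
// delegates to a strongly typed method implemented by subclasses.
public abstract class StringBoundary<TInput, TResult>
{
    public async Task<string> Execute(string input)
    {
        // Deserialize the raw input into the typed parameter.
        TInput parameter = JsonSerializer.Deserialize<TInput>(input);
        TResult result = await RunTask(parameter);
        // Serialize the typed result back to a string.
        return JsonSerializer.Serialize(result);
    }

    protected abstract Task<TResult> RunTask(TInput input);
}

public sealed class AddOne : StringBoundary<int, int>
{
    protected override Task<int> RunTask(int input) => Task.FromResult(input + 1);
}
```

Keeping the outer signature string-based lets a host store and forward inputs and outputs without knowing the concrete types involved.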

RaiseEvent() method

public abstract void RaiseEvent(OrchestrationContext context, string name, string input);

It is implemented as:

public override void RaiseEvent(OrchestrationContext context, string name, string input)
{
  var parameter = DataConverter.Deserialize<TEvent>(input);
  OnEvent(context, name, parameter);
}

OnEvent() has an empty default implementation:

        public virtual void OnEvent(OrchestrationContext context, string name, TEvent input)
        {
            // do nothing
        }

GetStatus() method

public abstract string GetStatus();

4.2.1.2 - OrchestrationInstance

OrchestrationInstance in the core programming model

src\DurableTask.Core\OrchestrationInstance.cs

OrchestrationInstance defines the following properties:

  • InstanceId
  • ExecutionId

    [DataContract]
    public class OrchestrationInstance : IExtensibleDataObject
    {
        /// <summary>
        /// The instance id, assigned as unique to the orchestration
        /// </summary>
        [DataMember]
        public string InstanceId { get; set; }

        /// <summary>
        /// The execution id, unique to the execution of this instance
        /// </summary>
        [DataMember]
        public string ExecutionId { get; set; }

4.2.1.3 - OrchestrationState

OrchestrationState in the core programming model

src\DurableTask.Core\OrchestrationState.cs

OrchestrationState defines the following properties:

  • CompletedTime
  • CompressedSize
  • CreatedTime
  • Input
  • LastUpdatedTime
  • Name
  • OrchestrationInstance: contains InstanceId and ExecutionId
  • Output
  • ParentInstance
  • Size
  • Status
  • Tags
  • Version: a string; to be checked whether it can be reused.
  • Generation
  • ScheduledStartTime
  • FailureDetails

4.2.2 - Activity

Activity definitions in the core programming model

4.2.2.1 - TaskActivity

TaskActivity in the core programming model

src\DurableTask.Core\TaskActivity.cs

TaskActivity defines the following methods:

  • Run()
  • RunAsync()

Run() method

public abstract string Run(TaskContext context, string input);

In AsyncTaskActivity the synchronous method is blocked off:

        /// <summary>
        /// Synchronous execute method, blocked for AsyncTaskActivity
        /// </summary>
        /// <returns>string.Empty</returns>
        public override string Run(TaskContext context, string input)
        {
            // will never run
            return string.Empty;
        }

RunAsync() method

        public virtual Task<string> RunAsync(TaskContext context, string input)
        {
            return Task.FromResult(Run(context, input));
        }

It is overridden as:

public override async Task<string> RunAsync(TaskContext context, string input)
        {
            TInput parameter = default(TInput);

            var jArray = Utils.ConvertToJArray(input);

            int parameterCount = jArray.Count;
            if (parameterCount > 1)
            {
                throw new TaskFailureException(
                    "TaskActivity implementation cannot be invoked due to more than expected input parameters.  Signature mismatch.");
            }
            
            if (parameterCount == 1)
            {
                JToken jToken = jArray[0];
                if (jToken is JValue jValue)
                {
                    parameter = jValue.ToObject<TInput>();
                }
                else
                {
                    string serializedValue = jToken.ToString();
                    parameter = DataConverter.Deserialize<TInput>(serializedValue);
                }
            }

            TResult result;
            try
            {
                result = await ExecuteAsync(context, parameter);
            }
            catch (Exception e) when (!Utils.IsFatal(e) && !Utils.IsExecutionAborting(e))
            {
                string details = null;
                FailureDetails failureDetails = null;
                if (context != null && context.ErrorPropagationMode == ErrorPropagationMode.SerializeExceptions)
                {
                    details = Utils.SerializeCause(e, DataConverter);
                }
                else
                {
                    failureDetails = new FailureDetails(e);
                }

                throw new TaskFailureException(e.Message, e, details)
                    .WithFailureDetails(failureDetails);
            }

            string serializedResult = DataConverter.Serialize(result);
            return serializedResult;
        }
    }

ExecuteAsync() is an abstract method:

        protected abstract Task<TResult> ExecuteAsync(TaskContext context, TInput input);
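The input handling in RunAsync() treats the raw input as a JSON array holding at most one parameter. The sketch below reproduces that rule with System.Text.Json in place of the JArray-based code; ActivityInputSketch is a hypothetical name, and it assumes the input is already a JSON array, since the behavior of Utils.ConvertToJArray for other inputs is not shown here.

```csharp
using System;
using System.Text.Json;

public static class ActivityInputSketch
{
    // Extracts the single parameter from a JSON array such as "[7]".
    // An empty array yields default(T); more than one element is rejected.
    public static T ParseSingleInput<T>(string input)
    {
        using (JsonDocument document = JsonDocument.Parse(input))
        {
            JsonElement root = document.RootElement;
            if (root.ValueKind != JsonValueKind.Array)
            {
                throw new ArgumentException("Expected a JSON array of parameters.", nameof(input));
            }

            int parameterCount = root.GetArrayLength();
            if (parameterCount > 1)
            {
                throw new InvalidOperationException(
                    "More input parameters than expected. Signature mismatch.");
            }

            if (parameterCount == 0)
            {
                return default(T);
            }

            // Deserialize the one element from its raw JSON text.
            return JsonSerializer.Deserialize<T>(root[0].GetRawText());
        }
    }
}
```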

GetStatus() method

public abstract string GetStatus();

4.2.2.2 - TaskContext

TaskContext in the core programming model

src\DurableTask.Core\TaskContext.cs

TaskContext defines the following properties:

  • OrchestrationInstance: contains InstanceId and ExecutionId
  • ErrorPropagationMode

4.2.3 - Entity

Entity definitions in the core programming model

4.2.3.1 - TaskEntity

TaskEntity in the core programming model

Abstract base class for entities

src\DurableTask.Core\Entities\TaskEntity.cs

TaskEntity defines one method:

  • ExecuteOperationBatchAsync()

ExecuteOperationBatchAsync() method

public abstract Task<EntityBatchResult> ExecuteOperationBatchAsync(EntityBatchRequest operations);

EntityBatchRequest class

A request for execution of a batch of operations on an entity.

  • string InstanceId
  • string EntityState
  • List<OperationRequest> Operations

OperationRequest class

It has the following properties:

  • string Operation
  • Guid Id
  • string Input

4.2.3.2 - EntityId

EntityId in the core programming model

A unique identifier for an entity, consisting of entity name and entity key.

src\DurableTask.Core\Entities\EntityId.cs

EntityId defines the following properties:

  • string Name
  • string Key

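4.2.4 - History

History event definitions in the core programming model

4.2.4.1 - History Overview

Overview of history events in the core programming model

Introduction

The following introduction is taken from README.md

Durable Task Framework History Events

The following are some of the common history events that make up the state of an orchestration. You can easily view them in the history tables of DTFx's Azure Storage and MSSQL storage backends. Understanding these events is useful when working with the DTFx code, debugging problems, or creating diagnostic tools that read the history directly (such as the Durable Functions Monitor project).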

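| Event Type | Description |
|---|---|
| OrchestratorStarted | The orchestrator function is starting a new execution. You will see many of these in a history: one each time the orchestrator resumes from an "await" state. Note that this does not mean the orchestrator is starting for the first time; first-time execution is denoted by the ExecutionStarted history event (see below). The timestamp of this event is used to populate the CurrentDateTimeUtc property. |
| ExecutionStarted | The orchestration has started executing for the first time. This event contains the orchestrator name, its input, and the scheduled time of the orchestrator (which may be earlier than the preceding OrchestratorStarted event in the history). This is always the second event in an orchestration history. |
| TaskScheduled | The orchestrator scheduled an activity task. This event includes the activity name, its input, and a sequential EventId that can be used to correlate the TaskScheduled event with the corresponding TaskCompleted or TaskFailed event. Note that multiple Task*** events may be generated if an activity task is retried. |
| TaskCompleted | A scheduled task activity completed successfully. The TaskScheduledId field matches the EventId field of the corresponding TaskScheduled event. |
| TaskFailed | A scheduled task activity completed with a failure. The TaskScheduledId field matches the EventId field of the corresponding TaskScheduled event. |
| SubOrchestrationInstanceCreated | The orchestrator scheduled a sub-orchestrator. This event contains the name of the scheduled orchestrator, its instance ID, its input, and the ordered event ID that can be used to correlate the SubOrchestrationInstanceCreated event with a later SubOrchestrationInstanceCompleted or SubOrchestrationInstanceFailed history event. The timestamp refers to when the sub-orchestrator was scheduled, which is earlier than when it starts executing. Note that multiple SubOrchestrationInstance*** events may be generated if an activity task is retried. |
| SubOrchestrationInstanceCompleted | A scheduled sub-orchestrator completed successfully. The TaskScheduledId field matches the EventId field of the corresponding SubOrchestrationInstanceCreated event. |
| SubOrchestrationInstanceFailed | A scheduled sub-orchestrator completed with a failure. The TaskScheduledId field matches the EventId field of the corresponding SubOrchestrationInstanceCreated event. |
| TimerCreated | The orchestrator scheduled a durable timer. The FireAt property contains the date at which the timer will fire. |
| TimerFired | A previously scheduled durable timer has fired. The TimerId field matches the EventId field of the corresponding TimerCreated event. |
| EventRaised | The orchestration (or the entity, in the case of Durable Entities) received an external event. This record contains the event name, its payload, and the timestamp at which the event was sent (which should be the same as or earlier than when the history event was actually persisted). |
| EventSent | The orchestration (or entity) sent a one-way message to another orchestration (or entity). |
| ExecutionCompleted | The orchestration completed. This event includes the orchestration output and does not distinguish between success and failure. |
| ExecutionTerminated | The orchestration was forcefully terminated by an API call. The timestamp of this event represents when the termination was scheduled, not necessarily when it actually took effect. |
| OrchestratorCompleted | The orchestrator function awaited and committed any side effects. You will see many of these in a history: one each time the orchestrator awaits. Note that this does not mean the orchestrator has completed (completion is denoted by ExecutionCompleted or ExecutionTerminated). |
| GenericEvent | A generic history event with a Data field but no specific meaning. This history event is not commonly used. In some cases it is used to trigger a fresh replay of an idle orchestration, for example after an orchestration rewind. |
| HistoryStateEvent | A history event that contains a snapshot of the orchestration history. Most modern backend types do not use this event type. |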
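The EventId and TaskScheduledId correlation described above is the kind of thing a small diagnostic tool would rely on. The sketch below finds activity tasks that were scheduled but have no completion or failure yet. HistoryRecord is a hypothetical, simplified shape, not the framework's HistoryEvent class hierarchy; only the three event type names and the two field names come from the table.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical flattened history row for illustration.
public sealed class HistoryRecord
{
    public string EventType { get; set; } = "";
    public int EventId { get; set; }          // meaningful on TaskScheduled
    public int TaskScheduledId { get; set; }  // meaningful on TaskCompleted / TaskFailed
}

public static class HistorySketch
{
    // Returns the EventId of every TaskScheduled event that has no
    // TaskCompleted or TaskFailed event pointing back at it.
    public static List<int> PendingTasks(IEnumerable<HistoryRecord> history)
    {
        List<HistoryRecord> records = history.ToList();

        var finished = new HashSet<int>(records
            .Where(r => r.EventType == "TaskCompleted" || r.EventType == "TaskFailed")
            .Select(r => r.TaskScheduledId));

        return records
            .Where(r => r.EventType == "TaskScheduled" && !finished.Contains(r.EventId))
            .Select(r => r.EventId)
            .ToList();
    }
}
```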

4.2.4.2 - HistoryEvent

HistoryEvent in the core programming model

It has the following properties:

  • int EventId
  • EventType EventType
  • bool IsPlayed
  • DateTime Timestamp
  • ExtensionDataObject ExtensionData

This class is also the parent class of the other events.

4.2.4.3 - ExecutionStartedEvent

ExecutionStartedEvent in the core programming model

It has the following properties:

  • string EventId
  • string Input
  • EventType EventType
  • ParentInstance ParentInstance
  • string Name
  • string Version: can be reused
  • IDictionary<string, string> Tags
  • string Correlation
  • DistributedTraceContext ParentTraceContext
  • DateTime ScheduledStartTime
  • int Generation

4.2.4.4 - OrchestratorStartedEvent

OrchestratorStartedEvent in the core programming model

It has the following properties:

  • string EventId
  • EventType EventType

4.3 - Azure Storage

The Azure Storage backend persistence store implementation

5 - durable functions

durable functions

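5.1 - Durable Functions Overview

Overview of Durable Functions

Durable Functions is an extension of Azure Functions that lets you write stateful functions in a serverless compute environment.

The extension lets you define stateful workflows by writing orchestrator functions, and stateful entities by writing entity functions, using the Azure Functions programming model. Behind the scenes, the extension manages state, checkpoints, and restarts for you, so you can focus on your business logic.

Documentation: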

5.2 - durable functions quickstart

durable functions quickstart

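Reference:

Documentation

Durable Functions is an extension of Azure Functions that lets you write stateful functions in a serverless environment. The extension manages state, checkpoints, and restarts for you.

In this article, you learn how to use Visual Studio Code to locally create and test a "hello world" durable function. This function orchestrates and chains together calls to other functions. You can then publish the function code to Azure. These tools are available as part of the Visual Studio Code Azure Functions extension.

Prerequisites

To complete this tutorial:

  • Install Visual Studio Code.

  • Install the following Visual Studio Code extensions:

    • Azure Functions

    • C#

  • Make sure that you have the latest version of the Azure Functions Core Tools.

  • Durable Functions requires an Azure Storage account, so you need an Azure subscription.

  • Make sure that you have version 3.1 or later of the .NET Core SDK installed.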

Hands-on

Preparation

Install the VS Code extensions

  • Azure Functions

  • C#

Install Azure Functions Core Tools

https://go.microsoft.com/fwlink/?linkid=2174087

Once the installation finishes, the func command is available from the Windows cmd:

func --version
4.0.5611

At this point func also works from git bash or zsh:

which func
/c/Program Files/Microsoft/Azure Functions Core Tools/func
func --version
4.0.5611

However, the shell inside VS Code cannot find func, so the zsh configuration needs to be updated:

vi ~/.zshrc

Add the tools directory to PATH:

# azure functions
export PATH=$PATH:"/c/Program Files/Microsoft/Azure Functions Core Tools/"

Then run source ~/.zshrc to apply the change.

Create a local project

Reference:

https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-create-first-csharp?pivots=code-editor-vscode#create-an-azure-functions-project

Add functions to the app

https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-create-first-csharp?pivots=code-editor-vscode#add-functions-to-the-app

This step fails with the following error:

Detected package downgrade: Microsoft.Azure.Functions.Worker.Extensions.Http from 3.1.0 to 3.0.13. Reference the package directly from the project to select a different version.  

Edit the mydurablefunctions1.csproj file and change

<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.0.13" />

to:

<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.1.0" />

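Close VS Code, then reopen the project.

Next, when the "select a storage account" prompt appeared, I chose an Azure storage account, but oddly none of the prompts described in the documentation showed up. Since the documented steps could not be followed, the Azure storage account had to be created by hand.

Create the Azure storage account manually

Open the Azure portal and create the storage account by hand:

  • resource group: versioning-dev

  • storage account name: skyversioningdev

  • Region: east asia

Once it is created, open "Access keys" to see the connection string, which looks like: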

DefaultEndpointsProtocol=https;AccountName=skyversioningdev;AccountKey=5dx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxEvy2+AStFSJ8zg==;EndpointSuffix=core.windows.net

Edit local.settings.json and set AzureWebJobsStorage, which is empty by default, to that connection string:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=skyversioningdev;AccountKey=5dx xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxEvy2+AStFSJ8zg==;EndpointSuffix=core.windows.net",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated"
  }
}

Add the DurableTask package manually

Note: I no longer remember why this dependency was needed.

Then add the DurableTask package to the project by running:

dotnet add package Microsoft.Azure.WebJobs.Extensions.DurableTask --version 2.13.1

The package now appears in mydurablefunctions1.csproj:

  <ItemGroup>
    <FrameworkReference Include="Microsoft.AspNetCore.App" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.20.1" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.0.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.1.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="1.2.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.16.4" />
    <PackageReference Include="Microsoft.ApplicationInsights.WorkerService" Version="2.21.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.ApplicationInsights" Version="1.1.0" />
    <PackageReference Include="Microsoft.Azure.WebJobs.Extensions.DurableTask" Version="2.13.1" />
  </ItemGroup>

debug

Edit .vscode\launch.json and add the justMyCode setting, set to false:

{
  "version": "0.2.0",
  "configurations": [
    {
      "name": "HelloOrchestration",
      "type": "coreclr",
      "request": "attach",
      "processId": "${command:azureFunctions.pickProcess}",
      "justMyCode": false
    }
  ]
}

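Notes:

  • If debugging fails to start with an error saying that the func start command failed, restart VS Code and try again.

Switch to the async API and change dependency versions

Running the function fails with: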

Azure Functions Core Tools
Core Tools Version:       4.0.5611 Commit hash: N/A +591b8aec842e333a87ea9e23ba390bb5effe0655 (64-bit)
Function Runtime Version: 4.31.1.22191

[2024-04-12T06:48:22.103Z] Found C:\Users\sky\work\code\durabletask-fork2\MyDurableFunction2\MyDurableFunction2.csproj. Using for user secrets file configuration.
[2024-04-12T06:48:24.667Z] Worker process started and initialized.

Functions:

        HelloOrchestration_HttpStart: [GET,POST] http://localhost:7071/api/HelloOrchestration_HttpStart

        HelloOrchestration: orchestrationTrigger

        SayHello: activityTrigger

For detailed output, run func with --verbose flag.
[2024-04-12T06:48:30.227Z] Host lock lease acquired by instance ID '00000000000000000000000000EA9F9B'.
[2024-04-12T06:49:31.664Z] Executing 'Functions.HelloOrchestration_HttpStart' (Reason='This function was programmatically called via the host APIs.', Id=8785cd2a-89e9-496c-bb9b-6cfcb7be39be)
[2024-04-12T06:49:32.010Z] Scheduling new HelloOrchestration orchestration with instance ID '94153d04204a4081802ebaa3642210c3' and 0 bytes of input data.
[2024-04-12T06:49:32.365Z] Started orchestration with ID = '94153d04204a4081802ebaa3642210c3'.
[2024-04-12T06:49:32.432Z] Function 'HelloOrchestration_HttpStart', Invocation id '8785cd2a-89e9-496c-bb9b-6cfcb7be39be': An exception was thrown by the invocation.
[2024-04-12T06:49:32.432Z] Result: Function 'HelloOrchestration_HttpStart', Invocation id '8785cd2a-89e9-496c-bb9b-6cfcb7be39be': An exception was thrown by the invocation.
Exception: System.InvalidOperationException: Synchronous operations are disallowed. Call WriteAsync or set AllowSynchronousIO to true instead.
[2024-04-12T06:49:32.433Z]    at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpResponseStream.Write(Byte[] buffer, Int32 offset, Int32 count)
[2024-04-12T06:49:32.433Z]    at Azure.Core.Serialization.JsonObjectSerializer.Serialize(Stream stream, Object value, Type inputType, CancellationToken 
cancellationToken)
[2024-04-12T06:49:32.434Z]    at Microsoft.Azure.Functions.Worker.DurableTaskClientExtensions.CreateCheckStatusResponse(DurableTaskClient client, HttpRequestData request, String instanceId, HttpStatusCode statusCode, CancellationToken cancellation) in /_/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs:line 97
[2024-04-12T06:49:32.435Z]    at Microsoft.Azure.Functions.Worker.DurableTaskClientExtensions.CreateCheckStatusResponse(DurableTaskClient client, HttpRequestData request, String instanceId, CancellationToken cancellation) in /_/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs:line 34    
[2024-04-12T06:49:32.435Z]    at Company.Function.HelloOrchestration.HttpStart(HttpRequestData req, DurableTaskClient client, FunctionContext executionContext) in C:\Users\sky\work\code\durabletask-fork2\MyDurableFunction2\HelloOrchestration.cs:line 52
[2024-04-12T06:49:32.435Z]    at MyDurableFunction2.DirectFunctionExecutor.ExecuteAsync(FunctionContext context) in C:\Users\sky\work\code\durabletask-fork2\MyDurableFunction2\Microsoft.Azure.Functions.Worker.Sdk.Generators\Microsoft.Azure.Functions.Worker.Sdk.Generators.FunctionExecutorGenerator\GeneratedFunctionExecutor.g.cs:line 40
[2024-04-12T06:49:32.436Z]    at Microsoft.Azure.Functions.Worker.OutputBindings.OutputBindingsMiddleware.Invoke(FunctionContext context, FunctionExecutionDelegate next) in D:\a\_work\1\s\src\DotNetWorker.Core\OutputBindings\OutputBindingsMiddleware.cs:line 13
[2024-04-12T06:49:32.437Z]    at Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.Invoke(FunctionContext context, FunctionExecutionDelegate next) in D:\a\_work\1\s\extensions\Worker.Extensions.Http.AspNetCore\src\FunctionsMiddleware\FunctionsHttpProxyingMiddleware.cs:line 48
[2024-04-12T06:49:32.437Z]    at Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskFunctionsMiddleware.Invoke(FunctionContext functionContext, FunctionExecutionDelegate next) in /_/src/Worker.Extensions.DurableTask/DurableTaskFunctionsMiddleware.cs:line 22
[2024-04-12T06:49:32.438Z]    at Microsoft.Azure.Functions.Worker.FunctionsApplication.InvokeFunctionAsync(FunctionContext context) in D:\a\_work\1\s\src\DotNetWorker.Core\FunctionsApplication.cs:line 77
Stack:    at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpResponseStream.Write(Byte[] buffer, Int32 offset, Int32 count)
[2024-04-12T06:49:32.438Z]    at Azure.Core.Serialization.JsonObjectSerializer.Serialize(Stream stream, Object value, Type inputType, CancellationToken 
cancellationToken)
[2024-04-12T06:49:32.439Z]    at Microsoft.Azure.Functions.Worker.DurableTaskClientExtensions.CreateCheckStatusResponse(DurableTaskClient client, HttpRequestData request, String instanceId, HttpStatusCode statusCode, CancellationToken cancellation) in /_/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs:line 97
[2024-04-12T06:49:32.439Z]    at Microsoft.Azure.Functions.Worker.DurableTaskClientExtensions.CreateCheckStatusResponse(DurableTaskClient client, HttpRequestData request, String instanceId, CancellationToken cancellation) in /_/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs:line 34    
[2024-04-12T06:49:32.440Z]    at Company.Function.HelloOrchestration.HttpStart(HttpRequestData req, DurableTaskClient client, FunctionContext executionContext) in C:\Users\sky\work\code\durabletask-fork2\MyDurableFunction2\HelloOrchestration.cs:line 52
[2024-04-12T06:49:32.440Z]    at MyDurableFunction2.DirectFunctionExecutor.ExecuteAsync(FunctionContext context) in C:\Users\sky\work\code\durabletask-fork2\MyDurableFunction2\Microsoft.Azure.Functions.Worker.Sdk.Generators\Microsoft.Azure.Functions.Worker.Sdk.Generators.FunctionExecutorGenerator\GeneratedFunctionExecutor.g.cs:line 40
[2024-04-12T06:49:32.441Z]    at Microsoft.Azure.Functions.Worker.OutputBindings.OutputBindingsMiddleware.Invoke(FunctionContext context, FunctionExecutionDelegate next) in D:\a\_work\1\s\src\DotNetWorker.Core\OutputBindings\OutputBindingsMiddleware.cs:line 13
[2024-04-12T06:49:32.441Z]    at Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.Invoke(FunctionContext context, FunctionExecutionDelegate next) in D:\a\_work\1\s\extensions\Worker.Extensions.Http.AspNetCore\src\FunctionsMiddleware\FunctionsHttpProxyingMiddleware.cs:line 48
[2024-04-12T06:49:32.442Z]    at Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskFunctionsMiddleware.Invoke(FunctionContext functionContext, FunctionExecutionDelegate next) in /_/src/Worker.Extensions.DurableTask/DurableTaskFunctionsMiddleware.cs:line 22
[2024-04-12T06:49:32.443Z]    at Microsoft.Azure.Functions.Worker.FunctionsApplication.InvokeFunctionAsync(FunctionContext context) in D:\a\_work\1\s\src\DotNetWorker.Core\FunctionsApplication.cs:line 77.
[2024-04-12T06:49:32.484Z] Executed 'Functions.HelloOrchestration_HttpStart' (Failed, Id=8785cd2a-89e9-496c-bb9b-6cfcb7be39be, Duration=840ms)
[2024-04-12T06:49:32.485Z] System.Private.CoreLib: Exception while executing function: Functions.HelloOrchestration_HttpStart. System.Private.CoreLib: Result: Failure
Exception: System.InvalidOperationException: Synchronous operations are disallowed. Call WriteAsync or set AllowSynchronousIO to true instead.
[2024-04-12T06:49:32.485Z]    at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpResponseStream.Write(Byte[] buffer, Int32 offset, Int32 count)
[2024-04-12T06:49:32.485Z]    at Azure.Core.Serialization.JsonObjectSerializer.Serialize(Stream stream, Object value, Type inputType, CancellationToken 
cancellationToken)
[2024-04-12T06:49:32.486Z]    at Microsoft.Azure.Functions.Worker.DurableTaskClientExtensions.CreateCheckStatusResponse(DurableTaskClient client, HttpRequestData request, String instanceId, HttpStatusCode statusCode, CancellationToken cancellation) in /_/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs:line 97
[2024-04-12T06:49:32.486Z]    at Microsoft.Azure.Functions.Worker.DurableTaskClientExtensions.CreateCheckStatusResponse(DurableTaskClient client, HttpRequestData request, String instanceId, CancellationToken cancellation) in /_/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs:line 34    
[2024-04-12T06:49:32.486Z]    at Company.Function.HelloOrchestration.HttpStart(HttpRequestData req, DurableTaskClient client, FunctionContext executionContext) in C:\Users\sky\work\code\durabletask-fork2\MyDurableFunction2\HelloOrchestration.cs:line 52
[2024-04-12T06:49:32.486Z]    at MyDurableFunction2.DirectFunctionExecutor.ExecuteAsync(FunctionContext context) in C:\Users\sky\work\code\durabletask-fork2\MyDurableFunction2\Microsoft.Azure.Functions.Worker.Sdk.Generators\Microsoft.Azure.Functions.Worker.Sdk.Generators.FunctionExecutorGenerator\GeneratedFunctionExecutor.g.cs:line 40
[2024-04-12T06:49:32.487Z]    at Microsoft.Azure.Functions.Worker.OutputBindings.OutputBindingsMiddleware.Invoke(FunctionContext context, FunctionExecutionDelegate next) in D:\a\_work\1\s\src\DotNetWorker.Core\OutputBindings\OutputBindingsMiddleware.cs:line 13
[2024-04-12T06:49:32.487Z]    at Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.Invoke(FunctionContext context, FunctionExecutionDelegate next) in D:\a\_work\1\s\extensions\Worker.Extensions.Http.AspNetCore\src\FunctionsMiddleware\FunctionsHttpProxyingMiddleware.cs:line 48
[2024-04-12T06:49:32.487Z]    at Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskFunctionsMiddleware.Invoke(FunctionContext functionContext, FunctionExecutionDelegate next) in /_/src/Worker.Extensions.DurableTask/DurableTaskFunctionsMiddleware.cs:line 22
[2024-04-12T06:49:32.488Z]    at Microsoft.Azure.Functions.Worker.FunctionsApplication.InvokeFunctionAsync(FunctionContext context) in D:\a\_work\1\s\src\DotNetWorker.Core\FunctionsApplication.cs:line 77
[2024-04-12T06:49:32.488Z]    at Microsoft.Azure.Functions.Worker.Handlers.InvocationHandler.InvokeAsync(InvocationRequest request) in D:\a\_work\1\s\src\DotNetWorker.Grpc\Handlers\InvocationHandler.cs:line 88
Stack:    at Microsoft.AspNetCore.Server.Kestrel.Core.Internal.Http.HttpResponseStream.Write(Byte[] buffer, Int32 offset, Int32 count)
[2024-04-12T06:49:32.488Z]    at Azure.Core.Serialization.JsonObjectSerializer.Serialize(Stream stream, Object value, Type inputType, CancellationToken cancellationToken)
[2024-04-12T06:49:32.489Z]    at Microsoft.Azure.Functions.Worker.DurableTaskClientExtensions.CreateCheckStatusResponse(DurableTaskClient client, HttpRequestData request, String instanceId, HttpStatusCode statusCode, CancellationToken cancellation) in /_/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs:line 97
[2024-04-12T06:49:32.489Z]    at Microsoft.Azure.Functions.Worker.DurableTaskClientExtensions.CreateCheckStatusResponse(DurableTaskClient client, HttpRequestData request, String instanceId, CancellationToken cancellation) in /_/src/Worker.Extensions.DurableTask/DurableTaskClientExtensions.cs:line 34    
[2024-04-12T06:49:32.489Z]    at Company.Function.HelloOrchestration.HttpStart(HttpRequestData req, DurableTaskClient client, FunctionContext executionContext) in C:\Users\sky\work\code\durabletask-fork2\MyDurableFunction2\HelloOrchestration.cs:line 52
[2024-04-12T06:49:32.490Z]    at MyDurableFunction2.DirectFunctionExecutor.ExecuteAsync(FunctionContext context) in C:\Users\sky\work\code\durabletask-fork2\MyDurableFunction2\Microsoft.Azure.Functions.Worker.Sdk.Generators\Microsoft.Azure.Functions.Worker.Sdk.Generators.FunctionExecutorGenerator\GeneratedFunctionExecutor.g.cs:line 40
[2024-04-12T06:49:32.490Z]    at Microsoft.Azure.Functions.Worker.OutputBindings.OutputBindingsMiddleware.Invoke(FunctionContext context, FunctionExecutionDelegate next) in D:\a\_work\1\s\src\DotNetWorker.Core\OutputBindings\OutputBindingsMiddleware.cs:line 13
[2024-04-12T06:49:32.490Z]    at Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.Invoke(FunctionContext context, FunctionExecutionDelegate next) in D:\a\_work\1\s\extensions\Worker.Extensions.Http.AspNetCore\src\FunctionsMiddleware\FunctionsHttpProxyingMiddleware.cs:line 48
[2024-04-12T06:49:32.491Z]    at Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskFunctionsMiddleware.Invoke(FunctionContext functionContext, FunctionExecutionDelegate next) in /_/src/Worker.Extensions.DurableTask/DurableTaskFunctionsMiddleware.cs:line 22
[2024-04-12T06:49:32.491Z]    at Microsoft.Azure.Functions.Worker.FunctionsApplication.InvokeFunctionAsync(FunctionContext context) in D:\a\_work\1\s\src\DotNetWorker.Core\FunctionsApplication.cs:line 77
[2024-04-12T06:49:32.491Z]    at Microsoft.Azure.Functions.Worker.Handlers.InvocationHandler.InvokeAsync(InvocationRequest request) in D:\a\_work\1\s\src\DotNetWorker.Grpc\Handlers\InvocationHandler.cs:line 88.

The trace points at this line in the generated quickstart code:

return client.CreateCheckStatusResponse(req, instanceId);

It uses a synchronous API where an asynchronous one is required, as the exception message says:

Exception: System.InvalidOperationException: Synchronous operations are disallowed. Call WriteAsync or set AllowSynchronousIO to true instead.

Change that line to:

return await client.CreateCheckStatusResponseAsync(req, instanceId);

This fails to compile because the API does not exist: the referenced version of Microsoft.Azure.Functions.Worker.Extensions.DurableTask is too old. Update the package reference in MyDurableFunction1.csproj:

    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.1.2" />

With that change, the quickstart finally runs correctly:

[2024-04-12T07:02:33.790Z] Executing 'Functions.HelloOrchestration_HttpStart' (Reason='This function was programmatically called via the host APIs.', Id=fa30ce71-7f16-4dd7-a640-df39f29158e7)
[2024-04-12T07:02:34.055Z] Scheduling new HelloOrchestration orchestration with instance ID '1a01031830e8409eaa09d14b27c7e681' and 0 bytes of input data.
[2024-04-12T07:02:34.716Z] Host lock lease acquired by instance ID '00000000000000000000000000EA9F9B'.
[2024-04-12T07:02:34.831Z] Started orchestration with ID = '1a01031830e8409eaa09d14b27c7e681'.
[2024-04-12T07:02:34.864Z] Executed 'Functions.HelloOrchestration_HttpStart' (Succeeded, Id=fa30ce71-7f16-4dd7-a640-df39f29158e7, Duration=1095ms)
[2024-04-12T07:02:39.126Z] Executing 'Functions.HelloOrchestration' (Reason='(null)', Id=c02f27dd-a472-448d-a3d9-918cae4febbf)
[2024-04-12T07:02:39.319Z] Saying hello.
[2024-04-12T07:02:39.335Z] Executed 'Functions.HelloOrchestration' (Succeeded, Id=c02f27dd-a472-448d-a3d9-918cae4febbf, Duration=215ms)
[2024-04-12T07:02:39.516Z] Executing 'Functions.SayHello' (Reason='(null)', Id=3ff9d7fa-1b1a-4ae6-8846-d41670f3c53d)
[2024-04-12T07:02:39.524Z] Saying hello to Tokyo.
[2024-04-12T07:02:39.525Z] Executed 'Functions.SayHello' (Succeeded, Id=3ff9d7fa-1b1a-4ae6-8846-d41670f3c53d, Duration=10ms)
[2024-04-12T07:02:39.894Z] Executing 'Functions.HelloOrchestration' (Reason='(null)', Id=6ef4aea3-2366-4dcc-951c-b96e43fd62a2)
[2024-04-12T07:02:39.906Z] Executed 'Functions.HelloOrchestration' (Succeeded, Id=6ef4aea3-2366-4dcc-951c-b96e43fd62a2, Duration=12ms)
[2024-04-12T07:02:40.016Z] Executing 'Functions.SayHello' (Reason='(null)', Id=b232d3c6-98d7-4299-9224-ff7a87d9ad9c)
[2024-04-12T07:02:40.042Z] Saying hello to Seattle.
[2024-04-12T07:02:40.043Z] Executed 'Functions.SayHello' (Succeeded, Id=b232d3c6-98d7-4299-9224-ff7a87d9ad9c, Duration=28ms)
[2024-04-12T07:02:40.424Z] Executing 'Functions.HelloOrchestration' (Reason='(null)', Id=d7dd4d04-1b1c-41e2-b47f-51459792cd4a)
[2024-04-12T07:02:40.442Z] Executed 'Functions.HelloOrchestration' (Succeeded, Id=d7dd4d04-1b1c-41e2-b47f-51459792cd4a, Duration=19ms)
[2024-04-12T07:02:40.614Z] Executing 'Functions.SayHello
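The InvalidOperationException earlier in this section comes from the response stream rejecting synchronous writes. The following is a minimal sketch of that behaviour in isolation. AsyncOnlyStream and SyncIoDemo are hypothetical names invented for illustration; AsyncOnlyStream only imitates the relevant behaviour of Kestrel's HttpResponseStream and is not its real implementation.

```csharp
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a response stream that disallows synchronous IO:
// Write() throws, WriteAsync() is forwarded to an in-memory buffer.
public sealed class AsyncOnlyStream : Stream
{
    private readonly MemoryStream _buffer = new MemoryStream();

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => _buffer.Length;
    public override long Position
    {
        get => _buffer.Position;
        set => throw new NotSupportedException();
    }

    public override void Flush() { }
    public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    // Synchronous writes are rejected, as in the logged exception.
    public override void Write(byte[] buffer, int offset, int count) =>
        throw new InvalidOperationException("Synchronous operations are disallowed.");

    // Asynchronous writes are accepted.
    public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) =>
        _buffer.WriteAsync(buffer, offset, count, cancellationToken);

    public string Utf8Content => Encoding.UTF8.GetString(_buffer.ToArray());
}

public static class SyncIoDemo
{
    // Returns true when the synchronous write is rejected.
    public static bool SyncWriteFails()
    {
        using var stream = new AsyncOnlyStream();
        byte[] payload = Encoding.UTF8.GetBytes("{}");
        try
        {
            stream.Write(payload, 0, payload.Length);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }

    // Writes the same payload asynchronously and returns what was stored.
    public static async Task<string> AsyncWriteSucceeds()
    {
        using var stream = new AsyncOnlyStream();
        byte[] payload = Encoding.UTF8.GetBytes("{}");
        await stream.WriteAsync(payload, 0, payload.Length, CancellationToken.None);
        return stream.Utf8Content;
    }
}
```

This is why switching from CreateCheckStatusResponse to the awaited CreateCheckStatusResponseAsync removes the error: the serializer then writes through the asynchronous path.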

5.3 - trigger

azure functions trigger

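6 - durabletask source code study

Studying the source code of the durabletask project

7 - durabletask-dotnet source code study

Studying the source code of the durabletask-dotnet project

8 - azure-functions-durable-extension source code study

Studying the source code of the azure-functions-durable-extension project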

8.1 - client

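Source code analysis of the DurableTask client flow

8.1.1 - client initial

Source code analysis of DurableTask client initialization

8.1.2 - client start new instance

Source code analysis of how the DurableTask client starts a new instance

8.1.2.1 - Call stack

Call stack when the DurableTask client starts a new instance

Call stack overview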

Azure function client sdk code

GrpcWorkerClientFactory

From GrpcWorkerClientFactory in the Azure/azure-functions-dotnet-worker project (StartAsync belongs to its nested GrpcWorkerClient class):

            public async Task StartAsync(CancellationToken token)
            {
                if (_running)
                {
                    throw new InvalidOperationException($"The client is already running. Multiple calls to {nameof(StartAsync)} are not supported.");
                }

                _running = true;

                var eventStream = _grpcClient.EventStream(cancellationToken: token);

                await SendStartStreamMessageAsync(eventStream.RequestStream);

                _ = StartWriterAsync(eventStream.RequestStream);
                _ = StartReaderAsync(eventStream.ResponseStream);
            }

StartReaderAsync is implemented as:

            private async Task StartReaderAsync(IAsyncStreamReader<StreamingMessage> responseStream)
            {
                while (await responseStream.MoveNext())
                {
                    await _processor!.ProcessMessageAsync(responseStream.Current);
                }
            }

GrpcWorker

        Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
        {
            // Dispatch and return.
            Task.Run(() => ProcessRequestCoreAsync(message));

            return Task.CompletedTask;
        }
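The "dispatch and return" comment above means the reader loop is never blocked by a slow message: the work is queued to the thread pool and a completed task is handed back immediately. A minimal sketch of that shape follows; DispatchDemo and its members are hypothetical names for illustration, not part of the worker SDK.

```csharp
using System;
using System.Threading.Tasks;

public static class DispatchDemo
{
    // Queues the work on the thread pool and returns an already-completed task,
    // so the caller can move on to the next message right away.
    public static Task Dispatch(Func<Task> work)
    {
        _ = Task.Run(work);
        return Task.CompletedTask;
    }

    // Returns true if Dispatch() came back while the queued work was still pending.
    public static async Task<bool> ReturnsBeforeWorkFinishes()
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var finished = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        Task returned = Dispatch(async () =>
        {
            await gate.Task;            // the work waits until the gate is opened
            finished.SetResult(true);
        });

        bool returnedEarly = returned.IsCompleted && !finished.Task.IsCompleted;

        gate.SetResult(true);           // let the queued work finish
        await finished.Task;
        return returnedEarly;
    }
}
```

One consequence of this design is that exceptions thrown by the queued work are not observed by the caller, so the dispatched method has to handle its own failures.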

ProcessRequestCoreAsync then handles the message asynchronously:

        private async Task ProcessRequestCoreAsync(StreamingMessage request)
        {
            StreamingMessage responseMessage = new StreamingMessage
            {
                RequestId = request.RequestId
            };

            switch (request.ContentCase)
            {
                case MsgType.InvocationRequest:
                    responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
                    break;

                case MsgType.WorkerInitRequest:
                    responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
                    break;

                case MsgType.WorkerStatusRequest:
                    responseMessage.WorkerStatusResponse = new WorkerStatusResponse();
                    break;

                case MsgType.FunctionsMetadataRequest:
                    responseMessage.FunctionMetadataResponse = await GetFunctionMetadataAsync(request.FunctionsMetadataRequest.FunctionAppDirectory);
                    break;

                case MsgType.WorkerTerminate:
                    WorkerTerminateRequestHandler(request.WorkerTerminate);
                    break;

                case MsgType.FunctionLoadRequest:
                    responseMessage.FunctionLoadResponse = FunctionLoadRequestHandler(request.FunctionLoadRequest, _application, _methodInfoLocator);
                    break;

                case MsgType.FunctionEnvironmentReloadRequest:
                    responseMessage.FunctionEnvironmentReloadResponse = EnvironmentReloadRequestHandler(_workerOptions);
                    break;

                case MsgType.InvocationCancel:
                    InvocationCancelRequestHandler(request.InvocationCancel);
                    break;

                default:
                    // TODO: Trace failure here.
                    return;
            }

            await _workerClient!.SendMessageAsync(responseMessage);
        }

InvocationHandler

        public async Task<InvocationResponse> InvokeAsync(InvocationRequest request)
        {
            using CancellationTokenSource cancellationTokenSource = new();
            FunctionContext? context = null;
            InvocationResponse response = new()
            {
                InvocationId = request.InvocationId,
                Result = new StatusResult()
            };

            if (!_inflightInvocations.TryAdd(request.InvocationId, cancellationTokenSource))
            {
                var exception = new InvalidOperationException("Unable to track CancellationTokenSource");
                response.Result.Status = StatusResult.Types.Status.Failure;
                response.Result.Exception = exception.ToRpcException();

                return response;
            }

            try
            {
                var invocation = new GrpcFunctionInvocation(request);

                IInvocationFeatures invocationFeatures = _invocationFeaturesFactory.Create();
                invocationFeatures.Set<FunctionInvocation>(invocation);
                invocationFeatures.Set<IExecutionRetryFeature>(invocation);

                context = _application.CreateContext(invocationFeatures, cancellationTokenSource.Token);
                invocationFeatures.Set<IFunctionBindingsFeature>(new GrpcFunctionBindingsFeature(context, request, _outputBindingsInfoProvider));

                if (_inputConversionFeatureProvider.TryCreate(typeof(DefaultInputConversionFeature), out var conversion))
                {
                    invocationFeatures.Set<IInputConversionFeature>(conversion!);
                }

                await _application.InvokeFunctionAsync(context);

                var serializer = _workerOptions.Serializer!;
                var functionBindings = context.GetBindings();

                foreach (var binding in functionBindings.OutputBindingData)
                {
                    var parameterBinding = new ParameterBinding
                    {
                        Name = binding.Key
                    };

                    if (binding.Value is not null)
                    {
                        parameterBinding.Data = await binding.Value.ToRpcAsync(serializer);
                    }

                    response.OutputData.Add(parameterBinding);
                }

                if (functionBindings.InvocationResult is not null)
                {
                    TypedData? returnVal = await functionBindings.InvocationResult.ToRpcAsync(serializer);
                    response.ReturnValue = returnVal;
                }

                response.Result.Status = StatusResult.Types.Status.Success;
            }
            catch (Exception ex)
            {
                response.Result.Exception = _workerOptions.EnableUserCodeException ? ex.ToUserRpcException() : ex.ToRpcException();
                response.Result.Status = StatusResult.Types.Status.Failure;

                if (ex.InnerException is TaskCanceledException or OperationCanceledException)
                {
                    response.Result.Status = StatusResult.Types.Status.Cancelled;
                }
            }
            finally
            {
                _inflightInvocations.TryRemove(request.InvocationId, out var cts);

                if (context is IAsyncDisposable asyncContext)
                {
                    await asyncContext.DisposeAsync();
                }

                (context as IDisposable)?.Dispose();
            }

            return response;
        }
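InvokeAsync registers a CancellationTokenSource per invocation ID so that a later cancel request can find and cancel the running invocation, and removes it again in the finally block. A minimal sketch of that bookkeeping, assuming a hypothetical InflightTracker class (the real handler keeps the dictionary as a private field):

```csharp
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper that isolates the in-flight invocation bookkeeping.
public sealed class InflightTracker
{
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _inflight = new();

    // Fails if an invocation with the same ID is already being tracked.
    public bool TryTrack(string invocationId, CancellationTokenSource cts) =>
        _inflight.TryAdd(invocationId, cts);

    // Cancels the invocation if it is still in flight.
    public bool TryCancel(string invocationId)
    {
        if (_inflight.TryGetValue(invocationId, out CancellationTokenSource? cts))
        {
            cts.Cancel();
            return true;
        }

        return false;
    }

    // Called when the invocation has finished, whether it succeeded or failed.
    public void Untrack(string invocationId) =>
        _inflight.TryRemove(invocationId, out _);
}
```

Tracking by ID rather than by object reference fits the gRPC protocol, where the cancel message carries only the invocation ID.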

FunctionsApplication

        public async Task InvokeFunctionAsync(FunctionContext context)
        {
            var scope = new FunctionInvocationScope(context.FunctionDefinition.Name, context.InvocationId);

            using var logScope = _logger.BeginScope(scope);
            using Activity? invokeActivity = _functionActivitySourceFactory.StartInvoke(context);

            try
            {
                await _functionExecutionDelegate(context);
            }
            catch (Exception ex)
            {
                invokeActivity?.SetStatus(ActivityStatusCode.Error, ex.Message);

                Log.InvocationError(_logger, context.FunctionDefinition.Name, context.InvocationId, ex);

                throw;
            }
        }

FunctionsHttpProxyingMiddleware

FunctionsHttpProxyingMiddleware in the Azure/azure-functions-dotnet-worker project:

        public async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
        {
            // Only use the coordinator for HttpTriggers
            if (!_isHttpTrigger.GetOrAdd(context.FunctionId, static (_, c) => IsHttpTriggerFunction(c), context))
            {
                await next(context);
                return;
            }

            var invocationId = context.InvocationId;

            // this call will block until the ASP.NET middleware pipeline has signaled that it's ready to run the function
            var httpContext = await _coordinator.SetFunctionContextAsync(invocationId, context);

            AddHttpContextToFunctionContext(context, httpContext);

            // Register additional context features
            context.Features.Set<IFromBodyConversionFeature>(FromBodyConverstionFeature.Instance);

            await next(context);

            var invocationResult = context.GetInvocationResult();
            ......
        }

OutputBindingsMiddleware

OutputBindingsMiddleware in the Azure/azure-functions-dotnet-worker project:

        public static async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
        {
            await next(context);

            AddOutputBindings(context);
        }
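Both middlewares follow the same pattern: do some work, await next(context), then do more work on the way back. A minimal sketch of how such a chain composes is shown below. PipelineDelegate, PipelineDemo and the middleware names are hypothetical; the real worker uses FunctionExecutionDelegate and FunctionContext, and its pipeline builder is not reproduced here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for FunctionExecutionDelegate; the list records the call order.
public delegate Task PipelineDelegate(List<string> log);

public static class PipelineDemo
{
    // Wraps the terminal delegate so that the first middleware listed runs outermost.
    public static PipelineDelegate Build(
        PipelineDelegate terminal,
        params Func<PipelineDelegate, PipelineDelegate>[] middleware)
    {
        PipelineDelegate pipeline = terminal;
        for (int i = middleware.Length - 1; i >= 0; i--)
        {
            pipeline = middleware[i](pipeline);
        }

        return pipeline;
    }

    // Creates a middleware that logs before and after calling next.
    public static Func<PipelineDelegate, PipelineDelegate> Named(string name) =>
        next => async log =>
        {
            log.Add(name + ":before");
            await next(log);
            log.Add(name + ":after");
        };

    public static async Task<List<string>> RunAsync()
    {
        PipelineDelegate executor = log =>
        {
            log.Add("executor");
            return Task.CompletedTask;
        };

        PipelineDelegate pipeline = Build(executor, Named("http-proxy"), Named("output-bindings"));

        var result = new List<string>();
        await pipeline(result);
        return result;
    }
}
```

The nesting mirrors the stack traces in this document, where the function executor is called from OutputBindingsMiddleware, which is in turn called from FunctionsHttpProxyingMiddleware.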

The next(context) call in OutputBindingsMiddleware leads to:

Worker.Sdk.Generators.GeneratedFunctionExecutor

It ends up in ExecuteAsync() in GeneratedFunctionExecutor.g.cs:

        public async ValueTask ExecuteAsync(FunctionContext context)
        {
            var inputBindingFeature = context.Features.Get<IFunctionInputBindingFeature>();
            var inputBindingResult = await inputBindingFeature.BindFunctionInputAsync(context);
            var inputArguments = inputBindingResult.Values;

            if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.RunOrchestrator", StringComparison.Ordinal))
            {
                context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.RunOrchestrator((global::Microsoft.DurableTask.TaskOrchestrationContext)inputArguments[0]);
            }
            else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.SayHello", StringComparison.Ordinal))
            {
                context.GetInvocationResult().Value = global::Company.Function.HelloOrchestration.SayHello((string)inputArguments[0], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[1]);
            }
            else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.HttpStart", StringComparison.Ordinal))
            {
                context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.HttpStart((global::Microsoft.Azure.Functions.Worker.Http.HttpRequestData)inputArguments[0], (global::Microsoft.DurableTask.Client.DurableTaskClient)inputArguments[1], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[2]);
            }
        }

ExecuteAsync() compares context.FunctionDefinition.EntryPoint against the following values and calls the matching function:

| Value of context.FunctionDefinition.EntryPoint | Function | Function attribute in the source code |
| --- | --- | --- |
| "Company.Function.HelloOrchestration.RunOrchestrator" | HelloOrchestration.RunOrchestrator() | [Function(nameof(HelloOrchestration))] |
| "Company.Function.HelloOrchestration.SayHello" | HelloOrchestration.SayHello() | [Function(nameof(SayHello))] |
| "Company.Function.HelloOrchestration.HttpStart" | HelloOrchestration.HttpStart() | [Function("HelloOrchestration_HttpStart")] |

In this case Company.Function.HelloOrchestration.HttpStart() is called:

Customer Code

HelloOrchestration

Taking HelloOrchestration.cs from the quickstart as an example:

        public static async Task<HttpResponseData> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
            [DurableClient] DurableTaskClient client,
            FunctionContext executionContext)
        {
            string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
                nameof(HelloOrchestration));
            ......
        }

Azure Functions durable extension code

FunctionsDurableTaskClient

ScheduleNewOrchestrationInstanceAsync() as implemented by FunctionsDurableTaskClient:

    private readonly DurableTaskClient inner;

    public override Task<string> ScheduleNewOrchestrationInstanceAsync(
        TaskName orchestratorName,
        object? input = null,
        StartOrchestrationOptions? options = null,
        CancellationToken cancellation = default)
    {
        return this.inner.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input, options, cancellation);
    }

This delegates to ScheduleNewOrchestrationInstanceAsync() on inner. inner is declared as DurableTaskClient; the concrete implementation is the GrpcDurableTaskClient class.

GrpcDurableTaskClient

ScheduleNewOrchestrationInstanceAsync() as implemented by GrpcDurableTaskClient:

readonly TaskHubSidecarServiceClient sidecarClient;

public override async Task<string> ScheduleNewOrchestrationInstanceAsync(
        TaskName orchestratorName,
        object? input = null,
        StartOrchestrationOptions? options = null,
        CancellationToken cancellation = default)
    {
        Check.NotEntity(this.options.EnableEntitySupport, options?.InstanceId);

        var request = new P.CreateInstanceRequest
        {
            // In this quickstart run, orchestratorName.Name is "HelloOrchestration"
            Name = orchestratorName.Name,
            // In this quickstart run, the version is null
            Version = orchestratorName.Version,
            InstanceId = options?.InstanceId ?? Guid.NewGuid().ToString("N"),
            // In this quickstart run, input is null
            Input = this.DataConverter.Serialize(input),
        };

        DateTimeOffset? startAt = options?.StartAt;
        this.logger.SchedulingOrchestration(
            request.InstanceId,
            orchestratorName,
            sizeInBytes: request.Input != null ? Encoding.UTF8.GetByteCount(request.Input) : 0,
            startAt.GetValueOrDefault(DateTimeOffset.UtcNow));

        if (startAt.HasValue)
        {
            // Convert timestamps to UTC if not already UTC
            request.ScheduledStartTimestamp = Timestamp.FromDateTimeOffset(startAt.Value.ToUniversalTime());
        }

        P.CreateInstanceResponse? result = await this.sidecarClient.StartInstanceAsync(
            request, cancellationToken: cancellation);
        return result.InstanceId;
    }

Finally it calls StartInstanceAsync() on sidecarClient, which sends the gRPC request and returns the result.
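Two expressions in the method above explain values seen in the quickstart log: the instance ID '1a01031830e8409eaa09d14b27c7e681' is a GUID in "N" format (32 hex digits, no dashes), and "0 bytes of input data" follows from the null input. A minimal sketch that isolates both expressions; InstanceRequestDemo and its method names are hypothetical.

```csharp
#nullable enable
using System;
using System.Text;

public static class InstanceRequestDemo
{
    // Uses the caller-supplied ID if there is one, otherwise a new GUID
    // formatted with "N": 32 hexadecimal digits without dashes.
    public static string ResolveInstanceId(string? requested) =>
        requested ?? Guid.NewGuid().ToString("N");

    // Size of the serialized input in UTF-8 bytes; a null input counts as 0.
    public static int InputSizeInBytes(string? serializedInput) =>
        serializedInput != null ? Encoding.UTF8.GetByteCount(serializedInput) : 0;
}
```

Passing an explicit instance ID through StartOrchestrationOptions is what lets a caller choose the ID instead of receiving a generated one.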

durabletask-dotnet code

TaskHubSidecarServiceClient

Microsoft.DurableTask.Protobuf.TaskHubSidecarService.TaskHubSidecarServiceClient

sidecarClient is declared as TaskHubSidecarServiceClient, which is gRPC client code generated from the protobuf file:

      public virtual grpc::AsyncUnaryCall<global::Microsoft.DurableTask.Protobuf.CreateInstanceResponse> StartInstanceAsync(global::Microsoft.DurableTask.Protobuf.CreateInstanceRequest request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
      {
        return StartInstanceAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
      }

protobuf definition

StartInstance() is defined in eng\proto\protos\orchestrator_service.proto in the durabletask-dotnet project:

service TaskHubSidecarService {
	......
    // Starts a new orchestration instance.
    rpc StartInstance(CreateInstanceRequest) returns (CreateInstanceResponse);
    ......
}

message CreateInstanceRequest {
    string instanceId = 1;
    string name = 2;
    google.protobuf.StringValue version = 3;
    google.protobuf.StringValue input = 4;
    google.protobuf.Timestamp scheduledStartTimestamp = 5;
    OrchestrationIdReusePolicy orchestrationIdReusePolicy = 6;
}

message CreateInstanceResponse {
    string instanceId = 1;
    string version = 2;
}

8.1.2.2 - FunctionsDurableTaskClient

Source code of FunctionsDurableTaskClient

The client call starts here:

        public static async Task<HttpResponseData> HttpStart(
            [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post")] HttpRequestData req,
            [DurableClient] DurableTaskClient client,
            FunctionContext executionContext)
        {
            ......
            string instanceId = await client.ScheduleNewOrchestrationInstanceAsync(
                nameof(HelloOrchestration));
            ......
        }

client is declared as DurableTaskClient; the concrete class is FunctionsDurableTaskClient, defined in src\Worker.Extensions.DurableTask\FunctionsDurableTaskClient.cs in the azure-functions-durable-extension repository.

Class definition and constructor

internal sealed class FunctionsDurableTaskClient : DurableTaskClient
{
    private readonly DurableTaskClient inner;

    public FunctionsDurableTaskClient(DurableTaskClient inner, string? queryString)
        : base(inner.Name)
    {
        this.inner = inner;
        this.QueryString = queryString;
    }
  

Debugging shows that the actual type of inner is Microsoft.DurableTask.Client.Grpc.GrpcDurableTaskClient.

ScheduleNewOrchestrationInstanceAsync()

    public override Task<string> ScheduleNewOrchestrationInstanceAsync(
        TaskName orchestratorName,
        object? input = null,
        StartOrchestrationOptions? options = null,
        CancellationToken cancellation = default)
    {
        return this.inner.ScheduleNewOrchestrationInstanceAsync(orchestratorName, input, options, cancellation);
    }

TaskName

TaskName is defined in src\Abstractions\TaskName.cs in the durabletask-dotnet repository:

public readonly struct TaskName : IEquatable<TaskName>

8.2 - worker

Source code analysis of the DurableTask worker

8.2.1 - worker initial

Source code analysis of DurableTask worker initialization

8.2.1.1 - Obsolete initialization

The DurableTask worker initialization flow that is now obsolete

GrpcDurableTaskWorker

From src\Worker\Grpc\GrpcDurableTaskWorker.Processor.cs in the durabletask-dotnet repository:

        public async Task ExecuteAsync(CancellationToken cancellation)
        {
            while (!cancellation.IsCancellationRequested)
            {
                try
                {
                    AsyncServerStreamingCall<P.WorkItem> stream = await this.ConnectAsync(cancellation);
                    await this.ProcessWorkItemsAsync(stream, cancellation);
                }
                ......
            }
        }
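ExecuteAsync is a reconnect loop: connect, process work items until the stream ends or fails, then try again until cancellation is requested. The error handling is elided in the excerpt above, so the following is only a sketch of the general pattern under an assumed policy (wait a fixed delay after any failure). ReconnectLoopDemo, connectAndProcess and retryDelay are hypothetical names and do not reflect the actual durabletask-dotnet implementation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ReconnectLoopDemo
{
    // connectAndProcess is a hypothetical callback that connects and processes work
    // items; it receives the attempt number and throws when the connection drops.
    // Returns the number of connection attempts made before cancellation.
    public static async Task<int> RunAsync(
        Func<int, CancellationToken, Task> connectAndProcess,
        TimeSpan retryDelay,
        CancellationToken cancellation)
    {
        int attempts = 0;
        while (!cancellation.IsCancellationRequested)
        {
            attempts++;
            try
            {
                await connectAndProcess(attempts, cancellation);
            }
            catch (OperationCanceledException) when (cancellation.IsCancellationRequested)
            {
                break; // shutdown was requested
            }
            catch (Exception)
            {
                // Assumed policy: wait a fixed delay, then reconnect.
                try
                {
                    await Task.Delay(retryDelay, cancellation);
                }
                catch (OperationCanceledException)
                {
                    break;
                }
            }
        }

        return attempts;
    }
}
```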

ConnectAsync() calls the GetWorkItems() method defined in the gRPC protobuf file:

        async Task<AsyncServerStreamingCall<P.WorkItem>> ConnectAsync(CancellationToken cancellation)
        {
            await this.sidecar!.HelloAsync(EmptyMessage, cancellationToken: cancellation);
            this.Logger.EstablishedWorkItemConnection();

            Console.WriteLine("********GrpcDurableTaskWorker call GetWorkItems()********");

            // Get the stream for receiving work-items
            return this.sidecar!.GetWorkItems(new P.GetWorkItemsRequest(), cancellationToken: cancellation);
        }

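This should have been the normal worker initialization flow, but it has since been abandoned.

Note: recorded here to avoid confusion; I lost a lot of time on this.

8.2.1.2 - Call stack

Call stack of DurableTask worker initialization

Call stack overview

Add logging to src\DotNetWorker.Grpc\GrpcWorker.cs in the azure-functions-dotnet-worker repository: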

        public Task StartAsync(CancellationToken token)
        {
            Console.WriteLine(new System.Diagnostics.StackTrace(true));

            _workerClient = _workerClientFactory.CreateClient(this);

            Console.WriteLine("_workerClient is " + _workerClient.GetType().Name);

            return _workerClient.StartAsync(token);
        }

This prints the call stack of worker initialization at startup:

[2024-04-08T09:07:04.591Z]    at Microsoft.Azure.Functions.Worker.GrpcWorker.StartAsync(CancellationToken token) in C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Grpc\GrpcWorker.cs:line 58
[2024-04-08T09:07:04.592Z]    at Microsoft.Azure.Functions.Worker.WorkerHostedService.StartAsync(CancellationToken cancellationToken) in C:\Users\sky\work\code\durabletask-fork\azure-functions-dotnet-worker\src\DotNetWorker.Core\WorkerHostedService.cs:line 25
[2024-04-08T09:07:04.592Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.593Z]    at Microsoft.Azure.Functions.Worker.WorkerHostedService.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.593Z]    at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>b__15_1(IHostedService service, CancellationToken token)
[2024-04-08T09:07:04.593Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.593Z]    at Microsoft.Extensions.Hosting.Internal.Host.<StartAsync>b__15_1(IHostedService service, CancellationToken token)
[2024-04-08T09:07:04.594Z]    at Microsoft.Extensions.Hosting.Internal.Host.ForeachService[T](IEnumerable`1 services, CancellationToken token, Boolean concurrent, Boolean abortOnFirstException, List`1 exceptions, Func`3 operation)
[2024-04-08T09:07:04.594Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.594Z]    at Microsoft.Extensions.Hosting.Internal.Host.ForeachService[T](IEnumerable`1 services, CancellationToken token, Boolean concurrent, Boolean abortOnFirstException, List`1 exceptions, Func`3 operation)
[2024-04-08T09:07:04.594Z]    at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.595Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.595Z]    at Microsoft.Extensions.Hosting.Internal.Host.StartAsync(CancellationToken cancellationToken)
[2024-04-08T09:07:04.595Z]    at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
[2024-04-08T09:07:04.596Z]    at System.Runtime.CompilerServices.AsyncMethodBuilderCore.Start[TStateMachine](TStateMachine& stateMachine)
[2024-04-08T09:07:04.596Z]    at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.RunAsync(IHost host, CancellationToken token)
[2024-04-08T09:07:04.596Z]    at Microsoft.Extensions.Hosting.HostingAbstractionsHostExtensions.Run(IHost host)
[2024-04-08T09:07:04.596Z]    at Program.<Main>$(String[] args) in C:\Users\sky\work\code\durabletask\MyDurableFunction1\Program.cs:line 13
    
......
[2024-04-08T09:07:04.598Z] _workerClient is GrpcWorkerClient
[2024-04-08T09:07:04.629Z] Worker process started and initialized.
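The trace above was produced with System.Diagnostics.StackTrace. A minimal sketch of the same technique outside the worker; StackTraceDemo is a hypothetical name, and the NoInlining attribute is added only so that the method reliably appears as its own frame.

```csharp
using System.Diagnostics;
using System.Runtime.CompilerServices;

public static class StackTraceDemo
{
    // Captures the current call stack as text. Passing true asks for file names
    // and line numbers, which are only available when debug symbols are present.
    [MethodImpl(MethodImplOptions.NoInlining)]
    public static string CaptureCallStack()
    {
        var trace = new StackTrace(true);
        return trace.ToString();
    }
}
```

Printing the result with Console.WriteLine at the start of a method is a quick way to find out who calls it without attaching a debugger.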

Call stack

GrpcWorker

        public Task StartAsync(CancellationToken token)
        {
          _workerClient = _workerClientFactory.CreateClient(this);
          return _workerClient.StartAsync(token);
        }

GrpcWorkerClient

GrpcWorkerClient in src\DotNetWorker.Grpc\GrpcWorkerClientFactory.cs:


            public async Task StartAsync(CancellationToken token)
            {
                if (_running)
                {
                    throw new InvalidOperationException($"The client is already running. Multiple calls to {nameof(StartAsync)} are not supported.");
                }

                _running = true;

                var eventStream = _grpcClient.EventStream(cancellationToken: token);

                await SendStartStreamMessageAsync(eventStream.RequestStream);

                _ = StartWriterAsync(eventStream.RequestStream);
                _ = StartReaderAsync(eventStream.ResponseStream);
            }

8.2.1.3 - GrpcWorkerClient

DurableTask GrpcWorkerClient

GrpcWorkerClient in src\DotNetWorker.Grpc\GrpcWorkerClientFactory.cs:

Class definition

        private class GrpcWorkerClient : IWorkerClient
        {
            private readonly FunctionRpcClient _grpcClient;
            private readonly GrpcWorkerStartupOptions _startupOptions;
            private readonly ChannelReader<StreamingMessage> _outputReader;
            private readonly ChannelWriter<StreamingMessage> _outputWriter;
            private bool _running;
            private IMessageProcessor? _processor;
           
         }

Constructor

            public GrpcWorkerClient(GrpcHostChannel outputChannel, GrpcWorkerStartupOptions startupOptions, IMessageProcessor processor)
            {
                _startupOptions = startupOptions ?? throw new ArgumentNullException(nameof(startupOptions));
                _processor = processor ?? throw new ArgumentNullException(nameof(processor));

                // Initialize the reader and writer; both come from outputChannel
                _outputReader = outputChannel.Channel.Reader;
                _outputWriter = outputChannel.Channel.Writer;

                // Create _grpcClient, which is of type FunctionRpcClient
                _grpcClient = CreateClient();
            }

The CreateClient() method:

            private FunctionRpcClient CreateClient()
            {
#if NET5_0_OR_GREATER
                GrpcChannel grpcChannel = GrpcChannel.ForAddress(_startupOptions.HostEndpoint!.AbsoluteUri, new GrpcChannelOptions()
                {
                    MaxReceiveMessageSize = _startupOptions.GrpcMaxMessageLength,
                    MaxSendMessageSize = _startupOptions.GrpcMaxMessageLength,
                    Credentials = ChannelCredentials.Insecure
                });
#else

                var options = new ChannelOption[]
                {
                    new ChannelOption(GrpcCore.ChannelOptions.MaxReceiveMessageLength, _startupOptions.GrpcMaxMessageLength),
                    new ChannelOption(GrpcCore.ChannelOptions.MaxSendMessageLength, _startupOptions.GrpcMaxMessageLength)
                };

                GrpcCore.Channel grpcChannel = new GrpcCore.Channel(_startupOptions.HostEndpoint!.Host, _startupOptions.HostEndpoint.Port, ChannelCredentials.Insecure, options);

#endif
                return new FunctionRpcClient(grpcChannel);
            }
        }

Startup process

StartAsync


            public async Task StartAsync(CancellationToken token)
            {
                if (_running)
                {
                    throw new InvalidOperationException($"The client is already running. Multiple calls to {nameof(StartAsync)} are not supported.");
                }

                _running = true;

                var eventStream = _grpcClient.EventStream(cancellationToken: token);

                await SendStartStreamMessageAsync(eventStream.RequestStream);

                _ = StartWriterAsync(eventStream.RequestStream);
                _ = StartReaderAsync(eventStream.ResponseStream);
            }
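
The constructor and StartAsync above suggest a simple shape: callers write outgoing messages into a channel, and a background loop drains that channel toward the host. StartWriterAsync is not shown in these notes, so the following is only a minimal sketch of that idea using System.Threading.Channels. OutboundPumpSketch, Sent, and Complete are hypothetical names, a List<string> stands in for the gRPC request stream, and the Interlocked guard is a substitution for the plain bool _running check in the original.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for the worker client; not the real GrpcWorkerClient.
public sealed class OutboundPumpSketch
{
    private readonly ChannelReader<string> _outputReader;
    private readonly ChannelWriter<string> _outputWriter;
    private int _running; // 0 = not started, 1 = started

    // Stand-in for the gRPC request stream: records what was "sent".
    public List<string> Sent { get; } = new List<string>();

    public OutboundPumpSketch(Channel<string> outputChannel)
    {
        // Reader and writer are two ends of the same channel.
        _outputReader = outputChannel.Reader;
        _outputWriter = outputChannel.Writer;
    }

    // Starts the drain loop; a second call throws, like the original guard.
    public Task StartAsync(CancellationToken token)
    {
        if (Interlocked.Exchange(ref _running, 1) == 1)
        {
            throw new InvalidOperationException("The client is already running.");
        }

        return DrainAsync(token);
    }

    // Producers enqueue messages; they never touch the stream directly.
    public ValueTask SendMessageAsync(string message) => _outputWriter.WriteAsync(message);

    // Marks the channel as finished so the drain loop can end.
    public void Complete() => _outputWriter.Complete();

    private async Task DrainAsync(CancellationToken token)
    {
        await foreach (string message in _outputReader.ReadAllAsync(token))
        {
            Sent.Add(message);
        }
    }
}
```

A caller would create the pump with Channel.CreateUnbounded<string>(), call StartAsync once, and then call SendMessageAsync from any thread. Funnelling all writes through one loop keeps a single writer on the outgoing stream.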

8.2.1.4 - GrpcWorker

DurableTask GrpcWorker

Message handling

ProcessMessageAsync

        Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
        {
            // Dispatch and return.
            Task.Run(() => ProcessRequestCoreAsync(message));

            return Task.CompletedTask;
        }

ProcessRequestCoreAsync

        private async Task ProcessRequestCoreAsync(StreamingMessage request)
        {
            StreamingMessage responseMessage = new StreamingMessage
            {
                RequestId = request.RequestId
            };

            switch (request.ContentCase)
            {
                case MsgType.InvocationRequest:
                    responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
                    break;

                case MsgType.WorkerInitRequest:
                    Console.WriteLine("GrpcWorker received WorkerInitRequest");
                    responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
                    break;

                case MsgType.WorkerStatusRequest:
                    responseMessage.WorkerStatusResponse = new WorkerStatusResponse();
                    break;

                case MsgType.FunctionsMetadataRequest:
                    responseMessage.FunctionMetadataResponse = await GetFunctionMetadataAsync(request.FunctionsMetadataRequest.FunctionAppDirectory);
                    break;

                case MsgType.WorkerTerminate:
                    WorkerTerminateRequestHandler(request.WorkerTerminate);
                    break;

                case MsgType.FunctionLoadRequest:
                    responseMessage.FunctionLoadResponse = FunctionLoadRequestHandler(request.FunctionLoadRequest, _application, _methodInfoLocator);
                    break;

                case MsgType.FunctionEnvironmentReloadRequest:
                    responseMessage.FunctionEnvironmentReloadResponse = EnvironmentReloadRequestHandler(_workerOptions);
                    break;

                case MsgType.InvocationCancel:
                    InvocationCancelRequestHandler(request.InvocationCancel);
                    break;

                default:
                    // TODO: Trace failure here.
                    return;
            }

            await _workerClient!.SendMessageAsync(responseMessage);
        }

WorkerInitRequest

                case MsgType.WorkerInitRequest:
                    responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
                    break;

Implementation of the WorkerInitRequestHandler() method:

        internal static WorkerInitResponse WorkerInitRequestHandler(WorkerInitRequest request, WorkerOptions workerOptions)
        {
            var response = new WorkerInitResponse
            {
                Result = new StatusResult { Status = StatusResult.Types.Status.Success },
                WorkerVersion = WorkerInformation.Instance.WorkerVersion,
                WorkerMetadata = GetWorkerMetadata()
            };

            response.Capabilities.Add(GetWorkerCapabilities(workerOptions));

            return response;
        }

TBD: where does WorkerVersion come from?

InvocationRequest

                case MsgType.InvocationRequest:
                    responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
                    break;

Implementation of the InvocationRequestHandlerAsync method:

        internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
        {
            return _invocationHandler.InvokeAsync(request);
        }

8.2.1.5 - FunctionRpcClient

FunctionRpcClient

The protobuf definition behind FunctionRpcClient, from the Azure/azure-functions-dotnet-worker repository.

protobuf

Path of the proto file:

protos\azure-functions-language-worker-protobuf\src\proto\FunctionRpc.proto

FunctionRpc service

FunctionRpc.proto defines the FunctionRpc gRPC service:

option java_multiple_files = true;
option java_package = "com.microsoft.azure.functions.rpc.messages";
option java_outer_classname = "FunctionProto";
option csharp_namespace = "Microsoft.Azure.Functions.Worker.Grpc.Messages";
option go_package ="github.com/Azure/azure-functions-go-worker/internal/rpc";

package AzureFunctionsRpcMessages;

import "google/protobuf/duration.proto";
import "identity/ClaimsIdentityRpc.proto";
import "shared/NullableTypes.proto";

// Interface exported by the server.
service FunctionRpc {
}

EventStream 方法

Only one method, EventStream, is defined:

    rpc EventStream (stream StreamingMessage) returns (stream StreamingMessage) {}

Both the request and the response are streams, and both carry StreamingMessage.

StreamingMessage

Apart from a request_id that identifies a message between host and worker, StreamingMessage has only one field, the oneof content:

message StreamingMessage {
  // Used to identify message between host and worker
  string request_id = 1;

  // Payload of the message
  oneof content {
  	......
  }
}
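
Because content is a oneof, at most one payload is set on any given message, and the receiver can branch on which case is present. For a oneof named content, the C# protobuf generator emits a ContentCase property, which is what the switch in ProcessRequestCoreAsync reads. The sketch below imitates that shape without protobuf. MessageSketch, the hand-written ContentCase enum, and OneofDispatchSketch are hypothetical stand-ins, and the string payloads are placeholders.

```csharp
#nullable enable
using System;

// Hand-written imitation of a generated oneof case enum (hypothetical).
public enum ContentCase
{
    None,
    WorkerInitRequest,
    WorkerInitResponse,
    WorkerStatusRequest,
    WorkerStatusResponse
}

// Hypothetical stand-in for StreamingMessage: an id plus one payload slot.
public sealed class MessageSketch
{
    public string RequestId { get; set; } = "";
    public ContentCase Case { get; private set; } = ContentCase.None;
    public object? Payload { get; private set; }

    // Setting content replaces whatever was set before, as a oneof does.
    public void Set(ContentCase contentCase, object payload)
    {
        Case = contentCase;
        Payload = payload;
    }
}

public static class OneofDispatchSketch
{
    // Returns a response carrying the same RequestId, or null when the
    // incoming case is not handled.
    public static MessageSketch? Handle(MessageSketch request)
    {
        var response = new MessageSketch { RequestId = request.RequestId };

        switch (request.Case)
        {
            case ContentCase.WorkerInitRequest:
                response.Set(ContentCase.WorkerInitResponse, "init-ok");
                return response;

            case ContentCase.WorkerStatusRequest:
                response.Set(ContentCase.WorkerStatusResponse, "status-ok");
                return response;

            default:
                return null;
        }
    }
}
```

Copying RequestId onto the response is what lets the other side match a reply to its request on a shared bidirectional stream.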

There are quite a few message types:

  oneof content {

    // Worker initiates stream
    StartStream start_stream = 20;

    // Host sends capabilities/init data to worker
    WorkerInitRequest worker_init_request = 17;
    // Worker responds after initializing with its capabilities & status
    WorkerInitResponse worker_init_response = 16;

    // MESSAGE NOT USED
    // Worker periodically sends empty heartbeat message to host
    WorkerHeartbeat worker_heartbeat = 15;

    // Host sends terminate message to worker.
    // Worker terminates if it can, otherwise host terminates after a grace period
    WorkerTerminate worker_terminate = 14;

    // Host periodically sends status request to the worker
    WorkerStatusRequest worker_status_request = 12;
    WorkerStatusResponse worker_status_response = 13;

    // On file change event, host sends notification to worker
    FileChangeEventRequest file_change_event_request = 6;

    // Worker requests a desired action (restart worker, reload function)
    WorkerActionResponse worker_action_response = 7;

    // Host sends required metadata to worker to load function
    FunctionLoadRequest function_load_request = 8;
    // Worker responds after loading with the load result
    FunctionLoadResponse function_load_response = 9;

    // Host requests a given invocation
    InvocationRequest invocation_request = 4;

    // Worker responds to a given invocation
    InvocationResponse invocation_response = 5;

    // Host sends cancel message to attempt to cancel an invocation.
    // If an invocation is cancelled, host will receive an invocation response with status cancelled.
    InvocationCancel invocation_cancel = 21;

    // Worker logs a message back to the host
    RpcLog rpc_log = 2;

    FunctionEnvironmentReloadRequest function_environment_reload_request = 25;

    FunctionEnvironmentReloadResponse function_environment_reload_response = 26;

    // Ask the worker to close any open shared memory resources for a given invocation
    CloseSharedMemoryResourcesRequest close_shared_memory_resources_request = 27;
    CloseSharedMemoryResourcesResponse close_shared_memory_resources_response = 28;

    // Worker indexing message types
    FunctionsMetadataRequest functions_metadata_request = 29;
    FunctionMetadataResponse function_metadata_response = 30;

    // Host sends required metadata to worker to load functions
    FunctionLoadRequestCollection function_load_request_collection = 31;

    // Host gets the list of function load responses
    FunctionLoadResponseCollection function_load_response_collection = 32;
    
    // Host sends required metadata to worker to warmup the worker
    WorkerWarmupRequest worker_warmup_request = 33;
    
    // Worker responds after warming up with the warmup result
    WorkerWarmupResponse worker_warmup_response = 34;
    
  }

StartStream

// Worker initiates stream
StartStream start_stream = 20;

// Process.Start required info
//   connection details
//   protocol type
//   protocol version

// Worker sends the host information identifying itself
message StartStream {
  // id of the worker
  string worker_id = 2;
}

TBD: a version field could be added here.

WorkerInitRequest / WorkerInitResponse

    // Host sends capabilities/init data to worker
    WorkerInitRequest worker_init_request = 17;
    // Worker responds after initializing with its capabilities & status
    WorkerInitResponse worker_init_response = 16;
    

// Host requests the worker to initialize itself
message WorkerInitRequest {
  // version of the host sending init request
  string host_version = 1;

  // A map of host supported features/capabilities
  map<string, string> capabilities = 2;

  // inform worker of supported categories and their levels
  // i.e. Worker = Verbose, Function.MyFunc = None
  map<string, RpcLog.Level> log_categories = 3;

  // Full path of worker.config.json location
  string worker_directory = 4;

  // base directory for function app
  string function_app_directory = 5;
}

// Worker responds with the result of initializing itself
message WorkerInitResponse {
  // PROPERTY NOT USED
  // TODO: Remove from protobuf during next breaking change release
  string worker_version = 1;

  // A map of worker supported features/capabilities
  map<string, string> capabilities = 2;

  // Status of the response
  StatusResult result = 3;

  // Worker metadata captured for telemetry purposes
  WorkerMetadata worker_metadata = 4;
}

WorkerHeartbeat

    // MESSAGE NOT USED
    // Worker periodically sends empty heartbeat message to host
    WorkerHeartbeat worker_heartbeat = 15;
    
// MESSAGE NOT USED
// TODO: Remove from protobuf during next breaking change release
message WorkerHeartbeat {}

WorkerTerminate

    // Host sends terminate message to worker.
    // Worker terminates if it can, otherwise host terminates after a grace period
    WorkerTerminate worker_terminate = 14;

// Warning before killing the process after grace_period
// Worker self terminates ..no response on this
message WorkerTerminate {
  google.protobuf.Duration grace_period = 1;
}

WorkerStatusRequest / WorkerStatusResponse

    // Host periodically sends status request to the worker
    WorkerStatusRequest worker_status_request = 12;
    WorkerStatusResponse worker_status_response = 13;
    
// Used by the host to determine worker health
message WorkerStatusRequest {
}

// Worker responds with status message
// TODO: Add any worker relevant status to response
message WorkerStatusResponse {
}

InvocationRequest / InvocationResponse

    // Host requests a given invocation
    InvocationRequest invocation_request = 4;

    // Worker responds to a given invocation
    InvocationResponse invocation_response = 5;
    
    
// Host requests worker to invoke a Function
message InvocationRequest {
  // Unique id for each invocation
  string invocation_id = 1;

  // Unique id for each Function
  string function_id = 2;

  // Input bindings (include trigger)
  repeated ParameterBinding input_data = 3;

  // binding metadata from trigger
  map<string, TypedData> trigger_metadata = 4;

  // Populates activityId, tracestate and tags from host
  RpcTraceContext trace_context = 5;

  // Current retry context
  RetryContext retry_context = 6;
}

// Worker responds with status of Invocation
message InvocationResponse {
  // Unique id for invocation
  string invocation_id = 1;

  // Output binding data
  repeated ParameterBinding output_data = 2;

  // data returned from Function (for $return and triggers with return support)
  TypedData return_value = 4;

  // Status of the invocation (success/failure/canceled)
  StatusResult result = 3;
}

WorkerWarmupRequest / WorkerWarmupResponse

    // Host sends required metadata to worker to warmup the worker
    WorkerWarmupRequest worker_warmup_request = 33;
    
    // Worker responds after warming up with the warmup result
    WorkerWarmupResponse worker_warmup_response = 34;
    
    
message WorkerWarmupRequest {
  // Full path of worker.config.json location
  string worker_directory = 1;
}

message WorkerWarmupResponse {
  StatusResult result = 1;
}

8.2.2 - client run orchestrator

Source code analysis of how the DurableTask client runs an orchestration

8.2.2.1 - Call stack

Call stack when the DurableTask client runs an orchestration

Call stack overview

MyDurableFunction1.dll!Company.Function.HelloOrchestration.RunOrchestrator(Microsoft.DurableTask.TaskOrchestrationContext context) Line 16 (c:\Users\sky\work\code\durabletask\MyDurableFunction1\HelloOrchestration.cs:16)
MyDurableFunction1.dll!MyDurableFunction1.DirectFunctionExecutor.ExecuteAsync(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 32 (GeneratedFunctionExecutor.g.cs:32)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.Pipeline.FunctionExecutionMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 20 (FunctionExecutionMiddleware.cs:20)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseFunctionExecutionMiddleware.AnonymousMethod__1_2(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 57 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:57)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.OutputBindings.OutputBindingsMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 13 (OutputBindingsMiddleware.cs:13)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseOutputBindingsMiddleware.AnonymousMethod__3(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 84 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:84)
Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.dll!Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore.FunctionsHttpProxyingMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 34 (FunctionsHttpProxyingMiddleware.cs:34)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseMiddleware.AnonymousMethod__1(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 105 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:105)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsOrchestrator.EnsureSynchronousExecution(Microsoft.Azure.Functions.Worker.FunctionContext functionContext, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next, Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsOrchestrationContext orchestrationContext) Line 72 (c:\Users\sky\work\code\durabletask-fork\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\FunctionsOrchestrator.cs:72)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsOrchestrator.RunAsync(Microsoft.DurableTask.TaskOrchestrationContext context, object input) Line 51 (c:\Users\sky\work\code\durabletask-fork\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\FunctionsOrchestrator.cs:51)
Microsoft.DurableTask.Worker.dll!Microsoft.DurableTask.Worker.Shims.TaskOrchestrationShim.Execute(DurableTask.Core.OrchestrationContext innerContext, string rawInput) Line 52 (c:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\src\Worker\Core\Shims\TaskOrchestrationShim.cs:52)
DurableTask.Core.dll!DurableTask.Core.TaskOrchestrationExecutor.ProcessEvent(DurableTask.Core.History.HistoryEvent historyEvent) Line 211 (TaskOrchestrationExecutor.cs:211)
DurableTask.Core.dll!DurableTask.Core.TaskOrchestrationExecutor.ExecuteCore.__ProcessEvents|12_0(System.Collections.Generic.IEnumerable<DurableTask.Core.History.HistoryEvent> events) Line 135 (TaskOrchestrationExecutor.cs:135)
DurableTask.Core.dll!DurableTask.Core.TaskOrchestrationExecutor.ExecuteCore(System.Collections.Generic.IEnumerable<DurableTask.Core.History.HistoryEvent> pastEvents, System.Collections.Generic.IEnumerable<DurableTask.Core.History.HistoryEvent> newEvents) Line 143 (TaskOrchestrationExecutor.cs:143)
DurableTask.Core.dll!DurableTask.Core.TaskOrchestrationExecutor.Execute() Line 93 (TaskOrchestrationExecutor.cs:93)
Microsoft.DurableTask.Worker.Grpc.dll!Microsoft.DurableTask.Worker.Grpc.GrpcOrchestrationRunner.LoadAndRun(string encodedOrchestratorRequest, Microsoft.DurableTask.ITaskOrchestrator implementation, System.IServiceProvider services) Line 113 (c:\Users\sky\work\code\durabletask-fork\durabletask-dotnet\src\Worker\Grpc\GrpcOrchestrationRunner.cs:113)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskFunctionsMiddleware.RunOrchestrationAsync(Microsoft.Azure.Functions.Worker.FunctionContext context, Microsoft.Azure.Functions.Worker.BindingMetadata triggerBinding, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 59 (c:\Users\sky\work\code\durabletask-fork\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\DurableTaskFunctionsMiddleware.cs:59)
Microsoft.Azure.Functions.Worker.Extensions.DurableTask.dll!Microsoft.Azure.Functions.Worker.Extensions.DurableTask.DurableTaskFunctionsMiddleware.Invoke(Microsoft.Azure.Functions.Worker.FunctionContext functionContext, Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate next) Line 22 (c:\Users\sky\work\code\durabletask-fork\azure-functions-durable-extension\src\Worker.Extensions.DurableTask\DurableTaskFunctionsMiddleware.cs:22)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Extensions.Hosting.MiddlewareWorkerApplicationBuilderExtensions.UseMiddleware.AnonymousMethod__1(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 105 (WorkerMiddlewareWorkerApplicationBuilderExtensions.cs:105)
Microsoft.Azure.Functions.Worker.Core.dll!Microsoft.Azure.Functions.Worker.FunctionsApplication.InvokeFunctionAsync(Microsoft.Azure.Functions.Worker.FunctionContext context) Line 77 (FunctionsApplication.cs:77)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.Handlers.InvocationHandler.InvokeAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.InvocationRequest request) Line 88 (InvocationHandler.cs:88)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.InvocationRequestHandlerAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.InvocationRequest request) Line 122 (GrpcWorker.cs:122)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.ProcessRequestCoreAsync(Microsoft.Azure.Functions.Worker.Grpc.Messages.StreamingMessage request) Line 81 (GrpcWorker.cs:81)
Microsoft.Azure.Functions.Worker.Grpc.dll!Microsoft.Azure.Functions.Worker.GrpcWorker.Microsoft.Azure.Functions.Worker.Grpc.IMessageProcessor.ProcessMessageAsync.AnonymousMethod__0() Line 66 (GrpcWorker.cs:66)
System.Private.CoreLib.dll!System.Threading.Tasks.Task<System.Threading.Tasks.Task>.InnerInvoke() (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.ExecutionContext.RunFromThreadPoolDispatchLoop(System.Threading.Thread threadPoolThread, System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.Tasks.Task.ExecuteWithThreadLocal(ref System.Threading.Tasks.Task currentTaskSlot, System.Threading.Thread threadPoolThread) (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch() (Unknown Source:0)
System.Private.CoreLib.dll!System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart() (Unknown Source:0)
[Native to Managed Transition] (Unknown Source:0)

Azure function dotnet worker

GrpcWorkerClientFactory

            private async Task StartReaderAsync(IAsyncStreamReader<StreamingMessage> responseStream)
            {
                while (await responseStream.MoveNext())
                {
                    await _processor!.ProcessMessageAsync(responseStream.Current);
                }
            }

The _processor here is implemented by Microsoft.Azure.Functions.Worker.GrpcWorker.

GrpcWorker

After receiving a gRPC message, the worker calls ProcessRequestCoreAsync() to handle it. Note that the call is dispatched asynchronously:

        Task IMessageProcessor.ProcessMessageAsync(StreamingMessage message)
        {
            // Dispatch and return.
            Task.Run(() => ProcessRequestCoreAsync(message));

            return Task.CompletedTask;
        }
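
The "dispatch and return" comment is the key point: the reader loop awaits ProcessMessageAsync, so handing the work to Task.Run and returning a completed task keeps a slow handler from blocking the next read. The sketch below isolates that pattern. DispatchSketch and its two methods are hypothetical names, and the TaskCompletionSource gate merely simulates a slow handler.

```csharp
using System;
using System.Threading.Tasks;

public static class DispatchSketch
{
    // Starts the handler on the thread pool and returns immediately.
    // The returned task says nothing about whether the handler finished.
    public static Task DispatchAndReturn(Func<Task> handler)
    {
        _ = Task.Run(handler);
        return Task.CompletedTask;
    }

    // Demonstrates that the dispatch call completes while the handler
    // is still waiting on the gate.
    public static async Task<bool> ReturnsBeforeHandlerFinishesAsync()
    {
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var done = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        Task dispatch = DispatchAndReturn(async () =>
        {
            await gate.Task;      // simulated slow handler
            done.SetResult(true);
        });

        bool returnedEarly = dispatch.IsCompleted && !done.Task.IsCompleted;

        gate.SetResult(true);     // let the handler finish
        await done.Task;
        return returnedEarly;
    }
}
```

One consequence of this design is that an exception thrown inside the dispatched handler is not observed by the caller, so each handler has to report its own failures.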

Implementation of the ProcessRequestCoreAsync() method:

        private async Task ProcessRequestCoreAsync(StreamingMessage request)
        {
            StreamingMessage responseMessage = new StreamingMessage
            {
                RequestId = request.RequestId
            };

            switch (request.ContentCase)
            {
                case MsgType.InvocationRequest:
                    // execution takes this branch
                    responseMessage.InvocationResponse = await InvocationRequestHandlerAsync(request.InvocationRequest);
                    break;

                case MsgType.WorkerInitRequest:
                    responseMessage.WorkerInitResponse = WorkerInitRequestHandler(request.WorkerInitRequest, _workerOptions);
                    break;

                case MsgType.WorkerStatusRequest:
                    responseMessage.WorkerStatusResponse = new WorkerStatusResponse();
                    break;

                case MsgType.FunctionsMetadataRequest:
                    responseMessage.FunctionMetadataResponse = await GetFunctionMetadataAsync(request.FunctionsMetadataRequest.FunctionAppDirectory);
                    break;

                case MsgType.WorkerTerminate:
                    WorkerTerminateRequestHandler(request.WorkerTerminate);
                    break;

                case MsgType.FunctionLoadRequest:
                    responseMessage.FunctionLoadResponse = FunctionLoadRequestHandler(request.FunctionLoadRequest, _application, _methodInfoLocator);
                    break;

                case MsgType.FunctionEnvironmentReloadRequest:
                    responseMessage.FunctionEnvironmentReloadResponse = EnvironmentReloadRequestHandler(_workerOptions);
                    break;

                case MsgType.InvocationCancel:
                    InvocationCancelRequestHandler(request.InvocationCancel);
                    break;

                default:
                    // TODO: Trace failure here.
                    return;
            }

            await _workerClient!.SendMessageAsync(responseMessage);
        }

The InvocationRequestHandlerAsync() method:

        internal Task<InvocationResponse> InvocationRequestHandlerAsync(InvocationRequest request)
        {
            return _invocationHandler.InvokeAsync(request);
        }

The _invocationHandler field is declared as IInvocationHandler; the implementation is Microsoft.Azure.Functions.Worker.Handlers.InvocationHandler.

InvocationHandler

The code is in src\DotNetWorker.Grpc\Handlers\InvocationHandler.cs in the azure-functions-dotnet-worker repository:

        public async Task<InvocationResponse> InvokeAsync(InvocationRequest request)
        {
            using CancellationTokenSource cancellationTokenSource = new();
            FunctionContext? context = null;
            InvocationResponse response = new()
            {
                InvocationId = request.InvocationId,
                Result = new StatusResult()
            };

            if (!_inflightInvocations.TryAdd(request.InvocationId, cancellationTokenSource))
            {
                var exception = new InvalidOperationException("Unable to track CancellationTokenSource");
                response.Result.Status = StatusResult.Types.Status.Failure;
                response.Result.Exception = exception.ToRpcException();

                return response;
            }

            try
            {
                var invocation = new GrpcFunctionInvocation(request);

                IInvocationFeatures invocationFeatures = _invocationFeaturesFactory.Create();
                invocationFeatures.Set<FunctionInvocation>(invocation);
                invocationFeatures.Set<IExecutionRetryFeature>(invocation);

                context = _application.CreateContext(invocationFeatures, cancellationTokenSource.Token);
                invocationFeatures.Set<IFunctionBindingsFeature>(new GrpcFunctionBindingsFeature(context, request, _outputBindingsInfoProvider));

                if (_inputConversionFeatureProvider.TryCreate(typeof(DefaultInputConversionFeature), out var conversion))
                {
                    invocationFeatures.Set<IInputConversionFeature>(conversion!);
                }

                // execution continues here
                await _application.InvokeFunctionAsync(context);

                var serializer = _workerOptions.Serializer!;
                var functionBindings = context.GetBindings();

                foreach (var binding in functionBindings.OutputBindingData)
                {
                    var parameterBinding = new ParameterBinding
                    {
                        Name = binding.Key
                    };

                    if (binding.Value is not null)
                    {
                        parameterBinding.Data = await binding.Value.ToRpcAsync(serializer);
                    }

                    response.OutputData.Add(parameterBinding);
                }

                if (functionBindings.InvocationResult is not null)
                {
                    TypedData? returnVal = await functionBindings.InvocationResult.ToRpcAsync(serializer);
                    response.ReturnValue = returnVal;
                }

                response.Result.Status = StatusResult.Types.Status.Success;
            }
            catch (Exception ex)
            {
                response.Result.Exception = _workerOptions.EnableUserCodeException ? ex.ToUserRpcException() : ex.ToRpcException();
                response.Result.Status = StatusResult.Types.Status.Failure;

                if (ex.InnerException is TaskCanceledException or OperationCanceledException)
                {
                    response.Result.Status = StatusResult.Types.Status.Cancelled;
                }
            }
            finally
            {
                _inflightInvocations.TryRemove(request.InvocationId, out var cts);

                if (context is IAsyncDisposable asyncContext)
                {
                    await asyncContext.DisposeAsync();
                }

                (context as IDisposable)?.Dispose();
            }

            return response;
        }
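
InvokeAsync registers a CancellationTokenSource per invocation id before running the function and removes it in the finally block. The sketch below isolates that bookkeeping, assuming the InvocationCancel handler looks the id up and cancels the matching source (that handler is not shown in these notes). InflightTrackerSketch, TryCancel, and the string status values are hypothetical, and unlike the original, which inspects ex.InnerException, the sketch catches OperationCanceledException directly.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical reduction of the in-flight tracking seen in InvokeAsync.
public sealed class InflightTrackerSketch
{
    private readonly ConcurrentDictionary<string, CancellationTokenSource> _inflight =
        new ConcurrentDictionary<string, CancellationTokenSource>();

    // Runs the function and reports "Success", "Cancelled", or "Failure".
    public async Task<string> InvokeAsync(string invocationId, Func<CancellationToken, Task> function)
    {
        using var cts = new CancellationTokenSource();

        // A duplicate id cannot be tracked, so it fails up front.
        if (!_inflight.TryAdd(invocationId, cts))
        {
            return "Failure";
        }

        try
        {
            await function(cts.Token);
            return "Success";
        }
        catch (OperationCanceledException)
        {
            return "Cancelled";
        }
        catch (Exception)
        {
            return "Failure";
        }
        finally
        {
            // Always stop tracking, whatever the outcome.
            _inflight.TryRemove(invocationId, out _);
        }
    }

    // Cancels a running invocation; false if the id is unknown or finished.
    public bool TryCancel(string invocationId)
    {
        if (!_inflight.TryGetValue(invocationId, out var cts))
        {
            return false;
        }

        try
        {
            cts.Cancel();
            return true;
        }
        catch (ObjectDisposedException)
        {
            // The invocation completed between lookup and cancel.
            return false;
        }
    }
}
```

Keying the dictionary by invocation id is what allows a later cancel message, which carries only that id, to reach the right running function.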

The _application field is declared as IFunctionsApplication; the actual implementation is Microsoft.Azure.Functions.Worker.FunctionsApplication.

FunctionsApplication

The code is in src\DotNetWorker.Core\FunctionsApplication.cs in the azure-functions-dotnet-worker repository:

        public async Task InvokeFunctionAsync(FunctionContext context)
        {
            var scope = new FunctionInvocationScope(context.FunctionDefinition.Name, context.InvocationId);

            using var logScope = _logger.BeginScope(scope);
            using Activity? invokeActivity = _functionActivitySourceFactory.StartInvoke(context);

            try
            {
                // execution continues here
                await _functionExecutionDelegate(context);
            }
            catch (Exception ex)
            {
                invokeActivity?.SetStatus(ActivityStatusCode.Error, ex.Message);

                Log.InvocationError(_logger, context.FunctionDefinition.Name, context.InvocationId, ex);

                throw;
            }
        }

The implementation of _functionExecutionDelegate is a Microsoft.Azure.Functions.Worker.Middleware.FunctionExecutionDelegate, which is assembled from the registered middleware:

MiddlewareWorkerApplicationBuilderExtensions

        public static IFunctionsWorkerApplicationBuilder UseMiddleware<T>(this IFunctionsWorkerApplicationBuilder builder)
            where T : class, IFunctionsWorkerMiddleware
        {
            builder.Services.AddSingleton<T>();

            builder.Use(next =>
            {
                return context =>
                {
                    var middleware = context.InstanceServices.GetRequiredService<T>();

                    return middleware.Invoke(context, next);
                };
            });

            return builder;
        }

Here the middleware implementation is DurableTaskFunctionsMiddleware.
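To make the `Use(next => context => ...)` shape easier to follow, here is a minimal, self-contained sketch of how such components could be composed into one delegate. `SketchContext`, `SketchDelegate`, `SketchPipelineBuilder`, and `MiddlewareSketch` are hypothetical stand-ins invented for this illustration; they are not the worker's real types, and the reverse-order wrapping in `Build` is an assumption about how a pipeline of this shape is typically built.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for FunctionContext: it only records the call order.
public sealed class SketchContext
{
    public List<string> Trace { get; } = new();
}

// Hypothetical stand-in for FunctionExecutionDelegate.
public delegate Task SketchDelegate(SketchContext context);

public sealed class SketchPipelineBuilder
{
    private readonly List<Func<SketchDelegate, SketchDelegate>> components = new();

    // Same shape as builder.Use(next => context => ...).
    public SketchPipelineBuilder Use(Func<SketchDelegate, SketchDelegate> component)
    {
        this.components.Add(component);
        return this;
    }

    // Wrap the terminal delegate from the inside out, so that the first
    // registered component ends up as the outermost one.
    public SketchDelegate Build(SketchDelegate terminal)
    {
        SketchDelegate pipeline = terminal;
        for (int i = this.components.Count - 1; i >= 0; i--)
        {
            pipeline = this.components[i](pipeline);
        }

        return pipeline;
    }
}

public static class MiddlewareSketch
{
    public static async Task<string> RunAsync()
    {
        SketchPipelineBuilder builder = new SketchPipelineBuilder()
            .Use(next => async ctx =>
            {
                ctx.Trace.Add("A:before");
                await next(ctx);
                ctx.Trace.Add("A:after");
            })
            .Use(next => async ctx =>
            {
                ctx.Trace.Add("B:before");
                await next(ctx);
                ctx.Trace.Add("B:after");
            });

        // The terminal delegate plays the role of the function executor.
        SketchDelegate pipeline = builder.Build(ctx =>
        {
            ctx.Trace.Add("function");
            return Task.CompletedTask;
        });

        var context = new SketchContext();
        await pipeline(context);
        return string.Join(",", context.Trace);
    }
}
```

In this sketch each component receives `next` and decides when to call it, which is the same hook DurableTaskFunctionsMiddleware uses to run the orchestration itself instead of simply forwarding the call.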

Azure functions durable extension

DurableTaskFunctionsMiddleware

    public Task Invoke(FunctionContext functionContext, FunctionExecutionDelegate next)
    {
        if (IsOrchestrationTrigger(functionContext, out BindingMetadata? triggerBinding))
        {
            // Execution enters here
            return RunOrchestrationAsync(functionContext, triggerBinding, next);
        }

        if (IsEntityTrigger(functionContext, out triggerBinding))
        {
            return RunEntityAsync(functionContext, triggerBinding, next);
        }

        return next(functionContext);
    }

Implementation of the RunOrchestrationAsync() method:

    static async Task RunOrchestrationAsync(
        FunctionContext context, BindingMetadata triggerBinding, FunctionExecutionDelegate next)
    {
        InputBindingData<object> triggerInputData = await context.BindInputAsync<object>(triggerBinding);
        if (triggerInputData?.Value is not string encodedOrchestratorState)
        {
            throw new InvalidOperationException("Orchestration history state was either missing from the input or not a string value.");
        }

        FunctionsOrchestrator orchestrator = new(context, next, triggerInputData);
        string orchestratorOutput = GrpcOrchestrationRunner.LoadAndRun(
            encodedOrchestratorState, orchestrator, context.InstanceServices);

        // Send the encoded orchestrator output as the return value seen by the functions host extension
        context.GetInvocationResult().Value = orchestratorOutput;
    }

GrpcOrchestrationRunner

In the durabletask-dotnet repository, at \src\Worker\Grpc\GrpcOrchestrationRunner.cs:

    public static string LoadAndRun(
        string encodedOrchestratorRequest,
        ITaskOrchestrator implementation,
        IServiceProvider? services = null)
    {
        Check.NotNullOrEmpty(encodedOrchestratorRequest);
        Check.NotNull(implementation);

        P.OrchestratorRequest request = P.OrchestratorRequest.Parser.Base64Decode<P.OrchestratorRequest>(
            encodedOrchestratorRequest);

        List<HistoryEvent> pastEvents = request.PastEvents.Select(ProtoUtils.ConvertHistoryEvent).ToList();
        IEnumerable<HistoryEvent> newEvents = request.NewEvents.Select(ProtoUtils.ConvertHistoryEvent);

        // Re-construct the orchestration state from the history.
        // New events must be added using the AddEvent method.
        OrchestrationRuntimeState runtimeState = new(pastEvents);
        foreach (HistoryEvent newEvent in newEvents)
        {
            runtimeState.AddEvent(newEvent);
        }

        TaskName orchestratorName = new(runtimeState.Name);
        ParentOrchestrationInstance? parent = runtimeState.ParentInstance is ParentInstance p
            ? new(new(p.Name), p.OrchestrationInstance.InstanceId)
            : null;

        DurableTaskShimFactory factory = services is null
            ? DurableTaskShimFactory.Default
            : ActivatorUtilities.GetServiceOrCreateInstance<DurableTaskShimFactory>(services);
        TaskOrchestration shim = factory.CreateOrchestration(orchestratorName, implementation, parent);
        TaskOrchestrationExecutor executor = new(runtimeState, shim, BehaviorOnContinueAsNew.Carryover, request.EntityParameters.ToCore(), ErrorPropagationMode.UseFailureDetails);
        // Execution enters here
        OrchestratorExecutionResult result = executor.Execute();

        P.OrchestratorResponse response = ProtoUtils.ConstructOrchestratorResponse(
            request.InstanceId,
            result.CustomStatus,
            result.Actions);
        byte[] responseBytes = response.ToByteArray();
        return Convert.ToBase64String(responseBytes);
    }
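LoadAndRun() takes a Base64 string in and returns a Base64 string out, with the real payload being a protobuf message. The sketch below shows only that envelope pattern, using a plain UTF-8 string as the payload instead of protobuf. `Base64EnvelopeSketch` and its members are hypothetical names for this illustration; only `Convert.ToBase64String`, `Convert.FromBase64String`, and `Encoding.UTF8` are real .NET APIs.

```csharp
using System;
using System.Text;

public static class Base64EnvelopeSketch
{
    // Serialize a payload and wrap it as Base64 text.
    public static string Encode(string payload) =>
        Convert.ToBase64String(Encoding.UTF8.GetBytes(payload));

    // Unwrap Base64 text back into the payload.
    public static string Decode(string encoded) =>
        Encoding.UTF8.GetString(Convert.FromBase64String(encoded));

    // Hypothetical runner: decode the request, run the handler, encode the response.
    public static string LoadAndRun(string encodedRequest, Func<string, string> handler)
    {
        if (string.IsNullOrEmpty(encodedRequest))
        {
            throw new ArgumentException("Request must not be null or empty.", nameof(encodedRequest));
        }

        if (handler is null)
        {
            throw new ArgumentNullException(nameof(handler));
        }

        string request = Decode(encodedRequest);
        string response = handler(request);
        return Encode(response);
    }
}
```

For example, `Base64EnvelopeSketch.LoadAndRun(Base64EnvelopeSketch.Encode("ping"), r => r + "-pong")` returns a Base64 string that decodes to `ping-pong`.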

TaskOrchestrationExecutor

The DurableTask.Core.TaskOrchestrationExecutor class lives in the Azure/durabletask project:

        public OrchestratorExecutionResult Execute()
        {
            return this.ExecuteCore(
                pastEvents: this.orchestrationRuntimeState.PastEvents,
                newEvents: this.orchestrationRuntimeState.NewEvents);
        }

On the first execution, PastEvents is empty and NewEvents contains two events:

  • DurableTask.Core.History.OrchestratorStartedEvent
  • DurableTask.Core.History.ExecutionStartedEvent

Implementation of the ExecuteCore() method:

        OrchestratorExecutionResult ExecuteCore(IEnumerable<HistoryEvent> pastEvents, IEnumerable<HistoryEvent> newEvents)
        {
            SynchronizationContext prevCtx = SynchronizationContext.Current;

            try
            {
                SynchronizationContext syncCtx = new TaskOrchestrationSynchronizationContext(this.decisionScheduler);
                SynchronizationContext.SetSynchronizationContext(syncCtx);
                OrchestrationContext.IsOrchestratorThread = true;

                try
                {
                    void ProcessEvents(IEnumerable<HistoryEvent> events)
                    {
                        foreach (HistoryEvent historyEvent in events)
                        {
                            if (historyEvent.EventType == EventType.OrchestratorStarted)
                            {
                                var decisionStartedEvent = (OrchestratorStartedEvent)historyEvent;
                                this.context.CurrentUtcDateTime = decisionStartedEvent.Timestamp;
                                continue;
                            }

                            // Execution enters here
                            this.ProcessEvent(historyEvent);
                            historyEvent.IsPlayed = true;
                        }
                    }

                    // Replay the old history to rebuild the local state of the orchestration.
                    // TODO: Log a verbose message indicating that the replay has started (include event count?)
                    this.context.IsReplaying = true;
                    ProcessEvents(pastEvents);

                    // Play the newly arrived events to determine the next action to take.
                    // TODO: Log a verbose message indicating that new events are being processed (include event count?)
                    this.context.IsReplaying = false;
                    // The first invocation takes this path, with IsReplaying set to false
                    ProcessEvents(newEvents);

                    // check if workflow is completed after this replay
                    // TODO: Create a setting that allows orchestrations to complete when the orchestrator
                    //       function completes, even if there are open tasks.
                    if (!this.context.HasOpenTasks)
                    {
                        if (this.result!.IsCompleted)
                        {
                            if (this.result.IsFaulted)
                            {
                                Exception? exception = this.result.Exception?.InnerExceptions.FirstOrDefault();
                                Debug.Assert(exception != null);

                                if (Utils.IsExecutionAborting(exception!))
                                {
                                    // Let this exception propagate out to be handled by the dispatcher
                                    ExceptionDispatchInfo.Capture(exception).Throw();
                                }
                                
                                this.context.FailOrchestration(exception);
                            }
                            else
                            {
                                this.context.CompleteOrchestration(this.result.Result);
                            }
                        }

                        // TODO: It is an error if result is not completed when all OpenTasks are done.
                        // Throw an exception in that case.
                    }
                }
                catch (NonDeterministicOrchestrationException exception)
                {
                    this.context.FailOrchestration(exception);
                }

                return new OrchestratorExecutionResult
                {
                    Actions = this.context.OrchestratorActions,
                    CustomStatus = this.taskOrchestration.GetStatus(),
                };
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(prevCtx);
                OrchestrationContext.IsOrchestratorThread = false;
            }
        }
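The two-phase loop in ExecuteCore() (past events with IsReplaying set to true, then new events with it set to false) can be illustrated with a much smaller model. The sketch below is not DurableTask.Core code: `SketchEventKind`, `SketchEvent`, and `ReplaySketch` are hypothetical types, and the "processing" step only records what it saw.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified event model; not the DurableTask.Core history types.
public enum SketchEventKind { OrchestratorStarted, ExecutionStarted, TaskCompleted }

public sealed record SketchEvent(SketchEventKind Kind, DateTime Timestamp)
{
    public bool IsPlayed { get; set; }
}

public sealed class ReplaySketch
{
    public bool IsReplaying { get; private set; }

    public DateTime CurrentUtcDateTime { get; private set; }

    public List<string> Log { get; } = new();

    public void Execute(IEnumerable<SketchEvent> pastEvents, IEnumerable<SketchEvent> newEvents)
    {
        // Phase 1: replay old history to rebuild local state.
        this.IsReplaying = true;
        this.Process(pastEvents);

        // Phase 2: play newly arrived events to decide what to do next.
        this.IsReplaying = false;
        this.Process(newEvents);
    }

    private void Process(IEnumerable<SketchEvent> events)
    {
        foreach (SketchEvent e in events)
        {
            if (e.Kind == SketchEventKind.OrchestratorStarted)
            {
                // Only advances the deterministic clock; nothing else is processed.
                this.CurrentUtcDateTime = e.Timestamp;
                continue;
            }

            this.Log.Add($"{e.Kind}:replaying={this.IsReplaying}");
            e.IsPlayed = true;
        }
    }
}
```

With an empty past and new events `[OrchestratorStarted, ExecutionStarted]`, the log holds a single entry recorded with IsReplaying set to false, which matches the first-execution case described above. On a later run the same ExecutionStarted event arrives in the past list and is recorded with IsReplaying set to true.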

The ProcessEvent() method:

        void ProcessEvent(HistoryEvent historyEvent)
        {
            bool overrideSuspension = historyEvent.EventType == EventType.ExecutionResumed || historyEvent.EventType == EventType.ExecutionTerminated;
            if (this.context.IsSuspended && !overrideSuspension)
            {
                this.context.HandleEventWhileSuspended(historyEvent);
            }
            else
            {
                switch (historyEvent.EventType)
                {
                    case EventType.ExecutionStarted:
                        // This branch is executed
                        var executionStartedEvent = (ExecutionStartedEvent)historyEvent;
                        this.result = this.taskOrchestration.Execute(this.context, executionStartedEvent.Input);
                        break;
                    case EventType.ExecutionTerminated:
                        this.context.HandleExecutionTerminatedEvent((ExecutionTerminatedEvent)historyEvent);
                        break;
                    case EventType.TaskScheduled:
                        this.context.HandleTaskScheduledEvent((TaskScheduledEvent)historyEvent);
                        break;
                    case EventType.TaskCompleted:
                        this.context.HandleTaskCompletedEvent((TaskCompletedEvent)historyEvent);
                        break;
                    case EventType.TaskFailed:
                        this.context.HandleTaskFailedEvent((TaskFailedEvent)historyEvent);
                        break;
                    case EventType.SubOrchestrationInstanceCreated:
                        this.context.HandleSubOrchestrationCreatedEvent((SubOrchestrationInstanceCreatedEvent)historyEvent);
                        break;
                    case EventType.SubOrchestrationInstanceCompleted:
                        this.context.HandleSubOrchestrationInstanceCompletedEvent(
                            (SubOrchestrationInstanceCompletedEvent)historyEvent);
                        break;
                    case EventType.SubOrchestrationInstanceFailed:
                        this.context.HandleSubOrchestrationInstanceFailedEvent((SubOrchestrationInstanceFailedEvent)historyEvent);
                        break;
                    case EventType.TimerCreated:
                        this.context.HandleTimerCreatedEvent((TimerCreatedEvent)historyEvent);
                        break;
                    case EventType.TimerFired:
                        this.context.HandleTimerFiredEvent((TimerFiredEvent)historyEvent);
                        break;
                    case EventType.EventSent:
                        this.context.HandleEventSentEvent((EventSentEvent)historyEvent);
                        break;
                    case EventType.EventRaised:
                        this.context.HandleEventRaisedEvent((EventRaisedEvent)historyEvent, this.skipCarryOverEvents, this.taskOrchestration);
                        break;
                    case EventType.ExecutionSuspended:
                        this.context.HandleExecutionSuspendedEvent((ExecutionSuspendedEvent)historyEvent);
                        break;
                    case EventType.ExecutionResumed:
                        this.context.HandleExecutionResumedEvent((ExecutionResumedEvent)historyEvent, ProcessEvent);
                        break;
                }
            }
        }

versioning TODO: the version field of this ExecutionStartedEvent is empty for now and needs to be populated later.

TaskOrchestrationShim

    public override async Task<string?> Execute(OrchestrationContext innerContext, string rawInput)
    {
        Check.NotNull(innerContext);
        JsonDataConverterShim converterShim = new(this.invocationContext.Options.DataConverter);
        innerContext.MessageDataConverter = converterShim;
        innerContext.ErrorDataConverter = converterShim;

        object? input = this.DataConverter.Deserialize(rawInput, this.implementation.InputType);
        this.wrapperContext = new(innerContext, this.invocationContext, input);

        try
        {
            object? output = await this.implementation.RunAsync(this.wrapperContext, input);

            // Return the output (if any) as a serialized string.
            return this.DataConverter.Serialize(output);
        }
        finally
        {
            // if user code crashed inside a critical section, or did not exit it, do that now
            this.wrapperContext.ExitCriticalSectionIfNeeded();
        }
    }

versioning TODO: the OrchestrationInstance field of this OrchestrationContext only contains InstanceId and ExecutionId. An InstanceVersion field needs to be added, and its value should be taken from the version field of the ExecutionStartedEvent.

The implementation of wrapperContext is TaskOrchestrationContextWrapper.

versioning TODO: TaskOrchestrationContextWrapper needs a new InstanceVersion field whose value is taken from this.innerContext.OrchestrationInstance.InstanceVersion.
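The versioning TODOs describe a value that has to travel from the start event, through the orchestration instance, to the wrapper context. The sketch below only illustrates that data flow. Every type in it (`StartedEventSketch`, `InstanceSketch`, `InnerContextSketch`, `ContextWrapperSketch`, `VersionFlowSketch`) is hypothetical and greatly simplified; it does not reflect the real classes or any existing API.

```csharp
#nullable enable

// Hypothetical stand-in for ExecutionStartedEvent: only the fields the TODOs mention.
public sealed record StartedEventSketch(string Name, string? Version);

// Hypothetical stand-in for OrchestrationInstance, with the proposed InstanceVersion added.
public sealed record InstanceSketch(string InstanceId, string ExecutionId, string? InstanceVersion);

// Hypothetical stand-in for the inner OrchestrationContext.
public sealed class InnerContextSketch
{
    public InnerContextSketch(InstanceSketch instance) => this.OrchestrationInstance = instance;

    public InstanceSketch OrchestrationInstance { get; }
}

// Hypothetical stand-in for TaskOrchestrationContextWrapper.
public sealed class ContextWrapperSketch
{
    private readonly InnerContextSketch innerContext;

    public ContextWrapperSketch(InnerContextSketch innerContext) => this.innerContext = innerContext;

    public string InstanceId => this.innerContext.OrchestrationInstance.InstanceId;

    // Proposed property: read straight through from the inner context.
    public string? InstanceVersion => this.innerContext.OrchestrationInstance.InstanceVersion;
}

public static class VersionFlowSketch
{
    // Copies the version from the start event into the instance, then wraps it.
    public static ContextWrapperSketch Start(StartedEventSketch started, string instanceId, string executionId)
    {
        var instance = new InstanceSketch(instanceId, executionId, started.Version);
        return new ContextWrapperSketch(new InnerContextSketch(instance));
    }
}
```

In this sketch the version is copied exactly once, when the instance is created, and every later reader sees the same value. Whether the real change should work that way is a design decision for the actual implementation.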

The implementation behind this.implementation is Microsoft.Azure.Functions.Worker.Extensions.DurableTask.FunctionsOrchestrator.

FunctionsOrchestrator

    public async Task<object?> RunAsync(TaskOrchestrationContext context, object? input)
    {
        // Set the function input to be the orchestration context wrapped in our own object so that we can
        // intercept any of the calls and inject our own logic or tracking.
        FunctionsOrchestrationContext wrapperContext = new(context, this.functionContext);
        this.contextBinding.Value = wrapperContext;
        this.inputContext.PrepareInput(input);

        try
        {
            // This method will advance to the next middleware and throw if it detects an asynchronous execution.
            await EnsureSynchronousExecution(this.functionContext, this.next, wrapperContext);
        }
        catch (Exception ex)
        {
            this.functionContext.GetLogger<FunctionsOrchestrator>().LogError(
                ex,
                "An error occurred while executing the orchestrator function '{FunctionName}'.",
                this.functionContext.FunctionDefinition.Name);
            throw;
        }

        // Set the raw function output as the orchestrator output
        object? functionOutput = this.functionContext.GetInvocationResult().Value;
        return functionOutput;
    }

Implementation of EnsureSynchronousExecution:

    private static async Task EnsureSynchronousExecution(
        FunctionContext functionContext,
        FunctionExecutionDelegate next,
        FunctionsOrchestrationContext orchestrationContext)
    {
        Task orchestratorTask = next(functionContext);
        if (!orchestratorTask.IsCompleted && !orchestrationContext.IsAccessed)
        {
            // If the middleware returns before the orchestrator function's context object was accessed and before
            // it completes its execution, then we know that either some middleware component went async or that the
            // orchestrator function did some illegal await as its very first action.
            throw new InvalidOperationException(Constants.IllegalAwaitErrorMessage);
        }

        await orchestratorTask;

        // This will throw if either the orchestrator performed an illegal await or if some middleware ahead of this
        // one performed some illegal await.
        orchestrationContext.ThrowIfIllegalAccess();
    }
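The guard in EnsureSynchronousExecution relies on a general property of async methods: if nothing inside actually yields, the returned Task is already completed when the call returns. The sketch below demonstrates just that property and the same `!IsCompleted && !accessed` condition. `SyncExecutionSketch` and its members are hypothetical names for this illustration, and the `contextAccessed` flag stands in for the real context's IsAccessed property.

```csharp
using System;
using System.Threading.Tasks;

public static class SyncExecutionSketch
{
    // Never yields: awaiting an already-completed task continues inline.
    public static async Task CompletesInline()
    {
        await Task.CompletedTask;
    }

    // Yields: the awaited task is still pending, so the caller gets back
    // an incomplete Task.
    public static async Task AwaitsPendingWork(Task pending)
    {
        await pending;
    }

    // Same condition as the guard: still running AND the context was never touched.
    public static void ThrowIfWentAsync(Task orchestratorTask, bool contextAccessed)
    {
        if (!orchestratorTask.IsCompleted && !contextAccessed)
        {
            throw new InvalidOperationException("Illegal await detected before the context was used.");
        }
    }

    // Returns true when the guard behaves as described for all three cases.
    public static bool Demo()
    {
        // Case 1: completed inline, so the guard passes.
        ThrowIfWentAsync(CompletesInline(), contextAccessed: false);

        var gate = new TaskCompletionSource<bool>();
        Task pending = AwaitsPendingWork(gate.Task);

        // Case 2: still pending, but the context was used, so the guard passes.
        ThrowIfWentAsync(pending, contextAccessed: true);

        try
        {
            // Case 3: still pending and the context was never used, so the guard throws.
            ThrowIfWentAsync(pending, contextAccessed: false);
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
        finally
        {
            // Release the pending work so nothing is left dangling.
            gate.SetResult(true);
        }
    }
}
```

Case 2 is the important one for orchestrations: once the orchestrator has touched its context, a pending task is not treated as an error at this point, which is why the real code checks both conditions.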

The next delegate was defined earlier, in UseMiddleware<T>():

                return context =>
                {
                    var middleware = context.InstanceServices.GetRequiredService<T>();

                    return middleware.Invoke(context, next);
                };

Azure-functions-dotnet-worker

FunctionExecutionMiddleware

        public async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
        {
            // Only use the coordinator for HttpTriggers
            if (!_isHttpTrigger.GetOrAdd(context.FunctionId, static (_, c) => IsHttpTriggerFunction(c), context))
            {
                await next(context);
                return;
            }

            var invocationId = context.InvocationId;

            // this call will block until the ASP.NET middleware pipeline has signaled that it's ready to run the function
            var httpContext = await _coordinator.SetFunctionContextAsync(invocationId, context);

            AddHttpContextToFunctionContext(context, httpContext);

            // Register additional context features
            context.Features.Set<IFromBodyConversionFeature>(FromBodyConverstionFeature.Instance);

            await next(context);

            var invocationResult = context.GetInvocationResult();

            if (invocationResult?.Value is IActionResult actionResult)
            {
                ActionContext actionContext = new ActionContext(httpContext, httpContext.GetRouteData(), new ActionDescriptor());

                await actionResult.ExecuteResultAsync(actionContext);
            }
            else if (invocationResult?.Value is AspNetCoreHttpResponseData)
            {
                // The AspNetCoreHttpResponseData implementation is
                // simply a wrapper over the underlying HttpResponse and
                // all APIs manipulate the request.
                // There's no need to return this result as no additional
                // processing is required.
                invocationResult.Value = null;
            }

            // allows asp.net middleware to continue
            _coordinator.CompleteFunctionInvocation(invocationId);
        }
        public Task Invoke(FunctionContext context)
        {
            return _functionExecutor.ExecuteAsync(context).AsTask();
        }

Here the implementation of _functionExecutor is MyDurableFunction1.DirectFunctionExecutor.

OutputBindingsMiddleware

        public static async Task Invoke(FunctionContext context, FunctionExecutionDelegate next)
        {
            await next(context);

            AddOutputBindings(context);
        }

Work.Sdk.Generator.GeneratedFunctionExecutor

The call ends up in the generated file GeneratedFunctionExecutor.g.cs:

        public async ValueTask ExecuteAsync(FunctionContext context)
        {
            var inputBindingFeature = context.Features.Get<IFunctionInputBindingFeature>();
            var inputBindingResult = await inputBindingFeature.BindFunctionInputAsync(context);
            var inputArguments = inputBindingResult.Values;

            if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.RunOrchestrator", StringComparison.Ordinal))
            {
                context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.RunOrchestrator((global::Microsoft.DurableTask.TaskOrchestrationContext)inputArguments[0]);
            }
            else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.SayHello", StringComparison.Ordinal))
            {
                context.GetInvocationResult().Value = global::Company.Function.HelloOrchestration.SayHello((string)inputArguments[0], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[1]);
            }
            else if (string.Equals(context.FunctionDefinition.EntryPoint, "Company.Function.HelloOrchestration.HttpStart", StringComparison.Ordinal))
            {
                context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.HttpStart((global::Microsoft.Azure.Functions.Worker.Http.HttpRequestData)inputArguments[0], (global::Microsoft.DurableTask.Client.DurableTaskClient)inputArguments[1], (global::Microsoft.Azure.Functions.Worker.FunctionContext)inputArguments[2]);
            }
        }

The generated code checks the value of context.FunctionDefinition.EntryPoint and calls the matching function:

| Value of context.FunctionDefinition.EntryPoint | Function | Function source code |
| --- | --- | --- |
| "Company.Function.HelloOrchestration.RunOrchestrator" | HelloOrchestration.RunOrchestrator() | [Function(nameof(HelloOrchestration))] |
| "Company.Function.HelloOrchestration.SayHello" | HelloOrchestration.SayHello() | [Function(nameof(SayHello))] |
| "Company.Function.HelloOrchestration.HttpStart" | HelloOrchestration.HttpStart() | [Function("HelloOrchestration_HttpStart")] |

Of these, the HttpStart() function accepts the HTTP request and then triggers the Schedule New Orchestration Instance operation.

After that, the orchestration engine starts the orchestration and the RunOrchestrator() method is executed:

context.GetInvocationResult().Value = await global::Company.Function.HelloOrchestration.RunOrchestrator((global::Microsoft.DurableTask.TaskOrchestrationContext)inputArguments[0]);

Customer Code

HelloOrchestration function

Taking the quickstart HelloOrchestration.cs as an example:

        [Function(nameof(HelloOrchestration))]
        public static async Task<List<string>> RunOrchestrator(
            [OrchestrationTrigger("1.5.6")] TaskOrchestrationContext context)
        {
            var instanceId = context.InstanceId;
            var instanceVersion = context.InstanceVersion;
            ......
        }

8.3 - trigger

DurableTask trigger

src\WebJobs.Extensions.DurableTask\TriggerAttributes

8.3.1 - OrchestrationTriggerAttribute

OrchestrationTriggerAttribute
    public sealed class OrchestrationTriggerAttribute : Attribute
    {
        /// <summary>
        /// Gets or sets the name of the orchestrator function.
        /// </summary>
        /// <remarks>
        /// If not specified, the function name is used as the name of the orchestration.
        /// </remarks>
        /// <value>
        /// The name of the orchestrator function or <c>null</c> to use the function name.
        /// </value>
#pragma warning disable CS0618 // Type or member is obsolete
        [AutoResolve]
#pragma warning restore CS0618 // Type or member is obsolete
        public string Orchestration { get; set; }
    }

Orchestration property

Gets or sets the name of the orchestrator function.

If not specified, the function name is used as the name of the orchestration.

Value: The name of the orchestrator function or null to use the function name.

This property is read by the TryCreateAsync() method in src\WebJobs.Extensions.DurableTask\Bindings\OrchestrationTriggerAttributeBindingProvider.cs:

        public Task<ITriggerBinding?> TryCreateAsync(TriggerBindingProviderContext context)
        {
            if (context == null)
            {
                throw new ArgumentNullException(nameof(context));
            }

            ParameterInfo parameter = context.Parameter;
            OrchestrationTriggerAttribute? trigger = parameter.GetCustomAttribute<OrchestrationTriggerAttribute>(inherit: false);
            if (trigger == null)
            {
                return Task.FromResult<ITriggerBinding?>(null);
            }

            // Priority for getting the name is [OrchestrationTrigger], [FunctionName], method name
            string name = trigger.Orchestration;
            if (string.IsNullOrEmpty(name))
            {
                MemberInfo method = context.Parameter.Member;
                name = method.GetCustomAttribute<FunctionNameAttribute>()?.Name ?? method.Name;
            }

            var orchestratorName = new FunctionName(name);
            if (name.StartsWith("@"))
            {
                throw new ArgumentException("Orchestration names must not start with @.");
            }

            this.config.RegisterOrchestrator(orchestratorName, null);
            var binding = new OrchestrationTriggerBinding(this.config, parameter, orchestratorName, this.connectionName, this.platormInformation);
            return Task.FromResult<ITriggerBinding?>(binding);
        }
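The name lookup in TryCreateAsync() follows a fixed priority: the value on the trigger attribute, then the function-name attribute on the method, then the method name itself. The sketch below reproduces that priority with reflection. `SketchTriggerAttribute`, `SketchFunctionNameAttribute`, `SketchFunctions`, and `OrchestratorNameSketch` are hypothetical types written for this illustration, not the WebJobs attributes; only the `System.Reflection` calls are real APIs.

```csharp
#nullable enable
using System;
using System.Reflection;

// Hypothetical stand-in for OrchestrationTriggerAttribute.
[AttributeUsage(AttributeTargets.Parameter)]
public sealed class SketchTriggerAttribute : Attribute
{
    public string? Orchestration { get; set; }
}

// Hypothetical stand-in for FunctionNameAttribute.
[AttributeUsage(AttributeTargets.Method)]
public sealed class SketchFunctionNameAttribute : Attribute
{
    public SketchFunctionNameAttribute(string name) => this.Name = name;

    public string Name { get; }
}

// Three sample methods, one per priority level.
public static class SketchFunctions
{
    public static void Explicit([SketchTrigger(Orchestration = "FromTrigger")] object context) { }

    [SketchFunctionName("FromFunctionName")]
    public static void Named([SketchTrigger] object context) { }

    public static void Bare([SketchTrigger] object context) { }
}

public static class OrchestratorNameSketch
{
    // Returns null when the first parameter carries no trigger attribute.
    public static string? Resolve(MethodInfo method)
    {
        ParameterInfo parameter = method.GetParameters()[0];
        SketchTriggerAttribute? trigger = parameter.GetCustomAttribute<SketchTriggerAttribute>(inherit: false);
        if (trigger is null)
        {
            return null;
        }

        // Priority: trigger attribute, then function-name attribute, then method name.
        string? name = trigger.Orchestration;
        if (string.IsNullOrEmpty(name))
        {
            name = method.GetCustomAttribute<SketchFunctionNameAttribute>()?.Name ?? method.Name;
        }

        if (name.StartsWith("@", StringComparison.Ordinal))
        {
            throw new ArgumentException("Orchestration names must not start with @.");
        }

        return name;
    }
}
```

Resolving `Explicit`, `Named`, and `Bare` from `SketchFunctions` yields `FromTrigger`, `FromFunctionName`, and `Bare` respectively.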