workflow的源码
Dapr workflow的源码
1 - workflow API
Dapr workflow的API定义
proto 定义
dapr/proto/runtime/v1/dapr.proto
service Dapr {
// Starts a new instance of a workflow
rpc StartWorkflowAlpha1 (StartWorkflowRequest) returns (StartWorkflowResponse) {}
// Gets details about a started workflow instance
rpc GetWorkflowAlpha1 (GetWorkflowRequest) returns (GetWorkflowResponse) {}
// Purge Workflow
rpc PurgeWorkflowAlpha1 (PurgeWorkflowRequest) returns (google.protobuf.Empty) {}
// Terminates a running workflow instance
rpc TerminateWorkflowAlpha1 (TerminateWorkflowRequest) returns (google.protobuf.Empty) {}
// Pauses a running workflow instance
rpc PauseWorkflowAlpha1 (PauseWorkflowRequest) returns (google.protobuf.Empty) {}
// Resumes a paused workflow instance
rpc ResumeWorkflowAlpha1 (ResumeWorkflowRequest) returns (google.protobuf.Empty) {}
// Raise an event to a running workflow instance
rpc RaiseEventWorkflowAlpha1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {}
}
workflow 没有 sidecar 往应用方向发请求的场景,也就是没有 appcallback 。
生成的 go 代码
pkg/proto/runtime/v1
下存放的是根据 proto 生成的 go 代码
比如 pkg/proto/runtime/v1/dapr_grpc.pb.go
2 - workflow HTTP API
Dapr workflow的HTTP API实现
pkg/http/api.go
构建workflow的endpoint
const (
workflowComponent = "workflowComponent"
workflowName = "workflowName"
)
func NewAPI(opts APIOpts) API {
api := &api{
......
api.endpoints = append(api.endpoints, api.constructWorkflowEndpoints()...)
return api
}
constructWorkflowEndpoints() 方法的实现在 pkg/http/api_workflow.go
中:
func (a *api) constructWorkflowEndpoints() []Endpoint {
return []Endpoint{
{
Methods: []string{http.MethodGet},
Route: "workflows/{workflowComponent}/{instanceID}",
Version: apiVersionV1alpha1,
Handler: a.onGetWorkflowHandler(),
},
{
Methods: []string{http.MethodPost},
Route: "workflows/{workflowComponent}/{instanceID}/raiseEvent/{eventName}",
Version: apiVersionV1alpha1,
Handler: a.onRaiseEventWorkflowHandler(),
},
{
Methods: []string{http.MethodPost},
Route: "workflows/{workflowComponent}/{workflowName}/start",
Version: apiVersionV1alpha1,
Handler: a.onStartWorkflowHandler(),
},
{
Methods: []string{http.MethodPost},
Route: "workflows/{workflowComponent}/{instanceID}/pause",
Version: apiVersionV1alpha1,
Handler: a.onPauseWorkflowHandler(),
},
{
Methods: []string{http.MethodPost},
Route: "workflows/{workflowComponent}/{instanceID}/resume",
Version: apiVersionV1alpha1,
Handler: a.onResumeWorkflowHandler(),
},
{
Methods: []string{http.MethodPost},
Route: "workflows/{workflowComponent}/{instanceID}/terminate",
Version: apiVersionV1alpha1,
Handler: a.onTerminateWorkflowHandler(),
},
{
Methods: []string{http.MethodPost},
Route: "workflows/{workflowComponent}/{instanceID}/purge",
Version: apiVersionV1alpha1,
Handler: a.onPurgeWorkflowHandler(),
},
}
}
handler 实现
pkg/http/api_workflow.go
onStartWorkflowHandler()
// Route: "workflows/{workflowComponent}/{workflowName}/start?instanceID={instanceID}",
// Workflow Component: Component specified in yaml
// Workflow Name: Name of the workflow to run
// Instance ID: Identifier of the specific run
func (a *api) onStartWorkflowHandler() http.HandlerFunc {
return UniversalHTTPHandler(
a.universal.StartWorkflowAlpha1,
// UniversalHTTPHandlerOpts 是范型结构体
UniversalHTTPHandlerOpts[*runtimev1pb.StartWorkflowRequest, *runtimev1pb.StartWorkflowResponse]{
// We pass the input body manually rather than parsing it using protojson
SkipInputBody: true,
InModifier: func(r *http.Request, in *runtimev1pb.StartWorkflowRequest) (*runtimev1pb.StartWorkflowRequest, error) {
in.WorkflowName = chi.URLParam(r, workflowName)
in.WorkflowComponent = chi.URLParam(r, workflowComponent)
// instance id 是可选的,如果没有指定则生成一个随机的
// The instance ID is optional. If not specified, we generate a random one.
in.InstanceId = r.URL.Query().Get(instanceID)
if in.InstanceId == "" {
randomID, err := uuid.NewRandom()
if err != nil {
return nil, err
}
in.InstanceId = randomID.String()
}
// HTTP request body 直接用来做 workflow 的 Input
// We accept the HTTP request body as the input to the workflow
// without making any assumptions about its format.
var err error
in.Input, err = io.ReadAll(r.Body)
if err != nil {
return nil, messages.ErrBodyRead.WithFormat(err)
}
return in, nil
},
SuccessStatusCode: http.StatusAccepted,
})
}
onGetWorkflowHandler()
// Route: POST "workflows/{workflowComponent}/{instanceID}"
func (a *api) onGetWorkflowHandler() http.HandlerFunc {
return UniversalHTTPHandler(
a.universal.GetWorkflowAlpha1,
UniversalHTTPHandlerOpts[*runtimev1pb.GetWorkflowRequest, *runtimev1pb.GetWorkflowResponse]{
InModifier: workflowInModifier[*runtimev1pb.GetWorkflowRequest],
})
}
workflowInModifier() 方法是通用方法,读取 WorkflowComponent 和 InstanceId 两个参数:
// Shared InModifier method for all universal handlers for workflows that adds the "WorkflowComponent" and "InstanceId" properties
func workflowInModifier[T runtimev1pb.WorkflowRequests](r *http.Request, in T) (T, error) {
in.SetWorkflowComponent(chi.URLParam(r, workflowComponent))
in.SetInstanceId(chi.URLParam(r, instanceID))
return in, nil
}
3 - workflow gRPC API
Dapr workflow的gRPC API实现
proto 定义
dapr/proto/runtime/v1/dapr.proto
service Dapr {
// Starts a new instance of a workflow
rpc StartWorkflowAlpha1 (StartWorkflowRequest) returns (StartWorkflowResponse) {}
// Gets details about a started workflow instance
rpc GetWorkflowAlpha1 (GetWorkflowRequest) returns (GetWorkflowResponse) {}
// Purge Workflow
rpc PurgeWorkflowAlpha1 (PurgeWorkflowRequest) returns (google.protobuf.Empty) {}
// Terminates a running workflow instance
rpc TerminateWorkflowAlpha1 (TerminateWorkflowRequest) returns (google.protobuf.Empty) {}
// Pauses a running workflow instance
rpc PauseWorkflowAlpha1 (PauseWorkflowRequest) returns (google.protobuf.Empty) {}
// Resumes a paused workflow instance
rpc ResumeWorkflowAlpha1 (ResumeWorkflowRequest) returns (google.protobuf.Empty) {}
// Raise an event to a running workflow instance
rpc RaiseEventWorkflowAlpha1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {}
}
workflow 没有 sidecar 往应用方向发请求的场景,也就是没有 appcallback 。
生成的 go 代码
pkg/proto/runtime/v1
下存放的是根据 proto 生成的 go 代码
比如 pkg/proto/runtime/v1/dapr_grpc.pb.go