这是本节的多页打印视图。点击此处打印。

返回本页常规视图。

workflow的源码

Dapr workflow的源码

1 - workflow API

Dapr workflow的API定义

proto 定义

dapr/proto/runtime/v1/dapr.proto

// Workflow RPCs of the Dapr service. Every method shown here carries the
// "Alpha1" suffix in its name.
service Dapr {
  // Starts a new instance of a workflow
  rpc StartWorkflowAlpha1 (StartWorkflowRequest) returns (StartWorkflowResponse) {}

  // Gets details about a started workflow instance
  rpc GetWorkflowAlpha1 (GetWorkflowRequest) returns (GetWorkflowResponse) {}

  // Purges a workflow instance
  rpc PurgeWorkflowAlpha1 (PurgeWorkflowRequest) returns (google.protobuf.Empty) {}

  // Terminates a running workflow instance
  rpc TerminateWorkflowAlpha1 (TerminateWorkflowRequest) returns (google.protobuf.Empty) {}

  // Pauses a running workflow instance
  rpc PauseWorkflowAlpha1 (PauseWorkflowRequest) returns (google.protobuf.Empty) {}

  // Resumes a paused workflow instance
  rpc ResumeWorkflowAlpha1 (ResumeWorkflowRequest) returns (google.protobuf.Empty) {}

  // Raises an event to a running workflow instance
  rpc RaiseEventWorkflowAlpha1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {}
}

workflow 没有 sidecar 往应用方向发请求的场景，也就是没有 appcallback。

生成的 go 代码

pkg/proto/runtime/v1 下存放的是根据 proto 生成的 go 代码

比如 pkg/proto/runtime/v1/dapr_grpc.pb.go

2 - workflow HTTP API

Dapr workflow的HTTP API实现

pkg/http/api.go

构建workflow的endpoint

// Names of the URL path parameters used by the workflow routes.
const (
	// workflowComponent is the path parameter holding the workflow component.
	workflowComponent = "workflowComponent"
	// workflowName is the path parameter holding the name of the workflow.
	workflowName = "workflowName"
)

// NewAPI creates the api value and returns it as an API. This excerpt elides
// most of the body (marked "......") and keeps only the step that registers
// the workflow endpoints.
func NewAPI(opts APIOpts) API {
	api := &api{
        ......
	// Add the endpoints produced by constructWorkflowEndpoints to the endpoint list.
	api.endpoints = append(api.endpoints, api.constructWorkflowEndpoints()...)
	return api
}
    

constructWorkflowEndpoints() 方法的实现在 pkg/http/api_workflow.go 中:

// constructWorkflowEndpoints returns the HTTP endpoints of the workflow API.
// Every endpoint accepts exactly one HTTP method and is registered under
// apiVersionV1alpha1.
func (a *api) constructWorkflowEndpoints() []Endpoint {
	// alpha1 assembles a single-method Endpoint for the v1alpha1 API version.
	alpha1 := func(method, route string, handler http.HandlerFunc) Endpoint {
		return Endpoint{
			Methods: []string{method},
			Route:   route,
			Version: apiVersionV1alpha1,
			Handler: handler,
		}
	}

	return []Endpoint{
		// Get details of a workflow instance.
		alpha1(http.MethodGet, "workflows/{workflowComponent}/{instanceID}", a.onGetWorkflowHandler()),
		// Raise an event on a workflow instance.
		alpha1(http.MethodPost, "workflows/{workflowComponent}/{instanceID}/raiseEvent/{eventName}", a.onRaiseEventWorkflowHandler()),
		// Start a workflow by name.
		alpha1(http.MethodPost, "workflows/{workflowComponent}/{workflowName}/start", a.onStartWorkflowHandler()),
		// Pause a workflow instance.
		alpha1(http.MethodPost, "workflows/{workflowComponent}/{instanceID}/pause", a.onPauseWorkflowHandler()),
		// Resume a workflow instance.
		alpha1(http.MethodPost, "workflows/{workflowComponent}/{instanceID}/resume", a.onResumeWorkflowHandler()),
		// Terminate a workflow instance.
		alpha1(http.MethodPost, "workflows/{workflowComponent}/{instanceID}/terminate", a.onTerminateWorkflowHandler()),
		// Purge a workflow instance.
		alpha1(http.MethodPost, "workflows/{workflowComponent}/{instanceID}/purge", a.onPurgeWorkflowHandler()),
	}
}

handler 实现

pkg/http/api_workflow.go

onStartWorkflowHandler()

// onStartWorkflowHandler returns the HTTP handler that starts a workflow.
//
// Route: "workflows/{workflowComponent}/{workflowName}/start?instanceID={instanceID}"
//
//   - Workflow Component: component specified in yaml
//   - Workflow Name: name of the workflow to run
//   - Instance ID: identifier of the specific run (optional query parameter)
func (a *api) onStartWorkflowHandler() http.HandlerFunc {
	// fillRequest populates the request from the URL path, the query string
	// and the raw HTTP body.
	fillRequest := func(r *http.Request, in *runtimev1pb.StartWorkflowRequest) (*runtimev1pb.StartWorkflowRequest, error) {
		in.WorkflowComponent = chi.URLParam(r, workflowComponent)
		in.WorkflowName = chi.URLParam(r, workflowName)

		// The instance ID is optional; when the query string does not carry
		// one, a random UUID is used instead.
		if in.InstanceId = r.URL.Query().Get(instanceID); in.InstanceId == "" {
			generated, err := uuid.NewRandom()
			if err != nil {
				return nil, err
			}
			in.InstanceId = generated.String()
		}

		// The HTTP request body becomes the workflow input as-is, without
		// making any assumptions about its format.
		var readErr error
		if in.Input, readErr = io.ReadAll(r.Body); readErr != nil {
			return nil, messages.ErrBodyRead.WithFormat(readErr)
		}
		return in, nil
	}

	return UniversalHTTPHandler(
		a.universal.StartWorkflowAlpha1,
		// UniversalHTTPHandlerOpts is a generic struct, instantiated here with
		// the request and response types of StartWorkflowAlpha1.
		UniversalHTTPHandlerOpts[*runtimev1pb.StartWorkflowRequest, *runtimev1pb.StartWorkflowResponse]{
			// The input body is passed manually by the InModifier rather than
			// being parsed with protojson.
			SkipInputBody:     true,
			InModifier:        fillRequest,
			SuccessStatusCode: http.StatusAccepted,
		})
}

onGetWorkflowHandler()

// onGetWorkflowHandler returns the HTTP handler backed by GetWorkflowAlpha1.
//
// Route: GET "workflows/{workflowComponent}/{instanceID}"
func (a *api) onGetWorkflowHandler() http.HandlerFunc {
	// The shared workflowInModifier supplies the request fields from the URL.
	opts := UniversalHTTPHandlerOpts[*runtimev1pb.GetWorkflowRequest, *runtimev1pb.GetWorkflowResponse]{
		InModifier: workflowInModifier[*runtimev1pb.GetWorkflowRequest],
	}
	return UniversalHTTPHandler(a.universal.GetWorkflowAlpha1, opts)
}

workflowInModifier() 是供各 workflow handler 共用的泛型方法，它从 URL 路径中读取 workflowComponent 和 instanceID 两个参数，并分别设置到请求对象的 WorkflowComponent 和 InstanceId 字段:

// workflowInModifier is the InModifier shared by the universal workflow
// handlers. It copies the "workflowComponent" and "instanceID" URL parameters
// into the request through its SetWorkflowComponent and SetInstanceId setters,
// then returns the same request.
func workflowInModifier[T runtimev1pb.WorkflowRequests](r *http.Request, in T) (T, error) {
	component, id := chi.URLParam(r, workflowComponent), chi.URLParam(r, instanceID)
	in.SetWorkflowComponent(component)
	in.SetInstanceId(id)
	return in, nil
}

3 - workflow gRPC API

Dapr workflow的gRPC API实现

proto 定义

dapr/proto/runtime/v1/dapr.proto

// Workflow RPCs of the Dapr service. Every method shown here carries the
// "Alpha1" suffix in its name.
service Dapr {
  // Starts a new instance of a workflow
  rpc StartWorkflowAlpha1 (StartWorkflowRequest) returns (StartWorkflowResponse) {}

  // Gets details about a started workflow instance
  rpc GetWorkflowAlpha1 (GetWorkflowRequest) returns (GetWorkflowResponse) {}

  // Purges a workflow instance
  rpc PurgeWorkflowAlpha1 (PurgeWorkflowRequest) returns (google.protobuf.Empty) {}

  // Terminates a running workflow instance
  rpc TerminateWorkflowAlpha1 (TerminateWorkflowRequest) returns (google.protobuf.Empty) {}

  // Pauses a running workflow instance
  rpc PauseWorkflowAlpha1 (PauseWorkflowRequest) returns (google.protobuf.Empty) {}

  // Resumes a paused workflow instance
  rpc ResumeWorkflowAlpha1 (ResumeWorkflowRequest) returns (google.protobuf.Empty) {}

  // Raises an event to a running workflow instance
  rpc RaiseEventWorkflowAlpha1 (RaiseEventWorkflowRequest) returns (google.protobuf.Empty) {}
}

workflow 没有 sidecar 往应用方向发请求的场景，也就是没有 appcallback。

生成的 go 代码

pkg/proto/runtime/v1 下存放的是根据 proto 生成的 go 代码

比如 pkg/proto/runtime/v1/dapr_grpc.pb.go