资源绑定的output处理源码分析

Dapr资源绑定的output处理源码分析

pkg/grpc/api.go 中的 InvokeBinding 方法:

// InvokeBinding translates the incoming gRPC request into a
// bindings.InvokeRequest, forwards it through sendToOutputBindingFn under
// in.Name, and copies Data and Metadata from the result into the response.
//
// When the call fails, the error is re-formatted with the
// ERR_INVOKE_OUTPUT_BINDING prefix, logged at debug level, and returned
// together with an empty (non-nil) response.
func (a *api) InvokeBinding(ctx context.Context, in *runtimev1pb.InvokeBindingRequest) (*runtimev1pb.InvokeBindingResponse, error) {
	// Copying in.Data directly gives the same result as guarding on nil first:
	// a nil payload simply leaves the field nil.
	invokeReq := &bindings.InvokeRequest{
		Data:      in.Data,
		Metadata:  in.Metadata,
		Operation: bindings.OperationKind(in.Operation),
	}
	out := &runtimev1pb.InvokeBindingResponse{}

	// The key work happens here: the request is handed to the injected callback.
	result, invokeErr := a.sendToOutputBindingFn(in.Name, invokeReq)
	if invokeErr != nil {
		wrapped := fmt.Errorf("ERR_INVOKE_OUTPUT_BINDING: %s", invokeErr)
		apiServerLogger.Debug(wrapped)
		return out, wrapped
	}

	// A nil result is not an error; the caller gets the empty response.
	if result == nil {
		return out, nil
	}
	out.Data = result.Data
	out.Metadata = result.Metadata
	return out, nil
}

sendToOutputBindingFn 方法的初始化在这里:

// getGRPCAPI constructs the gRPC API via grpc.NewAPI from the runtime's own
// fields. The runtime's sendToOutputBinding method is passed as one of the
// arguments, which is how the API layer reaches the output-binding logic.
func (a *DaprRuntime) getGRPCAPI() grpc.API {
	grpcAPI := grpc.NewAPI(
		a.runtimeConfig.ID,
		a.appChannel,
		a.stateStores,
		a.secretStores,
		a.getPublishAdapter(),
		a.directMessaging,
		a.actor,
		a.sendToOutputBinding,
		a.globalConfig.Spec.TracingSpec,
	)
	return grpcAPI
}

sendToOutputBinding 方法的实现在 pkg/runtime/runtime.go:

// sendToOutputBinding invokes the output binding registered under name with req.
//
// It returns an error when req.Operation is empty, when no output binding is
// registered under name, or when the binding does not list req.Operation among
// the operations it reports as supported. Otherwise it returns whatever the
// binding's Invoke call returns.
func (a *DaprRuntime) sendToOutputBinding(name string, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
	if req.Operation == "" {
		return nil, errors.New("operation field is missing from request")
	}

	// Look up the already-registered binding by name.
	binding, ok := a.outputBindings[name]
	if !ok {
		return nil, errors.Errorf("couldn't find output binding %s", name)
	}

	ops := binding.Operations()
	for _, o := range ops {
		// Found the requested operation among those the binding supports.
		if o == req.Operation {
			// Key step: hand off to the concrete binding implementation.
			return binding.Invoke(req)
		}
	}

	// Start with length 0 and capacity len(ops). Allocating with length
	// len(ops) and then appending would leave len(ops) empty strings at the
	// front of the slice and put stray separators in the error message.
	supported := make([]string, 0, len(ops))
	for _, o := range ops {
		supported = append(supported, string(o))
	}
	return nil, errors.Errorf("binding %s does not support operation %s. supported operations:%s", name, req.Operation, strings.Join(supported, " "))
}