Dapr Runtime 处理来自客户端的 publish 请求

Dapr Runtime 接收来自客户端的 publish 请求的代码分析

在 dapr runtime 中,提供 HTTP 和 gRPC 两种协议,前面 runtime 初始化时介绍了 HTTP 和 gRPC 两种协议是如何在 runtime 初始化时准备好接收来自客户端的 publish 请求的。现在我们介绍在接收到来自客户端的 publish 请求后,dapr runtime 是如何处理请求的。

gRPC API

在 gRPC API 的实现中,PublishEvent() 方法负责处理接收到的 publish 请求,其主要流程大体是如下4个步骤:

type api struct {
    pubsubAdapter              runtimePubsub.Adapter
}

func (a *api) PublishEvent(ctx context.Context, in *runtimev1pb.PublishEventRequest) (*emptypb.Empty, error) {
  // 1. 根据名称找到可以处理请求的 pubsub 组件
  thepubsub := a.pubsubAdapter.GetPubSub(pubsubName)
  // 2. 处理参数的细节:如是否要封装为 cloudevent
  // 细节忽略,后续展开
  // 3. 构建 PublishRequest 请求对象
  req := pubsub.PublishRequest{
		PubsubName: pubsubName,
		Topic:      topic,
		Data:       data,
		Metadata:   in.Metadata,
	}
  // 4. 未退 pubsub 组件来负责具体的请求发送
  err := a.pubsubAdapter.Publish(&req)
}

查找处理请求的 pubsub 组件

  // 检查是否有初始化 pubsubAdapter,没有的话报错退出
  if a.pubsubAdapter == nil {
		err := status.Error(codes.FailedPrecondition, messages.ErrPubsubNotConfigured)
		apiServerLogger.Debug(err)
		return &emptypb.Empty{}, err
	}

	pubsubName := in.PubsubName
  // 检查请求,pubsubName 参数不能为空
	if pubsubName == "" {
		err := status.Error(codes.InvalidArgument, messages.ErrPubsubEmpty)
		apiServerLogger.Debug(err)
		return &emptypb.Empty{}, err
	}

  // 根据 pubsubName 参数在 pubsubAdapter 中找到对应的组件
	thepubsub := a.pubsubAdapter.GetPubSub(pubsubName)
	if thepubsub == nil {
    // 如果找不到,则报错退出
		err := status.Errorf(codes.InvalidArgument, messages.ErrPubsubNotFound, pubsubName)
		apiServerLogger.Debug(err)
		return &emptypb.Empty{}, err
	}

GetPubSub() 方法的实现很简单,就是根据 pubsubName 在现有已经初始化的 pubsub 组件中进行简单的map查找:

// GetPubSub is an adapter method to find a pubsub by name.
func (a *DaprRuntime) GetPubSub(pubsubName string) pubsub.PubSub {
	ps, ok := a.pubSubs[pubsubName]
	if !ok {
		return nil
	}
	return ps.component
}

委托 pubsub 组件发送请求

func (a *DaprRuntime) Publish(req *pubsub.PublishRequest) error {
  // 这里又根据名称做了一次查找
  // TBD:可以考虑做代码优化了,从前面把找到的组件传递过来就好了
	ps, ok := a.pubSubs[req.PubsubName]
	if !ok {
		return runtimePubsub.NotFoundError{PubsubName: req.PubsubName}
	}

  // 检查 pubsub 操作是否被容许
	if allowed := a.isPubSubOperationAllowed(req.PubsubName, req.Topic, ps.scopedPublishings); !allowed {
		return runtimePubsub.NotAllowedError{Topic: req.Topic, ID: a.runtimeConfig.ID}
	}

  // 执行策略
	policy := a.resiliency.ComponentOutboundPolicy(a.ctx, req.PubsubName)
	return policy(func(ctx context.Context) (err error) {
    // 最终调用到底层实际组件的 Publish 方法来发送请求
		return ps.component.Publish(req)
	})
}

HTTP API

HTTP API 的处理方式和 gRPC API 是一致的,只是 HTTP API 这边由于 HTTP 协议的原因,在请求参数的获取上无法像 gRPC API 那样有一个的 runtimev1pb.PublishEventRequest 对象可以完整的封装所有请求参数,HTTP API 会多出一个请求参数的获取过程。

从 HTTP 请求中获取所有参数

HTTP API 实现中的 onPublish() 方法的前面一段代码就是在处理如何从 HTTP 请求中获取 publish 所需的所有参数:

func (a *api) onPublish(reqCtx *fasthttp.RequestCtx) {
  // 1. pubsubName
  pubsubName := reqCtx.UserValue(pubsubnameparam).(string)
  // 2. topic
  topic := reqCtx.UserValue(topicParam).(string)
  // 3. data
  body := reqCtx.PostBody()
  // 4. data content type
	contentType := string(reqCtx.Request.Header.Peek("Content-Type"))
  // 5. metadata
	metadata := getMetadataFromRequest(reqCtx)
  
  // 后续处理和 gRPC 协议一致
  ......
}