组件实现

组件实现publish的实际功能

组件接口中的 Publish() 方法定义

在 dapr runtime API 实现(包括 HTTP API 和 gRPC API)和底层 pubsub 组件之间,还有一个简单的内部接口,定义了 pubsub 组件的功能:

// PubSub is the interface for message buses.
type PubSub interface {
	Init(metadata Metadata) error
	Features() []Feature
	Publish(req *PublishRequest) error
	Subscribe(ctx context.Context, req SubscribeRequest, handler Handler) error
	Close() error
}

其中的 Publish() 用来发送消息。请求参数 PublishRequest 的字段和 Dapr API 定义中保持一致:

// PublishRequest is the request to publish a message.
type PublishRequest struct {
	Data        []byte            `json:"data"`
	PubsubName  string            `json:"pubsubname"`
	Topic       string            `json:"topic"`
	Metadata    map[string]string `json:"metadata"`
	ContentType *string           `json:"contentType,omitempty"`
}

redis 组件实现

以 redis stream 为例,看看 publish 方法的实现:

func (r *redisStreams) Publish(req *pubsub.PublishRequest) error {
	_, err := r.client.XAdd(r.ctx, &redis.XAddArgs{
		Stream:       req.Topic,
		MaxLenApprox: r.metadata.maxLenApprox,
		Values:       map[string]interface{}{"data": req.Data},
	}).Result()
	if err != nil {
		return fmt.Errorf("redis streams: error from publish: %s", err)
	}

	return nil
}

redis stream 的实现很简单,req.Topic 参数指定要写入的 redis stream,内容为一个map,其中 key “data” 的值为 req.Data。