Design and Implementation of Name Resolution
1 - Name Resolution Overview
Introduction
Name resolution offers one common interface over different name resolvers. A resolver returns the address or IP of another service that your application may need to connect to.
Interface definition
A compatible name resolver must implement the Resolver interface defined in nameresolution.go.
// Resolver is the interface for name resolvers.
type Resolver interface {
// Init initializes name resolver.
Init(metadata Metadata) error
// ResolveID resolves name to address.
ResolveID(req ResolveRequest) (string, error)
}
// ResolveRequest represents a service discovery resolver request.
type ResolveRequest struct {
ID string
Namespace string
Port int
Data map[string]string
}
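To make the contract concrete, here is a minimal sketch of a resolver that satisfies an interface of the same shape. The types are simplified local stand-ins rather than the real nameresolution package, and staticResolver and its Properties-based host table are hypothetical names invented for this illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// Metadata and ResolveRequest are simplified local stand-ins for the
// types in the nameresolution package (an assumption of this sketch).
type Metadata struct {
	Properties map[string]string
}

type ResolveRequest struct {
	ID        string
	Namespace string
	Port      int
	Data      map[string]string
}

// Resolver mirrors the two-method interface shown above.
type Resolver interface {
	Init(metadata Metadata) error
	ResolveID(req ResolveRequest) (string, error)
}

// staticResolver is a hypothetical resolver backed by a fixed host table.
type staticResolver struct {
	hosts map[string]string // app ID -> host
}

// Init copies the configured app ID -> host pairs into the resolver.
func (s *staticResolver) Init(metadata Metadata) error {
	if len(metadata.Properties) == 0 {
		return errors.New("static resolver: no hosts configured")
	}
	s.hosts = make(map[string]string, len(metadata.Properties))
	for id, host := range metadata.Properties {
		s.hosts[id] = host
	}
	return nil
}

// ResolveID looks up the host for req.ID and appends the requested port.
func (s *staticResolver) ResolveID(req ResolveRequest) (string, error) {
	host, ok := s.hosts[req.ID]
	if !ok {
		return "", fmt.Errorf("static resolver: unknown app id %s", req.ID)
	}
	return fmt.Sprintf("%s:%d", host, req.Port), nil
}

func main() {
	var r Resolver = &staticResolver{}
	if err := r.Init(Metadata{Properties: map[string]string{"app1": "10.0.0.5"}}); err != nil {
		panic(err)
	}
	addr, err := r.ResolveID(ResolveRequest{ID: "app1", Port: 50002})
	fmt.Println(addr, err) // prints "10.0.0.5:50002 <nil>"
}
```

The caller only ever sees the Resolver interface, which is what lets the mDNS, Kubernetes, DNS and consul implementations be swapped without touching the calling code.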
2 - Usage
Resolving an address
The name resolver is called from exactly one place:
func (d *directMessaging) getRemoteApp(appID string) (remoteApp, error) {
// Extract the id and namespace from appID.
// appID may be in a form such as "appID.namespace".
id, namespace, err := d.requestAppIDAndNamespace(appID)
if err != nil {
return remoteApp{}, err
}
// Ask the resolver to resolve the address.
request := nr.ResolveRequest{ID: id, Namespace: namespace, Port: d.grpcPort}
address, err := d.resolver.ResolveID(request)
if err != nil {
return remoteApp{}, err
}
// Return the remoteApp with the resolved address.
return remoteApp{
namespace: namespace,
id: id,
address: address,
}, nil
}
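The body of requestAppIDAndNamespace is not shown here. As a rough sketch of how an "appID.namespace" string could be split, assuming a bare app ID falls back to the caller's own namespace, consider the hypothetical helper splitAppID below. It is an illustration, not the Dapr implementation.

```go
package main

import (
	"fmt"
	"strings"
)

// splitAppID is a hypothetical helper: it splits "id" or "id.namespace"
// and falls back to defaultNamespace when no namespace is given.
func splitAppID(target, defaultNamespace string) (id, namespace string, err error) {
	parts := strings.Split(target, ".")
	switch len(parts) {
	case 1:
		return parts[0], defaultNamespace, nil
	case 2:
		return parts[0], parts[1], nil
	default:
		// More than one dot is ambiguous in this sketch, so reject it.
		return "", "", fmt.Errorf("invalid app id %s", target)
	}
}

func main() {
	fmt.Println(splitAppID("app1", "default"))
	fmt.Println(splitAppID("app1.production", "default"))
	fmt.Println(splitAppID("a.b.c", "default"))
}
```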
The resolved address is used in directMessaging's Invoke() to make the remote call:
// Invoke takes a message requests and invokes an app, either local or remote.
func (d *directMessaging) Invoke(ctx context.Context, targetAppID string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
app, err := d.getRemoteApp(targetAppID)
if err != nil {
return nil, err
}
// If the target app's id and namespace both match those of this directMessaging instance, call invokeLocal().
if app.id == d.appID && app.namespace == d.namespace {
return d.invokeLocal(ctx, req)
}
// Otherwise call invokeRemote, wrapped in the retry mechanism.
return d.invokeWithRetry(ctx, retry.DefaultLinearRetryCount, retry.DefaultLinearBackoffInterval, app, d.invokeRemote, req)
}
The signature of invokeWithRetry(), with the retry logic omitted:
func (d *directMessaging) invokeWithRetry(
ctx context.Context,
numRetries int,
backoffInterval time.Duration,
app remoteApp,
fn func(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error),
req *invokev1.InvokeMethodRequest,
) (*invokev1.InvokeMethodResponse, error) {
}
invokeRemote() then makes the gRPC call to the remote app at the resolved address:
func (d *directMessaging) invokeRemote(ctx context.Context, appID, namespace, appAddress string, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
// Obtain a gRPC connection to the resolved address.
conn, teardown, err := d.connectionCreatorFn(context.TODO(), appAddress, appID, namespace, false, false, false)
defer teardown()
if err != nil {
return nil, err
}
ctx = d.setContextSpan(ctx)
d.addForwardedHeadersToMetadata(req)
d.addDestinationAppIDHeaderToMetadata(appID, req)
clientV1 := internalv1pb.NewServiceInvocationClient(conn)
var opts []grpc.CallOption
opts = append(opts, grpc.MaxCallRecvMsgSize(d.maxRequestBodySize*1024*1024), grpc.MaxCallSendMsgSize(d.maxRequestBodySize*1024*1024))
resp, err := clientV1.CallLocal(ctx, req.Proto(), opts...)
if err != nil {
return nil, err
}
return invokev1.InternalInvokeResponse(resp)
}
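3 - mDNS name resolution
Basic input and output
Leaving aside details and error handling, and in particular stripping out all of the (fairly complex) synchronization code, the input and output are simply:
// ResolveID resolves a name to an address via mDNS.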
func (m *Resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
m.browseOne(ctx, req.ID, published)
select {
case addr := <-sub.AddrChan:
return addr, nil
case err := <-sub.ErrChan:
return "", err
case <-time.After(subscriberTimeout):
return "", fmt.Errorf("timeout waiting for address for app id %s", req.ID)
}
}
func (m *Resolver) browseOne(ctx context.Context, appID string, published chan struct{}) {
err := m.browse(browseCtx, appID, onFirst)
}
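Note: only req.ID is used. req.Namespace is never read, so the mDNS resolver does not support namespaces at all.
How mDNS resolution works
The core of the mDNS implementation is the browseOne() method: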
func (m *Resolver) browseOne(ctx context.Context, appID string, published chan struct{}) {
// Start a goroutine so the browse runs asynchronously.
go func() {
var addr string
browseCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Prepare the callback: it cancels the browse as soon as the first address arrives, hence the name browseOne.
onFirst := func(ip string) {
addr = ip
cancel() // cancel to stop browsing.
}
m.logger.Debugf("Browsing for first mDNS address for app id %s", appID)
// Run the browse.
err := m.browse(browseCtx, appID, onFirst)
// Error handling omitted.
......
m.pubAddrToSubs(appID, addr)
published <- struct{}{} // signal that all subscribers have been notified.
}()
}
Next, the implementation of browse:
// browse performs a non-blocking mDNS network browse for the provided app id.
func (m *Resolver) browse(ctx context.Context, appID string, onEach func(ip string)) error {
......
}
It first builds a resolver with zeroconf.NewResolver:
import "github.com/grandcat/zeroconf"
resolver, err := zeroconf.NewResolver(nil)
if err != nil {
return fmt.Errorf("failed to initialize resolver: %w", err)
}
......
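zeroconf is a pure Go library that uses multicast DNS-SD to browse and resolve services on the network, and to register its own services on the local network.
The mDNS lookup itself is done by resolver.Browse(), which delivers results asynchronously on the entries channel: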
entries := make(chan *zeroconf.ServiceEntry)
if err = resolver.Browse(ctx, appID, "local.", entries); err != nil {
return fmt.Errorf("failed to browse: %w", err)
}
Each service entry returned from the mDNS browse is handled like this:
// handle each service entry returned from the mDNS browse.
go func(results <-chan *zeroconf.ServiceEntry) {
for {
select {
case entry := <-results:
if entry == nil {
break
}
// Call handleEntry for each service entry returned.
handleEntry(entry)
case <-ctx.Done():
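// Reached when all service entries have been handled, or on error (cancellation or timeout).
// The browse has to exit now, but first check for results that were received and not yet handled.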
for len(results) > 0 {
handleEntry(<-results)
}
if errors.Is(ctx.Err(), context.Canceled) {
m.logger.Debugf("mDNS browse for app id %s canceled.", appID)
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
m.logger.Debugf("mDNS browse for app id %s timed out.", appID)
}
return // stop listening for results.
}
}
}(entries)
handleEntry() is implemented as follows:
handleEntry := func(entry *zeroconf.ServiceEntry) {
for _, text := range entry.Text {
// Check whether this entry is for the app id being looked up.
if text != appID {
m.logger.Debugf("mDNS response doesn't match app id %s, skipping.", appID)
break
}
m.logger.Debugf("mDNS response for app id %s received.", appID)
// Check whether the entry carries an IPv4 or IPv6 address.
hasIPv4Address := len(entry.AddrIPv4) > 0
hasIPv6Address := len(entry.AddrIPv6) > 0
if !hasIPv4Address && !hasIPv6Address {
m.logger.Debugf("mDNS response for app id %s doesn't contain any IPv4 or IPv6 addresses, skipping.", appID)
break
}
var addr string
port := entry.Port
// TODO: we currently only use the first IPv4 and IPv6 address.
// We should understand the cases in which additional addresses
// are returned and whether we need to support them.
// Add the address to the cache (the cache is covered in detail below).
if hasIPv4Address {
addr = fmt.Sprintf("%s:%d", entry.AddrIPv4[0].String(), port)
m.addAppAddressIPv4(appID, addr)
}
if hasIPv6Address {
addr = fmt.Sprintf("%s:%d", entry.AddrIPv6[0].String(), port)
m.addAppAddressIPv6(appID, addr)
}
// Invoke the callback; this is where browseOne cancels the browse after the first address.
if onEach != nil {
onEach(addr) // invoke callback.
}
}
}
That completes mDNS resolution, from app ID to address.
Cache design
mDNS lookups are slow, so resolved addresses are cached for performance. The code above stores each address once it has been resolved:
// addAppAddressIPv4 adds an IPv4 address to the
// cache for the provided app id.
func (m *Resolver) addAppAddressIPv4(appID string, addr string) {
m.ipv4Mu.Lock()
defer m.ipv4Mu.Unlock()
m.logger.Debugf("Adding IPv4 address %s for app id %s cache entry.", addr, appID)
if _, ok := m.appAddressesIPv4[appID]; !ok {
var addrList addressList
m.appAddressesIPv4[appID] = &addrList
}
m.appAddressesIPv4[appID].add(addr)
}
Before resolving, ResolveID() first checks the cache and returns a cached address directly if there is one:
func (m *Resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
// check for cached IPv4 addresses for this app id first.
if addr := m.nextIPv4Address(req.ID); addr != nil {
return *addr, nil
}
// check for cached IPv6 addresses for this app id second.
if addr := m.nextIPv6Address(req.ID); addr != nil {
return *addr, nil
}
......
}
Fetching the address for an app ID from the cache:
// nextIPv4Address returns the next IPv4 address for
// the provided app id from the cache.
func (m *Resolver) nextIPv4Address(appID string) *string {
m.ipv4Mu.RLock()
defer m.ipv4Mu.RUnlock()
addrList, exists := m.appAddressesIPv4[appID]
if exists {
addr := addrList.next()
if addr != nil {
m.logger.Debugf("found mDNS IPv4 address in cache: %s", *addr)
return addr
}
}
return nil
}
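addrList.next() is the interesting part: it returns a single address rather than the whole list. When an app has several addresses, next() therefore acts as a load balancer.
Load balancing
The addressList struct is defined as: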
// addressList represents a set of addresses along with
// data used to control and access said addresses.
type addressList struct {
addresses []address
counter int
mu sync.RWMutex
}
Besides the address slice, it holds a counter and a read-write mutex for concurrency protection.
// max integer value supported on this architecture.
const maxInt = int(^uint(0) >> 1)
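// next gets the next address from the list given the current round-robin implementation.
// There are no guarantees on the selection beyond best-effort linear iteration.
func (a *addressList) next() *string {
// Acquire the read lock. Note that a.counter is still incremented below while only the read lock is held.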
a.mu.RLock()
defer a.mu.RUnlock()
if len(a.addresses) == 0 {
return nil
}
// If counter has reached maxInt, start over from zero.
if a.counter == maxInt {
a.counter = 0
}
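// Take counter modulo the number of addresses, return the address at that index, then increment counter.
// This is the simplest and most common round-robin algorithm.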
index := a.counter % len(a.addresses)
addr := a.addresses[index]
a.counter++
return &addr.ip
}
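For comparison, here is a self-contained round-robin sketch that updates its counter under an exclusive lock and wraps it at the list length, so no maxInt check is needed. roundRobin and its methods are hypothetical names; this is one possible variant, not the resolver's code.

```go
package main

import (
	"fmt"
	"sync"
)

// roundRobin is a hypothetical address list that hands out addresses in turn.
type roundRobin struct {
	mu        sync.Mutex
	addresses []string
	counter   int
}

// add appends addr unless it is already present.
func (r *roundRobin) add(addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, a := range r.addresses {
		if a == addr {
			return
		}
	}
	r.addresses = append(r.addresses, addr)
}

// next returns the next address in rotation, or false if the list is empty.
// The counter is mutated, so an exclusive lock is taken.
func (r *roundRobin) next() (string, bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if len(r.addresses) == 0 {
		return "", false
	}
	addr := r.addresses[r.counter%len(r.addresses)]
	r.counter = (r.counter + 1) % len(r.addresses) // wrap, so it never overflows
	return addr, true
}

func main() {
	rr := &roundRobin{}
	rr.add("10.0.0.1:3500")
	rr.add("10.0.0.2:3500")
	for i := 0; i < 3; i++ {
		addr, _ := rr.next()
		fmt.Println(addr)
	}
	// prints 10.0.0.1:3500, 10.0.0.2:3500, 10.0.0.1:3500 on separate lines
}
```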
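Concurrency protection
To keep multiple requests from resolving the same ID at the same time, there is a concurrency-protection mechanism: for a given ID only one request performs the lookup, and the others wait for its result: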
// ResolveID resolves name to address via mDNS.
func (m *Resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
sub := NewSubscriber()
// add the sub to the pool of subs for this app id.
m.subMu.Lock()
appIDSubs, exists := m.subs[req.ID]
if !exists {
// WARN: must set appIDSubs variable for use below.
appIDSubs = NewSubscriberPool(sub)
m.subs[req.ID] = appIDSubs
} else {
appIDSubs.Add(sub)
}
m.subMu.Unlock()
// only one subscriber per pool will perform the first browse for the
// requested app id. The rest will subscribe for an address or error.
var once *sync.Once
var published chan struct{}
ctx, cancel := context.WithTimeout(context.Background(), browseOneTimeout)
defer cancel()
appIDSubs.Once.Do(func() {
published = make(chan struct{})
m.browseOne(ctx, req.ID, published)
// once will only be set for the first browser.
once = new(sync.Once)
})
......
}
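Summary
The mDNS name resolver returns a plain IP address plus port (IPv4 or IPv6), for example "192.168.0.100:8000".
4 - kubernetes
Implementation
The Kubernetes implementation is very simple: it just assembles a Kubernetes service name in the format Kubernetes services require: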
// ResolveID resolves name to address in Kubernetes.
func (k *resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
// Dapr requires this formatting for Kubernetes services
return fmt.Sprintf("%s-dapr.%s.svc.%s:%d", req.ID, req.Namespace, k.clusterDomain, req.Port), nil
}
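Here req.ID and req.Namespace map to the Kubernetes service name and namespace. Note that the Kubernetes service name is the ID with a "-dapr" suffix appended. The port comes from the request; the method only concatenates the pieces.
Setting clusterDomain
clusterDomain is slightly more involved. Its default value is "cluster.local", set when the resolver is constructed: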
const (
DefaultClusterDomain = "cluster.local"
)
type resolver struct {
logger logger.Logger
clusterDomain string
}
// NewResolver creates Kubernetes name resolver.
func NewResolver(logger logger.Logger) nameresolution.Resolver {
return &resolver{
logger: logger,
clusterDomain: DefaultClusterDomain,
}
}
The default can be overridden by setting a metadata entry named "clusterDomain" in the configuration:
const (
ClusterDomainKey = "clusterDomain"
)
func (k *resolver) Init(metadata nameresolution.Metadata) error {
configInterface, err := config.Normalize(metadata.Configuration)
if err != nil {
return err
}
if config, ok := configInterface.(map[string]string); ok {
clusterDomain := config[ClusterDomainKey]
if clusterDomain != "" {
k.clusterDomain = clusterDomain
}
}
return nil
}
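Putting the format string and the clusterDomain override together, a small sketch of the resulting addresses looks like this. serviceAddress is a hypothetical helper, and "mydomain.com" is a made-up override value used only for illustration.

```go
package main

import "fmt"

const defaultClusterDomain = "cluster.local"

// serviceAddress is a hypothetical helper that applies the same format as
// the resolver above, falling back to the default domain when none is set.
func serviceAddress(id, namespace, clusterDomain string, port int) string {
	if clusterDomain == "" {
		clusterDomain = defaultClusterDomain
	}
	return fmt.Sprintf("%s-dapr.%s.svc.%s:%d", id, namespace, clusterDomain, port)
}

func main() {
	// A made-up configuration map carrying the override key.
	cfg := map[string]string{"clusterDomain": "mydomain.com"}

	fmt.Println(serviceAddress("app1", "default", "", 80))
	// prints "app1-dapr.default.svc.cluster.local:80"
	fmt.Println(serviceAddress("app1", "default", cfg["clusterDomain"], 80))
	// prints "app1-dapr.default.svc.mydomain.com:80"
}
```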
Summary
The Kubernetes name resolver returns a Kubernetes service name such as "app1-dapr.default.svc.cluster.local:80" rather than an IP address in the usual sense.
5 - dns
Implementation
The DNS implementation is just as simple and closely resembles the Kubernetes one: it assembles a Kubernetes service name in DNS format:
// ResolveID resolves name to address in orchestrator.
func (k *resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
return fmt.Sprintf("%s-dapr.%s.svc:%d", req.ID, req.Namespace, req.Port), nil
}
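All of the parameters come from the request; the method only concatenates them.
Summary
The DNS name resolver returns a Kubernetes service name such as "app1-dapr.default.svc:80" rather than an IP address in the usual sense.
6 - consul
Initialization
Initialization reads the configuration and sets up the client connection: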
func (r *resolver) Init(metadata nr.Metadata) error {
var err error
r.config, err = getConfig(metadata)
if err != nil {
return err
}
if err = r.client.InitClient(r.config.Client); err != nil {
return fmt.Errorf("failed to init consul client: %w", err)
}
// register service to consul
......
return nil
}
Service registration
Init can also register the service with consul when the configuration asks for it:
// register service to consul
if r.config.Registration != nil {
if err := r.client.Agent().ServiceRegister(r.config.Registration); err != nil {
return fmt.Errorf("failed to register consul service: %w", err)
}
r.logger.Infof("service:%s registered on consul agent", r.config.Registration.Name)
} else if _, err := r.client.Agent().Self(); err != nil {
return fmt.Errorf("failed check on consul agent: %w", err)
}
Resolver implementation
The consul name resolver is fairly simple:
// ResolveID resolves name to address via consul.
func (r *resolver) ResolveID(req nr.ResolveRequest) (string, error) {
cfg := r.config
// Query consul for healthy instances of the service.
// Only req.ID is used; the namespace is not.
services, _, err := r.client.Health().Service(req.ID, "", true, cfg.QueryOptions)
if err != nil {
return "", fmt.Errorf("failed to query healthy consul services: %w", err)
}
if len(services) == 0 {
return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID)
}
// shuffle randomly swaps the positions of the services passed in.
shuffle := func(services []*consul.ServiceEntry) []*consul.ServiceEntry {
for i := len(services) - 1; i > 0; i-- {
rndbig, _ := rand.Int(rand.Reader, big.NewInt(int64(i+1)))
j := rndbig.Int64()
services[i], services[j] = services[j], services[i]
}
return services
}
// Shuffle, then take the first entry: in load-balancing terms, random selection.
svc := shuffle(services)[0]
addr := ""
// Read the address and port.
if port, ok := svc.Service.Meta[cfg.DaprPortMetaKey]; ok {
if svc.Service.Address != "" {
addr = fmt.Sprintf("%s:%s", svc.Service.Address, port)
} else if svc.Node.Address != "" {
addr = fmt.Sprintf("%s:%s", svc.Node.Address, port)
} else {
return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID)
}
} else {
return "", fmt.Errorf("target service AppID:%s found but DAPR_PORT missing from meta", req.ID)
}
return addr, nil
}
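Because only the first element of the shuffled slice is used, drawing a single uniformly random index gives the same distribution with less work. The sketch below shows that variant together with the service-address-then-node-address fallback. The instance type, pick and address are hypothetical names, and "DAPR_PORT" is used as an example meta key only.

```go
package main

import (
	"crypto/rand"
	"errors"
	"fmt"
	"math/big"
)

// instance is a simplified, hypothetical stand-in for a consul service entry.
type instance struct {
	ServiceAddress string
	NodeAddress    string
	Meta           map[string]string
}

// pick returns one instance chosen uniformly at random.
func pick(instances []instance) (instance, error) {
	if len(instances) == 0 {
		return instance{}, errors.New("no healthy instances")
	}
	n, err := rand.Int(rand.Reader, big.NewInt(int64(len(instances))))
	if err != nil {
		return instance{}, err // surface the error instead of ignoring it
	}
	return instances[n.Int64()], nil
}

// address prefers the service address and falls back to the node address.
func address(in instance, portKey string) (string, error) {
	port, ok := in.Meta[portKey]
	if !ok {
		return "", fmt.Errorf("%s missing from meta", portKey)
	}
	host := in.ServiceAddress
	if host == "" {
		host = in.NodeAddress
	}
	if host == "" {
		return "", errors.New("instance has no address")
	}
	return host + ":" + port, nil
}

func main() {
	instances := []instance{
		{ServiceAddress: "192.168.0.100", Meta: map[string]string{"DAPR_PORT": "50001"}},
		{NodeAddress: "192.168.0.101", Meta: map[string]string{"DAPR_PORT": "50001"}},
	}
	in, err := pick(instances)
	if err != nil {
		panic(err)
	}
	fmt.Println(address(in, "DAPR_PORT"))
}
```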
Summary
The consul name resolver returns a plain IP and port string such as "192.168.0.100:80". When there are multiple instances, it picks one at random.