mdns命名解析
基本输入输出
跳过细节和错误处理,尤其是去除所有同步保护代码(很复杂),只简单看输入和输出:
// ResolveID 通过 mDNS 将名称解析为地址。
func (m *Resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
m.browseOne(ctx, req.ID, published)
select {
case addr := <-sub.AddrChan:
return addr, nil
case err := <-sub.ErrChan:
return "", err
case <-time.After(subscriberTimeout):
return "", fmt.Errorf("timeout waiting for address for app id %s", req.ID)
}
}
func (m *Resolver) browseOne(ctx context.Context, appID string, published chan struct{}) {
err := m.browse(browseCtx, appID, onFirst)
}
注意:只用到了 req.ID, 全程没有使用 req.Namespace,也就是 MDNS 根本不支持 Namespace.
mdns解析方式
mdns 的核心实现在 browseOne() 方法中:
func (m *Resolver) browseOne(ctx context.Context, appID string, published chan struct{}) {
// 启动一个 goroutine 异步执行
go func() {
var addr string
browseCtx, cancel := context.WithCancel(ctx)
defer cancel()
// 准备回调函数,收到第一个地址之后就取消 browse,所以这个函数名为 browseOne
onFirst := func(ip string) {
addr = ip
cancel() // cancel to stop browsing.
}
m.logger.Debugf("Browsing for first mDNS address for app id %s", appID)
// 执行 browse
err := m.browse(browseCtx, appID, onFirst)
// 忽略错误处理
......
m.pubAddrToSubs(appID, addr)
published <- struct{}{} // signal that all subscribers have been notified.
}()
}
继续看 browse 的实现:
// browse 将对所提供的 App ID 进行无阻塞的 mdns 网络浏览
func (m *Resolver) browse(ctx context.Context, appID string, onEach func(ip string)) error {
......
}
首先通过 zeroconf.NewResolver 构建一个 Resolver:
import "github.com/grandcat/zeroconf"
resolver, err := zeroconf.NewResolver(nil)
if err != nil {
return fmt.Errorf("failed to initialize resolver: %w", err)
}
......
zeroconf 是一个纯Golang库,采用多播 DNS-SD 来浏览和解析网络中的服务,并在本地网络中注册自己的服务。
执行mdns解析的代码是 resolver.Browse() 方法,解析的结果会异步发送到 entries 这个 channel 中:
entries := make(chan *zeroconf.ServiceEntry)
if err = resolver.Browse(ctx, appID, "local.", entries); err != nil {
return fmt.Errorf("failed to browse: %w", err)
}
每个从 mDNS browse 返回的 service entry 会这样处理:
// handle each service entry returned from the mDNS browse.
go func(results <-chan *zeroconf.ServiceEntry) {
for {
select {
case entry := <-results:
if entry == nil {
break
}
// 调用 handleEntry 方法来处理每个返回的 service entry
handleEntry(entry)
case <-ctx.Done():
// 如果所有 service entry 都处理完成了,或者是出错(取消或者超时)
// 此时需要推出 browse,但在退出之前需要检查一下是否有已经收到但还没有处理的结果
for len(results) > 0 {
handleEntry(<-results)
}
if errors.Is(ctx.Err(), context.Canceled) {
m.logger.Debugf("mDNS browse for app id %s canceled.", appID)
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
m.logger.Debugf("mDNS browse for app id %s timed out.", appID)
}
return // stop listening for results.
}
}
}(entries)
handleEntry() 方法的实现:
handleEntry := func(entry *zeroconf.ServiceEntry) {
for _, text := range entry.Text {
// 检查appID看是否是自己要查找的app
if text != appID {
m.logger.Debugf("mDNS response doesn't match app id %s, skipping.", appID)
break
}
m.logger.Debugf("mDNS response for app id %s received.", appID)
// 检查是否有 IPv4 或者 ipv6 地址
hasIPv4Address := len(entry.AddrIPv4) > 0
hasIPv6Address := len(entry.AddrIPv6) > 0
if !hasIPv4Address && !hasIPv6Address {
m.logger.Debugf("mDNS response for app id %s doesn't contain any IPv4 or IPv6 addresses, skipping.", appID)
break
}
var addr string
port := entry.Port
// 目前只支持取第一个地址
// TODO: we currently only use the first IPv4 and IPv6 address.
// We should understand the cases in which additional addresses
// are returned and whether we need to support them.
// 加入到缓存中,缓存后面细看
if hasIPv4Address {
addr = fmt.Sprintf("%s:%d", entry.AddrIPv4[0].String(), port)
m.addAppAddressIPv4(appID, addr)
}
if hasIPv6Address {
addr = fmt.Sprintf("%s:%d", entry.AddrIPv6[0].String(), port)
m.addAppAddressIPv6(appID, addr)
}
// 开始回调,就是前面说的拿到第一个地址就取消 browse
if onEach != nil {
onEach(addr) // invoke callback.
}
}
}
至此就完成了 mdns 的解析,从 ID 到 address。
缓存设计
mdns 是非常慢的,为了性能就需要缓存解析后的地址,前面的代码在解析完成之后会保存这些地址:
// addAppAddressIPv4 adds an IPv4 address to the
// cache for the provided app id.
func (m *Resolver) addAppAddressIPv4(appID string, addr string) {
m.ipv4Mu.Lock()
defer m.ipv4Mu.Unlock()
m.logger.Debugf("Adding IPv4 address %s for app id %s cache entry.", addr, appID)
if _, ok := m.appAddressesIPv4[appID]; !ok {
var addrList addressList
m.appAddressesIPv4[appID] = &addrList
}
m.appAddressesIPv4[appID].add(addr)
}
在解析之前,在 ResolveID() 方法中会线尝试检查缓存中是否有数据,如果有就直接使用:
func (m *Resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
// check for cached IPv4 addresses for this app id first.
if addr := m.nextIPv4Address(req.ID); addr != nil {
return *addr, nil
}
// check for cached IPv6 addresses for this app id second.
if addr := m.nextIPv6Address(req.ID); addr != nil {
return *addr, nil
}
......
}
从缓存中获取appID对应的地址:
// nextIPv4Address returns the next IPv4 address for
// the provided app id from the cache.
func (m *Resolver) nextIPv4Address(appID string) *string {
m.ipv4Mu.RLock()
defer m.ipv4Mu.RUnlock()
addrList, exists := m.appAddressesIPv4[appID]
if exists {
addr := addrList.next()
if addr != nil {
m.logger.Debugf("found mDNS IPv4 address in cache: %s", *addr)
return addr
}
}
return nil
}
addrList.next() 比较有意思,这里不是要获取地址列表,而是取单个地址。也就是说,当有多个地址时,这里 addrList.next() 实际上实现了负载均衡 ^0^
负载均衡
addressList 结构体的组成:
// addressList represents a set of addresses along with
// data used to control and access said addresses.
type addressList struct {
addresses []address
counter int
mu sync.RWMutex
}
除了地址数组之外,还有一个 counter ,以及并发保护的读写锁。
// max integer value supported on this architecture.
const maxInt = int(^uint(0) >> 1)
// next 从列表中获取下一个地址,考虑到当前的循环实现。除了尽力而为的线性迭代,对选择没有任何保证。
func (a *addressList) next() *string {
// 获取读锁
a.mu.RLock()
defer a.mu.RUnlock()
if len(a.addresses) == 0 {
return nil
}
// 如果 counter 达到 maxInt,就从头再来
if a.counter == maxInt {
a.counter = 0
}
// 用地址数量 对 counter 求余,去余数所对应的地址,然后counter递增
// 相当于一个最简单常见的 轮询 算法
index := a.counter % len(a.addresses)
addr := a.addresses[index]
a.counter++
return &addr.ip
}
并发保护
为了避免多个请求同时去解析同一个 ID,因此设计了并发保护机制,对于单个ID,只容许一个请求执行解析,其他请求会等待这个解析的结果:
// ResolveID resolves name to address via mDNS.
func (m *Resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
sub := NewSubscriber()
// add the sub to the pool of subs for this app id.
m.subMu.Lock()
appIDSubs, exists := m.subs[req.ID]
if !exists {
// WARN: must set appIDSubs variable for use below.
appIDSubs = NewSubscriberPool(sub)
m.subs[req.ID] = appIDSubs
} else {
appIDSubs.Add(sub)
}
m.subMu.Unlock()
// only one subscriber per pool will perform the first browse for the
// requested app id. The rest will subscribe for an address or error.
var once *sync.Once
var published chan struct{}
ctx, cancel := context.WithTimeout(context.Background(), browseOneTimeout)
defer cancel()
appIDSubs.Once.Do(func() {
published = make(chan struct{})
m.browseOne(ctx, req.ID, published)
// once will only be set for the first browser.
once = new(sync.Once)
})
......
}
总结
mdns name resolver 返回的是一个简单的 ip 地址+端口(v4或者v6),形如 “192.168.0.100:8000”。