Kubernets实现
Kubernets实现
Controller的定义
ControllerOptions 存储Controller的可配置属性。
type ControllerOptions struct {
// Controller监控的Namespace. 如果设置为 meta_v1.NamespaceAll (""), controller 监控所有的 namespaces
WatchedNamespace string
ResyncPeriod time.Duration
DomainSuffix string
// ClusterID 在多集群环境中标识远程集群
ClusterID string
// XDSUpdater 将变更推送给 xDS server.
XDSUpdater model.XDSUpdater
// 在 SPIFFE identity 中使用的 TrustDomain
TrustDomain string
stop chan struct{}
}
Controller的 struct 定义:
type Controller struct {
domainSuffix string
client kubernetes.Interface
queue Queue
services cacheHandler
endpoints cacheHandler
nodes cacheHandler
pods *PodCache
// Env 由server设置,用于指向环境,以容许controller使用 env 数据并推送状态。测试中可能为null
Env *model.Environment
// ClusterID 在多集群环境中标识远程集群
ClusterID string
// XDSUpdater 将推送 EDS 变更到 ADS model.
XDSUpdater model.XDSUpdater
stop chan struct{}
sync.RWMutex
// servicesMap 存储 hostname ==> service, 用于减少对 convertService 的调用.
servicesMap map[model.Hostname]*model.Service
// externalNameSvcInstanceMap 存储 hostname ==> instance, 用于存储 ExternalName k8s services 的实例
externalNameSvcInstanceMap map[model.Hostname][]*model.ServiceInstance
// CIDR ranger based on path-compressed prefix trie
ranger cidranger.Ranger
// registry 的 Network name,通过 MeshNetworks 的 configmap 指定
networkForRegistry string
}
重点关注这四个内容:
services cacheHandler
endpoints cacheHandler
nodes cacheHandler
pods *PodCache
cacheHandler
其中 cacheHandler 的定义是:
type cacheHandler struct {
informer cache.SharedIndexInformer
handler *ChainHandler
}
// ChainHandler ChainHandler按顺序应用handler
type ChainHandler struct {
funcs []Handler
}
// Apply方法用于执行各个handler
func (ch *ChainHandler) Apply(obj interface{}, event model.Event) error {
// 按照顺序
for _, f := range ch.funcs {
if err := f(obj, event); err != nil {
return err
}
}
return nil
}
// 通过Append方法将一个 handler 加入到链的最后面
func (ch *ChainHandler) Append(h Handler) {
ch.funcs = append(ch.funcs, h)
}
这样 services / endpoints / nodes 三个字段就分别保存有各自的 Handler Chain。
PodCache
PodCache是最终一致性的 pod cache:
type PodCache struct {
cacheHandler
sync.RWMutex
// keys 维护稳定的pod ip 到 name 的映射。
// 这容许我们通过 pod ip 获取最新的状态。
// 应该只包含 RUNNING 或者 PENDING 状态的pod,带有已分配的IP
keys map[string]string
// 对当前podcache所在的controller的引用
c *Controller
}
除了同样有cacheHandler之外,还有其他几个属性。
newPodCache()方法,在cacheHandler中增加了handler,以回调 event() 方法:
func newPodCache(ch cacheHandler, c *Controller) *PodCache {
out := &PodCache{
cacheHandler: ch,
c: c,
keys: make(map[string]string),
}
// 额外增加一个handler在cacheHandler中
ch.handler.Append(func(obj interface{}, ev model.Event) error {
// 用在调用 PodCache 的 event() 方法
return out.event(obj, ev)
})
return out
}
event()方法是关键,这个方法会根据event的信息,来决定更新 pod cache 中 keys 字段保存的 pod ip 到 name 的索引信息,并调用 XDSUpdater:
// event 更新 IP-based 索引 (podccache 的 keys 字段).
func (pc *PodCache) event(obj interface{}, ev model.Event) error {
// 当 pod 被删除时, obj 可以是 *v1.Pod 或者 DeletionFinalStateUnknown marker item.
pod, ok := obj.(*v1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("couldn't get object from tombstone %+v", obj)
}
pod, ok = tombstone.Obj.(*v1.Pod)
if !ok {
return fmt.Errorf("tombstone contained object that is not a pod %#v", obj)
}
}
// 拿到 pod 对象之后
ip := pod.Status.PodIP
// PodIP will be empty when pod is just created, but before the IP is assigned
// via UpdateStatus.
if len(ip) > 0 {
log.Infof("Handling event %s for pod %s in namespace %s -> %v", ev, pod.Name, pod.Namespace, ip)
key := KeyFunc(pod.Name, pod.Namespace)
switch ev {
case model.EventAdd:
switch pod.Status.Phase {
case v1.PodPending, v1.PodRunning:
// 添加到cache,如果pod 是 running 或者 pending
pc.keys[ip] = key
if pc.c != nil && pc.c.XDSUpdater != nil {
// 通知到 XDSUpdater
pc.c.XDSUpdater.WorkloadUpdate(ip, pod.ObjectMeta.Labels, pod.ObjectMeta.Annotations)
}
}
case model.EventUpdate:
switch pod.Status.Phase {
case v1.PodPending, v1.PodRunning:
pc.keys[ip] = key
if pc.c != nil && pc.c.XDSUpdater != nil {
pc.c.XDSUpdater.WorkloadUpdate(ip, pod.ObjectMeta.Labels, pod.ObjectMeta.Annotations)
}
default:
// delete if the pod switched to other states and is in the cache
if pc.keys[ip] == key {
delete(pc.keys, ip)
if pc.c != nil && pc.c.XDSUpdater != nil {
pc.c.XDSUpdater.WorkloadUpdate(ip, nil, nil)
}
}
}
case model.EventDelete:
// delete only if this pod was in the cache
if pc.keys[ip] == key {
delete(pc.keys, ip)
if pc.c != nil && pc.c.XDSUpdater != nil {
pc.c.XDSUpdater.WorkloadUpdate(ip, nil, nil)
}
}
}
}
return nil
}
Controller的构建
NewController()
NewController()方法创建一个新的 Kubernetes controller。通常是通过 bootstrap 和 multicluster 创建:
func NewController(client kubernetes.Interface, options ControllerOptions) *Controller {
// Queue 需要一个 time duration,用于在handler出错之后的重试延迟, 这里hard code 为 1 秒钟
out := &Controller{
domainSuffix: options.DomainSuffix,
client: client,
queue: NewQueue(1 * time.Second),
ClusterID: options.ClusterID,
XDSUpdater: options.XDSUpdater,
servicesMap: make(map[model.Hostname]*model.Service),
externalNameSvcInstanceMap: make(map[model.Hostname][]*model.ServiceInstance),
}
// 创建sharedInformers
sharedInformers := informers.NewSharedInformerFactoryWithOptions(client, options.ResyncPeriod, informers.WithNamespace(options.WatchedNamespace))
// 在分别创建 service / endpoint / node / pod 的 informer和 cacheHandler 对象
svcInformer := sharedInformers.Core().V1().Services().Informer()
out.services = out.createCacheHandler(svcInformer, "Services")
epInformer := sharedInformers.Core().V1().Endpoints().Informer()
out.endpoints = out.createEDSCacheHandler(epInformer, "Endpoints")
nodeInformer := sharedInformers.Core().V1().Nodes().Informer()
out.nodes = out.createCacheHandler(nodeInformer, "Nodes")
podInformer := sharedInformers.Core().V1().Pods().Informer()
// pod 特殊一点,还要在 cacheHandler 外面包装一个 PodCache
out.pods = newPodCache(out.createCacheHandler(podInformer, "Pod"), out)
return out
}
createCacheHandler()
createCacheHandler()方法为特定事件注册handler。
当前实现在 queue.go 中将事件排队,并且 handler 在运行时会有一些限流。
用于 Service, Endpoint, Node 和 Pod。
func (c *Controller) createCacheHandler(informer cache.SharedIndexInformer, otype string) cacheHandler {
// 现在 handler 中加入第一个 handler,来自 c.notify
handler := &ChainHandler{funcs: []Handler{c.notify}}
// 然后在 informer 上添加 EventHandler
informer.AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
k8sEvents.With(prometheus.Labels{"type": otype, "event": "add"}).Add(1)
// push task 到 Controller.queue
c.queue.Push(Task{handler: handler.Apply, obj: obj, event: model.EventAdd})
},
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
k8sEvents.With(prometheus.Labels{"type": otype, "event": "update"}).Add(1)
c.queue.Push(Task{handler: handler.Apply, obj: cur, event: model.EventUpdate})
} else {
k8sEvents.With(prometheus.Labels{"type": otype, "event": "updateSame"}).Add(1)
}
},
DeleteFunc: func(obj interface{}) {
k8sEvents.With(prometheus.Labels{"type": otype, "event": "delete"}).Add(1)
c.queue.Push(Task{handler: handler.Apply, obj: obj, event: model.EventDelete})
},
})
return cacheHandler{informer: informer, handler: handler}
}
其中 Controller.notify 是 handler chain 中的第一个 handler,如果这个方法返回error,会导致整个 chain 的重复执行:
func (c *Controller) notify(obj interface{}, event model.Event) error {
// 检查 Controller 的状态,是否已经同步完成
if !c.HasSynced() {
return errors.New("waiting till full synchronization")
}
return nil
}
// HasSynced 在初始化状态同步完成之后返回true
func (c *Controller) HasSynced() bool {
// 需要检查四个informer的同步状态
if !c.services.informer.HasSynced() ||
!c.endpoints.informer.HasSynced() ||
!c.pods.informer.HasSynced() ||
!c.nodes.informer.HasSynced() {
return false
}
return true
}
Controller Loop的实现
Controller 的 Event Controller Loop 实现在 Run()方法中:
func (c *Controller) Run(stop <-chan struct{}) {
// chan stop 用于传递停止信号
go c.queue.Run(stop)
go c.services.informer.Run(stop)
go c.pods.informer.Run(stop)
go c.nodes.informer.Run(stop)
// 为了避免没有Label或者ports的 endpoint,endpoint informer的运行需要等待nodes/pods/services同步
cache.WaitForCacheSync(stop, c.nodes.informer.HasSynced, c.pods.informer.HasSynced,
c.services.informer.HasSynced)
go c.endpoints.informer.Run(stop)
<-stop
log.Infof("Controller terminated")
}
queue 的实现
type Queue interface {
Push(Task)
Run(<-chan struct{})
}
type queueImpl struct {
// 延迟,发生错误时重试的间隔时间
delay time.Duration
// 任务列表
queue []Task
// 条件,用于线程同步
cond *sync.Cond
// 是否要关闭的标志
closing bool
}
// push方法的实现,去掉线程同步和关闭处理,其实就是两行代码
func (q *queueImpl) Push(item Task) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if !q.closing {
// 1. 将任务存放在queue中
q.queue = append(q.queue, item)
}
// 2. 发信号通知Run()进行处理
q.cond.Signal()
}
func (q *queueImpl) Run(stop <-chan struct{}) {
// 关闭处理
go func() {
<-stop
q.cond.L.Lock()
q.closing = true
q.cond.L.Unlock()
}()
for {
q.cond.L.Lock()
for !q.closing && len(q.queue) == 0 {
q.cond.Wait()
}
if len(q.queue) == 0 {
q.cond.L.Unlock()
return
}
// 从queue中取出一个任务
var item Task
item, q.queue = q.queue[0], q.queue[1:]
q.cond.L.Unlock()
// 执行handler
if err := item.handler(item.obj, item.event); err != nil {
// 如果执行失败
log.Infof("Work item handle failed (%v), retry after delay %v", err, q.delay)
// 延迟一段时间之后,再将这个任务放回queue,继续重试
// 如果有任务总是返回error,岂不是要无限循环?
// 没有看到重试次数的限制
time.AfterFunc(q.delay, func() {
q.Push(item)
})
}
}
}
Informer的实现
SharedInformer 具有共享数据缓存,并且能够将对缓存更改的通知分发给通过 AddEventHandler 注册的多个监听器。
如果使用此方法,则与标准 Informer 相比,有一个行为有所不同。当您收到通知时,缓存将至少与通知一样新鲜,但它可能更新鲜。 您不应该依赖缓存的内容与处理函数中收到的通知完全匹配。 如果先有创建,然后是删除,则缓存可能没有您的项目。
这比广播有优势,因为它允许我们在许多控制器之间共享公共缓存。 扩展广播需要我们为每个监听保留重复的缓存。
type SharedInformer interface {
// AddEventHandler 使用共享信息器的重新同步周期(resync period)向共享信息器添加事件处理程序。
// 发往单个handler的事件按顺序传递,但不同handler之间没有协调。
AddEventHandler(handler ResourceEventHandler)
// AddEventHandler 使用特定的重新同步周期(resync period)向共享信息器添加事件处理程序。
// 发往单个handler的事件按顺序传递,但不同handler之间没有协调。
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// GetStore 返回存储对象.
GetStore() Store
// GetController 取回启动informer的合成接口
GetController() Controller
// Run 启动shared informer, 当stopCh 关闭时停止。
Run(stopCh <-chan struct{})
// HasSynced 返回true,如果shared informer的存储已经同步
HasSynced() bool
// LastSyncResourceVersion是上次与底层存储同步时观察到的资源版本。
// 返回的值与访问底层存储不同步,也不是线程安全的。
LastSyncResourceVersion() string
}
type sharedIndexInformer struct {
indexer Indexer
controller Controller
processor *sharedProcessor
cacheMutationDetector CacheMutationDetector
// 跟踪该块以处理controller的后期初始化
listerWatcher ListerWatcher
objectType runtime.Object
// resyncCheckPeriod 是我们希望reflector的resync timer触发的频率
// reflector可以调用 shouldResync 来检查我们的监听器是否需要重新同步。
resyncCheckPeriod time.Duration
// defaultEventHandlerResyncPeriod 是通过 AddEventHandler添加的handler的默认重新同步周期(即,它们不指定而只想使用shared informer的默认值)。
defaultEventHandlerResyncPeriod time.Duration
// clock 容许进行测试
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
// blockDeltas 提供了一种停止所有事件分发的方法,以便后期事件处理程序可以安全地加入共享informer。
blockDeltas sync.Mutex
}
关键的Run方法:
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 创建Delta FIFO 作为 queue 使用
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// 最后调用到controller.Run()
s.controller.Run(stopCh)
}
继续看controller.Run()方法:
// Run begins processing items, and will continue until a value is sent down stopCh.
// It's an error to call Run more than once.
// Run blocks; call via go.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
}
进入r.Run,继续看 NewReflector.Run()的实现:
func (r *Reflector) Run(stopCh <-chan struct{}) {
// wait.Until()方法每隔一段时间(r.period)就执行一次func()
wait.Until(func() {
// func的实现就是调用ListAndWatch()
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}
总结:Controller 通过对 client-go 中提供的 SharedIndexInformer 等机制实现了对 k8s 资源的获取(List + Watch),其调用流程大体是:
- Controller.run(): kube.Controller in pilot
- go c.services.informer.Run(stop): cache.sharedIndexInformer in k8s.io/client-go
- s.controller.Run(stopCh): cache.Controller in k8s.io/client-go
- wg.StartWithChannel(stopCh, r.Run): NewReflector.Run() in k8s.io/client-go
Discovery的实现
Controller 实现了 ServiceDiscovery 接口,Controller 的 servicesMap 和 externalNameSvcInstanceMap 字段存储了后面需要用到的数据:
type Controller struct {
servicesMap map[model.Hostname]*model.Service
externalNameSvcInstanceMap map[model.Hostname][]*model.ServiceInstance
}
Services() 方法从 Controller 的 servicesMap 中获取 service 列表:
func (c *Controller) Services() ([]*model.Service, error) {
c.RLock()
out := make([]*model.Service, 0, len(c.servicesMap))
// 获取所有的 service
for _, svc := range c.servicesMap {
out = append(out, svc)
}
c.RUnlock()
// 按照 hostname 排序
sort.Slice(out, func(i, j int) bool { return out[i].Hostname < out[j].Hostname })
return out, nil
}
getService 方法就更简单了:
func (c *Controller) GetService(hostname model.Hostname) (*model.Service, error) {
c.RLock()
defer c.RUnlock()
return c.servicesMap[hostname], nil
}
InstancesByPort 方法比较复杂,删除细节处理代码,重点看主流程:
func (c *Controller) InstancesByPort(hostname model.Hostname, reqSvcPort int,
labelsList model.LabelsCollection) ([]*model.ServiceInstance, error) {
name, namespace, err := parseHostname(hostname)
// 根据 hostname 获取 service
svc := c.servicesMap[hostname]
// 检查要求查找的 port 是否在 service 的 port 列表中
svcPortEntry, exists := svc.Ports.GetByPort(reqSvcPort)
// 如果是 external service,直接返回
instances := c.externalNameSvcInstanceMap[hostname]
if instances != nil {
return instances, nil
}
// 从 informer 获取到 endpoints 列表
// 这是所有的 endpoint ? 有点狠
for _, item := range c.endpoints.informer.GetStore().List() {
ep := *item.(*v1.Endpoints)
// 根据 endpoint 的 name 和 namespace 过滤一下
// 还好这里的过滤方式足够简单,应该速度够快
if ep.Name == name && ep.Namespace == namespace {
var out []*model.ServiceInstance
for _, ss := range ep.Subsets {
for _, ea := range ss.Addresses {
// 两个for循环,终于拿到ip地址
// 通过ip地址获取labels
labels, _ := c.pods.labelsByIP(ea.IP)
// 检查 labels 匹配情况
if !labelsList.HasSubsetOf(labels) {
continue
}
// 通过ip地址获取pod
pod := c.pods.getPodByIP(ea.IP)
for _, port := range ss.Ports {
if port.Name == "" ||
reqSvcPort == 0 ||
svcPortEntry.Name == port.Name {
// 生成 ServiceInstance 实例,加入out
out = append(out, &model.ServiceInstance{
Endpoint: model.NetworkEndpoint{
Address: ea.IP,
Port: int(port.Port),
......
})
}
}
}
}
return out, nil
}
}
return nil, nil
}
简单说,就是在 servicesMap 、externalNameSvcInstanceMap,还有 pod 、 endpoints 等存储有全量数据的情况下,根据输入条件取满足要求的数据。