



























当我们第一次写 Kubernetes Operator 或者开发自己的控制器时,总会遇到一个让人头疼的问题:为什么我们需要 Informer?它和 Controller 到底有什么区别?SharedIndexInformer、shareProcessor、SharedInformerFactory 这些名字眼花缭乱,它们各自承担什么职责?今天这篇文章,我们从源码出发,用最直白的语言,把 Informer 机制的每一个细节都讲清楚。读完这篇,你将能够:理解 Informer 和 Controller 的本质区别、掌握 SharedIndexInformer 的内部工作原理、看懂 shareProcessor 如何分发事件、以及熟练使用 SharedInformerFactory 管理多个 Informer 的生命周期。
Kubernetes client-go Informer Controller 源码分析
🔓 学习重点提示 — 建议先通读全文,再重点回顾标注内容
★ 重点掌握(必须)
• Informer vs Controller 的本质区别:Informer 是数据管道,Controller 是业务逻辑
• SharedIndexInformer 的三大组件:Indexer、Controller、Processor
• shareProcessor 的事件分发机制:三个 goroutine 的协作
• SharedInformerFactory 的生命周期管理:Start、WaitForCacheSync、Shutdown
☆ 次重点(了解即可)
• DeltaFIFO 的 Delta 类型和队列机制
• Resync 机制和最小同步周期
• 缓存变更检测器 (CacheMutationDetector)
很多初学者会把 Informer 和 Controller 混为一谈,觉得它们是同一个东西。实际上,它们承担着完全不同的职责,理解这个区别是掌握整个 Kubernetes 控制器开发的关键。让我用一个生活化的比喻来解释:把 Kubernetes 集群想象成一个大型图书馆,API Server 就是图书管理员,而你(开发者)需要知道馆里有哪些书、哪些书被借走了、哪些书过期了。
Informer 是数据管道,负责从 API Server 拉取数据、维护本地缓存、向你通知资源变化。Controller 是业务逻辑,负责监听资源变化、计算期望状态、调用 API Server 调整实际状态。简单来说:Informer 告诉你"发生了什么",Controller 决定"该怎么处理"。
我们来看 EndpointController(端点控制器)的结构定义,它同时使用了 Informer 和 Controller 两种组件:
// pkg/controller/endpoint/endpoints_controller.go(行 132-180)
// Controller manages selector-based service endpoints.
// 这个 Controller 是业务逻辑层,它使用 Informer 提供的缓存
type Controller struct {
client clientset.Interface
eventBroadcaster record.EventBroadcaster
eventRecorder record.EventRecorder
// serviceLister 和 podLister 是 Informer 提供的只读缓存
// Controller 通过它们读取数据,从不直接访问 API Server
serviceLister corelisters.ServiceLister
servicesSynced cache.InformerSynced // Informer 的同步状态
podLister corelisters.PodLister
podsSynced cache.InformerSynced
endpointsLister corelisters.EndpointsLister
endpointsSynced cache.InformerSynced
// queue 是 Controller 的工作队列,用于批量处理和去重
// 这是 Controller 自己的组件,Informer 不关心
queue workqueue.TypedRateLimitingInterface[string]
podQueue workqueue.TypedRateLimitingInterface[*endpointsliceutil.PodProjectionKey]
workerLoopPeriod time.Duration
triggerTimeTracker *endpointsliceutil.TriggerTimeTracker
endpointUpdatesBatchPeriod time.Duration
}
从这段代码我们可以看到,EndpointController 本身并不直接连接 API Server,它依赖 Informer 提供的 serviceLister、podLister、endpointsLister 来读取数据。这些 Lister 就是 Informer 缓存的访问入口。Controller 的核心职责是根据服务的变化,计算出应该有哪些端点,然后更新到 API Server。
让我用一张图来展示 Informer 和 Controller 的协作关系:
┌─────────────────────────────────────────────────────────────────────────┐
│ API Server(数据源) │
└──────────────────────────────┬──────────────────────────────────────────┘
│ List/Watch
┌──────────────────────────────▼──────────────────────────────────────────┐
│ Informer(数据管道) │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ SharedIndexInformer │ │
│ │ ├── Reflector ──────→ DeltaFIFO ──→ Controller ──→ Indexer │ │
│ │ │ (增量队列) (处理循环) (本地缓存) │ │
│ │ │ │ │
│ │ └── shareProcessor ──→ processorListener(事件通知) │ │
│ │ (多播分发) (每个 Handler 一个) │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────┬──────────────────────────────────────────┘
│ 事件回调 + 缓存读取
┌──────────────────────────────▼──────────────────────────────────────────┐
│ Controller(业务逻辑) │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ EndpointController / DeploymentController / JobController... │ │
│ │ ├── 接收 Informer 的事件回调 │ │
│ │ ├── 读取 Indexer 缓存获取完整数据 │ │
│ │ ├── 放入 WorkQueue 等待处理 │ │
│ │ ├── Worker 协程从队列消费,计算期望状态 │ │
│ │ └── 调用 API Server 调整实际状态 │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────────┘
从这幅图我们可以看出:Informer 负责数据的获取和同步,它通过 Reflector 持续从 API Server 拉取数据,放入 DeltaFIFO 队列,然后通过 Controller 处理后存入 Indexer 缓存。Controller 负责业务逻辑,它监听 Informer 的事件回调,读取缓存数据,计算差异,然后采取行动。
🌟 实用技巧
当你开发自己的控制器时,记住这个黄金法则:Controller 永远只读缓存,不直接访问 API Server。如果需要创建、更新、删除资源,才通过 client 调用 API Server。这不仅减少了 API Server 的压力,还保证了控制器的高可用性(即使 API Server 暂时不可用,控制器依然可以正常工作)。
SharedIndexInformer 是 client-go 中最核心的 Informer 实现。它的名字里有"Shared",意味着多个控制器可以共享同一个 Informer 实例,从而减少对 API Server 的连接数和请求压力。让我带你深入它的源码结构。
首先我们看 SharedIndexInformer 实现了哪些接口,这些接口定义了它的核心能力:
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 130-288)
// SharedInformer 是基础接口,提供事件监听和缓存访问能力
type SharedInformer interface {
// AddEventHandler 添加一个事件处理器,返回一个注册句柄
// 这个句柄可以用于后续移除处理器
AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
// AddEventHandlerWithOptions 添加带选项的事件处理器
// 可以指定 resync 周期等参数
AddEventHandlerWithOptions(handler ResourceEventHandler, options HandlerOptions) (ResourceEventHandlerRegistration, error)
// RemoveEventHandler 移除之前添加的事件处理器
RemoveEventHandler(handle ResourceEventHandlerRegistration) error
// GetStore 返回本地存储(已废弃,使用 GetIndexer)
GetStore() Store
// GetIndexer 返回带索引的本地存储
// 这是读取缓存数据的主要入口
GetIndexer() Indexer
// Run 启动 Informer 的主循环
Run(stopCh
从接口定义我们可以看到,SharedInformer 的核心功能就是:添加事件处理器(AddEventHandler)、获取本地缓存(GetIndexer)、启动运行(Run)、检查同步状态(HasSynced)。SharedIndexInformer 在此基础上增加了自定义索引的能力,这对于需要按标签或字段快速查询的场景非常有用。
现在让我们看看 sharedIndexInformer 的内部实现结构,这是真正干活的类:
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 585-638)
// sharedIndexInformer 是 SharedIndexInformer 接口的具体实现
// 它由三大核心组件构成:Indexer、Controller、Processor
type sharedIndexInformer struct {
// indexer 是本地缓存,存储所有已知的资源对象
// 相当于一个线程安全的 Map,key 是 namespace/name
indexer Indexer
// controller 负责从 DeltaFIFO 取出增量并处理
// 它会调用 processDeltas 函数处理每个变更
controller Controller
// synced 是一个关闭 channel
// 当首次同步完成后,这个 channel 会被关闭
// HasSynced() 方法就是通过检查这个 channel 实现的
synced chan struct{}
// processor 负责将事件分发给所有注册的处理器
// 它管理多个 processorListener,每个处理器一个
processor *sharedProcessor
// cacheMutationDetector 用于检测缓存是否被意外修改
// 这是一个调试工具,帮助发现不正确的代码
cacheMutationDetector MutationDetector
// listerWatcher 是数据源,负责 List 和 Watch API Server
listerWatcher ListerWatcher
// objectType 是这个 Informer 预期的对象类型
// 用于类型检查,过滤掉不匹配的对象
objectType runtime.Object
// resyncCheckPeriod 是检查是否需要重新同步的周期
// 默认值通常是 30 秒
resyncCheckPeriod time.Duration
// defaultEventHandlerResyncPeriod 是处理器默认的重新同步周期
defaultEventHandlerResyncPeriod time.Duration
// clock 用于测试,可以模拟时间流逝
clock clock.Clock
// started 和 stopped 标记 Informer 的运行状态
started, stopped bool
startedLock sync.Mutex
// blockDeltas 是一个互斥锁
// 用于在添加新的处理器时暂停事件分发
// 这样可以安全地为新处理器发送"初始化添加"事件
blockDeltas sync.Mutex
// watchErrorHandler 处理 Watch 错误
watchErrorHandler WatchErrorHandlerWithContext
// transform 用于在存入缓存前转换对象
transform TransformFunc
}
这个结构体的注释已经非常清晰了。让我用更直白的话解释一下三大组件的职责:Indexer 就是本地缓存,你可以把它想象成一个线程安全的 Map,key 是"命名空间/资源名称",value 是资源对象本身。Controller 是控制器,它不断从 DeltaFIFO 取出增量(Delta),然后更新 Indexer 缓存,并调用 Processor 分发事件。Processor 是事件分发器,它管理多个"监听器"(processorListener),当有事件发生时,它负责把事件传递给每一个监听器。
让我们看看如何创建一个 SharedIndexInformer:
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 308-340)
// NewSharedIndexInformerWithOptions 是创建 Informer 的主入口
func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object,
options SharedIndexInformerOptions) SharedIndexInformer {
// 1. 创建一个真实时钟,用于定时器
realClock := &clock.RealClock{}
// 2. 创建共享处理器,管理所有事件监听器
processor := &sharedProcessor{clock: realClock}
processor.listenersRCond = sync.NewCond(processor.listenersLock.RLocker())
// 3. 创建并返回 sharedIndexInformer 实例
return &sharedIndexInformer{
// 创建带索引的本地缓存
// DeletionHandlingMetaNamespaceKeyFunc 是默认的 key 生成函数
// 格式为 "namespace/name",对于集群级资源则是 "name"
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers,
WithStoreMetrics(options.Identifier, options.InformerMetricsProvider)),
// 注入共享处理器
processor: processor,
// 创建一个未关闭的 channel,用于表示同步状态
synced: make(chan struct{}),
// 数据源
listerWatcher: lw,
// 示例对象,用于类型检查
objectType: exampleObject,
// 重新同步检查周期
resyncCheckPeriod: options.ResyncPeriod,
defaultEventHandlerResyncPeriod: options.ResyncPeriod,
// 时钟
clock: realClock,
// 缓存变更检测器
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
// key 生成函数
keyFunc: DeletionHandlingMetaNamespaceKeyFunc,
}
}
创建过程非常清晰:首先创建一个共享处理器(sharedProcessor),然后创建本地缓存(Indexer),最后把所有组件组装成一个完整的 sharedIndexInformer 实例。Indexer 的 key 生成函数 `DeletionHandlingMetaNamespaceKeyFunc` 会生成"命名空间/名称"格式的 key,比如 "default/nginx-pod",这和我们用 kubectl 获取资源时看到的信息是一致的。
当我们调用 Run 方法启动 Informer 时,内部发生了什么?
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 719-783)
func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
defer utilruntime.HandleCrashWithContext(ctx)
logger := klog.FromContext(ctx)
// 防止重复启动
if s.HasStarted() {
logger.Info("Warning: the sharedIndexInformer has started, run more than once is not allowed")
return
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 1. 创建 DeltaFIFO 队列和对应的日志记录器
// 这个 FIFO 存储资源的所有变更(增量)
logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform,
s.identifier, s.informerMetricsProvider)
// 2. 创建 Controller 配置
// Controller 会持续从 ListerWatcher 拉取数据,放入 FIFO
cfg := &Config{
Queue: fifo, // DeltaFIFO 队列
ListerWatcher: s.listerWatcher, // 数据源
ObjectType: s.objectType, // 对象类型
FullResyncPeriod: s.resyncCheckPeriod, // 完整重新同步周期
ShouldResync: s.processor.shouldResync, // 检查是否需要重新同步
// Process 函数处理每个从 FIFO 取出的增量
Process: func(obj interface{}, isInInitialList bool) error {
return s.handleDeltas(logger, obj, isInInitialList)
},
ProcessBatch: func(deltas []Delta, isInInitialList bool) error {
return s.handleBatchDeltas(logger, deltas, isInInitialList)
},
WatchErrorHandlerWithContext: s.watchErrorHandler,
}
// 3. 创建 Controller
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// 4. 创建独立的上下文用于停止 Processor
// Processor 必须在 Controller 之后停止
processorStopCtx, stopProcessor := context.WithCancelCause(context.WithoutCancel(ctx))
var wg wait.Group
defer wg.Wait() // 等待 Processor 停止
defer stopProcessor(errors.New("informer is stopping")) // 通知 Processor 停止
// 5. 启动三个后台任务
// 任务1:缓存变更检测器(用于调试)
wg.StartWithChannel(processorStopCtx.Done(), s.cacheMutationDetector.Run)
// 任务2:事件分发器
wg.StartWithContext(processorStopCtx, s.processor.run)
// 任务3:监控同步状态
wg.Start(func() {
select {
case <-ctx.Done():
// 我们被停止了,没有完成同步
case <-s.controller.HasSyncedChecker().Done():
// Controller 已同步,我们也同步了
close(s.synced) // 关闭同步 channel
}
})
// 6. 标记为已停止状态(不允许添加新的处理器)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true
}()
// 7. 运行 Controller(这是主循环,不会返回)
s.controller.RunWithContext(ctx)
}
Run 方法的逻辑非常清晰,总结一下就是:创建 DeltaFIFO 队列 → 创建 Controller → 启动三个后台任务(缓存检测、事件分发、同步监控)→ 运行 Controller 主循环。Controller 会启动一个无限循环,持续从 ListerWatcher 拉取数据并放入 DeltaFIFO。
shareProcessor 是 SharedIndexInformer 中的事件分发器,负责将资源变更事件分发给所有注册的处理器。它的名字里的"share"体现了它的核心价值:多个处理器可以共享同一个 Informer 的事件流,而不是每个处理器都独立连接 API Server。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 1017-1031)
// sharedProcessor 管理多个 processorListener,并负责分发事件
// "Shared"体现在:多个控制器共享同一个 Processor 实例
type sharedProcessor struct {
// listenersStarted 标记是否已启动所有监听器
listenersStarted bool
// listenersLock 保护监听器列表的并发访问
listenersLock sync.RWMutex
// listenersRCond 条件变量,用于等待监听器启动
listenersRCond *sync.Cond
// listeners 是一个 map,键是监听器指针,值表示是否正在同步
// 这个 map 会在 Processor 停止时被清空
listeners map[*processorListener]bool
// clock 用于计算重新同步时间
clock clock.Clock
// wg 用于等待所有监听器 goroutine 退出
wg wait.Group
}
shareProcessor 的设计非常简洁:它维护一个 listener map,每个注册的处理器对应一个 processorListener。当有事件需要分发时,它会遍历所有监听器,把事件发送到对应的 channel 中。
processorListener 是每个事件处理器的运行时表示。它内部使用三个 goroutine 来处理事件分发,这是整个 Informer 机制中最复杂也最巧妙的部分。让我详细解释:
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 1195-1254)
// processorListener 使用三个 goroutine 来中转事件
//
// 第一个 goroutine: pop()
// - 从 addCh channel 接收事件
// - 使用无界环形缓冲区(pendingNotifications)暂存事件
// - 当 nextCh 消费者跟不上时,事件会被暂存到缓冲区
//
// 第二个 goroutine: run()
// - 从 nextCh channel 接收事件
// - 同步调用 handler 的 OnAdd/OnUpdate/OnDelete 方法
// - 如果处理函数 panic,会捕获并跳过该事件
//
// 第三个 goroutine: watchSynced()
// - 监控上游是否已同步
// - 使用 SingleFileTracker 跟踪已处理的事件
type processorListener struct {
logger klog.Logger
// nextCh 是 run() goroutine 的输入 channel
nextCh chan interface{}
// addCh 是 pop() goroutine 的输入 channel
addCh chan interface{}
// done 是关闭信号 channel
done chan struct{}
// handler 是用户注册的事件处理函数
handler ResourceEventHandler
handlerName string
// syncTracker 跟踪事件处理进度
syncTracker *synctrack.SingleFileTracker
upstreamHasSynced DoneChecker
// pendingNotifications 是一个无界环形缓冲区
// 当 run() 处理速度跟不上 addCh 发送速度时,
// 事件会被暂存到这里,防止丢失
pendingNotifications buffer.RingGrowing
pendingNotificationsLength atomic.Int64
// requestedResyncPeriod 是用户请求的重新同步周期
requestedResyncPeriod time.Duration
// resyncPeriod 是实际使用的重新同步周期
// 可能被调整为不小于 minimumResyncPeriod
resyncPeriod time.Duration
// nextResync 下一次需要重新同步的时间
nextResync time.Time
resyncLock sync.Mutex
}
三个 goroutine 的协作关系可以用这幅图来理解:
┌────────────────────────────────────────────────────────────┐
│ sharedIndexInformer │
│ │
│ handleDeltas() ──────────────┐ │
│ │ │
│ ▼ │
│ processor.distribute() │
│ │ │
└──────────────────────────────────┼──────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────────┐
│ processorListener (每个 Handler 一个) │
│ │
│ ┌─────────────────────────────────────────────────────────────────────────┐ │
│ │ Goroutine 1: pop() │ │
│ │ │ │
│ │ addCh ◄───────────────────────── distribute() │ │
│ │ │ │ │
│ │ │ select { │ │
│ │ │ case notification
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 1296-1328)
func (p *processorListener) pop() {
defer utilruntime.HandleCrashWithLogger(p.logger)
defer close(p.nextCh) // 告诉 run() goroutine 停止
defer close(p.done) // 告诉 watchSynced() goroutine 停止
var nextCh chan
pop() 的逻辑非常巧妙,它使用了一个经典的 channel 生产者-消费者模式:如果 nextCh 的消费者(run goroutine)跟得上,就直接把事件发送过去;如果跟不上,就把事件暂存到 pendingNotifications 缓冲区。这样设计的好处是:即使某个处理器处理速度很慢,也不会阻塞其他处理器的事件分发。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 1094-1118)
// distribute 将事件分发给所有监听器
// sync 参数决定是否只分发给正在同步的监听器
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 等待所有监听器启动
for !p.listenersStarted && len(p.listeners) > 0 {
p.listenersRCond.Wait()
}
// 遍历所有监听器
for listener, isSyncing := range p.listeners {
switch {
case !sync:
// 非同步事件:分发给所有监听器
listener.add(obj)
case isSyncing:
// 同步事件(Resync):只分发给正在同步的监听器
listener.add(obj)
default:
// 跳过:不是同步事件,但监听器不在同步状态
}
}
}
这里的分发策略很巧妙:普通事件(Add/Update/Delete)会分发给所有监听器,而同步事件(Resync)只分发给那些设置了同步标记的监听器。这样,不同的处理器可以根据自己的需求设置不同的同步周期,不会相互干扰。
SharedInformerFactory 是创建和管理多个 SharedIndexInformer 实例的工厂。想象一下,如果你需要同时监听 Pod、Service、Endpoints 三种资源,你会创建三个独立的 Informer。但这样做的问题是:每个 Informer 都会独立连接 API Server,浪费资源。SharedInformerFactory 就是来解决这个问题的:它让多个 Informer 共享底层的连接和资源,同时保持独立的缓存和事件处理逻辑。
// staging/src/k8s.io/client-go/informers/factory.go(行 59-78)
// sharedInformerFactory 管理多个 SharedIndexInformer 实例
type sharedInformerFactory struct {
// Kubernetes 客户端,用于访问 API Server
client kubernetes.Interface
// 命名空间过滤,只监听指定命名空间的资源
namespace string
// tweakListOptions 可以自定义 List/Watch 的过滤条件
tweakListOptions internalinterfaces.TweakListOptionsFunc
// lock 保护 informers map 的并发访问
lock sync.Mutex
// defaultResync 默认的重新同步周期
defaultResync time.Duration
// customResync 特定类型的自定义重新同步周期
customResync map[reflect.Type]time.Duration
// transform 应用于所有 Informer 的对象转换函数
transform cache.TransformFunc
// informerName 用于指标追踪
informerName *cache.InformerName
// informers 缓存所有已创建的 Informer 实例
// key 是对象的 reflect.Type,确保每种类型只有一个 Informer
informers map[reflect.Type]cache.SharedIndexInformer
// startedInformers 标记哪些 Informer 已经启动
startedInformers map[reflect.Type]bool
// wg 用于等待所有 Informer goroutine 退出
wg sync.WaitGroup
// shuttingDown 标记工厂是否正在关闭
shuttingDown bool
}
注意到 `informers map[reflect.Type]cache.SharedIndexInformer` 这个字段,它确保每种资源类型只创建一个 Informer 实例。比如你调用 `factory.Core().V1().Pods()` 两次,返回的是同一个 Informer 实例。这避免了重复连接 API Server 的问题。
// staging/src/k8s.io/client-go/informers/factory.go(行 143-160)
// NewSharedInformerFactoryWithOptions 是创建工厂的主入口
// 支持通过选项模式配置各种参数
func NewSharedInformerFactoryWithOptions(
client kubernetes.Interface,
defaultResync time.Duration,
options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll, // 默认监听所有命名空间
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
// 应用所有选项
for _, opt := range options {
factory = opt(factory)
}
return factory
}
// 常用选项函数
// WithNamespace 只监听特定命名空间的资源
func WithNamespace(namespace string) SharedInformerOption {
return func(factory *sharedInformerFactory) *sharedInformerFactory {
factory.namespace = namespace
return factory
}
}
// WithCustomResyncConfig 为特定资源类型设置自定义同步周期
func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption {
return func(factory *sharedInformerFactory) *sharedInformerFactory {
for k, v := range resyncConfig {
factory.customResync[reflect.TypeOf(k)] = v
}
return factory
}
}
// WithTransform 为所有资源设置对象转换函数
func WithTransform(transform cache.TransformFunc) SharedInformerOption {
return func(factory *sharedInformerFactory) *sharedInformerFactory {
factory.transform = transform
return factory
}
}
工厂使用选项模式(Functional Options)来配置参数,这是一种非常优雅的 Go 设计模式。你可以组合多个选项,比如:`NewSharedInformerFactoryWithOptions(client, 30*time.Second, WithNamespace("default"), WithTransform(myTransform))`。这样代码既灵活又易读。
// staging/src/k8s.io/client-go/informers/factory.go(行 241-265)
// InformerFor 懒加载创建或返回已存在的 Informer
// 这是工厂模式的核心方法
func (f *sharedInformerFactory) InformerFor(obj runtime.Object,
newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
// 1. 根据对象类型查找是否已存在
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer // 直接返回已有的 Informer
}
// 2. 获取该类型的自定义同步周期,如果没有就用默认周期
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
// 3. 调用传入的创建函数创建新的 Informer
informer = newFunc(f.client, resyncPeriod)
// 4. 如果工厂设置了 transform,应用到 Informer
if f.transform != nil {
informer.SetTransform(f.transform)
}
// 5. 缓存并返回
f.informers[informerType] = informer
return informer
}
InformerFor 采用了懒加载(Lazy Loading)策略:直到第一次访问某个 Informer 时才创建它。比如你调用 `factory.Apps().V1().Deployments()` 时,实际上只是获取了一个 Informer 的"访问器"(DeploymentInformer),真正的 SharedIndexInformer 要等到你调用 `Informer()` 方法时才创建。
// staging/src/k8s.io/client-go/informers/factory.go(行 162-239)
// Start 启动所有已创建的 Informer // 必须在 WaitForCacheSync 之前调用 func (f *sharedInformerFactory) Start(stopCh
这三个方法构成了完整的生命周期管理:Start 启动所有 Informer,它们开始从 API Server 拉取数据。WaitForCacheSync 阻塞等待首次同步完成,只有当所有缓存都同步完成后才继续。这是非常重要的一步,因为如果缓存还没同步完成就开始处理业务逻辑,可能会读取到不完整的数据。Shutdown 优雅关闭工厂,它会等待所有后台 goroutine 安全退出。
让我展示一个完整的控制器使用 SharedInformerFactory 的典型模式:
// 示例代码
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 1. 创建工厂,默认同步周期 30 秒
factory := informers.NewSharedInformerFactory(client, 30*time.Second)
// 2. 获取 Informer(懒加载,此时还没有真正创建)
podInformer := factory.Core().V1().Pods()
deploymentInformer := factory.Apps().V1().Deployments()
// 3. 注册事件处理器
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// 处理 Pod 添加事件
},
UpdateFunc: func(oldObj, newObj interface{}) {
// 处理 Pod 更新事件
},
DeleteFunc: func(obj interface{}) {
// 处理 Pod 删除事件
},
})
// 4. 启动工厂(Start 不阻塞,立即返回)
factory.Start(ctx.Done())
// 5. 等待所有缓存同步完成(关键步骤!)
// 只有缓存同步完成后,才能安全地读取数据和处理事件
synced := factory.WaitForCacheSync(ctx.Done())
if err := synced.AsError(); err != nil {
return fmt.Errorf("failed to sync cache: %w", err)
}
// 6. 现在可以安全地读取缓存
pods, _ := podInformer.Lister().List("") // 列出所有 Pod
// 7. 运行控制器逻辑...
// controller.Run(ctx, workers)
// 8. 关闭工厂
factory.Shutdown()
这个模式是 Kubernetes 控制器开发的标准范式。记住:WaitForCacheSync 之前一定不能读取缓存或处理业务逻辑,否则可能会基于不完整的数据做出错误的决策。
DeltaFIFO 是 Informer 内部的消息队列,存储所有资源变更的增量(Delta)。理解 DeltaFIFO 的工作原理,对于调试 Informer 相关问题和深入理解 Kubernetes 的事件驱动模型非常重要。
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go(行 178-195)
// DeltaType 定义了资源变更的类型
type DeltaType string
// 变更类型常量
const (
// Added 当资源第一次被创建时
Added DeltaType = "Added"
// Modified 当资源的 spec 或 metadata 发生变化时
Modified DeltaType = "Modified"
// Deleted 当资源被删除时
Deleted DeltaType = "Deleted"
// Sync 当需要重新同步缓存时(定期触发)
Sync DeltaType = "Sync"
// Replaced 当收到完整的资源列表替换时
// (Watch 断线重连后的第一次 LIST 响应)
Replaced DeltaType = "Replaced"
// Bookmark 特殊事件,用于保持 Watch 连接
Bookmark DeltaType = "Bookmark"
// ReplacedAll 全量替换所有资源
ReplacedAll DeltaType = "ReplacedAll"
// SyncAll 重新同步所有资源
SyncAll DeltaType = "SyncAll"
)
// Delta 代表一个单独的变更
type Delta struct {
Type DeltaType // 变更类型
Object interface{} // 变更的对象(可能是 *v1.Pod 等)
}
// Deltas 是一个 Delta 列表,按时间顺序排列
// 旧的在前面,新的在后面
type Deltas []Delta
Delta 的设计非常巧妙:同一个资源的多个变更会累积在同一个 Deltas 列表中。比如一个 Pod 先被创建(Added),然后被修改(Modified),最后被删除(Deleted),这三个 Delta 都会存储在 key 为 "namespace/pod-name" 的 Deltas 中。Pop 的时候会一次性返回整个列表,processDeltas 会依次处理每个 Delta。
// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go(行 108-158)
// DeltaFIFO 是一个先进先出队列,但存储的是增量而不是完整对象
type DeltaFIFO struct {
logger klog.Logger
name string
// lock 和 cond 保护 items 和 queue 的并发访问
lock sync.RWMutex
cond sync.Cond
// items 存储所有的 Delta,key 是 namespace/name
items map[string]Deltas
// queue 维护出队顺序(FIFO)
queue []string
// synced 用于标记初始同步是否完成
synced chan struct{}
syncedClosed bool
// populated 标记是否已经有初始数据
populated bool
// initialPopulationCount 初始数据的数量
initialPopulationCount int
// keyFunc 用于生成对象的 key
keyFunc KeyFunc
// knownObjects 用于 Replace 操作时检测被删除的对象
knownObjects KeyListerGetter
// closed 标记队列是否已关闭
closed bool
// emitDeltaTypeReplaced 是否使用 Replaced 而不是 Sync
emitDeltaTypeReplaced bool
// transformer 在放入队列前转换对象
transformer TransformFunc
}
DeltaFIFO 和普通 FIFO 的区别在于:它的每个元素不是单独的对象,而是一个 Deltas 列表,代表同一个对象的所有变更。knownObjects 字段用于 Replace 操作时检测哪些对象被删除了——当收到新的资源列表时,那些在旧列表中存在但新列表中不存在的对象,会生成 Deleted Delta。
▼ Q1: 为什么我注册的 AddEventHandler 没有被调用?
A: 最常见的原因是在 WaitForCacheSync 完成之前就注册了处理器。如果你的处理器是在工厂 Start 之前注册的,那么它应该能正常收到事件。但如果是在 Start 之后、WaitForCacheSync 之前注册的,需要确保调用了 `handle.HasSyncedChecker()` 并等待其完成。另外,检查一下你的 Informer 是否已经调用了 `Run()` 方法,如果没有运行 Informer,它永远不会产生事件。
▼ Q2: SharedInformer 和 SharedIndexInformer 有什么区别?我该用哪个?
A: SharedIndexInformer 继承了 SharedInformer,并额外提供了两个能力:GetIndexer() 方法可以获取带索引的本地缓存,AddIndexers() 方法可以添加自定义索引。在实际使用中,client-go 提供的都是 SharedIndexInformer,因为它的能力是 SharedInformer 的超集。如果你需要按标签或字段快速查询资源,就必须用 SharedIndexInformer。SharedInformer 只提供基本的 GetStore() 方法,功能更有限。
▼ Q3: Resync 是什么?我需要设置多长的同步周期?
A: Resync 是定期重新触发同步事件的机制。即使 API Server 没有推送任何变更,Informer 也会按照设定的周期(比如 30 秒)向所有处理器发送一个 Sync 事件。处理器的 OnUpdate 会被调用,传入两个相同的对象(oldObj == newObj)。这个机制主要用于让控制器有机会重新检查和修正状态,比如某些边界情况导致上次处理失败。同步周期的选择取决于你的业务需求:周期越短,状态越实时,但 API Server 压力越大;周期越长,状态可能滞后,但资源消耗更少。对于大多数控制器,默认的 30 秒或几分钟是合理的。如果你有强实时性要求,可以缩短到 10-15 秒;如果你的控制器主要响应显式变更,可以设置更长的周期甚至禁用(设置为 0)。
▼ Q4: 为什么需要多个 processorListener?它们是线程安全的吗?
A: 多个 processorListener 是因为每个注册的处理器(AddEventHandler)都有自己独立的 listener 实例。这样设计的好处是:不同处理器可以有独立的 resync 周期、独立的处理速度、独立的同步状态追踪。一个处理器处理慢不会影响其他处理器。至于线程安全,每个 processorListener 内部都通过 channel 和 mutex 保证了并发安全。你不需要在处理器函数中额外加锁,但要注意:OnAdd/OnUpdate/OnDelete 是在单独的 goroutine 中调用的,如果你访问共享数据,仍然需要同步。
▼ Q5: Informer 的本地缓存会无限增长吗?如何清理?
A: Informer 的本地缓存(Indexer)会存储当前集群中所有符合条件(命名空间过滤)的资源对象。当资源被删除时,对应的缓存条目也会被删除。所以缓存大小是动态的,取决于集群中资源的数量,不会无限增长。但如果你监听的是集群级资源(如 Node、PersistentVolume),这些资源数量可能很大,缓存也会相应占用更多内存。需要清理时,只能停止整个 Informer,工厂无法单独清理某个缓存。
▼ Q6: 控制器重启后,Informer 会重新同步所有数据吗?
A: 是的,每次调用 Run() 启动 Informer 时,都会重新从 API Server 拉取全量数据(通过 LIST 请求),然后处理 Replace 事件更新本地缓存。这个过程对用户是透明的:Informer 会自动处理 Watch 断线重连,Controller 也会收到相应的 Add/Update/Delete 事件。所以即使控制器重启,只要调用了 WaitForCacheSync 并等待完成,你读取到的缓存数据就是完整的、一致的。
▼ Q7: AddEventHandler 返回的 ResourceEventHandlerRegistration 有什么用?
A: ResourceEventHandlerRegistration 是处理器的注册句柄,主要用于两个场景:第一,移除处理器:调用 `RemoveEventHandler(handle)` 可以安全地停止并移除一个处理器,相关的 goroutine 也会被正确关闭。第二,检查同步状态:调用 `handle.HasSyncedChecker()` 可以获取一个 DoneChecker,用于检查这个特定处理器的事件是否都已处理完毕(不仅仅是 Informer 缓存同步,而是处理器本身已处理完所有初始事件)。在 defer 中移除处理器是一个好习惯,可以避免 goroutine 泄漏。
▼ Q8: 为什么有 OnUpdate 和 OnDelete,但没有 OnCreate?创建资源时触发哪个方法?
A: 这是 Informer 的设计选择。当资源被创建时,会触发 OnAdd 方法,而不是单独的 OnCreate。这个设计背后的逻辑是:无论是"新建"还是"从缓存中读取",对于处理器来说都是"添加了一个对象"。从代码实现角度看,processDeltas 会检查对象是否已存在于缓存中,如果不存在就调用 Add,已存在则调用 Update。所以如果你需要区分"创建"和"更新",可以在 OnAdd 的 isInInitialList 参数上做判断:如果为 true,说明是首次从 LIST 响应中加载的;如果为 false,说明是新创建的资源。
▼ Q9: Indexer 和 Lister 有什么区别?我该用哪个读取数据?
A: Indexer 和 Lister 本质上访问的是同一个底层缓存,但提供了不同的接口。Indexer是底层的、直接操作缓存的结构,提供了 Get、List、Add、Update、Delete 等方法。Lister是上层封装,专门为某个资源类型设计的只读访问接口,比如 PodLister.Namespace("default").Get("nginx") 会比 Indexer.GetByKey("default/nginx") 更类型安全、更易用。对于控制器开发,推荐使用 Lister,因为它是类型化的,编译器能帮你检查是否传错了类型。Indexer 更底层,通常用于需要自定义索引或批量操作的场景。
▼ Q10: Watch 断开后,Informer 会自动重连吗?重连期间的数据丢失怎么处理?
A: 是的,Informer 有自动重连机制。当 Watch 连接断开时,Reflector 会等待一段时间后重新发起 LIST 请求,然后从 API Server 获取全量数据和最新的 ResourceVersion,再继续 Watch。整个过程对用户是透明的。关键在于"重连期间的数据丢失"问题:假设在 Watch 断开期间,某 Pod 被创建又删除了,等重连后 LIST 响应中没有这个 Pod,所以这个事件就丢失了。但这不是 bug,而是 Kubernetes 官方的设计权衡——为了保证一致性,选择了每次重连都做全量同步(LIST),而不是维护一个巨大的增量日志。对于大多数控制器来说,这没有问题,因为它们主要依赖 Watch 事件的实时性和 Resync 的兜底。但如果你的业务需要 100% 不丢失事件,可能需要额外的外部机制(如审计日志)来补全。
▼ Q11: 如何调试 Informer 的事件分发?有没有什么好的技巧?
A: 调试 Informer 有几个实用技巧:第一,使用 CacheMutationDetector:SharedIndexInformer 内部有一个 CacheMutationDetector,如果你修改了从缓存获取的对象(而不是通过 Indexer 的 Update 方法),它会检测到并打印警告。第二,打印事件日志:可以在事件处理器中打印日志,观察 OnAdd/OnUpdate/OnDelete 的调用频率和时机。第三,检查 HasSynced 状态:确保在 WaitForCacheSync 返回 true 后再进行调试,否则可能读取到不完整的数据。第四,使用 kubectl get 验证:通过 kubectl 获取资源,对比 Informer 缓存中的数据,看是否一致。对于更高级的调试,可以启用 client-go 的 trace 日志,看 Reflector 的 List/Watch 详细日志。
▼ Q12: 事件处理器 panic 了会怎样?会不会影响整个 Informer?
A: 不会影响整个 Informer。processorListener.run() 内部有 panic 捕获机制,当事件处理函数(OnAdd/OnUpdate/OnDelete)panic 时,会被 utilruntime.HandleCrash() 捕获并记录日志,然后跳过这个事件继续处理下一个。这个设计保证了:一个处理器出问题不会阻塞其他处理器,也不会导致整个 Informer 崩溃。但要注意的是:panic 的事件会被跳过,不会重试。如果你的业务逻辑必须在每次事件发生时都执行,应该在事件处理函数内部做错误处理和重试,而不是依赖 Informer 来保证调用。最佳实践是在处理函数中使用 defer + recover 捕获 panic,并记录详细错误信息便于排查。
▼ Q13: 同一个资源类型可以注册多个事件处理器吗?它们收到的事件顺序一样吗?
A: 可以注册多个处理器。每个处理器收到的事件顺序是相同的(按资源变更的时序),但到达时间可能略有不同,因为处理器是独立运行的 goroutine。具体来说:shareProcessor.distribute() 会依次向每个 listener.addCh 发送事件,但这是非阻塞的快速操作。各个 processorListener 的 pop() goroutine 独立从 addCh 接收,然后各自的 run() goroutine 独立处理。所以两个不同的处理器可能一个已经处理完 Add 事件并开始处理 Update,而另一个还在处理 Add。但这不会造成逻辑问题,因为事件本身是按序产生的,各处理器独立处理不会产生竞态条件。
▼ Q14: 什么是 pendingNotifications 缓冲区?它会不会导致内存溢出?
A: pendingNotifications 是一个无界环形缓冲区,用于暂存来不及处理的事件。它的设计目的是:防止一个慢处理器阻塞其他处理器和数据流。当 run() goroutine 处理速度跟不上 pop() 接收速度时,新事件会被写入这个缓冲区。如果一个处理器持续很慢,这个缓冲区会不断增长,最终可能导致 OOM。这是一个已知的设计权衡。代码注释里也提到:"This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but we should try to do something better." 所以在实际使用中,不要在事件处理器中做耗时的同步操作。如果需要做耗时操作,应该把任务放入工作队列(WorkQueue),然后用单独的 worker goroutine 异步处理。缓冲区大小可以通过 newProcessListener 的 bufferSize 参数控制,默认是 initialBufferSize(通常 1024)。
▼ Q15: AddEventHandler 和 AddEventHandlerWithOptions 有什么区别?
A: AddEventHandlerWithOptions 提供了更丰富的配置选项。主要区别在于:AddEventHandler 使用工厂默认的 resync 周期,不能自定义。而 AddEventHandlerWithOptions 可以通过 HandlerOptions 设置:ResyncPeriod:自定义的重新同步周期,会覆盖工厂的默认值;Logger:为这个处理器指定独立的日志记录器;Name:给处理器起个名字,便于在日志和指标中识别。实际上,AddEventHandler 内部就是调用 AddEventHandlerWithOptions,使用默认参数。高级用法示例:`informer.AddEventHandlerWithOptions(handler, HandlerOptions{ResyncPeriod: 10*time.Second, Name: "my-custom-handler"})`。
▼ Q16: 为什么需要 TransformFunc?它有什么实际用途?
A: TransformFunc 允许在资源对象存入本地缓存之前对其进行转换或过滤。这是一个强大的钩子,常用场景包括:精简数据:删除不需要的字段(如 status、annotations 的某些 key),减少内存占用;添加默认值:为缺失的字段设置默认值;敏感信息过滤:删除 Secret 的 data 内容(只保留 key 列表);资源压缩:将多个 annotation 合并,节省存储空间。Transform 在 DeltaFIFO 的队列中就会应用,所以任何通过事件获取的对象和从 Indexer/Lister 获取的对象都会经过转换。使用示例:`factory := NewSharedInformerFactoryWithOptions(client, 30*time.Second, WithTransform(func(obj interface{}) (interface{}, error) { return clearSecretData(obj), nil }))`。
▼ Q17: Controller 和 Informer 使用同一个 SharedInformerFactory,会有资源竞争吗?
A: 不会有资源竞争。SharedInformerFactory 的核心价值就是共享资源:同一个 Pod Informer 实例被多个控制器(DeploymentController、ReplicaSetController、EndpointController 等)共享,它们都通过同一个 Informer 监听 Pod 变化,但各自有独立的事件处理器(processorListener)。共享体现在:共享底层的 List/Watch 连接、共享 DeltaFIFO 队列、共享 Indexer 缓存。各控制器的独立性体现在:独立的事件处理器、独立 resync 周期、独立同步状态追踪。所以,即使 10 个控制器都监听 Pod,也不会创建 10 个到 API Server 的连接,只会有 1 个 Watch 连接和 1 个 LIST 请求。
▼ Q18: 什么是 Bookmarks 事件?为什么要保留它?
A: Bookmark 是一种特殊的 Delta 类型,用于保持 Watch 连接的活跃状态。在 Kubernetes 的 Watch 机制中,如果长时间没有任何事件发生,Watch 连接可能会被网络中间设备(如负载均衡器、NAT 网关)超时断开。为了防止这种情况,API Server 会定期发送 Bookmark 事件(包含当前的 ResourceVersion),提醒客户端连接仍然活跃。从代码角度看,Bookmark 事件会被 processDeltas 特殊处理:它会更新本地缓存的 ResourceVersion 记录,但不会触发任何处理器回调。所以你注册的 OnAdd/OnUpdate/OnDelete 不会被 Bookmark 事件调用。Bookmark 主要用于 Informer 内部的资源版本追踪,普通开发者一般不需要关心它。
▼ Q19: 在 Handler 中修改对象会生效吗?为什么?
A: 直接修改 Handler 参数不会生效,但也不会报错,只是修改被丢弃了。这是因为 Informer 传给你的对象是缓存中的引用,processDeltas 在处理完事件后不会把对象再写回缓存(除非通过 Indexer 的 Update 方法显式操作)。有两个理由不要直接修改:第一,修改不会被保存,下次从缓存获取的还是原始对象;第二,SharedIndexInformer 内部有 CacheMutationDetector,会检测到这种"意外修改"并打印警告(如果启用了 debug 模式)。正确的做法是:如果需要更新资源,调用 Kubernetes client 的 Update/Patch 方法;如果需要更新缓存,在 Indexer 上调用 Update 方法。
▼ Q20: SharedIndexInformer 和 NewSharedInformer 有什么区别?应该用哪个?
A: NewSharedInformer 和 NewSharedIndexInformer 最终返回的都是 SharedIndexInformer,区别只是参数和便捷性:NewSharedInformer 是简化版本,只接受最基本的参数(ListerWatcher、示例对象、resync 周期),返回类型是 SharedInformer(接口);NewSharedIndexInformer 是完整版本,额外接受 Indexers 参数,可以创建带自定义索引的缓存,返回类型是 SharedIndexInformer(接口)。实际使用中,client-go 的代码生成工具(如 informer-gen)生成的代码都使用 NewSharedIndexInformerWithOptions,给你最大的灵活性。所以结论是:用 NewSharedIndexInformer 或 NewSharedIndexInformerWithOptions,除非你确定不需要索引功能。
让我们回顾一下今天学到的核心知识点:
提示:本文档基于 Kubernetes 1.36.1 版本的 client-go 源码编写。不同版本可能在细节上有所差异,但核心机制保持一致。建议读者 clone 源码仓库,使用 IDE 的跳转功能亲自阅读这些文件的完整内容,加深理解。
Kubernetes client-go Informer 源码深度解析 · 来源:Kubernetes 1.36.1 源码分析
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。