惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

腾讯CDC
Hacker News: Ask HN
Hacker News: Ask HN
S
Securelist
Security Latest
Security Latest
S
Schneier on Security
T
Threat Research - Cisco Blogs
Latest news
Latest news
Cyberwarzone
Cyberwarzone
A
Arctic Wolf
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
NISL@THU
NISL@THU
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
I
Intezer
T
The Exploit Database - CXSecurity.com
N
News and Events Feed by Topic
Simon Willison's Weblog
Simon Willison's Weblog
T
Tor Project blog
Blog — PlanetScale
Blog — PlanetScale
C
Cyber Attacks, Cyber Crime and Cyber Security
C
CERT Recently Published Vulnerability Notes
The Hacker News
The Hacker News
月光博客
月光博客
WordPress大学
WordPress大学
博客园 - 叶小钗
Hugging Face - Blog
Hugging Face - Blog
美团技术团队
量子位
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
C
Cisco Blogs
博客园 - 三生石上(FineUI控件)
Google DeepMind News
Google DeepMind News
Project Zero
Project Zero
Webroot Blog
Webroot Blog
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
Application and Cybersecurity Blog
Application and Cybersecurity Blog
云风的 BLOG
云风的 BLOG
L
LINUX DO - 最新话题
Schneier on Security
Schneier on Security
Engineering at Meta
Engineering at Meta
www.infosecurity-magazine.com
www.infosecurity-magazine.com
aimingoo的专栏
aimingoo的专栏
D
Docker
有赞技术团队
有赞技术团队
Google DeepMind News
Google DeepMind News
宝玉的分享
宝玉的分享
T
Troy Hunt's Blog
L
Lohrmann on Cybersecurity
T
The Blog of Author Tim Ferriss
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
L
LangChain Blog

博客园 - 左扬

Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:生产级 Controller 实践:并发安全、资源清理与高可用设计 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析: Controller 调试与诊断工具:从日志分析到问题定位 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:DynamicClient 操作 CRD:无需代码生成的动态操作 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:控制器与 APIServer 完整交互流程:从 Watch 到缓存同步 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:错误处理与重试机制:WorkQueue 限速器详解 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Leader 选举机制:高可用控制器的必备技能 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Controller 开发模式完整实战 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:SharedInformerFactory 与等待缓存同步 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:从认证配置到 Deployment 操作 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:版本对应、架构组件与组件关系 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Reflector 源码深度解析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:ListWatcher 源码深度解析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Indexer 与 ThreadSafeStore 核心原理与源码深度剖析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:DeltaFIFO 核心原理与源码深度剖析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:workqueue 核心原理与实战 Kubernetes 编程 / Operator 专题【左扬精讲】—— runtime.Codec 资源编解码:serializer 与 codec 差异、编解码数据结构、codec 核心调用链路 Kubernetes 编程 / Operator 专题【左扬精讲】—— Scheme 资源注册机制全解 Kubernetes 编程 / Operator 专题【左扬精讲】—— Kubernetes 自定义资源的内部版本与外部版本:从源码看版本定义机制 Kubernetes 编程 / Operator 专题【左扬精讲】—— Kubernetes 1.36.1 核心 API 数据结构全解 Kubernetes 编程 / Operator 专题【左扬精讲】—— Kubernetes 构建过程 【AIOPS】一文读懂LLM【左扬精讲】:从诞生到普及,解锁大语言模型的核心密码 【AIOPS】AI Agent 专题【左扬精讲】核心功能篇:MCP-VictoriaMetrics Hooks 源码精讲:Hooks 可观测性的无侵入式实现 【AIOPS】AI Agent 专题【左扬精讲】核心功能篇:MCP-VictoriaMetrics Golang 配置解析源码精讲 ——SRE 自定义 Agent 核心技巧 【AIOPS】AI Agent 专题【左扬精讲】核心功能篇:MCP-VictoriaMetrics Golang 并发模型解析 ——SRE 应对高并发采集的调优思路 【AIOPS】AI Agent 专题【左扬精讲】基础架构篇:MCP-VictoriaMetrics Golang 源码整体架构拆解 ——SRE 必懂的核心模块与数据流 OpenTelemetry 开发实战【左扬精讲】—— 云原生可观测体系构建与分布式追踪二次开发 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 7 —— 基于流量预测模型的智能弹性扩缩容 Operator 实战(AIOps 模型训练与智能扩容(下篇)—— 预测式弹性扩缩容 Operator 落地实现) Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 7 —— 基于流量预测模型的智能弹性扩缩容 Operator 实战(AIOps 模型训练与智能扩容(上篇)—— 时序预测模型构建与离线训练) Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 6 —— 基于运维专家知识库的智能故障诊断与排查 Operator 实战 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 5 —— 基于大语言模型(LLM)的实时日志流智能监测 Operator 实现 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 4 —— 基于 Operator 实现大模型私有化部署与管理 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 3 —— 面向 AI / 算力调度场景:GPU 竞价实例资源池统一调度管理 Operator 开发 Kubernetes编程 / Operator专题【左扬精讲】—— Operator 开发实战项目 2 —— 面向零售 / 电商潮汐流量难题:多云多集群数据中心级全链路弹性伸缩 DataCenter Scaler Operator 从 0 到 1 全链路开发 Kubernetes编程 / Operator专题【左扬精讲】—— 深入理解Kubebuilder注解:为什么Operator开发离不开这些特殊注释 Kubernetes编程 / Operator专题【左扬精讲】—— Operator 开发实战项目1 —— Applicaion Operator(通用应用生命周期管理 Operator 实战) Pod 镜像拉取失败?kubectl edit pods修改镜像地址的底层原理与实操 (该方法仅为临时应急方案,并非长期解决方案) Kubernetes编程/Operator专题精讲—— 理解控制器模式 —— 控制器模式的核心原理与实现逻辑(从原理到实践) 【AIOPS】AI Agent 专题【左扬精讲】模型微调实战:一站式平台 LLaMA-Factory 【AIOPS】AI Agent 专题【左扬精讲】基于 k8s+vLLM+Ray 分布式部署全指南:架构设计、资源调度与性能优化 【AIOPS】AI Agent专题【左扬精讲】非量化版DeepSeek分布式部署全指南:精度保障、显存规划与Ollama/vLLM选型 【AIOPS】AI Agent 专题【左扬精讲】零开发框架实现 ReAct Agent(Go SRE友好)
Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Informer 源码深度解析:从底层原理到实战应用
左扬 · 2026-06-13 · via 博客园 - 左扬

Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Informer 源码深度解析:从底层原理到实战应用

当我们第一次写 Kubernetes Operator 或者开发自己的控制器时,总会遇到一个让人头疼的问题:为什么我们需要 Informer?它和 Controller 到底有什么区别?SharedIndexInformer、shareProcessor、SharedInformerFactory 这些名字眼花缭乱,它们各自承担什么职责?今天这篇文章,我们从源码出发,用最直白的语言,把 Informer 机制的每一个细节都讲清楚。读完这篇,你将能够:理解 Informer 和 Controller 的本质区别、掌握 SharedIndexInformer 的内部工作原理、看懂 shareProcessor 如何分发事件、以及熟练使用 SharedInformerFactory 管理多个 Informer 的生命周期。

Kubernetes client-go Informer Controller 源码分析

🔓 学习重点提示  — 建议先通读全文,再重点回顾标注内容

★ 重点掌握(必须)
   • Informer vs Controller 的本质区别:Informer 是数据管道,Controller 是业务逻辑
   • SharedIndexInformer 的三大组件:Indexer、Controller、Processor
   • shareProcessor 的事件分发机制:三个 goroutine 的协作
   • SharedInformerFactory 的生命周期管理:Start、WaitForCacheSync、Shutdown

☆ 次重点(了解即可)
   • DeltaFIFO 的 Delta 类型和队列机制
   • Resync 机制和最小同步周期
   • 缓存变更检测器 (CacheMutationDetector)


一、Informer vs Controller:到底有什么区别?

很多初学者会把 Informer 和 Controller 混为一谈,觉得它们是同一个东西。实际上,它们承担着完全不同的职责,理解这个区别是掌握整个 Kubernetes 控制器开发的关键。让我用一个生活化的比喻来解释:把 Kubernetes 集群想象成一个大型图书馆,API Server 就是图书管理员,而你(开发者)需要知道馆里有哪些书、哪些书被借走了、哪些书过期了。

1.1 一句话总结区别

Informer 是数据管道,负责从 API Server 拉取数据、维护本地缓存、向你通知资源变化。Controller 是业务逻辑,负责监听资源变化、计算期望状态、调用 API Server 调整实际状态。简单来说:Informer 告诉你"发生了什么",Controller 决定"该怎么处理"。

1.2 代码层面的区别

我们来看 EndpointController(端点控制器)的结构定义,它同时使用了 Informer 和 Controller 两种组件:

// pkg/controller/endpoint/endpoints_controller.go(行 132-180)

// Controller manages selector-based service endpoints.
// 这个 Controller 是业务逻辑层,它使用 Informer 提供的缓存
type Controller struct {
    client           clientset.Interface
    eventBroadcaster record.EventBroadcaster
    eventRecorder    record.EventRecorder

    // serviceLister 和 podLister 是 Informer 提供的只读缓存
    // Controller 通过它们读取数据,从不直接访问 API Server
    serviceLister corelisters.ServiceLister
    servicesSynced cache.InformerSynced  // Informer 的同步状态

    podLister corelisters.PodLister
    podsSynced cache.InformerSynced

    endpointsLister corelisters.EndpointsLister
    endpointsSynced cache.InformerSynced

    // queue 是 Controller 的工作队列,用于批量处理和去重
    // 这是 Controller 自己的组件,Informer 不关心
    queue workqueue.TypedRateLimitingInterface[string]
    podQueue workqueue.TypedRateLimitingInterface[*endpointsliceutil.PodProjectionKey]

    workerLoopPeriod time.Duration
    triggerTimeTracker *endpointsliceutil.TriggerTimeTracker
    endpointUpdatesBatchPeriod time.Duration
}

从这段代码我们可以看到,EndpointController 本身并不直接连接 API Server,它依赖 Informer 提供的 serviceLister、podLister、endpointsLister 来读取数据。这些 Lister 就是 Informer 缓存的访问入口。Controller 的核心职责是根据服务的变化,计算出应该有哪些端点,然后更新到 API Server。

1.3 工作流程对比

让我用一张图来展示 Informer 和 Controller 的协作关系:

┌─────────────────────────────────────────────────────────────────────────┐
│                          API Server(数据源)                              │
└──────────────────────────────┬──────────────────────────────────────────┘
                               │ List/Watch
┌──────────────────────────────▼──────────────────────────────────────────┐
│                         Informer(数据管道)                               │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │  SharedIndexInformer                                              │   │
│  │  ├── Reflector ──────→ DeltaFIFO ──→ Controller ──→ Indexer      │   │
│  │  │                  (增量队列)      (处理循环)    (本地缓存)        │   │
│  │  │                                                              │   │
│  │  └── shareProcessor ──→ processorListener(事件通知)             │   │
│  │                        (多播分发)    (每个 Handler 一个)           │   │
│  └─────────────────────────────────────────────────────────────────┘   │
└──────────────────────────────┬──────────────────────────────────────────┘
                               │ 事件回调 + 缓存读取
┌──────────────────────────────▼──────────────────────────────────────────┐
│                       Controller(业务逻辑)                               │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │  EndpointController / DeploymentController / JobController...    │   │
│  │  ├── 接收 Informer 的事件回调                                      │   │
│  │  ├── 读取 Indexer 缓存获取完整数据                                  │   │
│  │  ├── 放入 WorkQueue 等待处理                                      │   │
│  │  ├── Worker 协程从队列消费,计算期望状态                            │   │
│  │  └── 调用 API Server 调整实际状态                                  │   │
│  └─────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────┘

从这幅图我们可以看出:Informer 负责数据的获取和同步,它通过 Reflector 持续从 API Server 拉取数据,放入 DeltaFIFO 队列,然后通过 Controller 处理后存入 Indexer 缓存。Controller 负责业务逻辑,它监听 Informer 的事件回调,读取缓存数据,计算差异,然后采取行动。

🌟 实用技巧
当你开发自己的控制器时,记住这个黄金法则:Controller 永远只读缓存,不直接访问 API Server。如果需要创建、更新、删除资源,才通过 client 调用 API Server。这不仅减少了 API Server 的压力,还保证了控制器的高可用性(即使 API Server 暂时不可用,控制器依然可以正常工作)。


二、SharedIndexInformer 核心结构解析

SharedIndexInformer 是 client-go 中最核心的 Informer 实现。它的名字里有"Shared",意味着多个控制器可以共享同一个 Informer 实例,从而减少对 API Server 的连接数和请求压力。让我带你深入它的源码结构。

2.1 SharedIndexInformer 接口定义

首先我们看 SharedIndexInformer 实现了哪些接口,这些接口定义了它的核心能力:

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 130-288)

// SharedInformer 是基础接口,提供事件监听和缓存访问能力
type SharedInformer interface {
    // AddEventHandler 添加一个事件处理器,返回一个注册句柄
    // 这个句柄可以用于后续移除处理器
    AddEventHandler(handler ResourceEventHandler) (ResourceEventHandlerRegistration, error)
    
    // AddEventHandlerWithOptions 添加带选项的事件处理器
    // 可以指定 resync 周期等参数
    AddEventHandlerWithOptions(handler ResourceEventHandler, options HandlerOptions) (ResourceEventHandlerRegistration, error)
    
    // RemoveEventHandler 移除之前添加的事件处理器
    RemoveEventHandler(handle ResourceEventHandlerRegistration) error
    
    // GetStore 返回本地存储(已废弃,使用 GetIndexer)
    GetStore() Store
    
    // GetIndexer 返回带索引的本地存储
    // 这是读取缓存数据的主要入口
    GetIndexer() Indexer
    
    // Run 启动 Informer 的主循环
    Run(stopCh

从接口定义我们可以看到,SharedInformer 的核心功能就是:添加事件处理器(AddEventHandler)、获取本地缓存(GetIndexer)、启动运行(Run)、检查同步状态(HasSynced)。SharedIndexInformer 在此基础上增加了自定义索引的能力,这对于需要按标签或字段快速查询的场景非常有用。

2.2 sharedIndexInformer 内部结构

现在让我们看看 sharedIndexInformer 的内部实现结构,这是真正干活的类:

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 585-638)

// sharedIndexInformer 是 SharedIndexInformer 接口的具体实现
// 它由三大核心组件构成:Indexer、Controller、Processor
type sharedIndexInformer struct {
    // indexer 是本地缓存,存储所有已知的资源对象
    // 相当于一个线程安全的 Map,key 是 namespace/name
    indexer    Indexer
    
    // controller 负责从 DeltaFIFO 取出增量并处理
    // 它会调用 processDeltas 函数处理每个变更
    controller Controller

    // synced 是一个关闭 channel
    // 当首次同步完成后,这个 channel 会被关闭
    // HasSynced() 方法就是通过检查这个 channel 实现的
    synced chan struct{}

    // processor 负责将事件分发给所有注册的处理器
    // 它管理多个 processorListener,每个处理器一个
    processor             *sharedProcessor
    
    // cacheMutationDetector 用于检测缓存是否被意外修改
    // 这是一个调试工具,帮助发现不正确的代码
    cacheMutationDetector MutationDetector

    // listerWatcher 是数据源,负责 List 和 Watch API Server
    listerWatcher ListerWatcher

    // objectType 是这个 Informer 预期的对象类型
    // 用于类型检查,过滤掉不匹配的对象
    objectType runtime.Object

    // resyncCheckPeriod 是检查是否需要重新同步的周期
    // 默认值通常是 30 秒
    resyncCheckPeriod time.Duration
    
    // defaultEventHandlerResyncPeriod 是处理器默认的重新同步周期
    defaultEventHandlerResyncPeriod time.Duration
    
    // clock 用于测试,可以模拟时间流逝
    clock clock.Clock

    // started 和 stopped 标记 Informer 的运行状态
    started, stopped bool
    startedLock      sync.Mutex

    // blockDeltas 是一个互斥锁
    // 用于在添加新的处理器时暂停事件分发
    // 这样可以安全地为新处理器发送"初始化添加"事件
    blockDeltas sync.Mutex

    // watchErrorHandler 处理 Watch 错误
    watchErrorHandler WatchErrorHandlerWithContext

    // transform 用于在存入缓存前转换对象
    transform TransformFunc
}

这个结构体的注释已经非常清晰了。让我用更直白的话解释一下三大组件的职责:Indexer 就是本地缓存,你可以把它想象成一个线程安全的 Map,key 是"命名空间/资源名称",value 是资源对象本身。Controller 是控制器,它不断从 DeltaFIFO 取出增量(Delta),然后更新 Indexer 缓存,并调用 Processor 分发事件。Processor 是事件分发器,它管理多个"监听器"(processorListener),当有事件发生时,它负责把事件传递给每一个监听器。

2.3 创建 SharedIndexInformer

让我们看看如何创建一个 SharedIndexInformer:

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 308-340)

// NewSharedIndexInformerWithOptions 是创建 Informer 的主入口
func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, 
    options SharedIndexInformerOptions) SharedIndexInformer {
    
    // 1. 创建一个真实时钟,用于定时器
    realClock := &clock.RealClock{}

    // 2. 创建共享处理器,管理所有事件监听器
    processor := &sharedProcessor{clock: realClock}
    processor.listenersRCond = sync.NewCond(processor.listenersLock.RLocker())

    // 3. 创建并返回 sharedIndexInformer 实例
    return &sharedIndexInformer{
        // 创建带索引的本地缓存
        // DeletionHandlingMetaNamespaceKeyFunc 是默认的 key 生成函数
        // 格式为 "namespace/name",对于集群级资源则是 "name"
        indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers, 
            WithStoreMetrics(options.Identifier, options.InformerMetricsProvider)),
        
        // 注入共享处理器
        processor: processor,
        
        // 创建一个未关闭的 channel,用于表示同步状态
        synced: make(chan struct{}),
        
        // 数据源
        listerWatcher: lw,
        
        // 示例对象,用于类型检查
        objectType: exampleObject,
        
        // 重新同步检查周期
        resyncCheckPeriod: options.ResyncPeriod,
        defaultEventHandlerResyncPeriod: options.ResyncPeriod,
        
        // 时钟
        clock: realClock,
        
        // 缓存变更检测器
        cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
        
        // key 生成函数
        keyFunc: DeletionHandlingMetaNamespaceKeyFunc,
    }
}

创建过程非常清晰:首先创建一个共享处理器(sharedProcessor),然后创建本地缓存(Indexer),最后把所有组件组装成一个完整的 sharedIndexInformer 实例。Indexer 的 key 生成函数 `DeletionHandlingMetaNamespaceKeyFunc` 会生成"命名空间/名称"格式的 key,比如 "default/nginx-pod",这和我们用 kubectl 获取资源时看到的信息是一致的。

2.4 SharedIndexInformer 的运行流程

当我们调用 Run 方法启动 Informer 时,内部发生了什么?

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 719-783)

func (s *sharedIndexInformer) RunWithContext(ctx context.Context) {
    defer utilruntime.HandleCrashWithContext(ctx)
    logger := klog.FromContext(ctx)

    // 防止重复启动
    if s.HasStarted() {
        logger.Info("Warning: the sharedIndexInformer has started, run more than once is not allowed")
        return
    }

    func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()

        // 1. 创建 DeltaFIFO 队列和对应的日志记录器
        // 这个 FIFO 存储资源的所有变更(增量)
        logger, fifo := newQueueFIFO(logger, s.objectType, s.indexer, s.transform, 
            s.identifier, s.informerMetricsProvider)

        // 2. 创建 Controller 配置
        // Controller 会持续从 ListerWatcher 拉取数据,放入 FIFO
        cfg := &Config{
            Queue:             fifo,                      // DeltaFIFO 队列
            ListerWatcher:     s.listerWatcher,           // 数据源
            ObjectType:        s.objectType,              // 对象类型
            FullResyncPeriod:  s.resyncCheckPeriod,       // 完整重新同步周期
            ShouldResync:      s.processor.shouldResync,  // 检查是否需要重新同步

            // Process 函数处理每个从 FIFO 取出的增量
            Process: func(obj interface{}, isInInitialList bool) error {
                return s.handleDeltas(logger, obj, isInInitialList)
            },
            ProcessBatch: func(deltas []Delta, isInInitialList bool) error {
                return s.handleBatchDeltas(logger, deltas, isInInitialList)
            },
            WatchErrorHandlerWithContext: s.watchErrorHandler,
        }

        // 3. 创建 Controller
        s.controller = New(cfg)
        s.controller.(*controller).clock = s.clock
        s.started = true
    }()

    // 4. 创建独立的上下文用于停止 Processor
    // Processor 必须在 Controller 之后停止
    processorStopCtx, stopProcessor := context.WithCancelCause(context.WithoutCancel(ctx))
    var wg wait.Group
    
    defer wg.Wait()  // 等待 Processor 停止
    defer stopProcessor(errors.New("informer is stopping"))  // 通知 Processor 停止

    // 5. 启动三个后台任务
    // 任务1:缓存变更检测器(用于调试)
    wg.StartWithChannel(processorStopCtx.Done(), s.cacheMutationDetector.Run)
    
    // 任务2:事件分发器
    wg.StartWithContext(processorStopCtx, s.processor.run)
    
    // 任务3:监控同步状态
    wg.Start(func() {
        select {
        case <-ctx.Done():
            // 我们被停止了,没有完成同步
        case <-s.controller.HasSyncedChecker().Done():
            // Controller 已同步,我们也同步了
            close(s.synced)  // 关闭同步 channel
        }
    })

    // 6. 标记为已停止状态(不允许添加新的处理器)
    defer func() {
        s.startedLock.Lock()
        defer s.startedLock.Unlock()
        s.stopped = true
    }()

    // 7. 运行 Controller(这是主循环,不会返回)
    s.controller.RunWithContext(ctx)
}

Run 方法的逻辑非常清晰,总结一下就是:创建 DeltaFIFO 队列 → 创建 Controller → 启动三个后台任务(缓存检测、事件分发、同步监控)→ 运行 Controller 主循环。Controller 会启动一个无限循环,持续从 ListerWatcher 拉取数据并放入 DeltaFIFO。


三、shareProcessor 事件分发机制详解

shareProcessor 是 SharedIndexInformer 中的事件分发器,负责将资源变更事件分发给所有注册的处理器。它的名字里的"share"体现了它的核心价值:多个处理器可以共享同一个 Informer 的事件流,而不是每个处理器都独立连接 API Server。

3.1 shareProcessor 结构

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 1017-1031)

// sharedProcessor 管理多个 processorListener,并负责分发事件
// "Shared"体现在:多个控制器共享同一个 Processor 实例
type sharedProcessor struct {
    // listenersStarted 标记是否已启动所有监听器
    listenersStarted bool
    
    // listenersLock 保护监听器列表的并发访问
    listenersLock    sync.RWMutex
    
    // listenersRCond 条件变量,用于等待监听器启动
    listenersRCond   *sync.Cond
    
    // listeners 是一个 map,键是监听器指针,值表示是否正在同步
    // 这个 map 会在 Processor 停止时被清空
    listeners map[*processorListener]bool
    
    // clock 用于计算重新同步时间
    clock     clock.Clock
    
    // wg 用于等待所有监听器 goroutine 退出
    wg        wait.Group
}

shareProcessor 的设计非常简洁:它维护一个 listener map,每个注册的处理器对应一个 processorListener。当有事件需要分发时,它会遍历所有监听器,把事件发送到对应的 channel 中。

3.2 processorListener 三 goroutine 架构

processorListener 是每个事件处理器的运行时表示。它内部使用三个 goroutine 来处理事件分发,这是整个 Informer 机制中最复杂也最巧妙的部分。让我详细解释:

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 1195-1254)

// processorListener 使用三个 goroutine 来中转事件
// 
// 第一个 goroutine: pop()
//   - 从 addCh channel 接收事件
//   - 使用无界环形缓冲区(pendingNotifications)暂存事件
//   - 当 nextCh 消费者跟不上时,事件会被暂存到缓冲区
//
// 第二个 goroutine: run()
//   - 从 nextCh channel 接收事件
//   - 同步调用 handler 的 OnAdd/OnUpdate/OnDelete 方法
//   - 如果处理函数 panic,会捕获并跳过该事件
//
// 第三个 goroutine: watchSynced()
//   - 监控上游是否已同步
//   - 使用 SingleFileTracker 跟踪已处理的事件
type processorListener struct {
    logger klog.Logger
    
    // nextCh 是 run() goroutine 的输入 channel
    nextCh chan interface{}
    
    // addCh 是 pop() goroutine 的输入 channel
    addCh  chan interface{}
    
    // done 是关闭信号 channel
    done   chan struct{}

    // handler 是用户注册的事件处理函数
    handler     ResourceEventHandler
    handlerName string

    // syncTracker 跟踪事件处理进度
    syncTracker       *synctrack.SingleFileTracker
    upstreamHasSynced DoneChecker

    // pendingNotifications 是一个无界环形缓冲区
    // 当 run() 处理速度跟不上 addCh 发送速度时,
    // 事件会被暂存到这里,防止丢失
    pendingNotifications buffer.RingGrowing
    pendingNotificationsLength atomic.Int64

    // requestedResyncPeriod 是用户请求的重新同步周期
    requestedResyncPeriod time.Duration
    
    // resyncPeriod 是实际使用的重新同步周期
    // 可能被调整为不小于 minimumResyncPeriod
    resyncPeriod time.Duration
    
    // nextResync 下一次需要重新同步的时间
    nextResync time.Time
    resyncLock sync.Mutex
}

三个 goroutine 的协作关系可以用这幅图来理解:

┌────────────────────────────────────────────────────────────┐
                    │              sharedIndexInformer                           │
                    │                                                            │
                    │   handleDeltas() ──────────────┐                          │
                    │                                  │                          │
                    │                                  ▼                          │
                    │                          processor.distribute()            │
                    │                                  │                          │
                    └──────────────────────────────────┼──────────────────────────┘
                                                       │
                                                       ▼
┌─────────────────────────────────────────────────────────────────────────────────┐
│                         processorListener (每个 Handler 一个)                    │
│                                                                                 │
│   ┌─────────────────────────────────────────────────────────────────────────┐   │
│   │  Goroutine 1: pop()                                                    │   │
│   │                                                                         │   │
│   │   addCh ◄───────────────────────── distribute()                         │   │
│   │    │                                                                      │   │
│   │    │  select {                                                          │   │
│   │    │    case notification

3.3 pop() goroutine 的实现

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 1296-1328)

func (p *processorListener) pop() {
    defer utilruntime.HandleCrashWithLogger(p.logger)
    defer close(p.nextCh) // 告诉 run() goroutine 停止
    defer close(p.done)   // 告诉 watchSynced() goroutine 停止

    var nextCh chan

pop() 的逻辑非常巧妙,它使用了一个经典的 channel 生产者-消费者模式:如果 nextCh 的消费者(run goroutine)跟得上,就直接把事件发送过去;如果跟不上,就把事件暂存到 pendingNotifications 缓冲区。这样设计的好处是:即使某个处理器处理速度很慢,也不会阻塞其他处理器的事件分发。

3.4 distribute() 分发策略

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 1094-1118)

// distribute 将事件分发给所有监听器
// sync 参数决定是否只分发给正在同步的监听器
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()

    // 等待所有监听器启动
    for !p.listenersStarted && len(p.listeners) > 0 {
        p.listenersRCond.Wait()
    }

    // 遍历所有监听器
    for listener, isSyncing := range p.listeners {
        switch {
        case !sync:
            // 非同步事件:分发给所有监听器
            listener.add(obj)
        case isSyncing:
            // 同步事件(Resync):只分发给正在同步的监听器
            listener.add(obj)
        default:
            // 跳过:不是同步事件,但监听器不在同步状态
        }
    }
}

这里的分发策略很巧妙:普通事件(Add/Update/Delete)会分发给所有监听器,而同步事件(Resync)只分发给那些设置了同步标记的监听器。这样,不同的处理器可以根据自己的需求设置不同的同步周期,不会相互干扰。


四、SharedInformerFactory 工厂模式详解

SharedInformerFactory 是创建和管理多个 SharedIndexInformer 实例的工厂。想象一下,如果你需要同时监听 Pod、Service、Endpoints 三种资源,你会创建三个独立的 Informer。但这样做的问题是:每个 Informer 都会独立连接 API Server,浪费资源。SharedInformerFactory 就是来解决这个问题的:它让多个 Informer 共享底层的连接和资源,同时保持独立的缓存和事件处理逻辑

4.1 工厂结构

// staging/src/k8s.io/client-go/informers/factory.go(行 59-78)

// sharedInformerFactory 管理多个 SharedIndexInformer 实例
type sharedInformerFactory struct {
    // Kubernetes 客户端,用于访问 API Server
    client kubernetes.Interface
    
    // 命名空间过滤,只监听指定命名空间的资源
    namespace string
    
    // tweakListOptions 可以自定义 List/Watch 的过滤条件
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    
    // lock 保护 informers map 的并发访问
    lock sync.Mutex
    
    // defaultResync 默认的重新同步周期
    defaultResync time.Duration
    
    // customResync 特定类型的自定义重新同步周期
    customResync map[reflect.Type]time.Duration
    
    // transform 应用于所有 Informer 的对象转换函数
    transform cache.TransformFunc
    
    // informerName 用于指标追踪
    informerName *cache.InformerName

    // informers 缓存所有已创建的 Informer 实例
    // key 是对象的 reflect.Type,确保每种类型只有一个 Informer
    informers map[reflect.Type]cache.SharedIndexInformer
    
    // startedInformers 标记哪些 Informer 已经启动
    startedInformers map[reflect.Type]bool
    
    // wg 用于等待所有 Informer goroutine 退出
    wg sync.WaitGroup
    
    // shuttingDown 标记工厂是否正在关闭
    shuttingDown bool
}

注意到 `informers map[reflect.Type]cache.SharedIndexInformer` 这个字段,它确保每种资源类型只创建一个 Informer 实例。比如你调用 `factory.Core().V1().Pods()` 两次,返回的是同一个 Informer 实例。这避免了重复连接 API Server 的问题。

4.2 创建工厂实例

// staging/src/k8s.io/client-go/informers/factory.go(行 143-160)

// NewSharedInformerFactoryWithOptions 是创建工厂的主入口
// 支持通过选项模式配置各种参数
func NewSharedInformerFactoryWithOptions(
    client kubernetes.Interface, 
    defaultResync time.Duration, 
    options ...SharedInformerOption) SharedInformerFactory {
    
    factory := &sharedInformerFactory{
        client:           client,
        namespace:        v1.NamespaceAll,  // 默认监听所有命名空间
        defaultResync:    defaultResync,
        informers:        make(map[reflect.Type]cache.SharedIndexInformer),
        startedInformers: make(map[reflect.Type]bool),
        customResync:     make(map[reflect.Type]time.Duration),
    }

    // 应用所有选项
    for _, opt := range options {
        factory = opt(factory)
    }

    return factory
}

// 常用选项函数

// WithNamespace 只监听特定命名空间的资源
func WithNamespace(namespace string) SharedInformerOption {
    return func(factory *sharedInformerFactory) *sharedInformerFactory {
        factory.namespace = namespace
        return factory
    }
}

// WithCustomResyncConfig 为特定资源类型设置自定义同步周期
func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption {
    return func(factory *sharedInformerFactory) *sharedInformerFactory {
        for k, v := range resyncConfig {
            factory.customResync[reflect.TypeOf(k)] = v
        }
        return factory
    }
}

// WithTransform 为所有资源设置对象转换函数
func WithTransform(transform cache.TransformFunc) SharedInformerOption {
    return func(factory *sharedInformerFactory) *sharedInformerFactory {
        factory.transform = transform
        return factory
    }
}

工厂使用选项模式(Functional Options)来配置参数,这是一种非常优雅的 Go 设计模式。你可以组合多个选项,比如:`NewSharedInformerFactoryWithOptions(client, 30*time.Second, WithNamespace("default"), WithTransform(myTransform))`。这样代码既灵活又易读。

4.3 InformerFor 懒加载模式

// staging/src/k8s.io/client-go/informers/factory.go(行 241-265)

// InformerFor 懒加载创建或返回已存在的 Informer
// 这是工厂模式的核心方法
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, 
    newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    
    f.lock.Lock()
    defer f.lock.Unlock()

    // 1. 根据对象类型查找是否已存在
    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    if exists {
        return informer  // 直接返回已有的 Informer
    }

    // 2. 获取该类型的自定义同步周期,如果没有就用默认周期
    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    // 3. 调用传入的创建函数创建新的 Informer
    informer = newFunc(f.client, resyncPeriod)
    
    // 4. 如果工厂设置了 transform,应用到 Informer
    if f.transform != nil {
        informer.SetTransform(f.transform)
    }
    
    // 5. 缓存并返回
    f.informers[informerType] = informer
    return informer
}

InformerFor 采用了懒加载(Lazy Loading)策略:直到第一次访问某个 Informer 时才创建它。比如你调用 `factory.Apps().V1().Deployments()` 时,实际上只是获取了一个 Informer 的"访问器"(DeploymentInformer),真正的 SharedIndexInformer 要等到你调用 `Informer()` 方法时才创建。

4.4 生命周期管理三件套:Start、WaitForCacheSync、Shutdown

// staging/src/k8s.io/client-go/informers/factory.go(行 162-239)

// Start 启动所有已创建的 Informer
// 必须在 WaitForCacheSync 之前调用
func (f *sharedInformerFactory) Start(stopCh

这三个方法构成了完整的生命周期管理:Start 启动所有 Informer,它们开始从 API Server 拉取数据。WaitForCacheSync 阻塞等待首次同步完成,只有当所有缓存都同步完成后才继续。这是非常重要的一步,因为如果缓存还没同步完成就开始处理业务逻辑,可能会读取到不完整的数据。Shutdown 优雅关闭工厂,它会等待所有后台 goroutine 安全退出。

4.5 典型使用模式

让我展示一个完整的控制器使用 SharedInformerFactory 的典型模式:

// 示例代码
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// 1. 创建工厂,默认同步周期 30 秒
factory := informers.NewSharedInformerFactory(client, 30*time.Second)

// 2. 获取 Informer(懒加载,此时还没有真正创建)
podInformer := factory.Core().V1().Pods()
deploymentInformer := factory.Apps().V1().Deployments()

// 3. 注册事件处理器
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc: func(obj interface{}) {
        // 处理 Pod 添加事件
    },
    UpdateFunc: func(oldObj, newObj interface{}) {
        // 处理 Pod 更新事件
    },
    DeleteFunc: func(obj interface{}) {
        // 处理 Pod 删除事件
    },
})

// 4. 启动工厂(Start 不阻塞,立即返回)
factory.Start(ctx.Done())

// 5. 等待所有缓存同步完成(关键步骤!)
// 只有缓存同步完成后,才能安全地读取数据和处理事件
synced := factory.WaitForCacheSync(ctx.Done())
if err := synced.AsError(); err != nil {
    return fmt.Errorf("failed to sync cache: %w", err)
}

// 6. 现在可以安全地读取缓存
pods, _ := podInformer.Lister().List("")  // 列出所有 Pod

// 7. 运行控制器逻辑...
// controller.Run(ctx, workers)

// 8. 关闭工厂
factory.Shutdown()

这个模式是 Kubernetes 控制器开发的标准范式。记住:WaitForCacheSync 之前一定不能读取缓存或处理业务逻辑,否则可能会基于不完整的数据做出错误的决策。


五、DeltaFIFO 与增量机制

DeltaFIFO 是 Informer 内部的消息队列,存储所有资源变更的增量(Delta)。理解 DeltaFIFO 的工作原理,对于调试 Informer 相关问题和深入理解 Kubernetes 的事件驱动模型非常重要。

5.1 Delta 类型

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go(行 178-195)

// DeltaType 定义了资源变更的类型
type DeltaType string

// 变更类型常量
const (
    // Added 当资源第一次被创建时
    Added DeltaType = "Added"
    
    // Modified 当资源的 spec 或 metadata 发生变化时
    Modified DeltaType = "Modified"
    
    // Deleted 当资源被删除时
    Deleted DeltaType = "Deleted"
    
    // Sync 当需要重新同步缓存时(定期触发)
    Sync DeltaType = "Sync"
    
    // Replaced 当收到完整的资源列表替换时
    // (Watch 断线重连后的第一次 LIST 响应)
    Replaced DeltaType = "Replaced"
    
    // Bookmark 特殊事件,用于保持 Watch 连接
    Bookmark DeltaType = "Bookmark"
    
    // ReplacedAll 全量替换所有资源
    ReplacedAll DeltaType = "ReplacedAll"
    
    // SyncAll 重新同步所有资源
    SyncAll DeltaType = "SyncAll"
)

// Delta 代表一个单独的变更
type Delta struct {
    Type   DeltaType  // 变更类型
    Object interface{}  // 变更的对象(可能是 *v1.Pod 等)
}

// Deltas 是一个 Delta 列表,按时间顺序排列
// 旧的在前面,新的在后面
type Deltas []Delta

Delta 的设计非常巧妙:同一个资源的多个变更会累积在同一个 Deltas 列表中。比如一个 Pod 先被创建(Added),然后被修改(Modified),最后被删除(Deleted),这三个 Delta 都会存储在 key 为 "namespace/pod-name" 的 Deltas 中。Pop 的时候会一次性返回整个列表,processDeltas 会依次处理每个 Delta。

5.2 DeltaFIFO 结构

// staging/src/k8s.io/client-go/tools/cache/delta_fifo.go(行 108-158)

// DeltaFIFO 是一个先进先出队列,但存储的是增量而不是完整对象
type DeltaFIFO struct {
    logger klog.Logger
    name string

    // lock 和 cond 保护 items 和 queue 的并发访问
    lock sync.RWMutex
    cond sync.Cond

    // items 存储所有的 Delta,key 是 namespace/name
    items map[string]Deltas

    // queue 维护出队顺序(FIFO)
    queue []string

    // synced 用于标记初始同步是否完成
    synced       chan struct{}
    syncedClosed bool

    // populated 标记是否已经有初始数据
    populated bool
    
    // initialPopulationCount 初始数据的数量
    initialPopulationCount int

    // keyFunc 用于生成对象的 key
    keyFunc KeyFunc

    // knownObjects 用于 Replace 操作时检测被删除的对象
    knownObjects KeyListerGetter

    // closed 标记队列是否已关闭
    closed bool

    // emitDeltaTypeReplaced 是否使用 Replaced 而不是 Sync
    emitDeltaTypeReplaced bool

    // transformer 在放入队列前转换对象
    transformer TransformFunc
}

DeltaFIFO 和普通 FIFO 的区别在于:它的每个元素不是单独的对象,而是一个 Deltas 列表,代表同一个对象的所有变更。knownObjects 字段用于 Replace 操作时检测哪些对象被删除了——当收到新的资源列表时,那些在旧列表中存在但新列表中不存在的对象,会生成 Deleted Delta。


六、常见问题解答(FAQ)

▼ Q1: 为什么我注册的 AddEventHandler 没有被调用?

A: 最常见的原因是在 WaitForCacheSync 完成之前就注册了处理器。如果你的处理器是在工厂 Start 之前注册的,那么它应该能正常收到事件。但如果是在 Start 之后、WaitForCacheSync 之前注册的,需要确保调用了 `handle.HasSyncedChecker()` 并等待其完成。另外,检查一下你的 Informer 是否已经调用了 `Run()` 方法,如果没有运行 Informer,它永远不会产生事件。


▼ Q2: SharedInformer 和 SharedIndexInformer 有什么区别?我该用哪个?

A: SharedIndexInformer 继承了 SharedInformer,并额外提供了两个能力:GetIndexer() 方法可以获取带索引的本地缓存,AddIndexers() 方法可以添加自定义索引。在实际使用中,client-go 提供的都是 SharedIndexInformer,因为它的能力是 SharedInformer 的超集。如果你需要按标签或字段快速查询资源,就必须用 SharedIndexInformer。SharedInformer 只提供基本的 GetStore() 方法,功能更有限。


▼ Q3: Resync 是什么?我需要设置多长的同步周期?

A: Resync 是定期重新触发同步事件的机制。即使 API Server 没有推送任何变更,Informer 也会按照设定的周期(比如 30 秒)向所有处理器发送一个 Sync 事件。处理器的 OnUpdate 会被调用,传入两个相同的对象(oldObj == newObj)。这个机制主要用于让控制器有机会重新检查和修正状态,比如某些边界情况导致上次处理失败。同步周期的选择取决于你的业务需求:周期越短,状态越实时,但 API Server 压力越大;周期越长,状态可能滞后,但资源消耗更少。对于大多数控制器,默认的 30 秒或几分钟是合理的。如果你有强实时性要求,可以缩短到 10-15 秒;如果你的控制器主要响应显式变更,可以设置更长的周期甚至禁用(设置为 0)。


▼ Q4: 为什么需要多个 processorListener?它们是线程安全的吗?

A: 多个 processorListener 是因为每个注册的处理器(AddEventHandler)都有自己独立的 listener 实例。这样设计的好处是:不同处理器可以有独立的 resync 周期、独立的处理速度、独立的同步状态追踪。一个处理器处理慢不会影响其他处理器。至于线程安全,每个 processorListener 内部都通过 channel 和 mutex 保证了并发安全。你不需要在处理器函数中额外加锁,但要注意:OnAdd/OnUpdate/OnDelete 是在单独的 goroutine 中调用的,如果你访问共享数据,仍然需要同步。


▼ Q5: Informer 的本地缓存会无限增长吗?如何清理?

A: Informer 的本地缓存(Indexer)会存储当前集群中所有符合条件(命名空间过滤)的资源对象。当资源被删除时,对应的缓存条目也会被删除。所以缓存大小是动态的,取决于集群中资源的数量,不会无限增长。但如果你监听的是集群级资源(如 Node、PersistentVolume),这些资源数量可能很大,缓存也会相应占用更多内存。需要清理时,只能停止整个 Informer,工厂无法单独清理某个缓存。


▼ Q6: 控制器重启后,Informer 会重新同步所有数据吗?

A: 是的,每次调用 Run() 启动 Informer 时,都会重新从 API Server 拉取全量数据(通过 LIST 请求),然后处理 Replace 事件更新本地缓存。这个过程对用户是透明的:Informer 会自动处理 Watch 断线重连,Controller 也会收到相应的 Add/Update/Delete 事件。所以即使控制器重启,只要调用了 WaitForCacheSync 并等待完成,你读取到的缓存数据就是完整的、一致的。


▼ Q7: AddEventHandler 返回的 ResourceEventHandlerRegistration 有什么用?

A: ResourceEventHandlerRegistration 是处理器的注册句柄,主要用于两个场景:第一,移除处理器:调用 `RemoveEventHandler(handle)` 可以安全地停止并移除一个处理器,相关的 goroutine 也会被正确关闭。第二,检查同步状态:调用 `handle.HasSyncedChecker()` 可以获取一个 DoneChecker,用于检查这个特定处理器的事件是否都已处理完毕(不仅仅是 Informer 缓存同步,而是处理器本身已处理完所有初始事件)。在 defer 中移除处理器是一个好习惯,可以避免 goroutine 泄漏。


▼ Q8: 为什么有 OnUpdate 和 OnDelete,但没有 OnCreate?创建资源时触发哪个方法?

A: 这是 Informer 的设计选择。当资源被创建时,会触发 OnAdd 方法,而不是单独的 OnCreate。这个设计背后的逻辑是:无论是"新建"还是"从缓存中读取",对于处理器来说都是"添加了一个对象"。从代码实现角度看,processDeltas 会检查对象是否已存在于缓存中,如果不存在就调用 Add,已存在则调用 Update。所以如果你需要区分"创建"和"更新",可以在 OnAdd 的 isInInitialList 参数上做判断:如果为 true,说明是首次从 LIST 响应中加载的;如果为 false,说明是新创建的资源。


▼ Q9: Indexer 和 Lister 有什么区别?我该用哪个读取数据?

A: Indexer 和 Lister 本质上访问的是同一个底层缓存,但提供了不同的接口。Indexer是底层的、直接操作缓存的结构,提供了 Get、List、Add、Update、Delete 等方法。Lister是上层封装,专门为某个资源类型设计的只读访问接口,比如 PodLister.Namespace("default").Get("nginx") 会比 Indexer.GetByKey("default/nginx") 更类型安全、更易用。对于控制器开发,推荐使用 Lister,因为它是类型化的,编译器能帮你检查是否传错了类型。Indexer 更底层,通常用于需要自定义索引或批量操作的场景。


▼ Q10: Watch 断开后,Informer 会自动重连吗?重连期间的数据丢失怎么处理?

A: 是的,Informer 有自动重连机制。当 Watch 连接断开时,Reflector 会等待一段时间后重新发起 LIST 请求,然后从 API Server 获取全量数据和最新的 ResourceVersion,再继续 Watch。整个过程对用户是透明的。关键在于"重连期间的数据丢失"问题:假设在 Watch 断开期间,某 Pod 被创建又删除了,等重连后 LIST 响应中没有这个 Pod,所以这个事件就丢失了。但这不是 bug,而是 Kubernetes 官方的设计权衡——为了保证一致性,选择了每次重连都做全量同步(LIST),而不是维护一个巨大的增量日志。对于大多数控制器来说,这没有问题,因为它们主要依赖 Watch 事件的实时性和 Resync 的兜底。但如果你的业务需要 100% 不丢失事件,可能需要额外的外部机制(如审计日志)来补全。


▼ Q11: 如何调试 Informer 的事件分发?有没有什么好的技巧?

A: 调试 Informer 有几个实用技巧:第一,使用 CacheMutationDetector:SharedIndexInformer 内部有一个 CacheMutationDetector,如果你修改了从缓存获取的对象(而不是通过 Indexer 的 Update 方法),它会检测到并打印警告。第二,打印事件日志:可以在事件处理器中打印日志,观察 OnAdd/OnUpdate/OnDelete 的调用频率和时机。第三,检查 HasSynced 状态:确保在 WaitForCacheSync 返回 true 后再进行调试,否则可能读取到不完整的数据。第四,使用 kubectl get 验证:通过 kubectl 获取资源,对比 Informer 缓存中的数据,看是否一致。对于更高级的调试,可以启用 client-go 的 trace 日志,看 Reflector 的 List/Watch 详细日志。


▼ Q12: 事件处理器 panic 了会怎样?会不会影响整个 Informer?

A: 不会影响整个 Informer。processorListener.run() 内部有 panic 捕获机制,当事件处理函数(OnAdd/OnUpdate/OnDelete)panic 时,会被 utilruntime.HandleCrash() 捕获并记录日志,然后跳过这个事件继续处理下一个。这个设计保证了:一个处理器出问题不会阻塞其他处理器,也不会导致整个 Informer 崩溃。但要注意的是:panic 的事件会被跳过,不会重试。如果你的业务逻辑必须在每次事件发生时都执行,应该在事件处理函数内部做错误处理和重试,而不是依赖 Informer 来保证调用。最佳实践是在处理函数中使用 defer + recover 捕获 panic,并记录详细错误信息便于排查。


▼ Q13: 同一个资源类型可以注册多个事件处理器吗?它们收到的事件顺序一样吗?

A: 可以注册多个处理器。每个处理器收到的事件顺序是相同的(按资源变更的时序),但到达时间可能略有不同,因为处理器是独立运行的 goroutine。具体来说:shareProcessor.distribute() 会依次向每个 listener.addCh 发送事件,但这是非阻塞的快速操作。各个 processorListener 的 pop() goroutine 独立从 addCh 接收,然后各自的 run() goroutine 独立处理。所以两个不同的处理器可能一个已经处理完 Add 事件并开始处理 Update,而另一个还在处理 Add。但这不会造成逻辑问题,因为事件本身是按序产生的,各处理器独立处理不会产生竞态条件。


▼ Q14: 什么是 pendingNotifications 缓冲区?它会不会导致内存溢出?

A: pendingNotifications 是一个无界环形缓冲区,用于暂存来不及处理的事件。它的设计目的是:防止一个慢处理器阻塞其他处理器和数据流。当 run() goroutine 处理速度跟不上 pop() 接收速度时,新事件会被写入这个缓冲区。如果一个处理器持续很慢,这个缓冲区会不断增长,最终可能导致 OOM。这是一个已知的设计权衡。代码注释里也提到:"This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but we should try to do something better." 所以在实际使用中,不要在事件处理器中做耗时的同步操作。如果需要做耗时操作,应该把任务放入工作队列(WorkQueue),然后用单独的 worker goroutine 异步处理。缓冲区大小可以通过 newProcessListener 的 bufferSize 参数控制,默认是 initialBufferSize(通常 1024)。


▼ Q15: AddEventHandler 和 AddEventHandlerWithOptions 有什么区别?

A: AddEventHandlerWithOptions 提供了更丰富的配置选项。主要区别在于:AddEventHandler 使用工厂默认的 resync 周期,不能自定义。而 AddEventHandlerWithOptions 可以通过 HandlerOptions 设置:ResyncPeriod:自定义的重新同步周期,会覆盖工厂的默认值;Logger:为这个处理器指定独立的日志记录器;Name:给处理器起个名字,便于在日志和指标中识别。实际上,AddEventHandler 内部就是调用 AddEventHandlerWithOptions,使用默认参数。高级用法示例:`informer.AddEventHandlerWithOptions(handler, HandlerOptions{ResyncPeriod: 10*time.Second, Name: "my-custom-handler"})`。


▼ Q16: 为什么需要 TransformFunc?它有什么实际用途?

A: TransformFunc 允许在资源对象存入本地缓存之前对其进行转换或过滤。这是一个强大的钩子,常用场景包括:精简数据:删除不需要的字段(如 status、annotations 的某些 key),减少内存占用;添加默认值:为缺失的字段设置默认值;敏感信息过滤:删除 Secret 的 data 内容(只保留 key 列表);资源压缩:将多个 annotation 合并,节省存储空间。Transform 在 DeltaFIFO 的队列中就会应用,所以任何通过事件获取的对象和从 Indexer/Lister 获取的对象都会经过转换。使用示例:`factory := NewSharedInformerFactoryWithOptions(client, 30*time.Second, WithTransform(func(obj interface{}) (interface{}, error) { return clearSecretData(obj), nil }))`。


▼ Q17: Controller 和 Informer 使用同一个 SharedInformerFactory,会有资源竞争吗?

A: 不会有资源竞争。SharedInformerFactory 的核心价值就是共享资源:同一个 Pod Informer 实例被多个控制器(DeploymentController、ReplicaSetController、EndpointController 等)共享,它们都通过同一个 Informer 监听 Pod 变化,但各自有独立的事件处理器(processorListener)。共享体现在:共享底层的 List/Watch 连接、共享 DeltaFIFO 队列、共享 Indexer 缓存。各控制器的独立性体现在:独立的事件处理器、独立 resync 周期、独立同步状态追踪。所以,即使 10 个控制器都监听 Pod,也不会创建 10 个到 API Server 的连接,只会有 1 个 Watch 连接和 1 个 LIST 请求。


▼ Q18: 什么是 Bookmarks 事件?为什么要保留它?

A: Bookmark 是一种特殊的 Delta 类型,用于保持 Watch 连接的活跃状态。在 Kubernetes 的 Watch 机制中,如果长时间没有任何事件发生,Watch 连接可能会被网络中间设备(如负载均衡器、NAT 网关)超时断开。为了防止这种情况,API Server 会定期发送 Bookmark 事件(包含当前的 ResourceVersion),提醒客户端连接仍然活跃。从代码角度看,Bookmark 事件会被 processDeltas 特殊处理:它会更新本地缓存的 ResourceVersion 记录,但不会触发任何处理器回调。所以你注册的 OnAdd/OnUpdate/OnDelete 不会被 Bookmark 事件调用。Bookmark 主要用于 Informer 内部的资源版本追踪,普通开发者一般不需要关心它。


▼ Q19: 在 Handler 中修改对象会生效吗?为什么?

A: 直接修改 Handler 参数不会生效,但也不会报错,只是修改被丢弃了。这是因为 Informer 传给你的对象是缓存中的引用,processDeltas 在处理完事件后不会把对象再写回缓存(除非通过 Indexer 的 Update 方法显式操作)。有两个理由不要直接修改:第一,修改不会被保存,下次从缓存获取的还是原始对象;第二,SharedIndexInformer 内部有 CacheMutationDetector,会检测到这种"意外修改"并打印警告(如果启用了 debug 模式)。正确的做法是:如果需要更新资源,调用 Kubernetes client 的 Update/Patch 方法;如果需要更新缓存,在 Indexer 上调用 Update 方法。


▼ Q20: SharedIndexInformer 和 NewSharedInformer 有什么区别?应该用哪个?

A: NewSharedInformer 和 NewSharedIndexInformer 最终返回的都是 SharedIndexInformer,区别只是参数和便捷性:NewSharedInformer 是简化版本,只接受最基本的参数(ListerWatcher、示例对象、resync 周期),返回类型是 SharedInformer(接口);NewSharedIndexInformer 是完整版本,额外接受 Indexers 参数,可以创建带自定义索引的缓存,返回类型是 SharedIndexInformer(接口)。实际使用中,client-go 的代码生成工具(如 informer-gen)生成的代码都使用 NewSharedIndexInformerWithOptions,给你最大的灵活性。所以结论是:用 NewSharedIndexInformer 或 NewSharedIndexInformerWithOptions,除非你确定不需要索引功能。


七、总结与实践建议

让我们回顾一下今天学到的核心知识点:

  • Informer vs Controller:Informer 是数据管道,负责从 API Server 拉取数据并维护本地缓存;Controller 是业务逻辑,负责监听事件、计算期望状态并采取行动。两者是上下游关系,不是替代关系。
  • SharedIndexInformer 三大组件:Indexer(本地缓存)、Controller(增量处理)、Processor(事件分发)。理解这三者的协作关系,就理解了整个 Informer 的核心。
  • shareProcessor 三个 goroutine:pop() 接收并缓冲事件、run() 同步调用处理器、watchSynced() 追踪同步状态。三者协作保证事件可靠分发且不会阻塞。
  • SharedInformerFactory 生命周期:Start 启动所有 Informer、WaitForCacheSync 等待首次同步完成、Shutdown 优雅关闭。WaitForCacheSync 是关键步骤,必须在处理业务逻辑之前完成。
  • DeltaFIFO 增量机制:存储每个对象的变更历史(Added/Modified/Deleted 等),保证事件不丢失且按序处理。

提示:本文档基于 Kubernetes 1.36.1 版本的 client-go 源码编写。不同版本可能在细节上有所差异,但核心机制保持一致。建议读者 clone 源码仓库,使用 IDE 的跳转功能亲自阅读这些文件的完整内容,加深理解。


相关阅读

Kubernetes client-go Informer 源码深度解析 · 来源:Kubernetes 1.36.1 源码分析