惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

腾讯CDC
Hacker News: Ask HN
Hacker News: Ask HN
S
Securelist
Security Latest
Security Latest
S
Schneier on Security
T
Threat Research - Cisco Blogs
Latest news
Latest news
Cyberwarzone
Cyberwarzone
A
Arctic Wolf
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
NISL@THU
NISL@THU
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
I
Intezer
T
The Exploit Database - CXSecurity.com
N
News and Events Feed by Topic
Simon Willison's Weblog
Simon Willison's Weblog
T
Tor Project blog
Blog — PlanetScale
Blog — PlanetScale
C
Cyber Attacks, Cyber Crime and Cyber Security
C
CERT Recently Published Vulnerability Notes
The Hacker News
The Hacker News
月光博客
月光博客
WordPress大学
WordPress大学
博客园 - 叶小钗
Hugging Face - Blog
Hugging Face - Blog
美团技术团队
量子位
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
C
Cisco Blogs
博客园 - 三生石上(FineUI控件)
Google DeepMind News
Google DeepMind News
Project Zero
Project Zero
Webroot Blog
Webroot Blog
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
Application and Cybersecurity Blog
Application and Cybersecurity Blog
云风的 BLOG
云风的 BLOG
L
LINUX DO - 最新话题
Schneier on Security
Schneier on Security
Engineering at Meta
Engineering at Meta
www.infosecurity-magazine.com
www.infosecurity-magazine.com
aimingoo的专栏
aimingoo的专栏
D
Docker
有赞技术团队
有赞技术团队
Google DeepMind News
Google DeepMind News
宝玉的分享
宝玉的分享
T
Troy Hunt's Blog
L
Lohrmann on Cybersecurity
T
The Blog of Author Tim Ferriss
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
L
LangChain Blog

博客园 - 左扬

Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:生产级 Controller 实践:并发安全、资源清理与高可用设计 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析: Controller 调试与诊断工具:从日志分析到问题定位 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:DynamicClient 操作 CRD:无需代码生成的动态操作 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:控制器与 APIServer 完整交互流程:从 Watch 到缓存同步 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:错误处理与重试机制:WorkQueue 限速器详解 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Leader 选举机制:高可用控制器的必备技能 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Controller 开发模式完整实战 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:SharedInformerFactory 与等待缓存同步 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:从认证配置到 Deployment 操作 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:版本对应、架构组件与组件关系 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Informer 源码深度解析:从底层原理到实战应用 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:ListWatcher 源码深度解析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Indexer 与 ThreadSafeStore 核心原理与源码深度剖析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:DeltaFIFO 核心原理与源码深度剖析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:workqueue 核心原理与实战 Kubernetes 编程 / Operator 专题【左扬精讲】—— runtime.Codec 资源编解码:serializer 与 codec 差异、编解码数据结构、codec 核心调用链路 Kubernetes 编程 / Operator 专题【左扬精讲】—— Scheme 资源注册机制全解 Kubernetes 编程 / Operator 专题【左扬精讲】—— Kubernetes 自定义资源的内部版本与外部版本:从源码看版本定义机制 Kubernetes 编程 / Operator 专题【左扬精讲】—— Kubernetes 1.36.1 核心 API 数据结构全解 Kubernetes 编程 / Operator 专题【左扬精讲】—— Kubernetes 构建过程 【AIOPS】一文读懂LLM【左扬精讲】:从诞生到普及,解锁大语言模型的核心密码 【AIOPS】AI Agent 专题【左扬精讲】核心功能篇:MCP-VictoriaMetrics Hooks 源码精讲:Hooks 可观测性的无侵入式实现 【AIOPS】AI Agent 专题【左扬精讲】核心功能篇:MCP-VictoriaMetrics Golang 配置解析源码精讲 ——SRE 自定义 Agent 核心技巧 【AIOPS】AI Agent 专题【左扬精讲】核心功能篇:MCP-VictoriaMetrics Golang 并发模型解析 ——SRE 应对高并发采集的调优思路 【AIOPS】AI Agent 专题【左扬精讲】基础架构篇:MCP-VictoriaMetrics Golang 源码整体架构拆解 ——SRE 必懂的核心模块与数据流 OpenTelemetry 开发实战【左扬精讲】—— 云原生可观测体系构建与分布式追踪二次开发 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 7 —— 基于流量预测模型的智能弹性扩缩容 Operator 实战(AIOps 模型训练与智能扩容(下篇)—— 预测式弹性扩缩容 Operator 落地实现) Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 7 —— 基于流量预测模型的智能弹性扩缩容 Operator 实战(AIOps 模型训练与智能扩容(上篇)—— 时序预测模型构建与离线训练) Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 6 —— 基于运维专家知识库的智能故障诊断与排查 Operator 实战 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 5 —— 基于大语言模型(LLM)的实时日志流智能监测 Operator 实现 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 4 —— 基于 Operator 实现大模型私有化部署与管理 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 3 —— 面向 AI / 算力调度场景:GPU 竞价实例资源池统一调度管理 Operator 开发 Kubernetes编程 / Operator专题【左扬精讲】—— Operator 开发实战项目 2 —— 面向零售 / 电商潮汐流量难题:多云多集群数据中心级全链路弹性伸缩 DataCenter Scaler Operator 从 0 到 1 全链路开发 Kubernetes编程 / Operator专题【左扬精讲】—— 深入理解Kubebuilder注解:为什么Operator开发离不开这些特殊注释 Kubernetes编程 / Operator专题【左扬精讲】—— Operator 开发实战项目1 —— Applicaion Operator(通用应用生命周期管理 Operator 实战) Pod 镜像拉取失败?kubectl edit pods修改镜像地址的底层原理与实操 (该方法仅为临时应急方案,并非长期解决方案) Kubernetes编程/Operator专题精讲—— 理解控制器模式 —— 控制器模式的核心原理与实现逻辑(从原理到实践) 【AIOPS】AI Agent 专题【左扬精讲】模型微调实战:一站式平台 LLaMA-Factory 【AIOPS】AI Agent 专题【左扬精讲】基于 k8s+vLLM+Ray 分布式部署全指南:架构设计、资源调度与性能优化 【AIOPS】AI Agent专题【左扬精讲】非量化版DeepSeek分布式部署全指南:精度保障、显存规划与Ollama/vLLM选型 【AIOPS】AI Agent 专题【左扬精讲】零开发框架实现 ReAct Agent(Go SRE友好)
Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Reflector 源码深度解析
左扬 · 2026-06-13 · via 博客园 - 左扬

Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Reflector 源码深度解析

当我们写 Kubernetes 控制器或 Operator 时,最核心的需求就是监听集群中资源的变化。当 Pod 被创建了、Deployment 被修改了、Service 被删除了……这些事件必须实时捕获,才能驱动控制器的 Reconcile 逻辑。整个 Informer 系统的核心就是 Reflector(反射器)。Reflector 的职责是从 API Server 拉取资源列表(List),建立长连接监听资源变化(Watch),然后将变化写入本地缓存(Store)。这篇文章我们就从源码出发,把 Reflector 的初始化、启动流程、以及核心的 ListAndWatch 机制彻底讲透。

Kubernetes client-go v1.36.1 Reflector Informer

🔓 学习重点提示  — 建议先通读全文,再重点回顾标注内容

★ 重点掌握(必须)
   • Reflector 结构体字段:理解每个字段的含义,特别是 lastSyncResourceVersion、listerWatcher、store 的作用
   • NewReflectorWithOptions:构造函数的完整初始化流程,特别是 WatchList 特性的开关逻辑
   • RunWithContext:delayHandler.Until 的调用模式,理解为什么 ListAndWatch 会在失败后自动重试
   • ListAndWatchWithContext:两阶段模式——先 List 初始化缓存,再 Watch 增量同步
   • watch() 方法:watch 循环的完整流程、错误分类与退避重试机制
   • handleAnyWatch:事件处理循环,对 Added/Modified/Deleted/Bookmark 的处理逻辑

☆ 次重点(了解即可)
   • watchList(WatchList 特性)与传统 List-Watch 的区别
   • resync 周期同步机制
   • ResourceVersion 过期和回退逻辑


一、Reflector 在整个系统中的定位

在 Kubernetes 客户端工具链中,Reflector 处于一个承上启下的关键位置。它位于 ListerWatcher(ListWatch)和 Store(本地缓存)之间,负责将 API Server 的远程数据同步到本地。它的上游是 ListWatch(通过 ListerWatcherWithContext 接口),下游是 Store(通过 ReflectorStore 接口)。Reflector 本身不存储数据,它只是一个"搬运工"——从 API Server 拉取数据,写入 Store。

Reflector 的核心哲学是"最终一致性":它保证最终会看到所有事件,但中间可能会丢失一些事件(特别是连接断开时)。这是分布式系统中的经典 trade-off——为了可用性和性能,牺牲了强一致性。


二、Reflector 结构体:核心字段详解

Reflector 的结构体定义在 staging/src/k8s.io/client-go/tools/cache/reflector.go 中。理解每个字段的含义,是理解整个 Reflector 工作原理的基础。

// staging/src/k8s.io/client-go/tools/cache/reflector.go(行 52-171)

// 常量定义:Watch 超时和退避参数
const (
    // Watch 请求的超时时间在 [5min, 10min] 之间随机选择
    // 这样可以避免大量 Informer 同时在同一个时间点过期
    defaultMinWatchTimeout = 5 * time.Minute
    defaultMaxWatchTimeout = 2 * defaultMinWatchTimeout  // 10 分钟

    // 退避策略:从 800ms 开始,最大 30s,乘以 2.0 因子
    defaultBackoffInit    = 800 * time.Millisecond
    defaultBackoffMax     = 30 * time.Second
    defaultBackoffReset  = 2 * time.Minute   // 2 分钟无错误则重置退避
    defaultBackoffFactor = 2.0
)

// ReflectorStore 接口:Reflector 使用的 Store 子集
type ReflectorStore interface {
    Add(obj interface{}) error
    Update(obj interface{}) error
    Delete(obj interface{}) error
    // Replace 用新列表完全替换缓存内容
    Replace([]interface{}, string) error
    // Resync 是为了让某些 Store(如 DeltaFIFO)实现周期同步
    Resync() error
}

// Reflector 结构体
type Reflector struct {
    // 日志记录器(支持上下文日志)
    logger klog.Logger

    // name:标识这个 Reflector,默认值是调用位置的 file:line
    name string

    // typeDescription:用于日志显示的类型描述,如 "v1.Pod"
    typeDescription string

    // expectedType:期望放入缓存的对象类型(reflect.Type)
    // 用于类型安全检查
    expectedType reflect.Type

    // expectedGVK:如果是 unstructured 对象,记录期望的 GVK
    expectedGVK *schema.GroupVersionKind

    // store:本地缓存,Reflector 将数据写入这里
    // 可以是 Indexer、DeltaFIFO、Store 等类型
    store ReflectorStore

    // listerWatcher:用于执行 List 和 Watch 的组件
    // 通常是 ListWatch 类型
    listerWatcher ListerWatcherWithContext

    // resyncPeriod:周期同步的间隔,0 表示不进行周期同步
    // 周期同步会调用 store.Resync(),让 Store 重新处理所有对象
    resyncPeriod time.Duration

    // delayHandler:退避延迟函数,用于重试时的延迟
    // 默认实现是指数退避:800ms -> 1.6s -> 3.2s -> ... -> 30s
    delayHandler wait.DelayFunc

    // minWatchTimeout / maxWatchTimeout:Watch 请求的超时范围
    minWatchTimeout time.Duration
    maxWatchTimeout time.Duration

    // clock:用于测试中控制时间
    clock clock.Clock

    // paginatedResult:标记第一次 List 是否返回了分页结果
    // 这个信息会影响后续 List 是否使用分页
    paginatedResult bool

    // lastSyncResourceVersion:上次同步时的资源版本
    // 这是保证 Watch 连续性的关键:下次 Watch 从这个版本开始
    lastSyncResourceVersion string

    // isLastSyncResourceVersionUnavailable:
    // 标记上次请求的 RV 是否过期(410 Gone 或 RV 太大错误)
    // 如果为 true,下次 List 会使用 RV="" 来获取最新数据
    isLastSyncResourceVersionUnavailable bool

    // lastSyncResourceVersionMutex:保护上述字段的读写锁
    lastSyncResourceVersionMutex sync.RWMutex

    // watchErrorHandler:Watch 出错时的回调函数
    watchErrorHandler WatchErrorHandlerWithContext

    // WatchListPageSize:List 请求的分页大小
    WatchListPageSize int64

    // ShouldResync:判断是否需要进行周期同步的函数
    // nil 表示总是返回 true
    ShouldResync func() bool

    // MaxInternalErrorRetryDuration:内部错误最大重试时长
    MaxInternalErrorRetryDuration time.Duration

    // useWatchList:是否使用 WatchList 特性(流式初始化)
    // WatchList 特性可以减少 API Server 的资源消耗
    useWatchList bool
}

Reflector 的字段可以分为几组:标识字段(name、typeDescription)、依赖注入(store、listerWatcher)、时间控制(resyncPeriod、clock)、错误处理(watchErrorHandler)和状态跟踪(lastSyncResourceVersion)。理解这些字段后,我们来看它们是如何初始化的。


三、NewReflectorWithOptions:构造函数的完整初始化流程

3.1 工厂函数家族

Reflector 提供了多个工厂函数,从最简到最复杂:

// staging/src/k8s.io/client-go/tools/cache/reflector.go(行 239-249)

// NewReflector:最简版本,只需要 ListerWatcher、类型、Store 和同步周期
func NewReflector(lw ListerWatcher, expectedType interface{}, store ReflectorStore, resyncPeriod time.Duration) *Reflector {
    return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{ResyncPeriod: resyncPeriod})
}

// NewNamedReflector:带命名的版本,用于调试时区分不同的 Reflector
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store ReflectorStore, resyncPeriod time.Duration) *Reflector {
    return NewReflectorWithOptions(lw, expectedType, store, ReflectorOptions{Name: name, ResyncPeriod: resyncPeriod})
}

// ReflectorOptions:配置选项结构体
type ReflectorOptions struct {
    Logger          *klog.Logger      // 自定义日志记录器
    Name            string           // Reflector 名称
    TypeDescription string           // 类型描述
    ResyncPeriod    time.Duration    // 同步周期
    MinWatchTimeout time.Duration    // Watch 最小超时
    Clock           clock.Clock      // 时间源(用于测试)
    Backoff         *wait.Backoff   // 自定义退避策略
}

3.2 NewReflectorWithOptions 完整实现

// staging/src/k8s.io/client-go/tools/cache/reflector.go(行 286-371)

func NewReflectorWithOptions(
    lw ListerWatcher,               // ListWatcher,用于执行 List 和 Watch
    expectedType interface{},       // 期望的类型(如 &v1.Pod{})
    store ReflectorStore,          // 本地缓存(如 DeltaFIFO、Indexer)
    options ReflectorOptions,      // 配置选项
) *Reflector {
    // 1. 初始化时间源
    reflectorClock := options.Clock
    if reflectorClock == nil {
        reflectorClock = clock.RealClock{}
    }

    // 2. 设置 Watch 超时范围
    minWatchTimeout := defaultMinWatchTimeout  // 5 分钟
    maxWatchTimeout := defaultMaxWatchTimeout  // 10 分钟
    if options.MinWatchTimeout > defaultMinWatchTimeout {
        minWatchTimeout = options.MinWatchTimeout
        maxWatchTimeout = 2 * minWatchTimeout
    }
    if maxWatchTimeout < minWatchTimeout {
        maxWatchTimeout = minWatchTimeout
    }

    // 3. 初始化退避策略
    // 默认退避:800ms * 2^n,到达 30s 后保持,最多 11 步
    backoff := options.Backoff
    if backoff == nil {
        backoff = &wait.Backoff{
            Duration:  defaultBackoffInit,
            Cap:       defaultBackoffMax,
            Steps:     int(math.Ceil(float64(defaultBackoffMax) / float64(defaultBackoffInit))),
            Factor:    defaultBackoffFactor,
            Jitter:    defaultBackoffJitter,
        }
    }

    // 4. 创建 Reflector 实例
    r := &Reflector{
        name:              options.Name,
        resyncPeriod:      options.ResyncPeriod,
        minWatchTimeout:   minWatchTimeout,
        maxWatchTimeout:   maxWatchTimeout,
        typeDescription:   options.TypeDescription,
        listerWatcher:     ToListerWatcherWithContext(lw),  // 转换为带上下文的版本
        store:             store,
        delayHandler:      backoff.DelayWithReset(reflectorClock, defaultBackoffReset),
        clock:             reflectorClock,
        watchErrorHandler: WatchErrorHandlerWithContext(DefaultWatchErrorHandler),
        expectedType:      reflect.TypeOf(expectedType),
    }

    // 5. 如果 name 为空,自动从调用栈获取
    // 跳过 internalPackages(如 client-go/tools/cache/)以获取有意义的文件名
    if r.name == "" {
        r.name = naming.GetNameFromCallsite(internalPackages...)
    }

    // 6. 初始化日志记录器
    logger := klog.Background()
    if options.Logger != nil {
        logger = *options.Logger
    }
    logger = klog.LoggerWithName(logger, r.name)
    r.logger = logger

    // 7. 初始化类型描述
    if r.typeDescription == "" {
        r.typeDescription = getTypeDescriptionFromObject(expectedType)
    }

    // 8. 获取期望的 GVK(用于 unstructured 对象)
    if r.expectedGVK == nil {
        r.expectedGVK = getExpectedGVKFromObject(expectedType)
    }

    // 9. 决定是否启用 WatchList 特性
    // WatchList 特性使用流式接口,减少 API Server 资源消耗
    r.useWatchList = clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient)
    if r.useWatchList && watchlist.DoesClientNotSupportWatchListSemantics(lw) {
        r.logger.V(2).Info(
            "Client doesn't support WatchList semantics, disabling feature",
            "feature", clientfeatures.WatchListClient,
        )
        r.useWatchList = false
    }

    return r
}

初始化过程中有几个关键点值得注意。第一,delayHandler 使用了 backoff.DelayWithReset,这意味着退避会在 2 分钟无错误后自动重置为初始值(800ms),这样 API Server 恢复健康后,客户端可以快速恢复而无需等待漫长的退避延迟。第二,WatchList 特性的启用需要同时满足两个条件:FeatureGate 开启,以及客户端支持 WatchList 语义。第三,name 的自动推断使用了 GetNameFromCallsite,会跳过 client-go/tools/cache/ 包本身,找到实际调用者的代码位置。


四、RunWithContext:Reflector 的启动入口

当 Informer 启动时,最终会调用 Reflector.RunWithContext()。这个方法是 Reflector 的启动入口,它使用 delayHandler.Until 来实现无限循环 + 退避重试的模式。

// staging/src/k8s.io/client-go/tools/cache/reflector.go(行 411-435)

// Run 反复执行 ListAndWatch,直到 stopCh 被关闭
// 注意:这是一个阻塞方法,应该在单独的 goroutine 中调用
func (r *Reflector) Run(stopCh

delayHandler.Until 的工作方式类似于一个带有退避延迟的无限循环。它接收一个函数作为参数,如果函数返回 true,就停止循环;如果返回 false,就继续循环。每次循环之间,它会等待 delayHandler 决定的延迟时间。当 context 被取消时,Until 会立即返回,循环结束。

这里有一个精妙的退避机制:delayHandler 是通过 backoff.DelayWithReset 创建的。DelayWithReset 的特点是:如果连续成功执行超过 resetDuration(默认 2 分钟),延迟就会重置回初始值(800ms)。这意味着在正常情况下,每次 ListAndWatch 成功后,下次立即重试。只有在连续失败超过 2 分钟的情况下,退避才会增长到最大值(30s)。


五、ListAndWatchWithContext:两阶段同步的核心逻辑

ListAndWatchWithContext 是 Reflector 的核心方法。它的设计哲学是"两阶段同步":第一阶段用 List 获取全量数据并初始化本地缓存,第二阶段用 Watch 持续同步增量变化。

// staging/src/k8s.io/client-go/tools/cache/reflector.go(行 458-509)

// ListAndWatchWithContext 先 List 全量数据,再用 Watch 增量同步
func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error {
    logger := klog.FromContext(ctx)
    logger.V(3).Info("Listing and watching",
        "type", r.typeDescription,
        "reflector", r.name,
    )

    var err error
    var w watch.Interface
    fallbackToList := !r.useWatchList  // 是否回退到传统 List 模式

    // defer 确保函数退出时关闭 Watch
    defer func() {
        if w != nil {
            w.Stop()
        }
    }()

    // ===== 阶段一:获取初始数据 =====

    if r.useWatchList {
        // 尝试使用 WatchList 特性(流式初始化)
        w, err = r.watchList(ctx)
        if w == nil && err == nil {
            // stopCh 被关闭,正常退出
            return nil
        }
        if err != nil {
            logger.V(4).Info(
                "WatchList failed, falling back to regular list. "+
                    "This is expected if watchlist is not supported or disabled.",
                "err", err,
            )
            fallbackToList = true
            w = nil
        }
    }

    if fallbackToList {
        // 传统模式:使用 List 获取全量数据
        err = r.list(ctx)
        if err != nil {
            return err
        }
    }

    logger.V(2).Info("Caches populated",
        "type", r.typeDescription,
        "reflector", r.name,
    )

    // ===== 阶段二:启动增量同步 =====

    return r.watchWithResync(ctx, w)
}

5.1 两种初始化模式的对比

ListAndWatchWithContext 支持两种初始化模式:

特性传统 List-WatchWatchList(流式)
初始化方式 GET /api/v1/pods(分页) GET /api/v1/pods?watch=true&sendInitialEvents=true
API Server 负载 较高(多次 LIST 请求) 较低(单次流式请求)
初始事件 无 synthetic Added 事件 有 synthetic Added + Bookmark 事件
兼容性 所有版本 需要 K8s 1.27+ 和 WatchList 特性门

5.2 list() 方法:执行 List 请求

list() 方法负责从 API Server 获取全量数据,并将数据写入 Store。这个过程涉及到分页处理、资源版本管理、以及错误恢复。

// staging/src/k8s.io/client-go/tools/cache/reflector.go(行 672-783)

// list 执行 List 请求,将结果写入 Store
func (r *Reflector) list(ctx context.Context) error {
    var resourceVersion string
    // relistResourceVersion() 决定使用哪个 RV:
    // - 首次 List:RV="0"(允许从 watch cache 返回)
    // - 后续 List:使用上次的 lastSyncResourceVersion
    // - RV 不可用:RV=""(强制从 etcd 读取最新数据)
    options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

    initTrace := trace.New("Reflector ListAndWatch", trace.Field{Key: "name", Value: r.name})
    defer initTrace.LogIfLong(10 * time.Second)

    // 使用 pager 进行分页 List
    pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
        return r.listerWatcher.ListWithContext(ctx, opts)
    }))

    // 根据情况设置分页大小
    switch {
    case r.WatchListPageSize != 0:
        pager.PageSize = r.WatchListPageSize
    case r.paginatedResult:
        // 上次 List 返回了分页结果,保持分页
    case options.ResourceVersion != "" && options.ResourceVersion != "0":
        // 非首次 List,强制关闭分页以使用 watch cache
        pager.PageSize = 0
    }

    list, paginatedResult, err := pager.ListWithAlloc(context.Background(), options)

    // 如果 RV 过期,立即重试一次(使用 RV="")
    if isExpiredError(err) || isTooLargeResourceVersionError(err) {
        r.setIsLastSyncResourceVersionUnavailable(true)
        list, paginatedResult, err = pager.ListWithAlloc(context.Background(),
            metav1.ListOptions{ResourceVersion: r.relistResourceVersion()})
    }

    if err != nil {
        return fmt.Errorf("failed to list %v: %w", r.typeDescription, err)
    }

    // 记录是否为分页结果
    if options.ResourceVersion == "0" && paginatedResult {
        r.paginatedResult = true
    }

    r.setIsLastSyncResourceVersionUnavailable(false)

    // 提取列表中的对象
    listMeta, err := meta.ListAccessor(list)
    resourceVersion = listMeta.GetResourceVersion()

    items, err := meta.ExtractListWithAlloc(list)
    if err != nil {
        return fmt.Errorf("unable to understand list result: %v", err)
    }

    // syncWith 用新列表完全替换 Store 内容
    if err := r.syncWith(items, resourceVersion); err != nil {
        return fmt.Errorf("unable to sync list result: %v", err)
    }

    r.setLastSyncResourceVersion(resourceVersion)
    return nil
}

// syncWith 用新列表替换 Store
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
    found := make([]interface{}, 0, len(items))
    for _, item := range items {
        found = append(found, item)
    }
    return r.store.Replace(found, resourceVersion)
}

list() 方法中有一个精妙的 RV 选择策略:relistResourceVersion() 根据 isLastSyncResourceVersionUnavailable 标志来决定使用哪个资源版本。如果上次 Watch 遇到 410 Gone 或 RV 太大错误,就使用 RV="" 强制从 etcd 读取最新数据;否则使用上次记录的 RV 以保持一致性。syncWith 方法使用 Store.Replace 来清空旧数据并写入新数据,这对于 DeltaFIFO 等队列类型的 Store 尤为重要。


六、watchWithResync:启动 Watch 和 Resync 循环

watchWithResync 启动了两个并行的 goroutine:一个负责 Watch 增量同步,一个负责周期性的 Resync。

// staging/src/k8s.io/client-go/tools/cache/reflector.go(行 511-554)

// startResync 定期调用 store.Resync()
func (r *Reflector) startResync(ctx context.Context, resyncerrc chan error) {
    logger := klog.FromContext(ctx)
    resyncCh, cleanup := r.resyncChan()
    defer func() {
        cleanup()
    }()

    for {
        select {
        case

startResync 的工作方式是:创建定时器,等待定时器触发,然后调用 store.Resync()。Resync 的语义因 Store 类型而异。对于 DeltaFIFO,Resync 会重新处理队列中的所有对象,确保没有遗漏。对于普通 Store,Resync 可能是空操作。这里使用 wait.Group 来协调两个 goroutine,当 watch() 返回时,cancel() 会停止 cancelCtx,从而让 startResync 也退出。


七、watch() 方法:Watch 循环的完整实现

watch() 方法是整个 Reflector 中最复杂的部分。它负责:建立 Watch 连接、消费事件、处理错误、以及在需要时重试。

// staging/src/k8s.io/client-go/tools/cache/reflector.go(行 556-670)

// watch 启动 Watch 请求,消费事件,并在出错时重试
func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error {
    stopCh := ctx.Done()
    logger := klog.FromContext(ctx)
    var err error

    // NewRetryWithDeadline:用于处理内部错误(500 等)
    retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute,
        apierrors.IsInternalError, r.clock)

    defer func() {
        if w != nil {
            w.Stop()
        }
    }()

    for {
        // 检查是否需要退出
        select {
        case

7.1 错误分类与处理策略

watch() 方法根据错误类型采用不同的处理策略:

错误类型错误码处理策略
连接拒绝 connection refused 退避后重试
限速 429 Too Many Requests 退避后重试
RV 过期 410 Gone / Expired 标记 RV 不可用,下次 List 用 RV=""
RV 太大 RVTooLarge 标记 RV 不可用,下次 List 用 RV=""
内部错误 500 Internal Error 短暂重试(有上限)
其他错误 其他 4xx/5xx 记录并退出循环

八、handleAnyWatch:事件处理循环

handleAnyWatch 是真正处理 Watch 事件的地方。它从 watch.Interface 的 ResultChan() 中读取事件,并根据事件类型调用 Store 的不同方法。

// staging/src/k8s.io/client-go/tools/cache/reflector.go(行 963-1095)

// handleAnyWatch 处理 Watch 事件的核心循环
func handleAnyWatch(
    ctx context.Context,
    start time.Time,
    w watch.Interface,
    store ReflectorStore,
    expectedType reflect.Type,
    expectedGVK *schema.GroupVersionKind,
    name string,
    expectedTypeName string,
    setLastSyncResourceVersion func(string, bool),
    exitOnWatchListBookmarkReceived bool,
    clock clock.Clock,
    errCh chan error,
) (bool, error) {
    watchListBookmarkReceived := false
    eventReceivedBesidesAdded := false
    eventCount := 0
    logger := klog.FromContext(ctx)

    // 初始化 Bookmark 超时警告器
    initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(
        logger, name, clock, start, exitOnWatchListBookmarkReceived)
    defer initialEventsEndBookmarkWarningTicker.Stop()

    stopWatcher := true
    defer func() {
        if stopWatcher {
            w.Stop()
        }
    }()

loop:
    for {
        select {
        case <-ctx.Done():
            return watchListBookmarkReceived, errorStopRequested

        case err :=

8.1 四种 Watch 事件类型的处理

Watch 事件有四种类型,每种都有不同的语义:

  • Added:对象被创建。Reflector 调用 store.Add() 将其加入缓存。
  • Modified:对象被更新。Reflector 调用 store.Update() 更新缓存中的对象。
  • Deleted:对象被删除。Reflector 调用 store.Delete() 从缓存中移除对象。
  • Bookmark:特殊的同步点事件,用于续租 Watch 连接。在 WatchList 模式中,收到带有 initial-events-end 注解的 Bookmark 表示初始数据同步完成。

eventReceivedBesidesAdded 标志的设计非常精妙。它用于判断是否已经处理过非 Added 事件。如果是从 RV=0 开始 Watch,可能会收到大量 synthetic Added 事件(模拟 List 结果),此时不应立即更新 lastSyncResourceVersion。只有在收到 Modified、Deleted 或 Bookmark 之后,才认为真正进入了增量同步阶段,此时才更新 RV。


九、完整调用流程图

让我们用流程图来总结整个 Reflector 的工作流程:

Reflector 完整调用流程

用户启动 Informer
        │
        ▼
SharedInformer.Run(stopCh)
        │
        ▼
Controller.RunWithContext(ctx)
        │
        ├─► 创建 Controller(包含 DeltaFIFO + Reflector)
        │
        ▼
    go Reflector.RunWithContext(ctx)
        │    │
        │    │  delayHandler.Until() 循环
        │    ▼
        │    ListAndWatchWithContext(ctx)
        │    │
        │    ├─► [阶段一] list(ctx)
        │    │         │
        │    │         ├─► relistResourceVersion() 选择 RV
        │    │         │
        │    │         ├─► pager.ListWithAlloc() 分页获取
        │    │         │
        │    │         ├─► syncWith() 用全量数据替换 Store
        │    │         │
        │    │         └─► setLastSyncResourceVersion()
        │    │
        │    └─► [阶段二] watchWithResync(ctx, w)
        │              │
        │              ├─► go startResync()  [周期同步 goroutine]
        │              │         │
        │              │         └─► timer.C() → store.Resync()
        │              │
        │              ▼
        │           watch(ctx, w, resyncerrc)
        │              │
        │              ├─► NewWatch() 建立 Watch 连接
        │              │
        │              ▼
        │           handleAnyWatch()  [事件处理循环]
        │              │
        │              ├─► for event := range w.ResultChan()
        │              │         │
        │              │         ├─► event.Type == Added: store.Add()
        │              │         ├─► event.Type == Modified: store.Update()
        │              │         ├─► event.Type == Deleted: store.Delete()
        │              │         └─► event.Type == Bookmark: 更新 RV
        │              │
        │              └─► channel 关闭 → 退出循环
        │
        │    [Watch 断开]
        │         │
        │         ▼
        │    delayHandler() 退避延迟
        │         │
        │         ▼
        │    [继续下一次循环]
        │
        ▼
    go processLoop()  [事件消费 goroutine]
        │
        └─► for { queue.Pop() → handler() }

十、常见问题解答(FAQ)

▼ Q: Reflector 和 Informer 是什么关系?

A: Informer(SharedInformer)是一个更高层次的抽象,它封装了 Reflector、Store(DeltaFIFO + Indexer)、Controller 等多个组件。Reflector 负责从 API Server 同步数据到本地 Store,Informer 在 Reflector 的基础上添加了事件处理器注册、共享缓存等功能。在实际开发中,我们通常使用 SharedInformerFactory 创建 SharedInformer,而不是直接操作 Reflector。


▼ Q: lastSyncResourceVersion 为什么这么重要?

A: ResourceVersion 是 Kubernetes 实现乐观并发的核心机制。每次 List 和 Watch 都需要指定一个 RV,确保不会遗漏事件。Reflector 用 lastSyncResourceVersion 来记住上次同步的位置,下次 Watch 从这个位置继续。如果这个版本过期(410 Gone),Reflector 会回退到 List 获取最新数据。这个机制保证了"at least once"的语义——不会遗漏事件,但可能在某些边界情况下重复处理。


▼ Q: 为什么 Watch 超时时间是随机的 [5min, 10min]?

A: 这是为了避免"惊群效应"(Thundering Herd)。如果所有 Informer 的 Watch 都在同一时间点过期,它们会同时向 API Server 发起新的 Watch 请求,瞬间增加巨大的负载。随机化的超时让过期时间分散开来,起到负载均衡的作用。API Server 端也会利用这个随机化来优化 Watch 连接的管理。


▼ Q: WatchList 特性和传统 List-Watch 有什么区别?

A: 传统 List-Watch 需要两次请求:一次 List 获取全量数据,一次 Watch 监听增量变化。WatchList 使用流式接口,单次请求同时返回初始数据(synthetic Added 事件)和后续的增量变化。这减少了 API Server 的 etcd 查询次数,降低了内存消耗。WatchList 收到带有 initial-events-end 注解的 Bookmark 事件后,认为初始化完成,继续使用同一个 Watch 连接接收增量变化。


▼ Q: Reflector 在什么情况下会重新 List?

A: 三种情况会触发重新 List:1)Watch 连接断开且无法重试(如非暂时性错误);2)ResourceVersion 过期(410 Gone),此时会标记 isLastSyncResourceVersionUnavailable=true,下次 List 使用 RV="" 强制从 etcd 读取最新数据;3)Resync 周期到达,调用 store.Resync() 重新处理所有缓存对象。


十一、总结

这篇文章我们从源码层面详细剖析了 client-go Reflector 的完整实现。核心要点如下:

  • 结构体设计:Reflector 只持有依赖注入(store、listerWatcher)和状态(lastSyncResourceVersion),不存储数据。
  • 启动入口:RunWithContext 使用 delayHandler.Until 实现无限循环 + 自动退避重试,正常情况下立即重试,失败后逐步增加延迟。
  • 两阶段同步:ListAndWatch 先用 List 获取全量数据初始化缓存,再用 Watch 增量同步。支持传统 List-Watch 和新的 WatchList 流式模式。
  • 事件处理:handleAnyWatch 从 Channel 读取事件,根据 Added/Modified/Deleted/Bookmark 类型调用 Store 的不同方法。
  • 错误恢复:RV 过期时回退到 List(RV=""),连接拒绝/限速时退避重试,内部错误时短暂重试。
  • 并发安全:lastSyncResourceVersion 有专门的读写锁保护,避免并发访问问题。

理解 Reflector 的工作原理,不仅能帮助我们写出更高效的控制器代码,更能在遇到连接异常、数据不一致、版本冲突等问题时,有针对性地定位和解决。下一篇文章我们将进入 SharedInformer 层面,看看它如何封装 Reflector 并提供更易用的 API。


Kubernetes client-go Reflector 源码深度解析 · Kubernetes v1.36.1 · 源码基于 staging/src/k8s.io/client-go/tools/cache/reflector.go

相关阅读:
   • staging/src/k8s.io/client-go/tools/cache/reflector.go — Reflector 完整源码
   • staging/src/k8s.io/client-go/tools/cache/controller.go — Controller 与 SharedInformer
   • KEP-3157: WatchList 特性设计文档 — WatchList 流式初始化机制
   • client-go examples/workqueue — Informer 使用完整示例