惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

V
Vulnerabilities – Threatpost
Simon Willison's Weblog
Simon Willison's Weblog
Security Latest
Security Latest
C
Cyber Attacks, Cyber Crime and Cyber Security
云风的 BLOG
云风的 BLOG
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
Engineering at Meta
Engineering at Meta
Jina AI
Jina AI
罗磊的独立博客
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
D
Docker
Cisco Talos Blog
Cisco Talos Blog
T
The Blog of Author Tim Ferriss
L
LangChain Blog
NISL@THU
NISL@THU
V
Visual Studio Blog
C
Cybersecurity and Infrastructure Security Agency CISA
Cyber Security Advisories - MS-ISAC
Cyber Security Advisories - MS-ISAC
MongoDB | Blog
MongoDB | Blog
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
Apple Machine Learning Research
Apple Machine Learning Research
N
Netflix TechBlog - Medium
I
Intezer
Scott Helme
Scott Helme
H
Help Net Security
C
Check Point Blog
T
Threat Research - Cisco Blogs
P
Proofpoint News Feed
T
Tailwind CSS Blog
酷 壳 – CoolShell
酷 壳 – CoolShell
宝玉的分享
宝玉的分享
Know Your Adversary
Know Your Adversary
P
Palo Alto Networks Blog
AWS News Blog
AWS News Blog
U
Unit 42
P
Privacy International News Feed
S
Schneier on Security
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
T
Tor Project blog
Stack Overflow Blog
Stack Overflow Blog
G
GRAHAM CLULEY
Recorded Future
Recorded Future
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
A
About on SuperTechFans
The Cloudflare Blog
A
Arctic Wolf
P
Proofpoint News Feed
Last Week in AI
Last Week in AI
Project Zero
Project Zero
T
Threatpost

博客园 - 左扬

Kubernetes 编程 / Operator 专题【左扬精讲】—— 从零搭建一个 application-operator 新项目:脚手架、API 设计与基于原生 DeploymentStatus/ServiceStatus 的状态建模 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:生产级 Controller 实践:并发安全、资源清理与高可用设计 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析: Controller 调试与诊断工具:从日志分析到问题定位 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:DynamicClient 操作 CRD:无需代码生成的动态操作 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:控制器与 APIServer 完整交互流程:从 Watch 到缓存同步 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:错误处理与重试机制:WorkQueue 限速器详解 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Leader 选举机制:高可用控制器的必备技能 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Controller 开发模式完整实战 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:SharedInformerFactory 与等待缓存同步 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:从认证配置到 Deployment 操作 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:版本对应、架构组件与组件关系 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Informer 源码深度解析:从底层原理到实战应用 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Reflector 源码深度解析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:ListWatcher 源码深度解析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Indexer 与 ThreadSafeStore 核心原理与源码深度剖析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:DeltaFIFO 核心原理与源码深度剖析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:workqueue 核心原理与实战 Kubernetes 编程 / Operator 专题【左扬精讲】—— runtime.Codec 资源编解码:serializer 与 codec 差异、编解码数据结构、codec 核心调用链路 Kubernetes 编程 / Operator 专题【左扬精讲】—— Scheme 资源注册机制全解 Kubernetes 编程 / Operator 专题【左扬精讲】—— Kubernetes 自定义资源的内部版本与外部版本:从源码看版本定义机制 Kubernetes 编程 / Operator 专题【左扬精讲】—— Kubernetes 1.36.1 核心 API 数据结构全解 Kubernetes 编程 / Operator 专题【左扬精讲】—— Kubernetes 构建过程 【AIOPS】一文读懂LLM【左扬精讲】:从诞生到普及,解锁大语言模型的核心密码 【AIOPS】AI Agent 专题【左扬精讲】核心功能篇:MCP-VictoriaMetrics Hooks 源码精讲:Hooks 可观测性的无侵入式实现 【AIOPS】AI Agent 专题【左扬精讲】核心功能篇:MCP-VictoriaMetrics Golang 配置解析源码精讲 ——SRE 自定义 Agent 核心技巧 【AIOPS】AI Agent 专题【左扬精讲】核心功能篇:MCP-VictoriaMetrics Golang 并发模型解析 ——SRE 应对高并发采集的调优思路 【AIOPS】AI Agent 专题【左扬精讲】基础架构篇:MCP-VictoriaMetrics Golang 源码整体架构拆解 ——SRE 必懂的核心模块与数据流 OpenTelemetry 开发实战【左扬精讲】—— 云原生可观测体系构建与分布式追踪二次开发 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 7 —— 基于流量预测模型的智能弹性扩缩容 Operator 实战(AIOps 模型训练与智能扩容(下篇)—— 预测式弹性扩缩容 Operator 落地实现) Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 7 —— 基于流量预测模型的智能弹性扩缩容 Operator 实战(AIOps 模型训练与智能扩容(上篇)—— 时序预测模型构建与离线训练) Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 6 —— 基于运维专家知识库的智能故障诊断与排查 Operator 实战 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 5 —— 基于大语言模型(LLM)的实时日志流智能监测 Operator 实现 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 4 —— 基于 Operator 实现大模型私有化部署与管理 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 3 —— 面向 AI / 算力调度场景:GPU 竞价实例资源池统一调度管理 Operator 开发 Kubernetes编程 / Operator专题【左扬精讲】—— Operator 开发实战项目 2 —— 面向零售 / 电商潮汐流量难题:多云多集群数据中心级全链路弹性伸缩 DataCenter Scaler Operator 从 0 到 1 全链路开发 Kubernetes编程 / Operator专题【左扬精讲】—— 深入理解Kubebuilder注解:为什么Operator开发离不开这些特殊注释 Kubernetes编程 / Operator专题【左扬精讲】—— Operator 开发实战项目1 —— Applicaion Operator(通用应用生命周期管理 Operator 实战) Pod 镜像拉取失败?kubectl edit pods修改镜像地址的底层原理与实操 (该方法仅为临时应急方案,并非长期解决方案) Kubernetes编程/Operator专题精讲—— 理解控制器模式 —— 控制器模式的核心原理与实现逻辑(从原理到实践) 【AIOPS】AI Agent 专题【左扬精讲】模型微调实战:一站式平台 LLaMA-Factory 【AIOPS】AI Agent 专题【左扬精讲】基于 k8s+vLLM+Ray 分布式部署全指南:架构设计、资源调度与性能优化 【AIOPS】AI Agent专题【左扬精讲】非量化版DeepSeek分布式部署全指南:精度保障、显存规划与Ollama/vLLM选型 【AIOPS】AI Agent 专题【左扬精讲】零开发框架实现 ReAct Agent(Go SRE友好)
Kubernetes 编程 / Operator 专题【左扬精讲】—— application-operator Reconcile 循环源码精讲:从 client-go Informer 到 workqueue 的全链路解剖
左扬 · 2026-06-14 · via 博客园 - 左扬

Kubernetes 编程 / Operator 专题【左扬精讲】—— application-operator Reconcile 循环源码精讲:从 client-go Informer 到 workqueue 的全链路解剖

上一篇我们搭好了 application-operator 的项目骨架、定义了 Application CRD、用 DeploymentStatus / ServiceStatus 复用了 Kubernetes 原生状态模型,并生成了全套 clientset / informers / listers / applyconfiguration 代码。

operator 的"大脑"还没写——提交 Application YAML 之后,operator 不会自动创建 Deployment,也不会回填 status。这一篇我们把 controller.go 的每一行都拆开看,从 client-go 最底层的 controller.Run 一路追到业务函数 syncHandler。

读完这篇你将能:独立画出一张从"用户 kubectl apply"到"Deployment 创建成功"的全链路时序图,标出每一个 goroutine 何时启动、何时阻塞、何时返回。更重要的是,你能直接读懂 client-go 源码——以后遇到 controller 不工作、status 不更新、deployment 重复创建这些玄学问题,知道该去哪个文件、哪一行加日志。

Kubernetes 1.36.1 Go 1.26 client-go cache workqueue Reflector SharedInformer

🔓 学习重点提示  — 全文可一次性读,再回头看重点

★ 重点掌握(必须)
   • Controller struct 字段:6 个字段每个的用途与生命周期
   • NewController 装配流程:informer 事件 → workqueue 排队的完整链路
   • Run 启动三件套:Reflector / processLoop / Worker 三层 goroutine
   • processNextWorkItem 的五段式:Get → Sync → Done/Forget/AddRateLimited
   • syncHandler 的 read-then-write 幂等模式:Get / Create / Update 流程
   • workqueue 三种限流器:指数退避 / 令牌桶 / 组合限流
   • OwnerReference 与 handleObject 双向桥

☆ 次重点(了解即可)
   • sharedProcessor.distribute 把事件 fan-out 到所有 listener
   • processorListener.addCh / nextCh / pendingNotifications 三缓冲机制
   • ResourceEventHandlerFuncs 适配器与 OnAdd/OnUpdate/OnDelete 钩子


目录

  1. 一、整体心智模型:Informer 体系三层 goroutine 协作
  2. 二、Controller struct:6 个字段的职责与生命周期
  3. 三、NewController:装配 informer 事件处理器
  4. 四、Informer 事件机制:ResourceEventHandlerFuncs 钩子与 sharedProcessor 扇出
  5. 五、controller.Run:三件套启动 + WaitForCacheSync 屏障
  6. 六、processNextWorkItem:Reconcile 循环的"心脏"
  7. 七、syncHandler:read-then-write 幂等模式与 OwnerReference 桥接
  8. 八、workqueue 限流策略:指数退避 / 令牌桶 / 组合限流
  9. 九、优雅停机:ctx.Done → ShutDown → wg.Wait
  10. 十、application-operator 完整 Reconcile 实现
  11. 十一、常见 12 问 FAQ

一、整体心智模型:Informer 体系三层 goroutine 协作

Kubernetes client-go 的 Informer 体系被设计为"三层 goroutine + 两条数据通道"。所有 controller 模式(无论是裸 client-go 写的 sample-controller,还是 controller-runtime 简化版)都遵循这套骨架。看图:

API Server                                                             
         |                                                                   
         |  (1) List + Watch                                                 
         v                                                                   
+-------------------+             +---------------------+                   
|   Reflector       |   (2) Δ     |  sharedProcessor    |                   
|   (ListAndWatch)  | --------->  |  + listeners[]      |                   
+-------------------+   DeltaFIFO |  - pop()            |                   
         |                       |  - distribute()     |                   
         |  HasSynced (after      +---------+-----------+                   
         |   initial list)                  |                               
         v                                  | (3) add notification          
+-------------------+                       v                               
|   processLoop     |             +---------+-----------+                   
| (c.config.Pop)    |             |  processorListener  |                   
+--------+----------+             |  addCh -> pop()     |                   
         |                        |  -> nextCh -> run() |                   
         |  (4) key (NS/Name)     +---------+-----------+                   
         |                                  |                               
         v                                  | (5) handler.OnAdd(obj)        
+--------+----------+                       v                               
|   workqueue       |             +---------+-----------+                   
|   (rate limited)  |

逐层职责:

  • Reflector 层:与 API Server 维持 List+Watch 长连接。List 拿全量数据、Watch 拿增量事件(Add / Update / Delete)。所有事件都打包成 Delta 推入 DeltaFIFO。Reflector 是单 goroutine,但内部用 ListAndWatch 状态机处理"watch 断了重新 list"等异常
  • processLoop 层:从 DeltaFIFO Pop 出 Delta,调用 Config.ProcessFunc(默认是 sharedIndexInformer 的 handleDeltas)
  • sharedProcessor 层:把 Delta 转换成"对每个已注册 handler 的 add/update/delete 通知",分发给所有 processorListener
  • processorListener 层:每个 handler 一个 listener,内部有三缓冲(addCh / nextCh / pendingNotifications)防止 handler 慢导致 informer 阻塞
  • handler 层:你的业务代码——sample-controller 的 enqueueFoo + handleObject
  • workqueue 层:去重 + 限流 + 顺序保证(FIFO),handler 拿到 key 入队后立即返回
  • Worker 层:从 workqueue Get,调用业务函数 syncHandler;syncHandler 是真正的"收敛差异"逻辑

💡 小贴士
为什么不让 handler 直接做 reconcile?两个原因:① handler 跟 reflector 在不同 goroutine,让 handler 慢会卡住 reflector 的 watch 流(最终导致 watch 断开);② 入队 + worker 模式天然支持"同一个 key 多次变化合并成一次处理"——workqueue 的 dirty 集合会 dedup。


二、Controller struct:6 个字段的职责与生命周期

看 sample-controller 的 Controller struct 定义(行 68-89)。我们逐字段过:

// staging/src/k8s.io/sample-controller/controller.go(行 68-89)

// Controller is the controller implementation for Foo resources
type Controller struct {
    // kubeclientset is a standard kubernetes clientset
    kubeclientset kubernetes.Interface
    // sampleclientset is a clientset for our own API group
    sampleclientset clientset.Interface

    deploymentsLister appslisters.DeploymentLister
    deploymentsSynced cache.InformerSynced
    foosLister        listers.FooLister
    foosSynced        cache.InformerSynced

    // workqueue is a rate limited work queue. This is used to queue work to be
    // processed instead of performing it as soon as a change happens. This
    // means we can ensure we only process a fixed amount of resources at a
    // time, and makes it easy to ensure we are never processing the same item
    // simultaneously in two different workers.
    workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
    // recorder is an event recorder for recording Event resources to the
    // Kubernetes API.
    recorder record.EventRecorder
}

逐个解释:

字段类型职责
kubeclientset kubernetes.Interface 原生资源 clientset(Deployment、Service)。用于"写入"——syncHandler 调 Create/Update Deployment 时走它
sampleclientset clientset.Interface 我们自己的 CRD clientset。updateFooStatus 调 UpdateStatus 时走它
deploymentsLister appslisters.DeploymentLister Deployment 本地缓存的读接口。syncHandler 用它 Get Deployment——不经过 API Server,超快
deploymentsSynced cache.InformerSynced 一个函数,返回 Deployment informer 是否完成初始 List。Run 阶段用 WaitForCacheSync 等待
foosLister listers.FooLister Foo 本地缓存的读接口。syncHandler 第一行 Get 用户的 Foo 走它
foosSynced cache.InformerSynced Foo informer 的 HasSynced 状态指示
workqueue TypedRateLimitingInterface 去重 + 限流 + FIFO 队列。handler enqueue 后立即返回;worker 从它 Get 出来处理
recorder record.EventRecorder Kubernetes Event 记录器。recorder.Event(foo, Type, Reason, Msg) 会在 Event 流里产生一条记录,kubectl describe foo 能看到

新手最容易疑惑的是 xxxLister 和 xxxClientset 的区别:Lister 只读、读本地缓存、不发网络请求;Clientset 可读可写、每次都打网络。Lister 用于"查询"路径(syncHandler 第一步"读期望"),Clientset 用于"修改"路径(创建 Deployment、回填 status)。如果 syncHandler 用 clientset.Get() 来读 Foo,会发现 informer 缓存和 API Server 状态可能错位——这是经典 bug。

⚠️ 警告
Lister 返回的对象是只读缓存——千万别直接 c.foosLister.Foos(ns).Get(name).Spec.Replicas = &newVal 这种写法。那样会改坏本地缓存,下一个 List 拿到错误数据,reconcile 进入死循环。正确做法是 Get → DeepCopy → 修改副本 → 写回。


三、NewController:装配 informer 事件处理器

NewController 是 Controller 的"构造函数"——它做三件事:① 装配 eventBroadcaster + recorder;② 构造限流器 + workqueue;③ 给两个 informer 注册事件处理器。

// staging/src/k8s.io/sample-controller/controller.go(行 91-156)

func NewController(
    ctx context.Context,
    kubeclientset kubernetes.Interface,
    sampleclientset clientset.Interface,
    deploymentInformer appsinformers.DeploymentInformer,
    fooInformer informers.FooInformer) *Controller {
    logger := klog.FromContext(ctx)

    // Create event broadcaster
    // Add sample-controller types to the default Kubernetes Scheme so Events can be
    // logged for sample-controller types.
    utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
    logger.V(4).Info("Creating event broadcaster")

    eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
    ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
        workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second),
        &workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
    )

    controller := &Controller{
        kubeclientset:     kubeclientset,
        sampleclientset:   sampleclientset,
        deploymentsLister: deploymentInformer.Lister(),
        deploymentsSynced: deploymentInformer.Informer().HasSynced,
        foosLister:        fooInformer.Lister(),
        foosSynced:        fooInformer.Informer().HasSynced,
        workqueue:         workqueue.NewTypedRateLimitingQueue(ratelimiter),
        recorder:          recorder,
    }

    logger.Info("Setting up event handlers")
    // Set up an event handler for when Foo resources change
    fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueFoo,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueFoo(new)
        },
    })
    // Set up an event handler for when Deployment resources change.
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.handleObject,
        UpdateFunc: func(old, new interface{}) {
            newDepl := new.(*appsv1.Deployment)
            oldDepl := old.(*appsv1.Deployment)
            if newDepl.ResourceVersion == oldDepl.ResourceVersion {
                return
            }
            controller.handleObject(new)
        },
        DeleteFunc: controller.handleObject,
    })

    return controller
}

把这段代码切成"配置 / 构造 / 注册"三块:

3.1 Event Broadcaster(行 100-109)

EventBroadcaster 是一个独立组件——它内部起 goroutine 把 recorder 写入的事件批量发到 API Server。三个调用:

  • samplescheme.AddToScheme(scheme.Scheme):把 Foo 类型注册到全局 Scheme,让 Event 知道怎么序列化
  • StartStructuredLogging(0):除了发 API Server,本地结构化日志也打一份
  • StartRecordingToSink(...):发到 API Server 的 /events 接口

3.2 RateLimiter 与 WorkQueue(行 110-113)

MaxOfRateLimiter = 多个 limiter 取最大延迟。这里组合了两个:

  • ItemExponentialFailureRateLimiter(5ms, 1000s):每个 key 失败一次延迟翻一倍,从 5ms 起、上限 1000s。第 1 次 5ms、第 2 次 10ms、第 3 次 20ms ……
  • BucketRateLimiter(50 QPS, 300 burst):全局令牌桶,整个 controller 每秒最多处理 50 个 key、突发可达 300

两者的较大值生效——既不会全局突发把 API Server 打爆,单个 key 也不会无限快速重试。这是个 production-grade 默认值。

3.3 EventHandler 注册(行 126-153)

两个 informer 都注册了事件处理器,但用途不同:

Informer事件入队 key原因
Foo Add / Update Foo 自身 NS/Name 用户改了 spec / 新建了 Foo,要触发 reconcile
Deployment Add / Update / Delete Deployment 的 ownerRef 指向的 Foo NS/Name Deployment 状态变了(Replicas 增减、Conditions 变化),要触发对应 Foo 的 status 回填

后者的"通过 ownerRef 反向找 Foo"机制是 controller 设计的精髓——它让我们不用为每个相关资源都写 handler。一个 Pod 创建只需要 Deployment handler + 资源本身的 ownerRef 桥,operator 就能把 Pod 状态变化回填到 Foo.status。

🌟 实用技巧
Deployment handler 里的 ResourceVersion 判断(行 145-149)是处理"informer 周期性 resync 触发伪 Update"的兜底逻辑——同一个对象真正发生变化时 new.ResourceVersion ≠ old.ResourceVersion;resync 触发的伪 Update 两者相等。这种检查必须放在每个 informer 的 UpdateFunc 头部,否则 controller 会被无意义的 update 淹没。


四、Informer 事件机制:ResourceEventHandlerFuncs 钩子与 sharedProcessor 扇出

client-go 的事件机制是"fan-out"——一个 informer 可以有任意多个 handler,事件被广播给所有 handler。每个 handler 由一个 processorListener 后台 goroutine 独立消费。

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 284-340)

// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
    SharedInformer
    AddIndexers(indexers Indexers) error
    GetIndexer() Indexer
}

func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedInformer {
    realClock := &clock.RealClock{}
    processor := &sharedProcessor{clock: realClock}
    processor.listenersRCond = sync.NewCond(processor.listenersLock.RLocker())

    return &sharedIndexInformer{
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers, ...),
        processor:                       processor,
        synced:                          make(chan struct{}),
        listerWatcher:                   lw,
        ...
    }
}

sharedIndexInformer 的核心结构是 indexer(本地缓存)+ processor(事件分发器)。processor 内部用 listeners[] 数组保存所有已注册的 processorListener,每个 listener 跑在独立 goroutine。

4.1 ResourceEventHandlerFuncs 适配器

// staging/src/k8s.io/client-go/tools/cache/controller.go(行 279-300 附近)

// ResourceEventHandler can handle notifications for events that
// happen to a resource.
type ResourceEventHandler interface {
    OnAdd(obj interface{}, isInInitialList bool)
    OnUpdate(oldObj, newObj interface{})
    OnDelete(obj interface{})
}

// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}

注意两个细节:① 真实接口是 OnAdd / OnUpdate / OnDelete 三个方法,sample-controller 用的是 ResourceEventHandlerFuncs 适配器,只需要写三个 func 字段;② OnAdd 的第二个参数 isInInitialList 表示"这次 Add 是来自初始 List 还是后续 Watch 事件"——这个标志用于区分"新对象"和"已经存在的对象重新通知"。

4.2 processorListener 三缓冲机制

processorListener 是 client-go 设计中"应对慢 handler"的关键。它内部用 3 个数据结构:

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 1289-1328)

func (p *processorListener) add(notification interface{}) {
    if a, ok := notification.(addNotification); ok && a.isInInitialList {
        p.syncTracker.Start()
    }
    p.addCh

工作机制:

  • addCh:add(notification) 调用时,notification 先尝试 push 到 addCh;如果满了,pop 协程会把它转入 pendingNotifications 队列
  • nextCh:pop 协程把 notification 写到 nextCh;run 协程从 nextCh 读取后调 handler.OnAdd
  • pendingNotifications:addCh 满了之后的溢出缓冲

三缓冲让"producer(distribute)"和"consumer(handler)"解耦——handler 即便被卡住几秒,informer 主循环也不会被阻塞。

💡 小贴士
如果你的 controller 经常"事件堆积"(kubectl get pod 看到 queue 高水位),多半是 handler 太慢(同步打了 klog 阻塞了 goroutine)或者 workqueue.AddRateLimited 没正确调用。检查 enqueueFoo 是否只做 cache.ObjectToName + workqueue.Add,然后立即 return——任何重操作都应该挪到 syncHandler。

4.3 enqueueFoo 与 handleObject:handler 的两种入队策略

// staging/src/k8s.io/sample-controller/controller.go(行 331-383)

// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue.
func (c *Controller) enqueueFoo(obj interface{}) {
    if objectRef, err := cache.ObjectToName(obj); err != nil {
        utilruntime.HandleError(err)
        return
    } else {
        c.workqueue.Add(objectRef)
    }
}

// handleObject will take any resource implementing metav1.Object and attempt
// to find the Foo resource that 'owns' it.
func (c *Controller) handleObject(obj interface{}) {
    var object metav1.Object
    var ok bool
    if object, ok = obj.(metav1.Object); !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleErrorWithContext(...)
            return
        }
        object, ok = tombstone.Obj.(metav1.Object)
        if !ok {
            utilruntime.HandleErrorWithContext(...)
            return
        }
    }
    if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
        // If this object is not owned by a Foo, we should not do anything
        if ownerRef.Kind != "Foo" {
            return
        }
        foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
        if err != nil {
            return
        }
        c.enqueueFoo(foo)
        return
    }
}

两个 handler 的设计哲学截然不同:

  • enqueueFoo:直白型——Foo 来了就把 Foo 的 key 入队。Foo 自己更新、Reconcile 失败、最终用户编辑 spec,全部触发它
  • handleObject:转发型——Deployment / Pod / ReplicaSet 这些"附属资源"变了,要先查 ownerRef 找到对应的 Foo,再把 Foo 入队。Kubernetes 的"链式 reconcile"就是靠这个机制

注意 handleObject 里那个 DeletedFinalStateUnknown 分支——这是 informer 在"watch 断开重连后丢失删除事件"时使用的兜底类型。Kubernetes 用 tombstone 标记"这个对象曾经被删过、但我拿不到最终状态"。生产中这种情况比较罕见,但代码必须有兜底。


五、controller.Run:三件套启动 + WaitForCacheSync 屏障

看 controller.Run 启动过程(行 162-188):

// staging/src/k8s.io/sample-controller/controller.go(行 162-188)

func (c *Controller) Run(ctx context.Context, workers int) error {
    defer utilruntime.HandleCrash()
    defer c.workqueue.ShutDown()
    logger := klog.FromContext(ctx)

    logger.Info("Starting Foo controller")

    // Wait for the caches to be synced before starting workers
    logger.Info("Waiting for informer caches to sync")

    if ok := cache.WaitForCacheSync(ctx.Done(), c.deploymentsSynced, c.foosSynced); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    logger.Info("Starting workers", "count", workers)
    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, c.runWorker, time.Second)
    }

    logger.Info("Started workers")
    <-ctx.Done()
    logger.Info("Shutting down workers")

    return nil
}

逐行拆解:

  • defer utilruntime.HandleCrash():捕获后续 goroutine 的 panic,避免整个进程崩溃
  • defer c.workqueue.ShutDown():Run 退出时关 workqueue——新的 Add 静默丢弃,已有 item 处理完
  • WaitForCacheSync:阻塞等所有 informer 完成初始 List(c.foosSyncedc.deploymentsSynced 都是函数引用,对应 informer.HasSynced)
  • workers goroutine:用 wait.UntilWithContext(ctx, c.runWorker, time.Second) 启动 N 个 worker;time.Second 是异常恢复间隔(runWorker panic 后 1s 重启)
  • <-ctx.Done():阻塞主 goroutine,等 ctx 取消(信号触发 signals.SetupSignalHandler)

WaitForCacheSync 是关键屏障。源码:

// staging/src/k8s.io/client-go/tools/cache/shared_informer.go(行 413-432)

// WaitForCacheSync waits for caches to populate.  It returns true if it was successful, false
// if the controller should shutdown
func WaitForCacheSync(stopCh

实现简单但重要——每 100ms 轮询一次所有 cacheSyncs 函数,全部返回 true 才通过;任何一个 informer 没完成初始 List,worker 就不能启动。为什么必须等?因为 worker 第一行往往是 c.foosLister.Foos(ns).Get(name)——如果 List 没完成,lister 拿不到数据,要么 NotFound 错,要么拿到空。

⚠️ 警告
如果忘记 WaitForCacheSync,controller 启动会立即进入 worker,lister 还没数据→Get 返回 NotFound→processNextWorkItem 把 key 重新入队(requeue 限流退避)→1s 后重试。表面看就是"controller 启动 1 秒后才正常工作",但 CRD 资源量大时延迟会更明显。生产级 controller 一定要加 WaitForCacheSync。

5.1 Reflector + processLoop 实际在 cache.controller.Run 启动

在 sharedInformer.Run 中,下层 controller.RunWithContext 启动 Reflector 和 processLoop(行 715-790):

// staging/src/k8s.io/client-go/tools/cache/controller.go(行 169-209)

func (c *controller) RunWithContext(ctx context.Context) {
    defer utilruntime.HandleCrashWithContext(ctx)
    go func() {
        <-ctx.Done()
        c.config.Queue.Close()
    }()
    logger := klog.FromContext(ctx)
    r := NewReflectorWithOptions(
        c.config.ListerWatcher,
        c.config.ObjectType,
        c.config.Queue,
        ReflectorOptions{
            Logger:          &logger,
            ResyncPeriod:    c.config.FullResyncPeriod,
            MinWatchTimeout: c.config.MinWatchTimeout,
            ...
        },
    )
    ...
    var wg wait.Group
    wg.StartWithContext(ctx, r.RunWithContext)        // Reflector goroutine
    wait.UntilWithContext(ctx, c.processLoop, time.Second)  // processLoop goroutine
    wg.Wait()
}

两个 goroutine 协作:

  • Reflector goroutine:不停调 ListAndWatch,把 API Server 事件写入 DeltaFIFO。List 完成时关闭 synced channel(→ informer.HasSynced() 返回 true)
  • processLoop goroutine:不停从 DeltaFIFO Pop(c.config.Pop),对每个 Delta 调用 c.config.Process——默认是 sharedIndexInformer 的 handleDeltas,把 Delta 转成 notification 投给所有 listener

六、processNextWorkItem:Reconcile 循环的"心脏"

worker goroutine 跑的是 runWorker(行 193-196),runWorker 反复调 processNextWorkItem。processNextWorkItem 是整个 controller 的"心脏"——它把"从 workqueue 取 key、跑业务逻辑、根据结果决定 Forget/AddRateLimited"封装成五段式:

// staging/src/k8s.io/sample-controller/controller.go(行 200-236)

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    objRef, shutdown := c.workqueue.Get()
    logger := klog.FromContext(ctx)

    if shutdown {
        return false
    }

    // We call Done at the end of this func so the workqueue knows we have
    // finished processing this item. We also must remember to call Forget
    // if we do not want this work item being re-queued.
    defer c.workqueue.Done(objRef)

    // Run the syncHandler, passing it the structured reference to the object to be synced.
    err := c.syncHandler(ctx, objRef)
    if err == nil {
        // If no error occurs then we Forget this item so it does not
        // get queued again until another change happens.
        c.workqueue.Forget(objRef)
        logger.Info("Successfully synced", "objectName", objRef)
        return true
    }
    // there was a failure so be sure to report it.
    utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
    // since we failed, we should requeue the item to work on later.
    c.workqueue.AddRateLimited(objRef)
    return true
}

五段式:

① Get  →  ② defer Done  →  ③ syncHandler  →  ④ err==nil ? Forget : AddRateLimited  →  ⑤ return true

6.1 第 ① 步 Get 与 第 ② 步 Done

workqueue.Get 行为是"阻塞拿一个 key,没有就等"。其内部用 sync.Cond 实现:

// staging/src/k8s.io/client-go/util/workqueue/queue.go(行 265-302)

func (q *Typed[T]) Get() (item T, shutdown bool) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    for q.queue.Len() == 0 && !q.shuttingDown {
        q.cond.Wait()
    }
    if q.queue.Len() == 0 {
        return *new(T), true
    }
    item = q.queue.Pop()
    q.metrics.get(item)
    q.processing.Insert(item)
    q.dirty.Delete(item)
    return item, false
}

func (q *Typed[T]) Done(item T) {
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.metrics.done(item)
    q.processing.Delete(item)
    if q.dirty.Has(item) {
        q.queue.Push(item)
        q.cond.Signal()
    } else if q.processing.Len() == 0 {
        q.cond.Signal()
    }
}

Get 有几个关键状态:

  • processing 集合:记录"正在被处理"的 key(Get 时加、Done 时删)
  • dirty 集合:记录"处理期间又变化"的 key(Add 时加、Get 时减)
  • Done 时的去重逻辑:如果 Done 时 dirty 集合还有这个 key(即处理期间又 Add 过),自动重新入队

这就是工作队列"去重"的精髓——同一个 key 在处理期间被多次 Add,只在 Done 时入队一次。

6.2 第 ④ 步 Forget vs AddRateLimited

syncHandler 成功 → Forget 清除这个 key 的失败计数;失败 → AddRateLimited 按限流策略重新入队。这两个动作对应 rateLimitingType:

// staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go(行 129-149)

type rateLimitingType[T comparable] struct {
    TypedDelayingInterface[T]
    rateLimiter TypedRateLimiter[T]
}

// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType[T]) AddRateLimited(item T) {
    q.TypedDelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}

func (q *rateLimitingType[T]) Forget(item T) {
    q.rateLimiter.Forget(item)
}

func (q *rateLimitingType[T]) NumRequeues(item T) int {
    return q.rateLimiter.NumRequeues(item)
}

AddRateLimited 内部就是 AddAfter(item, rateLimiter.When(item))——计算"还要等多久才能处理这个 key",然后加到延时队列。For Each failure,ItemExponentialFailureRateLimiter 的 When 返回值是 baseDelay * 2^failures:

// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go(行 99-149)

func NewTypedItemExponentialFailureRateLimiter[T comparable](baseDelay time.Duration, maxDelay time.Duration) TypedRateLimiter[T] {
    return &TypedItemExponentialFailureRateLimiter[T]{
        failures:  map[T]int{},
        baseDelay: baseDelay,
        maxDelay:  maxDelay,
    }
}

func (r *TypedItemExponentialFailureRateLimiter[T]) When(item T) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    exp := r.failures[item]
    r.failures[item] = r.failures[item] + 1

    // The backoff is capped such that 'calculated' value overflows int64 (when a long-running item stays in the queue
    // for hours/days and is constantly being re-queued).
    calculated := r.baseDelay * time.Duration(2^exp)
    if calculated > r.maxDelay {
        return r.maxDelay
    }
    return calculated
}

func (r *TypedItemExponentialFailureRateLimiter[T]) Forget(item T) {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()
    delete(r.failures, item)
}

退避策略:5ms → 10ms → 20ms → 40ms …… 1000s 封顶。一旦 Forget 清除 failures 计数,下次 AddRateLimited 又从 5ms 重新开始。

6.3 一个反模式警示

很多新手会写 if err != nil { return err } 就 return 整个 processNextWorkItem,结果 panic / controller 退出。正确的是 processNextWorkItem 永远只 return true(继续跑)或 false(停机),错误处理只能通过 AddRateLimited 把 key 重新入队——业务错误和 API Server 临时故障都用同一种方式重试,靠限流器自然退避。


七、syncHandler:read-then-write 幂等模式与 OwnerReference 桥接

syncHandler 是真正的"业务大脑"。sample-controller 的实现(行 241-312)走"读期望→读实际→收敛差异→更新 status"四步:

// staging/src/k8s.io/sample-controller/controller.go(行 241-312)节选

func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
    logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)

    // 1) 读期望:用 lister 读 Foo
    foo, err := c.foosLister.Foos(objectRef.Namespace).Get(objectRef.Name)
    if err != nil {
        if errors.IsNotFound(err) {
            utilruntime.HandleErrorWithContext(ctx, err, "Foo referenced by item in work queue no longer exists", "objectReference", objectRef)
            return nil
        }
        return err
    }

    deploymentName := foo.Spec.DeploymentName
    if deploymentName == "" {
        utilruntime.HandleErrorWithContext(ctx, nil, "Deployment name missing from object reference", "objectReference", objectRef)
        return nil
    }

    // 2) 读实际:用 lister 读 Deployment
    deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
    if errors.IsNotFound(err) {
        // 3) 收敛差异:不存在就创建
        deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(ctx, newDeployment(foo), metav1.CreateOptions{FieldManager: FieldManager})
    }
    if err != nil {
        return err
    }

    // 4) 防止误接管:确认 Deployment 的 ownerRef 指向当前 Foo
    if !metav1.IsControlledBy(deployment, foo) {
        msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
        c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
        return fmt.Errorf("%s", msg)
    }

    // 5) 收敛差异:replicas 不一致就更新
    if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
        logger.V(4).Info("Update deployment resource", "currentReplicas", *deployment.Spec.Replicas, "desiredReplicas", *foo.Spec.Replicas)
        deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(ctx, newDeployment(foo), metav1.UpdateOptions{FieldManager: FieldManager})
    }
    if err != nil {
        return err
    }

    // 6) 更新 status
    err = c.updateFooStatus(ctx, foo, deployment)
    if err != nil {
        return err
    }

    c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
    return nil
}

把 syncHandler 拆成 6 个阶段:

#阶段操作目的
1 读期望 foosLister.Foos(ns).Get(name) 读本地缓存里的 Foo。NotFound 直接 return nil(key 已被删)
2 读实际 deploymentsLister.Deployments(ns).Get(name) 读本地缓存里的 Deployment
3 创建 kubeclientset.Create(newDeployment(foo)) 如果 Deployment 不存在,按 Foo 模板创建
4 owner 校验 metav1.IsControlledBy(deployment, foo) 防止"用户手建了同名 Deployment 误接管"
5 更新 kubeclientset.Update(newDeployment(foo)) 如果 replicas 不一致,覆盖更新
6 回填 status sampleclientset.Foos().UpdateStatus(fooCopy) 把 Deployment.Status 复制到 Foo.Status.AvailableReplicas

这就是经典的"read-then-write 幂等模式"——核心特征:① Get 不存在则 Create;② Get 存在则 Update;③ Update 永远是"基于 Foo 模板重新 newDeployment(foo)"——而不是基于旧 deployment 做增量修改。这种"全量重写"虽然有 Update 操作的 ResourceVersion 冲突风险(API Server 拒更新),但语义最简单、出 bug 概率最低。

💡 小贴士
newDeployment 怎么设 ownerRef?在 controller.go 行 397-400:
OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(foo, SchemeGroupVersion.WithKind("Foo")) }
NewControllerRef 会把 BlockOwnerDeletion 设为 true——删 Foo 之前 Deployment 必须先删。这是 Kubernetes "父子级联"的标准做法,避免"孤儿 Deployment"。

// staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/controller_ref.go(行 59-68)

func NewControllerRef(owner Object, gvk schema.GroupVersionKind) *OwnerReference {
    return &OwnerReference{
        APIVersion:         gvk.GroupVersion().String(),
        Kind:               gvk.Kind,
        Name:               owner.GetName(),
        UID:                owner.GetUID(),
        BlockOwnerDeletion: ptr.To(true),
        Controller:         ptr.To(true),
    }
}

八、workqueue 限流策略:指数退避 / 令牌桶 / 组合限流

client-go 提供了五种 RateLimiter,前三种最常用:

8.1 ItemExponentialFailureRateLimiter(每 key 指数退避)

// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go(行 99-149)

func NewTypedItemExponentialFailureRateLimiter[T comparable](baseDelay time.Duration, maxDelay time.Duration) TypedRateLimiter[T] {
    return &TypedItemExponentialFailureRateLimiter[T]{
        failures:  map[T]int{},
        baseDelay: baseDelay,
        maxDelay:  maxDelay,
    }
}

func (r *TypedItemExponentialFailureRateLimiter[T]) When(item T) time.Duration {
    r.failuresLock.Lock()
    defer r.failuresLock.Unlock()

    exp := r.failures[item]
    r.failures[item] = r.failures[item] + 1
    calculated := r.baseDelay * time.Duration(2^exp)
    if calculated > r.maxDelay {
        return r.maxDelay
    }
    return calculated
}

退避表(baseDelay=5ms):失败 1 次后等 5ms → 2 次后 10ms → 3 次后 20ms → 4 次后 40ms → 5 次后 80ms → …… → 17 次后约 655s → 18 次后超过 1000s 封顶。

适用场景:单个对象反复失败(如用户配错 CR 字段,每次 syncHandler 都被 schema 校验拒)。每 key 独立计数——一个 key 失败 100 次不会拖累其他 key 的处理速度。

8.2 BucketRateLimiter(全局令牌桶)

// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go(行 62-77)

// TypedBucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type TypedBucketRateLimiter[T comparable] struct {
    *rate.Limiter
}

func (r *TypedBucketRateLimiter[T]) When(item T) time.Duration {
    return r.Limiter.Reserve().Delay()
}

底层就是 golang.org/x/time/rate 标准库令牌桶。Reserve().Delay() 返回"还要等多久才能拿到一个 token"。

适用场景:控制 controller 对 API Server 的总 QPS。当 1000 个 Foo 同时被 reconcile 的时候,没有令牌桶会把 API Server 打爆。

8.3 MaxOfRateLimiter(组合限流)

// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go(行 239-265)

func NewTypedMaxOfRateLimiter[T comparable](limiters ...TypedRateLimiter[T]) TypedRateLimiter[T] {
    return &TypedMaxOfRateLimiter[T]{limiters: limiters}
}

func (r *TypedMaxOfRateLimiter[T]) When(item T) time.Duration {
    ret := time.Duration(0)
    for _, limiter := range r.limiters {
        curr := limiter.When(item)
        if curr > ret {
            ret = curr
        }
    }
    return ret
}

组合多个 limiter,When 返回所有 limiter 的最大值。sample-controller 用 MaxOf(指数退避 + 令牌桶)取两者中较大的延迟——既保护单 key、也保护全局。

8.4 三种"重置限流"路径

别忘了:Forget 才会清除 failures 计数。workqueue.Add 不会清零、workqueue.AddRateLimited 不会清零、workqueue.Done 不会清零。所以 syncHandler 成功时调用 Forget,失败时不调用——这是清零限流的唯一机会。

另一个重置路径:workqueue.ShutDown 整个 controller 进程重启。所有 in-memory 状态丢失。新进程从 informer 重新 List 一次——所有 Foo 重新走一次 syncHandler。


九、优雅停机:ctx.Done → ShutDown → wg.Wait

当 operator 收到 SIGTERM(kubectl delete pod / k8s 滚动更新)时,期望的行为是"处理完当前在跑的 key 再退出,不要丢工作"。sample-controller 的实现分三步:

SIGTERM → signals.SetupSignalHandler cancel ctx  →  controller.Run 主 goroutine unblock  →  defer workqueue.ShutDown 触发  →  workqueue.Get 返回 shutdown=true  →  worker 退出

// staging/src/k8s.io/client-go/util/workqueue/queue.go(行 304-319)

func (q *Typed[T]) ShutDown() {
    defer q.wg.Wait()
    q.stopOnce.Do(func() {
        defer close(q.stopCh)
    })
    q.cond.L.Lock()
    defer q.cond.L.Unlock()
    q.drain = false
    q.shuttingDown = true
    q.cond.Broadcast()
}

ShutDown 做了几件事:① 关闭 stopCh(让 AddAfter 不再 wait);② 置 shuttingDown=true(让 Get 退出阻塞);③ Broadcast 唤醒所有 worker。worker 在 Get 拿到 (item, true) 时立即 return,runWorker 死循环退出。

注意:ShutDown 不清空队列。如果 worker 正在跑 syncHandler,它会跑完、然后 Done、然后 Get 拿到 shutdown=true 退出。所以正在处理的工作不会丢;新加进 workqueue 的 key 会被静默忽略(不报错、不入队)。这对绝大多数场景是可接受的——下次 controller 启动会重新 List + 重新 reconcile。

🌟 实用技巧
生产 operator 还要考虑 SIGTERM 之后的"宽限期"——Kubernetes 默认 30 秒后强杀。配置 Pod spec 里的 terminationGracePeriodSeconds: 60 配合 PreStop hook(如 sleep 5)可以避免在 controller 还在 reconcile 时被 kill。


十、application-operator 完整 Reconcile 实现

把 sample-controller 模板套到 application-operator 上,我们写出自己的 application_controller.go。这一节是上一篇的承诺兑现——把 Reconcile 循环真正写出来。

// application-operator/pkg/controller/application/application_controller.go

package application

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    appsinformers "k8s.io/client-go/informers/apps/v1"
    coreinformers "k8s.io/client-go/informers/core/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/kubernetes/scheme"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    appslisters "k8s.io/client-go/listers/apps/v1"
    corelisters "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog/v2"

    applicationv1alpha1 "mycompany.io/application-operator/pkg/apis/application/v1alpha1"
    clientset "mycompany.io/application-operator/pkg/generated/clientset/versioned"
    samplescheme "mycompany.io/application-operator/pkg/generated/clientset/versioned/scheme"
    informers "mycompany.io/application-operator/pkg/generated/informers/externalversions/application/v1alpha1"
    listers "mycompany.io/application-operator/pkg/generated/listers/application/v1alpha1"
)

const controllerAgentName = "application-operator"

type Controller struct {
    kubeclientset    kubernetes.Interface
    sampleclientset  clientset.Interface

    deploymentsLister appslisters.DeploymentLister
    deploymentsSynced cache.InformerSynced
    servicesLister    corelisters.ServiceLister
    servicesSynced    cache.InformerSynced
    appsLister        listers.ApplicationLister
    appsSynced        cache.InformerSynced

    workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
    recorder  record.EventRecorder
}

func NewController(ctx context.Context, kubeclientset kubernetes.Interface, sampleclientset clientset.Interface,
    deploymentInformer appsinformers.DeploymentInformer, serviceInformer coreinformers.ServiceInformer,
    applicationInformer informers.ApplicationInformer) *Controller {

    utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))

    eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
    eventBroadcaster.StartStructuredLogging(0)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

    ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
        workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second),
        &workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
    )

    controller := &Controller{
        kubeclientset:    kubeclientset,
        sampleclientset:  sampleclientset,
        deploymentsLister: deploymentInformer.Lister(),
        deploymentsSynced: deploymentInformer.Informer().HasSynced,
        servicesLister:   serviceInformer.Lister(),
        servicesSynced:   serviceInformer.Informer().HasSynced,
        appsLister:       applicationInformer.Lister(),
        appsSynced:       applicationInformer.Informer().HasSynced,
        workqueue:        workqueue.NewTypedRateLimitingQueue(ratelimiter),
        recorder:         recorder,
    }

    applicationInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueApplication,
        UpdateFunc: func(old, new interface{}) { controller.enqueueApplication(new) },
    })
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.handleObject,
        UpdateFunc: func(old, new interface{}) {
            newDepl := new.(*appsv1.Deployment)
            oldDepl := old.(*appsv1.Deployment)
            if newDepl.ResourceVersion == oldDepl.ResourceVersion { return }
            controller.handleObject(new)
        },
        DeleteFunc: controller.handleObject,
    })
    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.handleObject,
        UpdateFunc: func(old, new interface{}) {
            newSvc := new.(*corev1.Service)
            oldSvc := old.(*corev1.Service)
            if newSvc.ResourceVersion == oldSvc.ResourceVersion { return }
            controller.handleObject(new)
        },
        DeleteFunc: controller.handleObject,
    })

    return controller
}

func (c *Controller) Run(ctx context.Context, workers int) error {
    defer utilruntime.HandleCrash()
    defer c.workqueue.ShutDown()

    if ok := cache.WaitForCacheSync(ctx.Done(), c.deploymentsSynced, c.servicesSynced, c.appsSynced); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }
    for i := 0; i < workers; i++ {
        go wait.UntilWithContext(ctx, c.runWorker, time.Second)
    }
    <-ctx.Done()
    return nil
}

func (c *Controller) runWorker(ctx context.Context) {
    for c.processNextWorkItem(ctx) {
    }
}

func (c *Controller) processNextWorkItem(ctx context.Context) bool {
    objRef, shutdown := c.workqueue.Get()
    if shutdown { return false }
    defer c.workqueue.Done(objRef)

    err := c.syncHandler(ctx, objRef)
    if err == nil {
        c.workqueue.Forget(objRef)
        return true
    }
    utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing", "objectReference", objRef)
    c.workqueue.AddRateLimited(objRef)
    return true
}

func (c *Controller) enqueueApplication(obj interface{}) {
    if objectRef, err := cache.ObjectToName(obj); err == nil {
        c.workqueue.Add(objectRef)
    }
}

func (c *Controller) handleObject(obj interface{}) {
    var object metav1.Object
    var ok bool
    if object, ok = obj.(metav1.Object); !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok { return }
        object, _ = tombstone.Obj.(metav1.Object)
    }
    if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
        if ownerRef.Kind != "Application" { return }
        app, err := c.appsLister.Applications(object.GetNamespace()).Get(ownerRef.Name)
        if err != nil { return }
        c.enqueueApplication(app)
    }
}

func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
    // 1) 读期望
    app, err := c.appsLister.Applications(objectRef.Namespace).Get(objectRef.Name)
    if errors.IsNotFound(err) { return nil }
    if err != nil { return err }
    if app.Spec.Image == "" { return nil }

    // 2) 读实际 Deployment
    deploymentName := app.Name
    deployment, err := c.deploymentsLister.Deployments(app.Namespace).Get(deploymentName)
    if errors.IsNotFound(err) {
        deployment, err = c.kubeclientset.AppsV1().Deployments(app.Namespace).Create(ctx, newDeployment(app), metav1.CreateOptions{FieldManager: controllerAgentName})
    }
    if err != nil { return err }
    if !metav1.IsControlledBy(deployment, app) {
        msg := fmt.Sprintf("Deployment %q already exists and is not managed by Application", deployment.Name)
        c.recorder.Event(app, corev1.EventTypeWarning, "ErrResourceExists", msg)
        return fmt.Errorf("%s", msg)
    }
    if app.Spec.Replicas != nil && *app.Spec.Replicas != *deployment.Spec.Replicas {
        deployment, err = c.kubeclientset.AppsV1().Deployments(app.Namespace).Update(ctx, newDeployment(app), metav1.UpdateOptions{FieldManager: controllerAgentName})
    }
    if err != nil { return err }

    // 3) 读实际 Service
    var service *corev1.Service
    if app.Spec.Expose != "" {
        service, err = c.servicesLister.Services(app.Namespace).Get(deploymentName)
        if errors.IsNotFound(err) {
            service, err = c.kubeclientset.CoreV1().Services(app.Namespace).Create(ctx, newService(app), metav1.CreateOptions{FieldManager: controllerAgentName})
        }
        if err != nil { return err }
        if !metav1.IsControlledBy(service, app) {
            return fmt.Errorf("Service %q already exists and is not managed by Application", service.Name)
        }
    }

    // 4) 回填 status
    return c.updateApplicationStatus(ctx, app, deployment, service)
}

func (c *Controller) updateApplicationStatus(ctx context.Context, app *applicationv1alpha1.Application, deployment *appsv1.Deployment, service *corev1.Service) error {
    appCopy := app.DeepCopy()                                     // 永远深拷贝
    appCopy.Status.DeploymentStatus = deployment.Status           // 原样复制
    if service != nil {
        appCopy.Status.ServiceStatus = service.Status             // 原样复制
    }
    // 推导 Phase
    switch {
    case isDeploymentFailed(deployment):
        appCopy.Status.Phase = applicationv1alpha1.ApplicationPhaseFailed
        appCopy.Status.Message = "Deployment 滚动更新超时"
    case isDeploymentAvailable(deployment):
        appCopy.Status.Phase = applicationv1alpha1.ApplicationPhaseRunning
        appCopy.Status.Message = fmt.Sprintf("%d/%d 副本可用",
            deployment.Status.AvailableReplicas, *deployment.Spec.Replicas)
    default:
        appCopy.Status.Phase = applicationv1alpha1.ApplicationPhasePending
        appCopy.Status.Message = "正在启动"
    }
    _, err := c.sampleclientset.ApplicationV1alpha1().Applications(app.Namespace).
        UpdateStatus(ctx, appCopy, metav1.UpdateOptions{FieldManager: controllerAgentName})
    return err
}

func isDeploymentAvailable(d *appsv1.Deployment) bool {
    for _, cond := range d.Status.Conditions {
        if cond.Type == appsv1.DeploymentAvailable && cond.Status == corev1.ConditionTrue {
            return true
        }
    }
    return false
}

func isDeploymentFailed(d *appsv1.Deployment) bool {
    for _, cond := range d.Status.Conditions {
        if cond.Type == appsv1.DeploymentProgressing && cond.Status == corev1.ConditionFalse &&
            cond.Reason == "ProgressDeadlineExceeded" {
            return true
        }
    }
    return false
}

func newDeployment(app *applicationv1alpha1.Application) *appsv1.Deployment {
    replicas := app.Spec.Replicas
    if replicas == nil {
        replicas = ptr.To[int32](1)
    }
    return &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name: app.Name, Namespace: app.Namespace,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(app, applicationv1alpha1.SchemeGroupVersion.WithKind("Application")),
            },
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: replicas,
            Selector: &metav1.LabelSelector{MatchLabels: labels(app)},
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{Labels: labels(app)},
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{{
                        Name:  "app",
                        Image: app.Spec.Image,
                        Ports: []corev1.ContainerPort{{ContainerPort: app.Spec.Port}},
                    }},
                },
            },
        },
    }
}

func newService(app *applicationv1alpha1.Application) *corev1.Service {
    return &corev1.Service{
        ObjectMeta: metav1.ObjectMeta{
            Name: app.Name, Namespace: app.Namespace,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(app, applicationv1alpha1.SchemeGroupVersion.WithKind("Application")),
            },
        },
        Spec: corev1.ServiceSpec{
            Type: app.Spec.Expose,
            Selector: labels(app),
            Ports: []corev1.ServicePort{{
                Port: app.Spec.Port, TargetPort: intstr.FromInt32(app.Spec.Port),
            }},
        },
    }
}

func labels(app *applicationv1alpha1.Application) map[string]string {
    return map[string]string{
        "app":   app.Name,
        "owner": "application-operator",
    }
}

对比 sample-controller 的源码,application-operator 主要多了两点:

  • Service informer 同步处理:用户 Spec.Expose != "" 时 operator 创建 Service;Service 变化时也通过 handleObject 反向 enqueue 对应 Application
  • 顶层 Phase 推导:复用 DeploymentStatus 的 Conditions 字段判 Available/Progressing/ProgressDeadlineExceeded,再映射到 Application 自己的 Phase:Pending / Running / Failed

至此,application-operator 已经具备"提交 Application YAML 后自动创建 Deployment + Service、回填 status"的能力。下一篇我们会讲生产化必备的几件套:metrics、health probe、leader election、TLS serving cert 管理。


十一、常见 12 问 FAQ


▼ Q1: 我看到很多 controller 用 controller-runtime 库(kubebuilder 风格),为什么 sample-controller 不用?哪个更好?

A: 两者本质相同——controller-runtime 是对裸 client-go 模式的"标准库化"封装。sample-controller 是"教育级"代码,让你看到每一层如何搭建;controller-runtime 帮你封装好 reconciliation 框架、admission webhook、metrics、leader election 等样板。生产上推荐 controller-runtime,但看懂 sample-controller 之后,controller-runtime 里的 manager / controller / Reconciler 三个核心对象你才能真正理解。它不是黑盒。本文的 Reconcile 流程分析对两者都适用,因为底层都是 client-go。


▼ Q2: handleObject 里的 DeletedFinalStateUnknown 是什么鬼?为什么必须处理?

A: 它是 client-go 在"丢失 watch 删除事件"时用的兜底类型。考虑时序:① informer 收到 watch 事件 "Deployment 删除";② 处理这个事件时网络断开;③ informer 重新 List 发现 Deployment 真的不在了;④ informer 不知道对象"曾经长什么样",就用 DeletedFinalStateUnknown 包装最后一次缓存值作为 tombstone 投递给 handler。不处理 tombstone 会导致 type assert panic。sample-controller 的 obj.(metav1.Object) 断言失败时尝试 obj.(cache.DeletedFinalStateUnknown) 就是这个兜底。


▼ Q3: workqueue 的 dirty + processing 双集合设计有什么讲究?

A: 这是 client-go 解决"处理期间被多次 enqueue"的关键。dirty 集合:标记"该 key 还需要被处理"(Add 时入、Get 时出);processing 集合:标记"该 key 正在被处理"(Get 时入、Done 时出)。Done 时如果 dirty 还有该 key(即处理期间又 Add 过了),就重新 Push 到 queue。这样:① 多个 Add 合并成一次入队;② Get 时 key 一定只被一个 worker 持有(保证顺序处理);③ Done 触发"是否重新入队"判断。


▼ Q4: ResourceVersion 周期性 resync 是什么?为什么要做?

A: informer 每 resyncPeriod(默认 10 分钟)会把所有缓存对象当成"更新"投递一次给 handler。目的是兜底:万一 watch 漏事件、controller 漏 reconcile,靠 resync 能重新触发一遍。代价是 handler 被"伪更新"轰炸,所以每个 UpdateFunc 都该用 ResourceVersion 判断过滤——sample-controller 的 deploymentInformer UpdateFunc 就是这么做的。这是写 controller 的"标准防御"代码,不写也不会立即出 bug,但写了你才能在异常时拿到正确的事件。


▼ Q5: 我看到别人代码用 watch 而不是 informer,性能差别大吗?

A: 直接用 watch 你要自己实现:List 拿全量、watch 接增量、自己维护本地缓存、自己处理 watch 断开重新 list 的逻辑。informer 全部封装好。性能差别:informer 的本地缓存让 lister.Get 不打网络,每秒成千上万次读也无所谓;裸 watch 每次都打 API Server,几百个对象就可能触发限流。所以 99% 场景应该用 informer 而不是裸 watch。


▼ Q6: WaitForCacheSync 等多久算"太久"?

A: 正常 1-3 秒完成(一次 List 调用 + 解析)。超过 30 秒通常意味着:① 集群 API Server 卡了;② 你 watch 的资源量很大(10w+ Pods);③ informer 配错了(错误的 namespace selector、错误的 resyncPeriod)。Sample-controller 的 WaitForCacheSync 写法是 100ms 轮询一次(syncedPollPeriod)。生产中要加 timeout 监控——WaitForCacheSync 超过 60s 还没返回就该报警。


▼ Q7: worker 数设多少合适?

A: sample-controller 写死 2。生产经验公式:worker = min(CPU 核数, 5)。worker 不是越多越好——多了虽然并行处理变多,但 API Server QPS 也成比例上升,最终撞上限流(令牌桶 + API Server 自身的 client-go rate limiter)。还要看 syncHandler 的"重操作"占比:纯本地计算可以堆 worker 到 10+,调外部 API 超过 2-3 worker 就开始拖慢。最佳实践:跑小规模负载后用 pprof 看 worker goroutine 的 CPU 占用,再调。


▼ Q8: 我 controller 跑起来 events 没有,怎么办?

A: 三步排查:① eventBroadcaster.StartRecordingToSink 调了没?这是把事件发到 API Server 的关键调用;② operator ServiceAccount 有 create events 权限没?RBAC 加 events: []string{"create"};③ kubectl describe foo 看 Events 区段。如果都正常还看不到事件,多半是 record.NewBroadcaster 启动顺序问题——必须在 informer.Run 之前调用 broadcaster 的 Start* 方法。


▼ Q9: 同一对象的多个事件被合并了吗?是怎么合并的?

A: 在三个层面做合并:① DeltaFIFO 层——同一个对象连续的 Update 事件合并成最新的 Delta;② sharedProcessor 层——同一 listener 的通知会被 addCh 缓冲,下一个通知到达时如果上一个还没消费,覆盖式写入;③ workqueue 层——dirty 集合保证同一 key 在 processing 期间只入队一次。效果:用户连续改 100 次 spec,controller 最终只跑 1-3 次 syncHandler。但注意:"合并"不是"丢事件"——最终一次 reconcile 拿到的是最新 spec,没有漏处理。


▼ Q10: 为什么 UpdateStatus 失败要 requeue?我明明只是写 status 而已。

A: 几种典型原因:① resourceVersion 冲突(用户或别的 operator 改了 spec);② 5xx 临时故障(API Server 过载);③ 权限不足(RBAC 漏配 update status)。requeue 让 controller 自动重试——第一次 status 没回填成功,5ms 后再来,再失败 10ms 后再来,指数退避天然保护。不可恢复的错误(如 schema 校验失败)会无限重试,需要在 syncHandler 里加分类处理:业务错误 → return nil 放弃;临时错误 → return err 重试。


▼ Q11: 看 controller-runtime 的源码时遇到 Predicate 过滤,sample-controller 没这个概念。是什么?

A: Predicate 是 controller-runtime 在 AddEventHandler 时多包的一层过滤器。比如"只关心 status 变化的 Update"或"只关心有特定 label 的对象"。它和 sample-controller 的 ResourceVersion 判断本质一样——都是减少无意义的 handler 调用,但 controller-runtime 的 Predicate 更声明式。生产中如果资源量大、事件量多,加 Predicate 能让 controller 减少 30-50% 的 CPU。


▼ Q12: 怎么测试 controller?fake client 怎么用?

A: 两类测试:① 单元测试——用 client-go/fake + informers 构造"假的"controller runtime:fake.NewSimpleClientset() 装初始对象,informers.NewSharedInformerFactory(fakeClient, 0) 拿 informer,ctrl.NewController(...).Run() 跑起来,断言 Deployment 真的被创建了;② envtest(controller-runtime 提供)——启动一个真实的 etcd + kube-apiserver(用 kubebuilder envtest 工具),跑真集成测试。envtest 比 fake 更接近生产环境,推荐 controller 业务逻辑用 envtest、工具函数用 fake。sample-controller 的 controller_test.go 用的是 fake 模式。


十二、下一篇预告

下一篇我们把 application-operator 升级为"生产级"——加 metrics、health probe、leader election、TLS 证书管理与 e2e 测试。具体会展开:

  • workqueue 指标(depth、adds、latency)怎么注册到 controller-runtime MetricsServer
  • leaderelection.Lease 选举 + 为什么 lease 必须是 ConfigMap 之外的 Lease 资源
  • cert-watcher 给 admission webhook 自动加载 / 热更新 TLS 证书
  • controller-runtime 的 Manager.startHook 启动 healthz / readyz 探针
  • envtest + Ginkgo BDD 写 e2e 测试,验证 reconcile 整条链路
  • 用 kustomize 把 operator 自己(Deployment、ServiceAccount、ClusterRole)打成一份可发布制品

本文源码参考:
   • sample-controller controller.go(行 68-156, 162-188, 200-236, 241-326, 388-421)
   • cache/controller.go(行 124-209, 279-300, 394-400)
   • shared_informer.go(行 284-340, 413-432, 715-798, 852-873, 1289-1328)
   • workqueue/queue.go(行 42-60, 140-182, 220-319)
   • workqueue/rate_limiting_queue.go(行 27-149)
   • workqueue/default_rate_limiters.go(行 30-291)
   • reflector.go(行 296-420)
   • NewControllerRef 源码(行 59-68)

Kubernetes 编程 / Operator 专题【左扬精讲】—— application-operator Reconcile 循环源码精讲 · 来源:Kubernetes 1.36.1 源码 + sample-controller + client-go cache/workqueue