






















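In the previous article we set up the application-operator project skeleton, defined the Application CRD, reused Kubernetes' native status model through DeploymentStatus / ServiceStatus, and generated the full set of clientset / informers / listers / applyconfiguration code.
But the operator's "brain" is still missing: after you submit an Application YAML, the operator neither creates a Deployment nor fills in status. In this article we take controller.go apart line by line, tracing the path from controller.Run at the bottom of client-go all the way up to the business function syncHandler.
After reading it you will be able to draw, on your own, a full sequence diagram from "user runs kubectl apply" to "Deployment created successfully", marking when each goroutine starts, blocks and returns. More importantly, you will be able to read client-go source directly. The next time you hit mysterious problems such as a controller that does nothing, status that never updates, or Deployments created twice, you will know which file and which line to add logging to.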
Kubernetes 1.36.1 Go 1.26 client-go cache workqueue Reflector SharedInformer
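🔓 Study guide: the article can be read in one pass; come back to the key points afterwards
★ Must master
• Controller struct fields: the purpose and lifecycle of each of the 8 fields
• NewController wiring: the full path from an informer event to a workqueue enqueue
• The three layers started by Run: Reflector / processLoop / worker goroutines
• The five stages of processNextWorkItem: Get → Sync → Done/Forget/AddRateLimited
• The read-then-write idempotent pattern in syncHandler: the Get / Create / Update flow
• The three workqueue rate limiters: exponential backoff / token bucket / combined (MaxOf)
• OwnerReference and handleObject as the two-way bridge between owners and dependents
☆ Secondary (good to know)
• sharedProcessor.distribute fans events out to every listener
• The three-buffer mechanism of processorListener: addCh / nextCh / pendingNotifications
• The ResourceEventHandlerFuncs adapter and the OnAdd / OnUpdate / OnDelete hooks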
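Kubernetes client-go's informer machinery is designed as "three layers of goroutines + two data channels". Every controller (whether a sample-controller written against raw client-go or one built on the higher-level controller-runtime) follows this skeleton. Here is the picture: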
     API Server
          |
          | (1) List + Watch
          v
+-------------------+            +---------------------+
|     Reflector     |   (2) Δ    |   sharedProcessor   |
|  (ListAndWatch)   | ---------> |   + listeners[]     |
+-------------------+  DeltaFIFO |   - pop()           |
          |                      |   - distribute()    |
          | HasSynced (after     +----------+----------+
          |  initial list)                  |
          v                                 | (3) add notification
+-------------------+                       v
|    processLoop    |            +----------+----------+
|  (c.config.Pop)   |            |  processorListener  |
+---------+---------+            |  addCh -> pop()     |
          |                      |  -> nextCh -> run() |
          | (4) key (NS/Name)    +----------+----------+
          |                                 |
          v                                 | (5) handler.OnAdd(obj)
+---------+---------+                       v
|     workqueue     |            +----------+----------+
|  (rate limited)   |            |  event handler      |
+-------------------+            |  (enqueueFoo /      |
                                 |   handleObject)     |
                                 +---------------------+
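Responsibilities by layer:
• Reflector: Lists once, then Watches the API Server, and pushes every change as a Delta into the DeltaFIFO; the informer reports HasSynced once the initial List has been processed.
• processLoop: pops Deltas from the DeltaFIFO, updates the local indexer (the cache behind the listers), and hands each change to sharedProcessor.distribute.
• sharedProcessor / processorListener: fans each notification out to every registered listener; each listener buffers notifications and calls its handler (OnAdd / OnUpdate / OnDelete) on its own goroutine.
• handler → workqueue: the handler only turns the object into a namespace/name key and adds it to the rate-limited workqueue; worker goroutines take keys from the queue and run syncHandler.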
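💡 Tip
Why not let the handler reconcile directly? Two reasons: ① a listener calls its handler serially on its own goroutine, so slow work there delays every later event for that handler while its unbounded pending-notification buffer keeps growing; ② the enqueue-plus-worker model naturally merges several changes to the same key into one round of processing, because the workqueue's dirty set deduplicates keys.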
Let's look at the Controller struct definition in sample-controller (lines 68-89) and go through it field by field:
// staging/src/k8s.io/sample-controller/controller.go (lines 68-89)
// Controller is the controller implementation for Foo resources
type Controller struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
// sampleclientset is a clientset for our own API group
sampleclientset clientset.Interface
deploymentsLister appslisters.DeploymentLister
deploymentsSynced cache.InformerSynced
foosLister listers.FooLister
foosSynced cache.InformerSynced
// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
}
Field by field:
| Field | Type | Responsibility |
|---|---|---|
| kubeclientset | kubernetes.Interface | Clientset for built-in resources (Deployment, Service). Used for writes: syncHandler goes through it when it calls Create/Update on a Deployment |
| sampleclientset | clientset.Interface | Clientset for our own CRD. updateFooStatus uses it to call UpdateStatus |
| deploymentsLister | appslisters.DeploymentLister | Read interface over the local Deployment cache. syncHandler uses it to Get Deployments without a round trip to the API Server, so it is very fast |
| deploymentsSynced | cache.InformerSynced | A function reporting whether the Deployment informer has finished its initial List. Run waits on it via WaitForCacheSync |
| foosLister | listers.FooLister | Read interface over the local Foo cache. The first Get in syncHandler (reading the user's Foo) goes through it |
| foosSynced | cache.InformerSynced | HasSynced indicator for the Foo informer |
| workqueue | TypedRateLimitingInterface | Deduplicating, rate-limited FIFO queue. Handlers enqueue and return immediately; workers Get keys from it and process them |
| recorder | record.EventRecorder | Kubernetes Event recorder. recorder.Event(foo, Type, Reason, Msg) emits an Event that shows up in kubectl describe foo |
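What confuses newcomers most is the difference between xxxLister and xxxClientset: a Lister is read-only, reads the local cache and makes no network calls; a Clientset can read and write, and every call goes over the network. Listers belong on the query path (syncHandler's first step, "read the desired state"); Clientsets belong on the mutation path (creating the Deployment, filling in status). If syncHandler reads the Foo with clientset.Get() while reading the Deployment from the lister, it mixes two views that may reflect different points in time (and adds load on the API Server): a classic bug.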
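⚠️ Warning
Objects returned by a Lister are shared, read-only cache entries. Never write something like foo, _ := c.foosLister.Foos(ns).Get(name); foo.Spec.Replicas = &newVal. That mutates the local cache in place, so every later lister read sees data the API Server never stored, and reconcile can end up looping forever. The correct sequence is Get → DeepCopy → modify the copy → write the copy back.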
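To make the warning concrete, here is a minimal stdlib-only Go sketch. Foo, toyLister and int32Ptr are hypothetical stand-ins (not the generated types or a real Lister); the only property they model is that a Lister hands back the cached pointer itself.

```go
package main

import "fmt"

// Foo is a hypothetical stand-in for a generated CR type (not the real sample-controller type).
type Foo struct {
	Name     string
	Replicas *int32
}

// DeepCopy imitates what deepcopy-gen produces: pointer fields get fresh allocations.
func (f *Foo) DeepCopy() *Foo {
	out := &Foo{Name: f.Name}
	if f.Replicas != nil {
		v := *f.Replicas
		out.Replicas = &v
	}
	return out
}

// toyLister models an informer cache: Get returns the cached pointer itself, as a real Lister does.
type toyLister struct{ items map[string]*Foo }

func (l *toyLister) Get(name string) (*Foo, error) {
	f, ok := l.items[name]
	if !ok {
		return nil, fmt.Errorf("foo %q not found", name)
	}
	return f, nil
}

func int32Ptr(v int32) *int32 { return &v }

func main() {
	cache := &toyLister{items: map[string]*Foo{"demo": {Name: "demo", Replicas: int32Ptr(1)}}}

	// Correct: Get -> DeepCopy -> modify the copy; the copy is what you would send to the API Server.
	orig, _ := cache.Get("demo")
	cp := orig.DeepCopy()
	*cp.Replicas = 5
	again, _ := cache.Get("demo")
	fmt.Println("cache:", *again.Replicas, "copy:", *cp.Replicas) // cache: 1 copy: 5

	// Wrong: mutating the lister's object rewrites the shared cache entry for every reader.
	bad, _ := cache.Get("demo")
	bad.Replicas = int32Ptr(5)
	again, _ = cache.Get("demo")
	fmt.Println("cache after in-place edit:", *again.Replicas) // cache after in-place edit: 5
}
```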
NewController is the Controller's "constructor". It does three things: ① wires up the eventBroadcaster and recorder; ② builds the rate limiter and the workqueue; ③ registers event handlers on both informers.
// staging/src/k8s.io/sample-controller/controller.go (lines 91-156)
func NewController(
ctx context.Context,
kubeclientset kubernetes.Interface,
sampleclientset clientset.Interface,
deploymentInformer appsinformers.DeploymentInformer,
fooInformer informers.FooInformer) *Controller {
logger := klog.FromContext(ctx)
// Create event broadcaster
// Add sample-controller types to the default Kubernetes Scheme so Events can be
// logged for sample-controller types.
utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
logger.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second),
&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
)
controller := &Controller{
kubeclientset: kubeclientset,
sampleclientset: sampleclientset,
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced,
foosLister: fooInformer.Lister(),
foosSynced: fooInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder: recorder,
}
logger.Info("Setting up event handlers")
// Set up an event handler for when Foo resources change
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) {
controller.enqueueFoo(new)
},
})
// Set up an event handler for when Deployment resources change.
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion {
return
}
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
return controller
}
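The code splits into three blocks: configuration / construction / registration.
EventBroadcaster is a standalone component: internally it starts goroutines that ship the events written through the recorder to the API Server. Three calls set it up:
• StartStructuredLogging(0): also writes every event to the structured log (klog) at verbosity 0.
• StartRecordingToSink(...): sends events to the API Server through kubeclientset.CoreV1().Events("").
• NewRecorder(scheme.Scheme, ...): creates the recorder the controller uses; it resolves the involved object's kind from scheme.Scheme, which is why samplescheme.AddToScheme(scheme.Scheme) is called first.
MaxOfRateLimiter combines several limiters and uses the largest delay. Two are combined here:
• ItemExponentialFailureRateLimiter(5ms, 1000s): per-key exponential backoff; each further failure of the same key doubles its delay, capped at 1000s.
• BucketRateLimiter with rate.NewLimiter(50, 300): one token bucket shared by all keys, 50 tokens per second with a burst of 300.
The larger of the two delays wins: requeues made through AddRateLimited cannot, as a whole, exceed 50 per second once the initial burst of 300 is spent, and a single key cannot retry in a tight loop. This is a sensible production-grade default.
Both informers register event handlers, but for different purposes: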
| Informer | Events | Key enqueued | Why |
|---|---|---|---|
| Foo | Add / Update | The Foo's own NS/Name | The user created a Foo or changed its spec, so reconcile must run |
| Deployment | Add / Update / Delete | NS/Name of the Foo that the Deployment's controller ownerRef points to | The Deployment's state changed (replica counts, Conditions), so the owning Foo's status must be refreshed |
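The second row's "walk back from the ownerRef to the Foo" mechanism is the heart of the controller design: we do not need a dedicated handler for every related resource. Pods, for example, need no handler at all: the built-in Deployment and ReplicaSet controllers roll Pod changes up into Deployment.status, our Deployment handler sees that update, and the ownerRef bridge enqueues the owning Foo, so Pod-level changes still reach Foo.status.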
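🌟 Practical tip
The ResourceVersion comparison in the Deployment handler (lines 145-149) filters out the fake Updates produced by the informer's periodic resync: when an object really changed, new.ResourceVersion ≠ old.ResourceVersion, while a resync re-delivers the cached object, so the two are equal. Put this check at the top of the UpdateFunc of every dependent-resource informer, or the controller will be flooded with meaningless updates. (The Foo handler omits it, so with a non-zero resync period every Foo is periodically reconciled again.)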
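The guard itself is a single comparison. The sketch below assumes a hypothetical two-field meta type in place of metav1.ObjectMeta and counts how many of three simulated UpdateFunc calls would reach the workqueue.

```go
package main

import "fmt"

// meta is a hypothetical two-field stand-in for metav1.ObjectMeta.
type meta struct {
	Name            string
	ResourceVersion string
}

// changed mirrors the guard at the top of the Deployment UpdateFunc: a resync
// re-delivers the cached object as both old and new, so equal ResourceVersions
// mean nothing changed on the server and the event can be dropped.
func changed(oldObj, newObj meta) bool {
	return oldObj.ResourceVersion != newObj.ResourceVersion
}

func main() {
	cached := meta{Name: "web", ResourceVersion: "1001"}
	updates := [][2]meta{
		{cached, cached}, // periodic resync
		{cached, {Name: "web", ResourceVersion: "1002"}}, // real change
		{cached, cached}, // periodic resync
	}
	enqueued := 0
	for _, u := range updates {
		if changed(u[0], u[1]) {
			enqueued++
		}
	}
	fmt.Println("enqueued", enqueued, "of", len(updates)) // enqueued 1 of 3
}
```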
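client-go's event mechanism is fan-out: one informer can have any number of handlers, and every event is broadcast to all of them. Each handler is wrapped in its own processorListener, which consumes events independently on its own background goroutines.
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go (lines 284-340)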
// SharedIndexInformer provides add and get Indexers ability based on SharedInformer.
type SharedIndexInformer interface {
SharedInformer
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
func NewSharedIndexInformerWithOptions(lw ListerWatcher, exampleObject runtime.Object, options SharedIndexInformerOptions) SharedIndexInformer {
realClock := &clock.RealClock{}
processor := &sharedProcessor{clock: realClock}
processor.listenersRCond = sync.NewCond(processor.listenersLock.RLocker())
return &sharedIndexInformer{
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, options.Indexers, ...),
processor: processor,
synced: make(chan struct{}),
listerWatcher: lw,
...
}
}
The core of sharedIndexInformer is the indexer (local cache) plus the processor (event distributor). The processor keeps every registered processorListener in its listeners collection, and each listener runs on its own goroutines.
// staging/src/k8s.io/client-go/tools/cache/controller.go (around lines 279-300)
// ResourceEventHandler can handle notifications for events that
// happen to a resource.
type ResourceEventHandler interface {
OnAdd(obj interface{}, isInInitialList bool)
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
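Two details: ① the real interface consists of the three methods OnAdd / OnUpdate / OnDelete; sample-controller uses the ResourceEventHandlerFuncs adapter, so it only fills in the func fields it needs (a nil field simply ignores that event type); ② OnAdd's second parameter isInInitialList reports whether this Add comes from the initial List or from a later Watch event, which distinguishes "objects that already existed, re-announced at startup" from "newly created objects".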
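processorListener is client-go's key mechanism for coping with slow handlers. Internally it uses three data structures: addCh (an unbuffered channel that distribute sends into), pendingNotifications (an unbounded ring buffer) and nextCh (an unbuffered channel that run() reads from).
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go (lines 1289-1328)
func (p *processorListener) add(notification interface{}) {
if a, ok := notification.(addNotification); ok && a.isInInitialList {
p.syncTracker.Start()
}
p.addCh <- notification
}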
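How it works:
• add() sends the notification into addCh and returns as soon as the pop goroutine receives it.
• The pop goroutine hands notifications on to nextCh; whenever run() is still busy, it parks them in pendingNotifications instead of blocking.
• The run goroutine ranges over nextCh and calls OnAdd / OnUpdate / OnDelete on the handler.
This three-stage buffering decouples the producer (distribute) from the consumer (the handler): even if a handler is stuck for several seconds, the informer's main loop is not blocked; the price is that pendingNotifications grows in memory until the handler catches up.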
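The three buffers can be modelled with plain channels. This is a simplified sketch, not the client-go code: the listener type, the deliver helper and the string notifications are hypothetical, the buffer is a slice rather than client-go's growing ring buffer, and unlike the real listener it flushes buffered notifications when the producer closes addCh. It shows that add returns quickly while a slow handler still sees every event, in order.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// listener is a simplified model of client-go's processorListener; only the field
// names (addCh, nextCh, pendingNotifications) mirror the real type.
type listener struct {
	addCh                chan string
	nextCh               chan string
	pendingNotifications []string
	handler              func(string)
}

func newListener(h func(string)) *listener {
	return &listener{addCh: make(chan string), nextCh: make(chan string), handler: h}
}

// add is what the producer (distribute) calls; it waits only until pop receives.
func (l *listener) add(n string) { l.addCh <- n }

// pop shuttles notifications from addCh to nextCh and parks them in
// pendingNotifications whenever run is still busy with a slow handler.
func (l *listener) pop() {
	defer close(l.nextCh)
	var out chan string // nil while nothing is pending, which disables the send case
	var next string
	for {
		select {
		case out <- next:
			if len(l.pendingNotifications) == 0 {
				out = nil
			} else {
				next = l.pendingNotifications[0]
				l.pendingNotifications = l.pendingNotifications[1:]
			}
		case n, ok := <-l.addCh:
			if !ok {
				// Producer finished. Unlike the real listener, flush what is buffered first.
				if out != nil {
					l.nextCh <- next
					for _, p := range l.pendingNotifications {
						l.nextCh <- p
					}
				}
				return
			}
			if out == nil {
				next, out = n, l.nextCh
			} else {
				l.pendingNotifications = append(l.pendingNotifications, n)
			}
		}
	}
}

// run calls the handler for every notification, in order.
func (l *listener) run(wg *sync.WaitGroup) {
	defer wg.Done()
	for n := range l.nextCh {
		l.handler(n)
	}
}

// deliver pushes events through one listener whose handler sleeps handlerDelay per
// event, and returns the order in which the handler saw them.
func deliver(events []string, handlerDelay time.Duration) []string {
	var seen []string
	l := newListener(func(n string) {
		time.Sleep(handlerDelay)
		seen = append(seen, n)
	})
	var wg sync.WaitGroup
	wg.Add(1)
	go l.pop()
	go l.run(&wg)
	for _, e := range events {
		l.add(e) // returns quickly even though the handler is slow
	}
	close(l.addCh)
	wg.Wait()
	return seen
}

func main() {
	fmt.Println(deliver([]string{"add web", "update web", "delete web"}, 10*time.Millisecond))
	// [add web update web delete web]
}
```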
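💡 Tip
If your controller keeps building up a backlog (for example, the workqueue depth metric keeps climbing, if your binary exposes workqueue metrics), the usual causes are a handler that is too slow (doing synchronous work such as API calls on the listener goroutine) or AddRateLimited / Forget not being called correctly. Check that enqueueFoo does nothing but cache.ObjectToName + workqueue.Add and then returns immediately; any heavy work belongs in syncHandler.
// staging/src/k8s.io/sample-controller/controller.go (lines 331-383)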
// enqueueFoo takes a Foo resource and converts it into a namespace/name
// string which is then put onto the work queue.
func (c *Controller) enqueueFoo(obj interface{}) {
if objectRef, err := cache.ObjectToName(obj); err != nil {
utilruntime.HandleError(err)
return
} else {
c.workqueue.Add(objectRef)
}
}
// handleObject will take any resource implementing metav1.Object and attempt
// to find the Foo resource that 'owns' it.
func (c *Controller) handleObject(obj interface{}) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleErrorWithContext(...)
return
}
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
utilruntime.HandleErrorWithContext(...)
return
}
}
if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
// If this object is not owned by a Foo, we should not do anything
if ownerRef.Kind != "Foo" {
return
}
foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
if err != nil {
return
}
c.enqueueFoo(foo)
return
}
}
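The two handlers follow completely different designs:
• enqueueFoo handles the primary resource: it turns the Foo itself into a namespace/name key and enqueues it.
• handleObject handles dependent resources: it never enqueues the Deployment; it follows the object's controller ownerRef back to the owning Foo, checks through the lister that the Foo still exists, and enqueues the Foo's key instead.
Note the DeletedFinalStateUnknown branch in handleObject: it is the fallback type the informer uses when a delete event was missed, for example while the watch was disconnected and a relist found the object gone. Kubernetes uses this tombstone to say "this object was deleted, but I don't have its final state". It is rare in production, but the code must handle it.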
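The tombstone unwrapping and the ownerRef walk-back can be sketched with hypothetical stand-in types (object, ownerRef and tombstone, which only mimic metav1.Object, OwnerReference and cache.DeletedFinalStateUnknown). The helper ownerKey is illustrative; the real handleObject also confirms through the lister that the owner still exists before enqueueing.

```go
package main

import "fmt"

// Hypothetical stand-ins that model only what handleObject relies on.
type ownerRef struct {
	Kind       string
	Name       string
	Controller bool
}

type object struct {
	Namespace string
	Name      string
	Owners    []ownerRef
}

// tombstone plays the role of cache.DeletedFinalStateUnknown.
type tombstone struct {
	Key string
	Obj interface{}
}

// controllerOf mimics metav1.GetControllerOf: the owner reference marked Controller, if any.
func controllerOf(o *object) *ownerRef {
	for i := range o.Owners {
		if o.Owners[i].Controller {
			return &o.Owners[i]
		}
	}
	return nil
}

// ownerKey unwraps a tombstone if needed, follows the controller ownerRef and returns
// the owner's "namespace/name" key, or "" when the object should be ignored.
func ownerKey(obj interface{}, ownerKind string) string {
	o, ok := obj.(*object)
	if !ok {
		t, ok := obj.(tombstone)
		if !ok {
			return "" // neither an object nor a tombstone
		}
		if o, ok = t.Obj.(*object); !ok {
			return "" // tombstone wrapping something unexpected
		}
	}
	ref := controllerOf(o)
	if ref == nil || ref.Kind != ownerKind {
		return ""
	}
	return o.Namespace + "/" + ref.Name
}

func main() {
	dep := &object{Namespace: "default", Name: "web",
		Owners: []ownerRef{{Kind: "Foo", Name: "example-foo", Controller: true}}}
	fmt.Println(ownerKey(dep, "Foo"))                                                  // default/example-foo
	fmt.Println(ownerKey(tombstone{Key: "default/web", Obj: dep}, "Foo"))              // default/example-foo
	fmt.Printf("%q\n", ownerKey(&object{Namespace: "default", Name: "orphan"}, "Foo")) // ""
}
```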
Now look at how controller.Run starts up (lines 162-188):
// staging/src/k8s.io/sample-controller/controller.go (lines 162-188)
func (c *Controller) Run(ctx context.Context, workers int) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting Foo controller")
// Wait for the caches to be synced before starting workers
logger.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(ctx.Done(), c.deploymentsSynced, c.foosSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
logger.Info("Starting workers", "count", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
logger.Info("Started workers")
<-ctx.Done()
logger.Info("Shutting down workers")
return nil
}
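Line by line:
• defer utilruntime.HandleCrash(): logs a panic in Run before it propagates (by default HandleCrash re-panics, so the process still crashes).
• defer c.workqueue.ShutDown(): when Run returns, shuts the queue down so workers blocked in Get can exit.
• cache.WaitForCacheSync(ctx.Done(), c.deploymentsSynced, c.foosSynced): blocks until both informers have finished their initial List (c.foosSynced and c.deploymentsSynced are function references to the respective informer's HasSynced).
• go wait.UntilWithContext(ctx, c.runWorker, time.Second): starts N workers; if runWorker ever returns, it is invoked again after time.Second, until ctx is cancelled.
• <-ctx.Done(): the Run goroutine blocks here until shutdown.
WaitForCacheSync is the key barrier. The source: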
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go (lines 413-432)
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shutdown
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
...
}
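The implementation is simple but important: it polls every cacheSyncs function every 100ms and passes only when all of them return true; as long as any informer has not finished its initial List, the workers cannot start. Why wait? Because a worker's sync usually begins with c.foosLister.Foos(ns).Get(name); before the List completes the lister has no data, so you get either a NotFound error or an empty result.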
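The polling barrier can be reproduced with the standard library alone. waitForSync below is a hypothetical helper that follows the behaviour described above (check immediately, then once per period; every function must report true; give up when the stop channel closes); it is not the client-go source. The atomic flags stand in for two informers' HasSynced.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// waitForSync checks every synced func now and then once per period; it returns true
// when all report true, and false once stopCh is closed.
func waitForSync(stopCh <-chan struct{}, period time.Duration, synced ...func() bool) bool {
	ticker := time.NewTicker(period)
	defer ticker.Stop()
	for {
		allSynced := true
		for _, s := range synced {
			if !s() {
				allSynced = false
				break
			}
		}
		if allSynced {
			return true
		}
		select {
		case <-stopCh:
			return false
		case <-ticker.C:
		}
	}
}

func main() {
	// Two fake informers whose initial List "finishes" at different times.
	var foosSynced, deploymentsSynced atomic.Bool
	go func() { time.Sleep(150 * time.Millisecond); foosSynced.Store(true) }()
	go func() { time.Sleep(250 * time.Millisecond); deploymentsSynced.Store(true) }()

	start := time.Now()
	ok := waitForSync(make(chan struct{}), 100*time.Millisecond, foosSynced.Load, deploymentsSynced.Load)
	fmt.Println("synced:", ok, "after about", time.Since(start).Round(100*time.Millisecond))
}
```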
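⚠️ Warning
If you skip WaitForCacheSync, workers start immediately while a cache may still be empty. Foo keys are only enqueued after the Foo has entered the Foo cache, but the Deployment informer may not have finished its List yet: deploymentsLister.Get returns NotFound → syncHandler tries to Create a Deployment that already exists → the API Server rejects it with AlreadyExists → processNextWorkItem requeues the key with AddRateLimited backoff. At startup this shows up as a burst of spurious errors and wasted API calls, and the effect grows with the number of custom resources. A production-grade controller must call WaitForCacheSync.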
Inside sharedInformer.Run, the lower-level controller.RunWithContext starts the Reflector and processLoop (lines 715-790):
// staging/src/k8s.io/client-go/tools/cache/controller.go (lines 169-209)
func (c *controller) RunWithContext(ctx context.Context) {
defer utilruntime.HandleCrashWithContext(ctx)
go func() {
<-ctx.Done()
c.config.Queue.Close()
}()
logger := klog.FromContext(ctx)
r := NewReflectorWithOptions(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
ReflectorOptions{
Logger: &logger,
ResyncPeriod: c.config.FullResyncPeriod,
MinWatchTimeout: c.config.MinWatchTimeout,
...
},
)
...
var wg wait.Group
wg.StartWithContext(ctx, r.RunWithContext) // Reflector: runs in its own goroutine
wait.UntilWithContext(ctx, c.processLoop, time.Second) // processLoop: runs on the calling goroutine and blocks until ctx is done
wg.Wait()
}
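The two goroutines cooperate as follows:
• The Reflector runs ListAndWatch and pushes each change into c.config.Queue (the DeltaFIFO).
• processLoop pops from the queue and calls the configured Process function; in a shared informer that is HandleDeltas, which updates the indexer and then distributes notifications to the listeners.
When ctx is cancelled, the goroutine started at the top closes the queue, which unblocks Pop and lets processLoop exit.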
The worker goroutines run runWorker (lines 193-196), which calls processNextWorkItem over and over. processNextWorkItem is the controller's "heart": it wraps "take a key from the workqueue, run the business logic, choose Forget or AddRateLimited based on the result" into five stages:
// staging/src/k8s.io/sample-controller/controller.go (lines 200-236)
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
objRef, shutdown := c.workqueue.Get()
logger := klog.FromContext(ctx)
if shutdown {
return false
}
// We call Done at the end of this func so the workqueue knows we have
// finished processing this item. We also must remember to call Forget
// if we do not want this work item being re-queued.
defer c.workqueue.Done(objRef)
// Run the syncHandler, passing it the structured reference to the object to be synced.
err := c.syncHandler(ctx, objRef)
if err == nil {
// If no error occurs then we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(objRef)
logger.Info("Successfully synced", "objectName", objRef)
return true
}
// there was a failure so be sure to report it.
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
// since we failed, we should requeue the item to work on later.
c.workqueue.AddRateLimited(objRef)
return true
}
The five stages:
① Get → ② defer Done → ③ syncHandler → ④ err==nil ? Forget : AddRateLimited → ⑤ return true
workqueue.Get blocks until a key is available. Internally it is built on sync.Cond:
// staging/src/k8s.io/client-go/util/workqueue/queue.go (lines 265-302)
func (q *Typed[T]) Get() (item T, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for q.queue.Len() == 0 && !q.shuttingDown {
q.cond.Wait()
}
if q.queue.Len() == 0 {
return *new(T), true
}
item = q.queue.Pop()
q.metrics.get(item)
q.processing.Insert(item)
q.dirty.Delete(item)
return item, false
}
func (q *Typed[T]) Done(item T) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.metrics.done(item)
q.processing.Delete(item)
if q.dirty.Has(item) {
q.queue.Push(item)
q.cond.Signal()
} else if q.processing.Len() == 0 {
q.cond.Signal()
}
}
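The key state behind Get:
• queue: the ordered list of keys waiting for a worker.
• dirty: the set of keys that need processing; Add returns immediately if the key is already dirty.
• processing: keys currently held by a worker; Add on such a key only marks it dirty, and Done pushes it back onto queue exactly once.
That is the essence of the workqueue's deduplication: however many times a key is Added while it is being processed, it is re-queued only once, at Done.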
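The dedup rules fit in a few lines. This single-goroutine sketch (a hypothetical dedupQueue type with string keys) keeps only the queue / dirty / processing bookkeeping from Add, Get and Done; the real Typed[T] adds locking via sync.Cond, metrics and shutdown.

```go
package main

import "fmt"

// dedupQueue models the bookkeeping of client-go's workqueue without locking.
type dedupQueue struct {
	queue      []string
	dirty      map[string]bool
	processing map[string]bool
}

func newDedupQueue() *dedupQueue {
	return &dedupQueue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

func (q *dedupQueue) Add(key string) {
	if q.dirty[key] {
		return // already waiting to be processed: merged
	}
	q.dirty[key] = true
	if q.processing[key] {
		return // a worker holds it; Done will re-queue it once
	}
	q.queue = append(q.queue, key)
}

func (q *dedupQueue) Get() (string, bool) {
	if len(q.queue) == 0 {
		return "", false
	}
	key := q.queue[0]
	q.queue = q.queue[1:]
	q.processing[key] = true
	delete(q.dirty, key)
	return key, true
}

func (q *dedupQueue) Done(key string) {
	delete(q.processing, key)
	if q.dirty[key] {
		q.queue = append(q.queue, key)
	}
}

func main() {
	q := newDedupQueue()
	q.Add("default/foo")
	q.Add("default/foo") // merged with the first Add
	q.Add("default/bar")
	fmt.Println(q.queue) // [default/foo default/bar]

	key, _ := q.Get()    // a worker starts on default/foo
	q.Add("default/foo") // changes during processing: only marked dirty
	q.Add("default/foo")
	fmt.Println(q.queue) // [default/bar]
	q.Done(key)
	fmt.Println(q.queue) // [default/bar default/foo]
}
```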
If syncHandler succeeds, Forget clears the key's failure count; if it fails, AddRateLimited requeues the key according to the rate-limiting policy. Both actions live on rateLimitingType:
// staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go (lines 129-149)
type rateLimitingType[T comparable] struct {
TypedDelayingInterface[T]
rateLimiter TypedRateLimiter[T]
}
// AddRateLimited AddAfter's the item based on the time when the rate limiter says it's ok
func (q *rateLimitingType[T]) AddRateLimited(item T) {
q.TypedDelayingInterface.AddAfter(item, q.rateLimiter.When(item))
}
func (q *rateLimitingType[T]) Forget(item T) {
q.rateLimiter.Forget(item)
}
func (q *rateLimitingType[T]) NumRequeues(item T) int {
return q.rateLimiter.NumRequeues(item)
}
AddRateLimited is just AddAfter(item, rateLimiter.When(item)): compute how long to wait before this key may be processed, then put it on the delaying queue. For ItemExponentialFailureRateLimiter, When returns baseDelay × 2^n, where n is the number of failures recorded for the key before this call:
// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go (lines 99-149)
func NewTypedItemExponentialFailureRateLimiter[T comparable](baseDelay time.Duration, maxDelay time.Duration) TypedRateLimiter[T] {
return &TypedItemExponentialFailureRateLimiter[T]{
failures: map[T]int{},
baseDelay: baseDelay,
maxDelay: maxDelay,
}
}
func (r *TypedItemExponentialFailureRateLimiter[T]) When(item T) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
// The backoff is capped such that 'calculated' value never overflows.
// Note: math.Pow, not 2^exp; in Go, ^ is bitwise XOR.
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
func (r *TypedItemExponentialFailureRateLimiter[T]) Forget(item T) {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
delete(r.failures, item)
}
Backoff sequence: 5ms → 10ms → 20ms → 40ms … capped at 1000s. Once Forget clears the failure count, the next AddRateLimited starts again from 5ms.
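A common beginner mistake is to bail out of processNextWorkItem as soon as syncHandler returns an error (for example by returning false), which stops that worker's loop and never requeues the key. The right model: processNextWorkItem only ever returns true (keep going) or false (the queue is shutting down), and errors are handled solely by requeueing the key with AddRateLimited. Business errors and transient API Server failures are retried the same way, and the rate limiter backs off naturally.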
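syncHandler is the real "business brain". sample-controller's implementation (lines 241-312) follows four steps: read desired state → read actual state → converge the difference → update status:
// staging/src/k8s.io/sample-controller/controller.go (lines 241-312), excerpt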
func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
logger := klog.LoggerWithValues(klog.FromContext(ctx), "objectRef", objectRef)
// 1) Read desired state: get the Foo from the lister
foo, err := c.foosLister.Foos(objectRef.Namespace).Get(objectRef.Name)
if err != nil {
if errors.IsNotFound(err) {
utilruntime.HandleErrorWithContext(ctx, err, "Foo referenced by item in work queue no longer exists", "objectReference", objectRef)
return nil
}
return err
}
deploymentName := foo.Spec.DeploymentName
if deploymentName == "" {
utilruntime.HandleErrorWithContext(ctx, nil, "Deployment name missing from object reference", "objectReference", objectRef)
return nil
}
// 2) Read actual state: get the Deployment from the lister
deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
if errors.IsNotFound(err) {
// 3) Converge: create the Deployment if it does not exist
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(ctx, newDeployment(foo), metav1.CreateOptions{FieldManager: FieldManager})
}
if err != nil {
return err
}
// 4) Guard against adoption: make sure the Deployment's controller ownerRef points at this Foo
if !metav1.IsControlledBy(deployment, foo) {
msg := fmt.Sprintf(MessageResourceExists, deployment.Name)
c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg)
return fmt.Errorf("%s", msg)
}
// 5) Converge: update the Deployment if replicas differ
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
logger.V(4).Info("Update deployment resource", "currentReplicas", *deployment.Spec.Replicas, "desiredReplicas", *foo.Spec.Replicas)
deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(ctx, newDeployment(foo), metav1.UpdateOptions{FieldManager: FieldManager})
}
if err != nil {
return err
}
// 6) Update status
err = c.updateFooStatus(ctx, foo, deployment)
if err != nil {
return err
}
c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
return nil
}
syncHandler breaks down into 6 stages:
| # | Stage | Operation | Purpose |
|---|---|---|---|
| 1 | Read desired | foosLister.Foos(ns).Get(name) | Read the Foo from the local cache. On NotFound, return nil (the Foo has been deleted) |
| 2 | Read actual | deploymentsLister.Deployments(ns).Get(name) | Read the Deployment from the local cache |
| 3 | Create | kubeclientset.Create(newDeployment(foo)) | If the Deployment does not exist, create it from the Foo template |
| 4 | Owner check | metav1.IsControlledBy(deployment, foo) | Avoid taking over a same-named Deployment the user created by hand |
| 5 | Update | kubeclientset.Update(newDeployment(foo)) | If replicas differ, overwrite the Deployment |
| 6 | Fill in status | sampleclientset.Foos().UpdateStatus(fooCopy) | Copy Deployment.Status into Foo.Status.AvailableReplicas |
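This is the classic "read-then-write" idempotent pattern. Its defining traits: ① if Get finds nothing, Create; ② if Get finds the object but it differs from the desired state, Update; ③ Update always rebuilds the object from the Foo template with newDeployment(foo) instead of patching the existing Deployment incrementally. Because the rebuilt object carries no ResourceVersion, this Update is unconditional and silently overwrites whatever others changed on the Deployment (UpdateStatus, by contrast, sends the cached ResourceVersion and can fail with a Conflict, which simply triggers a requeue). Even so, full-rewrite semantics are the simplest and the least likely to hide bugs.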
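The read-then-write shape is easiest to see against a fake store. In this sketch the deployment type, fakeAPI and reconcile are hypothetical: a map stands in for both the lister cache and the API Server, and writes counts Create/Update calls, so running reconcile repeatedly with the same desired state shows that it converges and then does nothing.

```go
package main

import "fmt"

// deployment is a hypothetical two-field stand-in for appsv1.Deployment.
type deployment struct {
	Name     string
	Replicas int32
}

// fakeAPI stands in for the API Server (and, for brevity, the lister cache too).
type fakeAPI struct {
	objects map[string]deployment
	writes  int // number of Create/Update calls
}

// reconcile follows the syncHandler shape: read actual state, create if missing,
// overwrite from the desired template if it differs, otherwise do nothing.
func reconcile(api *fakeAPI, name string, desiredReplicas int32) {
	desired := deployment{Name: name, Replicas: desiredReplicas} // like newDeployment(foo)
	actual, found := api.objects[name]
	switch {
	case !found:
		api.objects[name] = desired // Create
		api.writes++
	case actual.Replicas != desired.Replicas:
		api.objects[name] = desired // Update: full rewrite from the template
		api.writes++
	}
}

func main() {
	api := &fakeAPI{objects: map[string]deployment{}}
	reconcile(api, "web", 3) // creates
	reconcile(api, "web", 3) // no-op: already converged
	reconcile(api, "web", 5) // updates
	reconcile(api, "web", 5) // no-op again
	fmt.Println(api.objects["web"].Replicas, api.writes) // 5 2
}
```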
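💡 Tip
How does newDeployment set the ownerRef? In controller.go, lines 397-400:
OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(foo, SchemeGroupVersion.WithKind("Foo")) }
NewControllerRef sets both Controller and BlockOwnerDeletion to true. When the Foo is deleted, the garbage collector deletes the Deployment; with foreground cascading deletion, the Foo itself is not removed until the Deployment is gone. This is Kubernetes' standard parent/child cascade, and it prevents orphaned Deployments.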
// staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/controller_ref.go (lines 59-68)
func NewControllerRef(owner Object, gvk schema.GroupVersionKind) *OwnerReference {
return &OwnerReference{
APIVersion: gvk.GroupVersion().String(),
Kind: gvk.Kind,
Name: owner.GetName(),
UID: owner.GetUID(),
BlockOwnerDeletion: ptr.To(true),
Controller: ptr.To(true),
}
}
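NewControllerRef records the owner's UID, and metav1.IsControlledBy compares that UID rather than the name. The sketch below uses hypothetical stand-in types (ownerReference, objectMeta) to show why that matters for the owner check in syncHandler: both a hand-made Deployment and a Foo recreated under the same name fail the check.

```go
package main

import "fmt"

// Hypothetical stand-ins for metav1.OwnerReference and an object's metadata.
type ownerReference struct {
	Kind       string
	Name       string
	UID        string
	Controller bool
}

type objectMeta struct {
	Name   string
	UID    string
	Owners []ownerReference
}

// newControllerRef fills the identifying fields that metav1.NewControllerRef sets
// (APIVersion and BlockOwnerDeletion are left out of this toy).
func newControllerRef(owner objectMeta, kind string) ownerReference {
	return ownerReference{Kind: kind, Name: owner.Name, UID: owner.UID, Controller: true}
}

// isControlledBy follows the rule of metav1.IsControlledBy: the controller reference
// must carry the owner's UID, so a name match alone is not enough.
func isControlledBy(obj, owner objectMeta) bool {
	for _, ref := range obj.Owners {
		if ref.Controller {
			return ref.UID == owner.UID
		}
	}
	return false
}

func main() {
	foo := objectMeta{Name: "example-foo", UID: "uid-1"}
	ours := objectMeta{Name: "web", Owners: []ownerReference{newControllerRef(foo, "Foo")}}
	handMade := objectMeta{Name: "web"}                        // created by a user, no ownerRef
	recreated := objectMeta{Name: "example-foo", UID: "uid-2"} // same name, new object

	fmt.Println(isControlledBy(ours, foo))       // true
	fmt.Println(isControlledBy(handMade, foo))   // false
	fmt.Println(isControlledBy(ours, recreated)) // false
}
```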
client-go ships five RateLimiters; these three are the most commonly used:
// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go (lines 99-149)
func NewTypedItemExponentialFailureRateLimiter[T comparable](baseDelay time.Duration, maxDelay time.Duration) TypedRateLimiter[T] {
return &TypedItemExponentialFailureRateLimiter[T]{
failures: map[T]int{},
baseDelay: baseDelay,
maxDelay: maxDelay,
}
}
func (r *TypedItemExponentialFailureRateLimiter[T]) When(item T) time.Duration {
r.failuresLock.Lock()
defer r.failuresLock.Unlock()
exp := r.failures[item]
r.failures[item] = r.failures[item] + 1
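// math.Pow, not 2^exp: in Go, ^ is bitwise XOR
backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
if backoff > math.MaxInt64 {
return r.maxDelay
}
calculated := time.Duration(backoff)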
if calculated > r.maxDelay {
return r.maxDelay
}
return calculated
}
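Backoff table (baseDelay = 5ms): 5ms after the 1st failure → 10ms after the 2nd → 20ms after the 3rd → 40ms after the 4th → 80ms after the 5th → … → about 328s after the 17th → about 655s after the 18th → from the 19th on, the computed value exceeds 1000s and is capped.
Use case: a single object failing over and over (for example, a user set a CR field wrong and every syncHandler run is rejected by validation). Each key is counted independently, so a key that has failed 100 times does not slow down any other key.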
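The backoff table can be reproduced with a small stdlib-only version of the exponential limiter. expBackoff is a hypothetical single-goroutine type (the real limiter also guards its map with a mutex) that uses the same math.Pow formula and cap; Go's ^ operator is XOR, so 2^exp cannot be used for the power.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// expBackoff: delay = base * 2^failures, capped at max; Forget resets the per-item count.
type expBackoff struct {
	base, max time.Duration
	failures  map[string]int
}

func (r *expBackoff) When(item string) time.Duration {
	exp := r.failures[item]
	r.failures[item]++
	backoff := float64(r.base.Nanoseconds()) * math.Pow(2, float64(exp))
	if backoff > math.MaxInt64 {
		return r.max
	}
	if d := time.Duration(backoff); d < r.max {
		return d
	}
	return r.max
}

// Forget drops the item's failure count, so its next delay starts from base again.
func (r *expBackoff) Forget(item string) { delete(r.failures, item) }

func main() {
	r := &expBackoff{base: 5 * time.Millisecond, max: 1000 * time.Second, failures: map[string]int{}}
	for n := 1; n <= 19; n++ {
		fmt.Printf("failure %2d -> wait %v\n", n, r.When("default/foo"))
	}
	r.Forget("default/foo")
	fmt.Println("after Forget ->", r.When("default/foo")) // after Forget -> 5ms
}
```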
// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go (lines 62-77)
// TypedBucketRateLimiter adapts a standard bucket to the workqueue ratelimiter API
type TypedBucketRateLimiter[T comparable] struct {
*rate.Limiter
}
func (r *TypedBucketRateLimiter[T]) When(item T) time.Duration {
return r.Limiter.Reserve().Delay()
}
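Under the hood this is the token bucket from golang.org/x/time/rate (a Go extended package, not the standard library). Reserve().Delay() returns how long the caller must wait to get a token.
Use case: capping how fast the controller retries across all keys. It is only consulted by AddRateLimited: plain workqueue.Add (new events) bypasses it, and the client's own request rate is governed separately by the QPS/Burst settings of the REST client config. Without it, a failure hitting 1000 Foos at once would make them all retry together and hammer the API Server.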
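To see where the 50/300 numbers bite, here is a deterministic, simplified token bucket. It is a hypothetical model of what rate.Limiter's Reserve().Delay() reports, not the golang.org/x/time/rate implementation: time is passed as an offset from t=0, and each call reserves exactly one token.

```go
package main

import (
	"fmt"
	"time"
)

// tokenBucket: a negative token balance means tokens already promised to waiting callers.
type tokenBucket struct {
	rate   float64       // tokens added per second
	burst  float64       // bucket capacity
	tokens float64       // current balance
	last   time.Duration // offset of the previous reservation
}

func newTokenBucket(ratePerSec float64, burst int) *tokenBucket {
	return &tokenBucket{rate: ratePerSec, burst: float64(burst), tokens: float64(burst)}
}

// reserve takes one token at offset `at` and returns how long the caller must wait for it.
func (b *tokenBucket) reserve(at time.Duration) time.Duration {
	b.tokens += (at - b.last).Seconds() * b.rate // refill for the elapsed time
	if b.tokens > b.burst {
		b.tokens = b.burst
	}
	b.last = at
	b.tokens--
	if b.tokens >= 0 {
		return 0
	}
	// the deficit is paid back at `rate` tokens per second
	return time.Duration(float64(time.Second) * -b.tokens / b.rate)
}

func main() {
	b := newTokenBucket(50, 300) // same numbers as rate.NewLimiter(rate.Limit(50), 300)
	for i := 0; i < 300; i++ {
		b.reserve(0) // 300 simultaneous requeues are absorbed by the burst
	}
	fmt.Println(b.reserve(0))           // 20ms: the 301st must wait 1/50 s
	fmt.Println(b.reserve(0))           // 40ms
	fmt.Println(b.reserve(time.Second)) // 0s: one second later the bucket has refilled
}
```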
// staging/src/k8s.io/client-go/util/workqueue/default_rate_limiters.go (lines 239-265)
func NewTypedMaxOfRateLimiter[T comparable](limiters ...TypedRateLimiter[T]) TypedRateLimiter[T] {
return &TypedMaxOfRateLimiter[T]{limiters: limiters}
}
func (r *TypedMaxOfRateLimiter[T]) When(item T) time.Duration {
ret := time.Duration(0)
for _, limiter := range r.limiters {
curr := limiter.When(item)
if curr > ret {
ret = curr
}
}
return ret
}
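It combines several limiters, and When returns the largest of their values. sample-controller uses MaxOf(exponential backoff + token bucket) and takes the larger of the two delays, protecting each individual key as well as the system as a whole.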
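The MaxOf rule can be shown with two toy limiters behind a hypothetical rateLimiter interface that has only When: a per-key doubling backoff, and a crude global limiter that spaces calls 20ms apart (standing in for a 50/s token bucket whose burst is already spent). Whichever asks for the longer wait wins.

```go
package main

import (
	"fmt"
	"time"
)

// rateLimiter is a cut-down version of workqueue.TypedRateLimiter (When only).
type rateLimiter interface {
	When(item string) time.Duration
}

// maxOf asks every child limiter and keeps the largest delay, as TypedMaxOfRateLimiter does.
type maxOf []rateLimiter

func (m maxOf) When(item string) time.Duration {
	var ret time.Duration
	for _, l := range m {
		if d := l.When(item); d > ret {
			ret = d
		}
	}
	return ret
}

// perItemDoubling is a per-key exponential backoff: 5ms, 10ms, 20ms, ... (no cap, for brevity).
type perItemDoubling map[string]int

func (p perItemDoubling) When(item string) time.Duration {
	d := 5 * time.Millisecond << p[item]
	p[item]++
	return d
}

// evenSpacing is a crude global limiter: the n-th call (counting from 0) waits n*20ms.
type evenSpacing struct{ calls int }

func (g *evenSpacing) When(string) time.Duration {
	d := time.Duration(g.calls) * 20 * time.Millisecond
	g.calls++
	return d
}

func main() {
	limiter := maxOf{perItemDoubling{}, &evenSpacing{}}
	for i := 0; i < 4; i++ {
		fmt.Println("default/foo:", limiter.When("default/foo"))
	}
	fmt.Println("default/bar:", limiter.When("default/bar"))
}
```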
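Remember: only Forget clears the failure count. workqueue.Add does not reset it, AddRateLimited does not reset it, and Done does not reset it. That is why processNextWorkItem calls Forget when syncHandler succeeds and skips it on failure: success is the only chance to reset the backoff.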
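The other reset path is a restart of the whole controller process (its workqueue is shut down and the process exits): all in-memory state, failure counts included, is lost. The new process re-Lists through its informers, so every Foo goes through syncHandler once more.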
When the operator receives SIGTERM (kubectl delete pod, a rolling update), the desired behavior is "finish the key currently being processed, then exit without losing work". In sample-controller the chain looks like this:
SIGTERM → signals.SetupSignalHandler cancels ctx → controller.Run's main goroutine unblocks → the deferred workqueue.ShutDown runs → workqueue.Get returns shutdown=true → workers exit
// staging/src/k8s.io/client-go/util/workqueue/queue.go (lines 304-319)
func (q *Typed[T]) ShutDown() {
defer q.wg.Wait()
q.stopOnce.Do(func() {
defer close(q.stopCh)
})
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.drain = false
q.shuttingDown = true
q.cond.Broadcast()
}
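ShutDown does several things: ① closes stopCh, the stop signal for the queue's internal goroutines (the deferred wg.Wait waits for them); ② sets shuttingDown=true so Get stops blocking; ③ Broadcasts on the cond to wake every worker. When a worker's Get returns shutdown=true, processNextWorkItem returns false and runWorker's loop exits.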
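Note: ShutDown does not clear the queue. As the Get code above shows, Get reports shutdown only once the queue is empty, so a worker finishes its current syncHandler, calls Done, and may still drain keys that were queued earlier before it finally sees shutdown=true; work already in the queue is not abandoned by the queue itself. Keys added after ShutDown are silently dropped (no error, not queued). For the vast majority of controllers this is acceptable: the next time the controller starts, it re-Lists and reconciles everything again.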
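The shutdown rules can be sketched with a sync.Cond-based queue. condQueue and runWorkers are hypothetical and omit dedup, metrics and rate limiting; they model only the two rules described above: Get drains queued items before reporting shutdown, and Add after ShutDown is a no-op.

```go
package main

import (
	"fmt"
	"sync"
)

// condQueue: Get blocks on a sync.Cond; ShutDown wakes every blocked worker.
type condQueue struct {
	cond         *sync.Cond
	items        []string
	shuttingDown bool
}

func newCondQueue() *condQueue { return &condQueue{cond: sync.NewCond(&sync.Mutex{})} }

func (q *condQueue) Add(item string) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	if q.shuttingDown {
		return // silently dropped, like workqueue.Add after ShutDown
	}
	q.items = append(q.items, item)
	q.cond.Signal()
}

// Get blocks until an item is available and reports shutdown only when the queue is
// both shutting down and empty, so items queued earlier still drain.
func (q *condQueue) Get() (string, bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	for len(q.items) == 0 && !q.shuttingDown {
		q.cond.Wait()
	}
	if len(q.items) == 0 {
		return "", true
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item, false
}

func (q *condQueue) ShutDown() {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	q.shuttingDown = true
	q.cond.Broadcast() // wake every worker blocked in Get
}

// runWorkers starts n workers, queues items, shuts the queue down (as the deferred
// ShutDown in Run would after ctx is cancelled) and returns what the workers processed.
func runWorkers(n int, items []string) []string {
	q := newCondQueue()
	var (
		mu   sync.Mutex
		done []string
		wg   sync.WaitGroup
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				item, shutdown := q.Get()
				if shutdown {
					return // processNextWorkItem would return false here
				}
				mu.Lock()
				done = append(done, item)
				mu.Unlock()
			}
		}()
	}
	for _, it := range items {
		q.Add(it)
	}
	q.ShutDown()
	q.Add("too-late") // ignored: the queue is already shutting down
	wg.Wait()
	return done
}

func main() {
	processed := runWorkers(2, []string{"default/a", "default/b", "default/c"})
	fmt.Println(len(processed), "keys processed before the workers exited") // 3 keys processed before the workers exited
}
```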
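🌟 Practical tip
A production operator also has to think about the grace period after SIGTERM: by default Kubernetes kills the container 30 seconds later. Setting terminationGracePeriodSeconds: 60 in the Pod spec, together with a preStop hook (for example sleep 5), helps avoid being killed while the controller is still in the middle of a reconcile.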
Applying the sample-controller template to application-operator gives us our own application_controller.go. This section delivers on the promise from the previous article: actually writing out the Reconcile loop.
// application-operator/pkg/controller/application/application_controller.go
package application
import (
"context"
"fmt"
"time"
"golang.org/x/time/rate"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
appsinformers "k8s.io/client-go/informers/apps/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
appslisters "k8s.io/client-go/listers/apps/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
applicationv1alpha1 "mycompany.io/application-operator/pkg/apis/application/v1alpha1"
clientset "mycompany.io/application-operator/pkg/generated/clientset/versioned"
samplescheme "mycompany.io/application-operator/pkg/generated/clientset/versioned/scheme"
informers "mycompany.io/application-operator/pkg/generated/informers/externalversions/application/v1alpha1"
listers "mycompany.io/application-operator/pkg/generated/listers/application/v1alpha1"
)
const controllerAgentName = "application-operator"
type Controller struct {
kubeclientset kubernetes.Interface
sampleclientset clientset.Interface
deploymentsLister appslisters.DeploymentLister
deploymentsSynced cache.InformerSynced
servicesLister corelisters.ServiceLister
servicesSynced cache.InformerSynced
appsLister listers.ApplicationLister
appsSynced cache.InformerSynced
workqueue workqueue.TypedRateLimitingInterface[cache.ObjectName]
recorder record.EventRecorder
}
func NewController(ctx context.Context, kubeclientset kubernetes.Interface, sampleclientset clientset.Interface,
deploymentInformer appsinformers.DeploymentInformer, serviceInformer coreinformers.ServiceInformer,
applicationInformer informers.ApplicationInformer) *Controller {
utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
eventBroadcaster.StartStructuredLogging(0)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second),
&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
)
controller := &Controller{
kubeclientset: kubeclientset,
sampleclientset: sampleclientset,
deploymentsLister: deploymentInformer.Lister(),
deploymentsSynced: deploymentInformer.Informer().HasSynced,
servicesLister: serviceInformer.Lister(),
servicesSynced: serviceInformer.Informer().HasSynced,
appsLister: applicationInformer.Lister(),
appsSynced: applicationInformer.Informer().HasSynced,
workqueue: workqueue.NewTypedRateLimitingQueue(ratelimiter),
recorder: recorder,
}
applicationInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueApplication,
UpdateFunc: func(old, new interface{}) { controller.enqueueApplication(new) },
})
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newDepl := new.(*appsv1.Deployment)
oldDepl := old.(*appsv1.Deployment)
if newDepl.ResourceVersion == oldDepl.ResourceVersion { return }
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newSvc := new.(*corev1.Service)
oldSvc := old.(*corev1.Service)
if newSvc.ResourceVersion == oldSvc.ResourceVersion { return }
controller.handleObject(new)
},
DeleteFunc: controller.handleObject,
})
return controller
}
func (c *Controller) Run(ctx context.Context, workers int) error {
defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown()
logger := klog.FromContext(ctx)
logger.Info("Starting Application controller")
if ok := cache.WaitForCacheSync(ctx.Done(), c.deploymentsSynced, c.servicesSynced, c.appsSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
logger.Info("Starting workers", "count", workers)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
<-ctx.Done()
logger.Info("Shutting down workers")
return nil
}
func (c *Controller) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *Controller) processNextWorkItem(ctx context.Context) bool {
objRef, shutdown := c.workqueue.Get()
if shutdown { return false }
defer c.workqueue.Done(objRef)
err := c.syncHandler(ctx, objRef)
if err == nil {
c.workqueue.Forget(objRef)
return true
}
utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing", "objectReference", objRef)
c.workqueue.AddRateLimited(objRef)
return true
}
func (c *Controller) enqueueApplication(obj interface{}) {
if objectRef, err := cache.ObjectToName(obj); err == nil {
c.workqueue.Add(objectRef)
}
}
func (c *Controller) handleObject(obj interface{}) {
var object metav1.Object
var ok bool
if object, ok = obj.(metav1.Object); !ok {
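tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
// Without this check a bad tombstone leaves object nil and GetControllerOf would panic.
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
return
}
}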
if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
if ownerRef.Kind != "Application" { return }
app, err := c.appsLister.Applications(object.GetNamespace()).Get(ownerRef.Name)
if err != nil { return }
c.enqueueApplication(app)
}
}
func (c *Controller) syncHandler(ctx context.Context, objectRef cache.ObjectName) error {
// 1) Read desired state
app, err := c.appsLister.Applications(objectRef.Namespace).Get(objectRef.Name)
if errors.IsNotFound(err) { return nil }
if err != nil { return err }
if app.Spec.Image == "" { return nil }
// 2) Read actual state: Deployment
deploymentName := app.Name
deployment, err := c.deploymentsLister.Deployments(app.Namespace).Get(deploymentName)
if errors.IsNotFound(err) {
deployment, err = c.kubeclientset.AppsV1().Deployments(app.Namespace).Create(ctx, newDeployment(app), metav1.CreateOptions{FieldManager: controllerAgentName})
}
if err != nil { return err }
if !metav1.IsControlledBy(deployment, app) {
msg := fmt.Sprintf("Deployment %q already exists and is not managed by Application", deployment.Name)
c.recorder.Event(app, corev1.EventTypeWarning, "ErrResourceExists", msg)
return fmt.Errorf("%s", msg)
}
if app.Spec.Replicas != nil && *app.Spec.Replicas != *deployment.Spec.Replicas {
deployment, err = c.kubeclientset.AppsV1().Deployments(app.Namespace).Update(ctx, newDeployment(app), metav1.UpdateOptions{FieldManager: controllerAgentName})
}
if err != nil { return err }
// 3) Read actual state: Service (only when the Application is exposed)
var service *corev1.Service
if app.Spec.Expose != "" {
service, err = c.servicesLister.Services(app.Namespace).Get(deploymentName)
if errors.IsNotFound(err) {
service, err = c.kubeclientset.CoreV1().Services(app.Namespace).Create(ctx, newService(app), metav1.CreateOptions{FieldManager: controllerAgentName})
}
if err != nil { return err }
if !metav1.IsControlledBy(service, app) {
return fmt.Errorf("Service %q already exists and is not managed by Application", service.Name)
}
}
// 4) Write the observed state back into status
return c.updateApplicationStatus(ctx, app, deployment, service)
}
func (c *Controller) updateApplicationStatus(ctx context.Context, app *applicationv1alpha1.Application, deployment *appsv1.Deployment, service *corev1.Service) error {
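appCopy := app.DeepCopy() // always deep-copy: lister objects belong to the shared cache and must not be mutated
appCopy.Status.DeploymentStatus = deployment.Status // copied verbatim
if service != nil {
appCopy.Status.ServiceStatus = service.Status // copied verbatim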
}
// Derive Phase from the Deployment's conditions
switch {
case isDeploymentFailed(deployment):
appCopy.Status.Phase = applicationv1alpha1.ApplicationPhaseFailed
appCopy.Status.Message = "Deployment rollout exceeded its progress deadline"
case isDeploymentAvailable(deployment):
appCopy.Status.Phase = applicationv1alpha1.ApplicationPhaseRunning
appCopy.Status.Message = fmt.Sprintf("%d/%d replicas available",
deployment.Status.AvailableReplicas, *deployment.Spec.Replicas)
default:
appCopy.Status.Phase = applicationv1alpha1.ApplicationPhasePending
appCopy.Status.Message = "Starting"
}
_, err := c.sampleclientset.ApplicationV1alpha1().Applications(app.Namespace).
UpdateStatus(ctx, appCopy, metav1.UpdateOptions{FieldManager: controllerAgentName})
return err
}
func isDeploymentAvailable(d *appsv1.Deployment) bool {
for _, cond := range d.Status.Conditions {
if cond.Type == appsv1.DeploymentAvailable && cond.Status == corev1.ConditionTrue {
return true
}
}
return false
}
func isDeploymentFailed(d *appsv1.Deployment) bool {
for _, cond := range d.Status.Conditions {
if cond.Type == appsv1.DeploymentProgressing && cond.Status == corev1.ConditionFalse &&
cond.Reason == "ProgressDeadlineExceeded" {
return true
}
}
return false
}
func newDeployment(app *applicationv1alpha1.Application) *appsv1.Deployment {
replicas := app.Spec.Replicas
if replicas == nil {
replicas = ptr.To[int32](1)
}
return &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: app.Name, Namespace: app.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(app, applicationv1alpha1.SchemeGroupVersion.WithKind("Application")),
},
},
Spec: appsv1.DeploymentSpec{
Replicas: replicas,
Selector: &metav1.LabelSelector{MatchLabels: labels(app)},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: labels(app)},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "app",
Image: app.Spec.Image,
Ports: []corev1.ContainerPort{{ContainerPort: app.Spec.Port}},
}},
},
},
},
}
}
func newService(app *applicationv1alpha1.Application) *corev1.Service {
return &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: app.Name, Namespace: app.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(app, applicationv1alpha1.SchemeGroupVersion.WithKind("Application")),
},
},
Spec: corev1.ServiceSpec{
Type: corev1.ServiceType(app.Spec.Expose),
Selector: labels(app),
Ports: []corev1.ServicePort{{
Port: app.Spec.Port, TargetPort: intstr.FromInt32(app.Spec.Port),
}},
},
}
}
func labels(app *applicationv1alpha1.Application) map[string]string {
return map[string]string{
"app": app.Name,
"owner": "application-operator",
}
}
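syncHandler follows a read-then-write shape: read the actual state, create the object if it is missing, update it only when it has drifted, and otherwise do nothing. That shape is what makes it safe to run any number of times. Below is a minimal sketch of that property. It uses hypothetical in-memory types (`deployment`, `fakeClient`, `reconcile`) rather than client-go. The fake records every write, in the same spirit as the action list that sample-controller's controller_test.go inspects on client-go's fake clientset.

```go
package main

import "fmt"

// deployment is a hypothetical, trimmed-down stand-in for appsv1.Deployment.
type deployment struct {
	Name     string
	Replicas int32
}

// fakeClient is a hypothetical in-memory client that records every write.
type fakeClient struct {
	store   map[string]deployment
	actions []string
}

func (c *fakeClient) get(name string) (deployment, bool) {
	d, ok := c.store[name]
	return d, ok
}

func (c *fakeClient) create(d deployment) {
	c.store[d.Name] = d
	c.actions = append(c.actions, "create "+d.Name)
}

func (c *fakeClient) update(d deployment) {
	c.store[d.Name] = d
	c.actions = append(c.actions, "update "+d.Name)
}

// reconcile mirrors the read-then-write shape of syncHandler:
// create if missing, update only on drift, otherwise write nothing.
func reconcile(c *fakeClient, name string, desiredReplicas int32) {
	d, ok := c.get(name)
	if !ok {
		c.create(deployment{Name: name, Replicas: desiredReplicas})
		return
	}
	if d.Replicas != desiredReplicas {
		d.Replicas = desiredReplicas
		c.update(d)
	}
}

func main() {
	c := &fakeClient{store: map[string]deployment{}}
	reconcile(c, "demo", 2) // first run: creates
	reconcile(c, "demo", 2) // same desired state: no write
	reconcile(c, "demo", 3) // spec changed: exactly one update
	fmt.Println(c.actions)  // [create demo update demo]
}
```

Running reconcile again with the same desired state produces no write. That is why duplicate queue entries, resyncs and retries are harmless.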
Compared with the sample-controller source, application-operator mainly adds two things:
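At this point, application-operator can do the following once an Application YAML is submitted: create the Deployment and Service automatically, and fill in status. The next article covers the production essentials: metrics, health probes, leader election, and TLS serving certificate management.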
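Q1: Many controllers use the controller-runtime library (kubebuilder style). Why doesn't sample-controller use it, and which is better?
A: The two are essentially the same: controller-runtime packages the raw client-go pattern as a standard library. sample-controller is teaching-grade code that shows how every layer is assembled. controller-runtime wraps the reconciliation framework, admission webhooks, metrics, leader election and other boilerplate for you. controller-runtime is the recommended choice for production. However, its three core objects (Manager, Controller, Reconciler) only really make sense after you have read sample-controller, and then it is no longer a black box. The Reconcile-flow analysis in this article applies to both, because both are built on client-go.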
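Q2: What is the DeletedFinalStateUnknown in handleObject, and why must it be handled?
A: It is the fallback type client-go uses when a watch misses a delete event. Consider this sequence:
① the informer's watch connection drops;
② while it is down, the Deployment is deleted, so the DELETED watch event never arrives;
③ after reconnecting, the informer relists and finds the Deployment gone;
④ because it never saw the object's final state, it wraps the last cached copy in DeletedFinalStateUnknown and delivers that tombstone to the delete handler.
A handler that asserts obj.(metav1.Object) without the comma-ok form panics on a tombstone. A handler that uses comma-ok but ignores tombstones silently misses the deletion. That is why sample-controller falls back to obj.(cache.DeletedFinalStateUnknown) when the obj.(metav1.Object) assertion fails.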
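Below is a minimal sketch of the unwrapping order handleObject follows. It uses hypothetical stand-in types (`object`, `pod`, `deletedFinalStateUnknown`) so it runs without client-go; the real types are metav1.Object and cache.DeletedFinalStateUnknown. Every assertion uses the comma-ok form, so an unexpected type yields `false` instead of a panic.

```go
package main

import "fmt"

// object is a hypothetical stand-in for metav1.Object.
type object interface{ GetName() string }

type pod struct{ Name string }

func (p pod) GetName() string { return p.Name }

// deletedFinalStateUnknown mimics the shape of cache.DeletedFinalStateUnknown:
// the object's key plus the last state the informer had cached.
type deletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// toObject tries the object directly, then falls back to the tombstone.
func toObject(obj interface{}) (object, bool) {
	if o, ok := obj.(object); ok {
		return o, true
	}
	tombstone, ok := obj.(deletedFinalStateUnknown)
	if !ok {
		return nil, false
	}
	o, ok := tombstone.Obj.(object) // the tombstone's payload must be checked too
	return o, ok
}

func main() {
	o, ok := toObject(pod{Name: "a"})
	fmt.Println(o.GetName(), ok) // a true
	o, ok = toObject(deletedFinalStateUnknown{Key: "ns/b", Obj: pod{Name: "b"}})
	fmt.Println(o.GetName(), ok) // b true
	_, ok = toObject("not an object")
	fmt.Println(ok) // false
}
```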
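Q3: What is the idea behind the workqueue's two sets, dirty and processing?
A: This is how client-go handles a key that is enqueued several times while it is being processed. The two sets track different things:
• dirty marks "this key still needs processing". A key enters it on Add and leaves it on Get.
• processing marks "this key is being processed right now". A key enters it on Get and leaves it on Done.
Adding a key that is already dirty creates no second entry. Adding a key that is being processed only marks it dirty and does not queue it. On Done, if the key is dirty again (it was Added during processing), it is pushed back onto the queue. The result:
① multiple Adds collapse into one queue entry;
② a key is held by at most one worker at a time, so the same object is never reconciled concurrently;
③ Done decides whether the key needs another pass.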
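The bookkeeping described above can be sketched as follows. This is a single-goroutine simplification with hypothetical names. client-go's queue.go also guards everything with a mutex and a condition variable, and handles shutdown.

```go
package main

import "fmt"

// queue sketches the FIFO + dirty + processing bookkeeping of a client-go workqueue.
type queue struct {
	items      []string
	dirty      map[string]bool
	processing map[string]bool
}

func newQueue() *queue {
	return &queue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

func (q *queue) Add(key string) {
	if q.dirty[key] {
		return // already waiting: coalesce
	}
	q.dirty[key] = true
	if q.processing[key] {
		return // a worker holds it: Done will requeue it
	}
	q.items = append(q.items, key)
}

func (q *queue) Get() (string, bool) {
	if len(q.items) == 0 {
		return "", false
	}
	key := q.items[0]
	q.items = q.items[1:]
	q.processing[key] = true
	delete(q.dirty, key)
	return key, true
}

func (q *queue) Done(key string) {
	delete(q.processing, key)
	if q.dirty[key] {
		q.items = append(q.items, key) // re-Added while being processed
	}
}

func (q *queue) Len() int { return len(q.items) }

func main() {
	q := newQueue()
	q.Add("default/demo")
	q.Add("default/demo")
	fmt.Println(q.Len()) // 1: two Adds coalesced
	key, _ := q.Get()
	q.Add(key)           // arrives while a worker holds the key
	fmt.Println(q.Len()) // 0: not handed to a second worker
	q.Done(key)
	fmt.Println(q.Len()) // 1: requeued by Done
}
```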
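Q4: What is the periodic resync, and why do it?
A: Every resyncPeriod, the informer redelivers every object in its local cache to the handlers as an Update. The period is whatever you pass when creating the informer factory: sample-controller's main.go passes 30 seconds, and 0 disables resync.

A resync replays the cache and makes no API call. It is therefore a safety net for a controller that dropped an event or failed to act on one. It cannot recover a watch event that never reached the cache; the relist after a watch failure handles that case.

The cost is a stream of "fake updates" hitting the handlers. In a resync update the old and new objects carry the same ResourceVersion, while a real change always bumps it. UpdateFunc should therefore compare ResourceVersion and return early, and sample-controller's deploymentInformer UpdateFunc does exactly that. This is standard defensive code for a controller. Leaving it out causes no correctness bug, since syncHandler is idempotent, but every resync then costs one full reconcile per object.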
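Below is a minimal sketch of the ResourceVersion check. A hypothetical `meta` struct and `updateFilter` helper stand in for client-go's ObjectMeta and ResourceEventHandlerFuncs.

```go
package main

import "fmt"

// meta is a hypothetical stand-in for the ObjectMeta fields the filter needs.
type meta struct {
	Name            string
	ResourceVersion string
}

// updateFilter returns an UpdateFunc-shaped callback that drops resync
// notifications: a resync hands over the cached object as both old and new,
// so the ResourceVersion is identical, while a real change always bumps it.
func updateFilter(enqueue func(string)) func(oldObj, newObj meta) {
	return func(oldObj, newObj meta) {
		if oldObj.ResourceVersion == newObj.ResourceVersion {
			return // periodic resync: nothing changed
		}
		enqueue(newObj.Name)
	}
}

func main() {
	var enqueued []string
	onUpdate := updateFilter(func(k string) { enqueued = append(enqueued, k) })
	cached := meta{Name: "demo", ResourceVersion: "100"}
	onUpdate(cached, cached)                                     // resync
	onUpdate(cached, meta{Name: "demo", ResourceVersion: "101"}) // real update
	fmt.Println(enqueued)                                        // [demo]
}
```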
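Q5: Some code uses watch directly instead of an informer. Is the performance difference large?
A: With a raw watch you have to implement everything yourself: List for the full state, Watch for increments, your own local cache, and relisting after the watch breaks. The informer does all of this for you.

On performance: thanks to the informer's local cache, lister.Get never touches the network, so thousands of reads per second are fine. Code built on a raw watch typically reads with direct GETs against the API Server. With a few hundred objects that can already hit client-go's client-side rate limiter (QPS/Burst in rest.Config). So in 99% of cases, use an informer rather than a raw watch.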
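The sketch below is a toy version of what an informer maintains on your behalf, built from hypothetical types. It seeds a map from a List, keeps the map current from watch events, and answers reads from memory. It deliberately leaves out the hard parts of the real thing: resourceVersion tracking, relisting after a watch expires, indexers, and thread safety.

```go
package main

import "fmt"

// eventType mirrors the watch event types ADDED / MODIFIED / DELETED.
type eventType string

const (
	added    eventType = "ADDED"
	modified eventType = "MODIFIED"
	deleted  eventType = "DELETED"
)

// event is a hypothetical watch event keyed by namespace/name.
type event struct {
	Type  eventType
	Key   string
	Value string
}

// localCache: seeded by a List, kept current by Watch events, read locally.
type localCache struct{ items map[string]string }

func (c *localCache) replace(list map[string]string) {
	c.items = make(map[string]string, len(list))
	for k, v := range list {
		c.items[k] = v
	}
}

func (c *localCache) apply(e event) {
	switch e.Type {
	case added, modified:
		c.items[e.Key] = e.Value
	case deleted:
		delete(c.items, e.Key)
	}
}

// get never touches the network, which is why lister reads are cheap.
func (c *localCache) get(key string) (string, bool) {
	v, ok := c.items[key]
	return v, ok
}

func main() {
	c := &localCache{}
	c.replace(map[string]string{"default/a": "v1"}) // initial List
	c.apply(event{Type: modified, Key: "default/a", Value: "v2"})
	c.apply(event{Type: added, Key: "default/b", Value: "v1"})
	c.apply(event{Type: deleted, Key: "default/a"})
	_, okA := c.get("default/a")
	vB, okB := c.get("default/b")
	fmt.Println(okA, vB, okB) // false v1 true
}
```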
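Q6: How long is "too long" for WaitForCacheSync?
A: Normally it finishes in 1-3 seconds (one List call plus decoding). More than 30 seconds usually means one of the following:
① the cluster's API Server is struggling;
② you are watching a very large resource set (100k+ Pods);
③ the informer is misconfigured or cannot list, for example a wrong namespace or selector, or RBAC missing list/watch on the resource. In that case the reflector keeps retrying and WaitForCacheSync never returns. resyncPeriod, by contrast, has no effect on the initial sync.
cache.WaitForCacheSync, as called by sample-controller, polls every 100ms (syncedPollPeriod) and only gives up when its stop channel closes. In production, add a timeout and monitoring: if WaitForCacheSync has not returned after 60s, fire an alert.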
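Q7: How many workers are appropriate?
A: sample-controller hard-codes 2. A rule of thumb from production experience is workers = min(CPU cores, 5). More workers is not always better. More parallel reconciles mean proportionally more API Server requests, until you run into rate limits: client-go's client-side token-bucket limiter (QPS/Burst) on your side and the API Server's own limits on the other.

It also depends on how much "heavy" work syncHandler does. Purely local computation can take 10+ workers. When syncHandler calls an external API, 2-3 workers can already start slowing things down, because the external service's limits become the bottleneck. Best practice: run a small-scale load, look at the worker goroutines' CPU usage with pprof, then tune.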
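Q8: My controller is running but no events show up. What should I check?
A: Check three things:
① Was eventBroadcaster.StartRecordingToSink called? That is the call that actually sends events to the API Server.
② Does the operator's ServiceAccount have permission to write events? The RBAC rule needs resource events with verbs create and patch (patch is used when a repeated event's count is bumped).
③ Run kubectl describe application <name> and look at the Events section.
If all of that is fine and events still don't appear, the likely cause is start-up order. The broadcaster's Start* methods must be called before the informers run; otherwise events recorded in the meantime can be lost.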
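Q9: Are multiple events for the same object merged? How?
A: Events pass through three layers, and only the last one actually collapses work:
① DeltaFIFO: all pending Deltas for an object sit under a single key and are popped together. Only consecutive Deleted deltas are deduplicated, and the handler still receives one callback per Delta.
② sharedProcessor: each processorListener buffers notifications between addCh and nextCh in a growing ring buffer (pendingNotifications). A slow handler delays notifications but never overwrites or drops one.
③ workqueue: the dirty set ensures a key is queued at most once while it waits, however many times it is Added, and at most once more while it is being processed.
Net effect: if a user edits the spec 100 times in quick succession, syncHandler may end up running only 1-3 times, depending on timing. Note that coalescing is not losing events. Each reconcile reads the latest spec from the lister, so nothing goes unprocessed.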
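Point ② can be sketched as a relay goroutine that never drops anything. The names here are hypothetical, and the slice stands in for client-go's growing ring buffer. One deliberate difference: client-go's processorListener.pop() returns as soon as addCh is closed, while this sketch drains its buffer first.

```go
package main

import "fmt"

// relay forwards every value from addCh to nextCh, parking values in an
// unbounded slice while the consumer is busy: nothing is dropped or
// overwritten, and order is preserved.
func relay(addCh <-chan int, nextCh chan<- int) {
	defer close(nextCh)
	var pending []int
	for {
		var out chan<- int // nil disables the send case while nothing is pending
		var next int
		if len(pending) > 0 {
			out = nextCh
			next = pending[0]
		}
		select {
		case v, ok := <-addCh:
			if !ok {
				for _, p := range pending { // drain before stopping
					nextCh <- p
				}
				return
			}
			pending = append(pending, v)
		case out <- next:
			pending = pending[1:]
		}
	}
}

func main() {
	addCh := make(chan int)
	nextCh := make(chan int)
	go relay(addCh, nextCh)
	go func() {
		for i := 0; i < 100; i++ {
			addCh <- i // the producer only waits on the relay, never on the consumer
		}
		close(addCh)
	}()
	received := 0
	for v := range nextCh {
		if v != received {
			panic("dropped or out of order")
		}
		received++
	}
	fmt.Println(received) // 100
}
```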
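Q10: Why requeue when UpdateStatus fails? I'm only writing status.
A: Typical causes:
① a resourceVersion conflict, because the user or another operator changed the object after the cached copy was read;
② a transient 5xx error (API Server overloaded);
③ insufficient permissions (RBAC missing update on applications/status).
Requeueing lets the controller retry automatically. If the first status write fails it retries after 5ms, then after 10ms, and so on, and the exponential backoff naturally protects the API Server. Unrecoverable errors (such as a schema validation failure) would be retried indefinitely: the delay is capped, the number of attempts is not. Classify errors in syncHandler instead: for business errors, return nil to give up; for transient errors, return err to retry.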
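Q11: Reading controller-runtime's source I ran into Predicate filtering, which sample-controller doesn't have. What is it?
A: A Predicate is an extra filtering layer that controller-runtime wraps around the event handler it registers with the informer (via AddEventHandler). Examples: "only Updates that change status" or "only objects carrying a specific label". It is essentially the same as sample-controller's ResourceVersion check, since both cut out pointless handler calls, but controller-runtime's Predicates are more declarative. In production, with many resources and many events, adding Predicates can cut the controller's CPU usage substantially (by 30-50% in the author's experience).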
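Below is a sketch of the idea built from hypothetical types: a predicate is just a function that decides whether an event goes any further, and predicates compose. controller-runtime's real ones live in its predicate package (for example GenerationChangedPredicate); the helpers below only imitate that behavior.

```go
package main

import "fmt"

// objMeta is a hypothetical stand-in for the metadata a predicate inspects.
type objMeta struct {
	Generation int64
	Labels     map[string]string
}

// updatePredicate returns false to drop an Update before it is enqueued.
type updatePredicate func(oldObj, newObj objMeta) bool

// generationChanged keeps spec changes only: metadata.generation is bumped on
// spec writes but not on status-only writes (with the status subresource enabled).
func generationChanged(oldObj, newObj objMeta) bool {
	return oldObj.Generation != newObj.Generation
}

// hasLabel keeps only objects labeled key=value.
func hasLabel(key, value string) updatePredicate {
	return func(_, newObj objMeta) bool { return newObj.Labels[key] == value }
}

// and passes an event only if every predicate passes.
func and(ps ...updatePredicate) updatePredicate {
	return func(oldObj, newObj objMeta) bool {
		for _, p := range ps {
			if !p(oldObj, newObj) {
				return false
			}
		}
		return true
	}
}

func main() {
	p := and(generationChanged, hasLabel("owner", "application-operator"))
	labels := map[string]string{"owner": "application-operator"}
	old := objMeta{Generation: 1, Labels: labels}
	fmt.Println(p(old, objMeta{Generation: 1, Labels: labels})) // false: status-only write
	fmt.Println(p(old, objMeta{Generation: 2, Labels: labels})) // true: spec changed
	fmt.Println(p(old, objMeta{Generation: 2}))                 // false: label missing
}
```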
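Q12: How do I test a controller, and how do I use the fake client?
A: There are two kinds of tests:
① Unit tests with client-go's fake clientset plus informers. Seed fake.NewSimpleClientset() with initial objects, get informers from informers.NewSharedInformerFactory(fakeClient, 0), and build the controller with NewController(...). Then drive it and assert that the Deployment was created. sample-controller's controller_test.go does not call Run: it puts objects straight into the informers' indexers, calls syncHandler directly, and compares the actions recorded by the fake client with an expected list.
② envtest (provided by controller-runtime). It starts a real etcd + kube-apiserver (using kubebuilder's envtest tooling) for genuine integration tests.
envtest is closer to production. The recommendation is envtest for controller business logic and the fake client for utility functions.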
In the next article we upgrade application-operator to production grade with metrics, health probes, leader election, TLS certificate management, and e2e tests. It will cover:
Source references for this article:
• sample-controller controller.go (lines 68-156, 162-188, 200-236, 241-326, 388-421)
• cache/controller.go (lines 124-209, 279-300, 394-400)
• shared_informer.go (lines 284-340, 413-432, 715-798, 852-873, 1289-1328)
• workqueue/queue.go (lines 42-60, 140-182, 220-319)
• workqueue/rate_limiting_queue.go (lines 27-149)
• workqueue/default_rate_limiters.go (lines 30-291)
• reflector.go (lines 296-420)
• NewControllerRef source (lines 59-68)
Kubernetes Programming / Operator Series, in-depth lectures by 左扬 (Zuo Yang): application-operator Reconcile loop source walkthrough · Source: Kubernetes 1.36.1 source + sample-controller + client-go cache/workqueue