



























当我们开发 Kubernetes Operator 或者自定义控制器时,最常遇到的一个问题就是:为什么 Controller 启动后不能立即开始工作?为什么需要 WaitForCacheSync?SharedInformerFactory 到底是什么?它和直接创建 Informer 有什么区别?这一篇文章我们就来彻底搞懂这些问题。
Kubernetes client-go Operator Controller v1.36.1
🔓 学习重点提示 — 建议先通读全文,再重点回顾标注内容
★ 重点掌握(必须)
• SharedInformerFactory 的作用:为什么需要共享 Informer,如何复用缓存
• WaitForCacheSync 的原理:HasSynced 机制是如何保证数据一致性的
• 多 Controller 共享工厂:如何在同一个进程中运行多个控制器而不互相干扰
☆ 次重点(了解即可)
• 自定义 Resync 周期和 Transform 函数
想象一下这个场景:我们写了一个 Operator,需要同时监听 Pod、Deployment、ConfigMap 三种资源。如果每个 Controller 各自创建自己的 Informer,会发生什么?
首先,APIServer 要同时响应 3 个 Watch 请求,每个 Watch 都会维护一条到 APIServer 的长连接。然后,每个 Informer 都有自己独立的 Reflector,都会独立地向 APIServer 发送 List 请求来全量拉取数据。这意味着,即使 Pod、Deployment、ConfigMap 的数据可以一次性从 APIServer 批量获取,我们的程序也会分别发起 3 次 List 请求。
更糟糕的是,如果集群中有 10 个 Operator,每个都监听了同一种资源(比如 Pod),那 APIServer 要维护 10 条 Watch 连接来处理同一个资源的变更。Kubernetes 社区把这个叫做"惊群效应"(Thundering Herd)。
提示:SharedInformerFactory 的核心思想就是"共享"。同一个资源类型,只创建一个 Informer,所有需要监听该资源的 Controller 都共用这个 Informer 的缓存。这就像图书馆只买一本书,但很多人可以借阅一样。
我们先来看 SharedInformerFactory 的核心数据结构,理解它的字段设计。
// staging/src/k8s.io/client-go/informers/factory.go(行 59-78)
type sharedInformerFactory struct {
client kubernetes.Interface // 连接到 APIServer 的 client
namespace string // 监听的名字空间,NamespaceAll 表示所有
tweakListOptions internalinterfaces.TweakListOptionsFunc // 自定义 List 选项的过滤器
lock sync.Mutex // 保护下面 map 的并发锁
defaultResync time.Duration // 默认的同步周期
customResync map[reflect.Type]time.Duration // 特定类型的自定义同步周期
informers map[reflect.Type]cache.SharedIndexInformer // 核心!存储所有 informer
startedInformers map[reflect.Type]bool // 标记哪些 informer 已经启动
wg sync.WaitGroup // 等待所有 goroutine 退出
shuttingDown bool // 是否正在关闭
}
这里最关键的字段是 informers map[reflect.Type]cache.SharedIndexInformer。这是一个用资源的 Type 作为 key 的 map。同一个 Type 的资源,只会创建一个 SharedIndexInformer。当我们调用 factory.Core().V1().Pods().Informer() 时,工厂会先检查这个 map 中是否已经存在 Pod 的 Informer,如果存在就直接返回,如果不存在才会创建新的。
SharedInformerFactory 提供了三种创建方式,从简单到复杂:
// staging/src/k8s.io/client-go/informers/factory.go(行 129-160)
// 方式一:最简单,所有 namespace,默认 resync 周期
func NewSharedInformerFactory(client kubernetes.Interface, defaultResync time.Duration) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync)
}
// 方式二:指定 namespace 和 list 选项过滤(Deprecated)
func NewFilteredSharedInformerFactory(...) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync,
WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
}
// 方式三:推荐方式,通过选项函数灵活配置
func NewSharedInformerFactoryWithOptions(
client kubernetes.Interface,
defaultResync time.Duration,
options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll, // 默认监听所有 namespace
defaultResync: defaultResync, // 默认 resync 周期
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
// 应用所有选项
for _, opt := range options {
factory = opt(factory)
}
return factory
}
方式三是最灵活的,我们可以组合多种选项。比如只监听某个特定 namespace 的 Pod:
// 实际使用示例
// 创建一个只监听 "default" namespace 的工厂
factory := informers.NewSharedInformerFactoryWithOptions(
clientset,
30*time.Second, // 每 30 秒做一次 resync
informers.WithNamespace("default"), // 只监听 default namespace
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = "app=myapp" // 只监听带特定标签的资源
}),
)
SharedInformerFactory 使用了懒加载(Lazy Loading)模式。Informer 只有在第一次被请求时才会创建,而不是在工厂创建时就创建所有可能的 Informer。
// staging/src/k8s.io/client-go/informers/factory.go(行 241-260)
// InformerFor 是工厂的精髓:同类型的资源只创建一次
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj) // 用 reflect 获取资源类型
informer, exists := f.informers[informerType]
if exists {
return informer // 已存在,直接返回已有的 informer
}
// 不存在,创建新的,并存入 map
informer = newFunc(f.client, f.resyncPeriod(informerType), f.namespace, f.tweakListOptions, f.transform)
f.informers[informerType] = informer
return informer
}
这段代码的关键在于: informerType 作为 key 确保了同一个资源类型只会创建一次。无论我们调用多少次 `factory.Core().V1().Pods().Informer()`,实际上只有第一次会真正创建 Pod 的 Informer,后续调用都会返回同一个实例。
💡 注意
懒加载意味着:即使我们创建了工厂并传入了 client,Informer 也不会立即连接 APIServer。只有当我们第一次访问某个资源的 Informer 时,才会真正开始 Watch。这是一种"按需初始化"的优化策略。
这是最容易让新手困惑的地方:为什么 Controller 的 Run 方法要先调用 WaitForCacheSync?这是什么原理?
要理解这个问题,我们先回顾一下 Informer 的工作原理。Informer 在启动时,首先会执行一次全量 List,把 APIServer 上的所有资源都拉到本地缓存。这个过程需要时间,尤其是当集群中资源很多的时候。
如果在缓存还没有同步完成的情况下,Controller 就开始处理事件,会发生什么?假设我们收到一个 Add 事件,但此时缓存还没有收到之前已经存在的所有 Pod,那么我们可能会遗漏一些已有的 Pod,导致 Controller 的状态和集群真实状态不一致。
WaitForCacheSync 就是来解决这个问题的。它的核心思想是:等待所有 Informer 的缓存都同步完成之后,才开始处理业务逻辑。
// staging/src/k8s.io/client-go/informers/factory.go(行 194-239)
// WaitForCacheSync 等待所有已启动的 Informer 完成初始同步 func (f *sharedInformerFactory) WaitForCacheSync(stopCh
WaitForCacheSync 做了四件事:
HasSynced 看起来只是一个简单的方法调用,但实际上它背后有一套完整的同步检查机制。SharedIndexInformer 的 HasSyncedChecker() 返回的是一个 DoneChecker 接口:
// DoneChecker 接口定义
// cache.DoneChecker 接口
type DoneChecker interface {
Done(channel struct{}) bool
}
当 Informer 完成初始 List 并把所有数据都放入 DeltaFIFO 后,它会调用一个 channel 的 close 操作。HasSyncedChecker() 返回的 DoneChecker 会监听这个 channel,一旦收到 close 信号,就知道同步完成了。
🌟 实用技巧
在生产环境中,如果 Controller 启动后立即开始处理请求,但 WaitForCacheSync 还没完成,可能会导致"看不到资源"的问题。排查这类问题时,可以先用 kubectl get pods 确认资源存在,然后用 kubectl describe pod 查看 Events,确认是否有 Informer 相关的错误。
把以上知识串起来,我们来看一个完整的 Controller 启动流程:
创建 SharedInformerFactory → 启动 Factory.Start() → 等待 WaitForCacheSync → 启动 Worker 循环
// 完整的 Controller 启动代码示例
func RunController(ctx context.Context) error {
// 第一步:创建工厂
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
// 第二步:注册需要监听的资源(懒加载,此时还不真正创建 Informer)
deploymentInformer := factory.Apps().V1().Deployments()
podInformer := factory.Core().V1().Pods()
// 第三步:启动工厂(此时 Informer 才真正开始 List + Watch)
factory.Start(ctx.Done())
// 第四步:等待所有 Informer 的缓存同步完成(关键步骤!)
cacheSyncResult := factory.WaitForCacheSync(ctx.Done())
if !cacheSyncResult.Synced {
return fmt.Errorf("failed to sync caches")
}
// 第五步:所有缓存同步完成后,创建 Controller 并启动 worker
controller := NewController(deploymentInformer, podInformer, workqueue.New())
go controller.Run(ctx)
<-ctx.Done() // 等待 context 取消
factory.Shutdown()
return nil
}
下面是一张完整的架构图,展示 SharedInformerFactory 内部的结构和各组件之间的关系:
┌──────────────────────────────────────────────────────────────────┐ │ SharedInformerFactory │ │ ┌────────────────────────────────────────────────────────────┐ │ │ │ sharedInformerFactory struct │ │ │ │ ├── client: kubernetes.Interface ←─── APIServer 连接 │ │ │ │ ├── informers: map[reflect.Type]SharedIndexInformer │ │ │ │ │ ↓ │ │ │ │ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ │ │ │ │ │ Pod Informer│ │ Deploy Inf. │ │ ConfigMap Inf│ │ │ │ │ │ └─────────────┘ └─────────────┘ └─────────────┘ │ │ │ │ │ ↓ ↓ ↓ │ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ │ │ │ SharedIndexInformer │ │ │ │ │ │ │ ├── Controller (控制循环) │ │ │ │ │ │ │ ├── Reflector (List/Watch) │ │ │ │ │ │ │ ├── DeltaFIFO (增量队列) │ │ │ │ │ │ └── Indexer (本地缓存) │ │ │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ └────────────────────────────────────────────────────┘ │ │ │ │ │ │ Start() ──→ 启动所有已注册的 Informer goroutine │ │ │ WaitForCacheSync() ──→ 等待所有 Informer 完成初始同步 │ │ │ Shutdown() ──→ 优雅关闭所有 Informer │ │ └────────────────────────────────────────────────────────────┘ │ ↓ │ ┌──────────────────────┐ │ │ APIServer │ │ │ /api/v1/pods │ │ │ /apis/apps/v1/deploy│ │ └──────────────────────┘ └──────────────────────────────────────────────────────────────────┘
从图中可以看出:SharedInformerFactory 是所有 SharedIndexInformer 的管理者,它负责创建、分发和管理这些 Informer。而每个 Informer 内部的 Reflector 负责和 APIServer 通信,把数据存入 DeltaFIFO,再由 Controller 处理后放入 Indexer 本地缓存。
在实际开发中,SharedInformerFactory 相关的问题主要集中在以下几个方面:
如果 Controller 启动后 WaitForCacheSync 一直不返回,可能的原因是:
排查方法:
# 查看 Controller 日志中是否有 List/Watch 相关错误 kubectl logs -n <namespace> <controller-pod> | grep -i " reflector\|informer\|list\|watch" # 确认 APIServer 是否正常响应 kubectl --insecure-prefix get --raw /healthz
如果 Controller 运行一段时间后内存持续增长,很可能是 Informer 没有正确关闭。确保:
// 错误示例:没有调用 Shutdown
func main() {
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
// ... 使用 factory ...
// 程序退出时没有清理
}
// 正确示例:使用 context 控制生命周期
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
factory.Start(ctx.Done())
// ... 使用 factory ...
// 程序退出前调用 Shutdown
factory.Shutdown()
}
这一节我们深入理解了 SharedInformerFactory 的设计与实现:
下一节我们将进入 Controller 开发模式完整实战,看看如何从零开始写一个生产级的 Kubernetes Controller。敬请期待!
相关阅读:
• SharedInformerFactory 完整源码
• SharedIndexInformer 核心实现
• client-go 官方 Controller 示例
Kubernetes 编程 / Operator 专题【左扬精讲】—— SharedInformerFactory 与等待缓存同步 · 来源:Kubernetes v1.36.1 client-go 源码分析
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。