



























Kubernetes 编程 / Operator 专题【左扬精讲】—— Indexer 与 ThreadSafeStore 核心原理与源码深度剖析
在 Kubernetes Informer 系统中,Indexer 是一个非常重要的组件。它不仅仅是简单的键值存储,而是支持多索引查询的数据结构。在开发控制器时,我们经常需要"找出所有 namespace 为 default 的 Pod"或"找出所有 spec.nodeName 为 node-1 的 Pod",这时候 Indexer 的强大索引能力就派上用场了。Indexer 底层依赖 ThreadSafeStore 实现线程安全的数据存储。今天我们就来深入剖析这两个核心组件的设计和实现。
Kubernetes 1.36.1 · Go 1.26 · client-go tools/cache
Kubernetes Go Indexer ThreadSafeStore Informer
🔓 学习重点提示 — 建议先通读全文,再重点回顾标注内容
★ 重点掌握(必须)
• Store 接口:基础存储接口的完整定义
• Indexer 接口:扩展 Store 的多索引能力
• cache 结构体:Indexer 的底层实现(KeyFunc + ThreadSafeStore)
• threadSafeMap:ThreadSafeStore 的实现,线程安全的 map + 索引
• Index/IndexKeys/ByIndex 区别:三种索引查询方法的不同场景
☆ 次重点(了解即可)
• Transaction 批量操作
• AddIndexers 动态添加索引器
在 Kubernetes client-go 中,数据存储的接口层级非常清晰。Store 是最基础的接口,Indexer 在 Store 基础上扩展了多索引能力,ThreadSafeStore 则提供了线程安全的底层实现。
┌─────────────────────────────────────────────────────────────┐
│ Indexer(扩展存储 + 多索引查询) │
│ - 继承自 Store │
│ - 新增:Index / IndexKeys / ByIndex / GetIndexers │
│ - 底层:cache (KeyFunc + ThreadSafeStore) │
└─────────────────────────────┬───────────────────────────────┘
│ 继承
┌─────────────────────────────▼───────────────────────────────┐
│ Store(基础存储接口) │
│ - Add / Update / Delete / Get / List / ListKeys │
│ - Replace / Resync / LastStoreSyncResourceVersion │
└─────────────────────────────┬───────────────────────────────┘
│ 实现
┌─────────────────────────────▼───────────────────────────────┐
│ ThreadSafeStore(线程安全底层存储) │
│ - threadSafeMap: sync.RWMutex + map + storeIndex │
│ - Add / Update / Delete / Get / List / Index │
└─────────────────────────────────────────────────────────────┘
理解这个层级关系非常重要:Indexer 是接口,定义了多索引查询的契约;cache 是 Indexer 的实现,内部持有 KeyFunc 和 ThreadSafeStore;ThreadSafeStore 也是接口,threadSafeMap 是它的实现。
Store 接口是整个存储体系的基础。它定义了对象存储的基本操作。
// staging/src/k8s.io/client-go/tools/cache/store.go(行 38-82)
// Store is a generic object storage interface. Reflector knows how to
// watch a server and update a Store. This package provides a variety of
// implementations of Store.
type Store interface {
// Add adds the given object to the accumulator associated with
// the given object's key
Add(obj interface{}) error
// Update updates the given object in the accumulator associated
// with the given object's key
Update(obj interface{}) error
// Delete deletes the given object from the accumulator associated
// with the given object's key
Delete(obj interface{}) error
// List returns a list of all the currently non-empty accumulators
List() []interface{}
// ListKeys returns a list of all the keys currently associated
// with non-empty accumulators
ListKeys() []string
// Get returns the accumulator associated with the given object's key
Get(obj interface{}) (item interface{}, exists bool, err error)
// GetByKey returns the accumulator associated with the given key
GetByKey(key string) (item interface{}, exists bool, err error)
// Replace will delete the contents of the store, using instead the
// given list. Store takes ownership of the list.
Replace([]interface{}, string) error
// Resync is meaningless in the terms appearing here but has
// meaning in some implementations that have non-trivial
// additional behavior (e.g., DeltaFIFO).
Resync() error
}
Store 接口的核心方法是 CRUD 操作:Add/Update/Delete 是增删改,Get/GetByKey/List/ListKeys 是读取,Replace 是全量替换。Resync 对于普通 Store 实现来说是空操作(no-op),但在 DeltaFIFO 中有实际意义。
Indexer 在 Store 基础上增加了五个与索引相关的方法,让你可以按索引字段快速查询对象。
// staging/src/k8s.io/client-go/tools/cache/index.go(行 26-55)
// Indexer extends Store with multiple indices and restricts each
// accumulator to simply hold the current object (and be empty after Delete).
//
// There are three kinds of strings here:
// 1. a storage key, as defined in the Store interface,
// 2. a name of an index, and
// 3. an "indexed value", which is produced by an IndexFunc and
// can be a field value or any other string computed from the object.
type Indexer interface {
Store
// Index returns the stored objects whose set of indexed values
// intersects the set of indexed values of the given object, for
// the named index
Index(indexName string, obj interface{}) ([]interface{}, error)
// IndexKeys returns the storage keys of the stored objects whose
// set of indexed values for the named index includes the given
// indexed value
IndexKeys(indexName, indexedValue string) ([]string, error)
// ListIndexFuncValues returns all the indexed values of the given index
ListIndexFuncValues(indexName string) []string
// ByIndex returns the stored objects whose set of indexed values
// for the named index includes the given indexed value
ByIndex(indexName, indexedValue string) ([]interface{}, error)
// GetIndexers return the indexers
GetIndexers() Indexers
// AddIndexers adds more indexers to this store.
AddIndexers(newIndexers Indexers) error
}
Indexer 接口新增了五个方法,理解它们的区别是掌握 Indexer 的关键:
| 方法 | 输入 | 输出 | 典型场景 |
|---|---|---|---|
| Index | indexName + obj(对象) | []interface{}(对象列表) | 按对象属性查询 |
| IndexKeys | indexName + indexedValue | []string(key 列表) | 按索引值查 key |
| ByIndex | indexName + indexedValue | []interface{}(对象列表) | 按索引值查对象(常用) |
| ListIndexFuncValues | indexName | []string(所有索引值) | 列出所有 namespace |
KeyFunc 是将对象转换为存储 key 的函数。在 Kubernetes 中,这个 key 通常是 namespace/name 格式。
// staging/src/k8s.io/client-go/tools/cache/store.go(行 120-162)
// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
type KeyFunc func(obj interface{}) (string, error)
// MetaNamespaceKeyFunc is a default KeyFunc that knows how to make
// keys for API objects which implement meta.Interface.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
objName, err := ObjectToName(obj)
if err != nil {
return "", err
}
return objName.String(), nil
}
// ObjectToName returns the structured name for the given object
func ObjectToName(obj interface{}) (ObjectName, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return ObjectName{}, fmt.Errorf("object has no meta: %v", err)
}
return MetaObjectToName(meta), nil
}
// MetaObjectToName returns the structured name for the given object
func MetaObjectToName(obj metav1.Object) ObjectName {
if len(obj.GetNamespace()) > 0 {
return ObjectName{Namespace: obj.GetNamespace(), Name: obj.GetName()}
}
return ObjectName{Namespace: "", Name: obj.GetName()}
}
MetaNamespaceKeyFunc 是默认的 KeyFunc,它的工作流程是:先检查对象是否实现了 ExplicitKey 接口(用于特殊 key,如集群级别的资源);然后提取对象的 meta 信息,生成 namespace/name 格式的 key。实现必须是确定性的——同一个对象多次调用应该返回相同的 key。
IndexFunc 是将对象转换为索引值的函数。一个对象可以对应多个索引值(返回 []string)。
// staging/src/k8s.io/client-go/tools/cache/index.go(行 57-97)
// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)
// NamespaceIndex is the lookup name for the most common index function,
// which is to index by the namespace field.
const NamespaceIndex string = "namespace"
// MetaNamespaceIndexFunc is a default index function that indexes based
// on an object's namespace
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}
// Indexers maps a name to an IndexFunc
type Indexers map[string]IndexFunc
// Indices maps a name to an Index
type Indices map[string]index
// index maps the indexed value to a set of keys in the store that match on that value
type index map[string]sets.Set[string]
MetaNamespaceIndexFunc 是最简单的 IndexFunc,它只用一行代码就提取了对象的 namespace 作为索引值。IndexFunc 返回 []string,这意味着一个对象可以有多个索引值——比如一个 Pod 可以同时属于 namespace "default" 和有标签 "app=nginx",如果定义了基于标签的索引函数,它可能同时返回 ["default", "app=nginx"]。
Indexer 接口的默认实现是 cache 结构体。它组合了 KeyFunc(生成 key)和 ThreadSafeStore(存储 + 索引)。
// staging/src/k8s.io/client-go/tools/cache/store.go(行 202-439)
// `*cache` implements Indexer in terms of a ThreadSafeStore and
// an associated KeyFunc.
type cache struct {
// cacheStorage bears the burden of thread safety for the cache
cacheStorage ThreadSafeStore
// keyFunc is used to make the key for objects stored in and
// retrieved from items, and should be deterministic.
keyFunc KeyFunc
// Called with every object put in the cache.
transformer TransformFunc
// identifier is used to identify the store for metrics.
identifier InformerNameAndResource
// metrics is the metrics provider for the store.
metrics InformerMetricsProvider
}
// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc, opts ...StoreOption) Store {
c := &cache{
keyFunc: keyFunc,
}
for _, opt := range opts {
opt(c)
}
threadSafeOpts := []ThreadSafeStoreOption{}
if c.metrics != nil {
threadSafeOpts = append(threadSafeOpts, WithThreadSafeStoreMetrics(c.identifier, c.metrics))
}
c.cacheStorage = NewThreadSafeStore(Indexers{}, Indices{}, threadSafeOpts...)
return c
}
// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers, opts ...StoreOption) Indexer {
c := &cache{
keyFunc: keyFunc,
}
for _, opt := range opts {
opt(c)
}
threadSafeOpts := []ThreadSafeStoreOption{}
if c.metrics != nil {
threadSafeOpts = append(threadSafeOpts, WithThreadSafeStoreMetrics(c.identifier, c.metrics))
}
c.cacheStorage = NewThreadSafeStore(indexers, Indices{}, threadSafeOpts...)
return c
}
cache 结构体非常简洁:它持有 cacheStorage(ThreadSafeStore 实例)和 keyFunc(生成 key 的函数)。所有 CRUD 操作实际上都是委托给 ThreadSafeStore 完成的。KeyFunc 只在 Add/Update/Delete 时用于生成 key。
// staging/src/k8s.io/client-go/tools/cache/store.go(行 254-340)
// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
if c.transformer != nil {
obj, err = c.transformer(obj)
if err != nil {
return fmt.Errorf("transforming: %w", err)
}
}
c.cacheStorage.Add(key, obj)
return nil
}
// Update sets an item in the cache to its updated state.
func (c *cache) Update(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
if c.transformer != nil {
obj, err = c.transformer(obj)
if err != nil {
return fmt.Errorf("transforming: %w", err)
}
}
c.cacheStorage.Update(key, obj)
return nil
}
// Delete removes an item from the cache.
func (c *cache) Delete(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Delete(key)
return nil
}
// Get returns the item from the cache with the given key.
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := c.keyFunc(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return c.GetByKey(key)
}
// GetByKey returns the item from the cache with the given key.
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
item, exists = c.cacheStorage.Get(key)
return item, exists, nil
}
// List returns a list of all the items in the cache.
func (c *cache) List() []interface{} {
return c.cacheStorage.List()
}
// ListKeys returns a list of all the keys in the cache.
func (c *cache) ListKeys() []string {
return c.cacheStorage.ListKeys()
}
cache 的 CRUD 实现非常直接:每次 Add/Update/Delete,先用 KeyFunc 生成 key,然后调用 cacheStorage(ThreadSafeStore)的对应方法。Get/GetByKey 也是类似,先生成或直接使用 key,再从 cacheStorage 读取。这里有个重要细节:transformer 函数可以在对象存入缓存前对其进行转换(常用于过滤或脱敏)。
ThreadSafeStore 是线程安全的底层存储接口,它组合了基本存储和多索引能力。
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go(行 46-70)
// ThreadSafeStore is an interface that allows concurrent access to a
// thread-safe mutable map of items. The interface is thread-safe
// by using sync.RWMutex.
//
// Note that modifying objects stored by the indexers (if any) will
// *not* automatically lead to a re-index. So it's not a good idea
// to directly modify the objects returned by Get/List, in general.
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
DeleteWithObject(key string, obj interface{})
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexedValue string) ([]string, error)
ByIndex(indexName, indexedValue string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
Resync() error
}
// storeIndex implements the indexing functionality for Store interface
type storeIndex struct {
// indexers maps a name to an IndexFunc
indexers Indexers
// indices maps a name to an Index
indices Indices
}
注意这里的注释非常重要:通过索引器修改对象存储不会自动导致重新索引。也就是说,如果你通过 Get/List 获取了对象后直接修改它,索引不会更新。所以返回的对象应该被视为只读的。
threadSafeMap 是 ThreadSafeStore 接口的默认实现,它用 Go 的 sync.RWMutex 实现线程安全。
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go(行 255-326)
// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
// index implements the indexing functionality
index *storeIndex
rv string
// metrics is used to expose metrics about the store
metrics *storeMetrics
}
func (c *threadSafeMap) Update(key string, obj interface{}) {
rv, rvErr := rvFromObject(obj)
rvInt, parseErr := parseRVForMetricsWithTruncation(rv)
c.lock.Lock()
defer c.lock.Unlock()
c.updateLocked(key, obj)
if rvErr == nil {
c.rv = rv
if parseErr == nil {
c.metrics.storeResourceVersion.Set(float64(rvInt))
}
}
}
func (c *threadSafeMap) updateLocked(key string, obj interface{}) {
oldObject := c.items[key]
c.items[key] = obj
c.index.updateIndices(oldObject, obj, key)
}
func (c *threadSafeMap) Delete(key string) {
c.DeleteWithObject(key, nil)
}
func (c *threadSafeMap) deleteLocked(key string) {
if obj, exists := c.items[key]; exists {
c.index.updateIndices(obj, nil, key)
delete(c.items, key)
}
}
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
c.lock.RLock()
defer c.lock.RUnlock()
item, exists = c.items[key]
return item, exists
}
func (c *threadSafeMap) List() []interface{} {
c.lock.RLock()
defer c.lock.RUnlock()
list := make([]interface{}, 0, len(c.items))
for _, item := range c.items {
list = append(list, item)
}
return list
}
threadSafeMap 的设计非常精妙:items 是普通的 map,存储 key-value 对;index 是索引结构(storeIndex),管理所有的索引;rv 存储最新的 resourceVersion;lock 是读写锁,读操作用 RLock(允许多读),写操作用 Lock(独占)。 注意 updateLocked 的逻辑:更新时,先获取旧对象,然后更新 items,最后调用 index.updateIndices 更新所有索引。这是保持索引一致性的关键——索引必须和 items 同步更新。
storeIndex 管理所有的索引。它维护两个核心数据结构:indexers(名字到函数的映射)和 indices(名字到索引的映射)。
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go(行 93-254)
// updateIndices modifies the objects location in the managed indexes:
// - for create you must provide only the newObj
// - for update you must provide both the oldObj and the newObj
// - for delete you must provide only the oldObj
func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
for name := range i.indexers {
i.updateSingleIndex(name, oldObj, newObj, key)
}
}
// updateSingleIndex modifies the objects location in the named index:
// - for create you must provide only the newObj
// - for update you must provide both the oldObj and the newObj
// - for delete you must provide only the oldObj
func (i *storeIndex) updateSingleIndex(name string, oldObj interface{}, newObj interface{}, key string) {
var oldIndexValues, indexValues []string
indexFunc, ok := i.indexers[name]
if !ok {
panic(fmt.Errorf("indexer %q does not exist", name))
}
if oldObj != nil {
oldIndexValues, _ = indexFunc(oldObj)
}
if newObj != nil {
indexValues, _ = indexFunc(newObj)
}
idx := i.indices[name]
if idx == nil {
idx = index{}
i.indices[name] = idx
}
// 优化:如果新旧索引值相同,跳过更新
if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
return
}
// 删除旧索引
for _, value := range oldIndexValues {
i.deleteKeyFromIndex(key, value, idx)
}
// 添加新索引
for _, value := range indexValues {
i.addKeyToIndex(key, value, idx)
}
}
func (i *storeIndex) addKeyToIndex(key, indexValue string, idx index) {
set := idx[indexValue]
if set == nil {
set = sets.Set[string]{}
idx[indexValue] = set
}
set.Insert(key)
}
func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, idx index) {
set := idx[indexValue]
if set == nil {
return
}
set.Delete(key)
// 如果集合为空,删除整个索引值(避免内存泄漏)
if len(set) == 0 {
delete(idx, indexValue)
}
}
索引更新的逻辑非常清晰: 第一步:调用 IndexFunc 分别获取旧对象和新对象的索引值。 第二步:优化检查——如果索引值没变(常见场景),直接返回,避免不必要的删除和添加操作。 第三步:删除旧索引(从 oldIndexValues 到 idx[indexValue] 的映射中移除 key)。 第四步:添加新索引(把 key 加入 idx[indexValue] 对应的集合)。 内存优化:当集合为空时(所有对象都被删除),会删除整个索引值,避免内存泄漏(参见 kubernetes/kubernetes#84959)。
这三个方法是 Indexer 最常用的查询方法,它们的实现有微妙的区别:
// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go(行 417-489)
// Index returns a list of items that match the given object on the index function.
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
storeKeySet, err := c.index.getKeysFromIndex(indexName, obj)
if err != nil {
return nil, err
}
list := make([]interface{}, 0, storeKeySet.Len())
for storeKey := range storeKeySet {
list = append(list, c.items[storeKey])
}
return list, nil
}
// ByIndex returns a list of the items whose indexed values in the given
// index include the given indexed value
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock()
set, err := c.index.getKeysByIndex(indexName, indexedValue)
if err != nil {
return nil, err
}
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
}
return list, nil
}
// IndexKeys returns a list of the Store keys of the objects whose indexed
// values in the given index include the given indexed value.
func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
c.lock.RLock()
defer c.lock.RUnlock()
set, err := c.index.getKeysByIndex(indexName, indexedValue)
if err != nil {
return nil, err
}
return sets.List(set), nil
}
// getKeysFromIndex returns the set of keys in the index that match the object.
func (i *storeIndex) getKeysFromIndex(indexName string, obj interface{}) (sets.Set[string], error) {
indexFunc := i.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
indexedValues, err := indexFunc(obj) // 从对象提取索引值
if err != nil {
return nil, err
}
index := i.indices[indexName]
var storeKeySet sets.Set[string]
if len(indexedValues) == 1 {
// 优化:只有一个索引值,直接查
storeKeySet = index[indexedValues[0]]
} else {
// 多个索引值,取并集
storeKeySet = sets.Set[string]{}
for _, indexedValue := range indexedValues {
for key := range index[indexedValue] {
storeKeySet.Insert(key)
}
}
}
return storeKeySet, nil
}
// getKeysByIndex returns the set of keys that have the given index value.
func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.Set[string], error) {
indexFunc := i.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
}
index := i.indices[indexName]
return index[indexedValue], nil
}
三种查询方法的区别: Index(obj):输入是对象。先用 IndexFunc 从对象提取索引值(可能有多个),然后找到所有拥有这些索引值的 keys,最后返回对应的对象列表。典型场景:传入一个 Pod,找出所有和他 namespace 相同的 Pod。 ByIndex(indexValue):输入是索引值(字符串)。直接查找该索引值对应的所有 keys,返回对应的对象列表。典型场景:找出所有 namespace="default" 的 Pod。 IndexKeys(indexValue):输入是索引值,返回的是 key 列表而非对象列表。典型场景:只关心 key 不关心对象时更高效。
让我们通过一个实际案例来理解 Indexer 的使用场景。假设我们需要创建一个 Indexer,按 Pod 的 nodeName 索引。
// 示例:创建和使用自定义索引器
import (
"k8s.io/client-go/tools/cache"
corev1 "k8s.io/api/core/v1"
)
// 1. 定义 nodeName 索引函数
func nodeNameIndexFunc(obj interface{}) ([]string, error) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return nil, fmt.Errorf("expected *corev1.Pod, got %T", obj)
}
// 返回 nodeName 作为索引值
return []string{pod.Spec.NodeName}, nil
}
// 2. 创建 Indexer
indexers := cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, // namespace 索引(内置)
"nodeName": nodeNameIndexFunc, // 自定义 nodeName 索引
}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, indexers)
// 3. 添加 Pod
pod := &corev1.Pod{
Spec: corev1.PodSpec{
NodeName: "node-1",
},
}
indexer.Add(pod)
// 4. 按 namespace 查询(使用内置索引)
defaultPods, _ := indexer.ByIndex(cache.NamespaceIndex, "default")
// 5. 按 nodeName 查询(使用自定义索引)
node1Pods, _ := indexer.ByIndex("nodeName", "node-1")
// 6. 列出所有 namespace(使用 ListIndexFuncValues)
allNamespaces := indexer.ListIndexFuncValues(cache.NamespaceIndex)
// 7. 动态添加新索引
err := indexer.AddIndexers(cache.Indexers{
"labels.app": func(obj interface{}) ([]string, error) {
pod := obj.(*corev1.Pod)
return []string{}, nil // 简化示例
},
})
这个示例展示了 Indexer 的核心使用模式:定义 IndexFunc → 创建 Indexer → 添加对象 → 按索引查询。Kubernetes 内部大量使用了这种模式,比如调度器按 nodeName 索引 Pod,实现高效的节点调度。
▼ Q: Index 和 ByIndex 有什么区别?什么时候用哪个?
A: Index 输入是对象,会先用 IndexFunc 提取索引值;ByIndex 输入是索引值(字符串),直接查索引表。如果你有一个 Pod 对象想知道和它同 namespace 的其他 Pod,用 Index(pod);如果你想查所有 namespace="default" 的 Pod,用 ByIndex("default", "default")。简单说:Index 是"对象查同类",ByIndex 是"值查对象"。
▼ Q: 为什么修改 Get/List 返回的对象后,索引不会更新?
A: 这是设计上的权衡。Indexer 内部存储的是对象的引用,而不是拷贝。如果你修改了对象的内容,Indexer 里存的对象也被改了(引用同一份数据)。但索引是按对象内容计算的,不是按引用——所以修改对象后索引不会自动更新。正确做法是:先从 Indexer 获取对象,复制一份,修改副本,再用 Update 存回去,Indexer 会重新计算索引。
▼ Q: IndexFunc 返回多个值时,查询结果是怎样的?
A: IndexFunc 返回 []string,意味着一个对象可以有多个索引值。查询时,Index 会取这些索引值的并集(union)。例如,Pod 的标签索引返回 ["app=nginx", "env=prod"],查询 ByIndex("labels", "app=nginx") 会返回所有有 app=nginx 标签的 Pod。如果一个对象有多个标签,它会在每个标签的索引里都出现一次。
▼ Q: NewStore 和 NewIndexer 有什么区别?什么时候用哪个?
A: NewStore 返回 Store 接口(无索引),NewIndexer 返回 Indexer 接口(有索引)。SharedIndexInformer 内部用的是 NewIndexer,因为 Informer 需要支持按 namespace 查询等常用操作。如果你只是需要一个线程安全的存储,用 NewStore 即可;如果需要按字段查询,用 NewIndexer。
▼ Q: AddIndexers 可以在添加了对象之后调用吗?会发生什么?
A: 可以。AddIndexers 有一个特殊逻辑:当你添加新索引器时,如果缓存里已经有对象,会自动对这些对象建立索引(参见 AddIndexers 中的 for 循环)。这是动态扩展索引能力的方式。注意:不能添加已存在的索引名(会冲突),也不能删除已有索引。
这篇文章我们系统地剖析了 Indexer 和 ThreadSafeStore 的设计。核心要点总结如下:
理解 Indexer 的设计对于开发 Kubernetes 控制器和 Operator 非常重要。调度器用它按 nodeName 快速查找 Pod,垃圾回收器用它按 ownerReference 查找依赖资源,各种控制器用它按 namespace 高效过滤对象。Indexer 的索引机制本质上是"空间换时间"的经典实践——预先建立反向索引,让查询从 O(n) 降到 O(1)。
Kubernetes 编程 / Operator 专题【左扬精讲】—— Indexer 与 ThreadSafeStore 核心原理与源码深度剖析 · 来源:Kubernetes v1.36.1 client-go cache 源码分析
相关阅读:
• Kubernetes client-go Store 和 cache 源码
• Kubernetes client-go Indexer 和 IndexFunc 源码
• Kubernetes client-go ThreadSafeStore 源码
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。