惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

腾讯CDC
Hacker News: Ask HN
Hacker News: Ask HN
S
Securelist
Security Latest
Security Latest
S
Schneier on Security
T
Threat Research - Cisco Blogs
Latest news
Latest news
Cyberwarzone
Cyberwarzone
A
Arctic Wolf
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
NISL@THU
NISL@THU
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
I
Intezer
T
The Exploit Database - CXSecurity.com
N
News and Events Feed by Topic
Simon Willison's Weblog
Simon Willison's Weblog
T
Tor Project blog
Blog — PlanetScale
Blog — PlanetScale
C
Cyber Attacks, Cyber Crime and Cyber Security
C
CERT Recently Published Vulnerability Notes
The Hacker News
The Hacker News
月光博客
月光博客
WordPress大学
WordPress大学
博客园 - 叶小钗
Hugging Face - Blog
Hugging Face - Blog
美团技术团队
量子位
cs.CL updates on arXiv.org
cs.CL updates on arXiv.org
C
Cisco Blogs
博客园 - 三生石上(FineUI控件)
Google DeepMind News
Google DeepMind News
Project Zero
Project Zero
Webroot Blog
Webroot Blog
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
Application and Cybersecurity Blog
Application and Cybersecurity Blog
云风的 BLOG
云风的 BLOG
L
LINUX DO - 最新话题
Schneier on Security
Schneier on Security
Engineering at Meta
Engineering at Meta
www.infosecurity-magazine.com
www.infosecurity-magazine.com
aimingoo的专栏
aimingoo的专栏
D
Docker
有赞技术团队
有赞技术团队
Google DeepMind News
Google DeepMind News
宝玉的分享
宝玉的分享
T
Troy Hunt's Blog
L
Lohrmann on Cybersecurity
T
The Blog of Author Tim Ferriss
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
L
LangChain Blog

博客园 - 左扬

Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:生产级 Controller 实践:并发安全、资源清理与高可用设计 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析: Controller 调试与诊断工具:从日志分析到问题定位 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:DynamicClient 操作 CRD:无需代码生成的动态操作 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:控制器与 APIServer 完整交互流程:从 Watch 到缓存同步 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:错误处理与重试机制:WorkQueue 限速器详解 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Leader 选举机制:高可用控制器的必备技能 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Controller 开发模式完整实战 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:SharedInformerFactory 与等待缓存同步 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:从认证配置到 Deployment 操作 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:版本对应、架构组件与组件关系 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Informer 源码深度解析:从底层原理到实战应用 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Reflector 源码深度解析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:ListWatcher 源码深度解析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:DeltaFIFO 核心原理与源码深度剖析 Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:workqueue 核心原理与实战 Kubernetes 编程 / Operator 专题【左扬精讲】—— runtime.Codec 资源编解码:serializer 与 codec 差异、编解码数据结构、codec 核心调用链路 Kubernetes 编程 / Operator 专题【左扬精讲】—— Scheme 资源注册机制全解 Kubernetes 编程 / Operator 专题【左扬精讲】—— Kubernetes 自定义资源的内部版本与外部版本:从源码看版本定义机制 Kubernetes 编程 / Operator 专题【左扬精讲】—— Kubernetes 1.36.1 核心 API 数据结构全解 Kubernetes 编程 / Operator 专题【左扬精讲】—— Kubernetes 构建过程 【AIOPS】一文读懂LLM【左扬精讲】:从诞生到普及,解锁大语言模型的核心密码 【AIOPS】AI Agent 专题【左扬精讲】核心功能篇:MCP-VictoriaMetrics Hooks 源码精讲:Hooks 可观测性的无侵入式实现 【AIOPS】AI Agent 专题【左扬精讲】核心功能篇:MCP-VictoriaMetrics Golang 配置解析源码精讲 ——SRE 自定义 Agent 核心技巧 【AIOPS】AI Agent 专题【左扬精讲】核心功能篇:MCP-VictoriaMetrics Golang 并发模型解析 ——SRE 应对高并发采集的调优思路 【AIOPS】AI Agent 专题【左扬精讲】基础架构篇:MCP-VictoriaMetrics Golang 源码整体架构拆解 ——SRE 必懂的核心模块与数据流 OpenTelemetry 开发实战【左扬精讲】—— 云原生可观测体系构建与分布式追踪二次开发 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 7 —— 基于流量预测模型的智能弹性扩缩容 Operator 实战(AIOps 模型训练与智能扩容(下篇)—— 预测式弹性扩缩容 Operator 落地实现) Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 7 —— 基于流量预测模型的智能弹性扩缩容 Operator 实战(AIOps 模型训练与智能扩容(上篇)—— 时序预测模型构建与离线训练) Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 6 —— 基于运维专家知识库的智能故障诊断与排查 Operator 实战 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 5 —— 基于大语言模型(LLM)的实时日志流智能监测 Operator 实现 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 4 —— 基于 Operator 实现大模型私有化部署与管理 Kubernetes 编程 / Operator 专题【左扬精讲】—— Operator 开发实战项目 3 —— 面向 AI / 算力调度场景:GPU 竞价实例资源池统一调度管理 Operator 开发 Kubernetes编程 / Operator专题【左扬精讲】—— Operator 开发实战项目 2 —— 面向零售 / 电商潮汐流量难题:多云多集群数据中心级全链路弹性伸缩 DataCenter Scaler Operator 从 0 到 1 全链路开发 Kubernetes编程 / Operator专题【左扬精讲】—— 深入理解Kubebuilder注解:为什么Operator开发离不开这些特殊注释 Kubernetes编程 / Operator专题【左扬精讲】—— Operator 开发实战项目1 —— Applicaion Operator(通用应用生命周期管理 Operator 实战) Pod 镜像拉取失败?kubectl edit pods修改镜像地址的底层原理与实操 (该方法仅为临时应急方案,并非长期解决方案) Kubernetes编程/Operator专题精讲—— 理解控制器模式 —— 控制器模式的核心原理与实现逻辑(从原理到实践) 【AIOPS】AI Agent 专题【左扬精讲】模型微调实战:一站式平台 LLaMA-Factory 【AIOPS】AI Agent 专题【左扬精讲】基于 k8s+vLLM+Ray 分布式部署全指南:架构设计、资源调度与性能优化 【AIOPS】AI Agent专题【左扬精讲】非量化版DeepSeek分布式部署全指南:精度保障、显存规划与Ollama/vLLM选型 【AIOPS】AI Agent 专题【左扬精讲】零开发框架实现 ReAct Agent(Go SRE友好)
Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Indexer 与 ThreadSafeStore 核心原理与源码深度剖析
左扬 · 2026-06-13 · via 博客园 - 左扬

Kubernetes 编程 / Operator 专题【左扬精讲】—— Indexer 与 ThreadSafeStore 核心原理与源码深度剖析

Kubernetes 编程 / Operator 专题【左扬精讲】—— Client-go 源代码分析:Indexer 与 ThreadSafeStore 核心原理与源码深度剖析

在 Kubernetes Informer 系统中,Indexer 是一个非常重要的组件。它不仅仅是简单的键值存储,而是支持多索引查询的数据结构。在开发控制器时,我们经常需要"找出所有 namespace 为 default 的 Pod"或"找出所有 spec.nodeName 为 node-1 的 Pod",这时候 Indexer 的强大索引能力就派上用场了。Indexer 底层依赖 ThreadSafeStore 实现线程安全的数据存储。今天我们就来深入剖析这两个核心组件的设计和实现。

Kubernetes 1.36.1 · Go 1.26 · client-go tools/cache

Kubernetes Go Indexer ThreadSafeStore Informer

🔓 学习重点提示  — 建议先通读全文,再重点回顾标注内容

★ 重点掌握(必须)
   • Store 接口:基础存储接口的完整定义
   • Indexer 接口:扩展 Store 的多索引能力
   • cache 结构体:Indexer 的底层实现(KeyFunc + ThreadSafeStore)
   • threadSafeMap:ThreadSafeStore 的实现,线程安全的 map + 索引
   • Index/IndexKeys/ByIndex 区别:三种索引查询方法的不同场景

☆ 次重点(了解即可)
   • Transaction 批量操作
   • AddIndexers 动态添加索引器


一、从 Store 到 Indexer —— 接口层级关系

在 Kubernetes client-go 中,数据存储的接口层级非常清晰。Store 是最基础的接口,Indexer 在 Store 基础上扩展了多索引能力,ThreadSafeStore 则提供了线程安全的底层实现。

┌─────────────────────────────────────────────────────────────┐
│  Indexer(扩展存储 + 多索引查询)                            │
│  - 继承自 Store                                            │
│  - 新增:Index / IndexKeys / ByIndex / GetIndexers          │
│  - 底层:cache (KeyFunc + ThreadSafeStore)                 │
└─────────────────────────────┬───────────────────────────────┘
                              │ 继承
┌─────────────────────────────▼───────────────────────────────┐
│  Store(基础存储接口)                                    │
│  - Add / Update / Delete / Get / List / ListKeys           │
│  - Replace / Resync / LastStoreSyncResourceVersion         │
└─────────────────────────────┬───────────────────────────────┘
                              │ 实现
┌─────────────────────────────▼───────────────────────────────┐
│  ThreadSafeStore(线程安全底层存储)                      │
│  - threadSafeMap: sync.RWMutex + map + storeIndex        │
│  - Add / Update / Delete / Get / List / Index           │
└─────────────────────────────────────────────────────────────┘

理解这个层级关系非常重要:Indexer 是接口,定义了多索引查询的契约;cache 是 Indexer 的实现,内部持有 KeyFunc 和 ThreadSafeStore;ThreadSafeStore 也是接口threadSafeMap 是它的实现


二、Store 接口 —— 基础存储契约

Store 接口是整个存储体系的基础。它定义了对象存储的基本操作。

// staging/src/k8s.io/client-go/tools/cache/store.go(行 38-82)

// Store is a generic object storage interface. Reflector knows how to
// watch a server and update a Store. This package provides a variety of
// implementations of Store.
type Store interface {
    // Add adds the given object to the accumulator associated with
    // the given object's key
    Add(obj interface{}) error

    // Update updates the given object in the accumulator associated
    // with the given object's key
    Update(obj interface{}) error

    // Delete deletes the given object from the accumulator associated
    // with the given object's key
    Delete(obj interface{}) error

    // List returns a list of all the currently non-empty accumulators
    List() []interface{}

    // ListKeys returns a list of all the keys currently associated
    // with non-empty accumulators
    ListKeys() []string

    // Get returns the accumulator associated with the given object's key
    Get(obj interface{}) (item interface{}, exists bool, err error)

    // GetByKey returns the accumulator associated with the given key
    GetByKey(key string) (item interface{}, exists bool, err error)

    // Replace will delete the contents of the store, using instead the
    // given list. Store takes ownership of the list.
    Replace([]interface{}, string) error

    // Resync is meaningless in the terms appearing here but has
    // meaning in some implementations that have non-trivial
    // additional behavior (e.g., DeltaFIFO).
    Resync() error
}

Store 接口的核心方法是 CRUD 操作:Add/Update/Delete 是增删改,Get/GetByKey/List/ListKeys 是读取,Replace 是全量替换。Resync 对于普通 Store 实现来说是空操作(no-op),但在 DeltaFIFO 中有实际意义。


三、Indexer 接口 —— 多索引扩展

Indexer 在 Store 基础上增加了五个与索引相关的方法,让你可以按索引字段快速查询对象。

// staging/src/k8s.io/client-go/tools/cache/index.go(行 26-55)

// Indexer extends Store with multiple indices and restricts each
// accumulator to simply hold the current object (and be empty after Delete).
//
// There are three kinds of strings here:
//  1. a storage key, as defined in the Store interface,
//  2. a name of an index, and
//  3. an "indexed value", which is produced by an IndexFunc and
//     can be a field value or any other string computed from the object.
type Indexer interface {
    Store

    // Index returns the stored objects whose set of indexed values
    // intersects the set of indexed values of the given object, for
    // the named index
    Index(indexName string, obj interface{}) ([]interface{}, error)

    // IndexKeys returns the storage keys of the stored objects whose
    // set of indexed values for the named index includes the given
    // indexed value
    IndexKeys(indexName, indexedValue string) ([]string, error)

    // ListIndexFuncValues returns all the indexed values of the given index
    ListIndexFuncValues(indexName string) []string

    // ByIndex returns the stored objects whose set of indexed values
    // for the named index includes the given indexed value
    ByIndex(indexName, indexedValue string) ([]interface{}, error)

    // GetIndexers return the indexers
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.
    AddIndexers(newIndexers Indexers) error
}

Indexer 接口新增了五个方法,理解它们的区别是掌握 Indexer 的关键:

方法输入输出典型场景
Index indexName + obj(对象) []interface{}(对象列表) 按对象属性查询
IndexKeys indexName + indexedValue []string(key 列表) 按索引值查 key
ByIndex indexName + indexedValue []interface{}(对象列表) 按索引值查对象(常用)
ListIndexFuncValues indexName []string(所有索引值) 列出所有 namespace

四、KeyFunc 和 IndexFunc —— 索引的核心概念

4.1 KeyFunc —— 从对象到 key 的映射

KeyFunc 是将对象转换为存储 key 的函数。在 Kubernetes 中,这个 key 通常是 namespace/name 格式。

// staging/src/k8s.io/client-go/tools/cache/store.go(行 120-162)

// KeyFunc knows how to make a key from an object. Implementations should be deterministic.
type KeyFunc func(obj interface{}) (string, error)

// MetaNamespaceKeyFunc is a default KeyFunc that knows how to make
// keys for API objects which implement meta.Interface.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
    if key, ok := obj.(ExplicitKey); ok {
        return string(key), nil
    }
    objName, err := ObjectToName(obj)
    if err != nil {
        return "", err
    }
    return objName.String(), nil
}

// ObjectToName returns the structured name for the given object
func ObjectToName(obj interface{}) (ObjectName, error) {
    meta, err := meta.Accessor(obj)
    if err != nil {
        return ObjectName{}, fmt.Errorf("object has no meta: %v", err)
    }
    return MetaObjectToName(meta), nil
}

// MetaObjectToName returns the structured name for the given object
func MetaObjectToName(obj metav1.Object) ObjectName {
    if len(obj.GetNamespace()) > 0 {
        return ObjectName{Namespace: obj.GetNamespace(), Name: obj.GetName()}
    }
    return ObjectName{Namespace: "", Name: obj.GetName()}
}

MetaNamespaceKeyFunc 是默认的 KeyFunc,它的工作流程是:先检查对象是否实现了 ExplicitKey 接口(用于特殊 key,如集群级别的资源);然后提取对象的 meta 信息,生成 namespace/name 格式的 key。实现必须是确定性的——同一个对象多次调用应该返回相同的 key。

4.2 IndexFunc —— 从对象到索引值的映射

IndexFunc 是将对象转换为索引值的函数。一个对象可以对应多个索引值(返回 []string)。

// staging/src/k8s.io/client-go/tools/cache/index.go(行 57-97)

// IndexFunc knows how to compute the set of indexed values for an object.
type IndexFunc func(obj interface{}) ([]string, error)

// NamespaceIndex is the lookup name for the most common index function,
// which is to index by the namespace field.
const NamespaceIndex string = "namespace"

// MetaNamespaceIndexFunc is a default index function that indexes based
// on an object's namespace
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
    meta, err := meta.Accessor(obj)
    if err != nil {
        return []string{""}, fmt.Errorf("object has no meta: %v", err)
    }
    return []string{meta.GetNamespace()}, nil
}

// Indexers maps a name to an IndexFunc
type Indexers map[string]IndexFunc

// Indices maps a name to an Index
type Indices map[string]index

// index maps the indexed value to a set of keys in the store that match on that value
type index map[string]sets.Set[string]

MetaNamespaceIndexFunc 是最简单的 IndexFunc,它只用一行代码就提取了对象的 namespace 作为索引值。IndexFunc 返回 []string,这意味着一个对象可以有多个索引值——比如一个 Pod 可以同时属于 namespace "default" 和有标签 "app=nginx",如果定义了基于标签的索引函数,它可能同时返回 ["default", "app=nginx"]。


五、cache 结构体 —— Indexer 的底层实现

Indexer 接口的默认实现是 cache 结构体。它组合了 KeyFunc(生成 key)和 ThreadSafeStore(存储 + 索引)。

// staging/src/k8s.io/client-go/tools/cache/store.go(行 202-439)

// `*cache` implements Indexer in terms of a ThreadSafeStore and
// an associated KeyFunc.
type cache struct {
    // cacheStorage bears the burden of thread safety for the cache
    cacheStorage ThreadSafeStore

    // keyFunc is used to make the key for objects stored in and
    // retrieved from items, and should be deterministic.
    keyFunc KeyFunc

    // Called with every object put in the cache.
    transformer TransformFunc

    // identifier is used to identify the store for metrics.
    identifier InformerNameAndResource

    // metrics is the metrics provider for the store.
    metrics InformerMetricsProvider
}

// NewStore returns a Store implemented simply with a map and a lock.
func NewStore(keyFunc KeyFunc, opts ...StoreOption) Store {
    c := &cache{
        keyFunc: keyFunc,
    }
    for _, opt := range opts {
        opt(c)
    }
    threadSafeOpts := []ThreadSafeStoreOption{}
    if c.metrics != nil {
        threadSafeOpts = append(threadSafeOpts, WithThreadSafeStoreMetrics(c.identifier, c.metrics))
    }
    c.cacheStorage = NewThreadSafeStore(Indexers{}, Indices{}, threadSafeOpts...)
    return c
}

// NewIndexer returns an Indexer implemented simply with a map and a lock.
func NewIndexer(keyFunc KeyFunc, indexers Indexers, opts ...StoreOption) Indexer {
    c := &cache{
        keyFunc: keyFunc,
    }
    for _, opt := range opts {
        opt(c)
    }
    threadSafeOpts := []ThreadSafeStoreOption{}
    if c.metrics != nil {
        threadSafeOpts = append(threadSafeOpts, WithThreadSafeStoreMetrics(c.identifier, c.metrics))
    }
    c.cacheStorage = NewThreadSafeStore(indexers, Indices{}, threadSafeOpts...)
    return c
}

cache 结构体非常简洁:它持有 cacheStorage(ThreadSafeStore 实例)和 keyFunc(生成 key 的函数)。所有 CRUD 操作实际上都是委托给 ThreadSafeStore 完成的。KeyFunc 只在 Add/Update/Delete 时用于生成 key。

5.1 cache 的 CRUD 实现

// staging/src/k8s.io/client-go/tools/cache/store.go(行 254-340)

// Add inserts an item into the cache.
func (c *cache) Add(obj interface{}) error {
    key, err := c.keyFunc(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    if c.transformer != nil {
        obj, err = c.transformer(obj)
        if err != nil {
            return fmt.Errorf("transforming: %w", err)
        }
    }
    c.cacheStorage.Add(key, obj)
    return nil
}

// Update sets an item in the cache to its updated state.
func (c *cache) Update(obj interface{}) error {
    key, err := c.keyFunc(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    if c.transformer != nil {
        obj, err = c.transformer(obj)
        if err != nil {
            return fmt.Errorf("transforming: %w", err)
        }
    }
    c.cacheStorage.Update(key, obj)
    return nil
}

// Delete removes an item from the cache.
func (c *cache) Delete(obj interface{}) error {
    key, err := c.keyFunc(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    c.cacheStorage.Delete(key)
    return nil
}

// Get returns the item from the cache with the given key.
func (c *cache) Get(obj interface{}) (item interface{}, exists bool, err error) {
    key, err := c.keyFunc(obj)
    if err != nil {
        return nil, false, KeyError{obj, err}
    }
    return c.GetByKey(key)
}

// GetByKey returns the item from the cache with the given key.
func (c *cache) GetByKey(key string) (item interface{}, exists bool, err error) {
    item, exists = c.cacheStorage.Get(key)
    return item, exists, nil
}

// List returns a list of all the items in the cache.
func (c *cache) List() []interface{} {
    return c.cacheStorage.List()
}

// ListKeys returns a list of all the keys in the cache.
func (c *cache) ListKeys() []string {
    return c.cacheStorage.ListKeys()
}

cache 的 CRUD 实现非常直接:每次 Add/Update/Delete,先用 KeyFunc 生成 key,然后调用 cacheStorage(ThreadSafeStore)的对应方法。Get/GetByKey 也是类似,先生成或直接使用 key,再从 cacheStorage 读取。这里有个重要细节:transformer 函数可以在对象存入缓存前对其进行转换(常用于过滤或脱敏)。


六、ThreadSafeStore 和 threadSafeMap —— 线程安全的底层存储

6.1 ThreadSafeStore 接口

ThreadSafeStore 是线程安全的底层存储接口,它组合了基本存储和多索引能力。

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go(行 46-70)

// ThreadSafeStore is an interface that allows concurrent access to a
// thread-safe mutable map of items. The interface is thread-safe
// by using sync.RWMutex.
//
// Note that modifying objects stored by the indexers (if any) will
// *not* automatically lead to a re-index. So it's not a good idea
// to directly modify the objects returned by Get/List, in general.
type ThreadSafeStore interface {
    Add(key string, obj interface{})
    Update(key string, obj interface{})
    Delete(key string)
    DeleteWithObject(key string, obj interface{})
    Get(key string) (item interface{}, exists bool)
    List() []interface{}
    ListKeys() []string
    Replace(map[string]interface{}, string)
    Index(indexName string, obj interface{}) ([]interface{}, error)
    IndexKeys(indexName, indexedValue string) ([]string, error)
    ByIndex(indexName, indexedValue string) ([]interface{}, error)
    GetIndexers() Indexers
    AddIndexers(newIndexers Indexers) error
    Resync() error
}

// storeIndex implements the indexing functionality for Store interface
type storeIndex struct {
    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}

注意这里的注释非常重要:通过索引器修改对象存储不会自动导致重新索引。也就是说,如果你通过 Get/List 获取了对象后直接修改它,索引不会更新。所以返回的对象应该被视为只读的。

6.2 threadSafeMap 结构体

threadSafeMap 是 ThreadSafeStore 接口的默认实现,它用 Go 的 sync.RWMutex 实现线程安全。

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go(行 255-326)

// threadSafeMap implements ThreadSafeStore
type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}

    // index implements the indexing functionality
    index *storeIndex
    rv    string

    // metrics is used to expose metrics about the store
    metrics *storeMetrics
}

func (c *threadSafeMap) Update(key string, obj interface{}) {
    rv, rvErr := rvFromObject(obj)
    rvInt, parseErr := parseRVForMetricsWithTruncation(rv)
    c.lock.Lock()
    defer c.lock.Unlock()
    c.updateLocked(key, obj)
    if rvErr == nil {
        c.rv = rv
        if parseErr == nil {
            c.metrics.storeResourceVersion.Set(float64(rvInt))
        }
    }
}

func (c *threadSafeMap) updateLocked(key string, obj interface{}) {
    oldObject := c.items[key]
    c.items[key] = obj
    c.index.updateIndices(oldObject, obj, key)
}

func (c *threadSafeMap) Delete(key string) {
    c.DeleteWithObject(key, nil)
}

func (c *threadSafeMap) deleteLocked(key string) {
    if obj, exists := c.items[key]; exists {
        c.index.updateIndices(obj, nil, key)
        delete(c.items, key)
    }
}

func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
    c.lock.RLock()
    defer c.lock.RUnlock()
    item, exists = c.items[key]
    return item, exists
}

func (c *threadSafeMap) List() []interface{} {
    c.lock.RLock()
    defer c.lock.RUnlock()
    list := make([]interface{}, 0, len(c.items))
    for _, item := range c.items {
        list = append(list, item)
    }
    return list
}

threadSafeMap 的设计非常精妙:items 是普通的 map,存储 key-value 对;index 是索引结构(storeIndex),管理所有的索引;rv 存储最新的 resourceVersion;lock 是读写锁,读操作用 RLock(允许多读),写操作用 Lock(独占)。 注意 updateLocked 的逻辑:更新时,先获取旧对象,然后更新 items,最后调用 index.updateIndices 更新所有索引。这是保持索引一致性的关键——索引必须和 items 同步更新。


七、索引方法的实现详解

7.1 storeIndex 的索引管理

storeIndex 管理所有的索引。它维护两个核心数据结构:indexers(名字到函数的映射)和 indices(名字到索引的映射)。

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go(行 93-254)

// updateIndices modifies the objects location in the managed indexes:
// - for create you must provide only the newObj
// - for update you must provide both the oldObj and the newObj
// - for delete you must provide only the oldObj
func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
    for name := range i.indexers {
        i.updateSingleIndex(name, oldObj, newObj, key)
    }
}

// updateSingleIndex modifies the objects location in the named index:
// - for create you must provide only the newObj
// - for update you must provide both the oldObj and the newObj
// - for delete you must provide only the oldObj
func (i *storeIndex) updateSingleIndex(name string, oldObj interface{}, newObj interface{}, key string) {
    var oldIndexValues, indexValues []string
    indexFunc, ok := i.indexers[name]
    if !ok {
        panic(fmt.Errorf("indexer %q does not exist", name))
    }
    if oldObj != nil {
        oldIndexValues, _ = indexFunc(oldObj)
    }
    if newObj != nil {
        indexValues, _ = indexFunc(newObj)
    }

    idx := i.indices[name]
    if idx == nil {
        idx = index{}
        i.indices[name] = idx
    }

    // 优化:如果新旧索引值相同,跳过更新
    if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
        return
    }

    // 删除旧索引
    for _, value := range oldIndexValues {
        i.deleteKeyFromIndex(key, value, idx)
    }
    // 添加新索引
    for _, value := range indexValues {
        i.addKeyToIndex(key, value, idx)
    }
}

func (i *storeIndex) addKeyToIndex(key, indexValue string, idx index) {
    set := idx[indexValue]
    if set == nil {
        set = sets.Set[string]{}
        idx[indexValue] = set
    }
    set.Insert(key)
}

func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, idx index) {
    set := idx[indexValue]
    if set == nil {
        return
    }
    set.Delete(key)
    // 如果集合为空,删除整个索引值(避免内存泄漏)
    if len(set) == 0 {
        delete(idx, indexValue)
    }
}

索引更新的逻辑非常清晰: 第一步:调用 IndexFunc 分别获取旧对象和新对象的索引值。 第二步:优化检查——如果索引值没变(常见场景),直接返回,避免不必要的删除和添加操作。 第三步:删除旧索引(从 oldIndexValues 到 idx[indexValue] 的映射中移除 key)。 第四步:添加新索引(把 key 加入 idx[indexValue] 对应的集合)。 内存优化:当集合为空时(所有对象都被删除),会删除整个索引值,避免内存泄漏(参见 kubernetes/kubernetes#84959)。

7.2 Index / IndexKeys / ByIndex 的实现

这三个方法是 Indexer 最常用的查询方法,它们的实现有微妙的区别:

// staging/src/k8s.io/client-go/tools/cache/thread_safe_store.go(行 417-489)

// Index returns a list of items that match the given object on the index function.
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
    c.lock.RLock()
    defer c.lock.RUnlock()

    storeKeySet, err := c.index.getKeysFromIndex(indexName, obj)
    if err != nil {
        return nil, err
    }

    list := make([]interface{}, 0, storeKeySet.Len())
    for storeKey := range storeKeySet {
        list = append(list, c.items[storeKey])
    }
    return list, nil
}

// ByIndex returns a list of the items whose indexed values in the given
// index include the given indexed value
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
    c.lock.RLock()
    defer c.lock.RUnlock()

    set, err := c.index.getKeysByIndex(indexName, indexedValue)
    if err != nil {
        return nil, err
    }
    list := make([]interface{}, 0, set.Len())
    for key := range set {
        list = append(list, c.items[key])
    }
    return list, nil
}

// IndexKeys returns a list of the Store keys of the objects whose indexed
// values in the given index include the given indexed value.
func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
    c.lock.RLock()
    defer c.lock.RUnlock()

    set, err := c.index.getKeysByIndex(indexName, indexedValue)
    if err != nil {
        return nil, err
    }
    return sets.List(set), nil
}

// getKeysFromIndex returns the set of keys in the index that match the object.
func (i *storeIndex) getKeysFromIndex(indexName string, obj interface{}) (sets.Set[string], error) {
    indexFunc := i.indexers[indexName]
    if indexFunc == nil {
        return nil, fmt.Errorf("Index with name %s does not exist", indexName)
    }
    indexedValues, err := indexFunc(obj)  // 从对象提取索引值
    if err != nil {
        return nil, err
    }
    index := i.indices[indexName]

    var storeKeySet sets.Set[string]
    if len(indexedValues) == 1 {
        // 优化:只有一个索引值,直接查
        storeKeySet = index[indexedValues[0]]
    } else {
        // 多个索引值,取并集
        storeKeySet = sets.Set[string]{}
        for _, indexedValue := range indexedValues {
            for key := range index[indexedValue] {
                storeKeySet.Insert(key)
            }
        }
    }
    return storeKeySet, nil
}

// getKeysByIndex returns the set of keys that have the given index value.
func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.Set[string], error) {
    indexFunc := i.indexers[indexName]
    if indexFunc == nil {
        return nil, fmt.Errorf("Index with name %s does not exist", indexName)
    }
    index := i.indices[indexName]
    return index[indexedValue], nil
}

三种查询方法的区别: Index(obj):输入是对象。先用 IndexFunc 从对象提取索引值(可能有多个),然后找到所有拥有这些索引值的 keys,最后返回对应的对象列表。典型场景:传入一个 Pod,找出所有和他 namespace 相同的 Pod。 ByIndex(indexValue):输入是索引值(字符串)。直接查找该索引值对应的所有 keys,返回对应的对象列表。典型场景:找出所有 namespace="default" 的 Pod。 IndexKeys(indexValue):输入是索引值,返回的是 key 列表而非对象列表。典型场景:只关心 key 不关心对象时更高效。


八、实战:自定义索引器的创建和使用

让我们通过一个实际案例来理解 Indexer 的使用场景。假设我们需要创建一个 Indexer,按 Pod 的 nodeName 索引。

// 示例:创建和使用自定义索引器

import (
    "k8s.io/client-go/tools/cache"
    corev1 "k8s.io/api/core/v1"
)

// 1. 定义 nodeName 索引函数
func nodeNameIndexFunc(obj interface{}) ([]string, error) {
    pod, ok := obj.(*corev1.Pod)
    if !ok {
        return nil, fmt.Errorf("expected *corev1.Pod, got %T", obj)
    }
    // 返回 nodeName 作为索引值
    return []string{pod.Spec.NodeName}, nil
}

// 2. 创建 Indexer
indexers := cache.Indexers{
    cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,  // namespace 索引(内置)
    "nodeName": nodeNameIndexFunc,                      // 自定义 nodeName 索引
}
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, indexers)

// 3. 添加 Pod
pod := &corev1.Pod{
    Spec: corev1.PodSpec{
        NodeName: "node-1",
    },
}
indexer.Add(pod)

// 4. 按 namespace 查询(使用内置索引)
defaultPods, _ := indexer.ByIndex(cache.NamespaceIndex, "default")

// 5. 按 nodeName 查询(使用自定义索引)
node1Pods, _ := indexer.ByIndex("nodeName", "node-1")

// 6. 列出所有 namespace(使用 ListIndexFuncValues)
allNamespaces := indexer.ListIndexFuncValues(cache.NamespaceIndex)

// 7. 动态添加新索引
err := indexer.AddIndexers(cache.Indexers{
    "labels.app": func(obj interface{}) ([]string, error) {
        pod := obj.(*corev1.Pod)
        return []string{}, nil  // 简化示例
    },
})

这个示例展示了 Indexer 的核心使用模式:定义 IndexFunc → 创建 Indexer → 添加对象 → 按索引查询。Kubernetes 内部大量使用了这种模式,比如调度器按 nodeName 索引 Pod,实现高效的节点调度。


九、常见问题(FAQ)

▼ Q: Index 和 ByIndex 有什么区别?什么时候用哪个?

A: Index 输入是对象,会先用 IndexFunc 提取索引值;ByIndex 输入是索引值(字符串),直接查索引表。如果你有一个 Pod 对象想知道和它同 namespace 的其他 Pod,用 Index(pod);如果你想查所有 namespace="default" 的 Pod,用 ByIndex("default", "default")。简单说:Index 是"对象查同类",ByIndex 是"值查对象"。


▼ Q: 为什么修改 Get/List 返回的对象后,索引不会更新?

A: 这是设计上的权衡。Indexer 内部存储的是对象的引用,而不是拷贝。如果你修改了对象的内容,Indexer 里存的对象也被改了(引用同一份数据)。但索引是按对象内容计算的,不是按引用——所以修改对象后索引不会自动更新。正确做法是:先从 Indexer 获取对象,复制一份,修改副本,再用 Update 存回去,Indexer 会重新计算索引。


▼ Q: IndexFunc 返回多个值时,查询结果是怎样的?

A: IndexFunc 返回 []string,意味着一个对象可以有多个索引值。查询时,Index 会取这些索引值的并集(union)。例如,Pod 的标签索引返回 ["app=nginx", "env=prod"],查询 ByIndex("labels", "app=nginx") 会返回所有有 app=nginx 标签的 Pod。如果一个对象有多个标签,它会在每个标签的索引里都出现一次。


▼ Q: NewStore 和 NewIndexer 有什么区别?什么时候用哪个?

A: NewStore 返回 Store 接口(无索引),NewIndexer 返回 Indexer 接口(有索引)。SharedIndexInformer 内部用的是 NewIndexer,因为 Informer 需要支持按 namespace 查询等常用操作。如果你只是需要一个线程安全的存储,用 NewStore 即可;如果需要按字段查询,用 NewIndexer。


▼ Q: AddIndexers 可以在添加了对象之后调用吗?会发生什么?

A: 可以。AddIndexers 有一个特殊逻辑:当你添加新索引器时,如果缓存里已经有对象,会自动对这些对象建立索引(参见 AddIndexers 中的 for 循环)。这是动态扩展索引能力的方式。注意:不能添加已存在的索引名(会冲突),也不能删除已有索引。


十、总结

这篇文章我们系统地剖析了 Indexer 和 ThreadSafeStore 的设计。核心要点总结如下:

  • 接口层级:Store(基础)→ Indexer(扩展多索引)→ cache(实现);ThreadSafeStore(接口)→ threadSafeMap(实现)。
  • KeyFunc:将对象转换为存储 key(通常是 namespace/name),必须是确定性的。
  • IndexFunc:将对象转换为索引值(可以是多个),用于按字段快速查询。
  • Index 三剑客:Index(obj) 是对象查同类,ByIndex(value) 是值查对象,IndexKeys(value) 只查 key 不查对象。
  • 索引更新:updateLocked 时同步更新 items 和 index,保证一致性;优化逻辑避免无变化的重复更新。
  • 线程安全:threadSafeMap 用 sync.RWMutex,读多写少场景性能好。
  • 返回对象只读:修改返回的对象不会更新索引,必须用 Update 方法。

理解 Indexer 的设计对于开发 Kubernetes 控制器和 Operator 非常重要。调度器用它按 nodeName 快速查找 Pod,垃圾回收器用它按 ownerReference 查找依赖资源,各种控制器用它按 namespace 高效过滤对象。Indexer 的索引机制本质上是"空间换时间"的经典实践——预先建立反向索引,让查询从 O(n) 降到 O(1)。


Kubernetes 编程 / Operator 专题【左扬精讲】—— Indexer 与 ThreadSafeStore 核心原理与源码深度剖析 · 来源:Kubernetes v1.36.1 client-go cache 源码分析

相关阅读:
   • Kubernetes client-go Store 和 cache 源码
   • Kubernetes client-go Indexer 和 IndexFunc 源码
   • Kubernetes client-go ThreadSafeStore 源码