Java Concurrency: A Deep Dive into ThreadPoolExecutor.addWorker()
时空穿越者 · 2026-02-23 · via 博客园 - 时空穿越者

Scope

How does ThreadPoolExecutor.addWorker() combine CAS (Compare-And-Swap) with a lock so that concurrent callers never create duplicate or excess worker threads?

Source code

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
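The Javadoc above names the two situations in which execute() hands a task straight to a new worker: fewer than corePoolSize threads, or a full queue. The following is a minimal sketch of both paths using the public ThreadPoolExecutor API. The class name AddWorkerPathsDemo, the pool sizes (1 core, 2 maximum) and the queue capacity (1) are assumptions chosen for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class AddWorkerPathsDemo {
    /** Returns {pool size after 3 tasks, queue size after 3 tasks, 1 if the 4th task was rejected}. */
    static int[] run() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        // Each task blocks until released, so workers stay busy while we inspect the pool.
        Runnable blocking = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocking); // below corePoolSize: new core worker, task bypasses the queue
            pool.execute(blocking); // core worker is busy: task goes into the queue
            pool.execute(blocking); // queue is full: new non-core worker, task bypasses the queue
            int poolSize = pool.getPoolSize();
            int queued = pool.getQueue().size();
            int rejected = 0;
            try {
                pool.execute(blocking); // queue full and pool at maximumPoolSize
            } catch (RejectedExecutionException e) {
                rejected = 1;           // default AbortPolicy
            }
            return new int[] {poolSize, queued, rejected};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("poolSize=" + r[0] + " queued=" + r[1] + " rejected=" + r[2]);
    }
}
```

The first and third submissions correspond to addWorker(task, true) and addWorker(task, false) respectively; the fourth is rejected because addWorker() returns false once the count has reached maximumPoolSize.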

Walkthrough

(1) Division of labor between the two loops

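The outer loop validates the pool's run state and decides whether a worker may be added at all; the inner loop checks the worker count against the bound and retries the CAS that reserves a slot.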

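(2) retry:

The retry: label in ThreadPoolExecutor.addWorker() is a loop-control device for handling state conflicts under high concurrency.

It does more than "try again": which loop a caller re-enters depends on what changed, the run state or only the worker count.

break retry:
Once the CAS has incremented the worker count, control leaves the entire outer loop (not just the inner one) and proceeds to the thread-creation phase.
continue retry:
If the pool's run state has changed (runStateOf(c) != rs), control restarts the outer loop and repeats the whole state check, so that no decision is made on a stale state.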
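The two jumps can be tried in isolation with a small stand-in for the first phase of addWorker(). This is a simplified sketch, not the JDK code: the class ReserveSlotSketch and its reserve() and shutdown() methods are hypothetical, only the RUNNING and SHUTDOWN states are modeled, and the queue-draining exception of the real state check is left out. The bit layout (run state in the high 3 bits, count in the low 29) follows the JDK constants.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ReserveSlotSketch {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;

    final AtomicInteger ctl = new AtomicInteger(RUNNING); // RUNNING, zero workers

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }

    /** Hypothetical, simplified first phase of addWorker(): reserve one slot below bound. */
    boolean reserve(int bound) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            if (rs >= SHUTDOWN)               // simplified state check
                return false;
            for (;;) {
                if (workerCountOf(c) >= bound)
                    return false;
                if (ctl.compareAndSet(c, c + 1))
                    break retry;              // slot reserved: leave both loops
                c = ctl.get();                // re-read after a failed CAS
                if (runStateOf(c) != rs)
                    continue retry;           // state changed: redo the state check
                // only the count changed: stay in the inner loop
            }
        }
        return true;
    }

    /** Hypothetical helper: move RUNNING to SHUTDOWN while keeping the count. */
    void shutdown() {
        for (;;) {
            int c = ctl.get();
            if (runStateOf(c) >= SHUTDOWN)
                return;
            if (ctl.compareAndSet(c, SHUTDOWN | workerCountOf(c)))
                return;
        }
    }
}
```

With a bound of 2, two calls to reserve(2) succeed and the third returns false; after shutdown(), reserve() returns false regardless of the bound.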

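Summary

(1) The CAS is retried in the inner loop only while the run state is unchanged, so no CPU cycles are wasted retrying on a pool that has already shut down.

(2) When the run state changes, control jumps straight back to the top of the outer loop to re-evaluate whether a thread may be created at all.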

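Supplement

State validation

Outer loop: reject immediately once the pool is shutting down, except in the one permitted case (state is SHUTDOWN, firstTask == null, and the work queue is not empty), where a worker is still needed to drain the queue.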

if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
        return false;

Inner loop: check whether the worker count has reached its limit (the core parameter selects corePoolSize or maximumPoolSize as the bound).

if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
            return false;

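CAS increment of the worker count

if (compareAndIncrementWorkerCount(c))
            break retry; // CAS succeeded: leave the outer loop

What the CAS does: compareAndIncrementWorkerCount() atomically increments workerCount (the low 29 bits of ctl), so concurrent callers cannot leave the count inconsistent.

Typical scenario:
While thread A is executing compareAndIncrementWorkerCount(), thread B calls shutdown(). Because ctl packs the run state and the worker count into a single int, thread A's CAS against its stale snapshot fails. Thread A then re-reads ctl, sees that the state has changed, and takes continue retry, so the outer check runs against the new state and no thread is created for a new task while the pool is in SHUTDOWN.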
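The scenario above can be replayed on a single thread to see why the CAS fails. This is a sketch under assumptions: the class StaleCasDemo is hypothetical, and thread B's shutdown() is reduced to one write that replaces the state bits and keeps the count.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class StaleCasDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;
    static final int CAPACITY = (1 << COUNT_BITS) - 1;
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;

    /** Thread A snapshots ctl, thread B shuts down, then A attempts its CAS. */
    static boolean casAfterShutdown() {
        AtomicInteger ctl = new AtomicInteger(RUNNING | 1); // RUNNING, one worker
        int c = ctl.get();                                   // thread A's snapshot
        ctl.set(SHUTDOWN | (c & CAPACITY));                  // thread B: state changes, count kept
        return ctl.compareAndSet(c, c + 1);                  // thread A: CAS on the stale snapshot
    }

    /** Same steps without the interfering shutdown. */
    static boolean casWithoutInterference() {
        AtomicInteger ctl = new AtomicInteger(RUNNING | 1);
        int c = ctl.get();
        return ctl.compareAndSet(c, c + 1);
    }

    public static void main(String[] args) {
        System.out.println("after shutdown: " + casAfterShutdown());
        System.out.println("undisturbed:    " + casWithoutInterference());
    }
}
```

The count never changed in the first method, yet the CAS still fails, because the comparison covers the whole ctl word including the state bits.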

Lock-protected critical section

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
    w = new Worker(firstTask);
    final Thread t = w.thread;
    if (t != null) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int rs = runStateOf(ctl.get());

            if (rs < SHUTDOWN ||
                (rs == SHUTDOWN && firstTask == null)) {
                if (t.isAlive()) // precheck that t is startable
                    throw new IllegalThreadStateException();
                workers.add(w);
                int s = workers.size();
                if (s > largestPoolSize)
                    largestPoolSize = s;
                workerAdded = true;
            }
        } finally {
            mainLock.unlock();
        }
        if (workerAdded) {
            t.start();
            workerStarted = true;
        }
    }
} finally {
    if (! workerStarted)
        addWorkerFailed(w);
}

mainLock guards the workers collection (a HashSet<Worker>), which is not thread-safe on its own, against concurrent modification.
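The same lock discipline can be sketched outside the pool. In the example below, WorkerRegistrySketch and its methods are hypothetical names, and plain Object instances stand in for Worker; the point is only that every access to the HashSet and to largestPoolSize happens while holding the lock.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

final class WorkerRegistrySketch {
    private final ReentrantLock mainLock = new ReentrantLock();
    private final Set<Object> workers = new HashSet<>(); // HashSet is not thread-safe by itself
    private int largestPoolSize;                          // read and written only under mainLock

    void register(Object worker) {
        mainLock.lock();
        try {
            workers.add(worker);
            int s = workers.size();
            if (s > largestPoolSize)
                largestPoolSize = s;
        } finally {
            mainLock.unlock(); // always release, even if add() throws
        }
    }

    int largest() {
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    /** Registers one placeholder worker from each of n threads; returns the largest size seen. */
    static int registerConcurrently(int n) {
        WorkerRegistrySketch registry = new WorkerRegistrySketch();
        Thread[] threads = new Thread[n];
        for (int i = 0; i < n; i++) {
            threads[i] = new Thread(() -> registry.register(new Object()));
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return registry.largest();
    }
}
```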


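Summary: the core logic that prevents excess thread creation

workerCount as the basis for the decision:
The CAS increment of the count is the only gateway to creating a thread, which ensures the pool never exceeds corePoolSize / maximumPoolSize.
Worker uniqueness:
Each Worker owns exactly one thread, and Workers are added to the workers set only while holding mainLock.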
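The bound can be observed from the outside with the public API. The following sketch lets several threads submit at once to a pool with corePoolSize=2 and maximumPoolSize=4 and then reads getLargestPoolSize(). The class name, the submitter and task counts, the queue capacity of 1 and the choice of CallerRunsPolicy (so that saturation does not throw) are assumptions made for the illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundUnderContentionDemo {
    /** Hammers execute() from several threads and reports the largest pool size reached. */
    static int largestPoolSize(int submitters, int tasksEach) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch start = new CountDownLatch(1);
        Thread[] threads = new Thread[submitters];
        for (int i = 0; i < submitters; i++) {
            threads[i] = new Thread(() -> {
                try {
                    start.await(); // line up all submitters before racing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int j = 0; j < tasksEach; j++)
                    pool.execute(Thread::yield); // tiny task
            });
            threads[i].start();
        }
        start.countDown();
        try {
            for (Thread t : threads)
                t.join();
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) {
        System.out.println("largest pool size: " + largestPoolSize(8, 200));
    }
}
```

However the submitters interleave, the reported value should stay at or below 4, because every new worker first has to win the CAS against the bound.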

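Design philosophy

Decoupling state from count

The frequent operation (the CAS on the count) is separated from the infrequent one (the run-state check), so the state check is not repeated on every CAS retry.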


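Layered concurrency control

Layer 1: lock-free CAS
Updates workerCount quickly without lock contention (a key optimization under high concurrency).
Layer 2: global lock
Protects operations that are less frequent but need strong consistency, such as modifying the workers set.

Defensive handling of state changes

Retry loops: the outer loop copes with state changes such as a call to shutdown(); continue retry discards the stale local snapshot and forces a fresh read of the state.
Rollback: addWorkerFailed() runs whenever the worker did not start (the thread factory returned null, the pool shut down before the lock was acquired, or an exception was thrown) and restores consistency between the count and the workers set.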

    /**
     * Rolls back the worker thread creation.
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }
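The rollback can be observed indirectly through the public API. The sketch below assumes a hypothetical thread factory that returns null on its first call only, and a pool with corePoolSize = maximumPoolSize = 1. If the count reserved for the failed worker were not released, the pool would look full and the queued task would have no worker to run it; my reading of execute() is that the rollback lets a second addWorker() attempt succeed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class RollbackDemo {
    /** Returns true if the task still ran although the first thread creation failed. */
    static boolean taskRunsAfterFactoryFailure() {
        AtomicInteger calls = new AtomicInteger();
        // Hypothetical flaky factory: returns null on its first call, real threads afterwards.
        ThreadFactory flaky = r -> calls.getAndIncrement() == 0 ? null : new Thread(r);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), flaky);
        CountDownLatch ran = new CountDownLatch(1);
        try {
            // First addWorker() fails and is rolled back; the task is queued,
            // and a second addWorker() call creates the worker that drains the queue.
            pool.execute(ran::countDown);
            return ran.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("task ran: " + taskRunsAfterFactoryFailure());
    }
}
```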

Case study

Assume a pool configured with corePoolSize=2 and maximumPoolSize=4, in the following state:

  • workerCount=1 (RUNNING state)
  • Two threads call addWorker() concurrently


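Key point: call the two callers thread A and thread B, and suppose A's CAS wins, moving the count from 1 to 2. Thread B's CAS then fails, and B re-reads ctl. The run state is unchanged, so B repeats only the inner loop (no continue retry is needed) and rechecks the fresh count WC=2 against its bound instead of acting on the stale value 1: with core=true it returns false because 2 >= corePoolSize, and with core=false it retries the CAS because 2 < maximumPoolSize.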
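The case can be run with two real threads. The sketch below is a count-only stand-in for the inner loop: the class TwoCallersRace is hypothetical, the run state is assumed to stay RUNNING throughout, and the starting count of 1 matches the case above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class TwoCallersRace {
    /** Count-only stand-in for the inner loop of addWorker(). */
    static boolean reserve(AtomicInteger workerCount, int bound) {
        for (;;) {
            int wc = workerCount.get();
            if (wc >= bound)
                return false;
            if (workerCount.compareAndSet(wc, wc + 1))
                return true;
            // CAS lost to the other caller: loop and recheck the fresh count
        }
    }

    /** Starts at workerCount=1 and races two callers; returns {successes, final count}. */
    static int[] race(int bound) {
        AtomicInteger workerCount = new AtomicInteger(1);
        AtomicInteger successes = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] callers = new Thread[2];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (reserve(workerCount, bound))
                    successes.incrementAndGet();
            });
            callers[i].start();
        }
        start.countDown();
        for (Thread t : callers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {successes.get(), workerCount.get()};
    }
}
```

With bound 2 (the core=true case) exactly one caller succeeds and the count ends at 2; with bound 4 (the core=false case) both succeed and the count ends at 3.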

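Suppose the retry label is dropped in favor of a single loop:

while (true) {
    int c = ctl.get();
    if (stateInvalid(c)) return false; // stateInvalid(): placeholder for the state and bound checks
    if (compareAndIncrementWorkerCount(c)) break;
}

Risk:

As written, this loop repeats the full state check (including workQueue.isEmpty()) after every failed CAS, even when only the count changed. If the state check is instead hoisted out of the loop to avoid that cost, the following can happen: thread A's CAS fails and it re-reads c; meanwhile thread B shuts the pool down (state → STOP); thread A's next CAS then succeeds against the fresh c and increments the worker count in an illegal state. Without the later recheck under mainLock, that could end in a zombie thread or a resource leak.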

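Related thoughts

MVCC in database transactions:
Similar to the retry mechanism: under optimistic, version-based schemes, when the version of a row that was read has changed, the whole transaction rolls back and retries.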
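The analogy can be made concrete with a versioned compare-and-set in plain Java. This is only an illustration of the read-validate-retry pattern, not a model of how any database implements MVCC; the class VersionedRetrySketch, the "row" holding a balance and the starting value 100 are assumptions.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

final class VersionedRetrySketch {
    // Hypothetical "row": an immutable balance plus a version stamp.
    private final AtomicStampedReference<Integer> row = new AtomicStampedReference<>(100, 0);

    /** Optimistic update: recompute from a fresh read whenever the value or version moved. */
    int deposit(int amount) {
        int[] versionHolder = new int[1];
        for (;;) {
            Integer seen = row.get(versionHolder); // read value and version together
            int version = versionHolder[0];
            Integer updated = seen + amount;
            if (row.compareAndSet(seen, updated, version, version + 1))
                return updated;                    // committed
            // another writer committed first: discard the local result and retry
        }
    }

    int balance() { return row.getReference(); }
    int version() { return row.getStamp(); }

    /** Runs concurrent deposits of 1; returns {final balance, final version}. */
    static int[] run(int threads, int depositsEach) {
        VersionedRetrySketch account = new VersionedRetrySketch();
        Thread[] ts = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < depositsEach; j++)
                    account.deposit(1);
            });
            ts[i].start();
        }
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] {account.balance(), account.version()};
    }
}
```

As with continue retry, a failed validation throws away everything computed from the stale read and starts again from a fresh one.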