惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

C
CXSECURITY Database RSS Feed - CXSecurity.com
Stack Overflow Blog
Stack Overflow Blog
月光博客
月光博客
T
Threat Research - Cisco Blogs
小众软件
小众软件
有赞技术团队
有赞技术团队
酷 壳 – CoolShell
酷 壳 – CoolShell
Apple Machine Learning Research
Apple Machine Learning Research
C
Cyber Attacks, Cyber Crime and Cyber Security
cs.CV updates on arXiv.org
cs.CV updates on arXiv.org
T
Tailwind CSS Blog
Cisco Talos Blog
Cisco Talos Blog
V
V2EX
博客园 - 【当耐特】
C
Cybersecurity and Infrastructure Security Agency CISA
Hugging Face - Blog
Hugging Face - Blog
The Cloudflare Blog
The Last Watchdog
The Last Watchdog
Simon Willison's Weblog
Simon Willison's Weblog
T
Threatpost
S
Secure Thoughts
O
OpenAI News
P
Proofpoint News Feed
S
SegmentFault 最新的问题
Forbes - Security
Forbes - Security
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
Application and Cybersecurity Blog
Application and Cybersecurity Blog
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
Last Week in AI
Last Week in AI
宝玉的分享
宝玉的分享
Scott Helme
Scott Helme
T
Tenable Blog
A
Arctic Wolf
L
LINUX DO - 热门话题
爱范儿
爱范儿
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
www.infosecurity-magazine.com
www.infosecurity-magazine.com
V
Visual Studio Blog
Hacker News: Ask HN
Hacker News: Ask HN
Hacker News - Newest:
Hacker News - Newest: "LLM"
腾讯CDC
博客园 - Franky
WordPress大学
WordPress大学
Know Your Adversary
Know Your Adversary
博客园_首页
雷峰网
雷峰网
IT之家
IT之家
PCI Perspectives
PCI Perspectives
L
LINUX DO - 最新话题
H
Heimdal Security Blog

博客园 - JAVA是老婆

今天来给智商充值 关于Java8函数式编程你需要了解的几点 【实战Java高并发程序设计6】挑战无锁算法:无锁的Vector实现 【实战Java高并发程序设计 5】让普通变量也享受原子操作 【实战Java高并发程序设计 4】数组也能无锁:AtomicIntegerArray 【转】成为Java顶尖程序员 ,看这10本书就够了 【实战Java高并发程序设计 3】带有时间戳的对象引用:AtomicStampedReference 【实战Java高并发程序设计 2】无锁的对象引用:AtomicReference 一图搞定【实战Java高并发程序设计】 【实战Java高并发程序设计 1】Java中的指针:Unsafe类 如何提高Java并行程序性能?? 《实战Java虚拟机》,最简单的JVM入门书,京东活动,满200就减100了,该出手了 看JVM就推荐这本书 【Java】实战Java虚拟机之五“开启JIT编译” 实战Java虚拟机之四:提升性能,禁用System.gc() ? 实战Java虚拟机之三“G1的新生代GC” 实战Java虚拟机之二“虚拟机的工作模式” 实战Java虚拟机之一“堆溢出处理” 实战java虚拟机的学习计划图(看懂java虚拟机)
【实战Java高并发程序设计 7】让线程之间互相帮助--SynchronousQueue的实现
JAVA是老婆 · 2016-03-17 · via 博客园 - JAVA是老婆

【实战Java高并发程序设计 1】Java中的指针:Unsafe类

【实战Java高并发程序设计 2】无锁的对象引用:AtomicReference

【实战Java高并发程序设计 3】带有时间戳的对象引用:AtomicStampedReference

【实战Java高并发程序设计 4】数组也能无锁:AtomicIntegerArray

 【实战Java高并发程序设计 5】让普通变量也享受原子操作

【实战Java高并发程序设计6】挑战无锁算法:无锁的Vector实现

 在对线程池的介绍中,提到了一个非常特殊的等待队列SynchronousQueue。SynchronousQueue的容量为0,任何一个对SynchronousQueue的写需要等待一个对SynchronousQueue的读,反之亦然。因此,SynchronousQueue与其说是一个队列,不如说是一个数据交换通道。那SynchronousQueue的其妙功能是如何实现的呢?

     既然我打算在这一节中介绍它,那么SynchronousQueue比如和无锁的操作脱离不了关系。实际上SynchronousQueue内部也正是大量使用了无锁工具。

对SynchronousQueue来说,它将put()和take()两个功能截然不同的操作抽象为一个共通的方法Transferer.transfer()。从字面上看,这就是数据传递的意思。它的完整签名如下:

Object transfer(Object e, boolean timed, long nanos)  

  当参数e为非空时,表示当前操作传递给一个消费者,如果为空,则表示当前操作需要请求一个数据。timed参数决定是否存在timeout时间,nanos决定了timeout的时长。如果返回值非空,则表示数据以及接受或者正常提供,如果为空,则表示失败(超时或者中断)。

  SynchronousQueue内部会维护一个线程等待队列。等待队列中会保存等待线程以及相关数据的信息。比如,生产者将数据放入SynchronousQueue时,如果没有消费者接受,那么数据本身和线程对象都会打包在队列中等待(因为SynchronousQueue容积为0,没有数据可以正常放入)。

Transferer.transfer()函数的实现是SynchronousQueue的核心,它大体上分为三个步骤:

1、如果等待队列为空,或者队列中节点的类型和本次操作是一致的,那么将当前操作压入队列等待。比如,等待队列中是读线程等待,本次操作也是读,因此这2个读都需要等待。进入等待队列的线程可能会被挂起,它们会等待一个“匹配”操作。

2、如果等待队列中的元素和本次操作是互补的(比如等待操作是读,而本次操作是写),那么就插入一个“完成”状态的节点,并且让他“匹配”到一个等待节点上。接着弹出这2个节点,并且使得对应的2个线程继续执行。

3、如果线程发现等待队列的节点就是“完成”节点。那么帮助这个节点完成任务。其流程和步骤2是一致的。

 步骤1的实现如下(代码参考JDK 7u60):

SNode h = head;  
if (h == null || h.mode == mode) {                  // 如果队列为空,或者模式相同  
    if (timed && nanos <= 0) {                   // 不进行等待  
        if (h != null && h.isCancelled())  
            casHead(h, h.next);                 // 处理取消行为  
        else  
            return null;  
    } else if (casHead(h, s = snode(s, e, h, mode))) {  
        SNode m = awaitFulfill(s, timed, nanos);    //等待,直到有匹配操作出现  
        if (m == s) {                               // 等待被取消  
            clean(s);  
            return null;  
        }  
        if ((h = head) != null && h.next == s)  
            casHead(h, s.next);                 // 帮助s的 fulfiller  
        return (mode == REQUEST) ? m.item : s.item;  
    }  
}  

   上述代码中,第1行SNode表示等待队列中的节点。内部封装了当前线程、next节点、匹配节点、数据内容等信息。第2行,判断当前等待队列为空,或者队列中元素的模式与本次操作相同(比如,都是读操作,那么都必须要等待)。第8行,生成一个新的节点并置于队列头部,这个节点就代表当前线程。如果入队成功,则执行第9行awaitFulfill()函数。该函数会进行自旋等待,并最终挂起当前线程。直到一个与之对应的操作产生,将其唤醒。线程被唤醒后(表示已经读取到数据或者自己产生的数据已经被别的线程读取),在14~15行尝试帮助对应的线程完成两个头部节点的出队操作(这仅仅是友情帮助)。并在最后,返回读取或者写入的数据(第16行)。

 步骤2的实现如下: 

} else if (!isFulfilling(h.mode)) {             //是否处于fulfill状态  
    if (h.isCancelled())                // 如果以前取消了  
        casHead(h, h.next);             // 弹出并重试  
    else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {  
        for (;;) {                        // 一直循环直到匹配(match)或者没有等待者了  
            SNode m = s.next;           // m 是 s的匹配者(match)  
            if (m == null) {                // 已经没有等待者了  
                casHead(s, null);           // 弹出fulfill节点  
                s = null;                   // 下一次使用新的节点  
                break;                  // 重新开始主循环  
            }  
            SNode mn = m.next;  
            if (m.tryMatch(s)) {  
                casHead(s, mn);         // 弹出s 和 m  
                return (mode == REQUEST) ? m.item : s.item;  
            } else                      // match 失败  
                s.casNext(m, mn);       // 帮助删除节点  
        }  
    }  
}  

   上述代码中,首先判断头部节点是否处于fulfill模式。如果是,则需要进入步骤3。否则,将试自己为对应的fulfill线程。第4行,生成一个SNode元素,设置为fulfill模式并将其压入队列头部。接着,设置m(原始的队列头部)为s的匹配节点(第13行),这个tryMatch()操作将会激活一个等待线程,并将m传递给那个线程。如果设置成功,则表示数据投递完成,将s和m两个节点弹出即可(第14行)。如果tryMatch()失败,则表示已经有其他线程帮我完成了操作,那么简单得删除m节点即可(第17行),因为这个节点的数据已经被投递,不需要再次处理,然后,再次跳转到第5行的循环体,进行下一个等待线程的匹配和数据投递,直到队列中没有等待线程为止。

 步骤3:如果线程在执行时,发现头部元素恰好是fulfill模式,它就会帮助这个fulfill节点尽快被执行: 

} else {                                            // 帮助一个 fulfiller  
    SNode m =h.next;                           // m 是 h的 match  
    if (m ==null)                                   // 没有等待者  
        casHead(h,null);                          // 弹出fulfill节点  
    else {  
        SNode mn =m.next;  
        if(m.tryMatch(h))                           // 尝试 match  
           casHead(h, mn);                     // 弹出 h 和 m  
        else                                    // match失败  
            h.casNext(m,mn);                    // 帮助删除节点  
    }  
}  

   上述代码的执行原理和步骤2是完全一致的。唯一的不同是步骤3不会返回,因为步骤3所进行的工作是帮助其他线程尽快投递它们的数据。而自己并没有完成对应的操作,因此,线程进入步骤3后,再次进入大循环体(代码中没有给出),从步骤1开始重新判断条件和投递数据。

    从整个数据投递的过程中可以看到,在SynchronousQueue中,参与工作的所有线程不仅仅是竞争资源的关系。更重要的是,它们彼此之间还会相互帮助。在一个线程内部,可能会帮助其他线程完成它们的工作。这种模式可以更大程度上减少饥饿的可能,提高系统整体的并行度。

摘自《实战Java高并发程序设计》