惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
人人都是产品经理
人人都是产品经理
Cisco Talos Blog
Cisco Talos Blog
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
V
V2EX
博客园 - 三生石上(FineUI控件)
Martin Fowler
Martin Fowler
WordPress大学
WordPress大学
D
Docker
S
SegmentFault 最新的问题
博客园 - 聂微东
美团技术团队
Apple Machine Learning Research
Apple Machine Learning Research
月光博客
月光博客
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
Last Week in AI
Last Week in AI
M
MIT News - Artificial intelligence
F
Fortinet All Blogs
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
The GitHub Blog
The GitHub Blog
GbyAI
GbyAI
L
LangChain Blog
Vercel News
Vercel News
博客园 - 叶小钗
MongoDB | Blog
MongoDB | Blog
Stack Overflow Blog
Stack Overflow Blog
H
Help Net Security
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
The Cloudflare Blog
Engineering at Meta
Engineering at Meta
T
Threat Research - Cisco Blogs
T
Threatpost
Scott Helme
Scott Helme
T
Tailwind CSS Blog
Latest news
Latest news
Stack Overflow Blog
Stack Overflow Blog
Blog — PlanetScale
Blog — PlanetScale
The Register - Security
The Register - Security
罗磊的独立博客
P
Proofpoint News Feed
腾讯CDC
S
Schneier on Security
雷峰网
雷峰网
A
About on SuperTechFans
T
Tenable Blog
F
Full Disclosure
Cyberwarzone
Cyberwarzone
博客园_首页
有赞技术团队
有赞技术团队
K
Kaspersky official blog

文章列表

设计模式-备忘录模式 - OXOXTECH 牛牛技术客栈 设计模式-中介者模式 - OXOXTECH 牛牛技术客栈 Linux【Ubuntu】修改ssh默认端口 - OXOXTECH 牛牛技术客栈 设计模式-迭代器模式 - OXOXTECH 牛牛技术客栈 scheduled定时任务的三种基本实现方式 - OXOXTECH 牛牛技术客栈 Apriori - 基于关联规则的推荐算法(三) - OXOXTECH 牛牛技术客栈 Apriori - 基于关联规则的推荐算法(二) - OXOXTECH 牛牛技术客栈 Apriori - 基于关联规则的推荐算法(一) - OXOXTECH 牛牛技术客栈 基于JavaFX的桌面端网络调试工具 - OXOXTECH 牛牛技术客栈 Golang Channel的原理介绍 - OXOXTECH 牛牛技术客栈 Go语言Map的原理分析 - OXOXTECH 牛牛技术客栈 Go语言错误处理(panic)的最佳实践 - OXOXTECH 牛牛技术客栈 设计模式-解释器模式 - OXOXTECH 牛牛技术客栈 Redis报错Redis is configured to save RDB snapshots, but it's currently unable to persist to disk. go-webpbin库在Linux报错failed to encode image to WebP: exit status 1.......的问题 exe4j 打包加密的jar - OXOXTECH 牛牛技术客栈 Go生成图形验证码示例 - OXOXTECH 牛牛技术客栈 澳门一天游:一日尽享东方与西方的交融之美 - OXOXTECH 牛牛技术客栈 设计模式-命令模式 - OXOXTECH 牛牛技术客栈 别再自己瞎写工具类了,SpringBoot内置工具类应有尽有 - OXOXTECH 牛牛技术客栈 中山一日游 - OXOXTECH 牛牛技术客栈 设计模式-责任链模式 - OXOXTECH 牛牛技术客栈 起舞吧,齐舞吧 - OXOXTECH 牛牛技术客栈 设计模式-组合模式 - OXOXTECH 牛牛技术客栈 Go语言Web开发|GoFrame框架入门笔记 - OXOXTECH 牛牛技术客栈 Java打包exe教程 - OXOXTECH 牛牛技术客栈 设计模式-代理模式 - OXOXTECH 牛牛技术客栈 MySQL存储过程的优缺点有哪些? - OXOXTECH 牛牛技术客栈 前端渲染优化有哪些? - OXOXTECH 牛牛技术客栈 HTTP状态码及其含义 - OXOXTECH 牛牛技术客栈 从浏览器地址栏输入url到显示页面的步骤 - OXOXTECH 牛牛技术客栈 TypeScript事件派发管理器 - OXOXTECH 牛牛技术客栈 MQTT保留消息的使用方法 - OXOXTECH 牛牛技术客栈 世界工程-港珠澳大桥游 - OXOXTECH 牛牛技术客栈 Golang逃逸分析 - OXOXTECH 牛牛技术客栈 设计模式-享元模式 - OXOXTECH 牛牛技术客栈 牛牛成长记录 - OXOXTECH 牛牛技术客栈 ffmpeg常用命令 - OXOXTECH 牛牛技术客栈 设计模式-外观模式 - OXOXTECH 牛牛技术客栈 设计模式-装饰器模式 - OXOXTECH 牛牛技术客栈 设计模式-桥接模式 - OXOXTECH 牛牛技术客栈 5周年恋爱纪念日 - OXOXTECH 牛牛技术客栈 2024新年快乐,龙腾四海 - OXOXTECH 牛牛技术客栈 迎接新年:除夕的美好时刻 - OXOXTECH 牛牛技术客栈 设计模式-适配器模式 - OXOXTECH 牛牛技术客栈 设计模式-原型模式 - OXOXTECH 牛牛技术客栈 设计模式-建造者模式 - OXOXTECH 牛牛技术客栈 设计模式-工厂模式 - OXOXTECH 牛牛技术客栈 设计模式-单例模式 - OXOXTECH 牛牛技术客栈 SpringBoot在Linux环境下发送163邮件失败(No appropriate protocol (protocol is disabled or cipher suites are inappropriate)) 海与日落 - OXOXTECH 牛牛技术客栈 Swagger比较常用的注解 - OXOXTECH 牛牛技术客栈 猫🐱牛 - OXOXTECH 牛牛技术客栈 2023年最后一个晚霞 - OXOXTECH 牛牛技术客栈 Linux(Centos)部署Nginx教程 - OXOXTECH 牛牛技术客栈 Linux MySQL下载安装详细教程(CentOS版) - OXOXTECH 牛牛技术客栈 JavaFx打包成exe - OXOXTECH 牛牛技术客栈 Flux脚本语言入门教程 - OXOXTECH 牛牛技术客栈 演唱会出图 - OXOXTECH 牛牛技术客栈 Netty TCP解决粘包拆包 - OXOXTECH 牛牛技术客栈 SpringBoot实现订单超时取消的几种方案 - OXOXTECH 牛牛技术客栈 详解Java并发中的各种锁 - OXOXTECH 牛牛技术客栈 SpringBoot集成支付宝支付 - OXOXTECH 牛牛技术客栈 雪花算法:分布式系统唯一ID生成算法 - OXOXTECH 牛牛技术客栈 Java解决空指针的神器Optional - OXOXTECH 牛牛技术客栈 与兴一起 - OXOXTECH 牛牛技术客栈 Java17新特性详解与安装 - OXOXTECH 牛牛技术客栈 Jdk17安装+环境配置详细教程 - OXOXTECH 牛牛技术客栈 孤注一掷 - OXOXTECH 牛牛技术客栈 解决WinSCP经常断线重连 - OXOXTECH 牛牛技术客栈 内存不足导致Tomcat崩溃问题排查与解决办法 - OXOXTECH 牛牛技术客栈 influxDB初识,一个高效的时序数据库 - OXOXTECH 牛牛技术客栈 SpringBoot 服务接口限流方案 - OXOXTECH 牛牛技术客栈 Docker 安装 Portainer - OXOXTECH 牛牛技术客栈 Linux 安装Docker - OXOXTECH 牛牛技术客栈 物料宣传 - OXOXTECH 牛牛技术客栈 Java使用EMQX实现MQTT通信 - OXOXTECH 牛牛技术客栈 Java实现常见的排序算法 - OXOXTECH 牛牛技术客栈 FreeSwitch Windows安装教程 - OXOXTECH 牛牛技术客栈 MQTT单向SSL数据加密 - OXOXTECH 牛牛技术客栈 随性 - OXOXTECH 牛牛技术客栈 mysql报错Expression #1 of SELECT list is not in GROUP BY clause and contains nonaggre的解决方案 Git Push项目报 push to origin/master was rejected 错误解决方案 游行记——珠海金沙滩与金湖公园之行 - OXOXTECH 牛牛技术客栈 Tomcat:解决Tomcat启动警告:"无法将资源添加到Web应用程序缓存中....请考虑增加缓存空间" 的问题 - OXOXTECH 牛牛技术客栈 励骏庞都广场,迷一般的皇宫 - OXOXTECH 牛牛技术客栈 Docker 常用命令集合 - OXOXTECH 牛牛技术客栈 ElasticSearch Windows版-安装教程 - OXOXTECH 牛牛技术客栈 Java去除对象中为null的字段 - OXOXTECH 牛牛技术客栈 我和我的青春 - OXOXTECH 牛牛技术客栈 Java实现螺旋矩阵算法: - OXOXTECH 牛牛技术客栈 Java直接内存分配和释放的理解 - OXOXTECH 牛牛技术客栈 FreeSwitch将默认数据库迁移至MySQL - OXOXTECH 牛牛技术客栈 别错过路上的风景,别错过刹那间的深情! Viewer.js:一款强大的图片预览组件 - OXOXTECH 牛牛技术客栈 Java JDK Proxy和CGLib动态代理示例 redis常用命令 - OXOXTECH 牛牛技术客栈 SpringBoot查询IP归属地 - OXOXTECH 牛牛技术客栈 Spring 事务失效的六种情况 #张艺兴每时每刻#
Java多线程阻塞队列BlockingQueue的用法 - OXOXTECH 牛牛技术客栈
Mr.Potato · 2023-03-13 · via

简介

说明

本文用示例介绍Java中阻塞队列(BlockingQueue)的用法。

队列类型

BlockingQueue有这几种类型:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、PriorityBlockingQueue、DelayedWorkQueue

队列类型说明
ArrayBlockingQueue 基于数组的FIFO队列;有界;创建时必须指定大小; 入队和出队共用一个可重入锁。默认使用非公平锁。
LinkedBlockingQueue 基于链表的FIFO队列;有/无界;默认大小是 Integer.MAX_VALUE(无界),可自定义(有界);
两个重入锁分别控制元素的入队和出队,用Condition进行线程间的唤醒和等待。
吞吐量通常要高于ArrayBlockingQueue。
默认大小的LinkedBlockingQueue将导致所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了)。当每个任务相互独时,适合使用无界队列;例如, 在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
SynchronousQueue 无缓存的等待队列;无界;可认为大小为0。
不保存提交任务,直接提交出去。若超出corePoolSize个任务,直接创建新线程来执行任务,直到(corePoolSize+新建线程)> maximumPoolSize。
此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线 程具有增长的可能性。
吞吐量通常要高于LinkedBlockingQueue。
//也有地方说:是一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
详见下边的:CachedThreadPool的execute流程
PriorityBlockingQueue 基于链表的优先级队列;有/无界;默认大小是 Integer.MAX_VALUE,可自定义;

类似于LinkedBlockingQueue,但是其所含对象的排序不是FIFO,而是依据对象的自然顺序或者构造函数的Comparator决定。

DelayedWorkQueue

常用方法

放入数据

方法说明
offer(E e) 向队列尾部插入一个元素。该方法是非阻塞的。如果队列中有空闲:插入成功后返回 true。如果队列己满:丢弃当前元素然后返回false。如果e元素为null:抛出NullPointerException异常。
offer(E o, long timeout, TimeUnit unit) 可以设定等待的时间,若在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
add(E e) 内部调用offer方法。

与直接调用offer的区别:

add:失败时,抛出异常

offer:失败时,返回false

put(E e) 向队列尾部插入一个元素。
如果队列中有空闲:插入后直接返回。
如果队列己满:阻塞当前线程,直到队列有空闲插入成功后返回。
如果在阻塞时被其他线程设置了中断标志:被阻塞线程会抛出InterruptedException异常而返回。
如果e元素为null:抛出NullPointerException异常

获取数据

方法说明
poll()获取当前队列头部元素并从队列里面移除它。如果队列为空则返回null。
poll(long timeout, TimeUnit unit) 从BlockingQueue取出(会删除对象)一个队首的对象。

一旦在指定时间内有数据可取,则立即返回队列中的数据。

若直到时间超时还没有数据可取,返回失败。

take() 获取当前队列头部元素并从队列里面移除它。

如果队列为空则阻塞当前线程直到队列不为空然后返回元素;

如果在阻塞时被其他线程设置了中断标志,则被阻塞线程会抛出InterruptedException异常而返回。

drainTo() 一次性从BlockingQueue获取(会删除对象)所有可用的数据对象(可指定获取数据的个数)。

本方法可提升获取数据效率,不需要多次分批加锁或释放锁。

其他方法

方法说明
remainingCapacity() 获取队列中剩余的空间
contains(Object o) 判断队列中是否拥有该值。
remove(Object o) 从队列中移除指定的值。
size() 获得队列中有多少值(返回AtomicLong的值)

ArrayBlockingQueue

简介

ArrayBlockingQueue通过数组实现的FIFO有界阻塞队列,它的大小在实例被初始化的时候就被固定了,不能更改。

该类支持一个可选的公平策略,用于被阻塞等待的线程获取独占锁的排序,因为ArrayBlockingQueue内部的操作都需要获取一个ReentrantLock锁,该锁是支持公平策略的,所以ArrayBlockingQueue的公平策略就直接作用于ReentrantLock锁,决定线程是否有公平获取锁的权利。默认情况下是非公平的,公平模式下队列按照FIFO顺序授予线程访问权。公平性通常会降低吞吐量,但会降低可变性并避免饥饿。

ArrayBlockingQueue的缺陷

通过源码可以看见,ArrayBlockingQueue内部的几乎每一个操作方法都需要先获取同一个ReentrantLock独占锁才能进行,这极大的降低了吞吐量,几乎每个操作都会阻塞其它操作,最主要是插入操作和取出操作相互之间互斥。所以ArrayBlockingQueue不适用于需要高吞吐量的高效率数据生成与消费场景。LinkedBlockingQueue就能弥补其低吞吐量的缺陷。

实例

创建一个corePoolSize为2,maximumPoolSize为3的线程池。其中ArrayBlockingQueue设置缓存2个任务。执行6个任务。ArrayBlockingQueue为有界队列:

  1. 任务1和2在核心线程中执行;
  2. 任务3和4进来时,放到ArrayBlockingQueue缓存队列中,并且只能放2个(ArrayBlockingQueue设置的大小为2);
  3. 任务5和6进来的时候,任务5新建线程来执行任务,已经达到最大线程数3,所以任务6拒绝;
  4. 当有线程执行完的时候,再将任务3和4从队列中取出执行

创建线程池代码如下:

  /**
     * ArrayBlockingQueue
     */
    private static void arrayQueue() {
        System.out.println("\n\n =======ArrayBlockingQueue====== \n\n");
        Executor executors = new ThreadPoolExecutor(
                2, 3, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2),
                new RejectHandler());
        execute(executors);
    }

执行结果如下:

1 is running... 
2 is running... 
6 is rejected ^^ //6被拒
5 is running...  //5新建线程执行
1 is end !!! 
2 is end !!! 
3 is running... //1和2执行完之后3和4才执行
4 is running... 
5 is end !!! 

LinkedBlockingQueue

简介

LinkedBlockingQueueArrayBlockingQueue的相同点:

  1. 是FIFO队列,不允许插入null值。
  2. 容量在实例被构造完成之后不允许被更改

不同点:

LinkedBlockingQueueArrayBlockingQueue
大小指定实例化时可指定队列大小。若不指定大小,会采用默认的Integer.MAX_VALUE。实例化时必须指定大小。
吞吐量大。

采用了“双锁队列” 算法,元素的入队和出队分别由putLock、takeLock两个独立的可重入锁来实现。

小。

几乎每一个方法都需要先获取同一个ReentrantLock独占锁才能进行。

实例

创建一个corePoolSize为2,maximumPoolSize为3的线程池。无界队列。同样执行6个任务

  1. 核心线程执行任务1和2,其它的任务3~6放到队列中
  2. 执行完1和2,将3和4从队列中取出执行
  3. 执行完3和4,将5和6从队列中取出

创建线程池代码如下:

/**
 * LinkedBlockingQueue
 */
private static void linkedQueue() {
    System.out.println("\n\n =======LinkedBlockingQueue====== \n\n");
    Executor executors = new ThreadPoolExecutor(
            2, 3, 30, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(),
            new RejectHandler());
    execute(executors);
}

运行结果如下:

1 is running... 
2 is running... //中间线程休眠 
2 is end !!!   //10s之后才运行完
1 is end !!! 
3 is running...  //任务3和4才执行
4 is running... 
4 is end !!! 
3 is end !!! 
6 is running... 
5 is running... 
5 is end !!! 
6 is end !!! 

SynchronousQueue

说明

SynchrousQueue是个一个无缓存的队列。因为:SynchrousQueue源码可以看到:isEmpty()始终为truesize()始终返回0

示例

创建一个corePoolSize为2,maximumPoolSize为3的线程池。执行6个任务。

根据参数设置应该只可以执行3个任务:

  1. 2个核心线程执行2个任务;
  2. 第3个任务的时候,创建线程来执行任务3;
  3. 当第4个任务来的时候,此时已经超过了maximumPoolSize,所以拒绝任务。

代码:

/**
* SynchronousQueue
*/
private static void syncQueue() {
    System.out.println("\n\n =======SynchronousQueue====== \n\n");
    Executor executors = new ThreadPoolExecutor(
            2, 3, 30, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new RejectHandler());
    execute(executors);
}

执行结果:

1 is running... 
4 is rejected ^^ //4被拒
2 is running... 
3 is running... 
5 is rejected ^^  //5被拒
6 is rejected ^^  //6被拒
3 is end !!! 
1 is end !!! 
2 is end !!! 

PriorityBlockingQueue

简介

PriorityBlockingQueue是一个无限容量的阻塞队列。

容量是无限的,所以put等入队操作其实不存在阻塞,只要内存足够都能够立即入队成功,当然多个入队操作的线程之间还是存在竞争唯一锁的互斥访问。虽然PriorityBlockingQueue逻辑上是无界的,但是尝试添加元素时还是可能因为资源耗尽而抛出OutOfMemoryError

该队列也不允许放入null值,它使用与类java.util.PriorityQueue 相同的排序规则,也不允许放入不可比较的对象,这样做会导致ClassCastException

值得注意的是,虽然PriorityBlockingQueue叫优先级队列,但是并不是说元素一入队就会按照排序规则被排好序,而是只有通过调用take、poll方法出队或者drainTo转移出的队列顺序才是被优先级队列排过序的。所以通过调用 iterator() 以及可拆分迭代器 spliterator() 方法返回的迭代器迭代的元素顺序都没有被排序。如果需要有序遍历可以通过 Arrays.sort(pq.toArray()) 方法来排序。注意peek方法永远只获取且不删除第一个元素,所以多次调用peek都是返回同样的值。

PriorityBlockingQueue其实是通过Comparator来排序的,要么入队的元素实现了Comparator接口(即所谓的自然排序),要么构造PriorityBlockingQueue实例的时候传入一个统一的Comparator实例,如果两者兼备那么以后者为准

PriorityBlockingQueue不保证具有相同优先级的元素顺序,但是你可以定义自定义类或比较器,通过辅助属性来决定优先级相同的元素的顺序。

DelayedWorkQueue

简介

为什么不直接使用DelayQueue而要重新实现一个DelayedWorkQueue呢,可能是了方便在实现过程中加入一些扩展。

使用场景

  • 实现重试机制比如当调用接口失败后,把当前调用信息放入delay=10s的元素,然后把元素放入队列,那么这个队列就是一个重试队列。一个线程通过take方法获取需要重试的接口,take返回则接口进行重试,失败则再次放入队列,同时也可以在元素加上重试次数)。
  • TimerQueue的内部实现

参考文献: https://knife.blog.csdn.net/article/details/122795966