惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

爱范儿
爱范儿
Security Latest
Security Latest
NISL@THU
NISL@THU
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
C
Cybersecurity and Infrastructure Security Agency CISA
Cloudbric
Cloudbric
T
Threat Research - Cisco Blogs
大猫的无限游戏
大猫的无限游戏
C
CXSECURITY Database RSS Feed - CXSecurity.com
阮一峰的网络日志
阮一峰的网络日志
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
雷峰网
雷峰网
C
Cisco Blogs
V
Vulnerabilities – Threatpost
S
Security Archives - TechRepublic
V
Visual Studio Blog
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
J
Java Code Geeks
D
Darknet – Hacking Tools, Hacker News & Cyber Security
Know Your Adversary
Know Your Adversary
博客园 - 叶小钗
腾讯CDC
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
P
Privacy International News Feed
P
Palo Alto Networks Blog
博客园_首页
V
V2EX
WordPress大学
WordPress大学
Schneier on Security
Schneier on Security
月光博客
月光博客
博客园 - 司徒正美
Google DeepMind News
Google DeepMind News
TaoSecurity Blog
TaoSecurity Blog
博客园 - 聂微东
酷 壳 – CoolShell
酷 壳 – CoolShell
人人都是产品经理
人人都是产品经理
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
博客园 - 【当耐特】
The Cloudflare Blog
罗磊的独立博客
美团技术团队
N
News | PayPal Newsroom
K
KPMG report finds enterprise disconnect between AI and its ROI | CIO
Last Week in AI
Last Week in AI
K
Kaspersky official blog
Google Online Security Blog
Google Online Security Blog
S
SegmentFault 最新的问题
Application and Cybersecurity Blog
Application and Cybersecurity Blog
T
Tailwind CSS Blog

又见苍岚

COLMAP PatchMatch Stereo 算法详解 事件驱动的状态机框架:从理论到工程实践 Git 在国内网络环境下无法 Push 的排查与修复 —— 配置 Clash 代理 分段五次多项式插值原理详解 路径插值方法深度对比研究 Claude Code 使用指南 OpenClaw 记忆管理与技能创建指南 CBS(Conflict-Based Search)算法详解 A* 算法及其变种详解 OpenClaw 配置多 Agents Windows Powershell 无法加载文件,因为在此系统上禁止运行脚本问题的解决方案 MaxClaw 安装流程 大模型 AI 名词介绍 AList 网盘聚合工具简介 Protobuf 简介与测试 Claude Code 简介以及 GLM 4.7 模型接入 Github 歌词下载工具 163MusicLyrics Python __getattr__ 懒加载 Python TypedDict 机器人仿真平台 Gazebo 安装记录 机器人仿真平台 Gazebo 简介 多机器人路径规划问题(Multi-Agent Path Finding, MAPF)简介 Python exifread 读取修改过的 jpeg 信息错误问题修复 3D 坐标系变换的理解 3D 旋转矩阵基本概念 MongoDB Compass 介绍 Python 环境管理工具 uv Flutter 开发指南 Snipaste 安装下载与黑屏问题解决方案 全局路径规划算法记录 2025 Python 版本性能测试 Flutter Hello World Flutter 安装环境配置 Ubuntu VMware 硬盘扩容后 SMBus Host controller not enabled 报错问题解决 Python NetworkX 教程 Docker GPU 报错 - Failed to initialize NVML Unknown Error 解决方案 Python matplotlib 图表绘制 cuda-toolkit 安装替代 Cuda 与 Cudnn Jinja2 Python 利用 docxtpl 和 Jinja2 生成基于模板的 Word 文档 Docker 实现 CPU 核心隔离 LoFTR 基于 Transformer 的特征提取匹配算法 OmniGlue 特征匹配 SuperGlue 使用图神经网络学习特征匹配 Ubuntu 下将 xlsx 文件按照 sheet 转换为 图片 Python 使用 SQLAlchemy Python FastAPI 教程 openwrt 软路由配置安装 Nav2 地图文件(PGM/YAML)规范标准 3D OBJ 模型转换为 glb 瓦片格式 Python 源码 Redis 数据库介绍 Ubuntu 22.04 内核自动升级导致 MongoDB 7.0.12 错误记录 ubuntu 20.04 安装 ROS Noetic ubuntu 18.04 安装 ROS Melodic VMware Workstation Pro 个人免费版下载、安装、使用指南 Hybrid A-star 路径规划 Reeds-Shepp 曲线 Dubins 曲线 Linux kvm 虚拟机网络不通的问题解决方法 Ubuntu 自动内存清理 BiliBili 缓存视频转 mp4 Python 求解线性规划 3D Gaussian Splatting 官方源码实践记录 ImageMagick 教程 Ubuntu 22.04 安装 Colmap 对数几率 odds Ubuntu nmcli 网络管理工具使用指南 SuperPoint 自监督深度学习特征点提取 SyncTV Music Tag Web 在线音乐信息整理工具 ncm 格式转 mp3 MusicBrainz 音乐元数据百科数据库 Ubuntu 网络流量监控工具 私人云音乐平台 Navidrome 入门 手眼标定 四元数(Quaternions) OHTTPS 实现免费自动 https 证书申请、更新、部署 ubuntu 22.04 安装 CloudCompare 单机 KVM 虚拟机冷迁移 Ubuntu 22.04 使用 mdadm 实现软 raid 小鱼 一键安装 ROS-humble Fluid -46- 基于 Simpletex API 构建公式识别页面 公式识别 API 简介 -- Simpletex 使用 Python web 部署库 waitress 3D Gaussian Splatting for Real-Time Radiance Field Rendering Ubuntu Swap 简介与空间扩展 Ubuntu 24.04 安装 forticlient Clash Verge 使用 MongoDB 7.0.17 集群 Docker 构建源码 Error code - 2013. Lost connection to MySQL server during query 问题解决 Python 日志记录库 loguru 使用指北 Python 实现 Web 日志查看服务 MySQL LOAD DATA LOCAL INFILE 极速数据加载 Image size exceeds limit of 89478485 pixels 解决方案 Docker 使用 NVIDIA GPU 驱动错误解决 阿里云 docker 镜像仓库 Ubuntu中没有wired connected的解决方案 MinIO 简介 subconverter 代理订阅格式转换 修复 node –openssl-legacy-provider is not allowed in NODE_OPTIONS 错误
C++ 实现多线程生产者消费者模式
Yiwei Zhang · 2023-03-20 · via 又见苍岚

之前介绍过 生产者、消费者模式,是一种常用的多线程并发设计模式,本文记录 C++ 实现的过程。

生产者消费者模式

生产者消费者问题(英语:Producer-consumer problem),也称有限缓冲问题(英语:Bounded-buffer problem),是一个多线程同步问题的经典案例。该问题描述了两个共享固定大小缓冲区的线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。

生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。

该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。

根据生产者和消费者数量的多少,程序复杂程度也不同,可以分为 :单生产者-单消费者模型单生产者-多消费者模型多生产者-单消费者模型多生产者-多消费者模型

单生产者-单消费者模型

单生产者-单消费者模型中只有一个生产者和一个消费者,生产者不停地往产品库中放入产品,消费者则从产品库中取走产品,产品库容积有限制,只能容纳一定数目的产品,如果生产者生产产品的速度过快,则需要等待消费者取走产品之后,产品库不为空才能继续往产品库中放置新的产品,相反,如果消费者取走产品的速度过快,则可能面临产品库中没有产品可使用的情况,此时需要等待生产者放入一个产品后,消费者才能继续工作。

  • C++11 实现单生产者单消费者模型的代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>

static const int repository_size = 10;//循环队列的大小
static const int item_total = 20;//要生产的产品数目

std::mutex mtx;//互斥量,保护产品缓冲区

std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品

int item_buffer[repository_size];

static std::size_t read_position = 0;//消费者读取产品的位置
static std::size_t write_position = 0;//生产者写入产品的位置

std::chrono::seconds t(1);//a new feature of c++ 11 standard

void produce_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);
while (((write_position + 1) % repository_size) == read_position)
{
std::cout << "Producer is waiting for an empty slot..." << std::endl;
repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
} //当缓冲区满了之后我们就不能添加产品了

item_buffer[write_position] = i;//写入产品
write_position++;

if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
{
write_position = 0;
}

repo_not_empty.notify_all();//通知消费者产品库不为空

//lck.unlock();//解锁
}

int consume_item()
{
int data;
std::unique_lock<std::mutex> lck(mtx);
while (write_position == read_position)
{
std::cout << "Consumer is waiting for items..." << std::endl;
repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
}

data = item_buffer[read_position];//读取产品
read_position++;

if (read_position >= repository_size)
{
read_position = 0;
}

repo_not_full.notify_all();//通知产品库不满
//lck.unlock();

return data;
}

void Producer_thread()
{
for (int i = 1; i <= item_total; ++i)
{
//std::this_thread::sleep_for(t);
std::cout << "生产者生产第" << i << "个产品" << std::endl;
produce_item(i);
}
}

void Consumer_thread()
{
static int cnt = 0;
while (1)
{
//std::this_thread::sleep_for(t);
int item = consume_item();
std::cout << "消费者消费第" << item << "个产品" << std::endl;

if (++cnt == item_total)
break;
}
}

int main()
{
std::thread producer(Producer_thread); // 创建生产者线程.
std::thread consumer(Consumer_thread); // 创建消费之线程.
producer.join();
consumer.join();
}

单生产者-多消费者模型

与单生产者和单消费者模型不同的是,单生产者-多消费者模型中可以允许多个消费者同时从产品库中取走产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护消费者取走产品的计数器。

  • 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>
static const int repository_size = 10;//循环队列的大小
static const int item_total = 20;//要生产的产品数目

std::mutex mtx;//互斥量,保护产品缓冲区
std::mutex mtx_counter;//互斥量,保护产品计数器

std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品

int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列

static std::size_t read_position = 0;//消费者读取产品的位置
static std::size_t write_position = 0;//生产者写入产品的位置

static std::size_t item_counter = 0;//消费者消费产品计数器

std::chrono::seconds t(1);//a new feature of c++ 11 standard

void produce_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);
//item buffer is full, just wait here.
while (((write_position + 1) % repository_size) == read_position)
{
std::cout << "Producer is waiting for an empty slot..." << std::endl;
repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
} //当缓冲区满了之后我们就不能添加产品了

item_buffer[write_position] = i;//写入产品
write_position++;

if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
{
write_position = 0;
}

repo_not_empty.notify_all();//通知消费者产品库不为空

lck.unlock();//解锁
}

int consume_item()
{
int data;
std::unique_lock<std::mutex> lck(mtx);
// item buffer is empty, just wait here.
while (write_position == read_position)
{
std::cout << "Consumer is waiting for items..." << std::endl;
repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
}

data = item_buffer[read_position];//读取产品
read_position++;

if (read_position >= repository_size)
{
read_position = 0;
}

repo_not_full.notify_all();//通知产品库不满
lck.unlock();

return data;
}

void Producer_thread()
{
for (int i = 1; i <= item_total; ++i)
{
//std::this_thread::sleep_for(t);
std::cout << "生产者生产第" << i << "个产品" << std::endl;
produce_item(i);
}
}

void Consumer_thread()
{
bool read_to_exit = false;
while (1)
{
std::this_thread::sleep_for(t);
std::unique_lock<std::mutex> lck(mtx_counter);
if (item_counter < item_total)
{
int item = consume_item();
++item_counter;
std::cout << "消费者线程" << std::this_thread::get_id()
<< "消费第" << item << "个产品" << std::endl;
}
else
{
read_to_exit = true;
}
if (read_to_exit == true)
break;
}

std::cout << "Consumer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;

}

int main()
{
std::thread producer(Producer_thread); // 创建生产者线程.
std::vector<std::thread> thread_vector;
for (int i = 0; i != 5; ++i)
{
thread_vector.push_back(std::thread(Consumer_thread));// 创建消费者线程.
}

producer.join();
for (auto &thr : thread_vector)
{
thr.join();
}

}

多生产者-单消费者模型

与单生产者和单消费者模型不同的是,多生产者-单消费者模型中可以允许多个生产者同时向产品库中放入产品。所以除了保护产品库在多个读写线程下互斥之外,还需要维护生产者放入产品的计数器。

  • 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

static const int repository_size = 10;//循环队列的大小
static const int item_total = 20;//要生产的产品数目

std::mutex mtx;//互斥量,保护产品缓冲区
std::mutex mtx_counter;

std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品

int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列

static std::size_t read_position = 0;//消费者读取产品的位置
static std::size_t write_position = 0;//生产者写入产品的位置

static std::size_t item_counter = 0;//计数器

std::chrono::seconds t(1);//a new feature of c++ 11 standard

void produce_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);
// item buffer is full, just wait here.
while (((write_position + 1) % repository_size) == read_position)
{
std::cout << "Producer is waiting for an empty slot..." << std::endl;
repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
} //当缓冲区满了之后我们就不能添加产品了

item_buffer[write_position] = i;//写入产品
write_position++;

if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
{
write_position = 0;
}

repo_not_empty.notify_all();//通知消费者产品库不为空

lck.unlock();//解锁
}

int consume_item()
{
int data;
std::unique_lock<std::mutex> lck(mtx);
// item buffer is empty, just wait here.
while (write_position == read_position)
{
std::cout << "Consumer is waiting for items..." << std::endl;
repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
}

data = item_buffer[read_position];//读取产品
read_position++;

if (read_position >= repository_size)
{
read_position = 0;
}

repo_not_full.notify_all();//通知产品库不满
lck.unlock();

return data;
}

void Producer_thread()
{
bool read_to_exit = false;
while (1)
{
std::unique_lock<std::mutex> lck(mtx_counter);
if (item_counter < item_total)
{
++item_counter;
produce_item(item_counter);
std::cout << "生产者线程 " << std::this_thread::get_id()
<< "生产第 " << item_counter << "个产品" << std::endl;
}
else
{
read_to_exit = true;
}

if (read_to_exit == true)
break;
}

std::cout << "Producer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;

}

void Consumer_thread()
{
static int cnt = 0;
while (1)
{
std::this_thread::sleep_for(t);
int item = consume_item();
std::cout << "消费者消费第" << item << "个产品" << std::endl;

if (++cnt == item_total)
break;
}
}

int main()
{
std::vector<std::thread> thread_vector;
for (int i = 0; i != 5; ++i)
{
thread_vector.push_back(std::thread(Producer_thread));// 创建消费者线程.
}

std::thread consumer(Consumer_thread); // 创建消费之线程.

for (auto &thr : thread_vector)
{
thr.join();
}

consumer.join();
}

多生产者-多消费者模型

该模型可以说是前面两种模型的综合,程序需要维护两个计数器,分别是生产者已生产产品的数目和消费者已取走产品的数目。另外也需要保护产品库在多个生产者和多个消费者互斥地访问。

  • 示例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

static const int repository_size = 10;//循环队列的大小
static const int item_total = 20;//要生产的产品数目

std::mutex mtx;//互斥量,保护产品缓冲区
std::mutex producer_count_mtx;
std::mutex consumer_count_mtx;

std::condition_variable repo_not_full;//条件变量指示产品缓冲区不满
std::condition_variable repo_not_empty;//条件变量指示产品缓冲区不为空,就是缓冲区有产品

int item_buffer[repository_size];//产品缓冲区,这里使用了一个循环队列

static std::size_t read_position = 0;//消费者读取产品的位置
static std::size_t write_position = 0;//生产者写入产品的位置

static size_t produced_item_counter = 0;
static size_t consumed_item_counter = 0;

std::chrono::seconds t(1);//a new feature of c++ 11 standard
std::chrono::microseconds t1(1000);

void produce_item(int i)
{
std::unique_lock<std::mutex> lck(mtx);
// item buffer is full, just wait here.
while (((write_position + 1) % repository_size) == read_position)
{
std::cout << "Producer is waiting for an empty slot..." << std::endl;
repo_not_full.wait(lck);// 生产者等待"产品库缓冲区不为满"这一条件发生.
} //当缓冲区满了之后我们就不能添加产品了

item_buffer[write_position] = i;//写入产品
write_position++;

if (write_position == repository_size)//写入的位置如果在队列最后则重新设置
{
write_position = 0;
}

repo_not_empty.notify_all();//通知消费者产品库不为空

lck.unlock();//解锁
}

int consume_item()
{
int data;
std::unique_lock<std::mutex> lck(mtx);
// item buffer is empty, just wait here.
while (write_position == read_position)
{
std::cout << "Consumer is waiting for items..." << std::endl;
repo_not_empty.wait(lck);// 消费者等待"产品库缓冲区不为空"这一条件发生.
}

data = item_buffer[read_position];//读取产品
read_position++;

if (read_position >= repository_size)
{
read_position = 0;
}

repo_not_full.notify_all();//通知产品库不满
lck.unlock();

return data;
}

void Producer_thread()
{
bool ready_to_exit = false;
while (1)
{
//std::this_thread::sleep_for(t);
std::unique_lock<std::mutex> lock(producer_count_mtx);
if (produced_item_counter < item_total)
{
++produced_item_counter;
produce_item(produced_item_counter);
std::cout << "生产者线程 " << std::this_thread::get_id()
<< "生产第 " << produced_item_counter << "个产品" << std::endl;
}
else
{
ready_to_exit = true;
}

lock.unlock();

if (ready_to_exit == true)
{
break;
}
}

std::cout << "Producer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;
}

void Consumer_thread()
{
bool read_to_exit = false;
while (1)
{
std::this_thread::sleep_for(t1);
std::unique_lock<std::mutex> lck(consumer_count_mtx);
if (consumed_item_counter < item_total)
{
int item = consume_item();
++consumed_item_counter;
std::cout << "消费者线程" << std::this_thread::get_id()
<< "消费第" << item << "个产品" << std::endl;
}
else
{
read_to_exit = true;
}

if (read_to_exit == true)
{
break;
}
}

std::cout << "Consumer thread " << std::this_thread::get_id()
<< " is exiting..." << std::endl;
}

int main()
{
std::vector<std::thread> thread_vector1;
std::vector<std::thread> thread_vector2;
for (int i = 0; i != 5; ++i)
{
thread_vector1.push_back(std::thread(Producer_thread));// 创建生产者线程.
thread_vector2.push_back(std::thread(Consumer_thread));// 创建消费者线程.

}

for (auto &thr1 : thread_vector1)
{
thr1.join();
}

for (auto &thr2 : thread_vector2)
{
thr2.join();
}
}

参考资料

文章链接:
https://www.zywvvd.com/notes/coding/cpp/cpp-producer-consumer/cpp-producer-consumer/