惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

爱范儿
爱范儿
Security Latest
Security Latest
NISL@THU
NISL@THU
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
C
Cybersecurity and Infrastructure Security Agency CISA
Cloudbric
Cloudbric
T
Threat Research - Cisco Blogs
大猫的无限游戏
大猫的无限游戏
C
CXSECURITY Database RSS Feed - CXSecurity.com
阮一峰的网络日志
阮一峰的网络日志
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
雷峰网
雷峰网
C
Cisco Blogs
V
Vulnerabilities – Threatpost
S
Security Archives - TechRepublic
V
Visual Studio Blog
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
J
Java Code Geeks
D
Darknet – Hacking Tools, Hacker News & Cyber Security
Know Your Adversary
Know Your Adversary
博客园 - 叶小钗
腾讯CDC
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
P
Privacy International News Feed
P
Palo Alto Networks Blog
博客园_首页
V
V2EX
WordPress大学
WordPress大学
Schneier on Security
Schneier on Security
月光博客
月光博客
博客园 - 司徒正美
Google DeepMind News
Google DeepMind News
TaoSecurity Blog
TaoSecurity Blog
博客园 - 聂微东
酷 壳 – CoolShell
酷 壳 – CoolShell
人人都是产品经理
人人都是产品经理
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
博客园 - 【当耐特】
The Cloudflare Blog
罗磊的独立博客
美团技术团队
N
News | PayPal Newsroom
K
KPMG report finds enterprise disconnect between AI and its ROI | CIO
Last Week in AI
Last Week in AI
K
Kaspersky official blog
Google Online Security Blog
Google Online Security Blog
S
SegmentFault 最新的问题
Application and Cybersecurity Blog
Application and Cybersecurity Blog
T
Tailwind CSS Blog

又见苍岚

COLMAP PatchMatch Stereo 算法详解 事件驱动的状态机框架:从理论到工程实践 Git 在国内网络环境下无法 Push 的排查与修复 —— 配置 Clash 代理 分段五次多项式插值原理详解 路径插值方法深度对比研究 Claude Code 使用指南 OpenClaw 记忆管理与技能创建指南 CBS(Conflict-Based Search)算法详解 A* 算法及其变种详解 OpenClaw 配置多 Agents Windows Powershell 无法加载文件,因为在此系统上禁止运行脚本问题的解决方案 MaxClaw 安装流程 大模型 AI 名词介绍 AList 网盘聚合工具简介 Protobuf 简介与测试 Claude Code 简介以及 GLM 4.7 模型接入 Github 歌词下载工具 163MusicLyrics Python __getattr__ 懒加载 Python TypedDict 机器人仿真平台 Gazebo 安装记录 机器人仿真平台 Gazebo 简介 多机器人路径规划问题(Multi-Agent Path Finding, MAPF)简介 Python exifread 读取修改过的 jpeg 信息错误问题修复 3D 坐标系变换的理解 3D 旋转矩阵基本概念 MongoDB Compass 介绍 Python 环境管理工具 uv Flutter 开发指南 Snipaste 安装下载与黑屏问题解决方案 全局路径规划算法记录 2025 Python 版本性能测试 Flutter Hello World Flutter 安装环境配置 Ubuntu VMware 硬盘扩容后 SMBus Host controller not enabled 报错问题解决 Python NetworkX 教程 Docker GPU 报错 - Failed to initialize NVML Unknown Error 解决方案 Python matplotlib 图表绘制 cuda-toolkit 安装替代 Cuda 与 Cudnn Jinja2 Python 利用 docxtpl 和 Jinja2 生成基于模板的 Word 文档 Docker 实现 CPU 核心隔离 LoFTR 基于 Transformer 的特征提取匹配算法 OmniGlue 特征匹配 SuperGlue 使用图神经网络学习特征匹配 Ubuntu 下将 xlsx 文件按照 sheet 转换为 图片 Python 使用 SQLAlchemy Python FastAPI 教程 openwrt 软路由配置安装 Nav2 地图文件(PGM/YAML)规范标准 3D OBJ 模型转换为 glb 瓦片格式 Python 源码 Redis 数据库介绍 Ubuntu 22.04 内核自动升级导致 MongoDB 7.0.12 错误记录 ubuntu 20.04 安装 ROS Noetic ubuntu 18.04 安装 ROS Melodic VMware Workstation Pro 个人免费版下载、安装、使用指南 Hybrid A-star 路径规划 Reeds-Shepp 曲线 Dubins 曲线 Linux kvm 虚拟机网络不通的问题解决方法 Ubuntu 自动内存清理 BiliBili 缓存视频转 mp4 Python 求解线性规划 3D Gaussian Splatting 官方源码实践记录 ImageMagick 教程 Ubuntu 22.04 安装 Colmap 对数几率 odds Ubuntu nmcli 网络管理工具使用指南 SuperPoint 自监督深度学习特征点提取 SyncTV Music Tag Web 在线音乐信息整理工具 ncm 格式转 mp3 MusicBrainz 音乐元数据百科数据库 Ubuntu 网络流量监控工具 私人云音乐平台 Navidrome 入门 手眼标定 四元数(Quaternions) OHTTPS 实现免费自动 https 证书申请、更新、部署 ubuntu 22.04 安装 CloudCompare 单机 KVM 虚拟机冷迁移 Ubuntu 22.04 使用 mdadm 实现软 raid 小鱼 一键安装 ROS-humble Fluid -46- 基于 Simpletex API 构建公式识别页面 公式识别 API 简介 -- Simpletex 使用 Python web 部署库 waitress 3D Gaussian Splatting for Real-Time Radiance Field Rendering Ubuntu Swap 简介与空间扩展 Ubuntu 24.04 安装 forticlient Clash Verge 使用 MongoDB 7.0.17 集群 Docker 构建源码 Error code - 2013. Lost connection to MySQL server during query 问题解决 Python 日志记录库 loguru 使用指北 Python 实现 Web 日志查看服务 MySQL LOAD DATA LOCAL INFILE 极速数据加载 Image size exceeds limit of 89478485 pixels 解决方案 Docker 使用 NVIDIA GPU 驱动错误解决 阿里云 docker 镜像仓库 Ubuntu中没有wired connected的解决方案 MinIO 简介 subconverter 代理订阅格式转换 修复 node –openssl-legacy-provider is not allowed in NODE_OPTIONS 错误
ZMQ
Yiwei Zhang · 2024-09-25 · via 又见苍岚

ZMQ(ZeroMQ)是一个开源的库,用于在应用程序中实现消息传递, 本文记录相关内容。

ZMQ

ZeroMQ (也称为 ØMQ,0MQ,或 zmq)看起来像一个可嵌入的网络库,但其作用类似于并发框架。它提供了跨进程、进程间、 TCP 和多播等各种传输方式携带原子消息的套接字。您可以使用诸如扇出、发布-订阅、任务分配和请求-应答等模式将套接字 N 到 N 连接起来。它的速度足以成为集群产品的结构。它的异步 I/O 模型为您提供了可伸缩的多核应用程序,构建为异步消息处理任务。它有许多语言 API,并且运行在大多数操作系统上。

它被设计为类似于socket的API,但其运作方式更像是消息队列(message queue)或企业消息传递系统(enterprise messaging system)。

特点

  1. 高级抽象:ZMQ提供高级的抽象,使得消息传递变得简单,无需担心底层网络细节。
  2. 模式多样:支持多种通信模式,如请求-应答(request-reply)、发布-订阅(publish-subscribe)、推-拉(push-pull)等。
  3. 跨平台:可以在多种操作系统和编程语言上使用。
  4. 性能优异:经过优化,具有很高的消息吞吐量。
  5. 无中心:ZMQ不需要一个中心节点,每个节点既是客户端也是服务器。

通信方式: ZeroMQ的三种通信模式分别是:Request-ReplyPublisher-subscriber, Parallel Pipeline

官方网站https://zeromq.org/

PyZMQhttps://pyzmq.readthedocs.io/en/latest/

Python 安装

PyZMQ 需要在 python 3.7 及以上的版本上使用,当前(2024.09)最新版本为 26.2

  • 安装 zmq
1
pip install pyzmq

bind / connect

在 ZeroMQ(也称为 0MQ 或 zmq)中,bindconnect 是用于建立网络连接的两种主要方法,它们用于设置消息传递的端点,但是它们在作用和用途上有所区别。

bind

  • 定义:当一个 Socket 在指定地址上使用 bind 方法时,它告诉 ZeroMQ 在该地址上监听进入的连接。
  • 使用场景:通常用于服务器端或者说是消息的接收端,它监听来自客户端的连接。
  • 地址:使用 bind 的 Socket 需要提供一个具体的、可访问的网络地址(如 tcp://*:5555,其中 * 表示监听所有可用的接口,5555 是端口号)。
  • 关系:一个端口只能被一个 Socket 绑定,即在一个网络接口上的特定端口上只能有一个 bind 调用。

connect

  • 定义:当一个 Socket 在指定地址上使用 connect 方法时,它尝试建立一个到该地址的连接。
  • 使用场景:通常用于客户端或者说是消息的发送端,它主动发起与服务器端的连接。
  • 地址:使用 connect 的 Socket 需要指定它想要连接的服务器地址(如 tcp://localhost:5555)。
  • 关系:多个 Socket 可以使用 connect 方法连接到同一个由另一个 Socket 通过 bind 绑定的地址。

关系和区别

  • 关系
    • bindconnect 是相互补充的:一个 Socket 通过 bind 监听连接,而另一个 Socket 通过 connect 建立到前者的连接。
    • 在对等网络(Peer-to-Peer)通信模式中,所有的节点既可以是客户端也可以是服务器端,因此它们可能会同时使用 bindconnect
  • 区别
    • 角色bind 用于监听,而 connect 用于发起连接。
    • 地址bind 通常使用通配符表示监听所有接口,而 connect 指定具体的远程地址。
    • 数量限制:一个端口只能被一个 Socket 绑定,但可以有多个 Socket 连接到同一个绑定的端口。

理解这两个概念对于使用 ZeroMQ 构建网络通信模型至关重要。正确地使用 bindconnect 可以确保消息在复杂的网络拓扑中有效地传递。

Request-Reply(应答模式)

应答模式特点:

  1. 客户端提出请求,服务端必须回答请求,每个请求只回答一次

  2. 客户端没有收到答复前,不能再次进行请求

  3. 可以有多个客户端提出请求,服务端能保证各个客户端只接收到自己的答复

    1. 如果服务端断掉或者客户端断掉会产生怎样的影响?

      如果是客户端断掉,对服务端没有任何影响,如果客户端随后又重新启动,那么两方继续一问一答,但是如果是服务端断掉了,就可能会产生一些问题,这要看服务端是在什么情况下断掉的,如果服务端收是在回答完问题后断掉的,那么没影响,重启服务端后,双发继续一问一答,但如果服务端是在收到问题后断掉了,还没来得及回答问题,这就有问题了,那个提问的客户端迟迟得不到答案,就会一直等待答案,因此不会再发送新的提问,服务端重启后,客户端迟迟不发问题,所以也就一直等待提问。

python 实现 1

  • 客户端和服务端代码如下:

zmq_server.py

1
2
3
4
5
6
7
8
9
10
11
12
import zmq

context = zmq.Context() #创建上下文
socket = context.socket(zmq.REP) #创建Response服务端socket
socket.bind("tcp://*:5555") #socket绑定,*表示本机ip,端口号为5555,采用tcp协议通信

while True:
message = socket.recv()
print(type(message)) #接收到的消息也会bytes类型(字节)
print("收到消息:{}".format(message))
socket.send(b"new message") #发送消息,字节码消息

zmq_client.py

1
2
3
4
5
6
7
8
9
10
11
#coding:utf-8

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")

socket.send(b"A message")
response = socket.recv()
print(response)

常用数据发送API如下:

1
2
3
4
5
6
7
8
9
10
11
12
#发送数据
socket.send_json(data) #data 会被json序列化后进行传输 (json.dumps)
socket.send_string(data, encoding="utf-8") #data为unicode字符串,会进行编码成子节再传输
socket.send_pyobj(obj) #obj为python对象,采用pickle进行序列化后传输
socket.send_multipart(msg_parts) # msg_parts, 发送多条消息组成的迭代器序列,每条消息是子节类型,
# 如[b"message1", b"message2", b"message2"]

#接收数据
socket.recv_json()
socket.recv_string()
socket.recv_pyobj()
socket.recv_multipart()

python 实现 2

sever.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import zmq
import sys
context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:5555")
while True:
try:
print("wait for client ...")
message = socket.recv()
print("message from client:", message.decode('utf-8'))
socket.send(message)
except Exception as e:
print('异常:',e)
sys.exit()

client.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import zmq
import sys
context = zmq.Context()
print("Connecting to server...")
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5555")
while True:

input1 = input("请输入内容:").strip()
if input1 == 'b':
sys.exit()
socket.send(input1.encode('utf-8'))

message = socket.recv()
print("Received reply: ", message.decode('utf-8'))

Publisher-Subscriber (发布-订阅模式)

publiser广播消息到所有客户端,客户端根据订阅主题过滤消息。

广播所有client,没有队列缓存,断开连接数据将永远丢失。

PUB发送,send。SUB接收,recv。和PUSH-PULL模式不同,PUB将消息同时发给和他建立的链接,类似于广播。另外发布订阅模式也可以使用订阅过滤来实现只接收特定的消息。订阅过滤是在服务器上进行过滤的,如果一个订阅者设定了过滤,那么发布者将只发布满足他订阅条件的消息。

这个就是广播和收听的关系。PUB-SUB模式虽然没有使用网络的广播功能,但是它内部是异步的。也就是一次发送没有结束立刻开始下一次发送。
广播所有client,没有队列缓存,断开连接数据将永远丢失。client可以进行数据过滤。

Python 实现

python实现代码如下, 其中publisher发布两条消息,第一条消息的topic为client1, 被第一个subscriber接收到;第二条消息的topic为client2, 被第二个subscriber接收到。

注意的是 subscriber 在匹配时,并不是完全匹配的,消息的topic为client1开头的字符串都会被匹配到,如果topic为"client1cient2", 也会被第一个subscriber接收到

server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
import zmq
import time
import sys
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5555")

while True:
msg = input("请输入要发布的信息:").strip()
if msg == 'b':
sys.exit()
socket.send(msg.encode('utf-8'))
time.sleep(1)

client1.py

1
2
3
4
5
6
7
8
9
10
import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE,''.encode('utf-8')) # 接收所有消息
while True:
response = socket.recv().decode('utf-8');
print("response: %s" % response)

client2.py

1
2
3
4
5
6
7
8
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5555")
socket.setsockopt(zmq.SUBSCRIBE,'123'.encode('utf-8')) # 消息过滤 只接受123开头的信息
while True:
response = socket.recv().decode('utf-8');
print("response: %s" % response)

utf-8 编码

1
2
3
4
5
# 发送端
self.socket.send_string("你好世界", encoding='utf-8')

# 接收端
msg = socket.recv_string(encoding='utf-8')

C++ 实现

server:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <zmq.h>
#include <stdio.h>
#include <stdlib.h>
#include "zmq_helper.h"

int main(void)
{
void * context = zmq_ctx_new(); void * socket = zmq_socket(context, ZMQ_PUB);
zmq_bind(socket, "tcp://*:5556");

srandom((unsigned)time(NULL));

while(1)
{
int zipcode = randof(100000); // 邮编: 0 ~ 99999
int temp = randof(84) - 42; // 温度: -42 ~ 41
int relhumidity = randof(50) + 10; // 相对湿度: 10 ~ 59

char msg[20];
snprintf(msg, sizeof(msg), "%5d %d %d", zipcode, temp, relhumidity);
s_send(socket, msg);
}

zmq_close(socket);
zmq_ctx_destroy(context);

return 0;
}

client:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include <zmq.h>
#include <stdio.h>
#include "zmq_helper.h"

int main(void)
{
void * context = zmq_ctx_new();
void * socket = zmq_socket(context, ZMQ_SUB);
zmq_connect(socket, "tcp://localhost:5556");

char * zipcode = "10001";
zmq_setsockopt(socket, ZMQ_SUBSCRIBE, zipcode, strlen(zipcode));

for(int i = 0; i < 50; ++i)
{
char * string = s_recv(socket);
printf("[Subscriber] Received weather report msg: %s\n", string);
free(string);
}

zmq_close(socket);
zmq_ctx_destroy(context);

return 0;
}

  1. ZMQ_PUB类型的socket, 如果没有任何client与其相连, 其所有消息都将被简单就地抛弃
  2. ZMQ_SUB类型的socket, 即是client, 可以与多个ZMQ_PUB类型的socket相连, 即村民可以同时收听多个msg 但必须为每个msg都设置过滤器. 否则默认情况下, zmq认为client不关心msg里的所有内容.
  3. 当一个cline收听多个时, 接收消息采用公平队列策略
  4. 如果存在至少一个clint在收听, 那么这个消息就不会被随意抛弃: 这句话的意思是, 当消息过多, 而client的消化能力比较低的话, 未发送的消息会缓存在msg里.
  5. 在ZMQ大版本号在3以上的版本里, 当msg与client的速度不匹配时. 若使用的传输层协议是tcpipc这种面向连接的协议, 则堆积的消息缓存在里, 当使用epgm这种协议时, 堆积的消息缓存了client里. 在ZMQ 大版本号为2的版本中, 所有情况下, 消息都将堆积在clinet里

xsub / xpub

在 ZeroMQ 中,xsubxpub 是高级消息传递模式,它们分别是 sub(订阅者)和 pub(发布者)模式的扩展。

  • xsub (Extended Subscriber): xsub 是一个扩展的订阅者套接字,它支持传统的订阅模式,同时还支持发送和接收来自其他订阅者的订阅信息。这意味着 xsub 套接字不仅可以接收发布者发布的消息,还可以接收其他订阅者订阅的主题。
  • xpub (Extended Publisher): xpub 是一个扩展的发布者套接字,它允许发送者不仅发送消息,还可以发送订阅信息。这允许 xpub 套接字在接收到订阅信息时将其广播给所有连接的订阅者。

xsub/xpub 与 sub/pub 的区别

  • 消息流控制:
    • subpub 套接字只支持单向的消息流。sub 套接字接收来自 pub 的消息,而 pub 套接字只发送消息,不关心订阅者的存在。
    • xsubxpub 套接字支持双向消息流。xsub 可以接收来自 xpub 的消息和订阅信息,而 xpub 可以发送消息和广播订阅信息。
  • 订阅信息的传递:
    • sub/pub 模式下,订阅者通过发送订阅信息来订阅特定主题,但订阅信息不会传递给其他订阅者。
    • xsub/xpub 模式下,订阅信息会被广播给所有连接到 xpub 的订阅者,这样每个订阅者都能知道其他订阅者订阅了哪些主题。
  • 消息过滤:
    • sub 套接字通过发送订阅信息来指定它想要接收哪些消息。
    • xsub 套接字不仅可以接收消息,还可以接收其他订阅者的订阅信息,这使得它可以在内部构建订阅列表,并可能对消息进行更复杂的过滤。
  • 使用场景:
    • subpub 套接字适用于简单的发布-订阅模式,其中不需要关心订阅者的具体信息。
    • xsubxpub 套接字适用于更复杂的场景,比如需要了解订阅者状态或者需要构建消息代理的场景。

示例场景

一个常见的使用 xsubxpub 的场景是构建一个 ZeroMQ 消息代理。在这种情况下,xpub 套接字可以连接到多个 subxsub 套接字,并广播消息和订阅信息。同样,xsub 套接字可以连接到多个 pubxpub 套接字,接收消息和订阅信息,并将它们转发给下游的订阅者。

在有 XSUB 服务器时,客户端 pub 仅需 connect 即可进行通信。

Parallel Pipeline(管道模型)

由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。区别于Publish-Subscribe存在一个数据缓存和处理负载。

当连接被断开,数据不会丢失,重连后数据继续发送到对端。

分治套路里有三个角色:

  1. Ventilator. 包工头, 向手下各个工程队分派任务. (一个)
  2. Worker. 工程队, 从包工头里接收任务, 干活. (多个)
  3. Sink. 甲方监理, 工程队干完活后, 向甲方监理报告. 所以工程队的活干完之后, 监理统一收集所有工程队的成果. (一个)

Python 实现

server.py

1
2
3
4
5
6
7
8
9
10
11
12
import zmq
import time

context = zmq.Context()
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")

while True:
msg = input("请输入要发布的信息:").strip()
socket.send(msg.encode('utf-8'))
print("已发送")
time.sleep(1)

worker.py

1
2
3
4
5
6
7
8
9
10
11
import zmq
context = zmq.Context()
receive = context.socket(zmq.PULL)
receive.connect('tcp://127.0.0.1:5557')
sender = context.socket(zmq.PUSH)
sender.connect('tcp://127.0.0.1:5558')

while True:
data = receive.recv()
print("正在转发...")
sender.send(data)

client.py

1
2
3
4
5
6
7
8
import zmq
context = zmq.Context()
socket = context.socket(zmq.PULL)
socket.bind("tcp://*:5558")

while True:
response = socket.recv().decode('utf-8')
print("response: %s" % response)

C++ 实现

包工头代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#include <zmq.h>
#include <stdio.h>
#include <time.h>
#include "zmq_helper.h"

int main(void)
{
void * context = zmq_ctx_new();
void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
void * socket_to_worker = zmq_socket(context, ZMQ_PUSH);
zmq_connect(socket_to_sink, "tcp://localhost:5558");
zmq_bind(socket_to_worker, "tcp://*:5557");

printf("Press Enter when all workers get ready:");
getchar();
printf("Sending tasks to workers...\n");

s_send(socket_to_sink, "Get ur ass up"); // 通知监理, 干活了

srandom((unsigned)time(NULL));

int total_ms = 0;
for(int i = 0; i < 100; ++i)
{
int workload = randof(100) + 1; // 工作需要的耗时, 单位ms
total_ms += workload;
char string[10];
snprintf(string, sizeof(string), "%d", workload);
s_send(socket_to_worker, string); // 将工作分派给工程队
}

printf("Total expected cost: %d ms\n", total_ms);

zmq_close(socket_to_sink);
zmq_close(socket_to_worker);
zmq_ctx_destroy(context);

return 0;
}

工程队代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <zmq.h>
#include <stdio.h>
#include "zmq_helper.h"

int main(void)
{
void * context = zmq_ctx_new();
void * socket_to_ventilator = zmq_socket(context, ZMQ_PULL);
void * socket_to_sink = zmq_socket(context, ZMQ_PUSH);
zmq_connect(socket_to_ventilator, "tcp://localhost:5557");
zmq_connect(socket_to_sink, "tcp://localhost:5558");

while(1)
{
char * msg = s_recv(socket_to_ventilator);
printf("Received msg: %s\n", msg);
fflush(stdout);
s_sleep(atoi(msg)); // 干活, 即睡眠指定毫秒
free(msg);
s_send(socket_to_sink, "DONE"); // 活干完了通知监理
}

zmq_close(socket_to_ventilator);
zmq_close(socket_to_sink);
zmq_ctx_destroy(context);

return 0;
}

监理代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <zmq.h>
#include <stdio.h>
#include "zmq_helper.h"

int main(void)
{
void * context = zmq_ctx_new();
void * socket_to_worker_and_ventilator = zmq_socket(context, ZMQ_PULL);
zmq_bind(socket_to_worker_and_ventilator, "tcp://*:5558");

char * msg = s_recv(socket_to_worker_and_ventilator);
printf("Received msg: %s", msg); // 接收来自包工头的开始干活的消息
free(msg);

int64_t start_time = s_clock();

for(int i = 0; i < 100; ++i)
{
// 接收100个worker干完活的消息
char * msg = s_recv(socket_to_worker_and_ventilator);
free(msg);

if(i / 10 * 10 == i)
printf(":");
else
printf(".");
fflush(stdout);
}

printf("Total elapsed time: %d ms]\n", (int)(s_clock() - start_time));

zmq_close(socket_to_worker_and_ventilator);
zmq_ctx_destroy(context);

return 0;
}

这个示例程序的逻辑流程是这样的:

  1. 包工头向两个角色发送消息: 向工程队发送共计100个任务, 向监理发送消息, 通知监理开始干活
  2. 工程队接收来自包工头的消息, 并按消息里的数值, 睡眠指定毫秒. 每个任务结束后都通知监理.
  3. 监理先是接收来自包工头的消息, 开始计时. 然后统计来自工程队的消息, 当收集到100个任务完成的消息后, 计算实际耗时.

包工头里输出的预计耗时是100个任务的共计耗时, 在监理那里统计的实际耗时则是由多个工程队并行处理100个任务实际的耗时.

这里个例子中需要注意的点有:

  1. 这个例子中使用了ZMQ_PULLZMQ_PUSH两种socket. 分别供消息分发方与消息接收方使用. 看起来略微有点类似于发布-订阅套路, 具体之间的区别后续章节会讲到.
  2. 工程队上接包工头, 下接监理. 在任务执行过程中, 你可以随意的增加工程队的数量.
  3. 我们通过让包工头通知监理, 以及手动输入enter来启动任务分发的方式, 手动同步了工程队/包工头/监理. PUSH/PULL模式虽然和PUB/SUB不一样, 不会丢失消息. 但如果不手动同步的话, 最先建立连接的工程队将几乎把所有任务都接收到手, 导致后续完成连接的工程队拿不到任务, 任务分配不平衡.
  4. 包工头分派任务使用的是轮流/平均分配的方式.这是一种简单的负载均衡
  5. 监理接收多个工程队的消息, 使用的是公平队列策略.

参考资料

文章链接:
https://www.zywvvd.com/notes/tools/zmq/zmq/