惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

爱范儿
爱范儿
Security Latest
Security Latest
NISL@THU
NISL@THU
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
C
Cybersecurity and Infrastructure Security Agency CISA
Cloudbric
Cloudbric
T
Threat Research - Cisco Blogs
大猫的无限游戏
大猫的无限游戏
C
CXSECURITY Database RSS Feed - CXSecurity.com
阮一峰的网络日志
阮一峰的网络日志
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
雷峰网
雷峰网
C
Cisco Blogs
V
Vulnerabilities – Threatpost
S
Security Archives - TechRepublic
V
Visual Studio Blog
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
J
Java Code Geeks
D
Darknet – Hacking Tools, Hacker News & Cyber Security
Know Your Adversary
Know Your Adversary
博客园 - 叶小钗
腾讯CDC
钛媒体:引领未来商业与生活新知
钛媒体:引领未来商业与生活新知
P
Privacy International News Feed
P
Palo Alto Networks Blog
博客园_首页
V
V2EX
WordPress大学
WordPress大学
Schneier on Security
Schneier on Security
月光博客
月光博客
博客园 - 司徒正美
Google DeepMind News
Google DeepMind News
TaoSecurity Blog
TaoSecurity Blog
博客园 - 聂微东
酷 壳 – CoolShell
酷 壳 – CoolShell
人人都是产品经理
人人都是产品经理
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
博客园 - 【当耐特】
The Cloudflare Blog
罗磊的独立博客
美团技术团队
N
News | PayPal Newsroom
K
KPMG report finds enterprise disconnect between AI and its ROI | CIO
Last Week in AI
Last Week in AI
K
Kaspersky official blog
Google Online Security Blog
Google Online Security Blog
S
SegmentFault 最新的问题
Application and Cybersecurity Blog
Application and Cybersecurity Blog
T
Tailwind CSS Blog

又见苍岚

COLMAP PatchMatch Stereo 算法详解 事件驱动的状态机框架:从理论到工程实践 Git 在国内网络环境下无法 Push 的排查与修复 —— 配置 Clash 代理 分段五次多项式插值原理详解 路径插值方法深度对比研究 Claude Code 使用指南 OpenClaw 记忆管理与技能创建指南 CBS(Conflict-Based Search)算法详解 A* 算法及其变种详解 OpenClaw 配置多 Agents Windows Powershell 无法加载文件,因为在此系统上禁止运行脚本问题的解决方案 MaxClaw 安装流程 大模型 AI 名词介绍 AList 网盘聚合工具简介 Protobuf 简介与测试 Claude Code 简介以及 GLM 4.7 模型接入 Github 歌词下载工具 163MusicLyrics Python __getattr__ 懒加载 Python TypedDict 机器人仿真平台 Gazebo 安装记录 机器人仿真平台 Gazebo 简介 多机器人路径规划问题(Multi-Agent Path Finding, MAPF)简介 Python exifread 读取修改过的 jpeg 信息错误问题修复 3D 坐标系变换的理解 3D 旋转矩阵基本概念 MongoDB Compass 介绍 Python 环境管理工具 uv Flutter 开发指南 Snipaste 安装下载与黑屏问题解决方案 全局路径规划算法记录 2025 Python 版本性能测试 Flutter Hello World Flutter 安装环境配置 Ubuntu VMware 硬盘扩容后 SMBus Host controller not enabled 报错问题解决 Python NetworkX 教程 Docker GPU 报错 - Failed to initialize NVML Unknown Error 解决方案 Python matplotlib 图表绘制 cuda-toolkit 安装替代 Cuda 与 Cudnn Jinja2 Python 利用 docxtpl 和 Jinja2 生成基于模板的 Word 文档 Docker 实现 CPU 核心隔离 LoFTR 基于 Transformer 的特征提取匹配算法 OmniGlue 特征匹配 SuperGlue 使用图神经网络学习特征匹配 Ubuntu 下将 xlsx 文件按照 sheet 转换为 图片 Python 使用 SQLAlchemy Python FastAPI 教程 openwrt 软路由配置安装 Nav2 地图文件(PGM/YAML)规范标准 3D OBJ 模型转换为 glb 瓦片格式 Python 源码 Redis 数据库介绍 Ubuntu 22.04 内核自动升级导致 MongoDB 7.0.12 错误记录 ubuntu 20.04 安装 ROS Noetic ubuntu 18.04 安装 ROS Melodic VMware Workstation Pro 个人免费版下载、安装、使用指南 Hybrid A-star 路径规划 Reeds-Shepp 曲线 Dubins 曲线 Linux kvm 虚拟机网络不通的问题解决方法 Ubuntu 自动内存清理 BiliBili 缓存视频转 mp4 Python 求解线性规划 3D Gaussian Splatting 官方源码实践记录 ImageMagick 教程 Ubuntu 22.04 安装 Colmap 对数几率 odds Ubuntu nmcli 网络管理工具使用指南 SuperPoint 自监督深度学习特征点提取 SyncTV Music Tag Web 在线音乐信息整理工具 ncm 格式转 mp3 MusicBrainz 音乐元数据百科数据库 Ubuntu 网络流量监控工具 私人云音乐平台 Navidrome 入门 手眼标定 四元数(Quaternions) OHTTPS 实现免费自动 https 证书申请、更新、部署 ubuntu 22.04 安装 CloudCompare 单机 KVM 虚拟机冷迁移 Ubuntu 22.04 使用 mdadm 实现软 raid 小鱼 一键安装 ROS-humble Fluid -46- 基于 Simpletex API 构建公式识别页面 公式识别 API 简介 -- Simpletex 使用 Python web 部署库 waitress 3D Gaussian Splatting for Real-Time Radiance Field Rendering Ubuntu Swap 简介与空间扩展 Ubuntu 24.04 安装 forticlient Clash Verge 使用 MongoDB 7.0.17 集群 Docker 构建源码 Error code - 2013. Lost connection to MySQL server during query 问题解决 Python 日志记录库 loguru 使用指北 Python 实现 Web 日志查看服务 MySQL LOAD DATA LOCAL INFILE 极速数据加载 Image size exceeds limit of 89478485 pixels 解决方案 Docker 使用 NVIDIA GPU 驱动错误解决 阿里云 docker 镜像仓库 Ubuntu中没有wired connected的解决方案 MinIO 简介 subconverter 代理订阅格式转换 修复 node –openssl-legacy-provider is not allowed in NODE_OPTIONS 错误
Halcon 消息队列
Yiwei Zhang · 2023-03-09 · via 又见苍岚

之前我们介绍了 消息队列,本文介绍 Halcon 消息队列的用法。

消息队列

我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。 具体内容详见 消息队列

Halcon 算子

创建消息队列

  • 核心函数 create_message_queue

    1
    create_message_queue( : : : QueueHandle)

该函数创建一个新的空的消息队列,输出参数 QueueHandle 是指向新建消息队列的句柄。

消息队列被设计成 FIFO 管道,在不同的线程之间安全地传递任意集合的数据。

队列访问在内部是完全同步的,不需要从应用程序进行显式锁定。数据在所谓的消息中通过队列传输。

多个生产者线程可以同时添加数据(enqueue_message) ,而多个消费者线程可以同时提取数据(dequeue_message)。

所有排队的消息都由 enqueue_message 操作复制。因此,原始消息可以在 enqueue_message 调用之后立即重用,而不会影响到排队副本。

创建消息

  • 核心函数 create_message:

    1
    create_message( : : : MessageHandle)

create_message 创建一个新的空消息。输出参数 MessageHandle 是新创建的消息的句柄,用于在使用该消息的任何后续运算符调用中标识该消息。消息充当类似字典的容器,可以使用异步消息队列在应用程序的线程之间传递。

消息可以存储任意数量的条目,每个条目都有其唯一的键(字符串或整数)和相关的值。每个键可以引用控件参数元组,也可以引用图标对象。这些数据分别使用 set_message_tupleset_message_obj 存储到消息中,在这里可以使用 get_message_tupleget_message_obj 再次检索这些数据。

存储在消息中的控件参数元组始终是原始数据的深度副本。因此,可以在 set_message_tuple 调用之后立即重用原始数据,而不会影响消息。值得注意的例外是句柄: 在消息中存储任何句柄都会复制句柄值,但不会复制句柄后面的资源。

设置消息内容

  • 核心函数 set_message_obj , set_message_tuple:

    1
    2
    set_message_obj(ObjectData : : MessageHandle, Key : )
    set_message_tuple( : : MessageHandle, Key, TupleData : )

两个函数分别可以将 objtuple 数据添加进消息句柄中,使用时类似于字典的取值用法;添加进消息的数据是原始数据的深拷贝。

添加消息进队列

  • 核心函数 enqueue_message

    1
    enqueue_message( : : QueueHandle, MessageHandle, GenParamName, GenParamValue : )

enqueue_message 将一个或多个消息排队到由 QueueHandle 参数表示的消息队列。任何线程都可以使用 dequeue_message 从队列中检索排队的消息。

多个生产者(排队)线程和多个使用者(排队)线程可以同时共享相同的队列。消息按先进先出(FIFO)顺序传递。即使多个使用者线程正在使用队列,每条消息也只传递一次。

队列访问在内部是完全同步的,不需要外部锁定。如果队列为空,并且在 dequeue_message 中至少有一个使用者线程在等待消息数据,那么其中一个线程将被成功的 enqueue_message 调用唤醒,并立即传递加入队列的消息数据。否则,消息数据将异步附加到队列中,以便在使用者线程准备好再次取消消息数据队列时立即传递。

所有排队的消息(MessageHandle)都由 enqueue_message 操作复制。因此,原始消息可以在 enqueue_message 调用之后立即重用,而不会影响到排队副本。

操作符参数 GenParamNameGenParamValue 保留在操作符接口中以供将来使用,目前不支持通用参数。

如果在操作之后,队列中包含的消息数量大于最大队列长度 set_message_queue_param 的参数值max_message_num 所指定的最大数量,会抛出 H_ERR_MQOVL 错误。

从队列中取走消息

  • 核心函数 dequeue_message

    1
    dequeue_message( : : QueueHandle, GenParamName, GenParamValue : MessageHandle)

Dequeue_messageQueueHandle 参数表示的消息队列中排出消息。消息必须由使用 enqueue_message 的任何线程排队。

消息按先进先出(FIFO)顺序传递,每条消息只传递一次。如果队列不是空的,dequeue_message 将立即从队列传递最早的消息。此消息将从队列中删除,并在 MessageHandle 输出参数中返回该消息的句柄。消息数据所有权从消息队列传输(不复制)到新创建的消息句柄。

如果队列为空,dequeue_message 将阻塞,直到消息可以传递(另一个线程使用 enqueue_message 添加消息)。

可以使用 get_message_tupleget_message_objget_message_param 查询存储在消息中的数据。

通过 dequeue_message 获得的消息句柄可以进一步重用(修改和/或排队到另一个消息队列)。

如果使用一个 enqueue_message 调用对多个消息进行排队,那么所有这些消息也将通过一个 dequeue_message 调用一起检索,并通过 MessageHandle 元组传递多个消息句柄。

队列访问在内部是完全同步的,不需要外部锁定。

在应用程序重新配置或清理期间,可能需要唤醒在 dequeue_message 中等待消息的线程。这可以通过使用带有参数 abort_dequeue 的运算符 set_message_queue_param 来实现。在这种情况下,当前阻塞的 dequeue_message 调用立即使用 H_ERR_MQCNCL 返回。

最后,可以通过 GenParamNameGenParamValue 中的通用参数来调整 dequeue_message 行为。目前,只支持一个通用参数 —— timeout

如果队列为空,则超时控制操作员在等待消息时将阻塞多长时间。过期时,操作符返回 H_ERR_TIMEOUT。超时可以指定为整数或双精度值(以秒为单位) ,也可以指定为字符串 infinite 表示永远等消息。

1
dequeue_message (MessageQueue, 'timeout', 'infinite', MessageData)

如果未指定超时,则默认使用无限超时,这意味着操作符将阻塞,直到新消息进入队列或操作中止。

获取消息内容

  • 核心函数 get_message_paramget_message_objget_message_tuple

    1
    2
    3
    get_message_param( : : MessageHandle, GenParamName, Key : GenParamValue)
    get_message_obj( : ObjectData : MessageHandle, Key : )
    get_message_tuple( : : MessageHandle, Key : TupleData)

get_message_objget_message_tuple 算子从指定 MessageHandle 的消息中提取 key 值对应的内容,得到 TupleData 数据。提取的数据必须是通过 set_message 系列算子设置的。

get_message_param 算子查询消息参数的当前值或有关消息状态的其他信息。

可以查询的值包括:

key 含义
message_keys 查询存储在消息中的所有键,不管它们是与元组数据还是对象数据相关联。键列表通过 GenParamValue 以字符串元组的形式报告。对于此查询,参数 Key 必须是空元组。
key_exists 如果给定的密钥存储在消息中,则报告1,否则报告0。结果通过 GenParamValue 报告,每个键一个值。
key_data_type 报告与消息中的 tuple 数据关联的键的 tuple(可以使用 get_message_tuple 检索数据)。为与对象数据关联的键报告“object(可以使用 get_message_obj 检索数据)。结果通过 GenParamValue 报告,每个键一个值。这个参数有助于动态决定是使用 get_message_tuple 还是 get_message_obj 来获取特定键的数据。

获取队列参数

  • 核心函数 get_message_queue_param :

    1
    get_message_queue_param( : : QueueHandle, GenParamName : GenParamValue)

get_message_queue_param 查询消息队列参数的当前值或有关队列状态的其他信息。可以通过一个 get_message_queue_param 调用执行多个查询,将多个参数名传递给参数 GenParamName。在 GenParamValue,参数值的返回顺序与调用者请求的参数名相同。

当前支持查询的属性:

key 含义
is_empty 如果队列为空,则返回1,否则返回0。
message_num 返回队列中当前存储的消息数。
max_message_num 返回消息队列的“ max_message_num”参数的当前值,因为它是使用 set_message_queue_param 设置的。默认值 -1表示没有限制。

Halcon 生产者消费者示例

Halcon 自带例程 examples/hdevelop/System/Multithreading/message_queue_producer_consumer.hdev

我模仿实现了简化版的内容:

  • 主程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
set_system ('parallelize_operators', 'false')  * 关闭 AOP
count_seconds (T1)
thread_num := 12
create_message_queue (image_path_queue) * 数据队列,包含结束处理线程
create_message_queue (finish_queue) * 结果队列,检查工作是否完成

for ThreadIndex := 0 to thread_num - 1 by 1
par_start<VProcThreads.at(ThreadIndex)> : processing_thread (DarkImage, SubImage, golden_img, image_height, image_width, Mean, nccModel, image_path_queue, finish_queue) * 工消费者线程,创建后阻塞从数据队列中提取数据
endfor

par_start<AcqThread> : acquisition_thread (ImageFiles, data_size, image_path_queue, finish_queue, thread_num) * 生产者线程,向队列中不断添加数据

NumStartedThreads := thread_num + 1
NumFinishedThreads := 0

while (1)
* Read next message with results from the queue.
dequeue_message (finish_queue, 'timeout', 'infinite', MessageResult) * 查看消费者完成工作的情况
*
* Here, the message can be either a real collection of results processing
* a single input image - or a special message informing about completion
* of one of the worker threads.
get_message_param (MessageResult, 'key_exists', 'thread_finished', ThreadEndInfo)
if (ThreadEndInfo[0])
* Worker-thread-finish message, just increment the
* finished-thread counter.
get_message_tuple (MessageResult, 'thread_finished', ThreadEndMessage)
NumFinishedThreads := NumFinishedThreads + 1
endif
if (NumFinishedThreads == NumStartedThreads) * 所有消费者都完成了工作
break
endif
endwhile
count_seconds (T2)
producer_consumer_without_aop_time := (T2-T1) * 1000

  • 生产者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

for Index := 0 to data_size - 1 by 1

image_path := image_path_list[Index]
*
* Create a message and store the acquired image path in it
create_message (MessageImgPath)

set_message_tuple (MessageImgPath, 'path', image_path)
*
* Send the message to the "image" queue, which is used to deliver
* input images from the acquisition thread to the multiple
* processing threads.
enqueue_message (image_path_queue, MessageImgPath, [], [])
*
* Pretend non-trivial acquisition time
wait_seconds (0.01)
endfor

create_message (MessageStop)
set_message_tuple (MessageStop, 'stop_processing', 1)
for Index := 1 to worker_num by 1
enqueue_message (image_path_queue, MessageStop, [], [])
endfor

create_message (MessageFinished)
set_message_tuple (MessageFinished, 'thread_finished', 'Acquisition thread finished')
enqueue_message (finish_queue, MessageFinished, [], [])

return ()

  • 消费者代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

while (1)
dequeue_message (image_path_queue, 'timeout', 'infinite', MessageData)

get_message_param (MessageData, 'key_exists', 'stop_processing', StopProcInfo)
if (StopProcInfo[0])
break
endif

get_message_tuple (MessageData, 'path', image_path)
test_run (DarkImage, SubImage, golden_img, image_path, image_height, image_width, Mean, nccModel, result)
endwhile

create_message (MessageFinished)
set_message_tuple (MessageFinished, 'thread_finished', 'Processing finished')
enqueue_message (finish_queue, MessageFinished, [], [])

return ()

实测下来提升吞吐量 3-4 倍。

内存泄漏

Halcon 在申请图像内存时会额外申请最大图像相同大小的额外内存,这个额外内存一般用于加速处理,但是当多线程启动时,如果在多线程内部读取图像,Halcon 会为每个线程申请一份上述的额外内存,导致内存泄漏

因此正确的多线程使用方式是将数据通过参数传入线程内部,核心只实现运算部分,不要在线程中读取图像数据

参考资料

文章链接:
https://www.zywvvd.com/notes/coding/halcon/halcon-queue/halcon-queue/