惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

Security Latest
Security Latest
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
Stack Overflow Blog
Stack Overflow Blog
WordPress大学
WordPress大学
N
Netflix TechBlog - Medium
GbyAI
GbyAI
云风的 BLOG
云风的 BLOG
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
宝玉的分享
宝玉的分享
博客园 - 【当耐特】
C
Cyber Attacks, Cyber Crime and Cyber Security
雷峰网
雷峰网
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
T
Threat Research - Cisco Blogs
NISL@THU
NISL@THU
Spread Privacy
Spread Privacy
P
Proofpoint News Feed
J
Java Code Geeks
freeCodeCamp Programming Tutorials: Python, JavaScript, Git & More
MyScale Blog
MyScale Blog
T
Tor Project blog
P
Proofpoint News Feed
C
CERT Recently Published Vulnerability Notes
P
Privacy & Cybersecurity Law Blog
MongoDB | Blog
MongoDB | Blog
Simon Willison's Weblog
Simon Willison's Weblog
C
Cybersecurity and Infrastructure Security Agency CISA
L
LINUX DO - 热门话题
小众软件
小众软件
G
GRAHAM CLULEY
P
Privacy International News Feed
AWS News Blog
AWS News Blog
Know Your Adversary
Know Your Adversary
P
Palo Alto Networks Blog
人人都是产品经理
人人都是产品经理
S
Schneier on Security
Scott Helme
Scott Helme
OSCHINA 社区最新新闻
OSCHINA 社区最新新闻
B
Blog RSS Feed
T
The Exploit Database - CXSecurity.com
Recent Announcements
Recent Announcements
E
Exploit-DB.com RSS Feed
C
CXSECURITY Database RSS Feed - CXSecurity.com
U
Unit 42
The Register - Security
The Register - Security
S
Securelist
Martin Fowler
Martin Fowler
Project Zero
Project Zero
大猫的无限游戏
大猫的无限游戏
Cisco Talos Blog
Cisco Talos Blog

博客园 - 西北逍遥

实验启动指令 osg3.6绘制半球体 kinova jaco2 机械臂控制器故障灯闪烁(双绿灯)问题解决方法 IFC标准在学术界的研究与发展历程:从理论探索到产业实践的全面梳理IFC标准在学术界的研究与发展历程:从理论探索到产业实践的全面梳理 BIM的“普通话”:解密IFC标准如何重塑建筑行业 IfcCrewResource Qt重置 Brush pyqt 操作mysql数据库 泵仿真 Qt折线的显示与隐藏 Qt绘制折线 c++ Qt绘制传热云图 - 西北逍遥 livox mid-70采集点云数据 随机配色 学习:LED灯闪烁 win10安装neo4j-community-3.5.7-windows win10安装MongoDB 3.0.15 Community python把图片合并成gif图 ubuntu20.04测试cuda start.bat Djstra求解最短路径
yolov8-pose监测人体关节并保存关节坐标
西北逍遥 · 2026-04-18 · via 博客园 - 西北逍遥
import cv2
import torch
import numpy as np
from pathlib import Path
import argparse
import os
import csv  # 新增
from ultralytics import YOLO

class YOLOv8PoseDetector:
    def __init__(self, model_path='yolov8n-pose.pt'):
        """
        初始化YOLOv8姿态检测器
        Args:
            model_path: 模型文件路径
        """
        # 检查GPU可用性
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
        print(f"使用设备: {self.device}")
        
        # 加载模型
        self.model = YOLO(model_path)
        self.model.to(self.device)
        
        # 姿态关键点连接关系
        self.skeleton = [
            (5, 7), (7, 9), (6, 8), (8, 10), (5, 6), 
            (5, 11), (6, 12), (11, 12), 
            (11, 13), (13, 15), (12, 14), (14, 16)
        ]
        
        # 关键点颜色
        self.keypoint_colors = [
            (255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0),
            (255, 0, 255), (0, 255, 255), (255, 128, 0), (0, 128, 255),
            (128, 255, 0), (255, 0, 128), (0, 255, 128), (128, 0, 255),
            (255, 128, 128), (128, 255, 128), (128, 128, 255), (255, 255, 255)
        ]

        # 给每一根骨架连线定义不同颜色(BGR格式)
        self.connection_colors = [
            (255, 0, 0),    #
            (0, 255, 0),    # 绿
            (0, 0, 255),    #
            (255, 255, 0),  #
            (255, 0, 255),  #
            (0, 255, 255),  #
            (128, 0, 128),  # 深紫
            (0, 128, 255),  #
            (128, 255, 0),  # 浅绿
            (255, 128, 0),  # 蓝橙
            (0, 128, 128),  # 橄榄
            (128, 128, 0),  # 墨绿
        ]
    
    def resize_frame(self, frame, max_width=1280, max_height=720):
        """
        调整帧大小,保持宽高比
        Args:
            frame: 输入帧
            max_width: 最大宽度
            max_height: 最大高度
        Returns:
            numpy.ndarray: 调整大小后的帧
        """
        height, width = frame.shape[:2]
        
        # 如果帧尺寸已经小于最大尺寸,则不需要调整
        if width <= max_width and height <= max_height:
            return frame
        
        # 计算缩放比例
        scale = min(max_width / width, max_height / height)
        
        new_width = int(width * scale)
        new_height = int(height * scale)
        
        return cv2.resize(frame, (new_width, new_height))
    
    def draw_pose(self, frame, keypoints, confidence_threshold=0.5):
        """
        在帧上绘制人体姿态
        Args:
            frame: 输入帧
            keypoints: 关键点坐标
            confidence_threshold: 置信度阈值
        Returns:
            numpy.ndarray: 绘制了姿态的帧
        """
        # 绘制骨架连接
        for idx,connection in enumerate(self.skeleton):
            start_idx, end_idx = connection
            index_color = self.connection_colors[idx % len(self.connection_colors)]
            # 检查关键点是否存在且置信度足够
            if (len(keypoints) > start_idx and len(keypoints) > end_idx and
                keypoints[start_idx][2] > confidence_threshold and 
                keypoints[end_idx][2] > confidence_threshold):
                
                start_point = (int(keypoints[start_idx][0]), int(keypoints[start_idx][1]))
                end_point = (int(keypoints[end_idx][0]), int(keypoints[end_idx][1]))
                
                # 绘制连接线
                cv2.line(frame, start_point, end_point, index_color, 2)
        
        # 绘制关键点
        for i, keypoint in enumerate(keypoints):
            if keypoint[2] > confidence_threshold:
                x, y = int(keypoint[0]), int(keypoint[1])
                cv2.circle(frame, (x, y), 2+1, self.keypoint_colors[i % len(self.keypoint_colors)], thickness=-1)
                cv2.circle(frame, (x, y), 2+1, (0,255,0), thickness=-1)
        
        return frame

    # 新增:保存关节坐标到 CSV
    def save_keypoints_to_csv(self, keypoints_batch, csv_path, confidence_threshold=0.5):
        """
        保存所有人的关节坐标到CSV
        格式:人id, 关节编号, x, y
        """
        with open(csv_path, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            # 写入表头
            writer.writerow(['person_id', 'keypoint_id', 'x', 'y'])

            # 遍历每个人
            for person_id, keypoints in enumerate(keypoints_batch):
                # 遍历每个关节
                for kp_id, kp in enumerate(keypoints):
                    x, y, conf = kp
                    if conf > confidence_threshold:
                        writer.writerow([person_id, kp_id, round(float(x), 2), round(float(y), 2)])
    
    def process_image(self, image_path, output_path=None, confidence=0.5):
        """
        处理单张图片
        Args:
            image_path: 输入图片路径
            output_path: 输出图片路径
            confidence: 检测置信度
        Returns:
            numpy.ndarray: 处理后的图像
        """
        # 读取图片
        image = cv2.imread(image_path)
        if image is None:
            raise ValueError(f"无法读取图片: {image_path}")
        
        # 使用YOLOv8进行姿态检测
        results = self.model(image, conf=confidence, device=self.device)
        
        # 存储所有人物关键点
        all_keypoints = []

        # 绘制结果
        for result in results:
            if result.keypoints is not None:
                keypoints_batch = result.keypoints.data.cpu().numpy()
                all_keypoints = keypoints_batch
                for keypoints in keypoints_batch:
                    image = self.draw_pose(image, keypoints, confidence)
        
        # 保存图片
        if output_path:
            cv2.imwrite(output_path, image)

            # ===================== 保存CSV =====================
            csv_path = os.path.splitext(output_path)[0] + ".csv"
            self.save_keypoints_to_csv(all_keypoints, csv_path, confidence)

        return image

    # 批量处理文件夹
    def process_folder(self, input_dir, output_dir, confidence=0.5):
        # 自动创建输出目录
        os.makedirs(output_dir, exist_ok=True)
        print(f"输出目录已创建: {output_dir}")

        # 支持的图片格式
        exts = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']
        input_path = Path(input_dir)
        image_files = [f for f in input_path.glob('*') if f.suffix.lower() in exts]

        if not image_files:
            print("目录中没有图片!")
            return

        total = len(image_files)
        print(f"找到 {total} 张图片,开始处理...\n")

        for i, img in enumerate(image_files, 1):
            print(f"[{i}/{total}] 处理: {img.name}")
            out_path = os.path.join(output_dir, img.name)
            try:
                self.process_image(str(img), out_path, confidence)
                print(f"✅ 已保存:{img.name} | {Path(out_path).stem}.csv")
            except Exception as e:
                print(f"处理失败: {e}")

        print("\n✅ 全部图片处理完成!")

def main():
    # 固定你要的路径
    INPUT_DIR  = r"G:\Dataset\YXR-data\PPE only-multi worker(尝试即可)"
    OUTPUT_DIR = r"G:\Dataset\YXR-data\output\PPE only-multi worker(尝试即可)"

    parser = argparse.ArgumentParser(description='YOLOv8姿态检测 - 批量图片')
    parser.add_argument('--model', type=str, default='yolov8m-pose.pt', help='模型')
    parser.add_argument('--conf', type=float, default=0.25, help='置信度')
    
    args = parser.parse_args()
    
    detector = YOLOv8PoseDetector(args.model)
    
    print("="*50)
    print("批量姿态检测 + 关节坐标CSV保存")
    print(f"输入: {INPUT_DIR}")
    print(f"输出: {OUTPUT_DIR}")
    print("="*50)
    
    detector.process_folder(INPUT_DIR, OUTPUT_DIR, args.conf)

if __name__ == "__main__":
    main()