


























import cv2 import torch import numpy as np from pathlib import Path import argparse import os import csv # 新增 from ultralytics import YOLO class YOLOv8PoseDetector: def __init__(self, model_path='yolov8n-pose.pt'): """ 初始化YOLOv8姿态检测器 Args: model_path: 模型文件路径 """ # 检查GPU可用性 self.device = 'cuda' if torch.cuda.is_available() else 'cpu' print(f"使用设备: {self.device}") # 加载模型 self.model = YOLO(model_path) self.model.to(self.device) # 姿态关键点连接关系 self.skeleton = [ (5, 7), (7, 9), (6, 8), (8, 10), (5, 6), (5, 11), (6, 12), (11, 12), (11, 13), (13, 15), (12, 14), (14, 16) ] # 关键点颜色 self.keypoint_colors = [ (255, 0, 0), (0, 255, 0), (0, 0, 255), (255, 255, 0), (255, 0, 255), (0, 255, 255), (255, 128, 0), (0, 128, 255), (128, 255, 0), (255, 0, 128), (0, 255, 128), (128, 0, 255), (255, 128, 128), (128, 255, 128), (128, 128, 255), (255, 255, 255) ] # 给每一根骨架连线定义不同颜色(BGR格式) self.connection_colors = [ (255, 0, 0), # 蓝 (0, 255, 0), # 绿 (0, 0, 255), # 红 (255, 255, 0), # 青 (255, 0, 255), # 紫 (0, 255, 255), # 黄 (128, 0, 128), # 深紫 (0, 128, 255), # 橙 (128, 255, 0), # 浅绿 (255, 128, 0), # 蓝橙 (0, 128, 128), # 橄榄 (128, 128, 0), # 墨绿 ] def resize_frame(self, frame, max_width=1280, max_height=720): """ 调整帧大小,保持宽高比 Args: frame: 输入帧 max_width: 最大宽度 max_height: 最大高度 Returns: numpy.ndarray: 调整大小后的帧 """ height, width = frame.shape[:2] # 如果帧尺寸已经小于最大尺寸,则不需要调整 if width <= max_width and height <= max_height: return frame # 计算缩放比例 scale = min(max_width / width, max_height / height) new_width = int(width * scale) new_height = int(height * scale) return cv2.resize(frame, (new_width, new_height)) def draw_pose(self, frame, keypoints, confidence_threshold=0.5): """ 在帧上绘制人体姿态 Args: frame: 输入帧 keypoints: 关键点坐标 confidence_threshold: 置信度阈值 Returns: numpy.ndarray: 绘制了姿态的帧 """ # 绘制骨架连接 for idx,connection in enumerate(self.skeleton): start_idx, end_idx = connection index_color = self.connection_colors[idx % len(self.connection_colors)] # 检查关键点是否存在且置信度足够 if (len(keypoints) > start_idx and len(keypoints) > end_idx and keypoints[start_idx][2] > confidence_threshold and keypoints[end_idx][2] > confidence_threshold): start_point = (int(keypoints[start_idx][0]), int(keypoints[start_idx][1])) end_point = (int(keypoints[end_idx][0]), int(keypoints[end_idx][1])) # 绘制连接线 cv2.line(frame, start_point, end_point, index_color, 2) # 绘制关键点 for i, keypoint in enumerate(keypoints): if keypoint[2] > confidence_threshold: x, y = int(keypoint[0]), int(keypoint[1]) cv2.circle(frame, (x, y), 2+1, self.keypoint_colors[i % len(self.keypoint_colors)], thickness=-1) cv2.circle(frame, (x, y), 2+1, (0,255,0), thickness=-1) return frame # 新增:保存关节坐标到 CSV def save_keypoints_to_csv(self, keypoints_batch, csv_path, confidence_threshold=0.5): """ 保存所有人的关节坐标到CSV 格式:人id, 关节编号, x, y """ with open(csv_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) # 写入表头 writer.writerow(['person_id', 'keypoint_id', 'x', 'y']) # 遍历每个人 for person_id, keypoints in enumerate(keypoints_batch): # 遍历每个关节 for kp_id, kp in enumerate(keypoints): x, y, conf = kp if conf > confidence_threshold: writer.writerow([person_id, kp_id, round(float(x), 2), round(float(y), 2)]) def process_image(self, image_path, output_path=None, confidence=0.5): """ 处理单张图片 Args: image_path: 输入图片路径 output_path: 输出图片路径 confidence: 检测置信度 Returns: numpy.ndarray: 处理后的图像 """ # 读取图片 image = cv2.imread(image_path) if image is None: raise ValueError(f"无法读取图片: {image_path}") # 使用YOLOv8进行姿态检测 results = self.model(image, conf=confidence, device=self.device) # 存储所有人物关键点 all_keypoints = [] # 绘制结果 for result in results: if result.keypoints is not None: keypoints_batch = result.keypoints.data.cpu().numpy() all_keypoints = keypoints_batch for keypoints in keypoints_batch: image = self.draw_pose(image, keypoints, confidence) # 保存图片 if output_path: cv2.imwrite(output_path, image) # ===================== 保存CSV ===================== csv_path = os.path.splitext(output_path)[0] + ".csv" self.save_keypoints_to_csv(all_keypoints, csv_path, confidence) return image # 批量处理文件夹 def process_folder(self, input_dir, output_dir, confidence=0.5): # 自动创建输出目录 os.makedirs(output_dir, exist_ok=True) print(f"输出目录已创建: {output_dir}") # 支持的图片格式 exts = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff'] input_path = Path(input_dir) image_files = [f for f in input_path.glob('*') if f.suffix.lower() in exts] if not image_files: print("目录中没有图片!") return total = len(image_files) print(f"找到 {total} 张图片,开始处理...\n") for i, img in enumerate(image_files, 1): print(f"[{i}/{total}] 处理: {img.name}") out_path = os.path.join(output_dir, img.name) try: self.process_image(str(img), out_path, confidence) print(f"✅ 已保存:{img.name} | {Path(out_path).stem}.csv") except Exception as e: print(f"处理失败: {e}") print("\n✅ 全部图片处理完成!") def main(): # 固定你要的路径 INPUT_DIR = r"G:\Dataset\YXR-data\PPE only-multi worker(尝试即可)" OUTPUT_DIR = r"G:\Dataset\YXR-data\output\PPE only-multi worker(尝试即可)" parser = argparse.ArgumentParser(description='YOLOv8姿态检测 - 批量图片') parser.add_argument('--model', type=str, default='yolov8m-pose.pt', help='模型') parser.add_argument('--conf', type=float, default=0.25, help='置信度') args = parser.parse_args() detector = YOLOv8PoseDetector(args.model) print("="*50) print("批量姿态检测 + 关节坐标CSV保存") print(f"输入: {INPUT_DIR}") print(f"输出: {OUTPUT_DIR}") print("="*50) detector.process_folder(INPUT_DIR, OUTPUT_DIR, args.conf) if __name__ == "__main__": main()
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。