From dc381b81e5444f2dfaad15813bbd17a878c5b137 Mon Sep 17 00:00:00 2001 From: 51hhh Date: Mon, 11 Aug 2025 13:46:05 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=94=B9=EF=BC=8C=E7=B2=BE=E7=AE=80?= =?UTF-8?q?=E4=B8=8B=E4=BD=8D=E6=9C=BA=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 93 ++------- pi_video_client.py | 43 +++++ src/hand_detection_3d.py | 316 ------------------------------ src/process_video_3d.py | 168 ---------------- src/web_preview.py | 213 --------------------- src/web_server.py | 401 --------------------------------------- 6 files changed, 62 insertions(+), 1172 deletions(-) create mode 100644 pi_video_client.py delete mode 100644 src/hand_detection_3d.py delete mode 100644 src/process_video_3d.py delete mode 100644 src/web_preview.py delete mode 100644 src/web_server.py diff --git a/README.md b/README.md index 5c96ca5..1bbf6f3 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,25 @@ ### 目录介绍 + src/:下位机源码 -+ start_robot_client:启动机械臂客户端脚本 ++ + arm.py 机械臂控制脚本 ++ + arm3d_drag_demo.py 机械臂3D拖拽控制脚本 ++ + robot_client.py 机械臂客户端脚本 ++ + test_motors.py 电机测试脚本 ++ + ZDT_Servo.py ZDT步进电机驱动脚本 ++ start_robot_client.sh:启动机械臂客户端脚本 ++ pi_video_client.py:树莓派视频流处理脚本 + + + +**注意:**启动客户端和视频传输之前需要先启动服务端,请参考[服务端部署文档](https://github.com/ricklentz/checkhand/blob/main/README_server.md) + + +连接配置以参数传入的方式启动客户端,可以在启动脚本中修改参数 + +```sh +SERVER="http://192.168.114.26:5000" +MOCK="" +``` ### 方式一:使用启动脚本(推荐) @@ -21,76 +39,3 @@ ``` - -### 方式二:手动启动 - -#### 1. 创建虚拟环境并安装依赖 - -```bash -python3 -m venv venv -source venv/bin/activate # Linux/Mac -``` - -#### 2. 启动Web服务器 - -```bash -python run_web_service.py -``` - -#### 3. 启动机械臂客户端( - -```bash -cd src -python robot_client.py --mock -``` - -### 3. 访问Web界面 - -打开浏览器访问: `http://localhost:5000` - -## 启动脚本选项 - -### Web服务器启动选项: -```bash -./start_service.sh --help # 查看帮助 -./start_service.sh # 基本启动 -./start_service.sh --host 0.0.0.0 --port 8080 # 自定义地址和端口 -./start_service.sh --debug # 调试模式 -./start_service.sh --test-video data/videos/test.mp4 # 本地视频测试 -``` - -### 机械臂客户端启动选项: -```bash -./start_robot_client.sh --help # 查看帮助 -./start_robot_client.sh # 基本启动(模拟模式) -./start_robot_client.sh --server http://192.168.1.100:5000 # 连接远程服务器 -./start_robot_client.sh --real # 真实机械臂模式 -``` - -## 功能特性 - -- ✅ 使用MediaPipe进行实时手部检测 -- ✅ WebSocket实时通信 -- ✅ 3D坐标计算(X、Y、Z轴角度) -- ✅ Web预览界面 -- ✅ 机械臂控制接口 -- ✅ 本地视频测试支持 - -## 系统架构 - -``` -视频客户端 → Web服务器 → MediaPipe → 3D坐标计算 → 客户端(Web预览/机械臂) -``` - -## 控制信号格式 - -```json -{ - "x_angle": 90.0, // X轴角度 (0-180°) - "y_angle": 90.0, // Y轴角度 (0-180°) - "z_angle": 90.0, // Z轴角度 (0-180°) - "grip": 0, // 抓取状态 (0=松开, 1=抓取) - "action": "none", // 当前动作 - "speed": 5 // 移动速度 (1-10) -} -``` \ No newline at end of file diff --git a/pi_video_client.py b/pi_video_client.py new file mode 100644 index 0000000..97fda14 --- /dev/null +++ b/pi_video_client.py @@ -0,0 +1,43 @@ + +import base64 +import socketio +import time +from picamera2 import Picamera2 +import numpy as np + +SERVER_URL = 'http://192.168.114.26:5000' # 修改为你的服务端地址和端口 + +sio = socketio.Client() + +@sio.event +def connect(): + print('已连接到服务端') + sio.emit('register_client', {'type': 'pi_camera'}) + +@sio.event +def disconnect(): + print('与服务端断开连接') + +def main(): + sio.connect(SERVER_URL) + picam = Picamera2() + config = picam.create_video_configuration(main={'size': (640, 480)}) + picam.configure(config) + picam.start() + time.sleep(1) # 等待摄像头启动 + + try: + while True: + frame = picam.capture_array() + # 转为JPEG并编码为base64 + import cv2 + _, buffer = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80]) + jpg_as_text = base64.b64encode(buffer).decode('utf-8') + sio.emit('video_frame', {'frame': jpg_as_text}) + time.sleep(1/30) # 控制帧率 + finally: + picam.close() + sio.disconnect() + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/src/hand_detection_3d.py b/src/hand_detection_3d.py deleted file mode 100644 index 8b5507f..0000000 --- a/src/hand_detection_3d.py +++ /dev/null @@ -1,316 +0,0 @@ -import cv2 -import numpy as np -import math -import mediapipe as mp - -# 初始化MediaPipe Hands -mp_hands = mp.solutions.hands -mp_drawing = mp.solutions.drawing_utils -mp_drawing_styles = mp.solutions.drawing_styles - -# 加载MediaPipe手部检测模型 -def load_mediapipe_model(max_num_hands=1, min_detection_confidence=0.5, min_tracking_confidence=0.5): - """ - 加载MediaPipe手部检测模型 - - 参数: - max_num_hands: 最大检测手的数量 - min_detection_confidence: 最小检测置信度 - min_tracking_confidence: 最小跟踪置信度 - - 返回: - MediaPipe Hands对象 - """ - hands = mp_hands.Hands( - static_image_mode=False, - max_num_hands=max_num_hands, - min_detection_confidence=min_detection_confidence, - min_tracking_confidence=min_tracking_confidence - ) - return hands - -# 处理帧并返回三轴控制信号 -def process_frame_3d(frame, hands_model, prev_hand_data=None): - """ - 处理视频帧,检测手部并计算三轴控制信号 - - 参数: - frame: 输入的BGR格式帧 - hands_model: MediaPipe Hands模型 - prev_hand_data: 前一帧的手部数据 - - 返回: - 三轴控制信号字典和当前手部数据 - """ - # 获取帧尺寸 - frame_height, frame_width = frame.shape[:2] - - # 初始化三轴控制信号 - control_signal = { - "x_angle": 90, # 水平角度 (左右) - "y_angle": 90, # 垂直角度 (上下) - "z_angle": 90, # 深度角度 (前后) - "grip": 0, # 抓取状态 (0=松开, 1=抓取) - "action": "none", # 当前动作 - "speed": 5 # 移动速度 (1-10) - } - - # MediaPipe需要RGB格式的图像 - rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - - # 处理图像 - results = hands_model.process(rgb_frame) - - # 如果没有检测到手,返回默认控制信号和前一帧数据 - if not results.multi_hand_landmarks: - return control_signal, prev_hand_data - - # 获取第一只手的关键点 - hand_landmarks = results.multi_hand_landmarks[0] - - # 计算手部中心点 (使用所有关键点的平均位置) - center_x, center_y, center_z = 0, 0, 0 - for landmark in hand_landmarks.landmark: - center_x += landmark.x - center_y += landmark.y - center_z += landmark.z - - num_landmarks = len(hand_landmarks.landmark) - center_x /= num_landmarks - center_y /= num_landmarks - center_z /= num_landmarks - - # 转换为像素坐标 - hand_center = { - 'x': center_x * frame_width, - 'y': center_y * frame_height, - 'z': center_z # 保留归一化的z值 - } - - # 获取更精确的手部姿态信息 - wrist = hand_landmarks.landmark[mp_hands.HandLandmark.WRIST] - index_finger_tip = hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP] - pinky_tip = hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_TIP] - thumb_tip = hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP] - - # 计算手部方向向量 (从手腕到手中心) - direction_vector = { - 'x': center_x - wrist.x, - 'y': center_y - wrist.y, - 'z': center_z - wrist.z - } - - # 计算屏幕中心点 - frame_center = { - 'x': frame_width / 2, - 'y': frame_height / 2, - 'z': 0 # 参考点 - } - - # 计算手部相对于屏幕中心的位移 - dx = hand_center['x'] - frame_center['x'] - dy = hand_center['y'] - frame_center['y'] - dz = hand_center['z'] * 10 # 缩放z值使其更明显 - - # 转换为机械臂的三个轴的角度 - # X轴角度 (左右移动) - 范围0-180度,中间是90度 - x_angle = map_to_angle(dx, frame_width / 2) - - # Y轴角度 (上下移动) - 范围0-180度,中间是90度 - y_angle = map_to_angle(-dy, frame_height / 2) # 注意负号:图像y轴向下,但机械臂y轴向上 - - # Z轴角度 (前后移动) - 范围0-180度,中间是90度 - # Z值需要正确映射,这里我们假设z值范围在-0.5至0.5之间 - # 越靠近摄像头z值越小(更负),越远离摄像头z值越大(更正) - z_angle = 90 + (dz * 90) # 将z值映射到0-180范围 - z_angle = max(0, min(180, z_angle)) # 确保在有效范围内 - - # 检测抓取动作 (拇指和食指距离) - pinch_distance = calculate_3d_distance( - thumb_tip.x, thumb_tip.y, thumb_tip.z, - index_finger_tip.x, index_finger_tip.y, index_finger_tip.z - ) - - # 如果拇指和食指距离小于阈值,认为是抓取状态 - grip = 1 if pinch_distance < 0.05 else 0 - - # 检测手部动作 (如果有前一帧数据) - action = "none" - speed = 5 # 默认中等速度 - - if prev_hand_data: - # 计算移动向量 - move_x = hand_center['x'] - prev_hand_data['x'] - move_y = hand_center['y'] - prev_hand_data['y'] - move_z = hand_center['z'] - prev_hand_data['z'] - - # 计算移动距离 - move_distance = math.sqrt(move_x**2 + move_y**2 + move_z**2) - - # 如果移动足够大,确定主要移动方向 - if move_distance > 0.01: - # 计算移动速度 (1-10范围) - speed = min(10, max(1, int(move_distance * 200))) - - # 确定主导移动方向 - abs_move = [abs(move_x), abs(move_y), abs(move_z)] - max_move = max(abs_move) - - if max_move == abs_move[0]: # X轴移动 - action = "right" if move_x > 0 else "left" - elif max_move == abs_move[1]: # Y轴移动 - action = "down" if move_y > 0 else "up" - else: # Z轴移动 - action = "forward" if move_z < 0 else "backward" - - # 更新控制信号 - control_signal = { - "x_angle": x_angle, - "y_angle": y_angle, - "z_angle": z_angle, - "grip": grip, - "action": action, - "speed": speed - } - - # 在帧上绘制手部关键点和连接线 - mp_drawing.draw_landmarks( - frame, - hand_landmarks, - mp_hands.HAND_CONNECTIONS, - mp_drawing_styles.get_default_hand_landmarks_style(), - mp_drawing_styles.get_default_hand_connections_style() - ) - - # 绘制三轴控制指示器 - draw_3d_control_visualization(frame, control_signal, hand_center) - - return control_signal, hand_center - -def calculate_3d_distance(x1, y1, z1, x2, y2, z2): - """计算3D空间中两点之间的欧几里得距离""" - return math.sqrt((x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2) - -def map_to_angle(value, max_value): - """将-max_value到max_value范围内的值映射到0-180度范围的角度""" - # 计算相对位置 (-1到1) - relative_position = value / max_value - - # 映射到角度 (90度为中点) - angle = 90 + (relative_position * 45) # 使用45度作为最大偏移量 - - # 确保角度在有效范围内 - return max(0, min(180, angle)) - -def draw_3d_control_visualization(frame, control_signal, hand_center): - """在帧上绘制三轴控制可视化""" - height, width = frame.shape[:2] - - # 绘制坐标轴 - origin_x, origin_y = width - 150, height - 150 - axis_length = 100 - - # X轴 (红色) - cv2.line(frame, (origin_x, origin_y), (origin_x + axis_length, origin_y), (0, 0, 255), 2) - cv2.putText(frame, "X", (origin_x + axis_length + 5, origin_y + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2) - - # Y轴 (绿色) - cv2.line(frame, (origin_x, origin_y), (origin_x, origin_y - axis_length), (0, 255, 0), 2) - cv2.putText(frame, "Y", (origin_x - 15, origin_y - axis_length - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2) - - # Z轴 (蓝色) - 以45度角向右上方 - z_x = int(origin_x + axis_length * 0.7) - z_y = int(origin_y - axis_length * 0.7) - cv2.line(frame, (origin_x, origin_y), (z_x, z_y), (255, 0, 0), 2) - cv2.putText(frame, "Z", (z_x + 5, z_y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2) - - # 绘制控制角度值 - x_text = f"X: {control_signal['x_angle']:.1f}°" - y_text = f"Y: {control_signal['y_angle']:.1f}°" - z_text = f"Z: {control_signal['z_angle']:.1f}°" - grip_text = f"抓取: {'开' if control_signal['grip'] == 0 else '关'}" - action_text = f"动作: {control_signal['action']}" - speed_text = f"速度: {control_signal['speed']}" - - cv2.putText(frame, x_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) - cv2.putText(frame, y_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2) - cv2.putText(frame, z_text, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2) - cv2.putText(frame, grip_text, (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2) - cv2.putText(frame, action_text, (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2) - cv2.putText(frame, speed_text, (10, 180), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 255), 2) - - # 绘制手部位置到屏幕中心的连接线 - if hand_center: - center_x, center_y = int(width/2), int(height/2) - hand_x, hand_y = int(hand_center['x']), int(hand_center['y']) - - # 绘制屏幕中心点 - cv2.circle(frame, (center_x, center_y), 5, (0, 0, 255), -1) - - # 绘制手部位置点 - cv2.circle(frame, (hand_x, hand_y), 10, (0, 255, 0), -1) - - # 绘制连接线 - cv2.line(frame, (center_x, center_y), (hand_x, hand_y), (255, 0, 0), 2) - -def analyze_hand_gesture(hand_landmarks): - """分析手势以确定机械臂控制模式""" - # 获取指尖和手掌关键点 - thumb_tip = hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP] - index_tip = hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP] - middle_tip = hand_landmarks.landmark[mp_hands.HandLandmark.MIDDLE_FINGER_TIP] - ring_tip = hand_landmarks.landmark[mp_hands.HandLandmark.RING_FINGER_TIP] - pinky_tip = hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_TIP] - - # 获取手掌底部关键点 - wrist = hand_landmarks.landmark[mp_hands.HandLandmark.WRIST] - index_mcp = hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_MCP] - pinky_mcp = hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_MCP] - - # 检测拳头 - 所有手指弯曲 - fist = ( - thumb_tip.y > index_mcp.y and - index_tip.y > index_mcp.y and - middle_tip.y > index_mcp.y and - ring_tip.y > index_mcp.y and - pinky_tip.y > pinky_mcp.y - ) - - # 检测手掌打开 - 所有手指伸直 - open_palm = ( - thumb_tip.y < wrist.y and - index_tip.y < index_mcp.y and - middle_tip.y < index_mcp.y and - ring_tip.y < index_mcp.y and - pinky_tip.y < pinky_mcp.y - ) - - # 检测指向手势 - 食指伸出,其他手指弯曲 - pointing = ( - index_tip.y < index_mcp.y and - middle_tip.y > index_mcp.y and - ring_tip.y > index_mcp.y and - pinky_tip.y > pinky_mcp.y - ) - - # 检测"OK"手势 - 拇指和食指形成圆圈,其他手指伸出 - pinch_distance = calculate_3d_distance( - thumb_tip.x, thumb_tip.y, thumb_tip.z, - index_tip.x, index_tip.y, index_tip.z - ) - ok_gesture = (pinch_distance < 0.05 and - middle_tip.y < index_mcp.y and - ring_tip.y < index_mcp.y and - pinky_tip.y < pinky_mcp.y) - - # 返回识别的手势 - if fist: - return "fist" - elif open_palm: - return "open_palm" - elif pointing: - return "pointing" - elif ok_gesture: - return "ok" - else: - return "unknown" \ No newline at end of file diff --git a/src/process_video_3d.py b/src/process_video_3d.py deleted file mode 100644 index 0fbd008..0000000 --- a/src/process_video_3d.py +++ /dev/null @@ -1,168 +0,0 @@ -import cv2 -import numpy as np -import time -import os - -# 导入我们的3D手部检测模块 -from src.hand_detection_3d import load_mediapipe_model, process_frame_3d - -# 尝试导入 Picamera2,如果不可用则使用标准OpenCV -try: - from picamera2 import Picamera2 - PICAMERA_AVAILABLE = True -except ImportError: - PICAMERA_AVAILABLE = False - -def process_camera_3d(camera_id=0, output_path=None, use_picamera=True): - """ - 使用摄像头进行3D手部检测 - 简化版本 - - 参数: - camera_id: 摄像头ID (默认为0) - output_path: 输出视频文件路径 (可选) - use_picamera: 是否优先使用Picamera2 (树莓派摄像头) - """ - # 加载MediaPipe手部检测模型 - hands_model = load_mediapipe_model(max_num_hands=1) - - # 初始化摄像头 - cap = None - picam2 = None - - if use_picamera and PICAMERA_AVAILABLE: - # 使用 Picamera2 (树莓派摄像头) - try: - print("正在初始化 Picamera2...") - picam2 = Picamera2() - video_config = picam2.create_video_configuration(main={"size": (640, 480)}) - picam2.configure(video_config) - picam2.start() - print("✅ Picamera2 启动成功") - frame_width, frame_height = 640, 480 - except Exception as e: - print(f"❌ Picamera2 启动失败: {e}") - print("⚠️ 回退到 OpenCV VideoCapture") - picam2 = None - use_picamera = False - - if not use_picamera or not PICAMERA_AVAILABLE: - # 使用标准 OpenCV VideoCapture - cap = cv2.VideoCapture(camera_id) - if not cap.isOpened(): - print(f"无法打开摄像头 ID: {camera_id}") - return - frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) - frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) - - fps = 30 - - # 创建视频写入器(如果需要保存) - video_writer = None - if output_path: - fourcc = cv2.VideoWriter_fourcc(*'XVID') - video_writer = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height)) - - # 前一帧的手部数据 - prev_hand_data = None - - # 初始化性能计数器 - frame_count = 0 - start_time = time.time() - - print("控制说明:") - print("- 移动手部显示控制信号") - print("- 拇指和食指捏合显示抓取状态") - print("- 按 Ctrl+C 退出") - - try: - while True: - # 读取一帧 - ret = False - frame = None - - if picam2: - try: - frame = picam2.capture_array() - frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) - ret = True - except Exception as e: - print(f"Picamera2 读取帧失败: {e}") - break - else: - ret, frame = cap.read() - - if not ret or frame is None: - break - - frame_count += 1 - - # 处理帧并获取三轴控制信号 - control_signal, prev_hand_data = process_frame_3d(frame, hands_model, prev_hand_data) - - # 显示控制信号 - if control_signal and frame_count % 30 == 0: # 每秒显示一次 - print_control_signal(control_signal) - - # 写入输出视频 - if video_writer: - video_writer.write(frame) - - except KeyboardInterrupt: - print("\n收到中断信号,正在退出...") - except Exception as e: - print(f"程序运行时出错: {e}") - finally: - print("正在清理资源...") - - # 释放资源 - if picam2: - try: - picam2.stop() - picam2.close() - except: - pass - - if cap: - try: - cap.release() - except: - pass - - if video_writer: - try: - video_writer.release() - print(f"✅ 视频已保存: {output_path}") - except: - pass - - # 显示统计信息 - elapsed_time = time.time() - start_time - if frame_count > 0: - avg_fps = frame_count / elapsed_time - print(f"📊 处理了 {frame_count} 帧,平均FPS: {avg_fps:.1f}") - - print("✅ 程序已退出") - -def print_control_signal(control_signal): - """显示手部控制信号信息""" - print(f"手部控制信号:") - print(f" X轴角度: {control_signal['x_angle']:.1f}°") - print(f" Y轴角度: {control_signal['y_angle']:.1f}°") - print(f" Z轴角度: {control_signal['z_angle']:.1f}°") - print(f" 抓取状态: {'抓取' if control_signal['grip'] == 1 else '释放'}") - print(f" 动作: {control_signal['action']}") - print("-------------------------------") - -if __name__ == "__main__": - import argparse - - parser = argparse.ArgumentParser(description='简化版3D手部检测') - parser.add_argument('--camera-id', '-i', type=int, default=0, help='摄像头ID (默认为0)') - parser.add_argument('--output', '-o', help='输出视频文件路径') - - args = parser.parse_args() - - process_camera_3d( - camera_id=args.camera_id, - output_path=args.output - ) \ No newline at end of file diff --git a/src/web_preview.py b/src/web_preview.py deleted file mode 100644 index 8ceb9a2..0000000 --- a/src/web_preview.py +++ /dev/null @@ -1,213 +0,0 @@ -#!/usr/bin/env python3 -""" -网页预览版手部检测系统 -通过Flask提供网页界面,解决树莓派图形界面问题 -""" - -import cv2 -import threading -import time -from flask import Flask, render_template_string, Response -from picamera2 import Picamera2 -import sys -import os - -# 导入手部检测模块 -sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) -from src.hand_detection_3d import load_mediapipe_model, process_frame_3d - -app = Flask(__name__) - -class WebPreviewCamera: - def __init__(self): - self.picam2 = None - self.hands_model = load_mediapipe_model(max_num_hands=1) - self.prev_hand_data = None - self.latest_control_signal = None - - def start_camera(self): - """启动摄像头""" - self.picam2 = Picamera2() - video_config = self.picam2.create_video_configuration(main={"size": (640, 480)}) - self.picam2.configure(video_config) - self.picam2.start() - - def get_frame(self): - """获取处理后的帧""" - if not self.picam2: - return None - - # 获取原始帧 - frame = self.picam2.capture_array() - frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR) - - # 进行手部检测 - control_signal, self.prev_hand_data = process_frame_3d( - frame, self.hands_model, self.prev_hand_data - ) - self.latest_control_signal = control_signal - - # 在帧上添加控制信号信息 - if control_signal: - y_offset = 30 - cv2.putText(frame, f"X: {control_signal['x_angle']:.1f}°", - (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) - cv2.putText(frame, f"Y: {control_signal['y_angle']:.1f}°", - (10, y_offset + 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) - cv2.putText(frame, f"Z: {control_signal['z_angle']:.1f}°", - (10, y_offset + 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2) - cv2.putText(frame, f"Grip: {'ON' if control_signal['grip'] else 'OFF'}", - (10, y_offset + 75), cv2.FONT_HERSHEY_SIMPLEX, 0.6, - (0, 0, 255) if control_signal['grip'] else (0, 255, 0), 2) - - return frame - - def generate_frames(self): - """生成视频流""" - while True: - frame = self.get_frame() - if frame is None: - continue - - # 编码为JPEG - ret, buffer = cv2.imencode('.jpg', frame) - frame_bytes = buffer.tobytes() - - yield (b'--frame\r\n' - b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n') - - time.sleep(0.033) # ~30 FPS - -# 全局摄像头实例 -camera = WebPreviewCamera() - -@app.route('/') -def index(): - """主页面""" - return render_template_string(''' - - - - 🍓 树莓派手部检测预览 - - - -
-

🍓 树莓派3D手部检测系统

- -
-

操作说明:

-

• 将手部放在摄像头前

-

• 移动手部查看三轴角度变化

-

• 拇指和食指捏合触发抓取检测

-

• 实时显示控制信号

-
- -
- 手部检测预览 -
- -
- - -
- -
-

访问地址: http://树莓派IP:5000

-

提示: 可以在手机、电脑等任意设备上打开此页面查看预览

-
-
- - - - - ''') - -@app.route('/video_feed') -def video_feed(): - """视频流路由""" - return Response(camera.generate_frames(), - mimetype='multipart/x-mixed-replace; boundary=frame') - -def start_web_preview(): - """启动网页预览服务""" - print("🍓 启动树莓派网页预览系统...") - print("📷 初始化摄像头...") - - try: - camera.start_camera() - print("✅ 摄像头启动成功") - print("🌐 启动网页服务器...") - print("📱 请在浏览器中访问: http://localhost:5000") - print("📱 或在其他设备访问: http://树莓派IP:5000") - print("🛑 按 Ctrl+C 停止服务") - - # 启动Flask应用 - app.run(host='0.0.0.0', port=5000, debug=False, threaded=True) - - except KeyboardInterrupt: - print("\n🛑 用户停止服务") - except Exception as e: - print(f"❌ 错误: {e}") - finally: - if camera.picam2: - camera.picam2.stop() - camera.picam2.close() - print("✅ 网页预览系统已停止") - -if __name__ == "__main__": - start_web_preview() \ No newline at end of file diff --git a/src/web_server.py b/src/web_server.py deleted file mode 100644 index 0898459..0000000 --- a/src/web_server.py +++ /dev/null @@ -1,401 +0,0 @@ -#!/usr/bin/env python3 -""" -实时手部检测Web服务器 -支持WebSocket通信,实时视频流处理和机械臂控制 -""" - -import asyncio -import base64 -import json -import logging -import os -import time -from threading import Thread -from typing import Dict, Optional, Any - -import cv2 -import numpy as np -from flask import Flask, render_template, request -from flask_socketio import SocketIO, emit -from PIL import Image -import io - -from hand_detection_3d import load_mediapipe_model, process_frame_3d - -# 配置日志 -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -class HandDetectionWebServer: - """实时手部检测Web服务器""" - - def __init__(self, host='0.0.0.0', port=5000): - self.host = host - self.port = port - - # Flask应用和SocketIO - # 设置模板和静态文件路径 - import os - template_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'templates') - self.app = Flask(__name__, template_folder=template_dir) - self.app.config['SECRET_KEY'] = 'hand_detection_secret_key' - self.socketio = SocketIO(self.app, cors_allowed_origins="*", async_mode='threading') - - # MediaPipe模型 - self.hands_model = load_mediapipe_model() - - # 状态管理 - self.clients = {} # 连接的客户端 - self.current_frame = None - self.previous_hand_data = None - self.detection_results = { - 'x_angle': 90, - 'y_angle': 90, - 'z_angle': 90, - 'grip': 0, - 'action': 'none', - 'speed': 5, - 'timestamp': time.time() - } - - # 性能监控 - self.fps_counter = 0 - self.last_fps_time = time.time() - self.current_fps = 0 - - # 配置路由和事件处理 - self._setup_routes() - self._setup_socket_events() - - def _setup_routes(self): - """设置HTTP路由""" - - @self.app.route('/') - def index(): - """主页面""" - return render_template('index.html') - - @self.app.route('/api/status') - def api_status(): - """获取系统状态""" - return { - 'status': 'running', - 'fps': self.current_fps, - 'clients': len(self.clients), - 'detection_results': self.detection_results - } - - def _setup_socket_events(self): - """设置WebSocket事件处理""" - - @self.socketio.on('connect') - def handle_connect(): - """客户端连接""" - client_id = request.sid - self.clients[client_id] = { - 'connected_at': time.time(), - 'type': 'unknown', - 'last_ping': time.time() - } - logger.info(f"客户端 {client_id} 已连接") - - # 发送欢迎消息和当前状态 - emit('status', { - 'message': '连接成功', - 'client_id': client_id, - 'current_results': self.detection_results - }) - - @self.socketio.on('disconnect') - def handle_disconnect(): - """客户端断开连接""" - client_id = request.sid - if client_id in self.clients: - del self.clients[client_id] - logger.info(f"客户端 {client_id} 已断开连接") - - @self.socketio.on('register_client') - def handle_register_client(data): - """注册客户端类型""" - client_id = request.sid - client_type = data.get('type', 'unknown') - - if client_id in self.clients: - self.clients[client_id]['type'] = client_type - logger.info(f"客户端 {client_id} 注册为: {client_type}") - - emit('registration_success', { - 'client_id': client_id, - 'type': client_type - }) - - @self.socketio.on('video_frame') - def handle_video_frame(data): - """处理视频帧""" - try: - # 解码base64图像 - frame_data = base64.b64decode(data['frame']) - frame = self._decode_frame(frame_data) - - if frame is not None: - # 处理帧并检测手部 - control_signal, hand_data = process_frame_3d( - frame, self.hands_model, self.previous_hand_data - ) - - # 更新检测结果 - self.detection_results = control_signal - self.detection_results['timestamp'] = time.time() - self.previous_hand_data = hand_data - - # 编码处理后的帧 - processed_frame_data = self._encode_frame(frame) - - # 发送结果给web预览客户端 - self.socketio.emit('detection_results', { - 'control_signal': control_signal, - 'processed_frame': processed_frame_data, - 'fps': self.current_fps - }, room=None) - - # 发送控制信号给机械臂客户端 - self._send_to_robot_clients(control_signal) - - # 更新FPS - self._update_fps() - - except Exception as e: - logger.error(f"处理视频帧时出错: {e}") - emit('error', {'message': str(e)}) - - @self.socketio.on('ping') - def handle_ping(): - """处理ping请求""" - client_id = request.sid - if client_id in self.clients: - self.clients[client_id]['last_ping'] = time.time() - emit('pong', {'timestamp': time.time()}) - - @self.socketio.on('get_detection_results') - def handle_get_detection_results(): - """获取最新的检测结果""" - emit('detection_results', { - 'control_signal': self.detection_results, - 'fps': self.current_fps - }) - - @self.socketio.on('start_local_test') - def handle_start_local_test(data=None): - """处理开始本地测试请求""" - try: - # 如果提供了视频路径,使用指定的视频 - if data and 'video_path' in data: - test_video = data['video_path'] - if not os.path.exists(test_video): - emit('test_error', { - 'message': f'视频文件不存在: {test_video}' - }) - return - else: - # 检查是否有默认测试视频 - test_videos = [ - 'data/videos/test_basic.mp4', - 'data/videos/test_gesture.mp4' - ] - - # 找到第一个存在的测试视频 - test_video = None - for video_path in test_videos: - if os.path.exists(video_path): - test_video = video_path - break - - if not test_video: - # 没有找到测试视频,提供帮助信息 - emit('test_error', { - 'message': '未找到测试视频文件', - 'help': '请先生成测试视频:python create_test_video.py' - }) - return - - logger.info(f"开始本地测试,使用视频: {test_video}") - self.start_local_video_test(test_video) - - emit('test_started', { - 'message': f'本地测试已开始,使用视频: {os.path.basename(test_video)}', - 'video_path': test_video - }) - - except Exception as e: - logger.error(f"启动本地测试时出错: {e}") - emit('test_error', { - 'message': f'启动本地测试失败: {str(e)}' - }) - - @self.socketio.on('get_video_list') - def handle_get_video_list(): - """获取可用的视频文件列表""" - try: - video_dirs = ['data/videos', 'videos', '.'] - video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv'] - videos = [] - - for video_dir in video_dirs: - if os.path.exists(video_dir): - for file in os.listdir(video_dir): - if any(file.lower().endswith(ext) for ext in video_extensions): - file_path = os.path.join(video_dir, file) - try: - file_size = os.path.getsize(file_path) - size_mb = file_size / (1024 * 1024) - videos.append({ - 'path': file_path, - 'name': file, - 'size': f'{size_mb:.1f}MB' - }) - except OSError: - continue - - # 按文件名排序 - videos.sort(key=lambda x: x['name']) - - emit('video_list', { - 'videos': videos - }) - - except Exception as e: - logger.error(f"获取视频列表时出错: {e}") - emit('video_list', { - 'videos': [] - }) - - def _decode_frame(self, frame_data: bytes) -> Optional[np.ndarray]: - """解码图像帧""" - try: - # 使用PIL解码 - image = Image.open(io.BytesIO(frame_data)) - frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR) - return frame - except Exception as e: - logger.error(f"解码帧时出错: {e}") - return None - - def _encode_frame(self, frame: np.ndarray) -> str: - """编码图像帧为base64""" - try: - # 转换为RGB格式 - frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) - image = Image.fromarray(frame_rgb) - - # 编码为JPEG - buffer = io.BytesIO() - image.save(buffer, format='JPEG', quality=80) - frame_data = base64.b64encode(buffer.getvalue()).decode('utf-8') - - return f"data:image/jpeg;base64,{frame_data}" - except Exception as e: - logger.error(f"编码帧时出错: {e}") - return "" - - def _send_to_robot_clients(self, control_signal: Dict[str, Any]): - """发送控制信号给机械臂客户端""" - robot_clients = [ - client_id for client_id, info in self.clients.items() - if info.get('type') == 'robot' - ] - - if robot_clients: - for client_id in robot_clients: - self.socketio.emit('robot_control', control_signal, room=client_id) - - def _update_fps(self): - """更新FPS计数""" - self.fps_counter += 1 - current_time = time.time() - - if current_time - self.last_fps_time >= 1.0: # 每秒更新一次 - self.current_fps = self.fps_counter - self.fps_counter = 0 - self.last_fps_time = current_time - - def start_local_video_test(self, video_path: str): - """启动本地视频测试""" - def video_test_thread(): - cap = cv2.VideoCapture(video_path) - - while cap.isOpened(): - ret, frame = cap.read() - if not ret: - break - - # 处理帧 - control_signal, hand_data = process_frame_3d( - frame, self.hands_model, self.previous_hand_data - ) - - # 更新状态 - self.detection_results = control_signal - self.detection_results['timestamp'] = time.time() - self.previous_hand_data = hand_data - - # 编码帧 - processed_frame_data = self._encode_frame(frame) - - # 广播结果 - self.socketio.emit('detection_results', { - 'control_signal': control_signal, - 'processed_frame': processed_frame_data, - 'fps': self.current_fps - }) - - # 发送给机械臂 - self._send_to_robot_clients(control_signal) - - # 更新FPS - self._update_fps() - - # 控制帧率 - time.sleep(1/30) # 30 FPS - - cap.release() - - thread = Thread(target=video_test_thread) - thread.daemon = True - thread.start() - logger.info(f"本地视频测试已启动: {video_path}") - - def run(self, debug=False): - """启动Web服务器""" - logger.info(f"启动手部检测Web服务器 http://{self.host}:{self.port}") - self.socketio.run( - self.app, - host=self.host, - port=self.port, - debug=debug, - allow_unsafe_werkzeug=True - ) - -def main(): - """主函数""" - import argparse - - parser = argparse.ArgumentParser(description='实时手部检测Web服务器') - parser.add_argument('--host', default='0.0.0.0', help='服务器地址') - parser.add_argument('--port', type=int, default=5000, help='端口号') - parser.add_argument('--debug', action='store_true', help='调试模式') - parser.add_argument('--test-video', help='本地测试视频路径') - - args = parser.parse_args() - - # 创建服务器实例 - server = HandDetectionWebServer(host=args.host, port=args.port) - - # 如果指定了测试视频,启动本地视频测试 - if args.test_video: - server.start_local_video_test(args.test_video) - - # 启动服务器 - server.run(debug=args.debug) - -if __name__ == '__main__': - main() \ No newline at end of file