Update: streamline the lower-computer (device-side) code

51hhh 2025-08-11 13:46:05 +08:00
parent d7a816d993
commit dc381b81e5
6 changed files with 62 additions and 1172 deletions


@@ -9,7 +9,25 @@
### Directory overview
+ src/: lower-computer (device-side) source code
    + arm.py: robot arm control script
    + arm3d_drag_demo.py: 3D drag-control demo for the arm
    + robot_client.py: robot arm client script
    + test_motors.py: motor test script
    + ZDT_Servo.py: driver for the ZDT stepper motor
+ start_robot_client.sh: launches the robot arm client
+ pi_video_client.py: Raspberry Pi video-stream client
**Note:** start the server before launching the client and the video stream; see the [server deployment guide](https://github.com/ricklentz/checkhand/blob/main/README_server.md).
The connection settings are passed to the client as launch parameters and can be edited in the startup script (a usage sketch follows the snippet below):
```sh
SERVER="http://192.168.114.26:5000"
MOCK=""
```
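
A minimal sketch of how the client might consume these variables, assuming `robot_client.py` parses the `--server` and `--mock` flags shown in the options section below:

```python
# Hypothetical argument handling in robot_client.py; the flag names are
# taken from the start scripts documented later in this README.
import argparse

parser = argparse.ArgumentParser(description="Robot arm client")
parser.add_argument("--server", default="http://localhost:5000",
                    help="hand-detection server URL (the SERVER variable above)")
parser.add_argument("--mock", action="store_true",
                    help="simulate the arm instead of driving real motors")
args = parser.parse_args()
print(f"Connecting to {args.server} (mock={args.mock})")
```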
### Option 1: use the startup scripts (recommended)
@@ -21,76 +39,3 @@
```
### Option 2: manual startup
#### 1. Create a virtual environment and install dependencies
```bash
python3 -m venv venv
source venv/bin/activate  # Linux/Mac
pip install -r requirements.txt  # dependency file name assumed; adjust to the repo's actual setup
```
#### 2. Start the web server
```bash
python run_web_service.py
```
#### 3. Start the robot arm client (mock mode)
```bash
cd src
python robot_client.py --mock
```
### 3. Access the web interface
Open a browser and go to: `http://localhost:5000`
## Startup script options
### Web server startup options:
```bash
./start_service.sh --help                              # show help
./start_service.sh                                     # basic startup
./start_service.sh --host 0.0.0.0 --port 8080          # custom address and port
./start_service.sh --debug                             # debug mode
./start_service.sh --test-video data/videos/test.mp4   # local video test
```
### Robot arm client startup options:
```bash
./start_robot_client.sh --help                               # show help
./start_robot_client.sh                                      # basic startup (mock mode)
./start_robot_client.sh --server http://192.168.1.100:5000   # connect to a remote server
./start_robot_client.sh --real                               # real robot arm mode
```
## Features
- ✅ Real-time hand detection with MediaPipe
- ✅ Real-time WebSocket communication
- ✅ 3D coordinate computation (X-, Y-, Z-axis angles)
- ✅ Web preview interface
- ✅ Robot arm control interface
- ✅ Local video test support
## System architecture
```
Video client → web server → MediaPipe → 3D coordinate computation → clients (web preview / robot arm)
```
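
The server (shown later in this diff) registers clients by type and pushes `robot_control` events to clients registered as `robot`. A minimal sketch of such a consumer; the event names are taken from the server code below, while the handler body is only illustrative:

```python
# Minimal robot-side consumer sketch; 'register_client' and 'robot_control'
# match the server in this commit, the handler body is a placeholder.
import socketio

sio = socketio.Client()

@sio.event
def connect():
    # Ask the server to route robot_control events to this client.
    sio.emit('register_client', {'type': 'robot'})

@sio.on('robot_control')
def on_control(signal):
    # 'signal' follows the control signal format documented below.
    print(signal['x_angle'], signal['y_angle'], signal['z_angle'], signal['grip'])

sio.connect('http://localhost:5000')
sio.wait()
```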
## Control signal format
```json
{
"x_angle": 90.0, // X轴角度 (0-180°)
"y_angle": 90.0, // Y轴角度 (0-180°)
"z_angle": 90.0, // Z轴角度 (0-180°)
"grip": 0, // 抓取状态 (0=松开, 1=抓取)
"action": "none", // 当前动作
"speed": 5 // 移动速度 (1-10)
}
```
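
For illustration, a hedged sketch of validating one of these messages on the device side; the field names and ranges come from the format above, but the helper itself is not part of this repository:

```python
# Illustrative validation/clamping helper for the control signal above.
def clamp_signal(msg: dict) -> dict:
    def clamp(v, lo, hi):
        return max(lo, min(hi, v))
    return {
        "x_angle": clamp(float(msg.get("x_angle", 90.0)), 0.0, 180.0),
        "y_angle": clamp(float(msg.get("y_angle", 90.0)), 0.0, 180.0),
        "z_angle": clamp(float(msg.get("z_angle", 90.0)), 0.0, 180.0),
        "grip": 1 if msg.get("grip") else 0,
        "action": msg.get("action", "none"),
        "speed": clamp(int(msg.get("speed", 5)), 1, 10),
    }
```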

pi_video_client.py Normal file

@@ -0,0 +1,43 @@
import base64
import time

import cv2
import socketio
from picamera2 import Picamera2

SERVER_URL = 'http://192.168.114.26:5000'  # change to your server address and port

sio = socketio.Client()

@sio.event
def connect():
    print('Connected to server')
    sio.emit('register_client', {'type': 'pi_camera'})

@sio.event
def disconnect():
    print('Disconnected from server')

def main():
    sio.connect(SERVER_URL)
    picam = Picamera2()
    config = picam.create_video_configuration(main={'size': (640, 480)})
    picam.configure(config)
    picam.start()
    time.sleep(1)  # give the camera time to start
    try:
        while True:
            frame = picam.capture_array()
            # Encode the frame as JPEG, then base64, and send it to the server
            _, buffer = cv2.imencode('.jpg', frame, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
            jpg_as_text = base64.b64encode(buffer).decode('utf-8')
            sio.emit('video_frame', {'frame': jpg_as_text})
            time.sleep(1/30)  # cap the frame rate at ~30 FPS
    finally:
        picam.close()
        sio.disconnect()

if __name__ == '__main__':
    main()


@@ -1,316 +0,0 @@
import cv2
import numpy as np
import math
import mediapipe as mp

# Initialize MediaPipe Hands
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles

# Load the MediaPipe hand-detection model
def load_mediapipe_model(max_num_hands=1, min_detection_confidence=0.5, min_tracking_confidence=0.5):
    """
    Load the MediaPipe hand-detection model.

    Args:
        max_num_hands: maximum number of hands to detect
        min_detection_confidence: minimum detection confidence
        min_tracking_confidence: minimum tracking confidence

    Returns:
        A MediaPipe Hands object.
    """
    hands = mp_hands.Hands(
        static_image_mode=False,
        max_num_hands=max_num_hands,
        min_detection_confidence=min_detection_confidence,
        min_tracking_confidence=min_tracking_confidence
    )
    return hands
# Process a frame and return the three-axis control signal
def process_frame_3d(frame, hands_model, prev_hand_data=None):
    """
    Process a video frame: detect the hand and compute the three-axis control signal.

    Args:
        frame: input frame in BGR format
        hands_model: MediaPipe Hands model
        prev_hand_data: hand data from the previous frame

    Returns:
        A tuple of (control-signal dict, current hand data).
    """
    # Frame dimensions
    frame_height, frame_width = frame.shape[:2]
    # Default three-axis control signal
    control_signal = {
        "x_angle": 90,     # horizontal angle (left/right)
        "y_angle": 90,     # vertical angle (up/down)
        "z_angle": 90,     # depth angle (forward/backward)
        "grip": 0,         # grip state (0 = release, 1 = grip)
        "action": "none",  # current action
        "speed": 5         # movement speed (1-10)
    }
    # MediaPipe expects RGB images
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    # Run detection
    results = hands_model.process(rgb_frame)
    # No hand detected: return the default signal and the previous hand data
    if not results.multi_hand_landmarks:
        return control_signal, prev_hand_data
    # Landmarks of the first detected hand
    hand_landmarks = results.multi_hand_landmarks[0]
    # Hand center: average of all landmark positions
    center_x, center_y, center_z = 0, 0, 0
    for landmark in hand_landmarks.landmark:
        center_x += landmark.x
        center_y += landmark.y
        center_z += landmark.z
    num_landmarks = len(hand_landmarks.landmark)
    center_x /= num_landmarks
    center_y /= num_landmarks
    center_z /= num_landmarks
    # Convert to pixel coordinates
    hand_center = {
        'x': center_x * frame_width,
        'y': center_y * frame_height,
        'z': center_z  # keep the normalized z value
    }
    # Landmarks used for finer hand-pose information
    wrist = hand_landmarks.landmark[mp_hands.HandLandmark.WRIST]
    index_finger_tip = hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP]
    pinky_tip = hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_TIP]
    thumb_tip = hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP]
    # Hand direction vector (from the wrist to the hand center)
    direction_vector = {
        'x': center_x - wrist.x,
        'y': center_y - wrist.y,
        'z': center_z - wrist.z
    }
    # Screen center
    frame_center = {
        'x': frame_width / 2,
        'y': frame_height / 2,
        'z': 0  # reference point
    }
    # Hand displacement relative to the screen center
    dx = hand_center['x'] - frame_center['x']
    dy = hand_center['y'] - frame_center['y']
    dz = hand_center['z'] * 10  # scale z to make it more pronounced
    # Map the displacements to the arm's three axis angles.
    # X axis (left/right): range 0-180° with 90° at the center
    x_angle = map_to_angle(dx, frame_width / 2)
    # Y axis (up/down): note the sign flip, image y points down but arm y points up
    y_angle = map_to_angle(-dy, frame_height / 2)
    # Z axis (forward/backward): range 0-180° with 90° at the center.
    # z is assumed to lie roughly in [-0.5, 0.5]; closer to the camera means
    # smaller (more negative), farther means larger (more positive).
    z_angle = 90 + (dz * 90)  # map z into the 0-180 range
    z_angle = max(0, min(180, z_angle))  # clamp to the valid range
    # Grip detection: distance between thumb tip and index fingertip
    pinch_distance = calculate_3d_distance(
        thumb_tip.x, thumb_tip.y, thumb_tip.z,
        index_finger_tip.x, index_finger_tip.y, index_finger_tip.z
    )
    # A distance below the threshold counts as a grip
    grip = 1 if pinch_distance < 0.05 else 0
    # Motion detection (requires data from the previous frame)
    action = "none"
    speed = 5  # default medium speed
    if prev_hand_data:
        # Movement vector
        move_x = hand_center['x'] - prev_hand_data['x']
        move_y = hand_center['y'] - prev_hand_data['y']
        move_z = hand_center['z'] - prev_hand_data['z']
        # Movement distance
        move_distance = math.sqrt(move_x**2 + move_y**2 + move_z**2)
        # If the movement is large enough, find the dominant direction
        if move_distance > 0.01:
            # Movement speed, mapped into the 1-10 range
            speed = min(10, max(1, int(move_distance * 200)))
            # Dominant movement axis
            abs_move = [abs(move_x), abs(move_y), abs(move_z)]
            max_move = max(abs_move)
            if max_move == abs_move[0]:    # X-axis movement
                action = "right" if move_x > 0 else "left"
            elif max_move == abs_move[1]:  # Y-axis movement
                action = "down" if move_y > 0 else "up"
            else:                          # Z-axis movement
                action = "forward" if move_z < 0 else "backward"
    # Assemble the control signal
    control_signal = {
        "x_angle": x_angle,
        "y_angle": y_angle,
        "z_angle": z_angle,
        "grip": grip,
        "action": action,
        "speed": speed
    }
    # Draw the hand landmarks and connections on the frame
    mp_drawing.draw_landmarks(
        frame,
        hand_landmarks,
        mp_hands.HAND_CONNECTIONS,
        mp_drawing_styles.get_default_hand_landmarks_style(),
        mp_drawing_styles.get_default_hand_connections_style()
    )
    # Draw the three-axis control visualization
    draw_3d_control_visualization(frame, control_signal, hand_center)
    return control_signal, hand_center
def calculate_3d_distance(x1, y1, z1, x2, y2, z2):
    """Euclidean distance between two points in 3D space."""
    return math.sqrt((x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2)

def map_to_angle(value, max_value):
    """Map a value in [-max_value, max_value] to an angle in [0, 180] degrees."""
    # Relative position in [-1, 1]
    relative_position = value / max_value
    # Map to an angle, with 90° as the midpoint and 45° as the maximum offset
    angle = 90 + (relative_position * 45)
    # Clamp to the valid range
    return max(0, min(180, angle))
def draw_3d_control_visualization(frame, control_signal, hand_center):
    """Draw the three-axis control visualization onto the frame."""
    height, width = frame.shape[:2]
    # Coordinate axes
    origin_x, origin_y = width - 150, height - 150
    axis_length = 100
    # X axis (red)
    cv2.line(frame, (origin_x, origin_y), (origin_x + axis_length, origin_y), (0, 0, 255), 2)
    cv2.putText(frame, "X", (origin_x + axis_length + 5, origin_y + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
    # Y axis (green)
    cv2.line(frame, (origin_x, origin_y), (origin_x, origin_y - axis_length), (0, 255, 0), 2)
    cv2.putText(frame, "Y", (origin_x - 15, origin_y - axis_length - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
    # Z axis (blue), drawn at 45 degrees toward the upper right
    z_x = int(origin_x + axis_length * 0.7)
    z_y = int(origin_y - axis_length * 0.7)
    cv2.line(frame, (origin_x, origin_y), (z_x, z_y), (255, 0, 0), 2)
    cv2.putText(frame, "Z", (z_x + 5, z_y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
    # Control angle readouts
    x_text = f"X: {control_signal['x_angle']:.1f}°"
    y_text = f"Y: {control_signal['y_angle']:.1f}°"
    z_text = f"Z: {control_signal['z_angle']:.1f}°"
    grip_text = f"Grip: {'OFF' if control_signal['grip'] == 0 else 'ON'}"
    action_text = f"Action: {control_signal['action']}"
    speed_text = f"Speed: {control_signal['speed']}"
    cv2.putText(frame, x_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
    cv2.putText(frame, y_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
    cv2.putText(frame, z_text, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)
    cv2.putText(frame, grip_text, (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
    cv2.putText(frame, action_text, (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
    cv2.putText(frame, speed_text, (10, 180), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 255), 2)
    # Line from the hand position to the screen center
    if hand_center:
        center_x, center_y = int(width/2), int(height/2)
        hand_x, hand_y = int(hand_center['x']), int(hand_center['y'])
        # Screen center point
        cv2.circle(frame, (center_x, center_y), 5, (0, 0, 255), -1)
        # Hand position point
        cv2.circle(frame, (hand_x, hand_y), 10, (0, 255, 0), -1)
        # Connecting line
        cv2.line(frame, (center_x, center_y), (hand_x, hand_y), (255, 0, 0), 2)
def analyze_hand_gesture(hand_landmarks):
    """Analyze the hand gesture to choose a robot arm control mode."""
    # Fingertip landmarks
    thumb_tip = hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP]
    index_tip = hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP]
    middle_tip = hand_landmarks.landmark[mp_hands.HandLandmark.MIDDLE_FINGER_TIP]
    ring_tip = hand_landmarks.landmark[mp_hands.HandLandmark.RING_FINGER_TIP]
    pinky_tip = hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_TIP]
    # Palm-base landmarks
    wrist = hand_landmarks.landmark[mp_hands.HandLandmark.WRIST]
    index_mcp = hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_MCP]
    pinky_mcp = hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_MCP]
    # Fist: all fingers curled
    fist = (
        thumb_tip.y > index_mcp.y and
        index_tip.y > index_mcp.y and
        middle_tip.y > index_mcp.y and
        ring_tip.y > index_mcp.y and
        pinky_tip.y > pinky_mcp.y
    )
    # Open palm: all fingers extended
    open_palm = (
        thumb_tip.y < wrist.y and
        index_tip.y < index_mcp.y and
        middle_tip.y < index_mcp.y and
        ring_tip.y < index_mcp.y and
        pinky_tip.y < pinky_mcp.y
    )
    # Pointing: index finger extended, other fingers curled
    pointing = (
        index_tip.y < index_mcp.y and
        middle_tip.y > index_mcp.y and
        ring_tip.y > index_mcp.y and
        pinky_tip.y > pinky_mcp.y
    )
    # "OK" gesture: thumb and index form a circle, other fingers extended
    pinch_distance = calculate_3d_distance(
        thumb_tip.x, thumb_tip.y, thumb_tip.z,
        index_tip.x, index_tip.y, index_tip.z
    )
    ok_gesture = (pinch_distance < 0.05 and
                  middle_tip.y < index_mcp.y and
                  ring_tip.y < index_mcp.y and
                  pinky_tip.y < pinky_mcp.y)
    # Return the recognized gesture
    if fist:
        return "fist"
    elif open_palm:
        return "open_palm"
    elif pointing:
        return "pointing"
    elif ok_gesture:
        return "ok"
    else:
        return "unknown"


@@ -1,168 +0,0 @@
import cv2
import numpy as np
import time
import os

# Import our 3D hand-detection module
from src.hand_detection_3d import load_mediapipe_model, process_frame_3d

# Try to import Picamera2; fall back to standard OpenCV if unavailable
try:
    from picamera2 import Picamera2
    PICAMERA_AVAILABLE = True
except ImportError:
    PICAMERA_AVAILABLE = False

def process_camera_3d(camera_id=0, output_path=None, use_picamera=True):
    """
    Run 3D hand detection on a camera feed (simplified version).

    Args:
        camera_id: camera ID (default 0)
        output_path: output video file path (optional)
        use_picamera: prefer Picamera2 (Raspberry Pi camera) when available
    """
    # Load the MediaPipe hand-detection model
    hands_model = load_mediapipe_model(max_num_hands=1)
    # Initialize the camera
    cap = None
    picam2 = None
    if use_picamera and PICAMERA_AVAILABLE:
        # Use Picamera2 (Raspberry Pi camera)
        try:
            print("Initializing Picamera2...")
            picam2 = Picamera2()
            video_config = picam2.create_video_configuration(main={"size": (640, 480)})
            picam2.configure(video_config)
            picam2.start()
            print("✅ Picamera2 started")
            frame_width, frame_height = 640, 480
        except Exception as e:
            print(f"❌ Picamera2 failed to start: {e}")
            print("⚠️ Falling back to OpenCV VideoCapture")
            picam2 = None
            use_picamera = False
    if not use_picamera or not PICAMERA_AVAILABLE:
        # Use standard OpenCV VideoCapture
        cap = cv2.VideoCapture(camera_id)
        if not cap.isOpened():
            print(f"Cannot open camera ID: {camera_id}")
            return
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = 30
    # Create a video writer if saving was requested
    video_writer = None
    if output_path:
        fourcc = cv2.VideoWriter_fourcc(*'XVID')
        video_writer = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
    # Hand data from the previous frame
    prev_hand_data = None
    # Performance counters
    frame_count = 0
    start_time = time.time()
    print("Controls:")
    print("- Move your hand to see the control signal")
    print("- Pinch thumb and index finger to show the grip state")
    print("- Press Ctrl+C to exit")
    try:
        while True:
            # Read one frame
            ret = False
            frame = None
            if picam2:
                try:
                    frame = picam2.capture_array()
                    frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
                    ret = True
                except Exception as e:
                    print(f"Picamera2 frame read failed: {e}")
                    break
            else:
                ret, frame = cap.read()
            if not ret or frame is None:
                break
            frame_count += 1
            # Process the frame and get the three-axis control signal
            control_signal, prev_hand_data = process_frame_3d(frame, hands_model, prev_hand_data)
            # Print the control signal once per second
            if control_signal and frame_count % 30 == 0:
                print_control_signal(control_signal)
            # Write to the output video
            if video_writer:
                video_writer.write(frame)
    except KeyboardInterrupt:
        print("\nInterrupt received, exiting...")
    except Exception as e:
        print(f"Runtime error: {e}")
    finally:
        print("Cleaning up resources...")
        # Release resources
        if picam2:
            try:
                picam2.stop()
                picam2.close()
            except Exception:
                pass
        if cap:
            try:
                cap.release()
            except Exception:
                pass
        if video_writer:
            try:
                video_writer.release()
                print(f"✅ Video saved: {output_path}")
            except Exception:
                pass
        # Print statistics
        elapsed_time = time.time() - start_time
        if frame_count > 0:
            avg_fps = frame_count / elapsed_time
            print(f"📊 Processed {frame_count} frames, average FPS: {avg_fps:.1f}")
        print("✅ Exited")

def print_control_signal(control_signal):
    """Print the hand control signal."""
    print("Hand control signal:")
    print(f"  X-axis angle: {control_signal['x_angle']:.1f}°")
    print(f"  Y-axis angle: {control_signal['y_angle']:.1f}°")
    print(f"  Z-axis angle: {control_signal['z_angle']:.1f}°")
    print(f"  Grip state: {'grip' if control_signal['grip'] == 1 else 'release'}")
    print(f"  Action: {control_signal['action']}")
    print("-------------------------------")

if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description='Simplified 3D hand detection')
    parser.add_argument('--camera-id', '-i', type=int, default=0, help='camera ID (default 0)')
    parser.add_argument('--output', '-o', help='output video file path')
    args = parser.parse_args()
    process_camera_3d(
        camera_id=args.camera_id,
        output_path=args.output
    )


@@ -1,213 +0,0 @@
#!/usr/bin/env python3
"""
Web-preview version of the hand-detection system.
Serves the preview through Flask, working around the lack of a desktop GUI on the Raspberry Pi.
"""
import cv2
import threading
import time
from flask import Flask, render_template_string, Response
from picamera2 import Picamera2
import sys
import os

# Import the hand-detection module from the project root
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.hand_detection_3d import load_mediapipe_model, process_frame_3d

app = Flask(__name__)

class WebPreviewCamera:
    def __init__(self):
        self.picam2 = None
        self.hands_model = load_mediapipe_model(max_num_hands=1)
        self.prev_hand_data = None
        self.latest_control_signal = None

    def start_camera(self):
        """Start the camera."""
        self.picam2 = Picamera2()
        video_config = self.picam2.create_video_configuration(main={"size": (640, 480)})
        self.picam2.configure(video_config)
        self.picam2.start()

    def get_frame(self):
        """Capture one frame and run hand detection on it."""
        if not self.picam2:
            return None
        # Raw frame
        frame = self.picam2.capture_array()
        frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
        # Hand detection
        control_signal, self.prev_hand_data = process_frame_3d(
            frame, self.hands_model, self.prev_hand_data
        )
        self.latest_control_signal = control_signal
        # Overlay the control signal on the frame
        if control_signal:
            y_offset = 30
            cv2.putText(frame, f"X: {control_signal['x_angle']:.1f}°",
                        (10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
            cv2.putText(frame, f"Y: {control_signal['y_angle']:.1f}°",
                        (10, y_offset + 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
            cv2.putText(frame, f"Z: {control_signal['z_angle']:.1f}°",
                        (10, y_offset + 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
            cv2.putText(frame, f"Grip: {'ON' if control_signal['grip'] else 'OFF'}",
                        (10, y_offset + 75), cv2.FONT_HERSHEY_SIMPLEX, 0.6,
                        (0, 0, 255) if control_signal['grip'] else (0, 255, 0), 2)
        return frame

    def generate_frames(self):
        """Generate the MJPEG video stream."""
        while True:
            frame = self.get_frame()
            if frame is None:
                continue
            # Encode as JPEG
            ret, buffer = cv2.imencode('.jpg', frame)
            frame_bytes = buffer.tobytes()
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
            time.sleep(0.033)  # ~30 FPS
# Global camera instance
camera = WebPreviewCamera()

@app.route('/')
def index():
    """Main page."""
    return render_template_string('''
    <!DOCTYPE html>
    <html>
    <head>
        <title>🍓 Raspberry Pi Hand-Detection Preview</title>
        <style>
            body {
                font-family: Arial, sans-serif;
                text-align: center;
                background-color: #f0f0f0;
                margin: 0;
                padding: 20px;
            }
            .container {
                max-width: 800px;
                margin: 0 auto;
                background: white;
                padding: 20px;
                border-radius: 10px;
                box-shadow: 0 4px 6px rgba(0,0,0,0.1);
            }
            h1 { color: #333; }
            .video-container {
                margin: 20px 0;
                border: 2px solid #ddd;
                border-radius: 10px;
                overflow: hidden;
            }
            img {
                width: 100%;
                height: auto;
                display: block;
            }
            .info {
                background: #e7f3ff;
                padding: 15px;
                border-radius: 5px;
                margin: 20px 0;
            }
            .controls {
                margin: 20px 0;
            }
            button {
                background: #007bff;
                color: white;
                border: none;
                padding: 10px 20px;
                border-radius: 5px;
                cursor: pointer;
                margin: 0 10px;
            }
            button:hover {
                background: #0056b3;
            }
        </style>
    </head>
    <body>
        <div class="container">
            <h1>🍓 Raspberry Pi 3D Hand-Detection System</h1>
            <div class="info">
                <p><strong>How to use:</strong></p>
                <p>Hold your hand in front of the camera</p>
                <p>Move your hand to watch the three axis angles change</p>
                <p>Pinch thumb and index finger to trigger grip detection</p>
                <p>The control signal is displayed in real time</p>
            </div>
            <div class="video-container">
                <img src="{{ url_for('video_feed') }}" alt="Hand-detection preview">
            </div>
            <div class="controls">
                <button onclick="location.reload()">🔄 Refresh page</button>
                <button onclick="window.close()">Close window</button>
            </div>
            <div class="info">
                <p><strong>Address:</strong> http://RASPBERRY_PI_IP:5000</p>
                <p><strong>Tip:</strong> this page can be opened on any device (phone, computer) to view the preview</p>
            </div>
        </div>
        <script>
            // Periodic refresh of the control-signal readout
            setInterval(() => {
                // AJAX polling of the latest control signal could be added here
            }, 1000);
        </script>
    </body>
    </html>
    ''')
@app.route('/video_feed')
def video_feed():
    """Video stream route."""
    return Response(camera.generate_frames(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

def start_web_preview():
    """Start the web-preview service."""
    print("🍓 Starting the Raspberry Pi web-preview system...")
    print("📷 Initializing the camera...")
    try:
        camera.start_camera()
        print("✅ Camera started")
        print("🌐 Starting the web server...")
        print("📱 Open http://localhost:5000 in a browser")
        print("📱 Or from another device: http://RASPBERRY_PI_IP:5000")
        print("🛑 Press Ctrl+C to stop")
        # Start the Flask app
        app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
    except KeyboardInterrupt:
        print("\n🛑 Stopped by user")
    except Exception as e:
        print(f"❌ Error: {e}")
    finally:
        if camera.picam2:
            camera.picam2.stop()
            camera.picam2.close()
        print("✅ Web-preview system stopped")

if __name__ == "__main__":
    start_web_preview()


@@ -1,401 +0,0 @@
#!/usr/bin/env python3
"""
Real-time hand-detection web server.
Supports WebSocket communication, real-time video-stream processing, and robot arm control.
"""
import asyncio
import base64
import json
import logging
import os
import time
from threading import Thread
from typing import Dict, Optional, Any

import cv2
import numpy as np
from flask import Flask, render_template, request
from flask_socketio import SocketIO, emit
from PIL import Image
import io

from hand_detection_3d import load_mediapipe_model, process_frame_3d

# Logging configuration
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class HandDetectionWebServer:
    """Real-time hand-detection web server."""

    def __init__(self, host='0.0.0.0', port=5000):
        self.host = host
        self.port = port
        # Flask app and SocketIO; point the template folder at ../templates
        template_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'templates')
        self.app = Flask(__name__, template_folder=template_dir)
        self.app.config['SECRET_KEY'] = 'hand_detection_secret_key'
        self.socketio = SocketIO(self.app, cors_allowed_origins="*", async_mode='threading')
        # MediaPipe model
        self.hands_model = load_mediapipe_model()
        # State management
        self.clients = {}  # connected clients
        self.current_frame = None
        self.previous_hand_data = None
        self.detection_results = {
            'x_angle': 90,
            'y_angle': 90,
            'z_angle': 90,
            'grip': 0,
            'action': 'none',
            'speed': 5,
            'timestamp': time.time()
        }
        # Performance monitoring
        self.fps_counter = 0
        self.last_fps_time = time.time()
        self.current_fps = 0
        # Register routes and socket event handlers
        self._setup_routes()
        self._setup_socket_events()
    def _setup_routes(self):
        """Register HTTP routes."""
        @self.app.route('/')
        def index():
            """Main page."""
            return render_template('index.html')

        @self.app.route('/api/status')
        def api_status():
            """Report the system status."""
            return {
                'status': 'running',
                'fps': self.current_fps,
                'clients': len(self.clients),
                'detection_results': self.detection_results
            }
    def _setup_socket_events(self):
        """Register WebSocket event handlers."""
        @self.socketio.on('connect')
        def handle_connect():
            """A client connected."""
            client_id = request.sid
            self.clients[client_id] = {
                'connected_at': time.time(),
                'type': 'unknown',
                'last_ping': time.time()
            }
            logger.info(f"Client {client_id} connected")
            # Send a welcome message with the current state
            emit('status', {
                'message': 'Connected',
                'client_id': client_id,
                'current_results': self.detection_results
            })

        @self.socketio.on('disconnect')
        def handle_disconnect():
            """A client disconnected."""
            client_id = request.sid
            if client_id in self.clients:
                del self.clients[client_id]
                logger.info(f"Client {client_id} disconnected")

        @self.socketio.on('register_client')
        def handle_register_client(data):
            """Register a client's type."""
            client_id = request.sid
            client_type = data.get('type', 'unknown')
            if client_id in self.clients:
                self.clients[client_id]['type'] = client_type
                logger.info(f"Client {client_id} registered as: {client_type}")
                emit('registration_success', {
                    'client_id': client_id,
                    'type': client_type
                })
        @self.socketio.on('video_frame')
        def handle_video_frame(data):
            """Process an incoming video frame."""
            try:
                # Decode the base64 image
                frame_data = base64.b64decode(data['frame'])
                frame = self._decode_frame(frame_data)
                if frame is not None:
                    # Run hand detection on the frame
                    control_signal, hand_data = process_frame_3d(
                        frame, self.hands_model, self.previous_hand_data
                    )
                    # Update the detection results
                    self.detection_results = control_signal
                    self.detection_results['timestamp'] = time.time()
                    self.previous_hand_data = hand_data
                    # Encode the processed frame
                    processed_frame_data = self._encode_frame(frame)
                    # Broadcast results to web-preview clients
                    self.socketio.emit('detection_results', {
                        'control_signal': control_signal,
                        'processed_frame': processed_frame_data,
                        'fps': self.current_fps
                    }, room=None)
                    # Forward the control signal to robot clients
                    self._send_to_robot_clients(control_signal)
                    # Update the FPS counter
                    self._update_fps()
            except Exception as e:
                logger.error(f"Error while processing video frame: {e}")
                emit('error', {'message': str(e)})

        @self.socketio.on('ping')
        def handle_ping():
            """Handle a ping request."""
            client_id = request.sid
            if client_id in self.clients:
                self.clients[client_id]['last_ping'] = time.time()
            emit('pong', {'timestamp': time.time()})

        @self.socketio.on('get_detection_results')
        def handle_get_detection_results():
            """Return the latest detection results."""
            emit('detection_results', {
                'control_signal': self.detection_results,
                'fps': self.current_fps
            })
        @self.socketio.on('start_local_test')
        def handle_start_local_test(data=None):
            """Handle a request to start a local video test."""
            try:
                # Use the requested video if a path was provided
                if data and 'video_path' in data:
                    test_video = data['video_path']
                    if not os.path.exists(test_video):
                        emit('test_error', {
                            'message': f'Video file not found: {test_video}'
                        })
                        return
                else:
                    # Otherwise look for a default test video
                    test_videos = [
                        'data/videos/test_basic.mp4',
                        'data/videos/test_gesture.mp4'
                    ]
                    # Pick the first test video that exists
                    test_video = None
                    for video_path in test_videos:
                        if os.path.exists(video_path):
                            test_video = video_path
                            break
                    if not test_video:
                        # No test video found; point the user at the generator
                        emit('test_error', {
                            'message': 'No test video file found',
                            'help': 'Generate one first: python create_test_video.py'
                        })
                        return
                logger.info(f"Starting local test with video: {test_video}")
                self.start_local_video_test(test_video)
                emit('test_started', {
                    'message': f'Local test started with video: {os.path.basename(test_video)}',
                    'video_path': test_video
                })
            except Exception as e:
                logger.error(f"Error while starting local test: {e}")
                emit('test_error', {
                    'message': f'Failed to start local test: {str(e)}'
                })

        @self.socketio.on('get_video_list')
        def handle_get_video_list():
            """Return the list of available video files."""
            try:
                video_dirs = ['data/videos', 'videos', '.']
                video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv']
                videos = []
                for video_dir in video_dirs:
                    if os.path.exists(video_dir):
                        for file in os.listdir(video_dir):
                            if any(file.lower().endswith(ext) for ext in video_extensions):
                                file_path = os.path.join(video_dir, file)
                                try:
                                    file_size = os.path.getsize(file_path)
                                    size_mb = file_size / (1024 * 1024)
                                    videos.append({
                                        'path': file_path,
                                        'name': file,
                                        'size': f'{size_mb:.1f}MB'
                                    })
                                except OSError:
                                    continue
                # Sort by file name
                videos.sort(key=lambda x: x['name'])
                emit('video_list', {
                    'videos': videos
                })
            except Exception as e:
                logger.error(f"Error while listing videos: {e}")
                emit('video_list', {
                    'videos': []
                })
    def _decode_frame(self, frame_data: bytes) -> Optional[np.ndarray]:
        """Decode an image frame."""
        try:
            # Decode with PIL, then convert to OpenCV's BGR layout
            image = Image.open(io.BytesIO(frame_data))
            frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
            return frame
        except Exception as e:
            logger.error(f"Error while decoding frame: {e}")
            return None

    def _encode_frame(self, frame: np.ndarray) -> str:
        """Encode an image frame as a base64 data URL."""
        try:
            # Convert to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image = Image.fromarray(frame_rgb)
            # Encode as JPEG
            buffer = io.BytesIO()
            image.save(buffer, format='JPEG', quality=80)
            frame_data = base64.b64encode(buffer.getvalue()).decode('utf-8')
            return f"data:image/jpeg;base64,{frame_data}"
        except Exception as e:
            logger.error(f"Error while encoding frame: {e}")
            return ""

    def _send_to_robot_clients(self, control_signal: Dict[str, Any]):
        """Forward the control signal to robot clients."""
        robot_clients = [
            client_id for client_id, info in self.clients.items()
            if info.get('type') == 'robot'
        ]
        if robot_clients:
            for client_id in robot_clients:
                self.socketio.emit('robot_control', control_signal, room=client_id)

    def _update_fps(self):
        """Update the FPS counter."""
        self.fps_counter += 1
        current_time = time.time()
        if current_time - self.last_fps_time >= 1.0:  # refresh once per second
            self.current_fps = self.fps_counter
            self.fps_counter = 0
            self.last_fps_time = current_time
    def start_local_video_test(self, video_path: str):
        """Start a local video test."""
        def video_test_thread():
            cap = cv2.VideoCapture(video_path)
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                # Process the frame
                control_signal, hand_data = process_frame_3d(
                    frame, self.hands_model, self.previous_hand_data
                )
                # Update state
                self.detection_results = control_signal
                self.detection_results['timestamp'] = time.time()
                self.previous_hand_data = hand_data
                # Encode the frame
                processed_frame_data = self._encode_frame(frame)
                # Broadcast the results
                self.socketio.emit('detection_results', {
                    'control_signal': control_signal,
                    'processed_frame': processed_frame_data,
                    'fps': self.current_fps
                })
                # Forward to robot clients
                self._send_to_robot_clients(control_signal)
                # Update the FPS counter
                self._update_fps()
                # Throttle to ~30 FPS
                time.sleep(1/30)
            cap.release()

        thread = Thread(target=video_test_thread)
        thread.daemon = True
        thread.start()
        logger.info(f"Local video test started: {video_path}")

    def run(self, debug=False):
        """Start the web server."""
        logger.info(f"Starting hand-detection web server at http://{self.host}:{self.port}")
        self.socketio.run(
            self.app,
            host=self.host,
            port=self.port,
            debug=debug,
            allow_unsafe_werkzeug=True
        )
def main():
    """Entry point."""
    import argparse
    parser = argparse.ArgumentParser(description='Real-time hand-detection web server')
    parser.add_argument('--host', default='0.0.0.0', help='server address')
    parser.add_argument('--port', type=int, default=5000, help='port number')
    parser.add_argument('--debug', action='store_true', help='debug mode')
    parser.add_argument('--test-video', help='local test video path')
    args = parser.parse_args()

    # Create the server instance
    server = HandDetectionWebServer(host=args.host, port=args.port)

    # If a test video was given, start the local video test
    if args.test_video:
        server.start_local_video_test(args.test_video)

    # Start the server
    server.run(debug=args.debug)

if __name__ == '__main__':
    main()