This commit is contained in:
51hhh 2025-08-11 13:15:21 +08:00
commit 41cf62006a
43 changed files with 4649 additions and 0 deletions

View File

@ -0,0 +1,19 @@
{
"permissions": {
"allow": [
"Bash(git checkout:*)",
"Bash(python3:*)",
"Bash(source:*)",
"Bash(pip install:*)",
"Bash(mkdir:*)",
"Bash(chmod:*)",
"Bash(rm:*)",
"Bash(python:*)",
"Bash(./start_service.sh:*)",
"Bash(./start_robot_client.sh:*)",
"Bash(timeout:*)",
"Bash(ls:*)"
],
"deny": []
}
}

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/venv

4
.idea/checkHand.iml generated Normal file
View File

@ -0,0 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<module version="4">
<component name="Go" enabled="true" />
</module>

6
.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

74
.idea/workspace.xml generated Normal file
View File

@ -0,0 +1,74 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="AutoImportSettings">
<option name="autoReloadType" value="ALL" />
</component>
<component name="ChangeListManager">
<list default="true" id="22185256-c295-4e90-b747-14293ac79c14" name="Changes" comment="">
<change beforePath="$PROJECT_DIR$/data/videos/test_basic.mp4" beforeDir="false" afterPath="$PROJECT_DIR$/data/videos/test_basic.mp4" afterDir="false" />
<change beforePath="$PROJECT_DIR$/data/videos/test_gesture.mp4" beforeDir="false" afterPath="$PROJECT_DIR$/data/videos/test_gesture.mp4" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/web_server.py" beforeDir="false" afterPath="$PROJECT_DIR$/src/web_server.py" afterDir="false" />
<change beforePath="$PROJECT_DIR$/templates/index.html" beforeDir="false" afterPath="$PROJECT_DIR$/templates/index.html" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
<option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
<option name="LAST_RESOLUTION" value="IGNORE" />
</component>
<component name="GOROOT" url="file:///opt/homebrew/opt/go/libexec" />
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$" />
</component>
<component name="ProjectColorInfo">{
&quot;customColor&quot;: &quot;&quot;,
&quot;associatedIndex&quot;: 2
}</component>
<component name="ProjectId" id="2xlSRm139ffFJ74NmVN2yfanr4g" />
<component name="ProjectViewState">
<option name="hideEmptyMiddlePackages" value="true" />
<option name="showLibraryContents" value="true" />
</component>
<component name="PropertiesComponent">{
&quot;keyToString&quot;: {
&quot;ModuleVcsDetector.initialDetectionPerformed&quot;: &quot;true&quot;,
&quot;RunOnceActivity.GoLinterPluginOnboarding&quot;: &quot;true&quot;,
&quot;RunOnceActivity.GoLinterPluginStorageMigration&quot;: &quot;true&quot;,
&quot;RunOnceActivity.ShowReadmeOnStart&quot;: &quot;true&quot;,
&quot;RunOnceActivity.git.unshallow&quot;: &quot;true&quot;,
&quot;RunOnceActivity.go.formatter.settings.were.checked&quot;: &quot;true&quot;,
&quot;RunOnceActivity.go.migrated.go.modules.settings&quot;: &quot;true&quot;,
&quot;RunOnceActivity.go.modules.go.list.on.any.changes.was.set&quot;: &quot;true&quot;,
&quot;git-widget-placeholder&quot;: &quot;master&quot;,
&quot;go.import.settings.migrated&quot;: &quot;true&quot;,
&quot;go.sdk.automatically.set&quot;: &quot;true&quot;,
&quot;last_opened_file_path&quot;: &quot;/Users/edzhao/Documents/workspase/checkHand&quot;,
&quot;node.js.detected.package.eslint&quot;: &quot;true&quot;,
&quot;node.js.selected.package.eslint&quot;: &quot;(autodetect)&quot;,
&quot;nodejs_package_manager_path&quot;: &quot;npm&quot;
}
}</component>
<component name="SharedIndexes">
<attachedChunks>
<set>
<option value="bundled-gosdk-3b128438d3f6-34ac7aa8f6ab-org.jetbrains.plugins.go.sharedIndexes.bundled-GO-251.23774.430" />
<option value="bundled-js-predefined-d6986cc7102b-f27c65a3e318-JavaScript-GO-251.23774.430" />
</set>
</attachedChunks>
</component>
<component name="TaskManager">
<task active="true" id="Default" summary="Default task">
<changelist id="22185256-c295-4e90-b747-14293ac79c14" name="Changes" comment="" />
<created>1748511160781</created>
<option name="number" value="Default" />
<option name="presentableId" value="Default" />
<updated>1748511160781</updated>
</task>
<servers />
</component>
<component name="TypeScriptGeneratedFilesManager">
<option name="version" value="3" />
</component>
<component name="VgoProject">
<settings-migrated>true</settings-migrated>
</component>
</project>

244
LOCAL_TEST_GUIDE.md Normal file
View File

@ -0,0 +1,244 @@
# 📹 本地测试指南
## 🎯 测试视频文件放置位置
### 推荐目录结构:
```
checkHand/
├── data/
│ └── videos/ # 测试视频目录
│ ├── test_basic.mp4 # 基础移动测试视频
│ ├── test_gesture.mp4 # 手势测试视频
│ ├── your_video.mp4 # 你的自定义视频
│ └── real_hand.mp4 # 真实手部动作视频
```
## 🚀 快速开始本地测试
### 1. 使用自动生成的测试视频
```bash
# 生成测试视频
python create_test_video.py
# 使用基础移动测试视频
./start_service.sh --test-video data/videos/test_basic.mp4
# 使用手势测试视频
./start_service.sh --test-video data/videos/test_gesture.mp4
```
### 2. 使用你自己的视频文件
#### 支持的视频格式:
- MP4 (推荐)
- AVI
- MOV
- MKV
#### 推荐视频参数:
- 分辨率640x480 或 1280x720
- 帧率30 FPS
- 编码H.264
- 时长10-60秒测试用
#### 放置你的视频:
```bash
# 1. 将视频文件复制到测试目录
cp /path/to/your/video.mp4 data/videos/
# 2. 启动服务器并使用你的视频
./start_service.sh --test-video data/videos/your_video.mp4
```
## 🎬 创建自定义测试视频
### 使用脚本生成:
```bash
# 生成基础移动测试视频10秒
python create_test_video.py --type basic --duration 10
# 生成手势测试视频15秒
python create_test_video.py --type gesture --duration 15
# 生成两种类型的视频
python create_test_video.py --type both --duration 12
# 自定义帧率和输出目录
python create_test_video.py --fps 60 --output-dir my_videos/
```
### 使用手机录制真实手部视频:
#### 录制建议:
1. **光线充足** - 避免阴影和反光
2. **背景简单** - 纯色背景最佳
3. **手部清晰** - 确保手部在画面中央
4. **动作缓慢** - 便于算法跟踪
5. **时长适中** - 10-30秒即可
#### 录制内容示例:
- 手掌张开和握拳
- 手指抓取动作
- 手部左右移动
- 手部上下移动
- 手部前后移动(改变大小)
#### 文件转换:
```bash
# 如果需要转换格式,可以使用 ffmpeg
ffmpeg -i input.mov -c:v libx264 -c:a aac data/videos/output.mp4
```
## 🔧 测试步骤
### 1. 准备测试环境
```bash
# 1. 确保虚拟环境已激活
source venv/bin/activate
# 2. 检查测试视频是否存在
ls -la data/videos/
# 3. 运行系统测试
python test_system.py
```
### 2. 启动测试服务
```bash
# 方式1使用启动脚本
./start_service.sh --test-video data/videos/test_basic.mp4
# 方式2手动启动
python run_web_service.py --test-video data/videos/test_basic.mp4
```
### 3. 访问测试界面
1. 打开浏览器访问:`http://localhost:5000`
2. 观察Web界面显示
- 实时视频预览
- 手部检测结果
- 3D坐标值
- 控制信号
### 4. 测试机械臂客户端
```bash
# 新开一个终端窗口
./start_robot_client.sh
# 或者手动启动
cd src
python robot_client.py --mock
```
## 📊 测试检查项
### ✅ 视频显示检查:
- [ ] 视频正常播放
- [ ] 手部检测框显示正确
- [ ] 关键点连接线正常
- [ ] 坐标轴指示器显示
### ✅ 数据输出检查:
- [ ] X、Y、Z角度值变化
- [ ] 抓取状态检测
- [ ] 动作识别正确
- [ ] FPS显示正常
### ✅ 通信检查:
- [ ] WebSocket连接正常
- [ ] 机械臂客户端接收信号
- [ ] 控制信号格式正确
- [ ] 实时性满足要求
## 🛠️ 常见问题排查
### 问题1视频无法播放
```bash
# 检查视频文件
file data/videos/your_video.mp4
# 检查视频信息
ffmpeg -i data/videos/your_video.mp4 2>&1 | head -20
```
### 问题2检测效果不佳
- 检查视频光线条件
- 确认手部在画面中央
- 尝试不同的视频文件
- 调整MediaPipe参数
### 问题3性能问题
```bash
# 降低视频分辨率
ffmpeg -i input.mp4 -vf scale=640:480 output.mp4
# 减少帧率
ffmpeg -i input.mp4 -r 15 output.mp4
```
## 📈 性能测试
### 测试延迟:
```bash
# 启动服务器
./start_service.sh --test-video data/videos/test_basic.mp4
# 在另一个终端查看延迟
curl -s http://localhost:5000/api/status | jq .fps
```
### 测试负载:
```bash
# 启动多个机械臂客户端
for i in {1..5}; do
python src/robot_client.py --mock &
done
```
## 🎯 高级测试
### 使用真实摄像头:
```python
# 创建摄像头测试脚本
import cv2
import socketio
import base64
sio = socketio.Client()
sio.connect('http://localhost:5000')
sio.emit('register_client', {'type': 'video_source'})
cap = cv2.VideoCapture(0)
while True:
ret, frame = cap.read()
if ret:
_, buffer = cv2.imencode('.jpg', frame)
frame_data = base64.b64encode(buffer).decode('utf-8')
sio.emit('video_frame', {'frame': frame_data})
time.sleep(1/30)
```
### 性能监控:
```bash
# 监控资源使用
top -p $(pgrep -f "python run_web_service.py")
# 监控网络流量
netstat -i
```
## 🎉 测试完成
测试完成后,你应该能看到:
- 实时视频流处理
- 准确的手部检测
- 正确的3D坐标计算
- 稳定的WebSocket通信
- 机械臂控制信号输出
现在你可以将真实的视频流连接到系统,或者集成到你的机械臂控制系统中!

101
README.md Normal file
View File

@ -0,0 +1,101 @@
# 实时手部检测Web服务系统
一个基于MediaPipe的实时手部检测系统支持WebSocket通信、实时视频流处理和机械臂控制。
## 快速开始
### 方式一:使用启动脚本(推荐)
#### Linux/Mac:
```bash
# 启动Web服务器自动创建虚拟环境并安装依赖
./start_service.sh
# 启动机械臂客户端(新终端窗口)
./start_robot_client.sh
```
#### Windows:
```batch
# 启动Web服务器自动创建虚拟环境并安装依赖
start_service.bat
# 启动机械臂客户端(新命令行窗口)
start_robot_client.bat
```
### 方式二:手动启动
#### 1. 创建虚拟环境并安装依赖
```bash
python3 -m venv venv
source venv/bin/activate # Linux/Mac
# 或
venv\Scripts\activate # Windows
pip install -r requirements.txt
```
#### 2. 启动Web服务器
```bash
python run_web_service.py
```
#### 3. 启动机械臂客户端(可选)
```bash
cd src
python robot_client.py --mock
```
### 3. 访问Web界面
打开浏览器访问: `http://localhost:5000`
## 启动脚本选项
### Web服务器启动选项:
```bash
./start_service.sh --help # 查看帮助
./start_service.sh # 基本启动
./start_service.sh --host 0.0.0.0 --port 8080 # 自定义地址和端口
./start_service.sh --debug # 调试模式
./start_service.sh --test-video data/videos/test.mp4 # 本地视频测试
```
### 机械臂客户端启动选项:
```bash
./start_robot_client.sh --help # 查看帮助
./start_robot_client.sh # 基本启动(模拟模式)
./start_robot_client.sh --server http://192.168.1.100:5000 # 连接远程服务器
./start_robot_client.sh --real # 真实机械臂模式
```
## 功能特性
- ✅ 使用MediaPipe进行实时手部检测
- ✅ WebSocket实时通信
- ✅ 3D坐标计算X、Y、Z轴角度
- ✅ Web预览界面
- ✅ 机械臂控制接口
- ✅ 本地视频测试支持
## 系统架构
```
视频客户端 → Web服务器 → MediaPipe → 3D坐标计算 → 客户端(Web预览/机械臂)
```
## 控制信号格式
```json
{
"x_angle": 90.0, // X轴角度 (0-180°)
"y_angle": 90.0, // Y轴角度 (0-180°)
"z_angle": 90.0, // Z轴角度 (0-180°)
"grip": 0, // 抓取状态 (0=松开, 1=抓取)
"action": "none", // 当前动作
"speed": 5 // 移动速度 (1-10)
}
```

193
VIDEO_SELECTOR_GUIDE.md Normal file
View File

@ -0,0 +1,193 @@
# 🎬 视频选择器使用指南
## 🎯 新功能介绍
现在Web界面支持**选择本地视频**进行测试不再需要在命令行指定视频文件直接在Web界面选择即可。
## 🖥️ 界面功能
### 📋 视频选择区域
- **视频下拉列表** - 显示所有可用的本地视频文件
- **开始视频测试按钮** - 使用选中的视频开始测试
- **刷新视频列表按钮** - 重新扫描视频文件
### 🔍 支持的视频文件
系统会自动扫描以下目录中的视频文件:
- `data/videos/` - 主要测试视频目录
- `videos/` - 备用视频目录
- `.` - 当前目录
支持的视频格式:
- `.mp4` (推荐)
- `.avi`
- `.mov`
- `.mkv`
- `.wmv`
- `.flv`
## 🚀 使用步骤
### 1. 启动服务器
```bash
./start_service.sh
```
### 2. 访问Web界面
打开浏览器访问 `http://localhost:5000`
### 3. 选择视频
- 在"选择测试视频"下拉列表中选择一个视频文件
- 视频显示格式:`文件名 (文件大小)`
### 4. 开始测试
点击"开始视频测试"按钮
### 5. 观察结果
- 查看系统日志中的反馈信息
- 观察视频预览和检测结果
- 查看实时的控制数据
## 📁 添加自定义视频
### 方法1复制到测试目录
```bash
# 将你的视频文件复制到测试目录
cp /path/to/your/video.mp4 data/videos/
# 在Web界面点击"刷新视频列表"
```
### 方法2使用脚本生成
```bash
# 生成多种测试视频
python create_test_video.py --type both --duration 20
# 生成自定义视频
python create_test_video.py --type gesture --duration 30 --fps 60
```
## 🎨 下拉列表显示
视频列表会显示以下信息:
- **文件名** - 视频文件的名称
- **文件大小** - 文件大小MB
- **状态提示** - 如果没有视频文件,会显示提示信息
示例:
```
请选择视频文件
test_basic.mp4 (1.4MB)
test_gesture.mp4 (1.3MB)
my_hand_video.mp4 (2.1MB)
```
## 🔧 功能特点
### ✅ 自动扫描
- 页面加载时自动扫描视频文件
- 支持多个目录扫描
- 按文件名自动排序
### ✅ 实时反馈
- 显示视频加载状态
- 实时错误提示
- 操作成功确认
### ✅ 文件验证
- 检查视频文件是否存在
- 显示文件大小信息
- 支持多种视频格式
### ✅ 用户友好
- 清晰的操作指引
- 详细的错误提示
- 一键刷新功能
## 🛠️ 错误处理
### 常见错误和解决方案
#### 1. "未找到视频文件"
**原因**:没有视频文件在指定目录
**解决**
```bash
# 生成测试视频
python create_test_video.py
# 或复制视频文件
cp your_video.mp4 data/videos/
```
#### 2. "请先选择一个视频文件"
**原因**:没有在下拉列表中选择视频
**解决**:在下拉列表中选择一个视频文件
#### 3. "视频文件不存在"
**原因**:选择的视频文件已被删除或移动
**解决**:点击"刷新视频列表"重新扫描
## 📊 系统日志
测试过程中会显示以下日志信息:
### 成功信息
- `✅ 找到 3 个视频文件`
- `✅ 本地测试已开始,使用视频: test_basic.mp4`
- `正在启动视频测试: data/videos/test_gesture.mp4`
### 错误信息
- `❌ 未找到测试视频文件`
- `❌ 请先选择一个视频文件`
- `❌ 视频文件不存在: xxx.mp4`
### 帮助信息
- `💡 运行 python create_test_video.py 生成测试视频`
## 🎯 高级使用
### 批量添加视频
```bash
# 创建多个测试视频
for i in {1..5}; do
python create_test_video.py --type basic --duration $((i*5)) --output-dir data/videos
mv data/videos/test_basic.mp4 data/videos/test_basic_${i}.mp4
done
```
### 视频格式转换
```bash
# 如果你的视频格式不支持,可以转换
ffmpeg -i input.mov -c:v libx264 -c:a aac data/videos/output.mp4
```
### 视频质量优化
```bash
# 压缩视频文件
ffmpeg -i input.mp4 -crf 23 -preset medium data/videos/compressed.mp4
# 调整分辨率
ffmpeg -i input.mp4 -vf scale=640:480 data/videos/resized.mp4
```
## 🔄 与命令行模式的区别
### Web界面模式
- ✅ 图形化选择视频
- ✅ 实时预览文件列表
- ✅ 一键刷新功能
- ✅ 错误提示更友好
### 命令行模式(旧)
- 需要手动指定视频路径
- 需要重启服务器切换视频
- 错误信息在终端显示
## 🎉 现在就试试吧!
1. 确保有测试视频:`python create_test_video.py`
2. 启动服务器:`./start_service.sh`
3. 访问:`http://localhost:5000`
4. 选择视频并开始测试!
享受更便捷的视频测试体验! 🚀

135
WEB_INTERFACE_GUIDE.md Normal file
View File

@ -0,0 +1,135 @@
# 🖥️ Web界面使用指南
## 📋 界面功能说明
### 🎯 "开始本地视频测试" 按钮功能
这个按钮的作用是:
- **启动预置测试视频** - 使用系统自带的测试视频进行手部检测
- **无需外部视频流** - 不需要摄像头或外部视频文件
- **一键测试** - 点击按钮即可开始测试系统功能
### 🔧 按钮工作原理
1. **点击按钮** → 发送 `start_local_test` 事件到服务器
2. **服务器检查** → 查找以下测试视频文件:
- `data/videos/test_basic.mp4` (基础手部移动测试)
- `data/videos/test_gesture.mp4` (手势测试)
3. **自动播放** → 找到测试视频后自动开始播放和检测
4. **实时显示** → 在Web界面显示检测结果
### 📁 测试视频文件要求
按钮会按顺序查找以下文件:
```
data/videos/test_basic.mp4 # 第一优先级
data/videos/test_gesture.mp4 # 第二优先级
```
### 🎬 如何生成测试视频
如果测试视频不存在,运行以下命令生成:
```bash
# 生成测试视频
python create_test_video.py
# 或者指定类型
python create_test_video.py --type both --duration 10
```
### 🔍 按钮状态说明
| 按钮状态 | 说明 |
|---------|------|
| "开始本地视频测试" | 按钮就绪,可以点击 |
| "启动中..." | 正在启动测试,按钮禁用 |
| 恢复原状态 | 5秒后自动恢复 |
### 📊 测试结果显示
点击按钮后,你会看到:
1. **系统日志区域**
- ✅ 成功信息:`本地测试已开始,使用视频: data/videos/test_basic.mp4`
- ❌ 错误信息:`未找到测试视频文件`
- 💡 帮助信息:`请先生成测试视频python create_test_video.py`
2. **视频预览区域**
- 显示测试视频内容
- 显示手部检测结果
- 显示关键点和连接线
3. **控制面板**
- 实时更新的X、Y、Z角度值
- 抓取状态指示器
- 当前动作显示
- FPS计数
### 🚨 常见问题
#### 问题1按钮点击没有反应
**原因**:测试视频文件不存在
**解决**
```bash
# 检查文件是否存在
ls -la data/videos/
# 如果没有文件,生成测试视频
python create_test_video.py
```
#### 问题2出现错误信息
**查看日志**在Web界面的"系统日志"区域查看具体错误信息
#### 问题3视频不显示
**检查**
- 确保测试视频文件存在
- 查看浏览器控制台是否有错误
- 检查服务器日志
### 🎯 使用步骤
1. **确保测试视频存在**
```bash
# 检查文件
ls data/videos/
# 如果没有,生成测试视频
python create_test_video.py
```
2. **启动服务器**
```bash
./start_service.sh
```
3. **打开Web界面**
访问 `http://localhost:5000`
4. **点击测试按钮**
点击"开始本地视频测试"按钮
5. **观察结果**
查看视频预览和检测数据
### 💡 提示
- 这个功能主要用于**系统测试**和**功能演示**
- 如果要使用**真实视频流**,需要:
- 启动时指定视频文件:`./start_service.sh --test-video your_video.mp4`
- 或者通过外部客户端发送视频流
### 🔗 相关文档
- [本地测试指南](LOCAL_TEST_GUIDE.md) - 详细的本地测试说明
- [README.md](README.md) - 项目总体说明
- [系统测试脚本](test_system.py) - 自动化测试
## 🎉 现在试试吧!
1. 运行 `python create_test_video.py` 生成测试视频
2. 启动服务器 `./start_service.sh`
3. 打开 `http://localhost:5000`
4. 点击"开始本地视频测试"按钮
5. 观察手部检测的神奇效果!

@ -0,0 +1 @@
Subproject commit 7903ed308ff4bf115e5502dd1efb235e61e62dc3

229
create_test_video.py Executable file
View File

@ -0,0 +1,229 @@
#!/usr/bin/env python3
"""
创建测试视频
生成包含手部动作的测试视频用于系统测试
"""
import cv2
import numpy as np
import os
import argparse
def create_test_video(output_path, duration=10, fps=30):
"""
创建测试视频
参数:
output_path: 输出视频路径
duration: 视频时长
fps: 帧率
"""
# 视频参数
width, height = 640, 480
total_frames = duration * fps
# 创建视频写入器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
print(f"正在创建测试视频: {output_path}")
print(f"分辨率: {width}x{height}, 帧率: {fps}, 时长: {duration}")
for frame_idx in range(total_frames):
# 创建背景
frame = np.zeros((height, width, 3), dtype=np.uint8)
frame[:] = (40, 40, 40) # 深灰色背景
# 计算进度
progress = frame_idx / total_frames
# 绘制模拟手部(圆形)
center_x = int(width * 0.3 + width * 0.4 * np.sin(progress * 4 * np.pi))
center_y = int(height * 0.3 + height * 0.4 * np.cos(progress * 2 * np.pi))
# 主手部圆形
cv2.circle(frame, (center_x, center_y), 30, (255, 200, 100), -1)
# 绘制5个手指小圆形
finger_positions = [
(center_x - 20, center_y - 35), # 拇指
(center_x - 10, center_y - 40), # 食指
(center_x, center_y - 42), # 中指
(center_x + 10, center_y - 40), # 无名指
(center_x + 20, center_y - 35) # 小指
]
for i, (fx, fy) in enumerate(finger_positions):
# 手指颜色渐变
color = (200 + i * 10, 150 + i * 20, 100 + i * 15)
cv2.circle(frame, (fx, fy), 8, color, -1)
# 绘制手腕
wrist_x, wrist_y = center_x, center_y + 40
cv2.circle(frame, (wrist_x, wrist_y), 15, (180, 150, 120), -1)
# 连接手掌和手腕
cv2.line(frame, (center_x, center_y), (wrist_x, wrist_y), (200, 180, 150), 8)
# 添加文字信息
time_text = f"时间: {frame_idx/fps:.1f}s"
position_text = f"位置: ({center_x}, {center_y})"
cv2.putText(frame, time_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.putText(frame, position_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
cv2.putText(frame, "测试视频 - 模拟手部动作", (10, height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
# 写入帧
out.write(frame)
# 显示进度
if frame_idx % (fps * 2) == 0: # 每2秒显示一次进度
print(f"进度: {progress*100:.1f}%")
# 释放资源
out.release()
print(f"✅ 测试视频创建完成: {output_path}")
def create_gesture_test_video(output_path, duration=15, fps=30):
"""
创建手势测试视频
包含不同的手势动作
"""
width, height = 640, 480
total_frames = duration * fps
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
print(f"正在创建手势测试视频: {output_path}")
gestures = [
"张开手掌",
"握拳",
"OK手势",
"指向手势",
"抓取动作"
]
frames_per_gesture = total_frames // len(gestures)
for frame_idx in range(total_frames):
frame = np.zeros((height, width, 3), dtype=np.uint8)
frame[:] = (50, 50, 50)
# 确定当前手势
gesture_idx = frame_idx // frames_per_gesture
if gesture_idx >= len(gestures):
gesture_idx = len(gestures) - 1
current_gesture = gestures[gesture_idx]
# 手部基本位置
center_x = width // 2
center_y = height // 2
# 根据手势绘制不同形状
if current_gesture == "张开手掌":
# 绘制张开的手掌
cv2.circle(frame, (center_x, center_y), 40, (255, 200, 100), -1)
# 五个分开的手指
for i, angle in enumerate([-60, -30, 0, 30, 60]):
finger_x = center_x + int(50 * np.cos(np.radians(angle)))
finger_y = center_y - int(50 * np.sin(np.radians(angle)))
cv2.circle(frame, (finger_x, finger_y), 12, (200, 180, 150), -1)
elif current_gesture == "握拳":
# 绘制握拳
cv2.circle(frame, (center_x, center_y), 35, (255, 200, 100), -1)
# 手指弯曲(小圆圈)
for i, angle in enumerate([-45, -22, 0, 22, 45]):
finger_x = center_x + int(25 * np.cos(np.radians(angle)))
finger_y = center_y - int(15 * np.sin(np.radians(angle)))
cv2.circle(frame, (finger_x, finger_y), 8, (180, 150, 120), -1)
elif current_gesture == "OK手势":
# 绘制OK手势
cv2.circle(frame, (center_x, center_y), 35, (255, 200, 100), -1)
# 拇指和食指形成圆圈
cv2.circle(frame, (center_x - 15, center_y - 20), 8, (200, 180, 150), -1)
cv2.circle(frame, (center_x - 5, center_y - 25), 8, (200, 180, 150), -1)
# 其他手指伸直
for i, angle in enumerate([15, 30, 45]):
finger_x = center_x + int(40 * np.cos(np.radians(angle)))
finger_y = center_y - int(40 * np.sin(np.radians(angle)))
cv2.circle(frame, (finger_x, finger_y), 10, (200, 180, 150), -1)
elif current_gesture == "指向手势":
# 绘制指向手势
cv2.circle(frame, (center_x, center_y), 30, (255, 200, 100), -1)
# 食指伸出
cv2.circle(frame, (center_x, center_y - 45), 10, (200, 180, 150), -1)
# 其他手指弯曲
for i, angle in enumerate([-30, 15, 30]):
finger_x = center_x + int(20 * np.cos(np.radians(angle)))
finger_y = center_y - int(10 * np.sin(np.radians(angle)))
cv2.circle(frame, (finger_x, finger_y), 8, (180, 150, 120), -1)
elif current_gesture == "抓取动作":
# 绘制抓取动作
cv2.circle(frame, (center_x, center_y), 32, (255, 200, 100), -1)
# 拇指和食指靠近
thumb_x = center_x - 20
thumb_y = center_y - 20
index_x = center_x - 10
index_y = center_y - 30
cv2.circle(frame, (thumb_x, thumb_y), 9, (200, 180, 150), -1)
cv2.circle(frame, (index_x, index_y), 9, (200, 180, 150), -1)
# 连接线表示抓取
cv2.line(frame, (thumb_x, thumb_y), (index_x, index_y), (255, 255, 255), 2)
# 添加文字信息
cv2.putText(frame, f"手势: {current_gesture}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
cv2.putText(frame, f"时间: {frame_idx/fps:.1f}s", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
cv2.putText(frame, "手势测试视频", (10, height - 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
out.write(frame)
if frame_idx % (fps * 2) == 0:
print(f"进度: {frame_idx/total_frames*100:.1f}% - 当前手势: {current_gesture}")
out.release()
print(f"✅ 手势测试视频创建完成: {output_path}")
def main():
parser = argparse.ArgumentParser(description='创建手部检测测试视频')
parser.add_argument('--type', choices=['basic', 'gesture', 'both'], default='both',
help='视频类型 (basic: 基础移动, gesture: 手势, both: 两种都创建)')
parser.add_argument('--duration', type=int, default=10, help='视频时长(秒)')
parser.add_argument('--fps', type=int, default=30, help='帧率')
parser.add_argument('--output-dir', default='data/videos', help='输出目录')
args = parser.parse_args()
# 确保输出目录存在
os.makedirs(args.output_dir, exist_ok=True)
if args.type in ['basic', 'both']:
basic_path = os.path.join(args.output_dir, 'test_basic.mp4')
create_test_video(basic_path, args.duration, args.fps)
if args.type in ['gesture', 'both']:
gesture_path = os.path.join(args.output_dir, 'test_gesture.mp4')
create_gesture_test_video(gesture_path, args.duration + 5, args.fps)
print("\n🎉 测试视频创建完成!")
print("\n📁 创建的文件:")
for file in os.listdir(args.output_dir):
if file.endswith('.mp4'):
file_path = os.path.join(args.output_dir, file)
size = os.path.getsize(file_path) / (1024 * 1024) # MB
print(f" {file} ({size:.1f} MB)")
print("\n🚀 使用方法:")
print(" ./start_service.sh --test-video data/videos/test_basic.mp4")
print(" ./start_service.sh --test-video data/videos/test_gesture.mp4")
if __name__ == '__main__':
main()

Binary file not shown.

Binary file not shown.

BIN
data/videos/test_basic.mp4 Normal file

Binary file not shown.

Binary file not shown.

23
requirements.txt Normal file
View File

@ -0,0 +1,23 @@
# 3D手部检测Web服务系统依赖包
# 计算机视觉和图像处理
opencv-python>=4.8.0
numpy<2
# MediaPipe手部检测
mediapipe>=0.10.0
# Web服务框架
flask>=2.3.0
flask-socketio>=5.3.0
python-socketio>=5.8.0
# 异步处理
websockets>=11.0
# 图像编码和传输
pillow>=10.0.0
# 可选:数据分析和可视化
# matplotlib>=3.5.0 # 用于数据可视化
# scipy>=1.7.0 # 用于信号处理

15
run_web_service.py Executable file
View File

@ -0,0 +1,15 @@
#!/usr/bin/env python3
"""
启动手部检测Web服务
"""
import sys
import os
# 添加src目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
from web_server import main
if __name__ == '__main__':
main()

281
src/ZDT_Servo.py Executable file
View File

@ -0,0 +1,281 @@
import serial
import struct
import time
from enum import Enum
class ChecksumType(Enum):
"""校验方式枚举"""
FIXED_0x6B = 0x6B # 固定校验字节
XOR = 1 # XOR校验
CRC8 = 2 # CRC-8校验需额外实现
class Emm42Driver:
"""Emm42_V5.0闭环步进电机驱动库(串口控制)"""
def __init__(self, port, baudrate=115200, timeout=0.1, device_id=1, checksum_type=ChecksumType.FIXED_0x6B):
"""
初始化串口通信
:param port: 串口号"COM3""/dev/ttyUSB0"
:param baudrate: 波特率默认115200
:param timeout: 读取超时时间
:param device_id: 电机地址1-255默认1
:param checksum_type: 校验方式默认0x6B固定校验
"""
self.ser = serial.Serial(port, baudrate, timeout=timeout)
self.device_id = device_id
self.checksum_type = checksum_type
if not self.ser.is_open:
self.ser.open()
def close(self):
"""关闭串口"""
if self.ser.is_open:
self.ser.close()
def _calculate_checksum(self, data):
"""计算校验和"""
if self.checksum_type == ChecksumType.FIXED_0x6B:
return 0x6B
elif self.checksum_type == ChecksumType.XOR:
checksum = 0
for b in data:
checksum ^= b
return checksum & 0xFF
elif self.checksum_type == ChecksumType.CRC8:
# 如需使用CRC8需补充CRC8计算逻辑
raise NotImplementedError("CRC8校验未实现")
return 0x6B
def _send_command(self, func_code, params=None):
"""
发送命令帧
:param func_code: 功能码参考协议命令列表
:param params: 参数列表字节数组
:return: 发送的字节串
"""
params = params or []
# 构建命令帧:地址 + 功能码 + 参数 + 校验和
frame = [self.device_id, func_code] + params
checksum = self._calculate_checksum(frame)
frame.append(checksum)
# 发送数据
self.ser.write(bytearray(frame))
return bytearray(frame)
def _read_response(self, expected_func=None, timeout=0.1):
"""
读取电机返回数据
:param expected_func: 期望的功能码用于验证
:param timeout: 超时时间
:return: 解析后的响应字典含地址功能码参数失败返回None
"""
start_time = time.time()
while time.time() - start_time < timeout:
if self.ser.in_waiting < 3: # 最小响应长度:地址(1) + 功能码(1) + 校验(1)
time.sleep(0.001)
continue
# 读取地址和功能码
addr = self.ser.read(1)[0]
func = self.ser.read(1)[0]
# 验证地址和功能码
if addr != self.device_id:
continue
if expected_func is not None and func != expected_func:
continue
# 读取参数和校验和(参数长度需根据具体命令判断,此处简化处理)
params = []
while self.ser.in_waiting > 1: # 保留最后1字节作为校验和
params.append(self.ser.read(1)[0])
checksum = self.ser.read(1)[0]
# 验证校验和
verify_data = [addr, func] + params
if self._calculate_checksum(verify_data) != checksum:
continue
return {
"addr": addr,
"func": func,
"params": bytes(params)
}
return None
# ------------------------------ 基础控制指令 ------------------------------
def enable_motor(self, enable=True, sync=False):
"""
电机使能控制
:param enable: True=使能False=禁用
:param sync: 多机同步标志False=不启用True=启用
:return: 命令状态0x02=成功其他=失败
"""
# 命令格式:地址 + 0xF3 + 0xAB + 使能状态 + 同步标志 + 校验
params = [0xAB, 0x01 if enable else 0x00, 0x01 if sync else 0x00]
self._send_command(0xF3, params)
resp = self._read_response(expected_func=0xF3)
return resp["params"][0] if resp else None
def stop_immediately(self, sync=False):
"""立即停止电机"""
# 命令格式:地址 + 0xFE + 0x98 + 同步标志 + 校验
params = [0x98, 0x01 if sync else 0x00]
self._send_command(0xFE, params)
resp = self._read_response(expected_func=0xFE)
return resp["params"][0] if resp else None
# ------------------------------ 速度模式控制 ------------------------------
def set_speed_mode(self, direction, speed_rpm, acceleration=0, sync=False):
"""
速度模式控制
:param direction: 方向0=CW顺时针1=CCW逆时针
:param speed_rpm: 转速0.1-3000 RPMS_Vel_IS使能时支持0.1精度
:param acceleration: 加速度档位0-2550=无加减速
:param sync: 多机同步标志
:return: 命令状态0x02=成功
"""
# 速度转换为16位无符号整数0x0000-0xFFFF
speed_hex = struct.pack(">H", int(speed_rpm * 10)) if acceleration else struct.pack(">H", int(speed_rpm))
params = [
direction,
speed_hex[0], speed_hex[1], # 速度高低位
acceleration & 0xFF, # 加速度档位
0x01 if sync else 0x00 # 同步标志
]
self._send_command(0xF6, params)
resp = self._read_response(expected_func=0xF6)
return resp["params"][0] if resp else None
# ------------------------------ 位置模式控制 ------------------------------
def set_position_mode(self, direction, speed_rpm, pulses, is_absolute=False, acceleration=0, sync=False):
"""
位置模式控制
:param direction: 方向0=CW1=CCW
:param speed_rpm: 转速0.1-3000 RPM
:param pulses: 脉冲数根据细分计算16细分下3200脉冲=1
:param is_absolute: 是否绝对位置False=相对True=绝对
:param acceleration: 加速度档位0-255
:param sync: 多机同步标志
:return: 命令状态0x02=成功
"""
# 速度和脉冲数转换为16/32位无符号整数
speed_hex = struct.pack(">H", int(speed_rpm))
pulses_hex = struct.pack(">I", pulses) # 脉冲数为32位
params = [
direction,
speed_hex[0], speed_hex[1], # 速度高低位
acceleration & 0xFF, # 加速度档位
pulses_hex[0], pulses_hex[1], pulses_hex[2], pulses_hex[3], # 脉冲数
0x01 if is_absolute else 0x00, # 绝对/相对标志
0x01 if sync else 0x00 # 同步标志
]
self._send_command(0xFD, params)
resp = self._read_response(expected_func=0xFD)
return resp["params"][0] if resp else None
# ------------------------------ 原点回零指令 ------------------------------
def trigger_homing(self, mode, sync=False):
"""
触发回零操作
:param mode: 回零模式0=单圈就近1=单圈方向2=多圈无限位3=多圈有限位
:param sync: 多机同步标志
:return: 命令状态0x02=成功
"""
# 命令格式:地址 + 0x9A + 回零模式 + 同步标志 + 校验
params = [mode & 0xFF, 0x01 if sync else 0x00]
self._send_command(0x9A, params)
resp = self._read_response(expected_func=0x9A)
return resp["params"][0] if resp else None
# ------------------------------ 状态读取指令 ------------------------------
def read_bus_voltage(self):
"""读取总线电压mV"""
self._send_command(0x24) # 功能码0x24对应读取总线电压
resp = self._read_response(expected_func=0x24)
if resp and len(resp["params"]) == 2:
voltage = struct.unpack(">H", resp["params"])[0] # 16位无符号整数
return voltage / 1000.0 # 转换为V
return None
def read_realtime_speed(self):
"""读取实时转速RPM"""
self._send_command(0x35) # 功能码0x35对应读取转速
resp = self._read_response(expected_func=0x35)
if resp and len(resp["params"]) == 3:
sign = -1 if resp["params"][0] == 0x01 else 1 # 0x01=负0x00=正
speed = struct.unpack(">H", resp["params"][1:3])[0]
return sign * speed
return None
def read_position(self):
"""读取实时位置(脉冲数)"""
self._send_command(0x36) # 功能码0x36对应读取位置
resp = self._read_response(expected_func=0x36)
if resp and len(resp["params"]) == 5:
sign = -1 if resp["params"][0] == 0x01 else 1
position = struct.unpack(">I", resp["params"][1:5])[0]
return sign * position
return None
# ------------------------------ 配置参数指令 ------------------------------
def set_microstep(self, microstep, save=False):
"""
设置细分1-256
:param microstep: 细分值0=2561=12=2...255=255
:param save: 是否保存True=掉电保存
:return: 命令状态0x02=成功
"""
# 命令格式:地址 + 0x84 + 0x8A + 存储标志 + 细分值 + 校验
params = [0x8A, 0x01 if save else 0x00, microstep & 0xFF]
self._send_command(0x84, params)
resp = self._read_response(expected_func=0x84)
return resp["params"][0] if resp else None
def restore_factory_settings(self):
"""恢复出厂设置(需重新上电校准)"""
self._send_command(0x0F, [0x5F]) # 功能码0x0F + 参数0x5F
resp = self._read_response(expected_func=0x0F)
return resp["params"][0] if resp else None
#使用示例
if __name__ == "__main__":
# 初始化电机驱动(替换为实际串口号)
motor = Emm42Driver(port="COM3", device_id=1, baudrate=115200)
try:
# 使能电机
if motor.enable_motor(enable=True) == 0x02:
print("电机使能成功")
# 速度模式控制逆时针旋转1000RPM加速度档位5
motor.set_speed_mode(direction=1, speed_rpm=1000, acceleration=5)
time.sleep(2) # 运行2秒
# 立即停止
motor.stop_immediately()
time.sleep(1)
# 位置模式控制顺时针旋转500RPM相对位置3200脉冲16细分下1圈
motor.set_position_mode(
direction=0,
speed_rpm=500,
pulses=3200,
is_absolute=False,
acceleration=3
)
time.sleep(2)
# 读取状态
voltage = motor.read_bus_voltage()
speed = motor.read_realtime_speed()
position = motor.read_position()
print(f"总线电压:{voltage}V实时转速{speed}RPM当前位置{position}脉冲")
# 触发单圈就近回零
motor.trigger_homing(mode=0)
time.sleep(3)
finally:
# 禁用电机并关闭串口
motor.enable_motor(enable=False)
motor.close()

1
src/__init__.py Normal file
View File

@ -0,0 +1 @@
# 3D手部检测Web服务系统

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

68
src/arm.py Normal file
View File

@ -0,0 +1,68 @@
from ZDT_Servo import Emm42Driver
class Arm:
def __init__(self, port='/dev/ttyAMA5', baudrate=115200):
# 初始化3个电机
self.motors = {}
for i in range(1, 4):
try:
self.motors[i] = Emm42Driver(port=port, device_id=i, baudrate=baudrate)
except Exception as e:
self.motors[i] = None
print(f"电机{i}初始化失败: {e}")
def set_angles(self, a1, a2, a3, speed_rpm=1000, acceleration=20):
# 限位
a1 = max(-120, min(120, a1))
a2 = min(120, a2)
a3 = max(-155, min(0, a3))
# 角度转脉冲id1允许负脉冲
raw1 = 6500 * (a1 / 120)
if raw1 >= 0:
pulse1 = int(raw1)
dir1 = 1
else:
pulse1 = int(-raw1)
dir1 = 0
pulse2 = int(9000 * (120 - a2) / 120)
pulse3 = int(6300 * (a3 + 155) / 90)
for node_id, pulse, direction in zip([1, 2, 3], [pulse1, pulse2, pulse3], [dir1, 1, 1]):
m = self.motors.get(node_id)
if m:
try:
m.set_position_mode(direction=direction, speed_rpm=speed_rpm, pulses=pulse, is_absolute=True, acceleration=acceleration)
except Exception as e:
print(f"ID{node_id}异常:{e}")
if __name__ == "__main__":
import tkinter as tk
arm = Arm()
# 初始角度
angles = [0, 120, -155]
step = [2, 2, 2] # 步长
def send():
arm.set_angles(*angles)
def on_key(event):
k = event.keysym.lower()
if k == 'q':
angles[0] += step[0]
elif k == 'a':
angles[0] -= step[0]
elif k == 'w':
angles[1] += step[1]
elif k == 's':
angles[1] -= step[1]
elif k == 'e':
angles[2] += step[2]
elif k == 'd':
angles[2] -= step[2]
send()
print(f"当前角度: {angles}")
root = tk.Tk()
root.title("三轴机械臂键盘控制示例")
root.geometry("300x100")
label = tk.Label(root, text="Q/A:轴1 W/S:轴2 E/D:轴3")
label.pack()
root.bind('<KeyPress>', on_key)
send()
root.mainloop()

342
src/arm3d_drag_demo.py Normal file
View File

@ -0,0 +1,342 @@
# id 1 120 6500
# id 2 120 9000
# id 3 90 6300
import tkinter as tk
from tkinter import ttk
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
import numpy as np
from mpl_toolkits.mplot3d import proj3d
from ZDT_Servo import Emm42Driver
class Arm3DVisualizer(tk.Tk):
def _init_send_queue(self):
import threading, queue
self._send_queue = queue.Queue()
self._send_thread_stop = False
def send_loop():
while not self._send_thread_stop:
try:
args = self._send_queue.get(timeout=0.005)
except Exception:
continue
# 非阻塞发送
self._send_to_motor_nowait(*args)
t = threading.Thread(target=send_loop, daemon=True)
t.start()
self._send_thread = t
def _send_to_motor_nowait(self, a1, a2, a3):
# id1允许负脉冲需转换为正脉冲+方向
raw1 = 6500 * (a1 / 120)
if raw1 >= 0:
pulse1 = int(raw1)
dir1 = 1 # 正方向
else:
pulse1 = int(-raw1)
dir1 = 0 # 负方向
pulse2 = int(9000 * (120 - a2) / 120)
pulse3 = int(6300 * (a3 + 155) / 90)
for node_id, pulse, direction in zip([1, 2, 3], [pulse1, pulse2, pulse3], [dir1, 1, 1]):
m = self.motors.get(node_id)
if m:
try:
m.set_position_mode(direction=direction, speed_rpm=1000, pulses=pulse, is_absolute=True, acceleration=10)
except Exception as e:
print(f"ID{node_id}异常:{e}")
def _start_motion_thread(self):
import threading, time
self._last_sent_angles = list(self.joint_angles)
self._motion_thread_stop = False
self._motion_accum_count = 0
self._motion_last_time = time.time()
def motion_loop():
DEADZONE = 5 # 死区阈值,单位:度
SEND_INTERVAL = 0.06 # 最小发送间隔,单位:秒
while not self._motion_thread_stop:
# 死区判定每轴变化超过0.5度才算变化
changed = any(abs(a-b)>DEADZONE for a,b in zip(self.joint_angles, self._last_sent_angles))
now = time.time()
# 满足变化且距离上次发送超过60ms才发送
if changed and (now - self._motion_last_time) > SEND_INTERVAL:
self._motion_last_time = now
self._last_sent_angles = list(self.joint_angles)
self.send_to_motor()
time.sleep(0.01)
t = threading.Thread(target=motion_loop, daemon=True)
t.start()
self._motion_thread = t
def _init_key_state(self):
self._key_state = {'w':False,'s':False,'a':False,'d':False,'r':False,'f':False}
self._key_step_xy = 10 # 步长
self._key_step_z = 10
self.bind_all('<KeyPress>', self._on_key_press)
self.bind_all('<KeyRelease>', self._on_key_release)
self._key_move_loop()
def _on_key_press(self, event):
k = event.keysym.lower()
if k in self._key_state:
self._key_state[k] = True
def _on_key_release(self, event):
k = event.keysym.lower()
if k in self._key_state:
self._key_state[k] = False
def _key_move_loop(self):
# 按住键平滑移动
if self.pt3d is not None:
x, y, z = self.pt3d
moved = False
if self._key_state['w']:
y += self._key_step_xy; moved = True
if self._key_state['s']:
y -= self._key_step_xy; moved = True
if self._key_state['a']:
x -= self._key_step_xy; moved = True
if self._key_state['d']:
x += self._key_step_xy; moved = True
if self._key_state['r']:
z += self._key_step_z; moved = True
if self._key_state['f']:
z -= self._key_step_z; moved = True
if moved:
self.inverse_kinematics_and_update(x, y, z)
self.status_label.config(text=f"目标位置: x={x:.1f}, y={y:.1f}, z={z:.1f}")
self.after(20, self._key_move_loop)
def _init_motors(self):
# 初始化3个Emm42Driver实例
self.motors = {}
for i in range(1, 4):
try:
self.motors[i] = Emm42Driver(port='/dev/ttyAMA5', device_id=i, baudrate=115200)
except Exception as e:
self.motors[i] = None
print(f"电机{i}初始化失败: {e}")
def __init__(self):
super().__init__()
self._init_send_queue() # 必须最先初始化
self.title("三轴机械臂三维可视化与拖拽演示")
self.geometry("700x600")
# 机械臂参数
self.arm_params = {'L1': 185, 'L2': 230, 'L3': 245}
# 初始角度
self.joint_angles = [0, 120, -155]
# 末端位置
self.pt3d = None
self.init_ui()
self._init_motors()
self.update_arm_plot()
self._init_key_state()
self._start_motion_thread()
def init_ui(self):
# 主体左右布局
main_frame = ttk.Frame(self)
main_frame.pack(fill='both', expand=True)
# 左侧3D
left_frame = ttk.Frame(main_frame)
left_frame.pack(side=tk.LEFT, fill='both', expand=True)
fig = Figure(figsize=(5, 5), dpi=100)
self.ax = fig.add_subplot(111, projection='3d')
self.canvas = FigureCanvasTkAgg(fig, master=left_frame)
self.canvas.get_tk_widget().pack(fill='both', expand=True)
# 右侧xy+z
right_frame = ttk.Frame(main_frame)
right_frame.pack(side=tk.LEFT, fill='y', padx=5, pady=5)
# xy平面正方形
from matplotlib.figure import Figure as Fig2D
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg as Canvas2D
self.xy_fig = Fig2D(figsize=(3,3), dpi=80)
self.xy_ax = self.xy_fig.add_subplot(111)
self.xy_canvas = Canvas2D(self.xy_fig, master=right_frame)
self.xy_canvas.get_tk_widget().pack(side=tk.TOP, fill='both', expand=True)
self.xy_canvas.get_tk_widget().configure(width=240, height=240)
self.xy_drag_point = None
self.xy_dragging = False
self.xy_canvas.mpl_connect('button_press_event', self._on_xy_press)
self.xy_canvas.mpl_connect('motion_notify_event', self._on_xy_motion)
self.xy_canvas.mpl_connect('button_release_event', self._on_xy_release)
# z轴竖直滑条
z_frame = ttk.Frame(right_frame)
z_frame.pack(side=tk.TOP, fill='y', pady=10)
ttk.Label(z_frame, text='Z轴:').pack(side=tk.TOP)
zmin = self.arm_params['L1']
zmax = self.arm_params['L1']+self.arm_params['L2']+self.arm_params['L3']
self.z_var = tk.DoubleVar()
self.z_scale = ttk.Scale(z_frame, from_=zmax, to=zmin, orient=tk.VERTICAL, variable=self.z_var, command=self._on_z_change, length=200)
self.z_scale.pack(side=tk.TOP, fill='y', expand=True)
# 角度显示
angle_frame = ttk.Frame(self)
angle_frame.pack(fill='x', padx=5, pady=5)
self.angle_vars = []
for i in range(3):
ttk.Label(angle_frame, text=f"关节{i+1}角度(°):").pack(side=tk.LEFT)
var = tk.DoubleVar(value=self.joint_angles[i])
self.angle_vars.append(var)
entry = ttk.Entry(angle_frame, textvariable=var, width=6)
entry.pack(side=tk.LEFT, padx=2)
ttk.Button(angle_frame, text="刷新姿态", command=self.on_update_angles).pack(side=tk.LEFT, padx=10)
# 发送到电机按钮
ttk.Button(angle_frame, text="发送到电机", command=self.send_to_motor).pack(side=tk.LEFT, padx=10)
self.status_label = ttk.Label(self, text="键盘控制WSAD前后左右R/F上下")
self.status_label.pack(fill='x')
# 脉冲显示
self.pulse_label = ttk.Label(self, text="")
self.pulse_label.pack(fill='x')
# 初始点居中
if self.pt3d is None:
self.pt3d = [0, 0, (zmin+zmax)/2]
self.z_var.set(self.pt3d[2])
self._draw_xy_plane()
def _draw_xy_plane(self):
# 画xy平面和当前点保持正方形
self.xy_ax.clear()
L = self.arm_params['L2']+self.arm_params['L3']
self.xy_ax.set_xlim(-L, L)
self.xy_ax.set_ylim(-L, L)
self.xy_ax.set_aspect('equal', adjustable='box')
self.xy_ax.set_xlabel('X (mm)')
self.xy_ax.set_ylabel('Y (mm)')
if self.pt3d is not None:
x, y = self.pt3d[0], self.pt3d[1]
self.xy_drag_point = self.xy_ax.plot(x, y, 'ro', markersize=10, picker=5)[0]
self.xy_ax.grid(True)
self.xy_canvas.draw()
def _on_xy_press(self, event):
if event.inaxes != self.xy_ax: return
if self.pt3d is None: return
x, y = self.pt3d[0], self.pt3d[1]
L = self.arm_params['L2']+self.arm_params['L3']
if abs(event.xdata - x) < L*0.08 and abs(event.ydata - y) < L*0.08:
self.xy_dragging = True
def _on_xy_motion(self, event):
if not self.xy_dragging or event.inaxes != self.xy_ax: return
z = self.z_var.get() if hasattr(self, 'z_var') else self.pt3d[2]
self.inverse_kinematics_and_update(event.xdata, event.ydata, z)
self.z_var.set(z)
self._draw_xy_plane()
def _on_xy_release(self, event):
self.xy_dragging = False
def _on_z_change(self, val):
if self.pt3d is not None:
x, y = self.pt3d[0], self.pt3d[1]
z = float(val)
self.inverse_kinematics_and_update(x, y, z)
self._draw_xy_plane()
def send_to_motor(self):
# 只将当前角度放入队列,不等待
self._send_queue.put(tuple(self.joint_angles))
def destroy(self):
# 停止后台线程
self._motion_thread_stop = True
self._send_thread_stop = True
super().destroy()
def on_update_angles(self):
try:
# 限位
angles = [var.get() for var in self.angle_vars]
angles[1] = min(120, angles[1])
angles[2] = max(-155, min(0, angles[2]))
self.joint_angles = angles
for i in range(3):
self.angle_vars[i].set(self.joint_angles[i])
self.update_arm_plot()
self.update_pulse_label()
except Exception as e:
self.status_label.config(text=f"角度输入错误: {e}")
def forward_kinematics(self, theta1, theta2, theta3):
L1, L2, L3 = self.arm_params['L1'], self.arm_params['L2'], self.arm_params['L3']
theta1 = np.deg2rad(theta1)
theta2 = np.deg2rad(theta2)
theta3 = np.deg2rad(theta3)
x0, y0, z0 = 0, 0, 0
x1, y1, z1 = 0, 0, L1
x2 = L2 * np.cos(theta2) * np.cos(theta1)
y2 = L2 * np.cos(theta2) * np.sin(theta1)
z2 = L1 + L2 * np.sin(theta2)
x3 = x2 + L3 * np.cos(theta2 + theta3) * np.cos(theta1)
y3 = y2 + L3 * np.cos(theta2 + theta3) * np.sin(theta1)
z3 = z2 + L3 * np.sin(theta2 + theta3)
return np.array([[x0, y0, z0], [x1, y1, z1], [x2, y2, z2], [x3, y3, z3]])
def update_arm_plot(self):
pts = self.forward_kinematics(*self.joint_angles)
# 保留当前视角
elev = self.ax.elev
azim = self.ax.azim
self.ax.clear()
self.ax.plot(pts[:,0], pts[:,1], pts[:,2], '-o', linewidth=3, markersize=8, color='b')
self.ax.scatter(pts[3,0], pts[3,1], pts[3,2], s=120, c='r', picker=5)
self.ax.set_xlim(-self.arm_params['L2']-self.arm_params['L3'], self.arm_params['L2']+self.arm_params['L3'])
self.ax.set_ylim(-self.arm_params['L2']-self.arm_params['L3'], self.arm_params['L2']+self.arm_params['L3'])
self.ax.set_zlim(0, self.arm_params['L1']+self.arm_params['L2']+self.arm_params['L3'])
self.ax.set_xlabel('X (mm)')
self.ax.set_ylabel('Y (mm)')
self.ax.set_zlabel('Z (mm)')
self.ax.set_title('三轴机械臂三维可视化')
self.ax.view_init(elev=elev, azim=azim)
self.canvas.draw()
self.pt3d = pts[3]
self.update_pulse_label()
def update_pulse_label(self):
# 角度转脉冲
a1, a2, a3 = self.joint_angles
# id1: 120度=6500负角度为正脉冲
pulse1 = int(6500 * (a1 / 120))
# id2: 120度=9000120度为0脉冲增加角度减小
pulse2 = int(9000 * (120 - a2) / 120)
# id3: -155度=0-65度=6300脉冲增加角度增加
# 线性关系: pulse = (a3 + 155) / ( -65 + 155 ) * 6300, 但-65-(-155)=90
pulse3 = int(6300 * (a3 + 155) / 90)
self.pulse_label.config(text=f"脉冲值: ID1={pulse1} ID2={pulse2} ID3={pulse3}")
# 自动发送到电机
# self.send_to_motor()
# 键盘事件已由_key_move_loop和_key_state管理
def destroy(self):
# 停止后台线程
self._motion_thread_stop = True
super().destroy()
def inverse_kinematics_and_update(self, x, y, z):
L1, L2, L3 = self.arm_params['L1'], self.arm_params['L2'], self.arm_params['L3']
theta1 = np.arctan2(y, x)
r = np.hypot(x, y)
dz = z - L1
D = (r**2 + dz**2 - L2**2 - L3**2) / (2*L2*L3)
if np.abs(D) > 1:
self.status_label.config(text="超出机械臂工作空间")
return
theta3 = np.arctan2(-np.sqrt(1-D**2), D) # 肘下解
theta2 = np.arctan2(dz, r) - np.arctan2(L3*np.sin(theta3), L2+L3*np.cos(theta3))
# 限位
theta2_deg = min(120, np.rad2deg(theta2))
theta3_deg = max(-155, min(0, np.rad2deg(theta3)))
self.joint_angles = [np.rad2deg(theta1), theta2_deg, theta3_deg]
for i in range(3):
self.angle_vars[i].set(self.joint_angles[i])
self.update_arm_plot()
self.update_pulse_label()
if __name__ == "__main__":
app = Arm3DVisualizer()
app.mainloop()

316
src/hand_detection_3d.py Normal file
View File

@ -0,0 +1,316 @@
import cv2
import numpy as np
import math
import mediapipe as mp
# 初始化MediaPipe Hands
mp_hands = mp.solutions.hands
mp_drawing = mp.solutions.drawing_utils
mp_drawing_styles = mp.solutions.drawing_styles
# 加载MediaPipe手部检测模型
def load_mediapipe_model(max_num_hands=1, min_detection_confidence=0.5, min_tracking_confidence=0.5):
"""
加载MediaPipe手部检测模型
参数:
max_num_hands: 最大检测手的数量
min_detection_confidence: 最小检测置信度
min_tracking_confidence: 最小跟踪置信度
返回:
MediaPipe Hands对象
"""
hands = mp_hands.Hands(
static_image_mode=False,
max_num_hands=max_num_hands,
min_detection_confidence=min_detection_confidence,
min_tracking_confidence=min_tracking_confidence
)
return hands
# 处理帧并返回三轴控制信号
def process_frame_3d(frame, hands_model, prev_hand_data=None):
"""
处理视频帧检测手部并计算三轴控制信号
参数:
frame: 输入的BGR格式帧
hands_model: MediaPipe Hands模型
prev_hand_data: 前一帧的手部数据
返回:
三轴控制信号字典和当前手部数据
"""
# 获取帧尺寸
frame_height, frame_width = frame.shape[:2]
# 初始化三轴控制信号
control_signal = {
"x_angle": 90, # 水平角度 (左右)
"y_angle": 90, # 垂直角度 (上下)
"z_angle": 90, # 深度角度 (前后)
"grip": 0, # 抓取状态 (0=松开, 1=抓取)
"action": "none", # 当前动作
"speed": 5 # 移动速度 (1-10)
}
# MediaPipe需要RGB格式的图像
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# 处理图像
results = hands_model.process(rgb_frame)
# 如果没有检测到手,返回默认控制信号和前一帧数据
if not results.multi_hand_landmarks:
return control_signal, prev_hand_data
# 获取第一只手的关键点
hand_landmarks = results.multi_hand_landmarks[0]
# 计算手部中心点 (使用所有关键点的平均位置)
center_x, center_y, center_z = 0, 0, 0
for landmark in hand_landmarks.landmark:
center_x += landmark.x
center_y += landmark.y
center_z += landmark.z
num_landmarks = len(hand_landmarks.landmark)
center_x /= num_landmarks
center_y /= num_landmarks
center_z /= num_landmarks
# 转换为像素坐标
hand_center = {
'x': center_x * frame_width,
'y': center_y * frame_height,
'z': center_z # 保留归一化的z值
}
# 获取更精确的手部姿态信息
wrist = hand_landmarks.landmark[mp_hands.HandLandmark.WRIST]
index_finger_tip = hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP]
pinky_tip = hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_TIP]
thumb_tip = hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP]
# 计算手部方向向量 (从手腕到手中心)
direction_vector = {
'x': center_x - wrist.x,
'y': center_y - wrist.y,
'z': center_z - wrist.z
}
# 计算屏幕中心点
frame_center = {
'x': frame_width / 2,
'y': frame_height / 2,
'z': 0 # 参考点
}
# 计算手部相对于屏幕中心的位移
dx = hand_center['x'] - frame_center['x']
dy = hand_center['y'] - frame_center['y']
dz = hand_center['z'] * 10 # 缩放z值使其更明显
# 转换为机械臂的三个轴的角度
# X轴角度 (左右移动) - 范围0-180度中间是90度
x_angle = map_to_angle(dx, frame_width / 2)
# Y轴角度 (上下移动) - 范围0-180度中间是90度
y_angle = map_to_angle(-dy, frame_height / 2) # 注意负号图像y轴向下但机械臂y轴向上
# Z轴角度 (前后移动) - 范围0-180度中间是90度
# Z值需要正确映射这里我们假设z值范围在-0.5至0.5之间
# 越靠近摄像头z值越小更负越远离摄像头z值越大更正
z_angle = 90 + (dz * 90) # 将z值映射到0-180范围
z_angle = max(0, min(180, z_angle)) # 确保在有效范围内
# 检测抓取动作 (拇指和食指距离)
pinch_distance = calculate_3d_distance(
thumb_tip.x, thumb_tip.y, thumb_tip.z,
index_finger_tip.x, index_finger_tip.y, index_finger_tip.z
)
# 如果拇指和食指距离小于阈值,认为是抓取状态
grip = 1 if pinch_distance < 0.05 else 0
# 检测手部动作 (如果有前一帧数据)
action = "none"
speed = 5 # 默认中等速度
if prev_hand_data:
# 计算移动向量
move_x = hand_center['x'] - prev_hand_data['x']
move_y = hand_center['y'] - prev_hand_data['y']
move_z = hand_center['z'] - prev_hand_data['z']
# 计算移动距离
move_distance = math.sqrt(move_x**2 + move_y**2 + move_z**2)
# 如果移动足够大,确定主要移动方向
if move_distance > 0.01:
# 计算移动速度 (1-10范围)
speed = min(10, max(1, int(move_distance * 200)))
# 确定主导移动方向
abs_move = [abs(move_x), abs(move_y), abs(move_z)]
max_move = max(abs_move)
if max_move == abs_move[0]: # X轴移动
action = "right" if move_x > 0 else "left"
elif max_move == abs_move[1]: # Y轴移动
action = "down" if move_y > 0 else "up"
else: # Z轴移动
action = "forward" if move_z < 0 else "backward"
# 更新控制信号
control_signal = {
"x_angle": x_angle,
"y_angle": y_angle,
"z_angle": z_angle,
"grip": grip,
"action": action,
"speed": speed
}
# 在帧上绘制手部关键点和连接线
mp_drawing.draw_landmarks(
frame,
hand_landmarks,
mp_hands.HAND_CONNECTIONS,
mp_drawing_styles.get_default_hand_landmarks_style(),
mp_drawing_styles.get_default_hand_connections_style()
)
# 绘制三轴控制指示器
draw_3d_control_visualization(frame, control_signal, hand_center)
return control_signal, hand_center
def calculate_3d_distance(x1, y1, z1, x2, y2, z2):
"""计算3D空间中两点之间的欧几里得距离"""
return math.sqrt((x2 - x1)**2 + (y2 - y1)**2 + (z2 - z1)**2)
def map_to_angle(value, max_value):
"""将-max_value到max_value范围内的值映射到0-180度范围的角度"""
# 计算相对位置 (-1到1)
relative_position = value / max_value
# 映射到角度 (90度为中点)
angle = 90 + (relative_position * 45) # 使用45度作为最大偏移量
# 确保角度在有效范围内
return max(0, min(180, angle))
def draw_3d_control_visualization(frame, control_signal, hand_center):
"""在帧上绘制三轴控制可视化"""
height, width = frame.shape[:2]
# 绘制坐标轴
origin_x, origin_y = width - 150, height - 150
axis_length = 100
# X轴 (红色)
cv2.line(frame, (origin_x, origin_y), (origin_x + axis_length, origin_y), (0, 0, 255), 2)
cv2.putText(frame, "X", (origin_x + axis_length + 5, origin_y + 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
# Y轴 (绿色)
cv2.line(frame, (origin_x, origin_y), (origin_x, origin_y - axis_length), (0, 255, 0), 2)
cv2.putText(frame, "Y", (origin_x - 15, origin_y - axis_length - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
# Z轴 (蓝色) - 以45度角向右上方
z_x = int(origin_x + axis_length * 0.7)
z_y = int(origin_y - axis_length * 0.7)
cv2.line(frame, (origin_x, origin_y), (z_x, z_y), (255, 0, 0), 2)
cv2.putText(frame, "Z", (z_x + 5, z_y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
# 绘制控制角度值
x_text = f"X: {control_signal['x_angle']:.1f}°"
y_text = f"Y: {control_signal['y_angle']:.1f}°"
z_text = f"Z: {control_signal['z_angle']:.1f}°"
grip_text = f"抓取: {'' if control_signal['grip'] == 0 else ''}"
action_text = f"动作: {control_signal['action']}"
speed_text = f"速度: {control_signal['speed']}"
cv2.putText(frame, x_text, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
cv2.putText(frame, y_text, (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
cv2.putText(frame, z_text, (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)
cv2.putText(frame, grip_text, (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
cv2.putText(frame, action_text, (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
cv2.putText(frame, speed_text, (10, 180), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 255), 2)
# 绘制手部位置到屏幕中心的连接线
if hand_center:
center_x, center_y = int(width/2), int(height/2)
hand_x, hand_y = int(hand_center['x']), int(hand_center['y'])
# 绘制屏幕中心点
cv2.circle(frame, (center_x, center_y), 5, (0, 0, 255), -1)
# 绘制手部位置点
cv2.circle(frame, (hand_x, hand_y), 10, (0, 255, 0), -1)
# 绘制连接线
cv2.line(frame, (center_x, center_y), (hand_x, hand_y), (255, 0, 0), 2)
def analyze_hand_gesture(hand_landmarks):
"""分析手势以确定机械臂控制模式"""
# 获取指尖和手掌关键点
thumb_tip = hand_landmarks.landmark[mp_hands.HandLandmark.THUMB_TIP]
index_tip = hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_TIP]
middle_tip = hand_landmarks.landmark[mp_hands.HandLandmark.MIDDLE_FINGER_TIP]
ring_tip = hand_landmarks.landmark[mp_hands.HandLandmark.RING_FINGER_TIP]
pinky_tip = hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_TIP]
# 获取手掌底部关键点
wrist = hand_landmarks.landmark[mp_hands.HandLandmark.WRIST]
index_mcp = hand_landmarks.landmark[mp_hands.HandLandmark.INDEX_FINGER_MCP]
pinky_mcp = hand_landmarks.landmark[mp_hands.HandLandmark.PINKY_MCP]
# 检测拳头 - 所有手指弯曲
fist = (
thumb_tip.y > index_mcp.y and
index_tip.y > index_mcp.y and
middle_tip.y > index_mcp.y and
ring_tip.y > index_mcp.y and
pinky_tip.y > pinky_mcp.y
)
# 检测手掌打开 - 所有手指伸直
open_palm = (
thumb_tip.y < wrist.y and
index_tip.y < index_mcp.y and
middle_tip.y < index_mcp.y and
ring_tip.y < index_mcp.y and
pinky_tip.y < pinky_mcp.y
)
# 检测指向手势 - 食指伸出,其他手指弯曲
pointing = (
index_tip.y < index_mcp.y and
middle_tip.y > index_mcp.y and
ring_tip.y > index_mcp.y and
pinky_tip.y > pinky_mcp.y
)
# 检测"OK"手势 - 拇指和食指形成圆圈,其他手指伸出
pinch_distance = calculate_3d_distance(
thumb_tip.x, thumb_tip.y, thumb_tip.z,
index_tip.x, index_tip.y, index_tip.z
)
ok_gesture = (pinch_distance < 0.05 and
middle_tip.y < index_mcp.y and
ring_tip.y < index_mcp.y and
pinky_tip.y < pinky_mcp.y)
# 返回识别的手势
if fist:
return "fist"
elif open_palm:
return "open_palm"
elif pointing:
return "pointing"
elif ok_gesture:
return "ok"
else:
return "unknown"

168
src/process_video_3d.py Normal file
View File

@ -0,0 +1,168 @@
import cv2
import numpy as np
import time
import os
# 导入我们的3D手部检测模块
from src.hand_detection_3d import load_mediapipe_model, process_frame_3d
# 尝试导入 Picamera2如果不可用则使用标准OpenCV
try:
from picamera2 import Picamera2
PICAMERA_AVAILABLE = True
except ImportError:
PICAMERA_AVAILABLE = False
def process_camera_3d(camera_id=0, output_path=None, use_picamera=True):
"""
使用摄像头进行3D手部检测 - 简化版本
参数:
camera_id: 摄像头ID (默认为0)
output_path: 输出视频文件路径 (可选)
use_picamera: 是否优先使用Picamera2 (树莓派摄像头)
"""
# 加载MediaPipe手部检测模型
hands_model = load_mediapipe_model(max_num_hands=1)
# 初始化摄像头
cap = None
picam2 = None
if use_picamera and PICAMERA_AVAILABLE:
# 使用 Picamera2 (树莓派摄像头)
try:
print("正在初始化 Picamera2...")
picam2 = Picamera2()
video_config = picam2.create_video_configuration(main={"size": (640, 480)})
picam2.configure(video_config)
picam2.start()
print("✅ Picamera2 启动成功")
frame_width, frame_height = 640, 480
except Exception as e:
print(f"❌ Picamera2 启动失败: {e}")
print("⚠️ 回退到 OpenCV VideoCapture")
picam2 = None
use_picamera = False
if not use_picamera or not PICAMERA_AVAILABLE:
# 使用标准 OpenCV VideoCapture
cap = cv2.VideoCapture(camera_id)
if not cap.isOpened():
print(f"无法打开摄像头 ID: {camera_id}")
return
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = 30
# 创建视频写入器(如果需要保存)
video_writer = None
if output_path:
fourcc = cv2.VideoWriter_fourcc(*'XVID')
video_writer = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
# 前一帧的手部数据
prev_hand_data = None
# 初始化性能计数器
frame_count = 0
start_time = time.time()
print("控制说明:")
print("- 移动手部显示控制信号")
print("- 拇指和食指捏合显示抓取状态")
print("- 按 Ctrl+C 退出")
try:
while True:
# 读取一帧
ret = False
frame = None
if picam2:
try:
frame = picam2.capture_array()
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
ret = True
except Exception as e:
print(f"Picamera2 读取帧失败: {e}")
break
else:
ret, frame = cap.read()
if not ret or frame is None:
break
frame_count += 1
# 处理帧并获取三轴控制信号
control_signal, prev_hand_data = process_frame_3d(frame, hands_model, prev_hand_data)
# 显示控制信号
if control_signal and frame_count % 30 == 0: # 每秒显示一次
print_control_signal(control_signal)
# 写入输出视频
if video_writer:
video_writer.write(frame)
except KeyboardInterrupt:
print("\n收到中断信号,正在退出...")
except Exception as e:
print(f"程序运行时出错: {e}")
finally:
print("正在清理资源...")
# 释放资源
if picam2:
try:
picam2.stop()
picam2.close()
except:
pass
if cap:
try:
cap.release()
except:
pass
if video_writer:
try:
video_writer.release()
print(f"✅ 视频已保存: {output_path}")
except:
pass
# 显示统计信息
elapsed_time = time.time() - start_time
if frame_count > 0:
avg_fps = frame_count / elapsed_time
print(f"📊 处理了 {frame_count}平均FPS: {avg_fps:.1f}")
print("✅ 程序已退出")
def print_control_signal(control_signal):
"""显示手部控制信号信息"""
print(f"手部控制信号:")
print(f" X轴角度: {control_signal['x_angle']:.1f}°")
print(f" Y轴角度: {control_signal['y_angle']:.1f}°")
print(f" Z轴角度: {control_signal['z_angle']:.1f}°")
print(f" 抓取状态: {'抓取' if control_signal['grip'] == 1 else '释放'}")
print(f" 动作: {control_signal['action']}")
print("-------------------------------")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description='简化版3D手部检测')
parser.add_argument('--camera-id', '-i', type=int, default=0, help='摄像头ID (默认为0)')
parser.add_argument('--output', '-o', help='输出视频文件路径')
args = parser.parse_args()
process_camera_3d(
camera_id=args.camera_id,
output_path=args.output
)

380
src/robot_client.py Normal file
View File

@ -0,0 +1,380 @@
#!/usr/bin/env python3
"""
机械臂客户端
连接到手部检测服务器接收控制信号并控制机械臂
"""
import asyncio
import json
import logging
import time
from typing import Dict, Any, Optional
import socketio
from ZDT_Servo import Emm42Driver
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RobotArmClient:
def _start_watchdog(self, timeout=0.2):
import threading
self._watchdog_timeout = timeout
self._watchdog_timer = None
def stop_all_motors():
logger.warning("控制信号超时,自动停止所有电机!")
for m in self.motors.values():
if m:
try:
m.set_speed_mode(direction=True, speed_rpm=0, acceleration=5)
except Exception as e:
logger.error(f"电机停止异常: {e}")
self._watchdog_stop_all = stop_all_motors
def reset_timer():
if self._watchdog_timer:
self._watchdog_timer.cancel()
self._watchdog_timer = threading.Timer(self._watchdog_timeout, self._watchdog_stop_all)
self._watchdog_timer.daemon = True
self._watchdog_timer.start()
self._watchdog_reset = reset_timer
self._watchdog_reset()
def _init_motor_read_idx(self):
if not hasattr(self, '_motor_read_idx'):
self._motor_read_idx = 1
"""机械臂客户端"""
def __init__(self, server_url='http://localhost:5000'):
self.server_url = server_url
self.sio = socketio.Client()
self.is_connected = False
self.current_position = {
'x_angle': 90,
'y_angle': 90,
'z_angle': 90,
'grip': 0
}
# 初始化3个电机
self.motors = {}
# 初始化限位标志位 {电机ID: 是否达到限位}
self.limit_flags = {1: False, 2: False, 3: False}
for i in range(1, 4):
try:
self.motors[i] = Emm42Driver(port='/dev/ttyAMA5', device_id=i, baudrate=115200)
self.motors[i].enable_motor(enable=True)
except Exception as e:
self.motors[i] = None
print(f"电机{i}初始化失败: {e}")
# 启动看门狗定时器
self._start_watchdog(timeout=0.2)
self.setup_events()
def setup_events(self):
"""设置事件处理器"""
@self.sio.event
def connect():
"""连接成功"""
self.is_connected = True
logger.info("已连接到手部检测服务器")
# 注册为机械臂客户端
self.sio.emit('register_client', {
'type': 'robot'
})
@self.sio.event
def disconnect():
"""连接断开"""
self.is_connected = False
logger.info("与服务器断开连接")
@self.sio.event
def registration_success(data):
"""注册成功"""
logger.info(f"客户端注册成功: {data}")
@self.sio.event
def robot_control(data):
"""接收机械臂控制信号"""
try:
self.process_control_signal(data)
except Exception as e:
logger.error(f"处理控制信号时出错: {e}")
@self.sio.event
def error(data):
"""错误处理"""
logger.error(f"服务器错误: {data}")
def process_control_signal(self, control_signal: Dict[str, Any]):
# 重置看门狗定时器,防止超时自动停机
if hasattr(self, '_watchdog_reset'):
self._watchdog_reset()
"""处理控制信号"""
# logger.info(f"接收到控制信号: {control_signal}")
# 检查信号变化
x_changed = abs(control_signal['x_angle'] - self.current_position['x_angle']) > 1
y_changed = abs(control_signal['y_angle'] - self.current_position['y_angle']) > 1
z_changed = abs(control_signal['z_angle'] - self.current_position['z_angle']) > 1
grip_changed = control_signal['grip'] != self.current_position['grip']
# 如果有变化,执行相应动作
if x_changed or y_changed or z_changed:
self.move_arm(control_signal)
if grip_changed:
self.control_gripper(control_signal['grip'])
# 更新当前位置
self.current_position = {
'x_angle': control_signal['x_angle'],
'y_angle': control_signal['y_angle'],
'z_angle': control_signal['z_angle'],
'grip': control_signal['grip']
}
def move_arm(self, control_signal: Dict[str, Any]):
"""移动机械臂,支持左右和上下跟随(不再负责电机位置读取和限位)"""
x_angle = control_signal['x_angle']
y_angle = control_signal['y_angle']
z_angle = control_signal['z_angle']
speed = control_signal['speed']
logger.info(f"移动机械臂: X={x_angle:.1f}°, Y={y_angle:.1f}°, 速度={speed}")
m1 = self.motors.get(1)
m3 = self.motors.get(3)
# 1. 底座左右跟随 (电机1)
if not self.limit_flags[1]: # 只有未达到限位时才执行
base_angle = x_angle - 90
if base_angle > 0:
dir1 = True
else:
dir1 = False
base_angle = -base_angle
if m1:
try:
m1.set_speed_mode(direction=dir1, speed_rpm=base_angle * 0.1, acceleration=5)
except Exception as e:
print(f"ID1异常:{e}")
# 2. 小臂上下 (电机3)
if not self.limit_flags[3]: # 只有未达到限位时才执行
y_angle_cmd = y_angle - 90
if y_angle_cmd > 0:
dir3 = True
else:
dir3 = False
y_angle_cmd = -y_angle_cmd
if m3:
try:
m3.set_speed_mode(direction=dir3, speed_rpm=y_angle_cmd * 0.1, acceleration=5)
except Exception as e:
print(f"ID3异常:{e}")
def poll_motor_and_limit(self):
"""主循环调用:顺序读取电机位置并做限位保护"""
self._init_motor_read_idx()
m1 = self.motors.get(1)
m2 = self.motors.get(2)
m3 = self.motors.get(3)
try:
if self._motor_read_idx == 1:
pos1 = m1.read_position() if m1 else None
if pos1 is not None:
logger.info(f"电机1脉冲: {pos1}")
# 限位保护
if (pos1 >= 190000 or pos1 <= -190000):
logger.warning(f"电机1到达限位强制停止")
self.limit_flags[1] = True
try:
m1.set_speed_mode(direction=True, speed_rpm=0, acceleration=5)
except Exception:
pass
else:
self.limit_flags[1] = False
self._motor_read_idx = 2
elif self._motor_read_idx == 2:
pos2 = m2.read_position() if m2 else None
if pos2 is not None:
logger.info(f"电机2脉冲: {pos2}")
# 电机2限位保护
if (pos2 >= 190000 or pos2 <= -190000):
logger.warning(f"电机2到达限位强制停止")
self.limit_flags[2] = True
try:
m2.set_speed_mode(direction=True, speed_rpm=0, acceleration=5)
except Exception:
pass
else:
self.limit_flags[2] = False
# 可加电机2限位保护
self._motor_read_idx = 3
elif self._motor_read_idx == 3:
pos3 = m3.read_position() if m3 else None
logger.info(f"电机3脉冲: {pos3}")
# 限位保护示例
if pos3 is not None:
if pos3 >= 1000:
self.limit_flags[3] = True
logger.warning(f"电机3下限自动反向脱限")
try:
# 反向(向上)低速脱限
m3.set_speed_mode(direction=True, speed_rpm=2, acceleration=5)
except Exception:
pass
elif pos3 <= -180000:
self.limit_flags[3] = True
logger.warning(f"电机3上限自动反向脱限")
try:
# 反向(向下)低速脱限
m3.set_speed_mode(direction=False, speed_rpm=2, acceleration=5)
except Exception:
pass
else:
# 安全区间,正常可停
try:
self.limit_flags[3] = False
m3.set_speed_mode(direction=True, speed_rpm=0, acceleration=5)
except Exception:
pass
self._motor_read_idx = 1
except Exception as e:
logger.error(f"电机读取异常: {e}")
def control_gripper(self, grip_state: int):
"""控制抓取器"""
action = "抓取" if grip_state == 1 else "松开"
logger.info(f"控制抓取器: {action}")
# 这里应该是实际的抓取器控制代码
# 例如:
# self.gripper_controller.set_state(grip_state)
# 模拟控制延迟
# time.sleep(0.05)
def connect(self):
"""连接到服务器"""
try:
logger.info(f"正在连接到服务器: {self.server_url}")
self.sio.connect(self.server_url)
return True
except Exception as e:
logger.error(f"连接失败: {e}")
return False
def disconnect(self):
"""断开连接"""
if self.is_connected:
self.sio.disconnect()
def run(self):
"""运行客户端"""
if self.connect():
try:
# 保持连接
while self.is_connected:
# 主动轮询电机位置和限位保护
self.poll_motor_and_limit()
time.sleep(0.05)
# 发送心跳
if self.is_connected:
self.sio.emit('ping')
except KeyboardInterrupt:
logger.info("收到中断信号,正在退出...")
finally:
self.disconnect()
else:
logger.error("无法连接到服务器")
class MockRobotArmController:
"""模拟机械臂控制器"""
def __init__(self):
self.current_angles = [90, 90, 90] # X, Y, Z轴角度
self.gripper_state = 0 # 0=松开, 1=抓取
def move_to_angles(self, x_angle: float, y_angle: float, z_angle: float, speed: int):
"""移动到指定角度"""
logger.info(f"模拟移动: X={x_angle:.1f}°, Y={y_angle:.1f}°, Z={z_angle:.1f}°")
# 模拟平滑移动
target_angles = [x_angle, y_angle, z_angle]
for i in range(3):
step = (target_angles[i] - self.current_angles[i]) / 10
for _ in range(10):
self.current_angles[i] += step
time.sleep(0.01) # 模拟移动时间
self.current_angles = target_angles
logger.info(f"移动完成: {self.current_angles}")
def set_gripper_state(self, state: int):
"""设置抓取器状态"""
self.gripper_state = state
action = "抓取" if state == 1 else "松开"
logger.info(f"抓取器状态: {action}")
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='机械臂客户端')
parser.add_argument('--server', default='http://localhost:5000', help='服务器地址')
parser.add_argument('--mock', action='store_true', help='使用模拟控制器')
args = parser.parse_args()
# 创建客户端
client = RobotArmClient(server_url=args.server)
# 如果使用模拟控制器
if args.mock:
mock_controller = MockRobotArmController()
# 重写控制方法以使用模拟控制器
original_move_arm = client.move_arm
original_control_gripper = client.control_gripper
def mock_move_arm(control_signal):
mock_controller.move_to_angles(
control_signal['x_angle'],
control_signal['y_angle'],
control_signal['z_angle'],
control_signal['speed']
)
def mock_control_gripper(grip_state):
mock_controller.set_gripper_state(grip_state)
client.move_arm = mock_move_arm
client.control_gripper = mock_control_gripper
# 运行客户端
client.run()
if __name__ == '__main__':
main()

72
src/test_motors.py Normal file
View File

@ -0,0 +1,72 @@
# -190000 190000
# -180000 0
# -210000 0
import time
from ZDT_Servo import Emm42Driver
def test_three_motors():
# 初始化三个电机假设串口号为COM3-COM5
motors = [
Emm42Driver(port="/dev/ttyAMA5", device_id=1),
Emm42Driver(port="/dev/ttyAMA5", device_id=2),
Emm42Driver(port="/dev/ttyAMA5", device_id=3)
]
try:
# 能所有电机
for motor in motors:
motor.enable_motor(enable=False)
# 初始化位置极值记录
position_stats = [
{"min": float('inf'), "max": float('-inf')}, # 电机1
{"min": float('inf'), "max": float('-inf')}, # 电机2
{"min": float('inf'), "max": float('-inf')} # 电机3
]
# 无限循环读取位置按q退出
print("Reading positions (press 'q' to quit)...")
try:
while True:
for i, motor in enumerate(motors):
pos = motor.read_position()
if pos is not None:
position_stats[i]["min"] = min(position_stats[i]["min"], pos)
position_stats[i]["max"] = max(position_stats[i]["max"], pos)
else:
print("erro")
# 非阻塞检测键盘输入
try:
import sys
import select
if sys.stdin in select.select([sys.stdin], [], [], 0)[0]:
key = sys.stdin.read(1)
if key.lower() == 'q':
break
except:
pass
time.sleep(0.1)
except KeyboardInterrupt:
pass
# 打印结果
print("\nPosition statistics:")
for i, stats in enumerate(position_stats, 1):
print(f"Motor {i}: min={stats['min']}, max={stats['max']}")
finally:
# 失能所有电机
print("\nDisabling motors...")
for motor in motors:
motor.enable_motor(enable=False)
time.sleep(0.1)
if __name__ == "__main__":
test_three_motors()

213
src/web_preview.py Normal file
View File

@ -0,0 +1,213 @@
#!/usr/bin/env python3
"""
网页预览版手部检测系统
通过Flask提供网页界面解决树莓派图形界面问题
"""
import cv2
import threading
import time
from flask import Flask, render_template_string, Response
from picamera2 import Picamera2
import sys
import os
# 导入手部检测模块
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from src.hand_detection_3d import load_mediapipe_model, process_frame_3d
app = Flask(__name__)
class WebPreviewCamera:
def __init__(self):
self.picam2 = None
self.hands_model = load_mediapipe_model(max_num_hands=1)
self.prev_hand_data = None
self.latest_control_signal = None
def start_camera(self):
"""启动摄像头"""
self.picam2 = Picamera2()
video_config = self.picam2.create_video_configuration(main={"size": (640, 480)})
self.picam2.configure(video_config)
self.picam2.start()
def get_frame(self):
"""获取处理后的帧"""
if not self.picam2:
return None
# 获取原始帧
frame = self.picam2.capture_array()
frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
# 进行手部检测
control_signal, self.prev_hand_data = process_frame_3d(
frame, self.hands_model, self.prev_hand_data
)
self.latest_control_signal = control_signal
# 在帧上添加控制信号信息
if control_signal:
y_offset = 30
cv2.putText(frame, f"X: {control_signal['x_angle']:.1f}°",
(10, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, f"Y: {control_signal['y_angle']:.1f}°",
(10, y_offset + 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, f"Z: {control_signal['z_angle']:.1f}°",
(10, y_offset + 50), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)
cv2.putText(frame, f"Grip: {'ON' if control_signal['grip'] else 'OFF'}",
(10, y_offset + 75), cv2.FONT_HERSHEY_SIMPLEX, 0.6,
(0, 0, 255) if control_signal['grip'] else (0, 255, 0), 2)
return frame
def generate_frames(self):
"""生成视频流"""
while True:
frame = self.get_frame()
if frame is None:
continue
# 编码为JPEG
ret, buffer = cv2.imencode('.jpg', frame)
frame_bytes = buffer.tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n')
time.sleep(0.033) # ~30 FPS
# 全局摄像头实例
camera = WebPreviewCamera()
@app.route('/')
def index():
"""主页面"""
return render_template_string('''
<!DOCTYPE html>
<html>
<head>
<title>🍓 树莓派手部检测预览</title>
<style>
body {
font-family: Arial, sans-serif;
text-align: center;
background-color: #f0f0f0;
margin: 0;
padding: 20px;
}
.container {
max-width: 800px;
margin: 0 auto;
background: white;
padding: 20px;
border-radius: 10px;
box-shadow: 0 4px 6px rgba(0,0,0,0.1);
}
h1 { color: #333; }
.video-container {
margin: 20px 0;
border: 2px solid #ddd;
border-radius: 10px;
overflow: hidden;
}
img {
width: 100%;
height: auto;
display: block;
}
.info {
background: #e7f3ff;
padding: 15px;
border-radius: 5px;
margin: 20px 0;
}
.controls {
margin: 20px 0;
}
button {
background: #007bff;
color: white;
border: none;
padding: 10px 20px;
border-radius: 5px;
cursor: pointer;
margin: 0 10px;
}
button:hover {
background: #0056b3;
}
</style>
</head>
<body>
<div class="container">
<h1>🍓 树莓派3D手部检测系统</h1>
<div class="info">
<p><strong>操作说明:</strong></p>
<p> 将手部放在摄像头前</p>
<p> 移动手部查看三轴角度变化</p>
<p> 拇指和食指捏合触发抓取检测</p>
<p> 实时显示控制信号</p>
</div>
<div class="video-container">
<img src="{{ url_for('video_feed') }}" alt="手部检测预览">
</div>
<div class="controls">
<button onclick="location.reload()">🔄 刷新页面</button>
<button onclick="window.close()"> 关闭窗口</button>
</div>
<div class="info">
<p><strong>访问地址:</strong> http://树莓派IP:5000</p>
<p><strong>提示:</strong> 可以在手机电脑等任意设备上打开此页面查看预览</p>
</div>
</div>
<script>
// 自动刷新控制信号信息
setInterval(() => {
// 这里可以添加AJAX获取最新控制信号的代码
}, 1000);
</script>
</body>
</html>
''')
@app.route('/video_feed')
def video_feed():
"""视频流路由"""
return Response(camera.generate_frames(),
mimetype='multipart/x-mixed-replace; boundary=frame')
def start_web_preview():
"""启动网页预览服务"""
print("🍓 启动树莓派网页预览系统...")
print("📷 初始化摄像头...")
try:
camera.start_camera()
print("✅ 摄像头启动成功")
print("🌐 启动网页服务器...")
print("📱 请在浏览器中访问: http://localhost:5000")
print("📱 或在其他设备访问: http://树莓派IP:5000")
print("🛑 按 Ctrl+C 停止服务")
# 启动Flask应用
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
except KeyboardInterrupt:
print("\n🛑 用户停止服务")
except Exception as e:
print(f"❌ 错误: {e}")
finally:
if camera.picam2:
camera.picam2.stop()
camera.picam2.close()
print("✅ 网页预览系统已停止")
if __name__ == "__main__":
start_web_preview()

401
src/web_server.py Normal file
View File

@ -0,0 +1,401 @@
#!/usr/bin/env python3
"""
实时手部检测Web服务器
支持WebSocket通信实时视频流处理和机械臂控制
"""
import asyncio
import base64
import json
import logging
import os
import time
from threading import Thread
from typing import Dict, Optional, Any
import cv2
import numpy as np
from flask import Flask, render_template, request
from flask_socketio import SocketIO, emit
from PIL import Image
import io
from hand_detection_3d import load_mediapipe_model, process_frame_3d
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class HandDetectionWebServer:
"""实时手部检测Web服务器"""
def __init__(self, host='0.0.0.0', port=5000):
self.host = host
self.port = port
# Flask应用和SocketIO
# 设置模板和静态文件路径
import os
template_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'templates')
self.app = Flask(__name__, template_folder=template_dir)
self.app.config['SECRET_KEY'] = 'hand_detection_secret_key'
self.socketio = SocketIO(self.app, cors_allowed_origins="*", async_mode='threading')
# MediaPipe模型
self.hands_model = load_mediapipe_model()
# 状态管理
self.clients = {} # 连接的客户端
self.current_frame = None
self.previous_hand_data = None
self.detection_results = {
'x_angle': 90,
'y_angle': 90,
'z_angle': 90,
'grip': 0,
'action': 'none',
'speed': 5,
'timestamp': time.time()
}
# 性能监控
self.fps_counter = 0
self.last_fps_time = time.time()
self.current_fps = 0
# 配置路由和事件处理
self._setup_routes()
self._setup_socket_events()
def _setup_routes(self):
"""设置HTTP路由"""
@self.app.route('/')
def index():
"""主页面"""
return render_template('index.html')
@self.app.route('/api/status')
def api_status():
"""获取系统状态"""
return {
'status': 'running',
'fps': self.current_fps,
'clients': len(self.clients),
'detection_results': self.detection_results
}
def _setup_socket_events(self):
"""设置WebSocket事件处理"""
@self.socketio.on('connect')
def handle_connect():
"""客户端连接"""
client_id = request.sid
self.clients[client_id] = {
'connected_at': time.time(),
'type': 'unknown',
'last_ping': time.time()
}
logger.info(f"客户端 {client_id} 已连接")
# 发送欢迎消息和当前状态
emit('status', {
'message': '连接成功',
'client_id': client_id,
'current_results': self.detection_results
})
@self.socketio.on('disconnect')
def handle_disconnect():
"""客户端断开连接"""
client_id = request.sid
if client_id in self.clients:
del self.clients[client_id]
logger.info(f"客户端 {client_id} 已断开连接")
@self.socketio.on('register_client')
def handle_register_client(data):
"""注册客户端类型"""
client_id = request.sid
client_type = data.get('type', 'unknown')
if client_id in self.clients:
self.clients[client_id]['type'] = client_type
logger.info(f"客户端 {client_id} 注册为: {client_type}")
emit('registration_success', {
'client_id': client_id,
'type': client_type
})
@self.socketio.on('video_frame')
def handle_video_frame(data):
"""处理视频帧"""
try:
# 解码base64图像
frame_data = base64.b64decode(data['frame'])
frame = self._decode_frame(frame_data)
if frame is not None:
# 处理帧并检测手部
control_signal, hand_data = process_frame_3d(
frame, self.hands_model, self.previous_hand_data
)
# 更新检测结果
self.detection_results = control_signal
self.detection_results['timestamp'] = time.time()
self.previous_hand_data = hand_data
# 编码处理后的帧
processed_frame_data = self._encode_frame(frame)
# 发送结果给web预览客户端
self.socketio.emit('detection_results', {
'control_signal': control_signal,
'processed_frame': processed_frame_data,
'fps': self.current_fps
}, room=None)
# 发送控制信号给机械臂客户端
self._send_to_robot_clients(control_signal)
# 更新FPS
self._update_fps()
except Exception as e:
logger.error(f"处理视频帧时出错: {e}")
emit('error', {'message': str(e)})
@self.socketio.on('ping')
def handle_ping():
"""处理ping请求"""
client_id = request.sid
if client_id in self.clients:
self.clients[client_id]['last_ping'] = time.time()
emit('pong', {'timestamp': time.time()})
@self.socketio.on('get_detection_results')
def handle_get_detection_results():
"""获取最新的检测结果"""
emit('detection_results', {
'control_signal': self.detection_results,
'fps': self.current_fps
})
@self.socketio.on('start_local_test')
def handle_start_local_test(data=None):
"""处理开始本地测试请求"""
try:
# 如果提供了视频路径,使用指定的视频
if data and 'video_path' in data:
test_video = data['video_path']
if not os.path.exists(test_video):
emit('test_error', {
'message': f'视频文件不存在: {test_video}'
})
return
else:
# 检查是否有默认测试视频
test_videos = [
'data/videos/test_basic.mp4',
'data/videos/test_gesture.mp4'
]
# 找到第一个存在的测试视频
test_video = None
for video_path in test_videos:
if os.path.exists(video_path):
test_video = video_path
break
if not test_video:
# 没有找到测试视频,提供帮助信息
emit('test_error', {
'message': '未找到测试视频文件',
'help': '请先生成测试视频python create_test_video.py'
})
return
logger.info(f"开始本地测试,使用视频: {test_video}")
self.start_local_video_test(test_video)
emit('test_started', {
'message': f'本地测试已开始,使用视频: {os.path.basename(test_video)}',
'video_path': test_video
})
except Exception as e:
logger.error(f"启动本地测试时出错: {e}")
emit('test_error', {
'message': f'启动本地测试失败: {str(e)}'
})
@self.socketio.on('get_video_list')
def handle_get_video_list():
"""获取可用的视频文件列表"""
try:
video_dirs = ['data/videos', 'videos', '.']
video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv']
videos = []
for video_dir in video_dirs:
if os.path.exists(video_dir):
for file in os.listdir(video_dir):
if any(file.lower().endswith(ext) for ext in video_extensions):
file_path = os.path.join(video_dir, file)
try:
file_size = os.path.getsize(file_path)
size_mb = file_size / (1024 * 1024)
videos.append({
'path': file_path,
'name': file,
'size': f'{size_mb:.1f}MB'
})
except OSError:
continue
# 按文件名排序
videos.sort(key=lambda x: x['name'])
emit('video_list', {
'videos': videos
})
except Exception as e:
logger.error(f"获取视频列表时出错: {e}")
emit('video_list', {
'videos': []
})
def _decode_frame(self, frame_data: bytes) -> Optional[np.ndarray]:
"""解码图像帧"""
try:
# 使用PIL解码
image = Image.open(io.BytesIO(frame_data))
frame = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
return frame
except Exception as e:
logger.error(f"解码帧时出错: {e}")
return None
def _encode_frame(self, frame: np.ndarray) -> str:
"""编码图像帧为base64"""
try:
# 转换为RGB格式
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
image = Image.fromarray(frame_rgb)
# 编码为JPEG
buffer = io.BytesIO()
image.save(buffer, format='JPEG', quality=80)
frame_data = base64.b64encode(buffer.getvalue()).decode('utf-8')
return f"data:image/jpeg;base64,{frame_data}"
except Exception as e:
logger.error(f"编码帧时出错: {e}")
return ""
def _send_to_robot_clients(self, control_signal: Dict[str, Any]):
"""发送控制信号给机械臂客户端"""
robot_clients = [
client_id for client_id, info in self.clients.items()
if info.get('type') == 'robot'
]
if robot_clients:
for client_id in robot_clients:
self.socketio.emit('robot_control', control_signal, room=client_id)
def _update_fps(self):
"""更新FPS计数"""
self.fps_counter += 1
current_time = time.time()
if current_time - self.last_fps_time >= 1.0: # 每秒更新一次
self.current_fps = self.fps_counter
self.fps_counter = 0
self.last_fps_time = current_time
def start_local_video_test(self, video_path: str):
"""启动本地视频测试"""
def video_test_thread():
cap = cv2.VideoCapture(video_path)
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
# 处理帧
control_signal, hand_data = process_frame_3d(
frame, self.hands_model, self.previous_hand_data
)
# 更新状态
self.detection_results = control_signal
self.detection_results['timestamp'] = time.time()
self.previous_hand_data = hand_data
# 编码帧
processed_frame_data = self._encode_frame(frame)
# 广播结果
self.socketio.emit('detection_results', {
'control_signal': control_signal,
'processed_frame': processed_frame_data,
'fps': self.current_fps
})
# 发送给机械臂
self._send_to_robot_clients(control_signal)
# 更新FPS
self._update_fps()
# 控制帧率
time.sleep(1/30) # 30 FPS
cap.release()
thread = Thread(target=video_test_thread)
thread.daemon = True
thread.start()
logger.info(f"本地视频测试已启动: {video_path}")
def run(self, debug=False):
"""启动Web服务器"""
logger.info(f"启动手部检测Web服务器 http://{self.host}:{self.port}")
self.socketio.run(
self.app,
host=self.host,
port=self.port,
debug=debug,
allow_unsafe_werkzeug=True
)
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='实时手部检测Web服务器')
parser.add_argument('--host', default='0.0.0.0', help='服务器地址')
parser.add_argument('--port', type=int, default=5000, help='端口号')
parser.add_argument('--debug', action='store_true', help='调试模式')
parser.add_argument('--test-video', help='本地测试视频路径')
args = parser.parse_args()
# 创建服务器实例
server = HandDetectionWebServer(host=args.host, port=args.port)
# 如果指定了测试视频,启动本地视频测试
if args.test_video:
server.start_local_video_test(args.test_video)
# 启动服务器
server.run(debug=args.debug)
if __name__ == '__main__':
main()

116
start_robot_client.bat Normal file
View File

@ -0,0 +1,116 @@
@echo off
chcp 65001 > nul
setlocal enabledelayedexpansion
:: 机械臂客户端启动脚本 (Windows版本)
:: 包含虚拟环境激活和客户端启动
:: 设置脚本目录
cd /d "%~dp0"
:: 颜色定义 (Windows CMD不支持颜色使用echo代替)
echo ================================================
echo 🦾 机械臂客户端启动器 (Windows)
echo ================================================
echo.
:: 检查虚拟环境是否存在
if not exist "venv" (
echo [ERROR] 虚拟环境不存在,请先运行 start_service.bat 创建虚拟环境
pause
exit /b 1
)
:: 激活虚拟环境
echo [INFO] 正在激活虚拟环境...
call venv\Scripts\activate.bat
if %errorlevel% neq 0 (
echo [ERROR] 虚拟环境激活失败
pause
exit /b 1
)
echo [SUCCESS] 虚拟环境已激活
:: 检查必要文件是否存在
echo [INFO] 检查必要文件...
if not exist "src\robot_client.py" (
echo [ERROR] 缺少必要文件: src\robot_client.py
pause
exit /b 1
)
echo [SUCCESS] 所有必要文件存在
:: 解析命令行参数
set SERVER=http://localhost:5000
set MOCK=--mock
:parse_args
if "%~1"=="" goto :end_parse
if "%~1"=="--server" (
set SERVER=%~2
shift
shift
goto :parse_args
)
if "%~1"=="--real" (
set MOCK=
shift
goto :parse_args
)
if "%~1"=="--mock" (
set MOCK=--mock
shift
goto :parse_args
)
if "%~1"=="-h" goto :show_help
if "%~1"=="--help" goto :show_help
if "%~1"=="/?" goto :show_help
echo [ERROR] 未知参数: %~1
echo 使用 %~nx0 --help 查看帮助
pause
exit /b 1
:show_help
echo 用法: %~nx0 [选项]
echo.
echo 选项:
echo --server SERVER 服务器地址 (默认: http://localhost:5000)
echo --mock 使用模拟控制器 (默认)
echo --real 使用真实机械臂控制器
echo -h, --help, /? 显示帮助信息
echo.
echo 示例:
echo %~nx0 # 基本启动(模拟模式)
echo %~nx0 --server http://192.168.1.100:5000 # 连接远程服务器
echo %~nx0 --real # 真实机械臂模式
pause
exit /b 0
:end_parse
:: 显示启动信息
echo.
echo [INFO] 启动配置:
echo - 服务器地址: %SERVER%
if not "%MOCK%"=="" (
echo - 控制模式: 模拟模式
echo [WARNING] 当前使用模拟控制器,不会控制真实机械臂
) else (
echo - 控制模式: 真实机械臂
echo [WARNING] 当前使用真实机械臂控制器
)
echo.
echo [INFO] 按 Ctrl+C 停止客户端
echo ================================================
echo.
:: 切换到src目录
cd src
:: 启动客户端
echo [INFO] 正在启动机械臂客户端...
python robot_client.py --server %SERVER% %MOCK%
pause

128
start_robot_client.sh Executable file
View File

@ -0,0 +1,128 @@
#!/bin/bash
# 机械臂客户端启动脚本
# 包含虚拟环境激活和客户端启动
# 设置脚本目录
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# 打印彩色消息
print_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
print_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
print_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# 显示标题
echo "================================================"
echo " 🦾 机械臂客户端启动器"
echo "================================================"
echo ""
# 检查虚拟环境是否存在
if [ ! -d "venv" ]; then
print_error "虚拟环境不存在,请先运行 ./start_service.sh 创建虚拟环境"
exit 1
fi
# 激活虚拟环境
print_info "正在激活虚拟环境..."
source venv/bin/activate
if [ $? -eq 0 ]; then
print_success "虚拟环境已激活"
else
print_error "虚拟环境激活失败"
exit 1
fi
# 检查必要文件是否存在
print_info "检查必要文件..."
if [ ! -f "src/robot_client.py" ]; then
print_error "缺少必要文件: src/robot_client.py"
exit 1
fi
print_success "所有必要文件存在"
# 解析命令行参数
SERVER="http://192.168.114.26:5000"
MOCK=""
while [[ $# -gt 0 ]]; do
case $1 in
--server)
SERVER="$2"
shift 2
;;
--real)
MOCK=""
shift
;;
--mock)
MOCK="--mock"
shift
;;
-h|--help)
echo "用法: $0 [选项]"
echo ""
echo "选项:"
echo " --server SERVER 服务器地址 (默认: http://localhost:5000)"
echo " --mock 使用模拟控制器 (默认)"
echo " --real 使用真实机械臂控制器"
echo " -h, --help 显示帮助信息"
echo ""
echo "示例:"
echo " $0 # 基本启动(模拟模式)"
echo " $0 --server http://192.168.1.100:5000 # 连接远程服务器"
echo " $0 --real # 真实机械臂模式"
exit 0
;;
*)
print_error "未知参数: $1"
echo "使用 $0 --help 查看帮助"
exit 1
;;
esac
done
# 显示启动信息
echo ""
print_info "启动配置:"
echo " - 服务器地址: $SERVER"
if [ ! -z "$MOCK" ]; then
echo " - 控制模式: 模拟模式"
print_warning "当前使用模拟控制器,不会控制真实机械臂"
else
echo " - 控制模式: 真实机械臂"
print_warning "当前使用真实机械臂控制器"
fi
echo ""
print_info "按 Ctrl+C 停止客户端"
echo "================================================"
echo ""
# 切换到src目录
cd src
# 启动客户端
print_info "正在启动机械臂客户端..."
python3 robot_client.py --server "$SERVER" $MOCK

181
start_service.bat Normal file
View File

@ -0,0 +1,181 @@
@echo off
chcp 65001 > nul
setlocal enabledelayedexpansion
:: 手部检测Web服务启动脚本 (Windows版本)
:: 包含虚拟环境激活和服务启动
:: 设置脚本目录
cd /d "%~dp0"
:: 颜色定义 (Windows CMD不支持颜色使用echo代替)
echo ================================================
echo 🤖 手部检测Web服务启动器 (Windows)
echo ================================================
echo.
:: 检查Python是否存在
python --version >nul 2>&1
if %errorlevel% neq 0 (
echo [ERROR] Python 未找到请先安装Python
pause
exit /b 1
)
for /f "tokens=*" %%i in ('python --version 2^>^&1') do set PYTHON_VERSION=%%i
echo [INFO] Python版本: %PYTHON_VERSION%
:: 检查虚拟环境是否存在
if not exist "venv" (
echo [WARNING] 虚拟环境不存在,正在创建...
python -m venv venv
if %errorlevel% neq 0 (
echo [ERROR] 虚拟环境创建失败
pause
exit /b 1
)
echo [SUCCESS] 虚拟环境创建成功
)
:: 激活虚拟环境
echo [INFO] 正在激活虚拟环境...
call venv\Scripts\activate.bat
if %errorlevel% neq 0 (
echo [ERROR] 虚拟环境激活失败
pause
exit /b 1
)
echo [SUCCESS] 虚拟环境已激活
:: 检查依赖是否已安装
echo [INFO] 检查Python依赖...
if exist "requirements.txt" (
python -c "import flask, flask_socketio, cv2, mediapipe" >nul 2>&1
if %errorlevel% neq 0 (
echo [WARNING] 依赖未完全安装,正在安装...
pip install -r requirements.txt
if %errorlevel% neq 0 (
echo [ERROR] 依赖安装失败
pause
exit /b 1
)
echo [SUCCESS] 依赖安装成功
) else (
echo [SUCCESS] 依赖已安装
)
) else (
echo [ERROR] requirements.txt 文件不存在
pause
exit /b 1
)
:: 检查必要文件是否存在
echo [INFO] 检查必要文件...
set "files=run_web_service.py src\web_server.py src\hand_detection_3d.py templates\index.html"
for %%f in (%files%) do (
if not exist "%%f" (
echo [ERROR] 缺少必要文件: %%f
pause
exit /b 1
)
)
echo [SUCCESS] 所有必要文件存在
:: 解析命令行参数
set HOST=0.0.0.0
set PORT=5000
set DEBUG=
set TEST_VIDEO=
:parse_args
if "%~1"=="" goto :end_parse
if "%~1"=="--host" (
set HOST=%~2
shift
shift
goto :parse_args
)
if "%~1"=="--port" (
set PORT=%~2
shift
shift
goto :parse_args
)
if "%~1"=="--debug" (
set DEBUG=--debug
shift
goto :parse_args
)
if "%~1"=="--test-video" (
set TEST_VIDEO=--test-video %~2
shift
shift
goto :parse_args
)
if "%~1"=="-h" goto :show_help
if "%~1"=="--help" goto :show_help
if "%~1"=="/?" goto :show_help
echo [ERROR] 未知参数: %~1
echo 使用 %~nx0 --help 查看帮助
pause
exit /b 1
:show_help
echo 用法: %~nx0 [选项]
echo.
echo 选项:
echo --host HOST 服务器地址 (默认: 0.0.0.0)
echo --port PORT 端口号 (默认: 5000)
echo --debug 启用调试模式
echo --test-video VIDEO 使用本地测试视频
echo -h, --help, /? 显示帮助信息
echo.
echo 示例:
echo %~nx0 # 基本启动
echo %~nx0 --host localhost --port 8080 # 自定义地址和端口
echo %~nx0 --debug # 调试模式
echo %~nx0 --test-video data\videos\test.mp4 # 本地视频测试
pause
exit /b 0
:end_parse
:: 显示启动信息
echo.
echo [INFO] 启动配置:
echo - 服务器地址: %HOST%
echo - 端口号: %PORT%
if not "%DEBUG%"=="" (
echo - 调试模式: 启用
)
if not "%TEST_VIDEO%"=="" (
echo - 测试视频: %TEST_VIDEO:~13%
)
echo.
:: 显示访问地址
if "%HOST%"=="0.0.0.0" (
echo [INFO] 服务启动后可通过以下地址访问:
echo - 本地访问: http://localhost:%PORT%
for /f "tokens=2 delims=:" %%i in ('ipconfig ^| findstr "IPv4"') do (
set IP=%%i
set IP=!IP: =!
echo - 局域网访问: http://!IP!:%PORT%
goto :continue
)
:continue
) else (
echo [INFO] 服务启动后访问地址: http://%HOST%:%PORT%
)
echo.
echo [INFO] 按 Ctrl+C 停止服务
echo ================================================
echo.
:: 启动服务
echo [INFO] 正在启动手部检测Web服务...
python run_web_service.py --host %HOST% --port %PORT% %DEBUG% %TEST_VIDEO%
pause

183
start_service.sh Executable file
View File

@ -0,0 +1,183 @@
#!/bin/bash
# 手部检测Web服务启动脚本
# 包含虚拟环境激活和服务启动
# 设置脚本目录
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$SCRIPT_DIR"
# 颜色定义
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# 打印彩色消息
print_info() {
echo -e "${BLUE}[INFO]${NC} $1"
}
print_success() {
echo -e "${GREEN}[SUCCESS]${NC} $1"
}
print_warning() {
echo -e "${YELLOW}[WARNING]${NC} $1"
}
print_error() {
echo -e "${RED}[ERROR]${NC} $1"
}
# 显示标题
echo "================================================"
echo " 🤖 手部检测Web服务启动器"
echo "================================================"
echo ""
# 检查Python3是否存在
if ! command -v python3 &> /dev/null; then
print_error "Python3 未找到请先安装Python3"
exit 1
fi
print_info "Python3 版本: $(python3 --version)"
# 检查虚拟环境是否存在
if [ ! -d "venv" ]; then
print_warning "虚拟环境不存在,正在创建..."
python3 -m venv venv
if [ $? -eq 0 ]; then
print_success "虚拟环境创建成功"
else
print_error "虚拟环境创建失败"
exit 1
fi
fi
# 激活虚拟环境
print_info "正在激活虚拟环境..."
source venv/bin/activate
if [ $? -eq 0 ]; then
print_success "虚拟环境已激活"
else
print_error "虚拟环境激活失败"
exit 1
fi
# 检查依赖是否已安装
print_info "检查Python依赖..."
if [ -f "requirements.txt" ]; then
# 检查是否需要安装依赖
if ! python -c "import flask, flask_socketio, cv2, mediapipe" &> /dev/null; then
print_warning "依赖未完全安装,正在安装..."
# 先强制降级 numpy避免 numpy 2.x 兼容性问题
pip install --break-system-packages --force-reinstall 'numpy<2'
pip install --break-system-packages -r requirements.txt
if [ $? -eq 0 ]; then
print_success "依赖安装成功"
else
print_error "依赖安装失败"
exit 1
fi
else
print_success "依赖已安装"
fi
else
print_error "requirements.txt 文件不存在"
exit 1
fi
# 检查必要文件是否存在
print_info "检查必要文件..."
required_files=("run_web_service.py" "src/web_server.py" "src/hand_detection_3d.py" "templates/index.html")
for file in "${required_files[@]}"; do
if [ ! -f "$file" ]; then
print_error "缺少必要文件: $file"
exit 1
fi
done
print_success "所有必要文件存在"
# 解析命令行参数
HOST="0.0.0.0"
PORT="5000"
DEBUG=""
TEST_VIDEO=""
while [[ $# -gt 0 ]]; do
case $1 in
--host)
HOST="$2"
shift 2
;;
--port)
PORT="$2"
shift 2
;;
--debug)
DEBUG="--debug"
shift
;;
--test-video)
TEST_VIDEO="--test-video $2"
shift 2
;;
-h|--help)
echo "用法: $0 [选项]"
echo ""
echo "选项:"
echo " --host HOST 服务器地址 (默认: 0.0.0.0)"
echo " --port PORT 端口号 (默认: 5000)"
echo " --debug 启用调试模式"
echo " --test-video VIDEO 使用本地测试视频"
echo " -h, --help 显示帮助信息"
echo ""
echo "示例:"
echo " $0 # 基本启动"
echo " $0 --host localhost --port 8080 # 自定义地址和端口"
echo " $0 --debug # 调试模式"
echo " $0 --test-video data/videos/test.mp4 # 本地视频测试"
exit 0
;;
*)
print_error "未知参数: $1"
echo "使用 $0 --help 查看帮助"
exit 1
;;
esac
done
# 显示启动信息
echo ""
print_info "启动配置:"
echo " - 服务器地址: $HOST"
echo " - 端口号: $PORT"
if [ ! -z "$DEBUG" ]; then
echo " - 调试模式: 启用"
fi
if [ ! -z "$TEST_VIDEO" ]; then
echo " - 测试视频: ${TEST_VIDEO#--test-video }"
fi
echo ""
# 显示访问地址
if [ "$HOST" = "0.0.0.0" ]; then
print_info "服务启动后可通过以下地址访问:"
echo " - 本地访问: http://localhost:$PORT"
echo " - 局域网访问: http://$(hostname -I | awk '{print $1}'):$PORT"
else
print_info "服务启动后访问地址: http://$HOST:$PORT"
fi
echo ""
print_info "按 Ctrl+C 停止服务"
echo "================================================"
echo ""
# 启动服务
print_info "正在启动手部检测Web服务..."
python3 run_web_service.py --host "$HOST" --port "$PORT" $DEBUG $TEST_VIDEO

548
templates/index.html Normal file
View File

@ -0,0 +1,548 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>实时手部检测控制系统</title>
<script src="https://cdn.socket.io/4.0.0/socket.io.min.js"></script>
<style>
body {
font-family: Arial, sans-serif;
margin: 0;
padding: 20px;
background-color: #f0f0f0;
}
.container {
max-width: 1200px;
margin: 0 auto;
background: white;
padding: 20px;
border-radius: 10px;
box-shadow: 0 2px 10px rgba(0,0,0,0.1);
}
h1 {
text-align: center;
color: #333;
margin-bottom: 30px;
}
.main-content {
display: grid;
grid-template-columns: 2fr 1fr;
gap: 20px;
margin-bottom: 20px;
}
.video-section {
background: #000;
border-radius: 10px;
padding: 10px;
min-height: 480px;
display: flex;
align-items: center;
justify-content: center;
}
.video-container {
position: relative;
width: 100%;
height: 100%;
max-width: 640px;
max-height: 480px;
}
#videoPreview {
width: 100%;
height: 100%;
object-fit: contain;
border-radius: 5px;
}
.no-video {
color: #999;
font-size: 18px;
text-align: center;
}
.control-panel {
background: #f8f9fa;
padding: 20px;
border-radius: 10px;
border: 1px solid #e0e0e0;
}
.status-section {
margin-bottom: 20px;
}
.status-item {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 10px;
padding: 8px 12px;
background: white;
border-radius: 5px;
border: 1px solid #ddd;
}
.status-label {
font-weight: bold;
color: #333;
}
.status-value {
color: #007bff;
font-family: monospace;
}
.angle-display {
display: grid;
grid-template-columns: 1fr 1fr 1fr;
gap: 10px;
margin-bottom: 20px;
}
.angle-card {
background: white;
padding: 15px;
border-radius: 8px;
border: 2px solid #ddd;
text-align: center;
}
.angle-card.x-axis {
border-color: #ff4757;
}
.angle-card.y-axis {
border-color: #2ed573;
}
.angle-card.z-axis {
border-color: #3742fa;
}
.angle-label {
font-size: 14px;
font-weight: bold;
margin-bottom: 5px;
}
.angle-value {
font-size: 24px;
font-weight: bold;
font-family: monospace;
}
.action-display {
background: white;
padding: 15px;
border-radius: 8px;
border: 1px solid #ddd;
margin-bottom: 20px;
}
.action-title {
font-weight: bold;
margin-bottom: 10px;
color: #333;
}
.action-value {
font-size: 18px;
padding: 8px 12px;
background: #e3f2fd;
border-radius: 5px;
color: #1976d2;
font-family: monospace;
}
.grip-status {
display: flex;
align-items: center;
justify-content: space-between;
padding: 10px;
background: white;
border-radius: 5px;
border: 1px solid #ddd;
}
.grip-indicator {
width: 20px;
height: 20px;
border-radius: 50%;
background: #ccc;
transition: background 0.3s ease;
}
.grip-indicator.active {
background: #ff6b6b;
}
.connection-status {
display: flex;
align-items: center;
gap: 10px;
padding: 10px;
background: white;
border-radius: 5px;
border: 1px solid #ddd;
margin-bottom: 20px;
}
.connection-dot {
width: 12px;
height: 12px;
border-radius: 50%;
background: #ccc;
transition: background 0.3s ease;
}
.connection-dot.connected {
background: #2ed573;
}
.test-controls {
margin-top: 20px;
}
.video-selector {
margin-bottom: 15px;
}
.video-select {
width: 100%;
padding: 10px;
border: 1px solid #ddd;
border-radius: 5px;
font-size: 14px;
background: white;
margin-bottom: 10px;
}
.video-select:focus {
outline: none;
border-color: #007bff;
}
.test-button {
width: 100%;
padding: 12px;
background: #007bff;
color: white;
border: none;
border-radius: 5px;
font-size: 16px;
cursor: pointer;
margin-bottom: 10px;
}
.test-button:hover {
background: #0056b3;
}
.test-button:disabled {
background: #ccc;
cursor: not-allowed;
}
.log-section {
background: #f8f9fa;
padding: 20px;
border-radius: 10px;
border: 1px solid #e0e0e0;
}
.log-title {
font-weight: bold;
margin-bottom: 10px;
color: #333;
}
.log-container {
background: #000;
color: #0f0;
padding: 15px;
border-radius: 5px;
font-family: monospace;
font-size: 12px;
height: 200px;
overflow-y: auto;
}
.log-entry {
margin-bottom: 5px;
}
.log-timestamp {
color: #888;
}
.fps-display {
font-size: 14px;
color: #666;
text-align: center;
margin-top: 10px;
}
</style>
</head>
<body>
<div class="container">
<h1>🤖 实时手部检测控制系统</h1>
<div class="main-content">
<div class="video-section">
<div class="video-container">
<img id="videoPreview" style="display:none;" alt="视频预览">
<div id="noVideo" class="no-video">
等待视频流...
</div>
</div>
</div>
<div class="control-panel">
<div class="connection-status">
<div id="connectionDot" class="connection-dot"></div>
<span id="connectionStatus">连接中...</span>
</div>
<div class="status-section">
<div class="status-item">
<span class="status-label">FPS:</span>
<span id="fpsValue" class="status-value">0</span>
</div>
<div class="status-item">
<span class="status-label">速度:</span>
<span id="speedValue" class="status-value">5</span>
</div>
</div>
<div class="angle-display">
<div class="angle-card x-axis">
<div class="angle-label" style="color: #ff4757;">X轴 (左右)</div>
<div id="xAngle" class="angle-value" style="color: #ff4757;">90°</div>
</div>
<div class="angle-card y-axis">
<div class="angle-label" style="color: #2ed573;">Y轴 (上下)</div>
<div id="yAngle" class="angle-value" style="color: #2ed573;">90°</div>
</div>
<div class="angle-card z-axis">
<div class="angle-label" style="color: #3742fa;">Z轴 (前后)</div>
<div id="zAngle" class="angle-value" style="color: #3742fa;">90°</div>
</div>
</div>
<div class="action-display">
<div class="action-title">当前动作:</div>
<div id="actionValue" class="action-value">none</div>
</div>
<div class="grip-status">
<span>抓取状态:</span>
<div class="grip-indicator" id="gripIndicator"></div>
</div>
<div class="test-controls">
<div class="video-selector">
<label for="videoSelect" style="font-size: 14px; font-weight: bold; margin-bottom: 8px; display: block;">选择测试视频:</label>
<select id="videoSelect" class="video-select">
<option value="">加载中...</option>
</select>
</div>
<button id="testButton" class="test-button">开始视频测试</button>
<button id="refreshButton" class="test-button" style="background: #28a745; margin-top: 5px;">刷新视频列表</button>
<div style="font-size: 12px; color: #666; margin-top: 5px; text-align: center;">
选择本地视频文件进行检测
</div>
</div>
</div>
</div>
<div class="log-section">
<div class="log-title">系统日志</div>
<div class="log-container" id="logContainer"></div>
</div>
</div>
<script>
class HandDetectionClient {
constructor() {
this.socket = io();
this.isConnected = false;
this.setupSocketEvents();
this.setupUI();
}
setupSocketEvents() {
this.socket.on('connect', () => {
this.isConnected = true;
this.updateConnectionStatus('已连接', true);
this.log('WebSocket连接已建立');
// 注册为web预览客户端
this.socket.emit('register_client', {
type: 'web_preview'
});
});
this.socket.on('disconnect', () => {
this.isConnected = false;
this.updateConnectionStatus('连接断开', false);
this.log('WebSocket连接已断开');
});
this.socket.on('status', (data) => {
this.log(`服务器状态: ${data.message}`);
});
this.socket.on('registration_success', (data) => {
this.log(`客户端注册成功: ${data.type}`);
});
this.socket.on('detection_results', (data) => {
this.updateDetectionResults(data);
});
this.socket.on('error', (data) => {
this.log(`错误: ${data.message}`, 'error');
});
this.socket.on('pong', (data) => {
// 处理ping响应
});
this.socket.on('test_started', (data) => {
this.log(`✅ ${data.message}`, 'info');
});
this.socket.on('test_error', (data) => {
this.log(`❌ ${data.message}`, 'error');
if (data.help) {
this.log(`💡 ${data.help}`, 'info');
}
});
this.socket.on('video_list', (data) => {
this.updateVideoList(data.videos);
});
}
setupUI() {
document.getElementById('testButton').addEventListener('click', () => {
this.startLocalTest();
});
document.getElementById('refreshButton').addEventListener('click', () => {
this.refreshVideoList();
});
// 请求视频列表
this.refreshVideoList();
// 定期发送ping
setInterval(() => {
if (this.isConnected) {
this.socket.emit('ping');
}
}, 5000);
}
updateConnectionStatus(status, connected) {
const statusElement = document.getElementById('connectionStatus');
const dotElement = document.getElementById('connectionDot');
statusElement.textContent = status;
if (connected) {
dotElement.classList.add('connected');
} else {
dotElement.classList.remove('connected');
}
}
updateDetectionResults(data) {
const { control_signal, processed_frame, fps } = data;
// 更新角度显示
document.getElementById('xAngle').textContent = `${control_signal.x_angle.toFixed(1)}°`;
document.getElementById('yAngle').textContent = `${control_signal.y_angle.toFixed(1)}°`;
document.getElementById('zAngle').textContent = `${control_signal.z_angle.toFixed(1)}°`;
// 更新动作显示
document.getElementById('actionValue').textContent = control_signal.action;
// 更新速度显示
document.getElementById('speedValue').textContent = control_signal.speed;
// 更新抓取状态
const gripIndicator = document.getElementById('gripIndicator');
if (control_signal.grip === 1) {
gripIndicator.classList.add('active');
} else {
gripIndicator.classList.remove('active');
}
// 更新FPS显示
document.getElementById('fpsValue').textContent = fps || 0;
// 更新视频预览
if (processed_frame) {
const videoPreview = document.getElementById('videoPreview');
const noVideo = document.getElementById('noVideo');
videoPreview.src = processed_frame;
videoPreview.style.display = 'block';
noVideo.style.display = 'none';
}
}
startLocalTest() {
const button = document.getElementById('testButton');
const videoSelect = document.getElementById('videoSelect');
const selectedVideo = videoSelect.value;
if (!selectedVideo) {
this.log('❌ 请先选择一个视频文件', 'error');
return;
}
button.disabled = true;
button.textContent = '启动中...';
// 请求开始本地测试,带上选择的视频
this.socket.emit('start_local_test', {
video_path: selectedVideo
});
this.log(`正在启动视频测试: ${selectedVideo}`);
setTimeout(() => {
button.disabled = false;
button.textContent = '开始视频测试';
}, 5000);
}
refreshVideoList() {
this.log('正在刷新视频列表...');
this.socket.emit('get_video_list');
}
updateVideoList(videos) {
const videoSelect = document.getElementById('videoSelect');
videoSelect.innerHTML = '';
if (videos.length === 0) {
const option = document.createElement('option');
option.value = '';
option.textContent = '未找到视频文件';
videoSelect.appendChild(option);
this.log('❌ 未找到测试视频文件', 'error');
this.log('💡 运行 python create_test_video.py 生成测试视频', 'info');
} else {
// 添加默认选项
const defaultOption = document.createElement('option');
defaultOption.value = '';
defaultOption.textContent = '请选择视频文件';
videoSelect.appendChild(defaultOption);
// 添加视频文件选项
videos.forEach(video => {
const option = document.createElement('option');
option.value = video.path;
option.textContent = `${video.name} (${video.size})`;
videoSelect.appendChild(option);
});
this.log(`✅ 找到 ${videos.length} 个视频文件`);
}
}
log(message, type = 'info') {
const logContainer = document.getElementById('logContainer');
const timestamp = new Date().toLocaleTimeString();
const logEntry = document.createElement('div');
logEntry.className = 'log-entry';
const color = type === 'error' ? '#f00' : '#0f0';
logEntry.innerHTML = `<span class="log-timestamp">[${timestamp}]</span> <span style="color: ${color}">${message}</span>`;
logContainer.appendChild(logEntry);
logContainer.scrollTop = logContainer.scrollHeight;
// 限制日志数量
const entries = logContainer.querySelectorAll('.log-entry');
if (entries.length > 100) {
entries[0].remove();
}
}
}
// 初始化客户端
document.addEventListener('DOMContentLoaded', () => {
new HandDetectionClient();
});
</script>
</body>
</html>

206
test_system.py Executable file
View File

@ -0,0 +1,206 @@
#!/usr/bin/env python3
"""
系统测试脚本
测试手部检测Web服务的各个组件
"""
import sys
import os
import time
import json
# 添加src目录到Python路径
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
def test_import_modules():
"""测试模块导入"""
print("🔍 测试模块导入...")
try:
import cv2
print("✅ OpenCV 导入成功")
import mediapipe as mp
print("✅ MediaPipe 导入成功")
import flask
import flask_socketio
print("✅ Flask 和 SocketIO 导入成功")
from hand_detection_3d import load_mediapipe_model, process_frame_3d
print("✅ 手部检测模块导入成功")
from web_server import HandDetectionWebServer
print("✅ Web服务器模块导入成功")
from robot_client import RobotArmClient
print("✅ 机械臂客户端模块导入成功")
return True
except Exception as e:
print(f"❌ 模块导入失败: {e}")
return False
def test_hand_detection():
"""测试手部检测功能"""
print("\n🤖 测试手部检测功能...")
try:
from hand_detection_3d import load_mediapipe_model, process_frame_3d
import numpy as np
# 加载模型
model = load_mediapipe_model()
print("✅ MediaPipe模型加载成功")
# 创建测试图像
test_frame = np.zeros((480, 640, 3), dtype=np.uint8)
test_frame[:] = (50, 50, 50) # 灰色背景
# 处理帧
control_signal, hand_data = process_frame_3d(test_frame, model)
# 检查输出格式
required_keys = ['x_angle', 'y_angle', 'z_angle', 'grip', 'action', 'speed']
if all(key in control_signal for key in required_keys):
print("✅ 手部检测输出格式正确")
print(f" 控制信号: {control_signal}")
return True
else:
print("❌ 手部检测输出格式不正确")
return False
except Exception as e:
print(f"❌ 手部检测测试失败: {e}")
return False
def test_web_server():
"""测试Web服务器"""
print("\n🌐 测试Web服务器...")
try:
from web_server import HandDetectionWebServer
# 创建服务器实例
server = HandDetectionWebServer(port=5003)
print("✅ Web服务器创建成功")
# 测试路由配置
with server.app.test_client() as client:
# 测试主页
response = client.get('/')
if response.status_code == 200:
print("✅ 主页路由正常")
else:
print(f"❌ 主页路由失败: {response.status_code}")
return False
# 测试API状态
response = client.get('/api/status')
if response.status_code == 200:
print("✅ API状态路由正常")
status_data = json.loads(response.data)
print(f" 状态信息: {status_data}")
else:
print(f"❌ API状态路由失败: {response.status_code}")
return False
return True
except Exception as e:
print(f"❌ Web服务器测试失败: {e}")
return False
def test_robot_client():
"""测试机械臂客户端"""
print("\n🦾 测试机械臂客户端...")
try:
from robot_client import RobotArmClient, MockRobotArmController
# 创建模拟控制器
mock_controller = MockRobotArmController()
print("✅ 模拟机械臂控制器创建成功")
# 测试控制器方法
mock_controller.move_to_angles(45, 90, 135, 5)
print("✅ 机械臂移动测试成功")
mock_controller.set_gripper_state(1)
print("✅ 抓取器控制测试成功")
# 创建客户端(不连接服务器)
client = RobotArmClient('http://localhost:5003')
print("✅ 机械臂客户端创建成功")
return True
except Exception as e:
print(f"❌ 机械臂客户端测试失败: {e}")
return False
def test_file_structure():
"""测试文件结构"""
print("\n📁 测试文件结构...")
required_files = [
'README.md',
'requirements.txt',
'run_web_service.py',
'start_service.sh',
'start_service.bat',
'start_robot_client.sh',
'start_robot_client.bat',
'src/__init__.py',
'src/hand_detection_3d.py',
'src/web_server.py',
'src/robot_client.py',
'templates/index.html'
]
missing_files = []
for file_path in required_files:
if not os.path.exists(file_path):
missing_files.append(file_path)
if missing_files:
print(f"❌ 缺少文件: {missing_files}")
return False
else:
print("✅ 所有必需文件存在")
return True
def main():
"""主测试函数"""
print("=" * 60)
print("🧪 手部检测Web服务系统测试")
print("=" * 60)
tests = [
test_file_structure,
test_import_modules,
test_hand_detection,
test_web_server,
test_robot_client
]
passed = 0
total = len(tests)
for test in tests:
if test():
passed += 1
else:
print(f"\n⚠️ 测试失败,请检查上述错误")
print("\n" + "=" * 60)
print(f"📊 测试结果: {passed}/{total} 个测试通过")
if passed == total:
print("🎉 所有测试通过!系统准备就绪")
print("\n🚀 启动命令:")
print(" Linux/Mac: ./start_service.sh")
print(" Windows: start_service.bat")
print(" 访问地址: http://localhost:5000")
else:
print("❌ 部分测试失败,请修复问题后重试")
print("=" * 60)
return passed == total
if __name__ == '__main__':
success = main()
sys.exit(0 if success else 1)