当前位置: 首页 > news >正文

识别用户坐姿,当检测到不良坐姿时,通过震动提醒矫正,保护颈椎。

智能坐姿矫正系统设计与实现

一、实际应用场景与痛点分析

应用场景

随着现代人长时间使用电脑、手机,不良坐姿导致的颈椎病、腰椎病日益普遍。本系统面向办公室人员、学生、远程工作者等长期坐姿人群,通过智能识别不良坐姿并提供实时矫正提醒。

主要痛点

1. 无意识习惯 - 用户常在不自觉中形成不良坐姿

2. 缺乏即时反馈 - 传统方法无法提供实时提醒

3. 个体差异大 - 标准矫正方法不适用于所有人

4. 难以坚持 - 矫正设备复杂,用户依从性差

5. 隐私问题 - 摄像头监控令人不适

6. 干扰工作 - 矫正提醒过于频繁影响工作

二、核心逻辑与智能控制原理

系统架构

感知层 → 处理层 → 决策层 → 执行层

↓ ↓ ↓ ↓

IMU传感器 → 姿态估计 → 模糊推理 → 震动提醒

压力传感器 → 特征提取 → 专家系统 → 声音提示

摄像头 → 深度学习 → 自适应控制 → 界面显示

核心智能控制原理

1. 模糊控制 - 处理"轻微前倾"、"严重低头"等模糊概念

2. 专家系统 - 基于人体工程学和康复医学规则库

3. 自适应控制 - 根据用户习惯调整灵敏度

4. 状态机控制 - 管理坐姿状态转移

5. PID控制 - 平滑提醒强度变化

三、代码实现

主程序:smart_posture_corrector.py

#!/usr/bin/env python3

"""

智能坐姿矫正系统

基于智能控制原理的实时坐姿监测与矫正系统

"""

import json

import datetime

import time

import threading

import queue

import numpy as np

from typing import Dict, List, Tuple, Optional, Any

from dataclasses import dataclass, asdict, field

from enum import Enum

import matplotlib.pyplot as plt

from matplotlib.animation import FuncAnimation

from collections import deque

import warnings

from abc import ABC, abstractmethod

import random

import logging

from dataclasses_json import dataclass_json

import os

from scipy import signal

from scipy.spatial.transform import Rotation

import pickle

# 配置日志

logging.basicConfig(

level=logging.INFO,

format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',

handlers=[

logging.FileHandler('posture.log', encoding='utf-8'),

logging.StreamHandler()

]

)

logger = logging.getLogger(__name__)

class PostureType(Enum):

"""坐姿类型枚举"""

CORRECT = "正确坐姿"

LEAN_FORWARD = "身体前倾"

LEAN_BACKWARD = "身体后仰"

LEAN_LEFT = "身体左倾"

LEAN_RIGHT = "身体右倾"

HEAD_DOWN = "低头"

HEAD_UP = "仰头"

TORSO_TWIST = "躯干扭转"

SHOULDER_ASYMMETRY = "肩膀不对称"

CROSS_LEGS = "翘二郎腿"

class SensorType(Enum):

"""传感器类型枚举"""

IMU = "惯性测量单元" # 加速度计+陀螺仪+磁力计

PRESSURE = "压力传感器"

CAMERA = "摄像头"

ULTRASONIC = "超声波"

INFRARED = "红外"

class AlertLevel(Enum):

"""提醒级别枚举"""

NONE = 0 # 无需提醒

MILD = 1 # 轻度提醒

MODERATE = 2 # 中度提醒

SEVERE = 3 # 重度提醒

CRITICAL = 4 # 严重提醒

@dataclass_json

@dataclass

class PostureData:

"""姿态数据"""

timestamp: datetime.datetime

pitch: float # 俯仰角(前倾/后仰) 单位:度

roll: float # 横滚角(左右倾斜) 单位:度

yaw: float # 偏航角(扭转) 单位:度

acceleration: Tuple[float, float, float] # 三轴加速度

angular_velocity: Tuple[float, float, float] # 三轴角速度

pressure_distribution: Optional[List[float]] = None # 压力分布

sensor_type: SensorType = SensorType.IMU

@dataclass_json

@dataclass

class PostureState:

"""坐姿状态"""

posture_type: PostureType

severity: float # 严重程度 0-1

confidence: float # 置信度 0-1

duration: float # 持续时间(秒)

start_time: datetime.datetime

alert_level: AlertLevel = AlertLevel.NONE

@dataclass_json

@dataclass

class UserProfile:

"""用户基本信息"""

user_id: str

name: str

age: int

height: float # 厘米

weight: float # 公斤

occupation: str

health_conditions: List[str] # 健康状况

posture_habits: List[str] # 坐姿习惯

work_hours: Tuple[float, float] # 工作时间段

sensitivity: float = 0.5 # 灵敏度 0-1

training_data: List[PostureData] = field(default_factory=list)

class PostureFeatureExtractor:

"""

姿态特征提取器

从原始传感器数据提取特征

"""

def __init__(self, window_size: int = 10):

self.window_size = window_size

self.data_buffer = deque(maxlen=window_size)

def add_data(self, data: PostureData):

"""添加数据到缓冲区"""

self.data_buffer.append(data)

def extract_features(self) -> Dict[str, Any]:

"""从缓冲区提取特征"""

if len(self.data_buffer) < 2:

return {}

# 获取最新数据

latest_data = self.data_buffer[-1]

# 角度特征

angle_features = {

'pitch': latest_data.pitch,

'roll': latest_data.roll,

'yaw': latest_data.yaw,

'pitch_abs': abs(latest_data.pitch),

'roll_abs': abs(latest_data.roll),

'yaw_abs': abs(latest_data.yaw)

}

# 加速度特征

accel = np.array(latest_data.acceleration)

accel_features = {

'accel_magnitude': np.linalg.norm(accel),

'accel_x': accel[0],

'accel_y': accel[1],

'accel_z': accel[2],

'accel_variance': np.var([d.acceleration for d in self.data_buffer], axis=0).tolist()

}

# 角速度特征

gyro = np.array(latest_data.angular_velocity)

gyro_features = {

'gyro_magnitude': np.linalg.norm(gyro),

'gyro_x': gyro[0],

'gyro_y': gyro[1],

'gyro_z': gyro[2]

}

# 动态特征(需要历史数据)

if len(self.data_buffer) >= 5:

pitch_values = [d.pitch for d in self.data_buffer]

roll_values = [d.roll for d in self.data_buffer]

dynamic_features = {

'pitch_velocity': (pitch_values[-1] - pitch_values[-2]) * 10, # 度/秒

'roll_velocity': (roll_values[-1] - roll_values[-2]) * 10,

'pitch_std': np.std(pitch_values),

'roll_std': np.std(roll_values),

'pitch_change': max(pitch_values) - min(pitch_values),

'roll_change': max(roll_values) - min(roll_values)

}

else:

dynamic_features = {

'pitch_velocity': 0,

'roll_velocity': 0,

'pitch_std': 0,

'roll_std': 0,

'pitch_change': 0,

'roll_change': 0

}

# 综合特征

all_features = {

**angle_features,

**accel_features,

**gyro_features,

**dynamic_features,

'timestamp': latest_data.timestamp,

'data_count': len(self.data_buffer)

}

return all_features

def calculate_stability_score(self) -> float:

"""计算坐姿稳定性评分(0-100)"""

if len(self.data_buffer) < 5:

return 100.0

# 获取角度数据

pitch_values = [d.pitch for d in self.data_buffer]

roll_values = [d.roll for d in self.data_buffer]

# 计算角度变化的标准差

pitch_std = np.std(pitch_values)

roll_std = np.std(roll_values)

# 计算稳定性分数

# 标准差越小,稳定性越高

max_std = 5.0 # 最大可接受标准差

pitch_score = max(0, 100 - (pitch_std / max_std * 50))

roll_score = max(0, 100 - (roll_std / max_std * 50))

stability_score = (pitch_score + roll_score) / 2

return stability_score

class FuzzyPostureClassifier:

"""

模糊姿态分类器

处理模糊姿态边界

"""

def __init__(self):

# 模糊集合定义

self.angle_sets = {

'correct': {'center': 0, 'range': 5}, # 正确范围:±5度

'mild': {'center': 10, 'range': 10}, # 轻度:5-15度

'moderate': {'center': 20, 'range': 10}, # 中度:15-25度

'severe': {'center': 30, 'range': 10} # 严重:25-35度

}

# 持续时间模糊集合

self.duration_sets = {

'short': {'center': 5, 'range': 5}, # 短暂:0-10秒

'medium': {'center': 20, 'range': 10}, # 中等:10-30秒

'long': {'center': 60, 'range': 30} # 长期:30-90秒

}

def fuzzy_angle_classification(self, angle: float) -> Dict[str, float]:

"""模糊角度分类"""

memberships = {}

for level, params in self.angle_sets.items():

center = params['center']

width = params['range']

# 计算隶属度(三角隶属函数)

if abs(angle) <= center - width/2 or abs(angle) >= center + width/2:

membership = 0

elif abs(angle) <= center:

membership = (abs(angle) - (center - width/2)) / (width/2)

else:

membership = ((center + width/2) - abs(angle)) / (width/2)

memberships[level] = max(0, min(1, membership))

return memberships

def fuzzy_duration_classification(self, duration: float) -> Dict[str, float]:

"""模糊持续时间分类"""

memberships = {}

for level, params in self.duration_sets.items():

center = params['center']

width = params['range']

if duration <= center - width/2 or duration >= center + width/2:

membership = 0

elif duration <= center:

membership = (duration - (center - width/2)) / (width/2)

else:

membership = ((center + width/2) - duration) / (width/2)

memberships[level] = max(0, min(1, membership))

return memberships

def infer_posture_type(self, features: Dict[str, Any]) -> List[Tuple[PostureType, float]]:

"""模糊推理坐姿类型"""

pitch = features.get('pitch', 0)

roll = features.get('roll', 0)

yaw = features.get('yaw', 0)

# 计算各个角度的模糊分类

pitch_membership = self.fuzzy_angle_classification(pitch)

roll_membership = self.fuzzy_angle_classification(roll)

yaw_membership = self.fuzzy_angle_classification(yaw)

# 定义模糊规则

rules = [

# 前倾/后仰

{

'condition': lambda: pitch_membership.get('moderate', 0) > 0.5 and pitch > 0,

'type': PostureType.LEAN_FORWARD,

'confidence': pitch_membership.get('moderate', 0) * 0.8

},

{

'condition': lambda: pitch_membership.get('moderate', 0) > 0.5 and pitch < 0,

'type': PostureType.LEAN_BACKWARD,

'confidence': pitch_membership.get('moderate', 0) * 0.8

},

# 左右倾斜

{

'condition': lambda: roll_membership.get('moderate', 0) > 0.5 and roll > 0,

'type': PostureType.LEAN_LEFT,

'confidence': roll_membership.get('moderate', 0) * 0.8

},

{

'condition': lambda: roll_membership.get('moderate', 0) > 0.5 and roll < 0,

'type': PostureType.LEAN_RIGHT,

'confidence': roll_membership.get('moderate', 0) * 0.8

},

# 低头/仰头

{

'condition': lambda: pitch_membership.get('severe', 0) > 0.7 and pitch > 15,

'type': PostureType.HEAD_DOWN,

'confidence': pitch_membership.get('severe', 0) * 0.9

},

{

'condition': lambda: pitch_membership.get('severe', 0) > 0.7 and pitch < -15,

'type': PostureType.HEAD_UP,

'confidence': pitch_membership.get('severe', 0) * 0.9

},

# 躯干扭转

{

'condition': lambda: yaw_membership.get('moderate', 0) > 0.6,

'type': PostureType.TORSO_TWIST,

'confidence': yaw_membership.get('moderate', 0) * 0.7

}

]

# 应用模糊规则

results = []

for rule in rules:

if rule['condition']():

results.append((rule['type'], rule['confidence']))

# 如果没有检测到不良坐姿,则认为是正确坐姿

if not results and pitch_membership.get('correct', 0) > 0.7 and roll_membership.get('correct', 0) > 0.7:

results.append((PostureType.CORRECT, 0.9))

return results

class PostureExpertSystem:

"""

坐姿专家系统

基于人体工程学和康复医学规则库

"""

def __init__(self):

self.rules = self._initialize_rules()

self.alerts_history = deque(maxlen=100)

def _initialize_rules(self) -> List[Dict]:

"""初始化专家规则库"""

return [

{

"name": "严重前倾",

"condition": lambda data: data.get('pitch', 0) > 25,

"action": "立即挺直背部,收下巴,肩胛骨后缩",

"alert_level": AlertLevel.CRITICAL,

"message": "⚠️ 严重前倾!立即矫正,避免颈椎压力过大",

"priority": 10

},

{

"name": "长时间低头",

"condition": lambda data: data.get('pitch', 0) > 15 and data.get('duration', 0) > 30,

"action": "抬头,调整屏幕高度,使视线平视",

"alert_level": AlertLevel.SEVERE,

"message": "📱 长时间低头,调整视线高度",

"priority": 8

},

{

"name": "身体倾斜",

"condition": lambda data: abs(data.get('roll', 0)) > 15,

"action": "调整坐姿,保持身体左右对称",

"alert_level": AlertLevel.MODERATE,

"message": "⚖️ 身体倾斜,保持左右平衡",

"priority": 6

},

{

"name": "轻微前倾",

"condition": lambda data: 10 < data.get('pitch', 0) <= 20,

"action": "轻微挺直,放松肩膀",

"alert_level": AlertLevel.MILD,

"message": "💺 轻微前倾,注意坐姿",

"priority": 4

},

{

"name": "坐姿稳定",

"condition": lambda data: abs(data.get('pitch', 0)) <= 5 and abs(data.get('roll', 0)) <= 5,

"action": "保持当前良好坐姿",

"alert_level": AlertLevel.NONE,

"message": "✅ 坐姿良好,继续保持!",

"priority": 0

},

{

"name": "频繁晃动",

"condition": lambda data: data.get('stability_score', 100) < 60,

"action": "放松身体,减少不必要的晃动",

"alert_level": AlertLevel.MILD,

"message": "🔄 坐姿不稳定,放松身体",

"priority": 3

},

{

"name": "后仰过度",

"condition": lambda data: data.get('pitch', 0) < -20,

"action": "调整椅子角度,保持适当后倾",

"alert_level": AlertLevel.MODERATE,

"message": "🪑 后仰过度,调整椅子角度",

"priority": 5

},

{

"name": "肩部不对称",

"condition": lambda data: data.get('shoulder_asymmetry', 0) > 0.3,

"action": "放松肩膀,保持双肩水平",

"alert_level": AlertLevel.MILD,

"message": "🤷 肩部不对称,放松肩膀",

"priority": 4

}

]

def evaluate_posture(self, features: Dict, current_state: PostureState) -> Dict:

"""评估坐姿并返回建议"""

evaluation = {

"needs_alert": False,

"alert_level": AlertLevel.NONE,

"recommendations": [],

"messages": [],

"timestamp": datetime.datetime.now()

}

# 准备评估数据

eval_data = {

**features,

"duration": current_state.duration if current_state else 0,

"current_type": current_state.posture_type if current_state else None

}

# 应用专家规则

triggered_rules = []

for rule in self.rules:

try:

if rule["condition"](eval_data):

triggered_rules.append(rule)

except Exception as e:

logger.warning(f"规则执行失败: {rule['name']}, 错误: {e}")

# 按优先级排序

triggered_rules.sort(key=lambda x: x["priority"], reverse=True)

# 处理触发的规则

for rule in triggered_rules[:3]: # 最多返回3条建议

if rule["alert_level"] != AlertLevel.NONE:

evaluation["needs_alert"] = True

if rule["alert_level"].value > evaluation["alert_level"].value:

evaluation["alert_level"] = rule["alert_level"]

evaluation["recommendations"].append(rule["action"])

evaluation["messages"].append(rule["message"])

# 记录提醒历史

if evaluation["needs_alert"]:

self.alerts_history.append({

"time": datetime.datetime.now(),

"level": evaluation["alert_level"],

"message": evaluation["messages"][0] if evaluation["messages"] else "",

"features": features

})

return evaluation

class AdaptiveAlertController:

"""

自适应提醒控制器

基于PID控制和状态机

"""

def __init__(self, user_sensitivity: float = 0.5):

self.user_sensitivity = user_sensitivity

self.last_alert_time = None

self.alert_count = 0

self.alert_history = deque(maxlen=20)

# PID控制器参数

self.kp = 0.8 # 比例增益

self.ki = 0.1 # 积分增益

self.kd = 0.2 # 微分增益

self.error_integral = 0

self.last_error = 0

# 状态机

self.state = "NORMAL"

self.state_transitions = {

"NORMAL": ["MILD_ALERT", "IGNORE"],

"MILD_ALERT": ["NORMAL", "MODERATE_ALERT"],

"MODERATE_ALERT": ["NORMAL", "SEVERE_ALERT"],

"SEVERE_ALERT": ["NORMAL", "CRITICAL_ALERT"],

"CRITICAL_ALERT": ["NORMAL"],

"IGNORE": ["NORMAL"]

}

def calculate_alert_intensity(self, severity: float, duration: float) -> float:

"""

计算提醒强度(0-1)

基于PID控制算法

"""

# 期望的坐姿质量(理想状态为0,表示无不良坐姿)

desired_quality = 0

# 当前误差 = 严重程度

current_error = severity

# 计算PID控制输出

self.error_integral += current_error

error_derivative = current_error - self.last_error

# PID输出

pid_output = (self.kp * current_error +

self.ki * self.error_integral +

self.kd * error_derivative)

# 考虑持续时间的影响

duration_factor = min(1.0, duration / 60) # 持续时间超过60秒达到最大影响

# 综合强度计算

base_intensity = pid_output * (1.0 + duration_factor)

# 考虑用户灵敏度

intensity = base_intensity * (1.5 - self.user_sensitivity) # 灵敏度低则强度高

# 限制在0-1范围内

intensity = max(0, min(1, intensity))

self.last_error = current_error

return intensity

def update_state(self, alert_level: AlertLevel, response_received: bool = False):

"""更新状态机状态"""

current_state = self.state

# 定义状态转移规则

if alert_level == AlertLevel.NONE:

new_state = "NORMAL"

elif alert_level == AlertLevel.MILD:

if current_state == "NORMAL":

new_state = "MILD_ALERT"

elif current_state == "MILD_ALERT" and not response_received:

new_state = "MODERATE_ALERT"

else:

new_state = current_state

elif alert_level == AlertLevel.MODERATE:

if current_state in ["NORMAL", "MILD_ALERT"]:

new_state = "MODERATE_ALERT"

elif current_state == "MODERATE_ALERT" and not response_received:

new_state = "SEVERE_ALERT"

else:

new_state = current_state

elif alert_level == AlertLevel.SEVERE:

new_state = "SEVERE_ALERT"

elif alert_level == AlertLevel.CRITICAL:

new_state = "CRITICAL_ALERT"

else:

new_state = current_state

# 如果用户有响应,恢复到较低状态

if response_received and new_state != "NORMAL":

if "CRITICAL" in new_state:

new_state = "SEVERE_ALERT"

elif "SEVERE" in new_state:

new_state = "MODERATE_ALERT"

elif "MODERATE" in new_state:

new_state = "MILD_ALERT"

elif "MILD" in new_state:

new_state = "NORMAL"

# 应用状态转移

if new_state in self.state_transitions.get(current_state, []):

self.state = new_state

logger.info(f"状态转移: {current_state} -> {new_state}")

return self.state

def get_alert_pattern(self, intensity: float, alert_level: AlertLevel) -> Dict:

"""根据强度生成提醒模式"""

if alert_level == AlertLevel.NONE or intensity < 0.1:

return {"type": "none", "pattern": []}

# 基本震动模式

base_patterns

如果你觉得这个工具好用,欢迎关注我!

http://www.jsqmd.com/news/224674/

相关文章:

  • Segment Anything:AI如何革新图像分割开发流程
  • RaNER模型贡献指南:如何参与开源项目开发部署
  • 学术小白必看:5分钟上手SUPERXIE全指南
  • Qwen3-VL-WEBUI游戏开发辅助:UI自动生成部署教程
  • position: sticky vs 传统JS实现:效率对比实测
  • Qwen2.5-7B多语言支持测试:云端一键切换环境
  • 比PS快10倍!FastStone批量改图工作流全解析
  • AI实体侦测服务边缘计算:本地化部署与离线推理方案
  • RaNER模型部署优化:CPU环境下极速推理配置指南
  • Qwen2.5-7B代码生成实战:云端10分钟部署,2块钱玩整天
  • Qwen3-VL-WEBUI远程办公:会议截图内容提取实战教程
  • Kubectl CP从入门到精通:新手必学的10个文件传输技巧
  • 不用安装MAVEN?在线构建Java项目的黑科技
  • Qwen2.5-7B保姆级教程:小白5分钟上手,1小时仅需1块钱
  • LLM面试真题集锦(收藏版):从一面到三面全覆盖,助你轻松斩获大厂offer!
  • 鳄鱼线 主图 源码 原理 用法介绍
  • Qwen2.5 vs DeepSeek实测:云端GPU 2小时低成本对比
  • Qwen2.5-7B+Stable Diffusion套餐:云端AI创作全家桶
  • Qwen3-VL-WEBUI部署避坑指南:显存不足问题解决方案
  • AI智能实体侦测服务适合哪些行业?多场景落地应用分析
  • 用SpringDoc快速验证API设计:原型开发新思路
  • Qwen2.5-7B企业内训套餐:10个预装好案例的实训环境
  • AI智能实体侦测服务高级教程:RaNER模型参数调优与性能测试
  • Doxygen入门指南:5分钟学会基础文档生成
  • 金融数据分析师如何快速搭建Python开发环境
  • Vite vs Webpack:开发效率对比实测
  • 一次遍历+维护前后缀+枚举中间+位运算
  • Qwen2.5-7B远程办公:云端GPU让老家电脑变工作站
  • AI如何帮你掌握Vue2生命周期?自动生成代码示例
  • 零基础入门:5分钟用UPnP搭建家庭网络共享