用视觉检测设备疲劳,看外壳微小形变,预测故障。
工业设备疲劳视觉检测系统 —— 基于外壳微小形变的故障预测
一、实际应用场景描述
在工业生产中,许多关键设备(如液压泵、电机外壳、压力容器、齿轮箱等)在长期运行过程中,由于交变载荷、热循环、振动等因素,会产生疲劳损伤。这种损伤初期表现为外壳或结构件的微小形变,随着时间积累,最终可能导致严重的设备故障甚至安全事故。
典型应用场景:
1. 风力发电机主轴检测:叶片旋转产生的交变应力导致主轴外壳产生微米级形变,传统方法难以发现
2. 化工压力容器监控:内部压力周期性变化导致壳体产生疲劳裂纹前的微小膨胀
3. 轨道交通车轮轴承座检测:长期振动导致轴承座螺栓孔周围产生塑性变形
4. 注塑机模板监测:频繁的开合模动作导致模板产生累积形变,影响产品质量
传统检测方式的局限性:
- 人工巡检:依赖经验,主观性强,无法量化微小形变
- 接触式测量:使用千分尺、应变片等,需要停机安装,影响生产
- 定期检修:时间间隔长,可能错过疲劳发展的关键阶段
- 大型设备盲区:内部结构难以触及,只能检测外部可见部分
本系统通过工业机器视觉技术,实时监测设备外壳的亚像素级形变,结合时间序列分析,实现疲劳损伤的早期预警和剩余寿命预测。
二、引入痛点
1. 形变信号极其微弱:疲劳导致的形变通常在几十到几百微米,远低于普通视觉系统的分辨率极限
2. 环境干扰严重:工业现场的振动、温度变化、光照不均会掩盖真实的形变信号
3. 特征提取困难:外壳表面常有纹理、锈蚀、标识等干扰因素,影响形变测量的准确性
4. 实时性要求高:需要在设备运行过程中实时监测,不能影响正常生产
5. 长期稳定性:监测系统本身需要具备长期运行的可靠性,抗漂移、抗老化
三、核心逻辑讲解
本系统的核心技术路线是高精度视觉测量 + 时间序列疲劳分析:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 图像采集 │────▶│ 预处理 │────▶│ 特征提取 │
│ (工业相机) │ │ (去噪/配准) │ │ (形变场计算) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 结果输出 │◀────│ 故障预测 │◀────│ 疲劳分析 │
│ (健康报告) │ │ (寿命估计) │ │ (趋势分析) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
核心算法流程:
1. 亚像素级图像采集:
- 使用高分辨率工业相机(≥12MP)配合远心镜头,消除透视误差
- 采用频闪照明,冻结设备运行中的振动,获取稳定图像
- 建立稳定的参考坐标系,确保多时刻图像的可比性
2. 鲁棒性预处理:
- 图像配准:使用基于特征点(SIFT/SURF/ORB)的配准算法,消除设备微小位移的影响
- 光照归一化:采用同态滤波和自适应直方图均衡化,消除光照变化干扰
- 纹理抑制:使用Gabor滤波器或小波变换,抑制表面纹理,突出形变特征
3. 高精度形变测量:
- 相位相关法:在频域计算图像间的相对位移,实现亚像素级精度
- 数字图像相关法(DIC):在感兴趣区域(ROI)内建立散斑图案,跟踪局部变形
- 光流法:计算连续帧间的运动矢量场,检测异常形变模式
4. 疲劳特征提取与时序分析:
- 形变场分析:计算形变的幅值、方向、梯度等统计特征
- 频谱分析:对形变时间序列进行FFT,识别与设备运行周期相关的特征频率
- 趋势建模:使用ARIMA、LSTM等模型拟合形变增长趋势,预测达到失效阈值的剩余时间
5. 故障预测与预警:
- 结合材料的疲劳特性(S-N曲线),将视觉测量的形变转化为疲劳损伤度
- 建立多级预警机制(注意、警告、危险),支持多种通知方式
四、代码模块化实现
项目结构
fatigue_detection/
├── main.py # 主程序入口,系统协调与控制
├── config.py # 配置文件:相机参数、检测区域、阈值设置
├── camera_handler.py # 相机控制模块:高分辨率采集、频闪同步
├── image_registrar.py # 图像配准模块:消除位移干扰,确保可比性
├── preprocessor.py # 预处理模块:去噪、光照归一化、纹理抑制
├── strain_analyzer.py # 应变分析模块:DIC算法、形变场计算
├── fatigue_predictor.py # 疲劳预测模块:时序分析、寿命估计
├── alert_manager.py # 预警管理模块:多级预警、通知推送
├── utils.py # 工具函数:图像处理、数学计算、可视化
├── models/ # 存储训练好的预测模型和标定参数
├── calibration/ # 相机标定和ROI标定数据
├── logs/ # 运行日志和历史数据
├── requirements.txt # Python依赖包
└── README.md # 项目说明文档
1. config.py - 配置文件
"""
工业设备疲劳视觉检测系统 - 配置文件
包含所有系统参数,可根据实际应用场景调整
"""
import os
# --- 路径配置 ---
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MODELS_DIR = os.path.join(BASE_DIR, 'models')
CALIBRATION_DIR = os.path.join(BASE_DIR, 'calibration')
LOGS_DIR = os.path.join(BASE_DIR, 'logs')
# 创建必要目录
for dir_path in [MODELS_DIR, CALIBRATION_DIR, LOGS_DIR]:
if not os.path.exists(dir_path):
os.makedirs(dir_path)
# --- 相机配置 (高分辨率工业相机) ---
CAMERA_CONFIG = {
'device_id': 0,
'width': 4096, # 高分辨率确保亚像素测量精度
'height': 3072,
'fps': 5, # 低频采集,配合频闪使用
'pixel_size': 3.45e-6, # 像素尺寸 (米),用于物理尺寸换算
'focal_length': 0.05, # 焦距 (米)
'stroboscope_freq': 0, # 频闪频率,0表示不使用
'trigger_mode': 'software' # 触发模式
}
# --- 图像配准配置 ---
REGISTRATION_CONFIG = {
'method': 'sift', # 特征匹配方法: 'sift', 'orb', 'surf'
'max_features': 1000, # 最大特征点数
'good_match_ratio': 0.75, # 优质匹配比例
'ransac_threshold': 5.0, # RANSAC重投影误差阈值(像素)
'use_gpu': False # 是否使用GPU加速
}
# --- 预处理配置 ---
PREPROCESSING_CONFIG = {
'denoise_method': 'bilateral', # 去噪方法: 'gaussian', 'bilateral', 'nlmeans'
'texture_suppression': True, # 是否抑制表面纹理
'gabor_params': { # Gabor滤波器参数
'kernels': 8,
'scales': 5,
'lambd': 10.0,
'sigma': 5.0,
'gamma': 0.5
}
}
# --- 应变分析配置 ---
STRAIN_CONFIG = {
'roi_definitions': { # 感兴趣区域定义
'pump_casing': { # 泵体外壳区域
'x': 500,
'y': 400,
'width': 800,
'height': 600,
'reference_points': [(550, 450), (1150, 450), (850, 900)]
},
'flange_joint': { # 法兰连接区域
'x': 1200,
'y': 300,
'width': 400,
'height': 300,
'reference_points': [(1250, 350), (1550, 350), (1400, 550)]
}
},
'dic_params': { # DIC算法参数
'subset_size': 21, # 子集大小(像素),奇数
'step_size': 5, # 步长(像素)
'interpolation': 'bicubic'
},
'deformation_threshold': 0.001, # 形变检测阈值 (相对形变)
'noise_filter_size': 3 # 形变噪声滤波窗口
}
# --- 疲劳预测配置 ---
FATIGUE_CONFIG = {
'material_properties': { # 材料疲劳特性 (示例:结构钢)
'youngs_modulus': 200e9, # 杨氏模量 (Pa)
'yield_strength': 250e6, # 屈服强度 (Pa)
'endurance_limit': 150e6, # 耐久极限 (Pa)
'fatigue_coefficient': 0.5, # Basquin定律系数
'fatigue_exponent': -0.1 # Basquin定律指数
},
'failure_threshold': 0.02, # 失效阈值 (总应变)
'measurement_interval': 3600, # 测量间隔 (秒)
'prediction_models': ['arima', 'lstm', 'polynomial'] # 使用的预测模型
}
# --- 预警配置 ---
ALERT_CONFIG = {
'levels': {
'info': {'threshold': 0.005, 'cooldown': 3600},
'warning': {'threshold': 0.010, 'cooldown': 1800},
'critical': {'threshold': 0.015, 'cooldown': 300}
},
'notification_methods': ['log', 'email', 'webhook'], # 通知方式
'email_recipients': ['admin@factory.com']
}
# --- 输出配置 ---
OUTPUT_CONFIG = {
'save_images': True,
'save_deformation_fields': True,
'visualization_enabled': True,
'log_level': 'INFO'
}
2. camera_handler.py - 相机控制模块
"""
工业设备疲劳视觉检测系统 - 相机控制模块
负责高分辨率图像的稳定采集,支持频闪同步和触发控制
"""
import cv2
import numpy as np
import logging
from datetime import datetime
from config import CAMERA_CONFIG, BASE_DIR, LOGS_DIR
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(os.path.join(LOGS_DIR, 'camera.log')),
logging.StreamHandler()
]
)
logger = logging.getLogger('CameraHandler')
class CameraHandler:
"""
工业相机处理器类
专为高精度形变测量优化,支持硬件触发和频闪同步
"""
def __init__(self, device_id=None):
"""
初始化相机处理器
Args:
device_id: 相机设备ID,默认使用配置文件中的设置
"""
self.device_id = device_id or CAMERA_CONFIG['device_id']
self.camera = None
self.is_initialized = False
self.calibration_matrix = None
self.distortion_coeffs = None
self.reference_frame = None
self.timestamp_last_capture = None
# 加载相机标定参数
self._load_calibration()
def _load_calibration(self):
"""加载相机标定参数(内参和畸变系数)"""
calib_file = os.path.join(CALIBRATION_DIR, 'camera_calibration.npz')
if os.path.exists(calib_file):
data = np.load(calib_file)
self.calibration_matrix = data['camera_matrix']
self.distortion_coeffs = data['dist_coeffs']
logger.info("相机标定参数加载成功")
else:
logger.warning(f"标定文件不存在: {calib_file},将使用默认参数")
def initialize(self):
"""
初始化相机连接和参数配置
Returns:
bool: 初始化成功返回True,失败返回False
"""
try:
# 使用DirectShow后端(Windows)或V4L2(Linux)
backend = cv2.CAP_DSHOW if os.name == 'nt' else cv2.CAP_V4L2
self.camera = cv2.VideoCapture(self.device_id, backend)
if not self.camera.isOpened():
logger.error(f"无法打开相机设备 {self.device_id}")
return False
# 设置高分辨率参数
config = CAMERA_CONFIG
self.camera.set(cv2.CAP_PROP_FRAME_WIDTH, config['width'])
self.camera.set(cv2.CAP_PROP_FRAME_HEIGHT, config['height'])
self.camera.set(cv2.CAP_PROP_FPS, config['fps'])
# 禁用自动增益和白平衡,确保图像稳定性
self.camera.set(cv2.CAP_PROP_AUTO_EXPOSURE, 0.25) # 手动模式
self.camera.set(cv2.CAP_PROP_EXPOSURE, -6) # 曝光值
self.camera.set(cv2.CAP_PROP_GAIN, 1.0) # 增益
self.camera.set(cv2.CAP_PROP_AUTO_WB, 0) # 关闭自动白平衡
# 配置频闪输出(如果相机支持)
if config['stroboscope_freq'] > 0:
self._configure_stroboscope(config['stroboscope_freq'])
self.is_initialized = True
logger.info(f"相机 {self.device_id} 初始化成功: {config['width']}x{config['height']} @ {config['fps']}fps")
return True
except Exception as e:
logger.error(f"相机初始化异常: {str(e)}")
return False
def _configure_stroboscope(self, frequency):
"""
配置频闪灯同步输出
Args:
frequency: 频闪频率 (Hz)
"""
# 注意:此功能需要相机硬件支持
# 不同厂商的SDK调用方式不同
try:
# 示例:使用GenICam标准属性
self.camera.set(cv2.CAP_PROP_STROBE, 1) # 启用频闪
# 实际实现需要根据具体相机SDK调整
logger.info(f"频闪灯配置: {frequency} Hz")
except Exception as e:
logger.warning(f"频闪灯配置失败: {e}")
def capture_frame(self, wait_for_trigger=False):
"""
捕获单帧高分辨率图像
Args:
wait_for_trigger: 是否等待硬件触发信号
Returns:
numpy.ndarray: 捕获的图像帧,失败返回None
"""
if not self.is_initialized or self.camera is None:
logger.warning("相机未初始化")
return None
try:
if wait_for_trigger:
# 等待硬件触发脉冲
self.camera.set(cv2.CAP_PROP_TRIGGER, 1)
while self.camera.get(cv2.CAP_PROP_TRIGGER) != 0:
pass
ret, frame = self.camera.read()
if ret:
self.timestamp_last_capture = datetime.now()
# 图像畸变校正
if self.calibration_matrix is not None:
frame = cv2.undistort(frame, self.calibration_matrix,
self.distortion_coeffs)
return frame
else:
logger.warning("图像捕获失败")
return None
except Exception as e:
logger.error(f"图像捕获异常: {str(e)}")
return None
def capture_reference_frame(self):
"""
捕获参考帧,用于后续形变计算的基准
Returns:
numpy.ndarray: 参考帧图像
"""
logger.info("正在捕获参考帧...")
frame = self.capture_frame(wait_for_trigger=True)
if frame is not None:
self.reference_frame = frame.copy()
logger.info("参考帧捕获成功")
return frame
def set_reference_frame(self, frame):
"""
手动设置参考帧
Args:
frame: 参考帧图像
"""
self.reference_frame = frame.copy()
logger.info("参考帧已更新")
def get_resolution(self):
"""
获取当前相机分辨率
Returns:
tuple: (宽度, 高度)
"""
if self.camera and self.is_initialized:
width = int(self.camera.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(self.camera.get(cv2.CAP_PROP_FRAME_HEIGHT))
return (width, height)
return (0, 0)
def release(self):
"""
释放相机资源
"""
if self.camera is not None:
self.camera.release()
self.is_initialized = False
logger.info("相机资源已释放")
# --- 测试代码 ---
if __name__ == "__main__":
handler = CameraHandler()
if handler.initialize():
print("按 'c' 捕获参考帧,按 'q' 退出测试")
while True:
frame = handler.capture_frame()
if frame is not None:
cv2.imshow("Camera Test", cv2.resize(frame, (640, 480)))
key = cv2.waitKey(1) & 0xFF
if key == ord('c'):
handler.capture_reference_frame()
print("参考帧已捕获")
elif key == ord('q'):
break
handler.release()
cv2.destroyAllWindows()
3. image_registrar.py - 图像配准模块
"""
工业设备疲劳视觉检测系统 - 图像配准模块
负责消除设备微小位移的影响,确保多时刻图像的可比性
使用特征点匹配和几何变换实现亚像素级配准
"""
import cv2
import numpy as np
import logging
from config import REGISTRATION_CONFIG, BASE_DIR, LOGS_DIR
# 配置日志
logger = logging.getLogger('ImageRegistrar')
class ImageRegistrar:
"""
图像配准器类
使用特征点检测和匹配算法,计算图像间的几何变换关系
"""
def __init__(self):
"""初始化图像配准器"""
self.method = REGISTRATION_CONFIG['method']
self.feature_detector = None
self.matcher = None
self._initialize_detector()
self.transform_matrix = None # 当前变换矩阵
self.registration_quality = 1.0 # 配准质量评分 (0-1)
def _initialize_detector(self):
"""初始化特征检测器和匹配器"""
method = self.method
if method == 'sift':
try:
self.feature_detector = cv2.SIFT_create(
nfeatures=REGISTRATION_CONFIG['max_features']
)
self.matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
logger.info("使用SIFT特征检测器")
except AttributeError:
logger.warning("OpenCV未编译SIFT支持,切换到ORB")
method = 'orb'
if method == 'orb':
self.feature_detector = cv2.ORB_create(
nfeatures=REGISTRATION_CONFIG['max_features']
)
self.matcher = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
logger.info("使用ORB特征检测器")
if method == 'surf':
try:
self.feature_detector = cv2.xfeatures2d.SURF_create(400)
self.matcher = cv2.BFMatcher(cv2.NORM_L2, crossCheck=False)
logger.info("使用SURF特征检测器")
except AttributeError:
logger.warning("OpenCV未编译SURF支持,切换到ORB")
self._initialize_detector()
def register(self, current_frame, reference_frame=None):
"""
将当前帧配准到参考帧
Args:
current_frame: 当前图像帧
reference_frame: 参考图像帧,默认使用内部存储的参考帧
Returns:
tuple: (配准后的图像, 变换矩阵, 配准质量)
"""
if reference_frame is None:
reference_frame = self.reference_frame
if reference_frame is None:
logger.error("没有可用的参考帧")
return current_frame, None, 0.0
# 转换为灰度图
if len(current_frame.shape) == 3:
gray_current = cv2.cvtColor(current_frame, cv2.COLOR_BGR2GRAY)
gray_ref = cv2.cvtColor(reference_frame, cv2.COLOR_BGR2GRAY)
else:
gray_current = current_frame
gray_ref = reference_frame
# 检测特征点
kp1, desc1 = self.feature_detector.detectAndCompute(gray_ref, None)
kp2, desc2 = self.feature_detector.detectAndCompute(gray_current, None)
if desc1 is None or desc2 is None or len(kp1) < 10 or len(kp2) < 10:
logger.warning("特征点数量不足,无法进行可靠配准")
return current_frame, None, 0.0
# 特征匹配
matches = self.matcher.match(desc1, desc2)
if len(matches) < 10:
logger.warning(f"匹配点数量不足: {len(matches)}")
return current_frame, None, 0.0
# 筛选优质匹配
matches = sorted(matches, key=lambda x: x.distance)
good_matches = matches[:int(len(matches) * REGISTRATION_CONFIG['good_match_ratio'])]
if len(good_matches) < 4:
logger.warning("优质匹配点不足")
return current_frame, None, 0.0
# 提取匹配点坐标
src_pts = np.float32([kp1[m.queryIdx].pt for m in good_matches]).reshape(-1, 1, 2)
dst_pts = np.float32([kp2[m.trainIdx].pt for m in good_matches]).reshape(-1, 1, 2)
# 使用RANSAC计算几何变换
transform_matrix, mask = cv2.estimateAffinePartial2D(
src_pts, dst_pts,
method=cv2.RANSAC,
ransacReprojThreshold=REGISTRATION_CONFIG['ransac_threshold']
)
if transform_matrix is None:
logger.error("无法计算有效的变换矩阵")
return current_frame, None, 0.0
# 计算配准质量
inlier_ratio = np.sum(mask) / len(mask) if mask is not None else 0
self.registration_quality = inlier_ratio
# 应用变换
h, w = reference_frame.shape[:2]
registered_frame = cv2.warpAffine(
current_frame, transform_matrix, (w, h),
flags=cv2.INTER_CUBIC + cv2.WARP_INVERSE_MAP
)
self.transform_matrix = transform_matrix
logger.debug(f"配准完成,质量: {inlier_ratio:.3f}, 内点: {np.sum(mask)}/{len(mask)}")
return registered_frame, transform_matrix, inlier_ratio
def get_translation(self):
"""
从变换矩阵中提取平移分量
Returns:
tuple: (dx, dy) 平移量(像素)
"""
if self.transform_matrix is not None:
return self.transform_matrix[0, 2], self.transform_matrix[1, 2]
return (0.0, 0.0)
def reset_reference(self, frame):
"""
重置参考帧
Args:
frame: 新的参考帧
"""
self.reference_frame = frame.copy()
self.transform_matrix = None
self.registration_quality = 1.0
logger.info("参考帧已重置")
# --- 测试代码 ---
if __name__ == "__main__":
# 创建测试图像
ref_frame = np.random.randint(0, 255, (1000, 1000, 3), dtype=np.uint8)
# 在参考帧上添加一些特征点
cv2.circle(ref_frame, (200, 200), 50, (255, 0, 0), -1)
cv2.circle(ref_frame, (800, 300), 30, (0, 255, 0), -1)
cv2.rectangle(ref_frame, (400, 600), (600, 800), (0, 0, 255), 3)
# 创建有轻微位移的当前帧
current_frame = np.roll(ref_frame, 5, axis=1) # 水平平移5像素
current_frame = np.roll(current_frame, 3, axis=0) # 垂直平移3像素
registrar = ImageRegistrar()
registered, matrix, quality = registrar.register(current_frame, ref_frame)
print(f"变换矩阵:\n{matrix}")
print(f"配准质量: {quality:.3f}")
# 显示结果
cv2.imshow("Reference", ref_frame)
cv2.imshow("Current", current_frame)
cv2.imshow("Registered", registered)
cv2.waitKey(0)
cv2.destroyAllWindows()
4. preprocessor.py - 预处理模块
"""
工业设备疲劳视觉检测系统 - 预处理模块
负责图像去噪、光照归一化、纹理抑制,为形变分析做准备
"""
import cv2
import numpy as np
import logging
from config import PREPROCESSING_CONFIG, BASE_DIR, LOGS_DIR
# 配置日志
logger = logging.getLogger('Preprocessor')
class Preprocessor:
"""
图像预处理器类
实现去噪、光照归一化、纹理抑制等预处理操作
"""
def __init__(self):
"""初始化预处理器"""
self.config = PREPROCESSING_CONFIG
self._setup_gabor_filters()
logger.info("预处理器初始化完成")
def _setup_gabor_filters(self):
"""设置Gabor滤波器组用于纹理抑制"""
params = self.config['gabor_params']
self.gabor_kernels = []
for theta in np.linspace(0, np.pi, params['kernels'], endpoint=False):
for sigma in params['scales'] * np.array([1]):
kernel = cv2.getGaborKernel(
(params['lambd'] * 4, params['lambd'] * 4),
sigma,
theta,
params['lambd'],
params['gamma'],
0,
ktype=cv2.CV_64F
)
self.gabor_kernels.append(kernel)
def denoise(self, image):
"""
图像去噪
Args:
image: 输入图像
Returns:
numpy.ndarray: 去噪后的图像
"""
method = self.config['denoise_method']
if method == 'gaussian':
return cv2.GaussianBlur(image, (5, 5), 0)
elif method == 'bilateral':
# 双边滤波:保边去噪,适合保留形变边缘
if len(image.shape) == 3:
return cv2.bilater
利用AI解决实际问题,如果你觉得这个工具好用,欢迎关注长安牧笛!
