开始
│
├─ 数据准备
│ ├─ 导入Kaggle数据集
│ ├─ 遍历数据文件
│ └─ 合并数据集
│
├─ 数据预处理
│ ├─ 转换数据格式(宽表→长表)
│ ├─ 数据质量检查(样本量/时长)
│ └─ 异常传感器处理(排除a1)
│
├─ 时域分析
│ ├─ 可视化时间序列
│ ├─ 分布分析(箱型图)
│ └─ 特征提取(统计特征)
│
├─ 特征工程
│ ├─ 分割数据片段(1000点/段)
│ ├─ 计算统计特征(均值/标准差等)
│ └─ 构建特征矩阵
│
├─ 机器学习建模
│ ├─ 划分训练/测试集
│ ├─ 随机森林训练
│ ├─ 模型评估(准确率)
│ └─ 多模型比较(lazypredict)
│
├─ 频域分析
│ ├─ FFT频谱分析
│ ├─ 峰值检测
│ ├─ 功率谱密度(PSD)
│ └─ 健康/故障PSD对比
│
├─ 时频分析
│ └─ 谱图可视化
│
└─ 结束
详细算法步骤
数据准备阶段 :
从Kaggle下载齿轮箱故障诊断数据集
遍历数据集目录结构,识别符合命名规范的文件
从文件名提取关键信息:状态(健康/断齿)、负载值
合并所有数据文件为统一DataFrame
数据预处理 :
转换数据格式:将宽格式(每列一个传感器)转换为长格式(堆叠传感器读数)
分析数据完整性:计算最小样本量及对应时长
识别并排除异常传感器(a1)对整体分布的影响
时域分析 :
可视化不同负载下健康/故障状态的时间序列
使用箱型图比较各传感器在不同状态下的读数分布
计算全局读数分布,验证数据质量
特征工程 :
将连续信号分割为固定长度片段(1000个样本)
为每个片段计算多种统计特征:
传感器类型指示器
负载值
均值、标准差
峰度、偏度
高阶矩
构建特征矩阵和标签向量(故障=1,健康=0)
机器学习建模 :
划分训练集(80%)和测试集(20%),保持类别比例
训练随机森林分类器
评估模型准确率
使用lazypredict自动比较多种分类算法性能
频域分析 :
对单传感器数据进行FFT变换,获取频谱
检测频谱中的显著峰值
计算功率谱密度(PSD)分析信号能量分布
比较健康/故障状态下PSD的差异和相关程度
时频分析 :
计算并可视化所有传感器在不同状态下的谱图
展示信号频率成分随时间的变化
算法在信号分析中的应用
故障检测 | ||
故障分类 | ||
负载影响分析 | ||
传感器评估 | ||
频带特征提取 | ||
状态趋势监测 | ||
信号质量评估 | ||
故障定位 | ||
运行优化 | ||
诊断模型验证 |
结合机器学习和深度学习
特征自动提取 | ||
多传感器融合 | ||
异常检测 | ||
迁移学习 | ||
注意力机制 | ||
时序建模 | ||
图神经网络 | ||
生成对抗网络 | ||
多任务学习 | ||
模型可解释性 | ||
在线学习 |
# 导入Kaggle数据集
import kagglehub
# 下载齿轮箱故障诊断数据集
brjapon_gearbox_fault_diagnosis_path = kagglehub.dataset_download('brjapon/gearbox-fault-diagnosis')
print('数据源导入完成。')
# 导入基础数据分析库
import numpy as np # 数值计算库
import pandas as pd # 数据处理库
# 遍历输入目录下的所有文件
import os
for dirname, _, filenames in os.walk('/kaggle/input'):
for filename in filenames:
print(os.path.join(dirname, filename)) # 打印文件路径
# 导入可视化与分析库
import matplotlib.pyplot as plt # 数据可视化
import seaborn as sns # 高级数据可视化
from scipy import fft, signal, stats # 信号处理与统计
from tqdm.notebook import tqdm # 进度条工具
"""
数据集说明:
齿轮箱故障诊断数据集包含使用SpectraQuest齿轮箱故障诊断模拟器记录的振动数据。
数据集使用4个不同方向的振动传感器记录,负载从0%到90%变化。
包含两种场景:
1) 健康状态
2) 断齿状态
共20个文件,健康齿轮箱10个,断齿齿轮箱10个。
每个文件对应0-90%的负载(步长10%)。
文件名结构:
- 首字符:"h"表示健康,"b"表示断齿
- 中间包含"30hz"表示采样率
- 末尾字符:负载值(0-90)
"""
# 初始化数据存储列表
dfs = []
# 遍历输入目录下的所有文件
for dirname, _, filenames in tqdm(os.walk('/kaggle/input')):
for filename in tqdm(filenames, leave=False):
# 检查文件名是否符合预期模式
if not (filename.startswith('h') or filename.startswith('b')) or '30hz' not in filename or not filename.endswith('.csv'):
continue # 跳过不符合模式的文件
# 从文件名提取状态(健康/断齿)
state = filename[0]
# 从文件名提取负载值
load = int(filename.split('.')[0][5:])
# 读取CSV文件
df = pd.read_csv(os.path.join(dirname, filename))
# 添加状态列
df['state'] = state
# 添加负载列
df['load'] = load
# 添加到列表
dfs.append(df)
# 合并所有数据集并重置索引
df = pd.concat(dfs).reset_index().rename(columns={'index':'sample_index'})
# 将数据转换为"长格式"(传感器读数堆叠)
sensor_readings = df.melt(
id_vars=['sample_index','state','load'], # 保留的标识列
value_vars=['a1','a2','a3','a4'], # 要堆叠的传感器列
var_name='sensor', # 新列名:传感器类型
value_name='reading' # 新列名:读数
)
# 基本探索性数据分析(EDA)
# 可视化各文件样本数量分布
sns.countplot(
data=sensor_readings[sensor_readings.sensor=='a1'], # 使用传感器a1的数据
y='load', # y轴为负载
hue='state', # 按状态着色
)
# 计算最小样本数及对应时长
lowest_samples = df.groupby(['state','load']).sample_index.count().min()
print(f'最小样本数 = {lowest_samples}')
print(f'对应时长 = {lowest_samples/30:0.2f}秒 或 {lowest_samples/30/60:0.2f}分钟')
# 数据筛选辅助函数
def rdg(df, state=None, load=None, sensor=None):
"""根据状态、负载和传感器筛选数据"""
df_st = df[df.state==state] if state is not None else df
df_lo = df_st[df_st.load==load] if load is not None else df_st
df_se = df_lo[df_lo.sensor==sensor] if sensor is not None else df_lo
return df_se
# 时域分析:绘制传感器读数时间序列
g = sns.FacetGrid(
data=pd.concat([
rdg(sensor_readings, load=0, sensor='a1'), # 负载0%的数据
rdg(sensor_readings, load=90, sensor='a1') # 负载90%的数据
]),
col='load', # 按负载分列
row='state', # 按状态分行
height=2.5, # 子图高度
aspect=2.5 # 子图宽高比
)
g.map(plt.plot, 'reading') # 在每个子图上绘制读数
plt.show()
# 使用箱型图比较读数分布
print('每行表示不同传感器,每列表示不同负载')
print('每个子图展示健康与故障齿轮箱的读数分布')
sns.catplot(
data=sensor_readings,
col='load', row='sensor', # 按负载分列,按传感器分行
x='state', y='reading', # x轴为状态,y轴为读数
kind='boxen', # 使用箱型图变体
height=10 # 图形高度
)
# 分析传感器a1对整体分布的影响
display(sensor_readings.groupby(['sensor','state']).reading.std().unstack())
print(f"包含传感器a1的标准差: {sensor_readings.reading.std()}")
print(f"排除传感器a1的标准差: {sensor_readings[sensor_readings.sensor!='a1'].reading.std()}")
# 排除传感器a1的数据
readings = sensor_readings[sensor_readings.sensor!='a1']
# 特征工程:创建机器学习特征
data = [] # 特征数据存储
labels = [] # 标签存储
# 分组处理:按状态、负载、传感器分组
for (state,load,sensor),g in sensor_readings.groupby(['state','load','sensor']):
vals = g.reading.values # 获取该组的所有读数
# 将数据分割为1000个样本的片段
splits = np.split(vals, range(1000,vals.shape[0],1000))
for s in splits[:-1]: # 跳过最后一个可能不完整的片段
# 提取时域特征
data.append({
'sensor_a1': int(sensor=='a1'), # 传感器指示器
'sensor_a2': int(sensor=='a2'),
'sensor_a3': int(sensor=='a3'),
'load': load, # 负载值
'mean': np.mean(s), # 均值
'std': np.std(s), # 标准差
'kurt': stats.kurtosis(s), # 峰度
'skew': stats.skew(s), # 偏度
'moment': stats.moment(s), # 矩
})
labels.append(int(state=='b')) # 标签:1表示故障,0表示健康
# 转换为DataFrame
df_data = pd.DataFrame(data)
data = df_data.values
labels = np.array(labels)
# 数据集统计
print(f'总样本数: {len(labels)}')
print(f'故障类样本: {np.sum(labels)} ({np.sum(labels)/len(labels):0.1%})')
# 划分训练集和测试集
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(
data, labels,
train_size=0.8, # 80%训练
random_state=42, # 随机种子
stratify=labels # 分层抽样保持类别比例
)
print(f'训练数据: {X_train.shape}')
print(f'测试数据: {X_test.shape}')
# 随机森林模型训练与评估
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
model = RandomForestClassifier(random_state=42)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print(f'准确率: {accuracy_score(y_test, y_pred):0.2%}')
# 使用lazypredict自动比较多个模型
get_ipython().system('pip install lazypredict')
from lazypredict.Supervised import LazyClassifier
clf = LazyClassifier(verbose=0, ignore_warnings=True)
models, predictions = clf.fit(X_train, X_test, y_train, y_test)
print(models.head()) # 显示表现最好的模型
# 频域分析:快速傅里叶变换(FFT)
sensor_data = rdg(sensor_readings, 'h', 10, 'a4').reading.values
y = np.abs(fft.rfft(sensor_data)) # 计算FFT幅度
x = fft.rfftfreq(sensor_data.shape[0], 1/30) # 计算频率轴(30Hz采样率)
plt.plot(x, y)
plt.xlabel('频率 (Hz)')
plt.ylabel('信号强度')
plt.show()
# 在频谱中检测峰值
fig, ax = plt.subplots(figsize=(25,3))
ax.plot(x, y)
# 设置峰值检测参数
x_peak_spacing = y.shape[0] / x.max() # 最小峰值间距(1Hz)
x_peak_prominence = np.quantile(y,0.99) # 最小峰值显著度(99%分位数)
# 检测峰值
peaks, _ = signal.find_peaks(y, distance=x_peak_spacing, prominence=x_peak_prominence)
for peak in peaks:
ax.scatter(x=x[peak], y=y[peak], c='r', marker='o', s=30) # 标记峰值
ax.set_xlabel('频率 (Hz)')
ax.set_ylabel('信号强度')
plt.show()
# 功率谱密度(PSD)分析
x, y = signal.welch(sensor_data, fs=30) # 使用Welch方法计算PSD
plt.plot(x, y)
plt.xlabel('频率 (Hz)')
plt.ylabel('功率谱密度 (V^2/Hz)')
plt.show()
# 比较不同状态下的PSD
fig, ax = plt.subplots(ncols=4, nrows=10, figsize=(15,20))
for ((load,sensor), dfg), axi in tqdm(zip(sensor_readings.groupby(['load','sensor']), ax.ravel()), total=40):
# 分离健康和故障数据
healthy_raw = dfg[dfg.state=='h'].reading.values
broken_raw = dfg[dfg.state=='b'].reading.values
# 计算PSD
fh, Ph = signal.welch(healthy_raw, fs=30)
fb, Pb = signal.welch(broken_raw, fs=30)
# 绘制并计算相关性
axi.plot(fh, Ph, c='g', alpha=0.6, label='健康')
axi.plot(fb, Pb, c='r', alpha=0.6, label='故障')
axi.set_ylim(0,50)
axi.set_title(f'负载={load}, 传感器 {sensor}, 相关性: {np.corrcoef(Ph, Pb)[0,1]:0.2}')
plt.tight_layout()
plt.show()
# 时频分析:谱图可视化
fig, ax = plt.subplots(ncols=8, nrows=10, figsize=(25,20))
for ((load, sensor, state), dfg), axi in tqdm(zip(sensor_readings.groupby(['load','sensor','state']), ax.ravel()), total=80):
raw = dfg.reading.values
# 计算谱图
f, t, Sxx = signal.spectrogram(raw, fs=30)
# 绘制谱图
axi.pcolormesh(t, f, Sxx, cmap='nipy_spectral')
axi.set_title(f'负载={load}, 传感器 {sensor}, 状态 {state}')
plt.tight_layout()
plt.show()
https://www.zhihu.com/consult/people/792359672131756032?isMe=1
擅长领域:信号滤波/降噪,机器学习/深度学习,时间序列预分析/预测,设备故障诊断/缺陷检测/异常检测