在这里向大家介绍我们在PAKDD2020 Alibaba AI Ops Competition -大规模磁盘故障预测比赛采取的方案框架。本次比赛聚焦解决大规模生产系统中的硬盘故障预测问题,需解决数据噪声、正负样本不均衡等技术问题,同时也要保障预测算法长期稳定有效。我们团队采用了简单的二分类算法,在LightGBM模型基础上采用了Focal Loss目标函数,决赛中的得分为0.4047(第三名)。
1. 数据预处理
比赛的目标是预测磁盘是否会在未来30天内出现故障。在我们的二分类框架中,1表示磁盘即将出现故障,0表示无法判断磁盘是否会在近期出现故障。组织方提供了故障磁盘的故障时间,因此可以按照故障时间对训练集中的磁盘样本打上标签。我们最终采取的方式是:将故障硬盘中距离故障时间0–7天(含)的样本标记为正样本,其余样本(包括未故障磁盘的全部样本)均为负样本。正样本窗口没有取满30天,7天是一个可调参数,可以根据自己的方案多次测试后选择。
import pandas as pd
import numpy as np
from tqdm import tqdm
from lightgbm.sklearn import LGBMClassifier
import copy
import gc
import os
# Print DataFrames in full (all rows and columns); this only affects how they are displayed.
pd.options.display.max_columns = None
pd.options.display.max_rows = None
def _parse_dt(values):
    """Convert ``YYYYMMDD`` date stamps (e.g. ``20170701``) to ``datetime64``.

    Vectorised form of formatting ``str(x)`` as ``YYYY-MM-DD`` row by row and
    passing the result to ``pd.to_datetime``.
    """
    text = values.astype(str)
    return pd.to_datetime(text.str[0:4] + '-' + text.str[4:6] + '-' + text.str[6:])


def _load_fault_tags(path):
    """Read a fault-tag CSV: one row per recorded failure, multiple tags joined by '|'."""
    tags = pd.read_csv(path)
    tags['fault_time'] = pd.to_datetime(tags['fault_time'])
    tags['tag'] = tags['tag'].astype(str)
    keys = ['serial_number', 'manufacturer', 'model', 'fault_time']
    return tags.groupby(keys)['tag'].apply(lambda x: '|'.join(x)).reset_index()


def process_data(label_window=7, chunksize=200000):
    """Label the monthly SMART logs and save them as ``train_YYYY_M.csv``.

    1. The columns to keep (``fea_list``) are chosen on the round-1 test log
       ``disk_sample_smart_log_test_a.csv`` (duplicate rows removed): a column
       is kept when it has at least one non-null value and is non-null in more
       than 1% of the rows.
    2. Fault records are read from ``disk_sample_fault_tag.csv`` and
       ``disk_sample_fault_tag_201808.csv``.
    3. Each monthly log ``disk_sample_smart_log_YYYYMM.csv`` (2017-07 .. 2018-08)
       is read in chunks, labelled and written to ``train_YYYY_M.csv``
       (an existing output file is replaced).

    A sample is labelled 1 when its disk has a recorded fault between 0 and
    ``label_window`` days (inclusive) after the sample date, otherwise 0.
    The competition target is failure within 30 days; the narrower positive
    window is a tunable modelling choice.

    Args:
        label_window: positive-label window in days (default 7).
        chunksize: rows read per chunk, bounding peak memory (default 200000).
    """
    # The test log is only used to choose columns. Duplicates are removed first
    # because the 1% non-null threshold is measured on distinct rows.
    test = pd.read_csv('disk_sample_smart_log_test_a.csv').drop_duplicates()
    min_non_null = 0.01 * test.shape[0]
    fea_list = [col for col in test.columns
                if test[col].nunique() >= 1 and test[col].notnull().sum() > min_non_null]

    tag = pd.concat([_load_fault_tags('disk_sample_fault_tag.csv'),
                     _load_fault_tags('disk_sample_fault_tag_201808.csv')],
                    axis=0, ignore_index=True)

    def get_label(df):
        """Return ``df[fea_list]`` with parsed ``dt`` plus ``fault_time``, ``diff_day`` and ``label``."""
        # Work on an explicit copy and assign whole columns: the former
        # ``df.loc[:, 'dt'] = ...`` wrote into a slice of the chunk, and on recent
        # pandas an in-place ``.loc[:, col]`` assignment may keep the column's old
        # (int/object) dtype, which breaks the datetime subtraction below.
        df = df[fea_list].copy()
        df['dt'] = _parse_dt(df['dt'])
        # Left join: disks without a fault record get NaT (-> diff_day NaN -> label 0);
        # a disk with several fault records yields one row per record.
        df = df.merge(tag[['serial_number', 'model', 'fault_time']], how='left', on=['serial_number', 'model'])
        df['diff_day'] = (df['fault_time'] - df['dt']).dt.days
        df['label'] = 0
        df.loc[(df['diff_day'] >= 0) & (df['diff_day'] <= label_window), 'label'] = 1
        return df

    def save_data(a, b):
        """Label CSV ``a`` chunk by chunk and write the result to ``b``."""
        # Start from an empty output so a rerun never appends to stale results.
        if os.path.exists(b):
            os.remove(b)
        first = True
        for chunk in pd.read_csv(a, chunksize=chunksize):
            # The first chunk creates the file with a header; later chunks append.
            get_label(chunk).to_csv(b, mode='w' if first else 'a', header=first, index=False)
            first = False

    # The raw logs are large, so each month is processed in chunks and the labelled
    # result is saved once for reuse (about an hour on a desktop machine).
    months = pd.period_range('2017-07', '2018-08', freq='M')
    a = ['disk_sample_smart_log_%s.csv' % m.strftime('%Y%m') for m in months]
    b = ['train_%d_%d.csv' % (m.year, m.month) for m in months]
    for src, dst in zip(a, b):
        save_data(src, dst)
# Run the labelling pipeline when this file is executed directly; the training
# script imports process_data and calls it itself.
if __name__=='__main__':
    process_data()
2. 模型训练
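我们使用了2017年7月—2018年8月的训练数据。在特征选取阶段,选取了11维SMART原始特征,去除了取值为空以及故障前数值没有出现变化的特征。在特征变换阶段,对每个SMART属性 $x$,取最近7个样本的6个相邻差分绝对值的指数加权和作为最终特征,即 $f_t=\sum_{k=0}^{5}0.7^{k}\,\lvert x_{t-k}-x_{t-k-1}\rvert$(越近的变化权重越大),并附加磁盘型号(model)作为一维特征。在模型训练阶段,使用Focal loss作为训练的目标函数。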
import pandas as pd
import numpy as np
from lightgbm.sklearn import LGBMClassifier
import lightgbm
import copy
import gc
import joblib
from process_data import process_data
from sklearn.metrics import f1_score,precision_score,recall_score
import os
from scipy.misc import derivative
import warnings
# Silence all warnings and print DataFrames in full.
warnings.filterwarnings("ignore")
pd.options.display.max_columns = None
pd.options.display.max_rows = None
# Label and preprocess the raw monthly logs (rewrites every train_YYYY_M.csv).
# This reruns the whole labelling step every time this script is executed.
process_data()
# Final set of 11 raw SMART attributes. Normalised SMART values are not used because
# they are derived from the raw values and many of them are linearly related to them.
feature_name = ['smart_%draw' % attr for attr in (1, 5, 7, 9, 187, 188, 191, 192, 193, 197, 198)]
# Columns read from the labelled CSVs: identifiers + date (+ label for training).
test_column_name = feature_name + ['serial_number', 'manufacturer', 'model', 'dt']
train_column_name = test_column_name + ['label']
# Feature extraction: the only transformation is an exponentially weighted sum of the
# absolute differences between consecutive samples of each SMART attribute over a short
# window. It measures how fast the attribute changed recently and gives the most
# recent changes the largest weight.
def transform(data, width=7, alpha=0.7, features=None):
    """Build model inputs from raw per-disk SMART samples.

    For every attribute ``x`` in ``features`` and every sample ``t`` that has at
    least ``width - 1`` earlier samples of the same disk, the feature is::

        sum_{k=0}^{width-2} alpha**k * |x[t-k] - x[t-k-1]|

    where the indices are consecutive *rows* of that disk after sorting by
    ``dt`` (gaps between dates are not checked). Samples with fewer
    predecessors are dropped.

    Args:
        data: DataFrame with ``serial_number``, ``manufacturer``, ``model``,
            ``dt``, the feature columns and optionally ``label``.
        width: window length in samples (default 7, i.e. 6 differences); >= 2.
        alpha: decay per step back in time (default 0.7).
        features: columns to transform; defaults to the module-level
            ``feature_name``.

    Returns:
        ``(feature, label, sub)`` where ``feature`` has integer column labels
        ``0..len(features)`` (the transformed features followed by ``model``);
        ``label`` is the 1-D ``label`` array of the kept rows, or zeros of
        shape ``(n, 1)`` when ``data`` has no ``label`` column; ``sub`` holds
        ``manufacturer``, ``model``, ``serial_number``, ``dt`` of the kept rows
        (same index as ``feature``).

    Raises:
        ValueError: if ``width < 2``.
    """
    if width < 2:
        raise ValueError('width must be at least 2, got %r' % (width,))
    if features is None:
        features = feature_name
    keys = ['serial_number', 'manufacturer', 'model']
    data = data.sort_values(keys + ['dt'])
    # Position of each row in its disk's history (0 = oldest sample).
    data['count'] = data.groupby(keys).cumcount()
    # Rows with at least width-1 predecessors of the same disk: only for these do the
    # frame-wide shifts below stay inside that disk's own rows.
    keep = data['count'] >= width - 1

    values = data[features]
    ewma = None
    for k in range(width - 1):
        term = alpha ** k * np.abs(values.shift(k) - values.shift(k + 1))
        ewma = term if ewma is None else ewma + term

    # ignore_index=True renumbers the columns 0..n; the model column comes last.
    feature = pd.concat([ewma.loc[keep], data.loc[keep, 'model']], axis=1, ignore_index=True)
    if 'label' in data.columns:
        label = data.loc[keep, 'label'].values
    else:
        label = np.zeros((feature.shape[0], 1))
    sub = data.loc[keep, ['manufacturer', 'model', 'serial_number', 'dt']]
    return feature, label, sub
# The full competition training set is used: roughly 50 million samples in total
# (the original note reads "5000千万"; 5000万 = 50 million is presumably meant).
b = ['train_2018_7.csv','train_2018_6.csv','train_2018_5.csv','train_2018_4.csv','train_2018_3.csv',
     'train_2018_2.csv','train_2018_1.csv','train_2017_12.csv','train_2017_11.csv','train_2017_10.csv',
     'train_2017_9.csv','train_2017_8.csv','train_2017_7.csv']
# Read every labelled month (newest first) and concatenate once: calling pd.concat
# inside the loop re-copied the whole, growing frame on every iteration.
train = pd.concat(
    [pd.read_csv(path, usecols=train_column_name) for path in ['train_2018_8.csv'] + b],
    axis=0, ignore_index=True,
)
# Exact duplicate rows are dropped (first occurrence kept).
train = train.drop_duplicates().reset_index(drop=True)
train_x, train_y, sub = transform(train)
print(train_x.shape, np.sum(train_y))
# Focal loss is the training objective: it copes with the heavy class imbalance by
# down-weighting easy, well-classified samples so training focuses on the rare ones.
# a: weight of the positive class; g: focusing exponent (gamma).
a, g = 0.9833, 2
# Probabilities are clipped away from 0 and 1 so the log terms stay finite for
# saturated scores (otherwise 0 * log(0) yields NaN gradients).
_FL_EPS = 1e-15


def _focal_loss(x, t):
    """Element-wise focal loss for raw scores ``x`` and 0/1 targets ``t``."""
    p = np.clip(1 / (1 + np.exp(-x)), _FL_EPS, 1 - _FL_EPS)
    return -(a * t + (1 - a) * (1 - t)) * ((1 - (t * p + (1 - t) * (1 - p))) ** g) \
        * (t * np.log(p) + (1 - t) * np.log(1 - p))


def focal_loss_lgb(y_true, y_pred):
    """LightGBM custom objective: gradient and hessian of the focal loss w.r.t. the raw score.

    Derivatives are 3-point central differences with step 1e-6, the scheme
    ``scipy.misc.derivative`` (order 3) used; that SciPy helper is deprecated
    and removed in newer SciPy releases, so the formula is inlined here.
    """
    dx = 1e-6
    lo = _focal_loss(y_pred - dx, y_true)
    mid = _focal_loss(y_pred, y_true)
    hi = _focal_loss(y_pred + dx, y_true)
    grad = (hi - lo) / (2 * dx)
    hess = (lo - 2 * mid + hi) / (dx * dx)
    return grad, hess


def focal_loss_lgb_eval(y_true, y_pred):
    """LightGBM custom eval metric: mean focal loss (lower is better)."""
    return 'focal_loss', np.mean(_focal_loss(y_pred, y_true)), False
# LightGBM model; in our experiments the final result was fairly insensitive to these parameters.
clf = LGBMClassifier(
    learning_rate=0.05,
    n_estimators=100,
    num_leaves=127,
    # NOTE(review): LightGBM only bags rows when subsample_freq > 0 (default 0), so
    # subsample=0.7 has no effect as written. Left unchanged to keep the reported
    # model reproducible.
    subsample=0.7,
    colsample_bytree=0.7,
    random_state=2019,
    metric=None,
    objective=focal_loss_lgb,
)
print('************** training **************')
print(train_x.shape)
# Log the focal-loss metric on the training set every iteration. fit(verbose=...)
# was removed in LightGBM 4.0; use the log_evaluation callback when it exists and
# fall back to verbose=1 on older versions.
fit_kwargs = {'eval_set': [(train_x, train_y)], 'eval_metric': focal_loss_lgb_eval}
if hasattr(lightgbm, 'log_evaluation'):
    fit_kwargs['callbacks'] = [lightgbm.log_evaluation(1)]
else:
    fit_kwargs['verbose'] = 1
clf.fit(train_x, train_y, **fit_kwargs)
joblib.dump(clf, "train_model.dat")
# With a callable objective LightGBM outputs raw margins, not probabilities; on
# recent versions predict_proba() then returns a 1-D array and the former
# predict_proba(...)[:, 1] raises. predict(raw_score=True) returns those margins
# on every version, so the threshold below is on the margin scale.
train_y_hat = clf.predict(train_x, raw_score=True)
# Online prediction threshold: the 99.97th percentile of the scores of the negative
# training samples.
p_th = np.percentile(train_y_hat[train_y == 0], 99.97)
joblib.dump(p_th, "p_th.dat")
print(p_th)
3. 异常检测/预测
在测试阶段,我们采用了两步走的预测逻辑:首先用固定阈值(代码中 p_th = 0.12)对模型输出分数进行初选;然后对每个厂商、型号,每天只提交分数最高的前N个样本(代码中N=9),同一块磁盘只提交其最早被选中的那一天。
import pandas as pd
import numpy as np
from lightgbm.sklearn import LGBMClassifier
import lightgbm
import copy
import gc
import joblib
from sklearn.metrics import f1_score,precision_score,recall_score
import os
# Print DataFrames in full and silence all warnings (this also hides pandas/LightGBM warnings).
pd.options.display.max_columns = None
pd.options.display.max_rows = None
import warnings
warnings.filterwarnings("ignore")
# Must be exactly the feature list the model was trained on (the training script uses
# smart_9raw, not smart_12raw). transform() discards column names (ignore_index=True),
# so a mismatch would not raise: the trees learnt on smart_9raw would silently be
# applied to smart_12raw.
feature_name = ['smart_1raw','smart_5raw','smart_7raw','smart_9raw','smart_187raw','smart_188raw',
                'smart_191raw','smart_192raw','smart_193raw','smart_197raw','smart_198raw']
train_column_name = feature_name + ['serial_number', 'manufacturer', 'model', 'dt','label']
test_column_name = feature_name + ['serial_number', 'manufacturer', 'model', 'dt']
# Feature extraction (must match the training script): an exponentially weighted sum of
# the absolute differences between consecutive samples of each SMART attribute.
def transform(data, width=7, alpha=0.7, features=None):
    """Build model inputs from raw per-disk SMART samples.

    For every attribute ``x`` in ``features`` and every sample ``t`` that has at
    least ``width - 1`` earlier samples of the same disk, the feature is::

        sum_{k=0}^{width-2} alpha**k * |x[t-k] - x[t-k-1]|

    where the indices are consecutive *rows* of that disk after sorting by
    ``dt`` (gaps between dates are not checked). Samples with fewer
    predecessors are dropped.

    Args:
        data: DataFrame with ``serial_number``, ``manufacturer``, ``model``,
            ``dt``, the feature columns and optionally ``label``.
        width: window length in samples (default 7, i.e. 6 differences); >= 2.
        alpha: decay per step back in time (default 0.7).
        features: columns to transform; defaults to the module-level
            ``feature_name``.

    Returns:
        ``(feature, label, sub)`` where ``feature`` has integer column labels
        ``0..len(features)`` (the transformed features followed by ``model``);
        ``label`` is the 1-D ``label`` array of the kept rows, or zeros of
        shape ``(n, 1)`` when ``data`` has no ``label`` column; ``sub`` holds
        ``manufacturer``, ``model``, ``serial_number``, ``dt`` of the kept rows
        (same index as ``feature``).

    Raises:
        ValueError: if ``width < 2``.
    """
    if width < 2:
        raise ValueError('width must be at least 2, got %r' % (width,))
    if features is None:
        features = feature_name
    keys = ['serial_number', 'manufacturer', 'model']
    data = data.sort_values(keys + ['dt'])
    # Position of each row in its disk's history (0 = oldest sample).
    data['count'] = data.groupby(keys).cumcount()
    # Rows with at least width-1 predecessors of the same disk: only for these do the
    # frame-wide shifts below stay inside that disk's own rows.
    keep = data['count'] >= width - 1

    values = data[features]
    ewma = None
    for k in range(width - 1):
        term = alpha ** k * np.abs(values.shift(k) - values.shift(k + 1))
        ewma = term if ewma is None else ewma + term

    # ignore_index=True renumbers the columns 0..n; the model column comes last.
    feature = pd.concat([ewma.loc[keep], data.loc[keep, 'model']], axis=1, ignore_index=True)
    if 'label' in data.columns:
        label = data.loc[keep, 'label'].values
    else:
        label = np.zeros((feature.shape[0], 1))
    sub = data.loc[keep, ['manufacturer', 'model', 'serial_number', 'dt']]
    return feature, label, sub
# Daily round-2 SMART logs, one file per day from 2018-08-22 to 2018-09-30. The August
# days only provide the history transform() needs (6 earlier samples per disk);
# predictions are kept for September only.
log_dir = '/tcdata/disk_sample_smart_log_round2'
log_files = ['disk_sample_smart_log_%s_round2.csv' % day.strftime('%Y%m%d')
             for day in pd.date_range('2018-08-22', '2018-09-30', freq='D')]
# Concatenate once instead of growing the frame inside a loop, which re-copied all
# previously loaded days on every iteration.
test = pd.concat([pd.read_csv(os.path.join(log_dir, name), usecols=test_column_name) for name in log_files],
                 axis=0, ignore_index=True)
# Drop rows with any missing value, then exact duplicates.
test = test.dropna(axis=0, how='any')
test = test.drop_duplicates().reset_index(drop=True)
# dt is stored as YYYYMMDD; reformat as YYYY-MM-DD and parse (vectorised).
dt_text = test['dt'].astype(str)
test['dt'] = pd.to_datetime(dt_text.str[0:4] + '-' + dt_text.str[4:6] + '-' + dt_text.str[6:])
test_x, test_y, sub = transform(test)
# Keep only samples dated after 2018-08-31, i.e. from 2018-09-01 on.
after_cutoff = ((pd.to_datetime(sub['dt']) - pd.to_datetime('2018-08-31')).dt.days > 0).values
test_x = test_x[after_cutoff]
test_y = test_y[after_cutoff]
sub = sub[after_cutoff]
# Focal-loss objective, identical to the training script. It is presumably needed here
# so that joblib can resolve the model's pickled ``objective=focal_loss_lgb``.
# The original copy called ``derivative`` without importing it in this script
# (NameError if ever invoked); the derivative is now computed locally.
# a: weight of the positive class; g: focusing exponent (gamma).
a = 0.9833
g = 2
# Probabilities are clipped away from 0 and 1 so the log terms stay finite for
# saturated scores (otherwise 0 * log(0) yields NaN).
_FL_EPS = 1e-15


def _focal_loss(x, t):
    """Element-wise focal loss for raw scores ``x`` and 0/1 targets ``t``."""
    p = np.clip(1 / (1 + np.exp(-x)), _FL_EPS, 1 - _FL_EPS)
    return -(a * t + (1 - a) * (1 - t)) * ((1 - (t * p + (1 - t) * (1 - p))) ** g) \
        * (t * np.log(p) + (1 - t) * np.log(1 - p))


def focal_loss_lgb(y_true, y_pred):
    """LightGBM custom objective: gradient and hessian of the focal loss w.r.t. the raw score.

    Derivatives are 3-point central differences with step 1e-6 (the scheme of
    ``scipy.misc.derivative`` with order 3).
    """
    dx = 1e-6
    lo = _focal_loss(y_pred - dx, y_true)
    mid = _focal_loss(y_pred, y_true)
    hi = _focal_loss(y_pred + dx, y_true)
    grad = (hi - lo) / (2 * dx)
    hess = (lo - 2 * mid + hi) / (dx * dx)
    return grad, hess


def focal_loss_lgb_eval(y_true, y_pred):
    """LightGBM custom eval metric: mean focal loss (lower is better)."""
    return 'focal_loss', np.mean(_focal_loss(y_pred, y_true)), False
# Load the trained model. NOTE(review): absolute path "/train_model.dat" while the
# training script writes the relative "train_model.dat" -- presumably the job runs
# from "/"; confirm.
clf = joblib.load("/train_model.dat")
# First-stage threshold. NOTE(review): the training script saves a data-driven
# threshold to p_th.dat, but a fixed value is used here.
p_th = 0.12
# With a callable objective LightGBM outputs raw margins, not probabilities; on recent
# versions predict_proba() then returns a 1-D array and predict_proba(...)[:, 1]
# raises. predict(raw_score=True) returns the margins on every version, so p_th is a
# threshold on the margin scale. assign() avoids writing into a filtered slice.
sub = sub.assign(p=clf.predict(test_x, raw_score=True))
sub = sub.assign(label=sub['p'] >= p_th)  # first-stage filter by score threshold
submit = sub.loc[sub.label == 1]
submit = submit.sort_values(['manufacturer', 'model', 'dt', 'p'])
# Rank scores within each (manufacturer, model, day): 1 = highest score (ties averaged).
submit['count'] = submit.groupby(['manufacturer', 'model', 'dt'])['p'].transform(lambda x: (-x).rank())
submit = submit[submit['count'] <= 9]  # keep each day's top-9 per manufacturer/model
# Report each disk once: rows are sorted by dt within (manufacturer, model), so the
# earliest selected day is kept.
submit = submit.drop_duplicates(['serial_number', 'model'])
submit[['manufacturer', 'model', 'serial_number', 'dt']].to_csv("/result.csv", index=False, header=None)
print(submit.shape)
4. 总结
本次比赛中的数据源于生产环境,存在数据缺失和噪声现象,且正负样本不均衡现象严重,我们采用的Focal loss 目标函数能在一定程度上应对样本不均衡问题。除了将此问题建模为二分类问题,还可以参考其他选手的方案,另辟蹊径,将此问题建模为回归问题,可能可以取得更好的效果。