返回列表

11th place solution: LSTM-CNN + rolling features

555. Parkinsons Freezing of Gait Prediction | tlvmc-parkinsons-freezing-gait-prediction

开始: 2023-03-09 结束: 2023-06-08 临床决策支持 数据算法赛
```

第11名解决方案:LSTM-CNN + 滚动特征

作者:Ismail
发布时间:2023-06-18 10:03:43

特征工程

我使用以下从AccV、AccML、AccAP生成的特征作为模型输入:

  • 滞后特征
  • 全局均值、中位数、最大值、最小值、标准差和分位数
  • 滚动均值、中位数、最大值、最小值、标准差和分位数(同时应用于反转时间序列)
  • 时间序列一阶差分的滚动均值、中位数、最大值、最小值、标准差和分位数(同时应用于反转时间序列)
  • 滚动窗口内符号变化次数的平均值(含反转时间序列)
  • 时间序列一阶差分的指数加权均值
  • 已过时间比例

特征生成代码:

def rolling_agg(
    dt: pd.DataFrame, step: int, 
    aggfunc: str, cols: list, back: bool = False) -> pd.DataFrame:
    """Rolling-window aggregate of `cols` over a window of `step` rows.

    When `back` is True the frame is reversed first, so every row sees a
    window that extends toward the *end* of the series instead of the start.
    `aggfunc` is either a pandas aggregation name ("mean", "std", ...) or
    "quantile_NN" where NN is the percentile (e.g. "quantile_75" -> 0.75).
    Output columns are the inputs suffixed with the window description.
    """
    frame = dt[cols][::-1] if back else dt[cols]
    direction = "_back_rolling" if back else "_rolling"
    suffix = f"{direction}_{step}_{aggfunc}"

    # min_periods=0 keeps the head of the series defined (partial windows).
    window = frame.rolling(step, min_periods=0)

    if aggfunc.startswith("quantile"):
        q = int(aggfunc.split("_")[1]) / 100
        result = window.quantile(q)
    else:
        result = window.agg(aggfunc)

    return result.add_suffix(suffix)

def create_dataset(data, defog=False, verbose=False):
    """Build the per-timestep feature matrix for one accelerometer recording.

    Parameters
    ----------
    data : pd.DataFrame
        Must contain 'Time', 'AccV', 'AccML', 'AccAP' columns.
    defog : bool
        True for the defog subset. When False the accelerometer columns are
        divided by 9.80665 (presumably converting m/s^2 to g so both subsets
        share units -- TODO confirm against the competition data description).
    verbose : bool
        Print a progress marker before each feature group.

    Returns
    -------
    pd.DataFrame
        Original columns (minus 'Time') plus a 'defog' flag, lag/lead
        features, global stats, forward/backward rolling stats of the raw
        signals and of their first differences, rolling sign-change rates,
        and the elapsed-time fraction. Remaining NaNs are filled with 0.
    """
    cols = ['AccV', 'AccML', 'AccAP']
    dt = data.copy()

    # FIX: normalize the working copy. The original divided `data[cols]`
    # after `dt = data.copy()`, so the scaling never reached `dt` and the
    # caller's frame was mutated as a side effect.
    if not defog:
        dt[cols] = dt[cols] / 9.80665
    dt["defog"] = int(defog)

    if verbose: print("Global stats")
    # Group by a constant dummy so transform() broadcasts the whole-series
    # statistic back to every row.
    for aggfunc in ["mean", "max", "min", "std", "median"]:
        dt = dt.join(
            dt[cols].groupby(dt.assign(dummy=1).dummy)
            .transform(aggfunc).add_suffix(f"_{aggfunc}")
        )

    step1 = 500
    step2 = 100

    if verbose: print("Shifts stats")
    for shift in [1, 2, -1, -2]:
        if shift > 0:
            suffix_name = f"_lag_{shift}"
            # Head rows fill the leading NaNs produced by a positive shift.
            fill_data = dt[cols].iloc[: shift]
        else:
            suffix_name = f"_lead_{abs(shift)}"
            # Tail rows fill the trailing NaNs produced by a negative shift.
            fill_data = dt[cols].iloc[shift:]

        dt = dt.join(
            dt[cols]
            .shift(shift)
            .fillna(fill_data)
            .add_suffix(suffix_name)
        )

    aggfuncs = [
        "mean", "std", "max", "min", "median",
        "quantile_75", "quantile_25", "quantile_99",
    ]

    if verbose: print("Rolling stats, step 1")
    for aggfunc in aggfuncs:
        dt = dt.join(rolling_agg(dt, step1, aggfunc, cols))

    if verbose: print("Rolling stats, step 2")
    for aggfunc in aggfuncs:
        dt = dt.join(rolling_agg(dt, step2, aggfunc, cols))

    if verbose: print("Back Rolling stats, step 1")
    for aggfunc in aggfuncs:
        dt = dt.join(rolling_agg(dt, step1, aggfunc, cols, back=True))

    if verbose: print("Back Rolling stats, step 2")
    for aggfunc in aggfuncs:
        # FIX: pass `dt` directly. rolling_agg already reverses the frame
        # when back=True; the original passed dt[::-1], double-reversing and
        # storing forward stats under a "back" column name (inconsistent
        # with the step-1 loop above).
        dt = dt.join(rolling_agg(dt, step2, aggfunc, cols, back=True))

    if verbose: print("Calculating diffs")
    diff = dt[cols].transform("diff").add_suffix("_diff")
    diff = diff.fillna(diff.iloc[0])
    cols = ["AccV_diff", "AccML_diff", "AccAP_diff"]

    if verbose: print("Diff rolling stat step 1")
    for aggfunc in aggfuncs:
        dt = dt.join(rolling_agg(diff, step1, aggfunc, cols))

    if verbose: print("Diff rolling stat step 2")
    for aggfunc in aggfuncs:
        dt = dt.join(rolling_agg(diff, step2, aggfunc, cols))

    if verbose: print("Back Diff rolling stat step 1")
    for aggfunc in aggfuncs:
        dt = dt.join(rolling_agg(diff, step1, aggfunc, cols, back=True))

    if verbose: print("Back Diff rolling stat step 2")
    for aggfunc in aggfuncs:
        dt = dt.join(rolling_agg(diff, step2, aggfunc, cols, back=True))

    if verbose: print("Sign change")
    # |diff(sign(diff))| / 2 is 1 exactly where the first difference flips
    # sign, 0 elsewhere; rolling means below give a sign-change rate.
    sign_change = (
        diff.apply(np.sign)
        .transform("diff")
        .apply(np.abs)
        .divide(2)
        .fillna(0)
        .add_suffix("_sc")
    )

    cols = ["AccV_diff_sc", "AccML_diff_sc", "AccAP_diff_sc"]
    aggfuncs = ["mean"]

    if verbose: print("Sign change rolling stat step 1")
    for aggfunc in aggfuncs:
        dt = dt.join(rolling_agg(sign_change, step1, aggfunc, cols))

    if verbose: print("Sign change rolling stat step 2")
    for aggfunc in aggfuncs:
        dt = dt.join(rolling_agg(sign_change, step2, aggfunc, cols))

    if verbose: print("Back Sign change rolling stat step 1")
    for aggfunc in aggfuncs:
        dt = dt.join(rolling_agg(sign_change, step1, aggfunc, cols, back=True))

    if verbose: print("Back Sign change rolling stat step 2")
    for aggfunc in aggfuncs:
        dt = dt.join(rolling_agg(sign_change, step2, aggfunc, cols, back=True))

    if verbose: print("time spent")
    # Fraction of the recording elapsed at each row.
    dt["time_spent"] = dt.Time.divide(dt.Time.max())
    return dt.drop("Time", axis=1).fillna(0)

完成这些转换后,我对数据进行了缩放,并将所有时间序列分割成长度为10000的片段,保存为feather格式文件。

模型结构

我使用了LSTM-CNN混合模型:输入首先进入三个并行Conv1D模块(核大小分别为3、5和7),然后将输入与卷积层输出连接,送入两层LSTM序列,最后通过一个线性层进行分类。

模型定义代码:

import torch
import torch.nn as nn
import torch.nn.functional as F

def block(kernel_size, in_channels=240, hidden_channels=128, out_channels=64):
    """Two stacked same-padded Conv1d + ReLU layers.

    Generalized: the channel sizes, previously hard-coded, are now
    parameters whose defaults (240 -> 128 -> 64) preserve the original
    behavior. `padding="same"` keeps the sequence length unchanged.
    """
    return nn.Sequential(
        nn.Conv1d(in_channels, hidden_channels, kernel_size, padding="same"),
        nn.ReLU(),
        nn.Conv1d(hidden_channels, out_channels, kernel_size, padding="same"),
        nn.ReLU(),
    )

class ParkinsonModel(nn.Module):
    """LSTM-CNN hybrid for per-timestep 4-class prediction.

    Parallel Conv1d branches (one per kernel size) are concatenated with
    the raw 240-channel input and fed to a 2-layer bidirectional LSTM,
    followed by a per-timestep linear head.

    Input:  (batch, 240, seq_len)  ->  Output: (batch, 4, seq_len)
    """

    def __init__(self, kernels=(3, 5, 7)):
        # FIX: super().__init__() must run before any attribute assignment
        # so nn.Module's internal bookkeeping is initialized first.
        super().__init__()
        # FIX: tuple default instead of a mutable list default, which would
        # be shared across all instances constructed without an argument.
        self.kernels = kernels
        self.conv_nets = nn.ModuleList([block(k) for k in kernels])
        self.lstm = nn.LSTM(
            64 * len(self.kernels) + 240,
            128, 2, batch_first=True, bidirectional=True, dropout=.1)
        # Bidirectional LSTM emits 2 * hidden_size features per timestep.
        self.linear = nn.Linear(128 * 2, 4)

    def forward(self, x):
        branches = [net(x) for net in self.conv_nets]
        # Skip connection: append the raw input to the conv outputs.
        branches.append(x)
        # FIX: torch.cat(..., dim=1) -- the original used torch.concat with
        # `torch` not imported by the surrounding snippet (NameError).
        features = torch.cat(branches, dim=1)
        # LSTM expects (batch, seq, features); transpose channel/time axes.
        lstm_out, _ = self.lstm(features.transpose(2, 1))
        # Transpose back so the class dimension is axis 1.
        return self.linear(lstm_out).transpose(2, 1)

我没有建立有效的验证流程,因此未使用交叉验证。模型训练了30个epoch,监控约10%受试者的验证数据损失。

```
同比赛其他方案