This project uses 1913 days of sales history per product to forecast the sales of the following 28 days.

Key terms:

id is a concatenation of an item_id and a store_id

For the validation rows, F1-F28 correspond to d_1914-d_1941; for the evaluation rows (corresponding to the Private leaderboard), F1-F28 correspond to d_1942-d_1969
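Written as code, the validation mapping between submission columns and day columns is simply an offset of 1913 days:

# F1-F28 of the validation rows line up with d_1914-d_1941
f_to_d = {f"F{i}": f"d_{1913 + i}" for i in range(1, 29)}
print(f_to_d["F1"], f_to_d["F28"])   # d_1914 d_1941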

1. Import the required packages

from datetime import datetime, timedelta
import gc
import numpy as np, pandas as pd
import lightgbm as lgb
from matplotlib import pyplot as plt
import sys

2. Load the data

sale_data = pd.read_csv("/kaggle/input/m5-forecasting-accuracy/sales_train_validation.csv")
day_data = sale_data[[f'd_{day}' for day in range(1, 1914)]]   # the 1913 daily sales columns
total_sum = np.sum(day_data, axis=0).values                    # total sales across all products per day
plt.plot(total_sum)

Plotting the total daily sales summed over all products shows that the aggregate series has an upward trend as well as seasonality.
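To separate the trend from the weekly seasonality, a minimal sketch (using the total_sum array computed above) is to overlay a 28-day rolling mean:

# Smooth out weekly seasonality with a 28-day rolling mean
smoothed = pd.Series(total_sum).rolling(28).mean()
plt.plot(total_sum, alpha=0.4, label="daily total")
plt.plot(smoothed, label="28-day rolling mean")
plt.legend()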

A histogram of the sales values below 100 shows that the distribution leans strongly toward Poisson:

plt.hist(day_data[day_data < 100].values.reshape(-1), bins=100)

MSE is best suited to a roughly Gaussian target, whereas the sales data here are dominated by zeros and look much more Poisson-like. We therefore avoid using MSE directly as the training loss and pick a loss function that better reflects the data and the evaluation metric (the Tweedie objective used in section 5).
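As a quick sanity check of the zero-inflation claim, we can compute the share of exactly-zero entries in the day_data frame loaded above:

# Fraction of item-day sales entries that are exactly zero
zero_ratio = (day_data.values == 0).mean()
print(f"share of zero-sales entries: {zero_ratio:.2%}")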

We now define a function that reads the price data, the calendar data, and, most importantly, the train data containing the sales. The dtypes (especially the category columns) are specified directly at read time, and three for loops encode the category columns of the three tables as integers starting from 0. When generating test data, the function appends columns for the following 2 × 28 days filled with nan placeholders. pd.melt then converts the wide table into a long one, and finally the three tables are merged together:

In this case what the melt function is doing is converting the sales dataframe from wide format to long format. I have kept the id variables as id, item_id, dept_id, cat_id, store_id and state_id. Compounded together, they have 30490 unique values in total. The total number of days for which we have data is 1913, so the melted dataframe will have 30490 × 1913 rows.

def create_train_data(train_start=750,test_start=1800,is_train=True):
    # Basic dtype settings
    PRICE_DTYPES = {"store_id": "category", "item_id": "category", "wm_yr_wk": "int16","sell_price":"float32" }
    CAL_DTYPES={"event_name_1": "category", "event_name_2": "category", "event_type_1": "category", 
            "event_type_2": "category", "weekday": "category", 'wm_yr_wk': 'int16', "wday": "int16",
            "month": "int16", "year": "int16", "snap_CA": "float32", 'snap_TX': 'float32', 'snap_WI': 'float32' }

    start_day = train_start if is_train else test_start
    numcols = [f"d_{day}" for day in range(start_day,1914)]
    catcols = ['id', 'item_id', 'dept_id','store_id', 'cat_id', 'state_id']
    SALE_DTYPES = {numcol:"float32" for numcol in numcols} 
    SALE_DTYPES.update({col: "category" for col in catcols if col != "id"})

    # Load the price data
    price_data = pd.read_csv('/kaggle/input/m5-forecasting-accuracy/sell_prices.csv',dtype=PRICE_DTYPES)
    # Load the calendar data
    cal_data = pd.read_csv('/kaggle/input/m5-forecasting-accuracy/calendar.csv',dtype=CAL_DTYPES)
    # Load the sales data
    sale_data = pd.read_csv('/kaggle/input/m5-forecasting-accuracy/sales_train_validation.csv',dtype=SALE_DTYPES,usecols=catcols+numcols)


    # Encode category labels as integer codes
    for col, col_dtype in PRICE_DTYPES.items():
        if col_dtype == "category":
            price_data[col] = price_data[col].cat.codes.astype("int16")
            price_data[col] -= price_data[col].min()

    cal_data["date"] = pd.to_datetime(cal_data["date"])
    for col, col_dtype in CAL_DTYPES.items():
        if col_dtype == "category":
            cal_data[col] = cal_data[col].cat.codes.astype("int16")
            cal_data[col] -= cal_data[col].min()


    for col in catcols:
        if col != "id":
            sale_data[col] = sale_data[col].cat.codes.astype("int16")
            sale_data[col] -= sale_data[col].min()

    # Note: part of the submission horizon is empty (the future days get nan placeholders)
    if not is_train:
        for day in range(1913+1, 1913+ 2*28 +1):
            sale_data[f"d_{day}"] = np.nan

    sale_data = pd.melt(sale_data,
            id_vars = catcols,
            value_vars = [col for col in sale_data.columns if col.startswith("d_")],
            var_name = "d",
            value_name = "sales")
    sale_data = sale_data.merge(cal_data, on= "d", copy = False)
    sale_data = sale_data.merge(price_data, on = ["store_id", "item_id", "wm_yr_wk"], copy = False)
    return sale_data
sale_data = create_train_data(train_start=750,is_train=True)

The commonly used pd.melt parameters are as follows:

sale_data = pd.melt(sale_data,          # the DataFrame to reshape
            id_vars = catcols,          # identifier columns, kept as-is
            value_vars = [col for col in sale_data.columns if col.startswith("d_")], # the columns to be melted
            var_name = "d",             # name of the new column holding the melted column names
            value_name = "sales")       # name of the new column holding the melted values
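A toy example (made-up data, for illustration only) shows the reshaping: 2 ids and 3 day columns melt into 2 × 3 = 6 rows, matching the 30490 × 1913 arithmetic above:

# Toy wide frame: 2 ids, 3 day columns
toy = pd.DataFrame({"id": ["A", "B"], "d_1": [0, 2], "d_2": [1, 0], "d_3": [3, 1]})
toy_long = pd.melt(toy, id_vars=["id"], var_name="d", value_name="sales")
print(toy_long.shape)   # (6, 3): one row per (id, day) pair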

3. Feature engineering

The create_train_data function above has already done the label encoding, converting the category columns into integers coded from 0. Next we fetch the sales from 7 and 28 days earlier (with groupby and shift), compute rolling means of those lagged series over 7- and 28-day windows (with groupby, transform and rolling), and finally generate the date features. A toy illustration of the lag step comes first, then the full function:
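In this sketch (made-up data), groupby("id") keeps each product's series separate, and shift(n) aligns every row with its own value from n days earlier:

# Per-id lag features via groupby + shift
toy = pd.DataFrame({"id": ["A", "A", "A", "B", "B", "B"],
                    "sales": [1, 2, 3, 10, 20, 30]})
toy["lag_1"] = toy.groupby("id")["sales"].shift(1)
print(toy)   # lag_1 is NaN at the start of each id's series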

def create_feature(sale_data, is_train=True, day=None):
    # More feature-extraction methods can be added here
    # Fetch the sales from 7 days ago and 28 days ago (lag features)
    lags = [7, 28]
    lag_cols = [f"lag_{lag}" for lag in lags ]

    # For the test set, features are only needed for one day, which cuts the compute cost
    # Note that feature generation must be identical for train and test
    if is_train:
        for lag, lag_col in zip(lags, lag_cols):
            sale_data[lag_col] = sale_data[["id","sales"]].groupby("id")["sales"].shift(lag)
    else:
        for lag, lag_col in zip(lags, lag_cols):
            sale_data.loc[sale_data.date == day, lag_col] = sale_data.loc[sale_data.date ==day-timedelta(days=lag), 'sales'].values  


    # Rolling means over the 7-day and 28-day lagged sales
    wins = [7, 28]

    if is_train:
        for win in wins :
            for lag,lag_col in zip(lags, lag_cols):
                sale_data[f"rmean_{lag}_{win}"] = sale_data[["id", lag_col]].groupby("id")[lag_col].transform(lambda x : x.rolling(win).mean())
    else:
        for win in wins:
            for lag in lags:
                df_window = sale_data[(sale_data.date <= day-timedelta(days=lag)) & (sale_data.date > day-timedelta(days=lag+win))]
                df_window_grouped = df_window.groupby("id").agg({'sales':'mean'}).reindex(sale_data.loc[sale_data.date==day,'id'])
                sale_data.loc[sale_data.date == day,f"rmean_{lag}_{win}"] = df_window_grouped.sales.values   

    # Date/time features
    # Some are missing from the calendar table and are generated via the datetime accessor
    date_features = {
            "wday": "weekday",
            "week": "weekofyear",
            "month": "month",
            "quarter": "quarter",
            "year": "year",
            "mday": "day",
        }

    for date_feat_name, date_feat_func in date_features.items():
        if date_feat_name in sale_data.columns:
            sale_data[date_feat_name] = sale_data[date_feat_name].astype("int16")
        else:
            sale_data[date_feat_name] = getattr(sale_data["date"].dt, date_feat_func).astype("int16")
    return sale_data
sale_data = create_feature(sale_data)

Although train and test share the same features, they are computed differently. For the rolling means, the train path uses transform, which outputs a value for every row, while the test path uses agg plus reindex for efficiency, yielding one value per group, since the test set only needs features for a single day (reindex guarantees the result is ordered exactly like the ids to be predicted that day; any id missing from the window comes back as nan). The two aggregation methods compare as follows (see the sketch after the table):

Aspect          transform                        agg
Output shape    same as the input                collapsed to one row per group
Index           kept automatically               needs a manual reindex
Typical use     add a feature to every record    compute aggregates for specific targets
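A minimal sketch of the difference, again on made-up data:

# transform returns one value per input row; agg returns one row per group
toy = pd.DataFrame({"id": ["A", "A", "B"], "sales": [1, 3, 10]})
print(toy.groupby("id")["sales"].transform("mean"))  # length 3: 2.0, 2.0, 10.0
print(toy.groupby("id")["sales"].agg("mean"))        # length 2: A -> 2.0, B -> 10.0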

getattr(sale_data["date"].dt, date_feat_func) looks the accessor attribute up by name at runtime, which lets one loop cover all the date features.
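For example, these two ways of accessing the attribute are equivalent:

# getattr resolves the accessor attribute by name at runtime
s = pd.Series(pd.to_datetime(["2016-04-25"]))
print(getattr(s.dt, "month"))   # identical to s.dt.month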

4. Build the training and validation sets

# Clean the data and select the columns used for training
sale_data.dropna(inplace=True)
cat_feats = ['item_id', 'dept_id','store_id', 'cat_id', 'state_id'] + ["event_name_1", "event_name_2", "event_type_1", "event_type_2"]
useless_cols = ["id", "date", "sales","d", "wm_yr_wk", "weekday"]
train_cols = sale_data.columns[~sale_data.columns.isin(useless_cols)]
X_train = sale_data[train_cols]
y_train = sale_data["sales"]
train_data = lgb.Dataset(X_train, label = y_train, categorical_feature=cat_feats, free_raw_data=False)
# Note: the "validation" set here is just a random subsample of the training rows
valid_inds = np.random.choice(len(X_train), 10000)
valid_data = lgb.Dataset(X_train.iloc[valid_inds], label = y_train.iloc[valid_inds], categorical_feature=cat_feats, free_raw_data=False)

5. Define and train the LightGBM model

def train_model(train_data, valid_data):
    params = {
        "objective" : "tweedie",
        "tweedie_variance_power" : 1.2,
        "metric" : "rmse",
        "force_row_wise" : True,
        "learning_rate" : 0.075,
        "sub_feature" : 0.8,
        "sub_row" : 0.75,
        "bagging_freq" : 1,
        "lambda_l2" : 0.1,
        "nthread" : 8,
        "verbosity" : 1,
        "num_iterations" : 1500,
        "num_leaves" : 128,
        "min_data_in_leaf" : 104,
    }

    m_lgb = lgb.train(params, train_data, valid_sets = [valid_data], verbose_eval=50)
    return m_lgb

m_lgb = train_model(train_data,valid_data)

6. Define the prediction function and convert the results to the submission format

def predict_ensemble(train_cols,m_lgb):
    date = datetime(2016, 4, 25)   # 2016-04-25 corresponds to d_1914, the first day to forecast
    # Correction factors multiplied into the predictions (to account for the upward trend)
    alphas = [1.035, 1.03, 1.025]
    weights = [1/len(alphas)]*len(alphas)
    sub = 0.

    test_data = create_train_data(is_train=False)

    for icount, (alpha, weight) in enumerate(zip(alphas, weights)):

        test_data_c = test_data.copy()
        cols = [f"F{i}" for i in range(1,29)]


        for i in range(0, 28):
            day = date + timedelta(days=i)
            print(i, day)
            tst = test_data_c[(test_data_c.date >= day - timedelta(days=57)) & (test_data_c.date <= day)].copy()
            tst = create_feature(tst,is_train=False, day=day)
            tst = tst.loc[tst.date == day , train_cols]
            test_data_c.loc[test_data_c.date == day, "sales"] = alpha*m_lgb.predict(tst)

        # Reshape into the submission format
        test_sub = test_data_c.loc[test_data_c.date >= date, ["id", "sales"]].copy()
        test_sub["F"] = [f"F{rank}" for rank in test_sub.groupby("id")["id"].cumcount()+1]
        test_sub = test_sub.set_index(["id", "F" ]).unstack()["sales"][cols].reset_index()
        test_sub.fillna(0., inplace = True)
        test_sub.sort_values("id", inplace = True)
        test_sub.reset_index(drop=True, inplace = True)
        test_sub.to_csv(f"submission_{icount}.csv",index=False)
        if icount == 0 :
            sub = test_sub
            sub[cols] *= weight
        else:
            sub[cols] += test_sub[cols]*weight
        print(icount, alpha, weight)
    
    sub2 = sub.copy()
    # Duplicate the rows, renaming the "validation" ids to "evaluation" for the second 28-day horizon
    sub2["id"] = sub2["id"].str.replace("validation$", "evaluation", regex=True)
    sub = pd.concat([sub, sub2], axis=0, sort=False)
    sub.to_csv("submissionV3.csv",index=False)
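To actually produce the files, the function just needs to be called (the call is not shown in the original listing; it writes the per-alpha submission_*.csv files and the blended submissionV3.csv as side effects):

predict_ensemble(train_cols, m_lgb)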

Because the series trends upward, three hand-picked factors are multiplied into the predictions to scale them up, and the three runs are blended with equal weights of 1/3 each.

For the 28 future days, we predict one day at a time, write the result back into test_data_c, and regenerate the features with create_feature before predicting the next day; this is how the recursive forecast works.

unstack converts the data from long to wide format, spreading the sales column out across one column per F value.

The first reset_index is needed because after unstack() the id is still in the index and has to be turned back into a regular column.

The second reset_index is needed because sort_values() leaves the index out of order; resetting it gives a clean, contiguous index, which matters when writing the final submission file.
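A toy version of the set_index / unstack / reset_index chain (illustrative data only):

# Long -> wide: one row per id, one column per F value
toy = pd.DataFrame({"id": ["A", "A", "B", "B"],
                    "F": ["F1", "F2", "F1", "F2"],
                    "sales": [1.0, 2.0, 3.0, 4.0]})
wide = toy.set_index(["id", "F"]).unstack()["sales"].reset_index()
print(wide)   # columns: id, F1, F2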

The final submission scores 0.75. I will keep updating this post as I find improvements, so feel free to follow along!
