kaggle enefit能源大赛时序预测之特征工程及神经网络实战
注(1):此函数既可以作用于train(train = create_features(df_train,df_client,df_weather_f)),又可以改变参数作用于test(df_test = create_features(test,client_test,forecast_weather_test,False, previous_revealed_targets=previous_revealed_targets.copy())),便于对训练测试数据生成相同的特征。
1、引入神经网络必要的keras包
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from scipy.spatial.distance import cdist
from sklearn.metrics import mean_absolute_error
import holidays
from keras.models import Sequential
from keras.layers import Dense, LSTM,BatchNormalization,Dropout
import keras
import tensorflow as tf
2、导入数据
此赛题提供的数据表格较多,所以合并数据、分组汇总、特征工程的任务量较大,最终需要得到一个完整的可以用于训练的train dataframe
# --- Kaggle input paths for the competition data files ---
TRAIN = "/kaggle/input/predict-energy-behavior-of-prosumers/train.csv"
CLIENT = "/kaggle/input/predict-energy-behavior-of-prosumers/client.csv"
# NOTE(review): variable name has a typo ("FORCAST"); kept as-is because it is referenced below.
FORCAST_WEATHER = "/kaggle/input/predict-energy-behavior-of-prosumers/forecast_weather.csv"
WEATHERSTATION_TO_COUNTY = "/kaggle/input/predict-energy-behavior-of-prosumers/weather_station_to_county_mapping.csv"
# Largest day-lag (inclusive) used for the target-lag features; lags start at 2
# because targets are revealed with a 2-day delay in this competition.
N_day_lags = 14
df_train = pd.read_csv(TRAIN)
df_client = pd.read_csv(CLIENT)
df_weather_f = pd.read_csv(FORCAST_WEATHER)
station = pd.read_csv(WEATHERSTATION_TO_COUNTY)
# Keep one row per unique (longitude, latitude, county) and drop stations
# without a county mapping, so nearest-station lookup is unambiguous.
station = station.drop_duplicates(subset=['longitude', 'latitude','county'])
station = station.dropna()
station.tail()  # notebook-style inspection; has no effect in a script
# Raw forecast-weather measurement columns that get aggregated per (county, datetime).
weather_columns = ['direct_solar_radiation', 'surface_solar_radiation_downwards', 'cloudcover_total', 'temperature',
'dewpoint', 'snowfall', '10_metre_v_wind_component', '10_metre_u_wind_component' ,
'cloudcover_high', 'cloudcover_low', 'cloudcover_mid', 'total_precipitation']
df_weather_f比weather_columns多的列包括:
{'data_block_id', 'forecast_datetime', 'hours_ahead', 'latitude', 'longitude', 'origin_datetime'}
注:几个重要的特征含义
train中,
target
- The consumption or production amount for the relevant segment for the hour. The segments are defined by the county, is_business, and product_type.
prediction_unit_id
- A unique identifier for the county, is_business, and product_type combination. New prediction units can appear or disappear in the test set.
data_block_id -
所有共享相同data_block_id
的数据行,都是在同一个“预测时间点”可用的。 这意味着,在制作某个时间点的预测时,我们只能使用data_block_id
小于或等于某个值的数据。在训练模型预测某个日期的目标时,所使用的所有特征数据(天气、历史销量等)的data_block_id
必须小于或等于你进行预测那天所使用的data_block_id
。这样才能保证你的模型没有“偷看”未来,其预测能力是真实有效的。
forecast_weather中,
origin_datetime
- The timestamp of when the forecast was generated.
hours_ahead
- The number of hours between the forecast generation and the forecast weather. Each forecast covers 48 hours in total.
forecast_datetime
- The timestamp of the predicted weather. Generated from origin_datetime plus hours_ahead. This represents the start of the 1-hour period for which weather data are forecasted.
test中,
相比于train,少了target目标列,多了currently_scored的列,存储的是bool值
3、特征工程
接下来定义函数,来对train和test数据生成一些滞后特征、以及对forecast_weather生成一些分组统计特征。
def create_revealed_targets_train(data, N_day_lags):
    """Add lagged target columns (2 .. N_day_lags days ago) to the train set.

    For each day_lag, the target observed at time t is re-dated to
    t + day_lag days and merged back onto the frame, producing columns
    ``target_{day_lag}_days_ago`` aligned on
    (datetime, prediction_unit_id, is_consumption).

    Parameters
    ----------
    data : pd.DataFrame
        Must contain 'datetime', 'prediction_unit_id', 'is_consumption', 'target'.
    N_day_lags : int
        Largest lag (in days) to create; lags start at 2 because targets are
        revealed with a 2-day delay in this competition.

    Returns
    -------
    pd.DataFrame with one extra column per lag (NaN where no match exists).
    """
    data['datetime'] = pd.to_datetime(data['datetime'])
    original_datetime = data['datetime']
    revealed_targets = data[['datetime', 'prediction_unit_id', 'is_consumption', 'target']].copy()
    for day_lag in range(2, N_day_lags + 1):
        # Shift the observation date forward by day_lag days so that the merge
        # attaches the value recorded `day_lag` days in the past.
        # (Explicit Timedelta instead of the original pd.DateOffset(day_lag),
        # which relied on the obscure "positional n defaults to days" behavior.)
        revealed_targets['datetime'] = original_datetime + pd.Timedelta(days=day_lag)
        data = data.merge(
            revealed_targets,
            how='left',
            on=['datetime', 'prediction_unit_id', 'is_consumption'],
            suffixes=('', f'_{day_lag}_days_ago'),
        )
    return data
对train生成滞后14天的特征,是应用过去真实的历史数据,用pd.DateOffset以及merge的方法得到滞后特征;而对test生成滞后特征时,依据的是循环返回的revealed_targets,通过previous_revealed_targets.insert(0, revealed_targets)来得到dataframe的列表,最前面的dataframe是最新的lag特征,然后用create_revealed_targets_test函数将滞后特征merge到test数据集上,得到和train同样的滞后特征,test的函数如下:
def create_revealed_targets_test(data, previous_revealed_targets, N_day_lags):
    """Attach lagged target columns to the test frame from revealed targets.

    ``previous_revealed_targets`` is a list of revealed-target DataFrames,
    newest first, so element ``count`` supplies the ``count + 2`` day lag
    (targets are revealed with a 2-day delay).  Each frame is assumed to cover
    exactly one day, so joining on (hour, prediction_unit_id, is_consumption)
    is unambiguous — TODO confirm against the competition API.

    Lags with no revealed data yet are filled with NaN so the output always
    carries target_2_days_ago .. target_{N_day_lags}_days_ago.

    Parameters
    ----------
    data : pd.DataFrame
        Test frame with 'datetime', 'prediction_unit_id', 'is_consumption'.
    previous_revealed_targets : list[pd.DataFrame]
        Revealed-target frames, newest first; not mutated.
    N_day_lags : int
        Largest lag column to guarantee in the output.
    """
    data = data.copy()  # do not mutate the caller's frame
    data['hour'] = pd.to_datetime(data['datetime']).dt.hour
    for count, revealed_targets in enumerate(previous_revealed_targets):
        day_lag = count + 2
        # Work on a selection so the caller's revealed-target frames stay untouched
        # (the original added an 'hour' column to them in place).
        lag = revealed_targets[['datetime', 'prediction_unit_id', 'is_consumption', 'target']].copy()
        lag['hour'] = pd.to_datetime(lag['datetime']).dt.hour
        lag = lag[['hour', 'prediction_unit_id', 'is_consumption', 'target']]
        lag = lag.rename(columns={"target": f"target_{day_lag}_days_ago"})
        data = pd.merge(data,
                        lag,
                        how='left',
                        on=['hour', 'prediction_unit_id', 'is_consumption'],
                        )
    # Guarantee the full column set; per-column assignment is safe even when
    # nothing is missing (the original empty-list fancy assignment is fragile)
    # and keeps a deterministic column order.
    for col in (f"target_{day_lag}_days_ago" for day_lag in range(2, N_day_lags + 1)):
        if col not in data.columns:
            data[col] = np.nan
    return data
再定义一个生成天气相关特征的函数:
def create_weather_forcast(df_weather_f):
    """Aggregate the forecast-weather table to county level.

    Maps each forecast grid point to the county of its nearest weather station
    (Euclidean distance on raw latitude/longitude degrees), then computes the
    mean and std of every column in the module-level ``weather_columns`` list
    per (county, datetime).

    Relies on module globals ``station`` (deduplicated station/county table)
    and ``weather_columns``.

    Returns a DataFrame with columns ['county', 'datetime'] plus one
    '<col>_mean' and one '<col>_std' column per weather variable.
    """
    # Drop the timezone so these datetimes merge cleanly with the naive train datetimes.
    df_weather_f['forecast_datetime'] = pd.to_datetime(df_weather_f['forecast_datetime']).dt.tz_localize(None)
    # Share of direct radiation in total downward solar radiation.
    # NOTE(review): division by zero (e.g. at night) produces inf/NaN here —
    # presumably cleaned up by a downstream fillna; verify.
    df_weather_f['ration_ration'] = df_weather_f['direct_solar_radiation'] / df_weather_f['surface_solar_radiation_downwards']
    query_coord = df_weather_f[['latitude', 'longitude']]
    coords = station[['latitude', 'longitude']].values
    # Pairwise distances: rows = forecast points, columns = stations.
    distances = cdist(query_coord, coords, metric='euclidean')
    # Index of the nearest station for every forecast grid point.
    nearest_indices = distances.argmin(axis=1)
    values_at_indexes = station.iloc[nearest_indices]
    values_at_indexes = values_at_indexes['county']
    # Positional concat (both sides re-indexed 0..n-1) attaches the matched
    # county to each forecast row.
    df_weather_f = pd.concat([df_weather_f.reset_index(drop=True), values_at_indexes.reset_index(drop=True)],
                             axis=1)
    df_weather_f.rename(columns={'forecast_datetime': 'datetime'}, inplace=True)
    weather_mean = df_weather_f.groupby(['county', 'datetime'])[weather_columns].mean()
    weather_mean = weather_mean.rename(columns={c: c + '_mean' for c in weather_mean.columns
                                                if c in weather_columns})
    weather_std = df_weather_f.groupby(['county', 'datetime'])[weather_columns].std()
    weather_std = weather_std.rename(columns={c: c + '_std' for c in weather_std.columns
                                              if c in weather_columns})
    # Join mean and std on the (county, datetime) index, then flatten it back to columns.
    weather_grouped = pd.merge(weather_mean, weather_std, left_index=True, right_index=True)
    weather_grouped = weather_grouped.reset_index()
    return weather_grouped
此函数利用经纬度信息找到最近的station所在的county,用county和日期来进行分组聚合,生成mean和std的特征。
最后,用一个函数生成所有时序相关的特征以及将weather的预测信息通过county(地理位置)和日期来merge到train的dataframe中,从而完成特征工程。
def create_features(data, df_client, df_weather_f, train=True, previous_revealed_targets=None):
    """Build the full feature frame for either train or test data.

    Adds lagged-target features (create_revealed_targets_train /
    create_revealed_targets_test), calendar features, client-capacity features
    (merged with a 2-day availability shift) and county-level forecast-weather
    aggregates, then drops identifier/date columns not used for modelling.

    Parameters
    ----------
    data : pd.DataFrame
        Raw train or test frame; must contain 'datetime', 'row_id',
        'prediction_unit_id', 'county', 'is_business', 'product_type',
        'is_consumption' (train additionally 'target').
    df_client : pd.DataFrame
        client.csv rows for the matching data block; not mutated.
    df_weather_f : pd.DataFrame
        forecast_weather.csv rows for the matching data block.
    train : bool
        True -> lags come from history inside `data`;
        False -> lags come from `previous_revealed_targets`.
    previous_revealed_targets : list[pd.DataFrame] or None
        Revealed-target frames, newest first; required when train=False.

    Uses module globals: N_day_lags, the `holidays` package, and (via
    create_weather_forcast) `station` / `weather_columns`.
    """
    if train:
        data = create_revealed_targets_train(data, N_day_lags)
    else:
        data = create_revealed_targets_test(data, previous_revealed_targets,
                                            N_day_lags=N_day_lags)
    # BUGFIX: convert once up front. The test path delivers 'datetime' as raw
    # strings, and the original code's bare `data['datetime'].dt.dayofweek`
    # would raise on a non-datetime column.
    data['datetime'] = pd.to_datetime(data['datetime'])
    dt = data['datetime'].dt  # hoisted accessor; avoids repeated pd.to_datetime calls
    # NOTE(review): holiday years are hard-coded; extend if data outside 2021-2024 appears.
    estonia_holidays = [date for date, name in sorted(holidays.EE(years=[2021, 2022, 2023, 2024]).items())]
    data['date'] = dt.date
    # --- pure time-series / calendar features ---
    data['hour'] = dt.hour.astype(int)
    data['month'] = dt.month
    data['dayofweak'] = dt.dayofweek  # NOTE: 'dayofweak' is a historic typo, kept for column-name stability
    data['is_weekend'] = (dt.dayofweek // 5).astype(int)  # Sat(5)/Sun(6) -> 1, else 0
    # BUGFIX: the original `~is_weekend` is a bitwise NOT on ints (-1/-2);
    # arithmetic complement gives the intended 0/1 flag.
    data['is_weekday'] = 1 - data['is_weekend']
    data['dayofyear'] = dt.dayofyear
    data['week'] = dt.isocalendar().week.astype(int)
    data['quarter'] = dt.quarter
    data['is_holiday'] = dt.date.isin(estonia_holidays)
    # Cyclical encodings: 24-hour period and ~366-day period.
    data['sin(hour)'] = np.sin(np.pi * data['hour'] / 12)
    data['cos(hour)'] = np.cos(np.pi * data['hour'] / 12)
    data['sin(dayofyear)'] = np.sin(np.pi * data['dayofyear'] / 183)
    data['cos(dayofyear)'] = np.cos(np.pi * data['dayofyear'] / 183)
    # Aggregates over the lagged targets.
    lag_cols = [f"target_{i}_days_ago" for i in range(2, N_day_lags + 1)]
    data["target_mean"] = data[lag_cols].mean(1)
    data["target_std"] = data[lag_cols].std(1)
    # Freshest lag vs. same weekday one week earlier (epsilon avoids /0).
    data["target_ratio"] = data["target_2_days_ago"] / (data["target_7_days_ago"] + 1e-3)
    # Client data becomes available with a 2-day delay -> shift its date forward
    # so the merge only uses information that was known at prediction time.
    df_client = df_client.copy()  # do not mutate the caller's frame
    df_client['date'] = pd.to_datetime(df_client['date']).dt.date + pd.Timedelta(days=2)
    if 'data_block_id' in df_client:
        df_client = df_client.drop(['data_block_id'], axis=1)
    data = pd.merge(data, df_client, on=['product_type', 'county', 'is_business', 'date'], how='left')
    data['cap/eic_ratio'] = data['installed_capacity'] / data['eic_count']
    # County-level forecast-weather aggregates, joined on (county, datetime).
    df_weather_f = create_weather_forcast(df_weather_f)
    data = pd.merge(data, df_weather_f, on=['county', 'datetime'], how='left')
    # Drop identifiers and raw date columns before training.
    data = data.drop(['row_id'], axis=1)
    if 'data_block_id' in data:
        data = data.drop(['data_block_id'], axis=1)
    data = data.drop(['datetime', 'date', 'prediction_unit_id'], axis=1)
    return data
注(1):此函数既可以作用于train(train = create_features(df_train,df_client,df_weather_f)),又可以改变参数作用于test(df_test = create_features(test,client_test,forecast_weather_test,False, previous_revealed_targets=previous_revealed_targets.copy())),便于对训练测试数据生成相同的特征。
注(2):train的数据最后drop掉日期特征,因为有hour等更细致化的日期特征;drop掉row_id这种纯排序的特征;以及prediction_unit_id这个组合特征:A unique identifier for the county, is_business, and product_type combination.
4、区分验证集测试集生成训练数据
# Build the training matrix: engineer features, impute missing values with 0,
# and cast everything to float for the neural network.
train = create_features(df_train, df_client, df_weather_f)
train = train.fillna(0).astype(float)
y_train = train['target']
X_train = train.drop(['target'], axis=1)
# Chronological 95/5 split: the last 5% of rows act as the validation set.
train_size = int(0.95 * len(X_train))
test_size = len(X_train) - train_size
train_x, test_x = X_train.iloc[:train_size], X_train.iloc[-test_size:]
train_y, test_y = y_train.iloc[:train_size], y_train.iloc[-test_size:]
5、定义神经网络模型
首先,定义三个callback函数
# Metric monitored by all three callbacks: validation-set mean absolute error.
measure_to_monitor = 'val_mae'
modality = 'min'  # lower MAE is better
checkpoint_filepath = '/kaggle/working/'
# Periodically save model weights during training.
model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,  # save location (used as a checkpoint prefix)
    save_weights_only=True,        # save only the weights (False would save the whole model)
    monitor=measure_to_monitor,    # monitored metric: 'val_mae'
    mode=modality,                 # 'min': smaller is better
    verbose=2,                     # verbose logging
    save_best_only=True)           # keep only the best checkpoint
# Halve the learning rate when the monitored metric stops improving.
reduce_lr_on_plateau = tf.keras.callbacks.ReduceLROnPlateau(
    monitor=measure_to_monitor,  # monitored metric: 'val_mae'
    mode=modality,               # 'min': smaller is better
    patience=2,                  # wait 2 epochs without improvement
    factor=0.5,                  # multiply the learning rate by 0.5
    verbose=2)                   # verbose logging
# Stop training early when the monitored metric no longer improves.
early_stopping = tf.keras.callbacks.EarlyStopping(
    monitor=measure_to_monitor,  # monitored metric: 'val_mae'
    mode=modality,               # 'min': smaller is better
    patience=5,                  # wait 5 epochs without improvement
    verbose=0)                   # silent
接着,定义模型
# Fix the TF RNG for reproducible weight initialisation.
tf.random.set_seed(14)
# Feed-forward regressor: 64 -> 128 -> 64 GELU units, single linear output.
model = Sequential()
model.add(Dense(64, activation='gelu', input_shape=(train_x.shape[-1],)))
model.add(Dense(128, activation='gelu'))
model.add(Dense(64, activation='gelu'))
model.add(Dense(1))
learning_rate = 0.0001
model.compile(
    optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
    loss='mse',
    metrics=['mae'],
)
model.fit(
    train_x,
    train_y,
    epochs=50,
    batch_size=56,
    validation_data=(test_x, test_y),
    callbacks=[model_checkpoint_callback, reduce_lr_on_plateau, early_stopping],
)
# Score the final-epoch weights on the hold-out split.
test_loss, test_mae = model.evaluate(test_x, test_y)
print(f'Test Loss: {test_loss}, Test MAE: {test_mae}')
调取保存的模型权重,对测试集进行预测,果然得到的mae会更小
# Restore the best checkpoint saved by ModelCheckpoint (save_best_only=True)
# and re-score the hold-out split; this MAE should not exceed the last-epoch one.
model.load_weights(checkpoint_filepath)
prediction = model.predict(test_x)
mae = mean_absolute_error(test_y, prediction)
print(mae)
6、获取test数据并预测
# Keep the exact training feature set so the test frame uses identical columns and order.
features = [col for col in X_train.columns]
# Rolling buffer of revealed-target frames, newest first (element i supplies lag i+2 days).
previous_revealed_targets = []
import enefit
# Reset the competition API's one-call guard so make_env() works on notebook re-runs.
enefit.make_env.func_dict['__called__'] = False
env = enefit.make_env()
iter_test = env.iter_test()
counter = 0  # NOTE(review): never incremented or read below
for (test, revealed_targets, client_test, historical_weather_test,
     forecast_weather_test,
     electricity_test,
     gas_test,
     sample_prediction) in iter_test:
    test = test.rename(columns = {'prediction_datetime': 'datetime'})
    # Newest revealed targets go to the front of the buffer.
    previous_revealed_targets.insert(0, revealed_targets)
    # Cap the buffer at N_day_lags - 1 entries, covering lags 2 .. N_day_lags.
    if len(previous_revealed_targets) == N_day_lags:
        previous_revealed_targets.pop()
    df_test = create_features(test,client_test,forecast_weather_test,False,previous_revealed_targets=previous_revealed_targets.copy())
    # Make prediction: align with training columns, impute and cast as in training.
    X_test = df_test[features]
    X_test = X_test.fillna(0)
    X_test = X_test.astype(float)
    pred = model.predict(X_test).clip(0)  # clip to enforce non-negative predictions
    test['target'] = pred
    # Index-aligned assignment back into the API's submission frame.
    sample_prediction["target"] = test['target']
    env.predict(sample_prediction)
本项目全部代码都可以跑通,可以直接粘贴进notebook进行提交,成绩70+,不算高但是可以作为baseline来学习改进,还有更好的特征工程或模型方法欢迎评论,接下来会继续分享时间序列相关的项目实战代码,欢迎点赞收藏关注
更多推荐
所有评论(0)