在能源智能化管理领域,基于大模型的电力能源预测是实现电网高效调度、能耗优化的核心技术支撑。电力能源预测数据具有「强时序性、多影响因素耦合(气象、负荷类型、节假日)、数据维度多样(有功功率、电压、电流)」等典型特征,与通用时序预测任务相比,其数据准备需重点解决时序完整性、多维度特征对齐、异常值修正及模型时序输入适配等问题。本文以“短期区域电力负荷预测”为目标场景,完整讲解从电力数据收集整理、质量优化、特征工程到模型编码的全流程,通过可复现的样本数据实例与代码,拆解每一步技术细节与实现逻辑,为大模型微调提供高质量的数据基础。

一、需求背景与样本数据设计

1.1 场景定义

本次案例聚焦「某城市核心区域短期电力负荷预测」,目标是微调一个时序大模型(如TimeLLM、Transformer-XL、Temporal Fusion Transformer,TFT),输入过去24小时的多维度电力及关联数据,输出未来12小时的区域电力负荷预测值(单位:MW)。该场景直接服务于电网调度中心,需保证预测精度以应对峰谷负荷波动,提升电力资源配置效率。

1.2 电力能源预测的核心数据维度

电力负荷受多种因素影响,样本数据需完整覆盖核心影响维度,形成“时序电力数据+关联影响因素”的多维度数据集,具体维度如下:

数据类型 核心指标 说明
时序电力数据 有功功率(MW)、无功功率(MVar)、电压(kV)、电流(kA)、负荷率(%) 每15分钟采集一次,形成连续时序序列,核心预测目标为有功功率(负荷)
气象影响数据 温度(℃)、湿度(%)、降水量(mm)、风速(m/s)、天气类型(晴/阴/雨/雪) 与电力数据时序对齐,温度是影响负荷的关键因素(夏季制冷、冬季采暖)
时间特征数据 时间戳、小时、工作日/周末、节假日、季节(春/夏/秋/冬) 刻画负荷的时间周期性(日内、周内、季节性波动)
负荷属性数据 区域负荷类型(商业/工业/居民混合)、用户数量、平均用电功率 反映区域用电特性,辅助模型学习不同负荷类型的波动规律

1.3 样本数据生成(含真实场景脏数据)

考虑到真实电力数据的敏感性,此处通过模拟生成符合场景特征的样本数据(含缺失、异常、重复等脏数据),后续流程与真实数据处理完全一致。


import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta

# 1. 基础配置定义
START_DATE = datetime(2024, 1, 1)  # 数据起始时间
END_DATE = datetime(2024, 3, 31)    # 数据结束时间(3个月,约1440个15分钟间隔)
TIME_INTERVAL = timedelta(minutes=15)  # 数据采集间隔
AREA_INFO = {
    "area_id": "A001",
    "load_type": "商业+工业+居民混合",
    "user_count": 5200,
    "avg_power": 120.5  # 平均用电功率(MW)
}
# 负荷波动参数(基于时间、季节、气象的影响系数)
SEASON_COEFF = {"春": 1.0, "夏": 1.3, "秋": 0.9, "冬": 1.2}
HOUR_COEFF = [0.6, 0.5, 0.4, 0.4, 0.5, 0.7, 0.9, 1.1, 1.2, 1.1, 1.0, 1.0,
              1.05, 1.0, 0.95, 1.0, 1.05, 1.2, 1.3, 1.25, 1.1, 0.9, 0.7, 0.6]  # 24小时负荷系数
WEATHER_TYPE = ["晴", "阴", "雨", "雪"]
WEATHER_COEFF = {"晴": 1.0, "阴": 1.02, "雨": 1.05, "雪": 1.1}

# 2. 生成时间序列
def generate_time_series(start_date, end_date, interval):
    time_series = []
    current_date = start_date
    while current_date <= end_date:
        time_series.append(current_date)
        current_date += interval
    return time_series

# 3. 生成单条样本数据
def generate_single_sample(timestamp):
    # 时间特征提取
    hour = timestamp.hour
    weekday = 1 if timestamp.weekday() < 5 else 0  # 1=工作日,0=周末
    # 判断是否为节假日(2024年元旦、春节)
    holiday = 1 if (timestamp.month == 1 and timestamp.day in [1,2,3]) or \
                   (timestamp.month == 2 and timestamp.day in [10,11,12,13,14,15,16]) else 0
    # 季节判断
    month = timestamp.month
    if month in [3,4,5]:
        season = "春"
    elif month in [6,7,8]:
        season = "夏"
    elif month in [9,10,11]:
        season = "秋"
    else:
        season = "冬"
    
    # 基础负荷计算(加入周期性波动)
    base_load = AREA_INFO["avg_power"] * SEASON_COEFF[season] * HOUR_COEFF[hour]
    # 加入随机波动(±5%)
    random_fluctuation = base_load * random.uniform(-0.05, 0.05)
    active_power = base_load + random_fluctuation
    
    # 关联电力指标(基于有功功率推导)
    reactive_power = active_power * random.uniform(0.2, 0.4)  # 无功功率与有功功率正相关
    voltage = random.uniform(10.5, 11.5)  # 10kV配网电压范围
    current = active_power / (np.sqrt(3) * voltage)  # 电流=功率/(√3×电压)
    load_rate = (active_power / (AREA_INFO["avg_power"] * 1.5)) * 100  # 负荷率(最大负荷为平均1.5倍)
    
    # 气象数据生成
    weather = random.choice(WEATHER_TYPE)
    temperature = random.uniform(-5, 20) if season == "冬" else random.uniform(5, 25)  # 冬夏温度差异
    temperature = temperature * WEATHER_COEFF[weather]  # 天气影响温度
    humidity = random.uniform(30, 80)
    precipitation = random.uniform(0, 20) if weather in ["雨", "雪"] else 0
    wind_speed = random.uniform(0.5, 5.0)
    
    # 加入脏数据(10%比例)
    if random.random() < 0.1:
        if random.random() < 0.3:
            # 缺失值
            active_power = np.nan
        elif random.random() < 0.5:
            # 异常值(负荷突变)
            active_power = active_power * random.uniform(2.0, 3.0)
        else:
            # 重复数据标记(后续去重)
            duplicate_flag = 1
    else:
        duplicate_flag = 0
    
    return {
        "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
        "area_id": AREA_INFO["area_id"],
        "load_type": AREA_INFO["load_type"],
        "user_count": AREA_INFO["user_count"],
        "active_power": round(active_power, 2) if not np.isnan(active_power) else np.nan,
        "reactive_power": round(reactive_power, 2),
        "voltage": round(voltage, 2),
        "current": round(current, 2),
        "load_rate": round(load_rate, 2),
        "temperature": round(temperature, 2),
        "humidity": round(humidity, 2),
        "precipitation": round(precipitation, 2),
        "wind_speed": round(wind_speed, 2),
        "weather": weather,
        "hour": hour,
        "weekday": weekday,
        "holiday": holiday,
        "season": season,
        "duplicate_flag": duplicate_flag
    }

# 4. 批量生成样本数据
def generate_power_load_data():
    # 生成时间序列
    time_series = generate_time_series(START_DATE, END_DATE, TIME_INTERVAL)
    # 生成样本列表
    samples = []
    for ts in time_series:
        sample = generate_single_sample(ts)
        samples.append(sample)
    # 转换为DataFrame
    df = pd.DataFrame(samples)
    # 故意添加部分重复行(模拟数据采集重复)
    duplicate_rows = df.sample(n=50, random_state=42)
    df = pd.concat([df, duplicate_rows], ignore_index=True)
    
    # 保存原始数据
    df.to_csv("raw_power_load_data.csv", index=False, encoding="utf-8-sig")
    print(f"样本数据生成完成!共{len(df)}条记录")
    print(f"数据时间范围:{START_DATE.strftime('%Y-%m-%d')}{END_DATE.strftime('%Y-%m-%d')}")
    print("\n前5条原始数据示例:")
    print(df.head().to_string())
    return df

# 执行数据生成
raw_df = generate_power_load_data()

1.4 真实电力数据收集与整理补充

模拟数据用于流程验证,真实场景下电力数据的收集与整理需遵循以下规范:

  1. 数据采集渠道:从电网SCADA系统( Supervisory Control And Data Acquisition,数据采集与监控系统)获取时序电力数据,从气象部门API获取同步气象数据,从政务系统获取节假日信息;

  2. 数据格式规范:统一时间戳格式为“YYYY-MM-DD HH:MM:SS”,电力指标保留2位小数,气象数据按官方发布格式整理;

  3. 数据归档:按“区域-年月”建立结构化目录(如“power_data/A001/202401/”),每日增量备份数据,避免数据丢失;

  4. 初步筛选:剔除明显的采集错误数据(如功率为负、电压超出10kV±10%范围),保留有效时序片段。

1.5 高质量电力预测数据的核心特性

针对短期电力负荷预测场景,高质量数据需满足:

  1. 时序完整性:无大面积连续缺失数据,单条时序序列的缺失率≤5%,保证负荷波动规律的连续性;

  2. 数据准确性:电力指标、气象数据符合物理/自然规律(如功率非负、温度在合理范围),无异常突变;

  3. 维度对齐性:电力数据、气象数据、时间特征的时间戳严格一致,无错位;

  4. 特征有效性:包含所有关键影响因素(温度、节假日、小时特征),能刻画负荷的周期性与波动性;

  5. 模型适配性:数据可转换为时序大模型要求的“输入序列-目标序列”格式,特征可标准化到统一范围。

1.6 模型输入要求

本次选择TimeLLM(专为长时序预测优化的大模型)作为微调模型,其输入要求:

  1. 输入格式:采用“历史序列+静态特征”的组合输入,历史序列为过去24小时的负荷及关联特征(每15分钟1个点,共96个时间步),静态特征为区域负荷类型、用户数量等;

  2. 目标格式:未来12小时的负荷序列(每15分钟1个点,共48个时间步);

  3. 特征标准化:所有数值型特征(功率、温度、湿度等)需归一化到[0,1]或[-1,1]区间;

  4. 类别特征编码:天气、季节等类别特征需转换为独热编码或嵌入编码;

  5. 数据类型:输入输出均为PyTorch张量,形状为(batch_size, seq_len, feature_dim)。

二、数据清洗:剔除低质数据,保障时序完整性

电力数据清洗的核心是解决“缺失值、异常值、重复值、时序错位”四大问题,同时保留时序数据的连续性,避免破坏负荷波动规律。

2.1 清洗步骤与代码实现


import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# 1. 加载原始数据
raw_df = pd.read_csv("raw_power_load_data.csv", encoding="utf-8-sig")
# 转换时间戳为datetime类型(关键:确保时序有序)
raw_df["timestamp"] = pd.to_datetime(raw_df["timestamp"])
print(f"原始数据条数:{len(raw_df)}")
print(f"原始数据缺失值统计:\n{raw_df.isnull().sum()}")
print(f"原始数据时间范围:{raw_df['timestamp'].min()}{raw_df['timestamp'].max()}")

# 2. 定义清洗规则与函数
class PowerDataCleaner:
    def __init__(self):
        # 电力指标正常范围(基于10kV配网标准)
        self.power_ranges = {
            "active_power": (0, 300),    # 有功功率正常范围(MW)
            "reactive_power": (0, 120),   # 无功功率正常范围(MVar)
            "voltage": (9.5, 12.5),      # 电压正常范围(kV)
            "current": (0, 20),          # 电流正常范围(kA)
            "load_rate": (0, 100)        # 负荷率正常范围(%)
        }
        # 气象指标正常范围
        self.weather_ranges = {
            "temperature": (-20, 40),    # 温度正常范围(℃)
            "humidity": (0, 100),        # 湿度正常范围(%)
            "precipitation": (0, 50),    # 降水量正常范围(mm)
            "wind_speed": (0, 20)        # 风速正常范围(m/s)
        }
    
    def remove_duplicates(self, df):
        """去重:基于时间戳+区域ID去重,保留第一条"""
        before_len = len(df)
        df = df.drop_duplicates(subset=["timestamp", "area_id"], keep="first")
        after_len = len(df)
        print(f"去重完成:删除重复数据{before_len - after_len}条")
        return df
    
    def fill_missing_values(self, df):
        """缺失值填充:时序数据采用线性插值,保证趋势连续"""
        # 先按时间戳排序(关键:时序插值需有序)
        df = df.sort_values("timestamp").reset_index(drop=True)
        # 数值型特征线性插值
        numeric_cols = list(self.power_ranges.keys()) + list(self.weather_ranges.keys())
        for col in numeric_cols:
            if df[col].isnull().any():
                df[col] = df[col].interpolate(method="linear")  # 线性插值
                # 插值后仍有缺失(序列开头/结尾),用前后值填充
                df[col] = df[col].fillna(method="ffill").fillna(method="bfill")
        print("缺失值填充完成")
        return df
    
    def correct_outliers(self, df):
        """异常值修正:基于3σ原则或正常范围,替换为边界值"""
        outlier_count = 0
        # 修正电力指标异常值
        for col, (min_val, max_val) in self.power_ranges.items():
            # 低于最小值→替换为最小值,高于最大值→替换为最大值
            outliers = (df[col] < min_val) | (df[col] > max_val)
            outlier_count += outliers.sum()
            df.loc[outliers, col] = np.clip(df.loc[outliers, col], min_val, max_val)
        # 修正气象指标异常值
        for col, (min_val, max_val) in self.weather_ranges.items():
            outliers = (df[col] < min_val) | (df[col] > max_val)
            outlier_count += outliers.sum()
            df.loc[outliers, col] = np.clip(df.loc[outliers, col], min_val, max_val)
        print(f"异常值修正完成:共修正{outlier_count}个异常值")
        return df
    
    def check_time_sequence(self, df):
        """时序完整性检查:确保时间间隔均匀,无缺失时间步"""
        # 计算时间间隔
        df = df.sort_values("timestamp").reset_index(drop=True)
        time_diff = df["timestamp"].diff().dropna()
        # 检查是否存在非15分钟的间隔
        abnormal_interval = time_diff[time_diff != TIME_INTERVAL]
        if len(abnormal_interval) > 0:
            print(f"发现{len(abnormal_interval)}个异常时间间隔,示例:{abnormal_interval.head()}")
            # 生成完整时间序列,缺失时间步填充NaN后再插值
            full_time_series = pd.date_range(
                start=df["timestamp"].min(),
                end=df["timestamp"].max(),
                freq=TIME_INTERVAL
            )
            full_df = pd.DataFrame({"timestamp": full_time_series})
            df = pd.merge(full_df, df, on="timestamp", how="left")
            # 对新填充的缺失值再次插值
            df = self.fill_missing_values(df)
            print("时序完整性修复完成:补全缺失时间步并插值")
        else:
            print("时序完整性检查通过:所有时间间隔均为15分钟")
        return df
    
    def clean(self, df):
        """完整清洗流程"""
        print("开始数据清洗...")
        # 步骤1:去重
        df = self.remove_duplicates(df)
        # 步骤2:时序完整性检查与修复
        df = self.check_time_sequence(df)
        # 步骤3:缺失值填充
        df = self.fill_missing_values(df)
        # 步骤4:异常值修正
        df = self.correct_outliers(df)
        # 重置索引
        df = df.reset_index(drop=True)
        print("数据清洗完成!")
        return df

# 3. 执行清洗
cleaner = PowerDataCleaner()
cleaned_df = cleaner.clean(raw_df)

# 4. 保存清洗后数据
cleaned_df.to_csv("cleaned_power_load_data.csv", index=False, encoding="utf-8-sig")

# 5. 清洗效果验证
print(f"\n清洗后数据条数:{len(cleaned_df)}")
print(f"清洗后缺失值统计:\n{cleaned_df.isnull().sum()}")
# 可视化清洗前后有功功率对比(随机选1天数据)
sample_day = cleaned_df[cleaned_df["timestamp"].dt.date == pd.to_datetime("2024-01-02").date()]
raw_sample_day = raw_df[raw_df["timestamp"].dt.date == pd.to_datetime("2024-01-02").date()]

plt.figure(figsize=(12, 6))
plt.plot(raw_sample_day["timestamp"], raw_sample_day["active_power"], label="清洗前有功功率", alpha=0.6)
plt.plot(sample_day["timestamp"], sample_day["active_power"], label="清洗后有功功率", color="red")
plt.xlabel("时间")
plt.ylabel("有功功率(MW)")
plt.title("2024-01-02 有功功率清洗前后对比")
plt.legend()
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig("power_cleaning_comparison.png")
print("清洗效果对比图已保存:power_cleaning_comparison.png")

2.2 清洗步骤说明

  1. 去重处理:基于“时间戳+区域ID”双关键字去重,避免同一时间点的重复采集数据,保留第一条有效记录;

  2. 时序完整性修复:先检查时间间隔是否均匀(均为15分钟),若存在缺失时间步,生成完整时间序列并补全缺失值,再通过线性插值填充,保证时序连续性;

  3. 缺失值填充:时序数据优先采用线性插值(能保留负荷的趋势变化),序列开头/结尾的缺失值用前后向填充兜底,避免因缺失值破坏时序规律;

  4. 异常值修正:基于电力、气象指标的物理/自然正常范围,采用“边界值替换”策略(而非直接删除),将异常值修正为范围边界,保留样本的其他有效特征;

  5. 清洗效果验证:通过缺失值统计、时序间隔检查、可视化对比(清洗前后有功功率曲线),确保清洗后数据质量达标。

三、数据标准化:特征工程与格式统一

电力预测数据的标准化核心是“特征工程+数值归一化+类别编码”,既要将数据转换为模型可理解的格式,也要提升特征的有效性,辅助模型学习负荷波动规律。

3.1 标准化步骤与代码实现


import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder
import joblib

# 1. 加载清洗后的数据
cleaned_df = pd.read_csv("cleaned_power_load_data.csv", encoding="utf-8-sig")
cleaned_df["timestamp"] = pd.to_datetime(cleaned_df["timestamp"])
print(f"清洗后数据条数:{len(cleaned_df)}")

# 2. 定义标准化配置与函数
class PowerDataStandardizer:
    def __init__(self):
        # 数值型特征(需归一化)
        self.numeric_features = [
            "active_power", "reactive_power", "voltage", "current", "load_rate",
            "temperature", "humidity", "precipitation", "wind_speed", "hour"
        ]
        # 类别型特征(需编码)
        self.categorical_features = ["weather", "season"]
        # 静态特征(无需时序处理)
        self.static_features = ["area_id", "load_type", "user_count"]
        # 时间特征(已处理为数值/类别)
        self.time_features = ["weekday", "holiday"]
        
        # 初始化归一化器和编码器
        self.scaler = MinMaxScaler(feature_range=(0, 1))  # 归一化到[0,1]
        self.encoder = OneHotEncoder(sparse_output=False, drop="first")  # 独热编码,丢弃第一个类别避免共线性
    
    def feature_engineering(self, df):
        """特征工程:新增有效时序特征,提升模型预测能力"""
        # 1. 新增负荷滞后特征(前1个、前2个时间步的负荷)
        df["active_power_lag1"] = df["active_power"].shift(1)
        df["active_power_lag2"] = df["active_power"].shift(2)
        # 2. 新增负荷滚动统计特征(过去4个时间步的均值、最大值)
        df["active_power_roll_mean4"] = df["active_power"].rolling(window=4).mean()
        df["active_power_roll_max4"] = df["active_power"].rolling(window=4).max()
        # 3. 填充新增特征的缺失值(滚动特征开头会有缺失)
        df = df.fillna(method="ffill")
        # 4. 更新数值型特征列表(加入新增特征)
        self.numeric_features.extend(["active_power_lag1", "active_power_lag2", 
                                      "active_power_roll_mean4", "active_power_roll_max4"])
        print("特征工程完成:新增滞后特征和滚动统计特征")
        return df
    
    def normalize_numeric_features(self, df):
        """数值型特征归一化:MinMaxScaler"""
        # 拟合归一化器(仅用训练集拟合,避免数据泄露,此处先整体拟合用于演示)
        self.scaler.fit(df[self.numeric_features])
        # 执行归一化
        normalized_numeric = self.scaler.transform(df[self.numeric_features])
        # 构造归一化后的特征列名
        normalized_cols = [f"{col}_norm" for col in self.numeric_features]
        # 合并到原数据框
        normalized_df = pd.DataFrame(normalized_numeric, columns=normalized_cols, index=df.index)
        df = pd.concat([df, normalized_df], axis=1)
        print("数值型特征归一化完成")
        return df
    
    def encode_categorical_features(self, df):
        """类别型特征编码:独热编码"""
        # 拟合编码器
        self.encoder.fit(df[self.categorical_features])
        # 执行编码
        encoded_categorical = self.encoder.transform(df[self.categorical_features])
        # 构造编码后的特征列名
        encoded_cols = []
        for i, col in enumerate(self.categorical_features):
            categories = self.encoder.categories_[i][1:]  # 丢弃第一个类别
            encoded_cols.extend([f"{col}_{cat}" for cat in categories])
        # 合并到原数据框
        encoded_df = pd.DataFrame(encoded_categorical, columns=encoded_cols, index=df.index)
        df = pd.concat([df, encoded_df], axis=1)
        print("类别型特征编码完成")
        return df
    
    def select_model_features(self, df):
        """选择模型输入特征:归一化数值特征+编码类别特征+时间特征"""
        # 模型输入特征列表
        model_features = [f"{col}_norm" for col in self.numeric_features] + \
                        [f"{col}_{cat}" for col in self.categorical_features 
                         for cat in self.encoder.categories_[self.categorical_features.index(col)][1:]] + \
                        self.time_features
        # 提取模型特征和目标变量(active_power_norm)
        model_df = df[["timestamp"] + model_features + ["active_power_norm"]]
        print(f"模型输入特征选择完成,共{len(model_features)}个特征")
        print(f"模型特征列表:{model_features}")
        return model_df
    
    def standardize(self, df):
        """完整标准化流程"""
        print("开始数据标准化...")
        # 步骤1:特征工程
        df = self.feature_engineering(df)
        # 步骤2:类别型特征编码
        df = self.encode_categorical_features(df)
        # 步骤3:数值型特征归一化
        df = self.normalize_numeric_features(df)
        # 步骤4:选择模型特征
        model_df = self.select_model_features(df)
        print("数据标准化完成!")
        return df, model_df

# 3. 执行标准化
standardizer = PowerDataStandardizer()
standardized_df, model_df = standardizer.standardize(cleaned_df)

# 4. 保存标准化结果与工具
# 保存标准化后的数据
standardized_df.to_csv("standardized_power_load_data.csv", index=False, encoding="utf-8-sig")
# 保存模型输入数据(时序特征+目标)
model_df.to_csv("model_input_power_data.csv", index=False, encoding="utf-8-sig")
# 保存归一化器和编码器(后续推理复用)
joblib.dump(standardizer.scaler, "power_scaler.pkl")
joblib.dump(standardizer.encoder, "power_encoder.pkl")

# 5. 标准化效果验证
print(f"\n标准化后数据条数:{len(standardized_df)}")
print(f"模型输入数据条数:{len(model_df)}")
print("\n模型输入数据前5行:")
print(model_df.head().to_string())
# 查看归一化后特征的分布
print(f"\n归一化后有功功率统计:{standardized_df['active_power_norm'].describe()}")

3.2 标准化步骤说明

  1. 特征工程:新增时序相关特征(滞后特征、滚动统计特征),滞后特征(前1、2个时间步的负荷)能捕捉负荷的短期相关性,滚动统计特征(过去4个时间步的均值、最大值)能刻画负荷的局部趋势,提升模型预测精度;

  2. 数值型特征归一化:采用MinMaxScaler将所有数值型特征归一化到[0,1]区间,消除不同特征的量纲差异(如功率单位为MW,温度单位为℃),适配大模型对输入数据范围的要求;

  3. 类别型特征编码:对天气、季节等类别特征进行独热编码,丢弃第一个类别避免共线性问题,将非数值型特征转换为模型可理解的数值格式;

  4. 模型特征选择:筛选出“归一化数值特征+编码类别特征+时间特征”作为模型输入特征,目标变量为归一化后的有功功率(active_power_norm),形成最终的模型输入数据框;

  5. 工具保存:保存归一化器(scaler)和编码器(encoder),后续模型推理时需用相同的工具对新数据进行标准化,保证数据格式一致。

四、数据编码:时序序列构造与模型输入转换

时序大模型的输入需为“历史序列-目标序列”的配对格式,需将标准化后的一维数据框转换为三维张量(batch_size, seq_len, feature_dim),同时完成训练/验证/测试集的拆分。

4.1 编码步骤与代码实现


import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split

# 1. 加载模型输入数据
model_df = pd.read_csv("model_input_power_data.csv", encoding="utf-8-sig")
model_df["timestamp"] = pd.to_datetime(model_df["timestamp"])
# 按时间戳排序(关键:时序数据必须有序)
model_df = model_df.sort_values("timestamp").reset_index(drop=True)
print(f"模型输入数据条数:{len(model_df)}")
print(f"模型输入特征数:{len(model_df.columns) - 2}")  # 减去timestamp和active_power_norm

# 2. 定义时序序列构造参数
INPUT_SEQ_LEN = 96    # 输入序列长度(过去24小时,每15分钟1个点:24*4=96)
OUTPUT_SEQ_LEN = 48   # 输出序列长度(未来12小时,每15分钟1个点:12*4=48)
STEP = 1              # 滑动步长(每1个时间步构造一个样本)

# 3. 构造时序样本(输入序列-目标序列)
def create_time_series_samples(df, input_seq_len, output_seq_len, step):
    """
    构造时序样本
    输入:df - 模型输入数据框(含timestamp、特征、目标)
    输出:inputs - 输入序列(shape: (num_samples, input_seq_len, feature_dim))
    输出:targets - 目标序列(shape: (num_samples, output_seq_len))
    输出:timestamps - 样本对应的时间戳(用于后续评估)
    """
    # 提取特征和目标
    features = df.drop(["timestamp", "active_power_norm"], axis=1).values
    target = df["active_power_norm"].values
    timestamps = df["timestamp"].values
    
    inputs = []
    targets = []
    sample_timestamps = []
    
    # 滑动窗口构造样本
    for i in range(input_seq_len, len(df) - output_seq_len + 1, step):
        # 输入序列:i-input_seq_len 到 i-1
        input_seq = features[i - input_seq_len:i, :]
        # 目标序列:i 到 i+output_seq_len-1
        target_seq = target[i:i + output_seq_len]
        # 样本时间戳:目标序列的起始时间
        sample_ts = timestamps[i]
        
        inputs.append(input_seq)
        targets.append(target_seq)
        sample_timestamps.append(sample_ts)
    
    # 转换为numpy数组
    inputs = np.array(inputs, dtype=np.float32)
    targets = np.array(targets, dtype=np.float32)
    sample_timestamps = np.array(sample_timestamps)
    
    print(f"时序样本构造完成!")
    print(f"输入序列形状:{inputs.shape}")  # (num_samples, input_seq_len, feature_dim)
    print(f"目标序列形状:{targets.shape}")  # (num_samples, output_seq_len)
    print(f"样本数量:{len(sample_timestamps)}")
    return inputs, targets, sample_timestamps

# 4. 执行时序样本构造
inputs, targets, sample_timestamps = create_time_series_samples(
    model_df, INPUT_SEQ_LEN, OUTPUT_SEQ_LEN, STEP
)

# 5. 数据拆分(按时间顺序拆分,避免数据泄露)
def split_time_series_data(inputs, targets, sample_timestamps, test_ratio=0.2):
    """按时间顺序拆分训练集和测试集(时序数据不能随机拆分)"""
    split_idx = int(len(inputs) * (1 - test_ratio))
    # 训练集:前80%
    train_inputs = inputs[:split_idx]
    train_targets = targets[:split_idx]
    train_timestamps = sample_timestamps[:split_idx]
    # 测试集:后20%
    test_inputs = inputs[split_idx:]
    test_targets = targets[split_idx:]
    test_timestamps = sample_timestamps[split_idx:]
    
    # 再从训练集中拆分验证集(前70%训练,10%验证)
    val_split_idx = int(len(train_inputs) * 0.875)  # 0.7/(0.8) = 0.875
    val_inputs = train_inputs[val_split_idx:]
    val_targets = train_targets[val_split_idx:]
    val_timestamps = train_timestamps[val_split_idx:]
    train_inputs = train_inputs[:val_split_idx]
    train_targets = train_targets[:val_split_idx]
    train_timestamps = train_timestamps[:val_split_idx]
    
    print(f"时序数据拆分完成!")
    print(f"训练集样本数:{len(train_inputs)}")
    print(f"验证集样本数:{len(val_inputs)}")
    print(f"测试集样本数:{len(test_inputs)}")
    return (train_inputs, train_targets, train_timestamps), \
           (val_inputs, val_targets, val_timestamps), \
           (test_inputs, test_targets, test_timestamps)

# 执行数据拆分
train_data, val_data, test_data = split_time_series_data(inputs, targets, sample_timestamps)
train_inputs, train_targets, train_timestamps = train_data
val_inputs, val_targets, val_timestamps = val_data
test_inputs, test_targets, test_timestamps = test_data

# 6. 转换为PyTorch张量并创建数据加载器
class PowerLoadDataset(Dataset):
    def __init__(self, inputs, targets):
        self.inputs = torch.tensor(inputs, dtype=torch.float32)
        self.targets = torch.tensor(targets, dtype=torch.float32)
    
    def __len__(self):
        return len(self.inputs)
    
    def __getitem__(self, idx):
        return self.inputs[idx], self.targets[idx]

def create_data_loaders(train_inputs, train_targets, val_inputs, val_targets, test_inputs, test_targets, batch_size=32):
    """创建训练、验证、测试数据加载器"""
    train_dataset = PowerLoadDataset(train_inputs, train_targets)
    val_dataset = PowerLoadDataset(val_inputs, val_targets)
    test_dataset = PowerLoadDataset(test_inputs, test_targets)
    
    train_loader = DataLoader(
        train_dataset, batch_size=batch_size, shuffle=False, num_workers=0  # 时序数据不shuffle
    )
    val_loader = DataLoader(
        val_dataset, batch_size=batch_size, shuffle=False, num_workers=0
    )
    test_loader = DataLoader(
        test_dataset, batch_size=batch_size, shuffle=False, num_workers=0
    )
    
    return train_loader, val_loader, test_loader

# 执行数据加载器创建
batch_size = 32
train_loader, val_loader, test_loader = create_data_loaders(
    train_inputs, train_targets, val_inputs, val_targets, test_inputs, test_targets, batch_size=batch_size
)

# 7. 保存编码后的数据与配置
# 保存张量数据
np.savez("encoded_power_train_data.npz", inputs=train_inputs, targets=train_targets, timestamps=train_timestamps)
np.savez("encoded_power_val_data.npz", inputs=val_inputs, targets=val_targets, timestamps=val_timestamps)
np.savez("encoded_power_test_data.npz", inputs=test_inputs, targets=test_targets, timestamps=test_timestamps)

# 保存数据加载器配置
loader_config = {
    "batch_size": batch_size,
    "input_seq_len": INPUT_SEQ_LEN,
    "output_seq_len": OUTPUT_SEQ_LEN,
    "train_loader_length": len(train_loader),
    "val_loader_length": len(val_loader),
    "test_loader_length": len(test_loader)
}
import json
with open("power_data_loader_config.json", "w", encoding="utf-8") as f:
    json.dump(loader_config, f, ensure_ascii=False, indent=2)

# 8. 编码效果验证
print(f"\n数据编码完成!")
print(f"训练集输入张量形状:{train_inputs.shape}")
print(f"验证集目标张量形状:{val_targets.shape}")
print(f"测试集时间戳范围:{test_timestamps.min()}{test_timestamps.max()}")
# 验证数据加载器
for batch_inputs, batch_targets in train_loader:
    print(f"\n批量输入形状:{batch_inputs.shape}")  # (batch_size, input_seq_len, feature_dim)
    print(f"批量目标形状:{batch_targets.shape}")  # (batch_size, output_seq_len)
    break

4.2 编码步骤说明

  1. 时序样本构造:采用滑动窗口法,以“过去24小时(96个时间步)”为输入序列,“未来12小时(48个时间步)”为目标序列,滑动步长为1,构造大量时序配对样本;

  2. 时序数据拆分:严格按时间顺序拆分训练集(70%)、验证集(10%)、测试集(20%),避免随机拆分导致的“数据泄露”(未来数据提前被模型学习);

  3. 张量转换:将numpy数组格式的输入/目标序列转换为PyTorch张量(float32类型),适配大模型的张量输入要求;

  4. 数据加载器创建:自定义Dataset类,通过DataLoader按批量大小打包数据,时序数据不开启shuffle,保证序列的时间顺序;

  5. 结果保存:保存编码后的张量数据(训练/验证/测试集)和数据加载器配置,便于后续模型微调直接加载使用。

五、完整流程总结与扩展建议

5.1 电力能源预测数据准备全流程

数据收集整理:SCADA采集+气象/节假日补充→格式规范

数据清洗:去重→时序修复→缺失值填充→异常值修正

数据标准化:特征工程→类别编码→数值归一化→特征选择

数据编码:时序样本构造→时间拆分→张量转换→批量加载器生成

模型微调训练:TimeLLM/Transformer-XL

5.2 核心要点回顾

  1. 时序特性适配:电力数据的核心是“时序连续性”,清洗时优先采用线性插值填充缺失值,拆分时严格按时间顺序,避免破坏负荷波动规律;

  2. 特征工程关键:新增滞后特征、滚动统计特征能有效提升模型对负荷短期相关性和局部趋势的捕捉能力;

  3. 数据泄露防控:时序数据的归一化器、编码器需仅用训练集拟合,拆分时按时间顺序,避免未来数据影响模型训练;

  4. 模型输入适配:需严格按大模型要求构造“输入序列-目标序列”的配对格式,保证张量维度为(batch_size, seq_len, feature_dim)。

5.3 扩展建议

  1. 数据增强:时序数据可通过“时间平移、噪声注入、幅度缩放”等方式增强(如对负荷序列添加±3%的随机噪声),提升模型泛化性;

  2. 多区域数据融合:若预测范围扩大到多个区域,可加入区域间的负荷相关性特征,采用多变量时序预测模型;

  3. 特征选择优化:可通过互信息、皮尔逊相关系数筛选与负荷相关性强的特征,剔除冗余特征,提升模型训练效率;

  4. 模型适配调整:若使用不同时序大模型(如TFT、Informer),需调整输入序列长度、特征维度等参数,部分模型需添加时间编码特征;

  5. 实时数据更新:实际部署时,需建立实时数据采集-清洗-标准化流水线,每日增量更新训练数据,保证模型预测精度的时效性。

通过以上完整流程,可将原始的电力时序数据逐步转换为大模型微调所需的高质量、结构化、可直接训练的格式。电力能源预测数据准备的核心是“时序完整性保障+有效特征工程”,只有兼顾数据质量与特征有效性,才能为后续模型微调奠定坚实基础,最终实现高精度的电力负荷

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐