数据结构分析与预处理

粉丝可以查看mxnet的代码详解! 如果有torch版本的请给我交流!

一、原始数据结构

shape:(时间点长度,节点个数,特征个数)

1.1 案例示意

选取6月20日早上8点到9点数据,网络的站点(网格划分)是5个,每个站点具有3个特征。

  • 8:00-8:05
[ 特征1, 特征2,  特征3]  ----节点1的特征
[        4,          5,         6]  -----节点2的特征
[        7,          8,         9]  -----节点3的特征
[       10,         11,        12]  -----节点4的特征
[       13,         14,        15]  -----节点5的特征
  • 8:05-8:10
[ 特征1, 特征2,  特征3]  ----节点1的特征
[        4,          5,           6]  -----节点2的特征
[        4,          5,           7]  -----节点3的特征
[       10,          9,           8]  -----节点4的特征
[       13,         14,          15]  -----节点5的特征
  • 8:10-8:15
  • 8:55-9:00
[ 特征1, 特征2,  特征3]  ----节点1的特征
[        4,          5,           6]  -----节点2的特征
[        7,          8,           9]  -----节点3的特征
[       10,        11,        12]  -----节点4的特征
[        13,       14,        15]  -----节点5的特征

1.2 python中的数据形式

>>>data
array([
[[ 特征1, 特征2,  特征3]  ,
[        4,        5,          6]  ,
[        7,        8,          9]  ,
[       10,        11,        12]  ,
[       13,        14,        15]  ],   # 8:-8:05

[[ 特征1, 特征2,  特征3] ,
[        4,        5,           6]  ,
[        4,        5,           7]  ,
[       10,        9,           8] ,
[       13,       14,          15]],  #8:05-8:10

...
[[ 特征1, 特征2,  特征3]  ,
[        4,          5,           6]  ,
[        7,          8,           9]  ,
[       10,          11,         12]  ,
[        13,         14,         15]] #8:55-9:00
])


>>>data.shape
(12, 5, 3)

二、原始数据批量处理之生成每个批量的索引序列

2.1 代码分析

def search_data(sequence_length, num_of_batches, label_start_idx,
                num_for_predict, units, points_per_hour):
    '''
    Parameters
    ----------
    sequence_length      : int, length of all history data                     历史数据的序列长度
    num_of_batches       : int, the number of batches will be used for training 用于训练的batch 的个数
    label_start_idx      : int, the first index of predicting target           预测目标的第一索引
    num_for_predict       : int,the number of points will be predicted for each sample 每个样本的预测的节点数
    units                         : int, week: 7 * 24, day: 24, recent(hour): 1
    points_per_hour       : int, number of points per hour, depends on data     每个小时划分的时间段的个数

    Returns
    ----------
    list[(start_idx, end_idx)]
    '''
    
    if points_per_hour < 0:  # 每个小时划分的时间段的个数要>=0,否则报错
        raise ValueError("points_per_hour should be greater than 0!")

    if label_start_idx + num_for_predict > sequence_length:   # 如果: 预测目标的开始索引+预测的个数>序列长度;也就是说在这个索引的位置上,不足以进行预测了
        return None

    x_idx = []
    for i in range(1, num_of_batches + 1):   # 在【1,batch的个数+1) 之间循环;共循环 num_of_batches 次 ;对于每一个batch
        start_idx = label_start_idx - points_per_hour * units * i  # batch 的开始索引=预测标签的开始索引-每小时的片段个数(12)*单元时间(周:7*24)*i
        end_idx = start_idx + num_for_predict                      # batch 的结束索引=开始索引+每个样本预测的节点个数
        if start_idx >= 0:   # 如果 batch的开始索引 >=0(即未出现负数的时候)  将这个batch的(开始索引,结束索引)添加到列表x_idx中,否则退出函数返回none
            x_idx.append((start_idx, end_idx))
        else:
            return None

    if len(x_idx) != num_of_batches:  # 如果(开始索引,结束索引)的列表的长度 不等于 batch的数目,则退出函数返回none
        return None

    return x_idx[::-1]   # 列表的取元素【开始:结束:步长】 在这里-1代表逆向步长 也就是说列表的头与尾的颠倒
    # 返回的是个 列表:每个元素都是一个元组,每个元组有两个元素(开始索引,结束索引)
    #                 列表`最多`共有 num_of_batches 个元组
    # 实质内容为:     返回的是每个batch的(开始索引,结束索引)

2.2 举例分析

如果采用的是一个月的数据,正好是30天的数据。以每5分钟记录一条数据的话,则共有30∗24∗12=864030*24*12=8640302412=8640个时间划分。预测的时间长度为1个小时=12∗5min12*5min125min

假设1:num_of_batches=50​

  1. 对于week_sample:以一周为单位,units=7∗24=7*24=724

第一个batch开始的索引start_idx=8640−7∗24∗12∗1=6624=8640-7*24*12*1=6624=8640724121=6624,

结束索引end_idx=star_idx+num_for_predict=6624+126624+126624+12 # 288是一天的时间段

第二个batch开始的索引start_idx=8640−7∗24∗12∗2=4608,=8640-7*24*12*2=4608,=8640724122=4608,

结束索引end_idx=star_idx+num_for_predict=4608+124608+124608+12 # 288是一天的时间段

第三个batch开始的索引start_idx=8640−7∗24∗12∗3=2592,=8640-7*24*12*3=2592,=8640724123=2592,

结束索引end_idx=star_idx+num_for_predict=2592+122592+122592+12 # 288是一天的时间段

第四个batch开始的索引start_idx=8640−7∗24∗12∗4=576,=8640-7*24*12*4=576,=8640724124=576,

结束索引end_idx=star_idx+num_for_predict=$576+12 # 288是一天的时间段

显然,当计算第5个batch的开始索引时已经为负数了,根据if判断条件,返回None

number_of_batches对于week_samples来说是最多为4,才能保证不返回none

假设2:num_of_batches=2

  1. 对于week_sample:以一周为单位,units=7∗24=7*24=724
x_idx=[(4608,4608+12),(6624,6624+12)]
  1. 对于day_sample:以一日为单位,units=24

第一个batch开始的索引start_idx=8640−24∗12∗1=8352=8640-24*12*1=8352=864024121=8352,

第二个batch开始的索引start_idx=8640−24∗12∗2=8064=8640-24*12*2=8064=864024122=8064,

x_idx=[(8064,8064+12),(8352,8352+12)]
  1. 对于hour_sample:以一日为单位,units=1

第一个batch开始的索引start_idx=8640−12∗1=8628=8640-12*1=8628=8640121=8628,

第二个batch开始的索引start_idx=8640−12∗2=8616=8640- 12*2=8616=8640122=8616,

x_idx=[(8616,8616+12),(8628,8628+12)]

三、批量处理之生成4类批量数据

  • 将根据文件路径加载的原始数据集data_seq传递给参数data_sequence
  • 因此原始数据集应该是3维数组的形式

3.1 代码分析

def get_sample_indices(data_sequence, num_of_weeks, num_of_days, num_of_hours,
                       label_start_idx, num_for_predict, points_per_hour=12):
    '''
    Parameters
    ----------
    data_sequence   : np.ndarray    数据序列(3维数组)
                   shape is (sequence_length, num_of_vertices, num_of_features)
    num_of_weeks, num_of_days, num_of_hours: int
    label_start_idx : int, the first index of predicting target               预测目标的第一个索引
    num_for_predict : int,the number of points will be predicted for each sample 对每个样本预测的数目
    points_per_hour : int, default 12, number of points per hour  每个小时的时间片段数,以5分钟为一个时间段,每个小时共划分12个
    Returns
    ----------
    week_sample: np.ndarray  3维数组
                 shape is (num_of_weeks * points_per_hour,
                           num_of_vertices, num_of_features)

    day_sample: np.ndarray
                 shape is (num_of_days * points_per_hour,
                           num_of_vertices, num_of_features)

    hour_sample: np.ndarray
                 shape is (num_of_hours * points_per_hour,
                           num_of_vertices, num_of_features)

    target: np.ndarray
            shape is (num_for_predict, num_of_vertices, num_of_features)
    '''
    
    week_indices = search_data(data_sequence.shape[0], num_of_weeks,   # 生成周的索引序列
                               label_start_idx, num_for_predict,
                               7 * 24, points_per_hour)
    if not week_indices:
        return None

    day_indices = search_data(data_sequence.shape[0], num_of_days,     # 生成日的索引序列
                              label_start_idx, num_for_predict,
                              24, points_per_hour)
    if not day_indices:
        return None

    hour_indices = search_data(data_sequence.shape[0], num_of_hours,  # 生成时的索引序列
                               label_start_idx, num_for_predict,
                               1, points_per_hour)
    if not hour_indices:
        return None
    # 根据分类,对每个batch的数据合并在一起
    week_sample = np.concatenate([data_sequence[i: j]
                                  for i, j in week_indices], axis=0)
    day_sample = np.concatenate([data_sequence[i: j]
                                 for i, j in day_indices], axis=0)
    hour_sample = np.concatenate([data_sequence[i: j]
                                  for i, j in hour_indices], axis=0)

    target = data_sequence[label_start_idx: label_start_idx + num_for_predict]

    return week_sample, day_sample, hour_sample, target

3.2 举例分析

在‘假设2:num_of_batches=2’的条件下,分别得到week,day,hour的对应的x_idx为例,生成对应的week_sample,day_sample,hour_sample

  1. week_sample
x_idx=[(4608,4608+12),(6624,6624+12)]
>>>data_sequence[4608:4620]
# 以5个节点3个特征为例,数据应为1.2 中介绍的形式(12,5,3)
>>>data_sequence[6624:6636]
# 以5个节点3个特征为例,数据应为1.2 中介绍的形式(12,5,3)
week_sample=np.concatenate([data_sequence[4608:4620],data_sequence[6624:6636]],axis=0)
# 按照axis=0合并的话,将后两维看成一个整体的话(12,5*3),对应到矩阵行列中,按照增加行的方式合并。
# 合并后的week_sample的shape:(24,5,3)
  1. day_sample

同理,如上所示,合并后的day_sample的shape:(24,5,3)

  1. hour_sample

同理,如上所示,合并后的hour_sample的shape:(24,5,3)

  1. target
 target = data_sequence[label_start_idx: label_start_idx + num_for_predict]
 target=data_sequence[8640:8652] # 8640=30*24*12

四、生成train、test、validation数据集

这是一个完整定义的函数,非常长,为了方便理解和记忆,我将其分布为几个步骤

4.1 函数的定义名称与参数解释

def read_and_generate_dataset(graph_signal_matrix_filename,
                              num_of_weeks, num_of_days,
                              num_of_hours, num_for_predict,
                              points_per_hour=12, merge=False):
    '''
    读取并生成数据集
    Parameters
    ----------
    graph_signal_matrix_filename: str, path of graph signal matrix file 图信号矩阵 的文件名
    num_of_weeks, num_of_days, num_of_hours: int  周的个数; 天的个数; 小时的个数
    num_for_predict: int  预测的个数
    points_per_hour: int, default 12, depends on data  每个小时的节点数,默认12,因为 1个小时,每5分钟一个时间段
    merge: boolean, default False,
           whether to merge training set and validation set to train model 是否将训练集和验证集用于训练

    Returns
    ----------
    feature: np.ndarray,
             shape is (num_of_samples, num_of_batches * points_per_hour,
                       num_of_vertices, num_of_features)
    target: np.ndarray,
            shape is (num_of_samples, num_of_vertices, num_for_predict)

    '''

4.2对数据集的加载与轴的变换(在def之内)

 # def read_and_generate_dataset():   
    # 代码开始
    data_seq = np.load(graph_signal_matrix_filename)['data'] # 加载数据集

    all_samples = []
    for idx in range(data_seq.shape[0]): # 数据集的第一维
        sample = get_sample_indices(data_seq, num_of_weeks, num_of_days,  
                                    num_of_hours, idx, num_for_predict,   
                                    points_per_hour)
        ## 返回3个样本: 周样本,日样本,时样本; + 1个标签:target;都是3维数组
        ##  如果传入 周数目、天数目、时数目、为空,则返回none
        
        if not sample: 
            continue
        # 当sample=none时,not sample 的布尔值为true,则跳过 for 循环中 continue 下面的内容
       
       week_sample, day_sample, hour_sample, target = sample  
       # 样本sample不为none时,则对返回的3个样本和1个标签 进行拆包
        all_samples.append((
            np.expand_dims(week_sample, axis=0).transpose((0, 2, 3, 1)),
            np.expand_dims(day_sample, axis=0).transpose((0, 2, 3, 1)),
            np.expand_dims(hour_sample, axis=0).transpose((0, 2, 3, 1)),
            np.expand_dims(target, axis=0).transpose((0, 2, 3, 1))[:, :, 0, :]  # target 的特征个数为1个
        ))
函数解析
np.expand_dims(ndarray,axis)

对ndarray的shape下(aixa=0,axis=1,axis=2)也就是在axis=0的位置进行升维度

案例:

a = np.array([[[1,2,3],[4,5,6]]])  a.shape:   (1, 2, 3)
b = np.expand_dims(a, axis=0)      b.shape:(1, 1, 2, 3)
b:array([[[[1,2,3],[4,5,6]]]])
a:list([1,2,3]
       [4, 5, 6])
b:DataFrame([1, 2, 3]
           [4, 5, 6]])
transpose坐标轴转换函数

以week_sample=7∗247*24724
本来shape(0,1,2,3)=(1, 7*24*12, 顶点个数, 特征个数)

⟹\Longrightarrow (1, 顶点个数, 特征个数, 7*24*12)

结果分析
all_samples:list
  • 它的元素个数是:data_seg.shape[0][0][0](加载数据集的个数) 根据:for 循环
  • 它的每个元素是一个四元元组(week_sample, day_sample, hour_dample, target)
  • 对于这个四元元组具体而言:每一个都是4维数组
    • week_sample: (1, 顶点个数, 特征个数, 72412)
    • day_sample : (1, 顶点个数, 特征个数, 24*12)
    • hour_sample: (1, 顶点个数, 特征个数, 1*12)
    • target : (1, 顶点个数, 特征个数, num_for_predict)

4.3 按比例划分train, test, validation

样本分割线
    split_line1 = int(len(all_samples) * 0.6)  # 样本分割线,0.6的分割比例
    split_line2 = int(len(all_samples) * 0.8)  # 样本分割线,0.8的分割比例
数据的划分方式:
  • merge=True :将训练数据集验证数据集混合在一起,做训练
  • merge=Fals:将训练数据集验证数据集分开,用于训练的数据集中不含验证数据集
    if not merge:
        training_set = [np.concatenate(i, axis=0)    # axis=0,沿这个轴增加内容
                        for i in zip(*all_samples[:split_line1])]
    else:
        print('Merge training set and validation set!')
        training_set = [np.concatenate(i, axis=0)
                        for i in zip(*all_samples[:split_line2])] 
                        # zip*(all_samples) 拆包(week_sample,day_sample,hour_sample,target)打开并重组
                        
    validation_set = [np.concatenate(i, axis=0)
                      for i in zip(*all_samples[split_line1: split_line2])]
                     # 如上构造验证集,只是抽样个数为:len(all_sample)*0.2
    testing_set = [np.concatenate(i, axis=0)
                   for i in zip(*all_samples[split_line2:])]
                     # 如上构造验证集,只是抽样个数为:len(all_sample)*0.2
结果分析

training_set:list (分割比例为:0.6)

  • for 循环的内容为:( (week_sample_s) , (day_sample_s) , (hour_sample_s) , (target_s) )

    • 循环次数为4,每次循环做的事为:一次 np.concatenate

      np.concatenate

      • 它可以将 元组(如:week_smple_s)中包括的所有的array一起拼接在一起。

      • 并且 拼接一次的结果作为 列表training_set 中的一个元素

    • 元组(week_sample_s)里面包含 all_sample里面截取的元素个数 个 week_sample ,自己的定义方式如下(为方便理解)

      week_sample_s=(week_sample, week_sample, week_sample,…,week_sample)

  • 第一次循环:

    • 它 传入的元组是 week_sample_s,将这个元组里面的所有的week_sample 按照aixs=0 拼接在一起。
    • training_set的第一个元素是array数组,其shape 变为( len(all_sample)*0.6_抽样个数, 顶点个数, 特征个数, 7*24*12 )
  • 第二次循环:

    • 它 传入的元组是 day_sample_s,将这个元组里面所有的 day_sample,继续按照axis=0,拼接一起
    • training_set的第二个元素是array数组,其shape 变为( len(all_sample)*0.6_抽样个数, 顶点个数, 特征个数, 24*12)
  • 第三次循环:

    • trainning_set的第三个元素是array数组,其 shape 变为( len(all_sample)*0.6_抽样个数, 顶点个数, 特征个数, 1*12)
  • 第四次循环:

    • trainning_set的第四个元素是array数组,其 shape 变为( len(all_sample)*0.6_抽样个数, 顶点个数, 特征个数, num_for_predict)

在这里插入图片描述

训练集、测试集、验证集细分week,day,hour,target并打印数据集的shape
    train_week, train_day, train_hour, train_target = training_set  
    # 训练数据集中,按照 train_周, train_日, train_时, train_target 拆包

    val_week, val_day, val_hour, val_target = validation_set        
    # 验证数据集中, 按照 var_周, var_日,  var_时, var_target 拆包

    test_week, test_day, test_hour, test_target = testing_set       
    # 测试数据集中, 按照 test_周, test_日, test_时, test_target 拆包

    print('training data: week: {}, day: {}, recent: {}, target: {}'.format(
        train_week.shape, train_day.shape,
        train_hour.shape, train_target.shape))

    print('validation data: week: {}, day: {}, recent: {}, target: {}'.format(
        val_week.shape, val_day.shape, val_hour.shape, val_target.shape))         
        # 打印出 验证集 拆包后的每个的shape,形式如上,只是第一维变成len(all_sample)*0.2_抽样个数

    print('testing data: week: {}, day: {}, recent: {}, target: {}'.format(
        test_week.shape, test_day.shape, test_hour.shape, test_target.shape))     
        # 打印出 测试集 拆包后的每个的shape,形式如上,只是第一维变成len(all_sample)*0.2_抽样个数
结果分析

打印出 训练集 拆包后的每个的shape

  • train_week.shape:( len(all_sample)*0.6_抽样个数, 顶点个数, 特征个数, 7*24*12 )
  • train_day.shape: ( len(all_sample)*0.6_抽样个数, 顶点个数, 特征个数, 24*12)
  • train_hour.shape:( len(all_sample)*0.6_抽样个数, 顶点个数, 特征个数, 1*12)
  • train_target.shape:( len(all_sample)*0.6_抽样个数, 顶点个数, 特征个数, num_for_predict)

4.4 对分割好的数据集进行 标准化(z_score)处理

利用normalization函数进行标准化处理,该函数返回:

  • 1个字典:训练集的均值 和 方差

  • 3个数组:标准化后的训练集,标准化后的验证集,标准化后的测试集

    # 周数据的 训练集、验证集、测试集
    (week_stats, train_week_norm,
     val_week_norm, test_week_norm) = normalization(train_week,
                                                    val_week,
                                                    test_week)
     # 日数据的 训练集、验证集、测试集
    (day_stats, train_day_norm,
     val_day_norm, test_day_norm) = normalization(train_day,
                                                  val_day,
                                                  test_day)
     # 时数据的 训练集、验证集、测试集
    (recent_stats, train_recent_norm,
     val_recent_norm, test_recent_norm) = normalization(train_hour,
                                                        val_hour,
                                                        test_hour)

4.5 采用双重字典的方式表示表示各个部分的数据集

    # 所有的数据,变成一个 双重字典 的方式
    all_data = {
        'train': {
            'week': train_week_norm,
            'day': train_day_norm,
            'recent': train_recent_norm,
            'target': train_target,
        },
        'val': {
            'week': val_week_norm,
            'day': val_day_norm,
            'recent': val_recent_norm,
            'target': val_target
        },
        'test': {
            'week': test_week_norm,
            'day': test_day_norm,
            'recent': test_recent_norm,
            'target': test_target
        },
        'stats': {
            'week': week_stats,
            'day': day_stats,
            'recent': recent_stats
        }
    }

    return all_data

到此为止,该函数结束。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐