SparkMl使用的不多,一两年前业务上需要就用了一下,之后就没再使用了,最近又有需求了,使用SparkMl做了一下时序预测,先在这一篇笔记里记录一下之前使用SparkMl的简单应用。
这个案例使用的是随机森林模型。

1.先准备一下训练数据和测试数据,两个数据集的scheme的一样

val data: DataFrame=......
val trainDF: DataFrame= data.where("date_time<'2024-02-22'")
val testDF: DataFrame= data.where("date_time='2024-02-22'")

2.离散变量转为连续变量,硬编码

    val indexCols = Array(
      "peaks_pm_out", "lot_nature_code_new", "seg", "peaks_inner_pm_value", "peaks_am_out", "peaks_inner_am_value",
      "peaks_am_inner", "weekday", "is_holiday", "peaks_pf", "peaks_af", "peaks_out_pm_value", "is_positive",
      "peaks_pm_inner", "peaks_out_am_value")
    //离散变量 转为连续变量 硬编码
    val labelIndexers: Array[StringIndexer] = indexCols.map(col => {
      new StringIndexer()
        .setInputCol(col)
        .setOutputCol(s"le_$col") //转换后的字段名为原字段名加上le前缀
        .setStringOrderType("alphabetAsc")
        .setHandleInvalid("keep")
    })

3.设置特征向量,这些字段将作为特征用于训练模型,会汇总成一个字段"features"

    val assembler = new VectorAssembler().setInputCols(
      Array("out_num"
        , "avg_time_seg_median"
        , "le_peaks_pm_out"
        , "le_lot_nature_code_new"
        , "le_seg"
        , "le_peaks_inner_pm_value"
        , "come_num"
        , "le_peaks_am_out"
        , "flow_diff"
        , "le_peaks_inner_am_value"
        , "le_peaks_am_inner"
        , "le_weekday"
        , "avg_time_day7_mean"
        , "le_is_holiday"
        , "le_peaks_pf"
        , "le_peaks_af"
        , "le_peaks_out_pm_value"
        , "le_is_positive"
        , "le_peaks_pm_inner"
        , "le_peaks_out_am_value"
        , "total_space")
    ).setOutputCol("features").setHandleInvalid("skip")

4.创建模型对象,各个参数有啥用就需要去看随机森林的文档了

    //随机森林模型对象
    val rf: RandomForestRegressor = new RandomForestRegressor()
      .setNumTrees(100)
      .setMaxDepth(15)
      .setMinInstancesPerNode(1)
      .setMinInfoGain(0.0)
      .setImpurity("variance")
      .setFeaturesCol("features")
      .setLabelCol("lable")
      .setFeatureSubsetStrategy("auto")
      .setMaxBins(97)
      .setCacheNodeIds(true)
      .setSubsamplingRate(1.0)
      .setSeed(42)

5.创建机器学习管道,并训练得到模型

 //创建 机器学习pipeline
    val serializables = labelIndexers :+ assembler :+ rf
    val pipeline = new Pipeline().setStages(serializables)
 //基于训练集 模型训练
    val model: PipelineModel = pipeline.fit(train)

6.使用上述模型结果预测数据

//测试数据集 模型预测结果验证,result这个dataframe中包含了label值和prediction值,可以基于这两个值
//进行偏差的计算,比如mae,mse
    val result = model.transform(testDF)

7.加入结果不错,模型可以保存下来,共后续使用,我是保存到hdfs中

model.write.overwrite().save(s"hdfs://xxxxx/spark/model/xxxxx/xxxx_model")

8.其它spark程序加载模型并使用模型预测结果

val result: DataFrame=.....
val model = PipelineModel.load("hhdfs://xxxxx/spark/model/xxxxx/xxxx_model")
//预测结果
val predictionDF = model
            .transform(result)
            .where("prediction<=1.0")
            .drop("features")


Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐