3.使用spark开发第一个程序WordCount程序及多方式运行代码
WordCount是一个快速入门案例,单词统计,通过此案例,学习如何用scala来编写spark程序,spark 跑任务的几种方式,日志聚合
目录
概述
WordCount 是一个快速入门案例,单词统计,通过此案例,学习如何用 scala 来编写 spark 程序,spark 支持 java,scalal 这些语言,目前在企业中大部分公司都是使用 scala 进行开发,后序的 flink 是基于 java 开发的,这与官网的引导有关,flink 的源码在去scala 化 ,基于此,将要实现以下几个目标:
- WordCount程序
- 任务提交
- 使用
idea - 使用
spark-submit - 使用
spark-shell
- 使用
Spark historyServer配置
前置文章请参考:
| 文章 | 地址 |
|---|---|
| scala安装 | 地址 |
| idea如何开发spark代码 | 地址 |
| Spark的工作与架构原理 | 地址 |
| RDD编程指南 | 地址 |
| RDD持久化 | 地址 |
| Spark共享变量 | 地址 |
此篇文章涉及到相关的知识点较多,跟着做完,即能有所得
WordCount程序
本程序 基于 spark 3.2.4 ,scala 2.12.x 版本开发
准备工作
由于后续任务运行时,需要在 客户端节点或hadoop节点机器上 操作,文件来源,使用 hdfshello.txt上传至 hdfs如下操作
[root@hadoop01 data]# hdfs dfs -put hello.txt /tmp/
2023-11-02 09:23:45,159 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[root@hadoop01 data]# ls
hadoop_repo hello.txt hive_repo soft
上传结果如下图
scala编写代码
编码之前,要清楚代码的逻辑,即加载的数据如何一步一步变换成最终想要的结果 ,由下分析步骤可知,万变不离其宗 S -> T -> S 三板斧, 详细请参考 Spark的工作与架构原理
- 创建SparkContext
- 加载数据(
S) - 对数据进行分隔,将一行数据分隔成一个一个的单词 (
T) - 迭代 words,将第个 word 转化为 (word,1) 这种形式 (
T) - 第五步:根据 key (其实就是 word) 进行分组聚合统计 (
T) - 第六步:结果打印 (
S)
代码如下
object WordCount {
def main(args: Array[String]): Unit = {
// "hdfs:///tmp/hello.txt"
var path = "/Users/hyl/Desktop/fun/sts/spark-demo/hello.txt"
if(args.length==1){
path = args(0)
}
// 第一步:创建SparkContext
val conf = new SparkConf()
conf
// 设置任务名称
.setAppName("WordCount")
// local 表示本地运行
.setMaster("local")
val sp = new SparkContext(conf)
// 第二步:加载数据
val lineRdd = sp.textFile(path)
// 第三步:对数据进行分隔,将一行数据分隔成一个一个的单词
val wordsRdd = lineRdd.flatMap(_.split(" "))
// 第四步:迭代 words,将第个 word 转化为 (word,1) 这种形式
val pairRdd = wordsRdd.map((_, 1))
// 第五步:根据 key (其实就是 word) 进行分组聚合统计
val wordCountRdd = pairRdd.reduceByKey(_ + _)
// 第六步:结果打印
wordCountRdd.foreach(println _)
}
}
任务提交
使用 idea
运行 idea 得如下结果
来体验一下 scala 函数式编程的极简之美
object WordCount2 {
def main(args: Array[String]): Unit = {
var path = "/Users/hyl/Desktop/fun/sts/spark-demo/hello.txt"
if (args.length == 1) {
path = args(0)
}
val conf = new SparkConf()
conf
// 设置任务名称
.setAppName("WordCount")
// local 表示本地运行
.setMaster("local")
new SparkContext(conf).textFile(path).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println _)
}
}
使用 spark-submit
使用 spark-submit 提交到集群执行,实际工作中会使用这种方式
那接下来需要把代码提交到集群中去执行
这个时候就需要对代码打包了
首先在项目的 pom文件中添加 build 配置,和 dependencies 标签平级,详细请参考 源码
注意: 打包时,要将如下图的代码注释掉
[root@hadoop01 jar]# ls
spark-demo-1.0-SNAPSHOT.jar
[root@hadoop01 jar]# pwd
/data/jar
spark-submit \
--class com.fun.scala.WordCount2 \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--num-executors 1 \
/data/jar/spark-demo-1.0-SNAPSHOT.jar \
hdfs:///tmp/hello.txt
异常
Exception in thread “main” org.apache.spark.SparkException: When running with master ‘yarn’ either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.

[root@hadoop01 bin]# vi /etc/profile
export HADOOP_CONF_DIR=/data/soft/hadoop-3.2.4/etc/hadoop/
[root@hadoop01 bin]# source /etc/profile
[root@hadoop01 bin]#
执行成功
使用 spark-shell
这种方式方便在集群环境中调试代码
有一些代码对环境没有特殊依赖的时候可以直接使用第一种方式,在idea中调试代码
但是有时候代码需要依赖线上的一些环境,例如:需要依赖线上的数据库中的数据,由于权限问题,在本地是无法连接的
这个时候想要调试代码的话,可以选择使用spark-shell的方式,直接在线上服务器中开启一个spark 的交互式命令行窗口
注意:使用
spark-shell的时候,也可以选择指定开启本地spark集群,或者连接standalone集群,或者使用on yarn模式,都是可以的
[root@hadoop01 bin]# spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-11-02 10:20:11,746 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://hadoop01:4040
Spark context available as 'sc' (master = local[*], app id = local-1698891612526).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.2.4
/_/
Using Scala version 2.12.15 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_391)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
执行需要的代码
scala> val path = "hdfs:///tmp/hello.txt"
path: String = hdfs:///tmp/hello.txt
scala> sc.textFile(path).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println _)
(hyl,1)0:> (0 + 2) / 2]
(word,1)
(hello,4)
(12755167,1)
(test,1)
scala>
效果图
Spark historyServer 配置
请先配置 hadoop historyServer
刚才使用 on yarn 模式的时候会发现看不到输出的日志信息,这主要是因为没有开启 spark 的historyserver,只开启了hadoop的historyserver
需要修改spark-defaults.conf和spark-env.sh
首先对spark-defaults.conf.template重命名
然后在spark-defaults.conf中增加以下内容
spark.eventLog.enabled=true
spark.eventLog.compress=true
spark.eventLog.dir=hdfs:///tmp/logs/root/logs
spark.history.fs.logDirectory=hdfs:///tmp/logs/root/logs
spark.yarn.historyServer.address=http://hadoop01:18080
注意:在哪个节点上启动spark的
historyserver进程,spark.yarn.historyServer.address的值里面就指定哪个节点的主机名信息
在spark-env.sh中增加以下内容
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs:///tmp/logs/root/logs"
bug
java.io.FileNotFoundException: File does not exist: hdfs:/tmp/logs/root/logs

解决 : 预先建立一个目录
[root@hadoop01 spark-3.2.4-bin-hadoop3.2]# hdfs dfs -mkdir hdfs:/tmp/logs/root/logs
2023-11-02 10:36:47,136 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[root@hadoop01 spark-3.2.4-bin-hadoop3.2]# sbin/start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /data/soft/spark-3.2.4-bin-hadoop3.2/logs/spark-root-org.apache.spark.deploy.history.HistoryServer-1-hadoop01.out
[root@hadoop01 spark-3.2.4-bin-hadoop3.2]# jps
11716 NameNode
2628 Jps
12117 SecondaryNameNode
12503 ResourceManager
10520 Master
11243 RunJar
2333 HistoryServer
在 yarn 上重新执行 WorkCount 程序,可以看到以下输出内容
结束
至此 WordCount 程序开发及运行结束,如有问题,欢迎评论区留言。
更多推荐


所有评论(0)