实验 7 - 认识 Spark¶
概述¶
本次实验中,我们需要搭建起 Spark 集群并在其基础上执行一些基础操作。
本实验需要实验 1 的基础,如果你还未完成,请先完成实验 1.
安装 Spark¶
一如既往,我们需要在集群内的所有机器上先完成 Spark 的安装和配置工作,才能正常使用。
下载¶
不像 HBase 那边的情况,Spark 目前的官方预编译版本直接跟随了最新的 Hadoop,所以你可以直接下载 Spark 官方版本。
安装¶
注意用户
现在,我们正在执行系统维护操作(软件安装),我们会使用 super 用户(或者其它具有 sudo 权限的管理员用户)。请确保你正在使用此类用户执行操作,再继续。
先将 Spark 包(spark-3.5.5-bin-hadoop3-scala2.13.tgz)传输至虚拟机。具体步骤可以参考实验 1.
依然,登录到虚拟机,你可以使用 SSH 远程连接,或者直接在虚拟机内登录操作。
以下步骤需要在集群内所有机器上均运行一遍。
似曾相识?
是的,快夸夸自己,Spark 的安装过程和 HBase 有异曲同工之妙,而且绝大多数准备工作我们都在实验 1 完成了,便省下了很多工作量。
-
创建 Spark 安装目录:
-
将 Spark 压缩包移动至目的地:
-
前往 Spark 安装目录:
-
解压缩:
-
调整位置1:
-
移除目录:
-
移除压缩包:
-
修正权限1:
-
修改
profile: -
添加以下内容:
-
按下 Ctrl+X,确认保存(按 Y),确认文件名(Enter)
-
应用变更:
-
切换到
hadoop用户: -
进入 Spark 配置文件目录:
-
使用模板创建新的
spark-env.sh: -
修改
spark-env.sh:-
添加以下几行2:
-
-
使用模板创建新的
workers: -
修改
workers:-
删除现有的
localhost -
添加以下几行:
-
运行¶
注意用户
在继续之前,请确定你正在使用 hadoop 用户身份进行操作。
终于完成了!我们来运行 Spark 试一下:
为什么要这么麻烦?
因为需要避免和 Hadoop 的 start-all.sh 发生冲突。
视情况启动 Hadoop
Spark 集群独立运行,如果不需要执行读写 HDFS 的任务,无需启动 Hadoop 集群。
类似地,如果需要停止集群:
现在你可以尝试访问 Spark 面板,即 namenode 的 8080 端口 查看 Spark 状态了。你也可以使用 jps 命令确认是否在 namenode 上同时有 Worker 和 Master 在运行,且在其他节点上有 Worker 在运行。
如果你的 Spark 面板长得像这样,恭喜,你的 Spark 集群已经搭建完成,正常运行了。
测试¶
Spark 内置了几个测试项目,我们可以运行其中的一些来尝试,以 hadoop 用户身份:
spark-submit --class org.apache.spark.examples.SparkPi --master spark://namenode:7077 $SPARK_HOME/examples/jars/spark-examples_2.13-3.5.5.jar
这个测试项目是在 Spark 上计算圆周率,仔细看,会在日志接近末尾的部分发现输出:
这就是集群计算后的输出内容,每次运行的结果可能不一样。同时也可以在 Spark 面板上注意到出现完成的任务:
使用 Spark Shell¶
Spark Shell 提供了学习 API 的简单方法,也是交互式分析数据的强大工具。以 hadoop 用户身份:
-
运行 Hadoop 集群(不需要启动 YARN):
2. 在确保 HDFS 安全模式已停用时,把 Spark 的README.md文件上传至 Hadoop HDFS,作为需要处理的数据: 3. 运行 Spark Shell:
当你看到 scala> 的提示时,你就成功进入 Spark Shell 了。此时可以在 Spark 面板上注意到 Spark Shell 正在作为一个运行在集群上的任务运行:
同时可以访问当前任务的临时 WebUI 查看具体信息,只需将端口从 Spark 面板的 8080 替换为 4040 即可:
在 Spark Shell 中输入代码:
如果正确,会得到这个输出:
这就代表我们已经顺利载入了刚刚上传至 HDFS 上的文件了。我们可以执行一些基本的分析操作:
scala> textFile.count() // 计算总行数
val res0: Long = 125
scala> textFile.first() // 获取第一行的文本
val res1: String = # Apache Spark
// 筛选出包含 "Spark" 的行,作为新的数据集,再获得总行数
// 由此可以获得包含 "Spark" 的行的数量
scala> textFile.filter(line => line.contains("Spark")).count()
val res2: Long = 20
完成了!按 Ctrl+C 退出 Spark Shell.
关闭集群¶
-
停止 Spark 集群:
2. 如果已经启动 Hadoop 集群,一并停止:



