Spark编程题:使用RDD求支付金额的Top N值
Spark 编程题:使用 RDD 求支付金额的 Top N 值
一、题目要求
使用 Spark RDD 从给定的支付数据中找出支付金额(payment 字段)最高的前 5 个值,并按降序排列输出。数据格式为逗号分隔的字符串,其中第三个字段是支付金额。具体要求如下:
- 从 HDFS 或本地文件系统读取数据
- 过滤掉无效数据行
- 提取 payment 字段
- 转换为键值对形式
- 按 payment 值降序排序
- 输出前 5 个最高支付金额及其排名
测试数据如下:
1 | |
二、解题思路
- 数据读取:利用 Spark 的
textFile方法从 HDFS 或本地文件系统读取数据文件。 - 数据清洗:对读取的数据进行处理,通过字符串分割、条件判断等操作,过滤掉无效数据行。
- 字段提取与转换:提取支付金额字段,并将数据转换为键值对形式,方便后续排序操作。
- 排序:使用
sortBy方法按照支付金额进行降序排序。 - 结果输出:选取排序后的数据中的前 5 条记录,输出其排名和支付金额。
三、详细步骤
3.1 准备测试数据
在本地创建一个 CSV 格式的文件,用于存储测试数据。在命令行中执行以下命令:
1 | |
上述命令使用cat >创建一个名为payment_data.csv的文件,并将测试数据写入其中。完成后,可以使用cat payment_data.csv命令查看文件内容,确认数据写入正确。
3.2 启动 HDFS
如果计划将数据存储在 HDFS 上,需要先确保 HDFS 服务正常启动。在 Hadoop 环境中,执行以下命令启动 HDFS:
1 | |
启动完成后,可以使用jps命令查看进程,确认NameNode和DataNode进程已经正常运行。
3.3 将数据上传至 HDFS
在 HDFS 启动后,创建存储数据的目录,并将本地的测试数据文件上传至 HDFS。执行以下命令:
1 | |
上述命令首先在 HDFS
中创建了/user/hadoop/data目录(如果目录不存在),然后将本地的payment_data.csv文件上传至该目录。
上传完成后,可以使用以下命令验证数据是否成功上传:
1 | |
hdfs dfs -ls命令用于列出目录下的文件,hdfs dfs -cat命令用于查看文件内容。
3.4 编写 Spark 代码
使用文本编辑器(如vi或vim)创建一个 Python
文件,命名为top_n_payments.py,并编写以下代码:
1 | |
代码说明:
- 初始化
SparkContext:通过
SparkConf和SparkContext创建 Spark 应用程序的上下文,设置应用名称为TopNPayments。 - 数据读取:使用
sc.textFile方法从 HDFS 路径读取数据文件。 - 数据处理
map方法将每一行数据按逗号分割成列表。filter方法过滤掉数据不完整(字段数量不足 3 个)或关键字段为空的无效数据行。- 再次使用
map方法将数据转换为键值对形式,键为用户 ID(转换为整数类型),值为支付金额(转换为浮点数类型)。 sortBy方法按照支付金额(值)进行降序排序。take方法获取排序后数据的前 5 条记录。
- 结果输出:遍历前 5 条记录,输出其排名和支付金额。
- 停止 SparkContext:在程序执行完毕后,停止 Spark 应用程序的上下文。
3.5 提交 Spark 作业
在命令行中执行以下命令提交 Spark 作业:
1 | |
--master local[*]表示在本地模式下运行 Spark
作业,使用所有可用的 CPU 核心。执行命令后,Spark
会开始处理数据,并输出支付金额最高的前 5 个值及其排名。
四、常见问题及解决方法
4.1 数据读取失败
如果出现数据读取失败的情况,可能原因如下:
- 文件路径错误:确保
file_path变量中的 HDFS 路径与实际存储路径一致,检查 NameNode 地址和端口是否正确。 - HDFS
服务未启动:使用
jps命令检查NameNode进程是否正常运行,若未运行,需启动 HDFS 服务。 - 权限问题:确认运行 Spark 作业的用户有权限访问 HDFS
中的数据文件,可以通过
hdfs dfs -chmod命令修改文件权限。
4.2 代码语法错误
如遇到SyntaxError错误,可能是由于 Python
版本不兼容导致。例如,代码中使用了 f-strings(Python 3.6+
支持),而环境中的 Python 版本低于 3.6。此时可以将 f-strings
替换为str.format()或%格式化方法,或者升级
Python 版本到 3.6 及以上。
4.3 连接 HDFS 失败
当出现连接 HDFS
失败的错误,如java.net.ConnectException: Connection refused,可参考以下解决方法:
- 检查 HDFS
服务状态:使用
start-dfs.sh启动 HDFS 服务,并通过jps命令确认NameNode和DataNode进程正常运行。 - 查看日志文件:查看
NameNode的日志文件(通常位于$HADOOP_HOME/logs目录下),获取详细的错误信息,根据提示进行问题排查。 - 检查防火墙和 SELinux:临时关闭防火墙和 SELinux 进行测试,若关闭后能正常连接,则需配置防火墙规则允许 HDFS 相关端口的访问。
- 检查配置文件:确认
core-site.xml和hdfs-site.xml中的配置正确,尤其是fs.defaultFS和dfs.namenode.http-address等关键属性。
通过以上详细的步骤和问题解决方法,希望能够帮助你顺利完成使用 Spark RDD 求支付金额 Top N 值的编程任务。在实际操作过程中,可能会遇到各种不同的情况,需要根据具体的错误提示和环境进行灵活处理 。