基于Scala的Spark编程
基于Scala的Spark编程
[TOC]
RDD
定义
RDD是一个可分区、可容错、不可变、具有弹性的数据集合,它可以在Spark中进行并行处理。RDD是Spark的核心数据结构,它支持多种操作,包括转换和动作,以及缓存、检查点等高级功能。
特点
- 可分区:一个RDD可以被分成多个分区,每个分区可以在不同的节点上进行计算,实现数据并行处理。
- 可容错:RDD具有弹性,可以通过备份或重新计算来恢复数据,从而实现容错机制。
- 不可变:RDD中的数据不可被修改,只能通过转换操作生成新的RDD。
- 高效:RDD可以在内存中缓存,以提高计算效率。
创建
RDD可以通过多种方式创建,包括从文件中读取数据、从内存中创建数据、从其他RDD转换等。以下是一些创建RDD的常用方式:
- 从文件中创建RDD
- 从程序中的数据集创建RDD
- 从其他RDD转换
方法
- 转换操作
map(func) // 对RDD中的每个元素应用函数func
filter(func) // 对RDD中的每个元素应用函数func并保留结果为True的元素
flatMap(func) // 对RDD中的每个元素应用函数func并合并为一个结果返回
union(other_rdd) // 返回包含两个RDD中所有元素的新RDD
distince() // 对RDD中的元素去重并返回
- 动作操作
count() // 返回RDD中元素的数量
collect() // 将RDD中所有元素收集到一个列表中
reduce(func) // 对RDD中的所有元素应用函数func并返回一个结果
foreach(func) // 对RDD中的每个元素应用函数func
take(num) // 取RDD中前n个数据
RDD执行过程
具体来说,RDD的计算过程包括以下阶段:
- 转换阶段:在这个阶段,Spark将根据转换操作构建依赖关系DAG。
- 划分阶段:在这个阶段,Spark将DAG划分为多个阶段,每个阶段包括一组具有相同依赖关系的RDD。
- 调度阶段:在这个阶段,Spark将每个阶段划分为多个任务,并将任务分发给集群上的不同节点进行并行计算。
- 执行阶段:在这个阶段,Spark在集群上执行任务,并将结果返回给驱动程序。 在RDD的执行过程中,Spark会尽量将数据存储在内存中以提高计算效率,如果内存不足,则会将一部分数据写入磁盘以释放内存。此外,Spark还提供了缓存和检查点等高级功能来进一步提高计算效率和容错性能。
RDD为什么要持久化
- 当对RDD进行一系列的转换操作后,如果每次都重新计算RDD,这会导致计算成本非常高,降低了Spark的性能。为了解决这个问题,Spark提供了RDD持久化机制。RDD持久化即将RDD的数据存储到内存或磁盘中,以便在后续的操作中重复使用,从而避免重复计算的开销。
- 持久化可以提高Spark的性能,特别是在多次迭代的机器学习算法中。在这些算法中,每个迭代的计算都需要重复读取和计算数据,如果使用持久化机制,就可以避免这种重复计算,提高计算效率。
- 同时,持久化机制还可以增加容错性,因为如果在计算RDD的过程中发生故障,持久化的数据可以被重复使用,从而减少重新计算的时间。
RDD的属性
- 分区列表
- 计算分区的函数
- RDD之间的依赖关系
- 分区器
- 数据分区的地址列表
Spark中RDD和DF的区别
- 数据类型:RDD中的每个元素都是一个Java、Scala或Python对象,而DataFrame是一组列,每个列都有一个数据类型。
- 操作:RDD提供了基本的操作(如map、filter、reduce等),这些操作在元素级别上执行。DataFrame提供了更高级别的操作,例如选择、过滤、聚合等,这些操作在列级别上执行。
- 性能:DataFrame比RDD更加高效。DataFrame使用了一些优化技术,如列式存储、谓词下推等,这些技术可以加快处理速度。
- 编程接口:RDD可以使用Java、Scala或Python进行编程,而DataFrame支持Spark SQL和多种编程语言,如Java、Scala、Python和R等。 总的来说,如果处理的是非结构化数据,或者需要在数据处理时执行复杂的自定义逻辑,则使用RDD更为合适;如果处理的是结构化数据,则使用DataFrame会更为方便和高效。
DF的创建
- 从文件中创建:可以使用Spark的读取器(如spark.read)读取各种格式的文件,例如CSV、JSON、Parquet等,然后将其转换为DataFrame。
// parquet
val df = spark.read.load("path/to/file.parquet")
// 其他文件需要进行一次格式化
val df = spark.read.format("csv").load("path/to/file.csv")
- 从RDD中创建:可以使用RDD的toDF()方法将一个RDD转换为DataFrame。
val rdd = sc.parallelize(Seq((1, "John"), (2, "Jane"), (3, "Mike")))
val df = rdd.toDF("id", "name")
DF的常用操作
select():选择DataFrame中的一个或多个列。 filter():根据某个条件过滤DataFrame的行。 groupBy():将DataFrame按照一个或多个列进行分组。 agg():对DataFrame进行聚合操作,如求和、平均值等。 orderBy():按照一个或多个列对DataFrame进行排序。 join():将两个DataFrame根据一个或多个共同的列连接起来。 distinct():返回DataFrame中唯一不同的行。 dropDuplicates():删除DataFrame中重复的行。 withColumn():在DataFrame中添加一个新列。 drop():删除DataFrame中的一个或多个列。 union():将两个DataFrame中的行合并起来。 describe():计算DataFrame中每个数值列的统计摘要。