行动算子触发作业执行的方法。底层代码调用的环境对象的 runJob 方法。底层代码中会创建 ActiveJob,并提交。与转换算子不同,行动算子直接出结果。而转换算子只是将 RDD 转换成 RDD,并不触发底层的任务执行。以 collect() 算子为例,collect() 源码中包含 runJob() 方法。
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
一层层的 runJob() 跟踪下去,最后在 runJob 的类中,看到 dagScheduler.runJob() 有向无环图的 scheduler 运行 Job。
def runJob[T, U: ClassTag](
...
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
...
}
继续进入 runJob() 方法,会有 submitJob() 提交任务。
def runJob[T, U](
...
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
...
}
}
继续跟进,eventProcessLoop.post(JobSubmitted()) 会提交 Job。
def submitJob[T, U](
...
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
Utils.cloneProperties(properties)))
waiter
}
既然有 JobSubmitted() 就会有 handleJobSubmitted() 其中有 new ActiveJob() 创建激活的作业。
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties): Unit = {
...
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
...
}
reduce()
函数签名
def reduce(f: (T, T) => T): T
函数说明
聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。
object RDD_Action_reduce {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 reduce
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val i = rdd.reduce(_ + _)
println(i)
// Step3: 关闭环境
sc.stop()
}
}
结果:10,直接出聚合结果。
collect()
采集算子,将不同分区的数据按照分区顺序采集到 Driver 端内存中,形成数组。
函数签名
def collect(): Array[T]
函数说明
在驱动程序中,以数组 Array 的形式返回数据集的所有元素。
object RDD_Action_collect {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 collect
val rdd = sc.makeRDD(List(1, 2, 3, 4))
rdd.collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
返回结果:
1
2
3
4
count()
函数签名
def count(): Long
函数说明
返回 RDD 中元素的个数。
object RDD_Action_count {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 count
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val count = rdd.count()
println(count)
// Step3: 关闭环境
sc.stop()
}
}
返回结果:4
first()
函数签名
def first(): T
函数说明
返回 RDD 中的第一个元素。
object RDD_Action_first {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 first
val rdd = sc.makeRDD(List(1, 2, 3, 4))
val first = rdd.first()
println(first)
// Step3: 关闭环境
sc.stop()
}
}
返回结果:1
take()
函数签名
def take(num: Int): Array[T]
函数说明
返回一个由 RDD 的前 n 个元素组成的数组。
object RDD_Action_collect {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 collect
val rdd = sc.makeRDD(List(1, 2, 3, 4))
println(rdd.take(3).mkString(","))
// Step3: 关闭环境
sc.stop()
}
}
返回结果:
1,2,3
takeOrdered()
函数签名
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
函数说明
object RDD_Action_takeOrdered {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 takeOrdered
val rdd = sc.makeRDD(List(4, 2, 3, 1))
println(rdd.takeOrdered(3).mkString(","))
// Step3: 关闭环境
sc.stop()
}
}
可以选择排序方式,默认是升序排列,如果想要降序排列,需要指定第二个参数。
println(rdd.takeOrdered(3)(Ordering[Int].reverse).mkString(","))
输出结果:
4,3,2
aggregate()
函数签名
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
函数说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。
object RDD_Action_aggregate {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 aggregate
val rdd = sc.makeRDD(List(4, 2, 3, 1), numSlices = 2)
println(rdd.aggregate(0)(_ + _, _ + _))
// Step3: 关闭环境
sc.stop()
}
}
输出结果:10
与 aggregateByKey() 区别:
- aggregateByKey():初始值只会参与分区内计算;
- aggregate():初始值会参与分区内计算,并且参与分区间计算。
object RDD_Action_aggregate {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 aggregate
// 10+1+2 + 10+3+4 + 10 aggregate zeroValue 参与分区间计算,因此结果为 40
val rdd = sc.makeRDD(List(1, 2, 3, 4),numSlices = 2)
println(rdd.aggregate(10)(_ + _, _ + _))
// 10+1+2 + 10+3+4 = 30 aggregateByKey() zeroValue 不参与分区间计算
val keyRDD = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), numSlices = 2)
keyRDD.aggregateByKey(zeroValue = 10)(_+_,_+_).collect().foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
fold()
函数签名
def fold(zeroValue: T)(op: (T, T) => T): T
函数说明
折叠操作,aggregate 的简化版操作。
object RDD_Action_fold {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 fold
val rdd = sc.makeRDD(List(1, 2, 3, 4),numSlices = 2)
println(rdd.fold(10)(_ + _))
// Step3: 关闭环境
sc.stop()
}
}
输出结果:10
countByValue()
函数签名
def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]
函数说明
计算 RDD 中值出现的次数,以 Map 的方式输出,Map 的 key 是原数组中的 Value,value 是值出现的次数。
object RDD_Action_countByValue {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 countByValue
val rdd = sc.makeRDD(List(1, 2, 3, 4),numSlices = 2)
println(rdd.countByValue())
val rdd2 = sc.makeRDD(List(1, 1, 3, 1),numSlices = 2)
println(rdd2.countByValue())
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
Map(4 -> 1, 2 -> 1, 1 -> 1, 3 -> 1)
Map(1 -> 3, 3 -> 1)
countByKey()
函数签名
def countByKey(): Map[K, Long]
函数说明
统计每种 key 的个数。
object RDD_Action_countByKey {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 countByKey
val rdd = sc.makeRDD(List(("a",1), ("a",2), ("a",3), ("a",4)))
println(rdd.countByValue())
val stringToLong: collection.Map[String, Long] = rdd.countByKey()
println(stringToLong)
// Step3: 关闭环境
sc.stop()
}
}
输出结果:
Map((a,2) -> 1, (a,4) -> 1, (a,3) -> 1, (a,1) -> 1)
Map(a -> 4)
save 相关算子
函数签名
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
函数说明
将数据保存到不同格式的文件中。
注意:saveAsSequenceFile() 方法对应的 RDD 的数据类型必须是 Key-Value 类型。
object RDD_Action_save {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 save
val rdd = sc.makeRDD(List(("a",1),("b",2),("c",3)), numSlices = 3)
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
rdd.saveAsSequenceFile("output2")
// Step3: 关闭环境
sc.stop()
}
}
在输出结果中:
saveAsObjectFile()、saveAsSequenceFile() 均为二级制码,不可阅读,而 saveAsTextFile() 输出是
foreach()
函数签名
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
函数说明
分布式遍历 RDD 中的每一个元素,调用指定函数。
object RDD_Action_foreach {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 foreach
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), numSlices = 3)
rdd.collect().foreach(println)
println("******************")
rdd.foreach(println)
// Step3: 关闭环境
sc.stop()
}
}
上述代码运行结果如下,
- rdd.collect().foreach(println):先调用 collect() 方法再调用 foreach() 算子,相当于在遍历 RDD 前,先进行分区级的数据采集后再进行数据遍历,打印操作在 Driver 端执行;
- rdd.foreach(println):而直接调用 foreach() 则不进行数据采集直接遍历,遍历也就不涉及排序,在 Executor 中执行打印操作,打印的结果也是乱序的。println 方法被分发到 Executor,在Executor 端进行计算。
1
2
3
4
5
6
******************
1
2
5
3
6
4
为了区分不同处理效果,将 RDD 的方法称为算子。RDD 方法外部的操作都是在 Driver 端执行,而方法内部的逻辑代码在 Executor 中执行。
关于 foreach() 的实例
object RDD_Action_foreach_Sample {
def main(args: Array[String]): Unit = {
// Step1: 准备环境
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
val sc = new SparkContext(sparkConf)
// Step2: 算子 foreach
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
var user = new User()
rdd.foreach(
num =>{
println("age = " + (user.age + num))
}
)
// Step3: 关闭环境
sc.stop()
}
class User extends Serializable {
var age: Int = 30
}
}
User 如果不进行序列化,会报 SparkException: Task not serializable 因为User 对象需要在网络之间传递,从 Driver 传递到 Executor,因此需要进行序列化。
样例类在编译时,会自动混入序列化特性。因此在类定义前加 case 修饰符也也可让类达到序列化的效果。
case class User() {
var age: Int = 30
}
RDD 算子中传递的函数是会包含闭包操作,那么就会进行检测功能,进行闭包检测。
闭包检测:从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。
foreach() 源码中, clean() 就是闭包检测函数。
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
一直跟踪 clean() 函数,到达 clean() 函数的定义,其中:
- !isClosure(func.getClass):是 Spark 2.11 版本前的检测规则;
- maybeIndylambdaProxy.isEmpty:是 Spark 2.12 版本以后得检测规则;
- if (checkSerializable) :这段代码,确保闭包中的匿名函数对象能够被序列化。
private def clean(
func: AnyRef,
checkSerializable: Boolean,
cleanTransitively: Boolean,
accessedFields: Map[Class[_], Set[String]]): Unit = {
....
if (!isClosure(func.getClass) && maybeIndylambdaProxy.isEmpty) {
logDebug(s"Expected a closure; got ${func.getClass.getName}")
return
}
...
if (checkSerializable) {
ensureSerializable(func)
}
}
因此报序列化错误异常的位置在 ClosureCleaner$.ensureSerializable 处抛出。
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
附
在各种转换算子、行动算子学习后,特别整理了一个计算 wordcount 的聚合算子的合集,其中包括了 9 中计算 workcount 的逻辑。请大家参考。
spark-core/src/main/java/com/wuji1626/spark/cases/RDD_WordCount.scala · wuji1626/bigdata - Gitee.com