醋醋百科网

Good Luck To You!

10 RDD 行动算子(rdd行动操作)

行动算子触发作业执行的方法。底层代码调用的环境对象的 runJob 方法。底层代码中会创建 ActiveJob,并提交。与转换算子不同,行动算子直接出结果。而转换算子只是将 RDD 转换成 RDD,并不触发底层的任务执行。以 collect() 算子为例,collect() 源码中包含 runJob() 方法。

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}

一层层的 runJob() 跟踪下去,最后在 runJob 的类中,看到 dagScheduler.runJob() 有向无环图的 scheduler 运行 Job。

def runJob[T, U: ClassTag](
...
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
...
}

继续进入 runJob() 方法,会有 submitJob() 提交任务。

def runJob[T, U](
...
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
...
  }
}

继续跟进,eventProcessLoop.post(JobSubmitted()) 会提交 Job。

def submitJob[T, U](
...
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    Utils.cloneProperties(properties)))
  waiter
}

既然有 JobSubmitted() 就会有 handleJobSubmitted() 其中有 new ActiveJob() 创建激活的作业。

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
     ...
     val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
     ...       
}

reduce()

函数签名

def reduce(f: (T, T) => T): T

函数说明

聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据。

object RDD_Action_reduce {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 reduce
    val rdd = sc.makeRDD(List(1, 2, 3, 4))

    val i = rdd.reduce(_ + _)
    println(i)
    // Step3: 关闭环境
    sc.stop()
  }

}

结果:10,直接出聚合结果。

collect()

采集算子,将不同分区的数据按照分区顺序采集到 Driver 端内存中,形成数组。

函数签名

def collect(): Array[T]

函数说明

在驱动程序中,以数组 Array 的形式返回数据集的所有元素。

object RDD_Action_collect {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 collect
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    rdd.collect().foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }
}

返回结果:

1
2
3
4

count()

函数签名

def count(): Long

函数说明

返回 RDD 中元素的个数。

object RDD_Action_count {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 count
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    val count = rdd.count()
    println(count)
    // Step3: 关闭环境
    sc.stop()
  }

}

返回结果:4

first()

函数签名

def first(): T

函数说明

返回 RDD 中的第一个元素。

object RDD_Action_first {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 first
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    val first = rdd.first()
    println(first)
    // Step3: 关闭环境
    sc.stop()
  }

}

返回结果:1

take()

函数签名

def take(num: Int): Array[T]

函数说明

返回一个由 RDD 的前 n 个元素组成的数组。

object RDD_Action_collect {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 collect
    val rdd = sc.makeRDD(List(1, 2, 3, 4))
    println(rdd.take(3).mkString(","))
    // Step3: 关闭环境
    sc.stop()
  }
}

返回结果:

1,2,3

takeOrdered()

函数签名

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]

函数说明

object RDD_Action_takeOrdered {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 takeOrdered
    val rdd = sc.makeRDD(List(4, 2, 3, 1))
    println(rdd.takeOrdered(3).mkString(","))
    // Step3: 关闭环境
    sc.stop()
  }
}

可以选择排序方式,默认是升序排列,如果想要降序排列,需要指定第二个参数。

println(rdd.takeOrdered(3)(Ordering[Int].reverse).mkString(","))

输出结果:

4,3,2

aggregate()

函数签名

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

函数说明

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合。

object RDD_Action_aggregate {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 aggregate
    val rdd = sc.makeRDD(List(4, 2, 3, 1), numSlices = 2)
    println(rdd.aggregate(0)(_ + _, _ + _))
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:10

与 aggregateByKey() 区别:

  • aggregateByKey():初始值只会参与分区内计算;
  • aggregate():初始值会参与分区内计算,并且参与分区间计算。
object RDD_Action_aggregate {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 aggregate
    // 10+1+2  + 10+3+4 + 10 aggregate zeroValue 参与分区间计算,因此结果为 40
    val rdd = sc.makeRDD(List(1, 2, 3, 4),numSlices = 2)
    println(rdd.aggregate(10)(_ + _, _ + _))

    // 10+1+2 + 10+3+4 = 30 aggregateByKey() zeroValue 不参与分区间计算
    val keyRDD = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), numSlices = 2)
    keyRDD.aggregateByKey(zeroValue = 10)(_+_,_+_).collect().foreach(println)

    // Step3: 关闭环境
    sc.stop()
  }
}

fold()

函数签名

def fold(zeroValue: T)(op: (T, T) => T): T

函数说明

折叠操作,aggregate 的简化版操作。

object RDD_Action_fold {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 fold
    val rdd = sc.makeRDD(List(1, 2, 3, 4),numSlices = 2)
    println(rdd.fold(10)(_ + _))

    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:10

countByValue()

函数签名

def countByValue()(implicit ord: Ordering[T] = null): Map[T, Long]

函数说明

计算 RDD 中值出现的次数,以 Map 的方式输出,Map 的 key 是原数组中的 Value,value 是值出现的次数。

object RDD_Action_countByValue {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 countByValue
    val rdd = sc.makeRDD(List(1, 2, 3, 4),numSlices = 2)
    println(rdd.countByValue())

    val rdd2 = sc.makeRDD(List(1, 1, 3, 1),numSlices = 2)
    println(rdd2.countByValue())
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

Map(4 -> 1, 2 -> 1, 1 -> 1, 3 -> 1)
Map(1 -> 3, 3 -> 1)

countByKey()

函数签名

def countByKey(): Map[K, Long]

函数说明

统计每种 key 的个数。

object RDD_Action_countByKey {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 countByKey
    val rdd = sc.makeRDD(List(("a",1), ("a",2), ("a",3), ("a",4)))
    println(rdd.countByValue())
    val stringToLong: collection.Map[String, Long] = rdd.countByKey()
    println(stringToLong)
    // Step3: 关闭环境
    sc.stop()
  }
}

输出结果:

Map((a,2) -> 1, (a,4) -> 1, (a,3) -> 1, (a,1) -> 1)
Map(a -> 4)

save 相关算子

函数签名

def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
 path: String,
 codec: Option[Class[_ <: CompressionCodec]] = None): Unit

函数说明

将数据保存到不同格式的文件中。

注意:saveAsSequenceFile() 方法对应的 RDD 的数据类型必须是 Key-Value 类型。

object RDD_Action_save {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 save
    val rdd = sc.makeRDD(List(("a",1),("b",2),("c",3)), numSlices = 3)

    rdd.saveAsTextFile("output")
    rdd.saveAsObjectFile("output1")
    rdd.saveAsSequenceFile("output2")
    // Step3: 关闭环境
    sc.stop()
  }
}

在输出结果中:

saveAsObjectFile()、saveAsSequenceFile() 均为二级制码,不可阅读,而 saveAsTextFile() 输出是

foreach()

函数签名

def foreach(f: T => Unit): Unit = withScope {
 val cleanF = sc.clean(f)
 sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

函数说明

分布式遍历 RDD 中的每一个元素,调用指定函数。

object RDD_Action_foreach {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 foreach
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), numSlices = 3)

    rdd.collect().foreach(println)
    println("******************")
    rdd.foreach(println)
    // Step3: 关闭环境
    sc.stop()
  }

}

上述代码运行结果如下,

  • rdd.collect().foreach(println):先调用 collect() 方法再调用 foreach() 算子,相当于在遍历 RDD 前,先进行分区级的数据采集后再进行数据遍历,打印操作在 Driver 端执行;
  • rdd.foreach(println):而直接调用 foreach() 则不进行数据采集直接遍历,遍历也就不涉及排序,在 Executor 中执行打印操作,打印的结果也是乱序的。println 方法被分发到 Executor,在Executor 端进行计算。
1
2
3
4
5
6
******************
1
2
5
3
6
4

为了区分不同处理效果,将 RDD 的方法称为算子。RDD 方法外部的操作都是在 Driver 端执行,而方法内部的逻辑代码在 Executor 中执行。

关于 foreach() 的实例

object RDD_Action_foreach_Sample {

  def main(args: Array[String]): Unit = {

    // Step1: 准备环境
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    // Step2: 算子 foreach
    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6))

    var user = new User()

    rdd.foreach(
      num =>{
        println("age = " + (user.age + num))
      }
    )

    // Step3: 关闭环境
    sc.stop()
  }
  class User extends Serializable   {
    var age: Int = 30
  }
}

User 如果不进行序列化,会报 SparkException: Task not serializable 因为User 对象需要在网络之间传递,从 Driver 传递到 Executor,因此需要进行序列化。

样例类在编译时,会自动混入序列化特性。因此在类定义前加 case 修饰符也也可让类达到序列化的效果。

case class User() {
    var age: Int = 30
}

RDD 算子中传递的函数是会包含闭包操作,那么就会进行检测功能,进行闭包检测。

闭包检测:从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。

foreach() 源码中, clean() 就是闭包检测函数。

def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}

一直跟踪 clean() 函数,到达 clean() 函数的定义,其中:

  • !isClosure(func.getClass):是 Spark 2.11 版本前的检测规则;
  • maybeIndylambdaProxy.isEmpty:是 Spark 2.12 版本以后得检测规则;
  • if (checkSerializable) :这段代码,确保闭包中的匿名函数对象能够被序列化。
private def clean(
    func: AnyRef,
    checkSerializable: Boolean,
    cleanTransitively: Boolean,
    accessedFields: Map[Class[_], Set[String]]): Unit = {
....

  if (!isClosure(func.getClass) && maybeIndylambdaProxy.isEmpty) {
    logDebug(s"Expected a closure; got ${func.getClass.getName}")
    return
  }
  ...
  if (checkSerializable) {
      ensureSerializable(func)
  }
}

因此报序列化错误异常的位置在 ClosureCleaner$.ensureSerializable 处抛出。

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)

在各种转换算子、行动算子学习后,特别整理了一个计算 wordcount 的聚合算子的合集,其中包括了 9 中计算 workcount 的逻辑。请大家参考。

spark-core/src/main/java/com/wuji1626/spark/cases/RDD_WordCount.scala · wuji1626/bigdata - Gitee.com

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言