累加器

累加器(Accumulators)是通过一个组合和交替的操作,只能“加”的变量,因此,它能够有效地支持并行。他们常被用于实现计数器(例如在MapReduce中)或求和。Spark原生支持数值类型的计数器,程序开发人员可以增加对新类型的支持。

如果累加器使用名字被创建,他们将在Spark UI上显示。对于理解运行阶段的过程,这是非常有用的(注意:在Python里,还没有支持)。

Accumulators in Spark UI

通过调用SparkContext.accumulator(v),一个以v为初始值的累加器被创建。运行在一个集群上的任务,可以使用add方法或+=操作(在Scala和Python里)来对它进行加操作。然而,他们不能读它的值。只有驱动程序可以使用它的value方法来读取累加器的值。

以下代码显示了:一个累加器被用来对累加一个数组的元素。

scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Long = 10

尽管这个代码使用的是内置支持的Long类型的累加器,程序开发人员也可以通过继续[AccumulatorV2](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.AccumulatorV2)类来创建自己的类型。抽象类AccumulatorV2有一些方法需要重写:reset,用于重置累加器为0,add,用于加一个值到累加器,merge,用于合并另一个相同类型的累加器到当前累加器。其他需要重写的方法可以参考Scala API文档。例如,假设我们有一个MyVector类来呈现数学向量,我们可以写:

object VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
  val vec_ : MyVector = MyVector.createZeroVector
  def reset(): MyVector = {
    vec_.reset()
  }
  def add(v1: MyVector, v2: MyVector): MyVector = {
    vec_.add(v2)
  }
  ...
}

// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")

注意:当程序开发人员定义自己的AccumulatorV2类型时,结果类型与相加的元素,可以相同或不同。

对于累加器的更新仅仅是在行动操作(action)里面,Spark保证,每个任务对累加器的更新都只应用一次,例如,重起任务将不会更新这个值。在转换操作(transformations)中,用户应该知道,如果任务或作业阶段被重新执行,对每个任务的更新可能会应用多次。

累加器不能改变Spark的惰性评估模型。如果在一个对RDD的操作里面,他们正要被更新,他们的值仅仅更新一次,RDD是被作为一个行动的一部分被计算。因此,当在一个惰性转换操作(如map())情况下,累加器的更新是不能被保证执行的。以下代码片段描述了这个属性:

val accum = sc.accumulator(0)
data.map { x => accum += x; x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.

results matching ""

    No results matching ""