累加器
累加器(Accumulators)是通过一个组合和交替的操作,只能“加”的变量,因此,它能够有效地支持并行。他们常被用于实现计数器(例如在MapReduce中)或求和。Spark原生支持数值类型的计数器,程序开发人员可以增加对新类型的支持。
如果累加器使用名字被创建,他们将在Spark UI上显示。对于理解运行阶段的过程,这是非常有用的(注意:在Python里,还没有支持)。
通过调用SparkContext.accumulator(v)
,一个以v为初始值的累加器被创建。运行在一个集群上的任务,可以使用add
方法或+=
操作(在Scala和Python里)来对它进行加操作。然而,他们不能读它的值。只有驱动程序可以使用它的value方法来读取累加器的值。
以下代码显示了:一个累加器被用来对累加一个数组的元素。
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
scala> accum.value
res2: Long = 10
尽管这个代码使用的是内置支持的Long类型的累加器,程序开发人员也可以通过继续[AccumulatorV2](http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.AccumulatorV2)
类来创建自己的类型。抽象类AccumulatorV2
有一些方法需要重写:reset
,用于重置累加器为0,add
,用于加一个值到累加器,merge
,用于合并另一个相同类型的累加器到当前累加器。其他需要重写的方法可以参考Scala API文档。例如,假设我们有一个MyVector类来呈现数学向量,我们可以写:
object VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
val vec_ : MyVector = MyVector.createZeroVector
def reset(): MyVector = {
vec_.reset()
}
def add(v1: MyVector, v2: MyVector): MyVector = {
vec_.add(v2)
}
...
}
// Then, create an Accumulator of this type:
val myVectorAcc = new VectorAccumulatorV2
// Then, register it into spark context:
sc.register(myVectorAcc, "MyVectorAcc1")
注意:当程序开发人员定义自己的AccumulatorV2
类型时,结果类型与相加的元素,可以相同或不同。
对于累加器的更新仅仅是在行动操作(action)里面,Spark保证,每个任务对累加器的更新都只应用一次,例如,重起任务将不会更新这个值。在转换操作(transformations)中,用户应该知道,如果任务或作业阶段被重新执行,对每个任务的更新可能会应用多次。
累加器不能改变Spark的惰性评估模型。如果在一个对RDD的操作里面,他们正要被更新,他们的值仅仅更新一次,RDD是被作为一个行动的一部分被计算。因此,当在一个惰性转换操作(如map())情况下,累加器的更新是不能被保证执行的。以下代码片段描述了这个属性:
val accum = sc.accumulator(0)
data.map { x => accum += x; x }
// Here, accum is still 0 because no actions have caused the map operation to be computed.