val rdd = sc.makeRDD( List( "scala", "scala", "scala", "scala", "scala", "spark", "spark", "spark", ) )
val acc = new WordCountAccumulator()
sc.register(acc, "WordCount")
rdd.foreach( word => { acc.add(word) } )
println(acc.value)
sc.stop() }
/**
 * Accumulator that counts how often each word has been added.
 *
 * The input type is a single word (`String`); the accumulated value is a
 * mutable map from word to its running count.
 */
class WordCountAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {

  // Running word -> count totals held by this accumulator instance.
  private val wcMap = mutable.Map[String, Int]()

  /** @return `true` when no word has been counted yet. */
  override def isZero: Boolean = wcMap.isEmpty

  /**
   * Creates a new accumulator that starts with a snapshot of the current counts.
   *
   * The entries are copied into the new instance's own map, so later updates
   * to either accumulator do not affect the other.
   *
   * @return an independent accumulator holding the same counts as this one
   */
  override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {
    val duplicate = new WordCountAccumulator()
    duplicate.wcMap ++= wcMap
    duplicate
  }

  /** Discards all counts, bringing the accumulator back to its zero state. */
  override def reset(): Unit = wcMap.clear()

  /**
   * Records one more occurrence of `word`.
   *
   * @param word the word whose count is incremented by one
   */
  override def add(word: String): Unit =
    wcMap.update(word, wcMap.getOrElse(word, 0) + 1)

  /**
   * Folds the counts of another accumulator into this one by summing the
   * counts of every word found in `other.value`.
   *
   * @param other the accumulator whose counts are added to this one
   */
  override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit =
    other.value.foreach { case (word, cnt) =>
      wcMap.update(word, wcMap.getOrElse(word, 0) + cnt)
    }

  /**
   * @return the backing word-count map itself (not a copy), so changes made
   *         through the returned map are visible to this accumulator
   */
  override def value: mutable.Map[String, Int] = wcMap
}
|