spark-user mailing list archives

From 张仪yf1 <zhangyi...@hikvision.com>
Subject A problem with zipPartitions
Date Tue, 13 Oct 2015 11:09:11 GMT



Hi there,
         I ran into an issue when using zipPartitions. First I created an RDD from a Seq, then
created another one and zipped their partitions together into rdd3. I cached rdd3, then created
a new RDD and zipped its partitions with rdd3. I repeated this operation many times, and I found
that the serialized task grew bigger and bigger, and the serialization time grew as well. Has
anyone else encountered the same problem? Please help.
Code:
import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark.rdd.RDD

object ScalaTest {
  def main(args: Array[String]) {
    // Build a Seq of 720 identical strings.
    var i: Int = 0
    var seq = Seq[String]()
    while (i < 720) {
      seq = seq.+:("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
      i = i + 1
    }

    val sc = new SparkContext("spark://hdh010016146205:7077", "ScalaTest")
    var rdd0: RDD[String] = null
    var j = 0
    while (j < 200) {
      j = j + 1
      val rdd1 = sc.parallelize(seq, 72)
      if (rdd0 == null) {
        // First iteration: just cache and materialize the base RDD.
        rdd0 = rdd1
        rdd0.cache()
        rdd0.count()
      } else {
        // BillZippedPartitionsRDD2 is a local copy of Spark's private
        // ZippedPartitionsRDD2; the commented-out line below is the
        // equivalent call through the public zipPartitions API.
        val rdd2 = new BillZippedPartitionsRDD2[String, String, String](sc, { (thisIter, otherIter) =>
          // val rdd2 = rdd0.zipPartitions(rdd1, true)({ (thisIter, otherIter) =>
          new Iterator[String] {
            def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
              case (true, true) => true
              case (false, false) => false
              case _ => throw new SparkException("Can only zip RDDs with " +
                "same number of elements in each partition")
            }
            def next(): String = thisIter.next() + "--" + otherIter.next()
          }
        }, rdd0, rdd1, false)
        rdd2.cache()
        rdd2.count()
        // Each pass makes the new RDD depend on the previous one, so the
        // lineage chain grows by one level per iteration.
        rdd0 = rdd2
      }
    }
  }
}
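
For what it's worth, the growth in task size is consistent with the lineage chain getting one
level deeper on every iteration: cache() stores the partition data but does not truncate the
dependency graph, so each new task still serializes the entire chain of zipped RDDs. Below is a
minimal sketch of one possible workaround, assuming the growing lineage is indeed the cause:
checkpoint the accumulated RDD periodically so the lineage is cut. The checkpoint directory,
the every-10-iterations interval, and the ZipWithCheckpoint object name are illustrative
assumptions, not from the original code.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ZipWithCheckpoint {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ZipWithCheckpoint"))
    sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // assumed path

    val seq = Seq.fill(720)("a" * 98)
    var acc: RDD[String] = sc.parallelize(seq, 72).cache()
    acc.count()

    var j = 1
    while (j < 200) {
      val next = sc.parallelize(seq, 72)
      // Iterator#zip stops at the shorter iterator; the original code instead
      // throws if the two partitions have different lengths.
      acc = acc.zipPartitions(next, preservesPartitioning = true) { (a, b) =>
        a.zip(b).map { case (x, y) => x + "--" + y }
      }
      acc.cache()
      if (j % 10 == 0) acc.checkpoint() // truncate the lineage periodically
      acc.count() // materializes the RDD (and the checkpoint, if requested)
      j += 1
    }
    sc.stop()
  }
}

Checkpointing writes the data out (here to HDFS) and replaces the RDD's parents with the
checkpoint file, so the serialized task size should stay roughly constant across iterations
instead of growing with j.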
