spark-user mailing list archives

From Pat Ferrel <...@occamsmachete.com>
Subject rdd.saveAsSequenceFile(path)
Date Sat, 27 Jun 2015 21:46:13 GMT
Our project is having trouble working out how to migrate the following function from Spark 1.2 to 1.3.

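  // Imports this snippet relies on. dfsWrite lives in a class that provides the
  // type parameter K (with a ClassTag) and the member `rdd`; an implicit
  // Vector => Writable view must also be in scope for the deprecated call (not shown).
  import scala.reflect.ClassTag
  import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
  import org.apache.mahout.math.Vector
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  /**
   * Dump the computed matrix as a Mahout DRM into the specified (HD)FS path.
   * @param path destination path for the sequence file
   */
  def dfsWrite(path: String): Unit = {
    val ktag = implicitly[ClassTag[K]]
    //val vtag = implicitly[ClassTag[Vector]]

    // Implicit view from the key type K to a Hadoop Writable, chosen at runtime from K's class tag.
    implicit val k2wFunc: (K) => Writable =
      if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
      else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
      else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
      else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
      else throw new IllegalArgumentException(
        "Do not know how to convert class tag %s to Writable.".format(ktag))

    // the problem is here =====
    // This worked in Spark 1.2, and as we understand things it should also work in 1.3
    // if we have the right implicits:
    //   rdd.saveAsSequenceFile(path)

    // This works in Spark 1.3 but uses a deprecated method:
    SparkContext.rddToSequenceFileRDDFunctions(rdd.asInstanceOf[RDD[(K, Vector)]]).saveAsSequenceFile(path)
  }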

As we understand it, we now need to supply implicit Writable factories instead of implicit
conversions to Writable. Is that right? The RDD holds (key, value) pairs where the key is one
of the classes above and the value is a Mahout "Vector". These are usually serialized with
Kryo (not the JavaSerializer) for closures, so we have compatible classes for that.
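One possible workaround, sketched under assumptions rather than offered as the Spark-endorsed fix: convert each pair to Writables yourself inside `map` and name the key and value classes explicitly in `saveAsNewAPIHadoopFile`, so no implicit Writable lookup is needed at all. This assumes Spark 1.3 (where the pair-RDD implicits are found through the `RDD` companion object, so no `SparkContext._` import is required) and Mahout's `VectorWritable` on the classpath. The object `DrmSeqFileWriter` and its `keyWritable` helper are hypothetical names, not part of Spark or Mahout.

```scala
import scala.reflect.ClassTag

import org.apache.hadoop.io.{IntWritable, LongWritable, Text, Writable}
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
import org.apache.mahout.math.{Vector => MahoutVector, VectorWritable}
import org.apache.spark.rdd.RDD

// Hypothetical helper object; not part of Spark or Mahout.
object DrmSeqFileWriter {

  /** Picks the Writable class to declare for keys of type K plus a K => Writable converter. */
  def keyWritable[K: ClassTag]: (Class[_ <: Writable], K => Writable) = {
    val cls = implicitly[ClassTag[K]].runtimeClass
    if (cls == classOf[Int]) (classOf[IntWritable], (x: K) => new IntWritable(x.asInstanceOf[Int]))
    else if (cls == classOf[String]) (classOf[Text], (x: K) => new Text(x.asInstanceOf[String]))
    else if (cls == classOf[Long]) (classOf[LongWritable], (x: K) => new LongWritable(x.asInstanceOf[Long]))
    // Assumes every key's runtime class is exactly K's class: a SequenceFile
    // writer rejects keys whose class differs from the declared key class.
    else if (classOf[Writable].isAssignableFrom(cls))
      (cls.asSubclass(classOf[Writable]), (x: K) => x.asInstanceOf[Writable])
    else throw new IllegalArgumentException(s"Do not know how to convert class $cls to Writable.")
  }

  /** Writes an RDD of (key, Mahout vector) pairs as a SequenceFile at `path`. */
  def dfsWrite[K: ClassTag](rdd: RDD[(K, MahoutVector)], path: String): Unit = {
    val (keyClass, k2w) = keyWritable[K]
    // The Writables are built on the executors and handed straight to the output
    // format within the same task, so Spark never has to serialize them.
    val writables = rdd.map { case (k, v) => (k2w(k), new VectorWritable(v)) }
    writables.saveAsNewAPIHadoopFile(
      path,
      keyClass,
      classOf[VectorWritable],
      classOf[SequenceFileOutputFormat[Writable, VectorWritable]])
  }
}
```

From inside the original method this would be called as `DrmSeqFileWriter.dfsWrite(rdd.asInstanceOf[RDD[(K, Vector)]], path)`. Passing the classes explicitly keeps K generic and avoids depending on which implicit Writable conversions a given Spark version resolves.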

Any pointers would be helpful.