Hi All,

Continuing this discussion... Is there a good reason why "saveAsNewAPIHadoopFiles" in org/apache/spark/streaming/api/java/JavaPairDStream.scala is defined like this -

def saveAsNewAPIHadoopFiles(
      prefix: String,
      suffix: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = new Configuration) {
      dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
  }


As pointed out earlier, due to type erasure on the Java side we have to add this code to keep the compiler quiet -

@SuppressWarnings("unchecked")
Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;

It works fine, but it adds a layer of confusion and inconsistency when compared to its counterpart for the regular RDD...


saveAsNewAPIHadoopFile as defined in spark / core / src / main / scala / org / apache / spark / api / java / JavaPairRDD.scala

  /** Output the RDD to any Hadoop-supported file system. */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]](
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F],
      conf: Configuration) {
    rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf)
  }



So, is it possible to change the code for "saveAsNewAPIHadoopFiles" in org/apache/spark/streaming/api/java/JavaPairDStream.scala to the following - 

def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[_, _]](
      prefix: String,
      suffix: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[F],
      conf: Configuration = new Configuration) {
      dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
  }
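To see why the type-parameter form helps Java callers, here is a minimal plain-Java sketch of the same asymmetry, with List/ArrayList standing in for the Hadoop OutputFormat classes (the names SignatureDemo, wildcardStyle, and typeParamStyle are made up for illustration, not Spark API):

```java
import java.util.ArrayList;
import java.util.List;

public class SignatureDemo {

    // Analogous to Class[_ <: NewOutputFormat[_, _]]: a wildcard-bounded
    // parameter type. A raw class literal like ArrayList.class is not a
    // Class<? extends List<?>>, so callers need the double cast.
    static String wildcardStyle(Class<? extends List<?>> cls) {
        return cls.getSimpleName();
    }

    // Analogous to [F <: NewOutputFormat[_, _]]: a generic type parameter.
    // javac accepts the raw class literal directly (at most an
    // "unchecked" warning), so no cast is needed at the call site.
    static <F extends List<?>> String typeParamStyle(Class<F> cls) {
        return cls.getSimpleName();
    }

    public static void main(String[] args) {
        // wildcardStyle(ArrayList.class) does not compile, so the same
        // double cast from the streaming code is required:
        @SuppressWarnings("unchecked")
        Class<? extends List<?>> cls =
                (Class<? extends List<?>>) (Class<?>) ArrayList.class;
        System.out.println(wildcardStyle(cls));              // ArrayList

        // With the generic signature the cast disappears:
        System.out.println(typeParamStyle(ArrayList.class)); // ArrayList
    }
}
```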


Less confusion, more readability, and better consistency...

-abe


On Mon, Oct 6, 2014 at 1:51 PM, Abraham Jacob <abe.jacobs@gmail.com> wrote:
Sean, 

Thanks a ton Sean... This is exactly what I was looking for. 

As mentioned in the code - 

    // This horrible, separate declaration is necessary to appease the compiler
    @SuppressWarnings("unchecked")
    Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;
    writableDStream.saveAsNewAPIHadoopFiles(dataDirString + "/oryx", "data", keyWritableClass, messageWritableClass, outputFormatClass, streamingContext.sparkContext().hadoopConfiguration());

I was just having a hard time with the outputFormatClass parameter. The Scala code in org/apache/spark/streaming/api/java/JavaPairDStream.scala defines saveAsNewAPIHadoopFiles as the following -

  /**
   * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
   * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
   */
  def saveAsNewAPIHadoopFiles(
      prefix: String,
      suffix: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = new Configuration) {
      dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
  }

The problem is that the Scala parameter of type Class[_ <: NewOutputFormat[_, _]] cannot be supplied from Java as something like TextOutputFormat<Text, IntWritable>.class (say you are using TextOutputFormat with Text as the key class and IntWritable as the value class), because of "type erasure": parameterized types lose their type arguments when they are translated to bytecode during compilation, so a class literal can never carry type arguments.
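To make the erasure concrete, here is a minimal, Spark-free sketch using plain JDK types (ArrayList standing in for TextOutputFormat; the class name ErasureDemo is made up for illustration):

```java
import java.util.ArrayList;

public class ErasureDemo {
    public static void main(String[] args) {
        // Both lists are plain ArrayList at runtime; the element types
        // were erased during compilation, so there is only one class object.
        ArrayList<String> strings = new ArrayList<>();
        ArrayList<Integer> ints = new ArrayList<>();
        System.out.println(strings.getClass() == ints.getClass()); // true

        // The one class object is raw:
        System.out.println(ArrayList.class.getName()); // java.util.ArrayList

        // ArrayList<String>.class is a syntax error: a class literal
        // can never carry type arguments, which is why the cast through
        // (Class<?>) is needed to reach the wildcard-bounded type.
    }
}
```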


Looks like adding this works - 

@SuppressWarnings("unchecked")
 Class<? extends OutputFormat<?,?>> outputFormatClass = (Class<? extends OutputFormat<?,?>>) (Class<?>) SequenceFileOutputFormat.class;


Thanks again Sean...


On Mon, Oct 6, 2014 at 12:23 PM, Sean Owen <sowen@cloudera.com> wrote:
Here's an example:

https://github.com/OryxProject/oryx/blob/master/oryx-lambda/src/main/java/com/cloudera/oryx/lambda/BatchLayer.java#L131

On Mon, Oct 6, 2014 at 7:39 PM, Abraham Jacob <abe.jacobs@gmail.com> wrote:
> Hi All,
>
> Would really appreciate from the community if anyone has implemented the
> saveAsNewAPIHadoopFiles method in "Java" found in the
> org.apache.spark.streaming.api.java.JavaPairDStream<K,V>
>
> Any code snippet or online link would be greatly appreciated.
>
> Regards,
> Jacob
>
>



--
~


