spark-issues mailing list archives

From "Patrick Wendell (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-5297) File Streams do not work with custom key/values
Date Tue, 20 Jan 2015 06:48:36 GMT

     [ https://issues.apache.org/jira/browse/SPARK-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell updated SPARK-5297:
-----------------------------------
    Description: 
The following code:
{code}
stream_context.<K,V,SequenceFileInputFormat<K,V>>fileStream(directory)
    .foreachRDD(new Function<JavaPairRDD<K,V>, Void>() {
        public Void call(JavaPairRDD<K,V> rdd) throws Exception {
            for (Tuple2<K,V> x : rdd.collect())
                System.out.println("# " + x._1 + " " + x._2);
            return null;
        }
    });
stream_context.start();
stream_context.awaitTermination();
{code}
compiles fine for custom (serializable) classes K and V, but fails at runtime when I drop a new Hadoop sequence file into the directory:
{quote}
15/01/17 09:13:59 ERROR scheduler.JobScheduler: Error generating jobs for time 1421507639000 ms
java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
	at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
	at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
	at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
	at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
	at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
	at scala.Option.orElse(Option.scala:257)
{quote}
The same classes K and V work fine for non-streaming Spark:
{code}
spark_context.newAPIHadoopFile(path, SequenceFileInputFormat.class, K.class, V.class, conf)  // (path, fClass, kClass, vClass, conf)
{code}
Streaming also works fine for TextFileInputFormat.
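
For comparison, below is a minimal, self-contained sketch of that working batch path. MyKey, MyValue, MyInputFormat, the local master and the input directory are illustrative stand-ins, not names from this report; the point is only that newAPIHadoopFile is handed the format, key and value classes explicitly, so nothing has to be recovered from erased type information:
{code}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class BatchReadWorks {

    // Hypothetical custom key/value classes: Writable so the sequence file reader
    // can deserialize them, Serializable so Spark can ship them around.
    public static class MyKey implements Writable, Serializable {
        public int id;
        public void write(DataOutput out) throws IOException { out.writeInt(id); }
        public void readFields(DataInput in) throws IOException { id = in.readInt(); }
        public String toString() { return Integer.toString(id); }
    }

    public static class MyValue implements Writable, Serializable {
        public String text = "";
        public void write(DataOutput out) throws IOException { out.writeUTF(text); }
        public void readFields(DataInput in) throws IOException { text = in.readUTF(); }
        public String toString() { return text; }
    }

    // Concrete, non-generic input format so its Class object pins down K and V.
    public static class MyInputFormat extends SequenceFileInputFormat<MyKey, MyValue> {}

    public static void main(String[] args) {
        JavaSparkContext spark_context = new JavaSparkContext("local[2]", "batch-read");

        // Batch API: (path, fClass, kClass, vClass, conf) -- every class is passed
        // explicitly, so the Hadoop RDD knows the real InputFormat class.
        JavaPairRDD<MyKey, MyValue> rdd = spark_context.newAPIHadoopFile(
                "/tmp/sequence-files",
                MyInputFormat.class,
                MyKey.class,
                MyValue.class,
                new Configuration());

        System.out.println("records: " + rdd.count());
        spark_context.stop();
    }
}
{code}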

The issue is that the class manifests are erased to Object in the Java file stream constructor, but they are relied on downstream when the Hadoop RDD that backs each batch of the file stream is created:

https://github.com/apache/spark/blob/v1.2.0/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala#L263
https://github.com/apache/spark/blob/v1.2.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L753
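
For illustration only (this is not Spark's code), here is a plain-Java sketch of that failure mode, assuming the erased manifest leaves Object.class where the input-format class should be:
{code}
import org.apache.hadoop.mapreduce.InputFormat;

public class ErasedFormatDemo {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        // With the manifest erased, Object.class is recorded instead of the real
        // SequenceFileInputFormat subclass.
        Class<?> recorded = Object.class;

        // The unchecked cast succeeds at compile time; nothing is verified here.
        Class<? extends InputFormat<?, ?>> formatClass =
                (Class<? extends InputFormat<?, ?>>) recorded;

        // The failure only shows up when the class is instantiated and the result
        // is used as an InputFormat: the compiler-inserted cast throws
        // "java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat",
        // which is what NewHadoopRDD.getPartitions reports in the stack trace above.
        InputFormat<?, ?> format = formatClass.newInstance();
        System.out.println(format);
    }
}
{code}
Given that, one plausible direction for a fix is a Java fileStream overload that, like newAPIHadoopFile above, takes the key, value and input-format Class objects explicitly instead of relying on erased manifests.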



> File Streams do not work with custom key/values
> -----------------------------------------------
>
>                 Key: SPARK-5297
>                 URL: https://issues.apache.org/jira/browse/SPARK-5297
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Leonidas Fegaras
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


