spark-issues mailing list archives

From "Sean Owen (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-5297) JavaStreamingContext.fileStream won't work because type info isn't propagated
Date Sat, 24 Jan 2015 14:43:34 GMT

     [ https://issues.apache.org/jira/browse/SPARK-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-5297:
-----------------------------
    Summary: JavaStreamingContext.fileStream won't work because type info isn't propagated  (was: File Streams do not work with custom key/values)

> JavaStreamingContext.fileStream won't work because type info isn't propagated
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-5297
>                 URL: https://issues.apache.org/jira/browse/SPARK-5297
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Leonidas Fegaras
>            Assignee: Saisai Shao
>              Labels: backport-needed
>             Fix For: 1.3.0
>
>
> The following code:
> {code}
> stream_context.<K,V,SequenceFileInputFormat<K,V>>fileStream(directory)
> .foreachRDD(new Function<JavaPairRDD<K,V>,Void>() {
>      public Void call ( JavaPairRDD<K,V> rdd ) throws Exception {
>          for ( Tuple2<K,V> x: rdd.collect() )
>              System.out.println("# "+x._1+" "+x._2);
>          return null;
>      }
>   });
> stream_context.start();
> stream_context.awaitTermination();
> {code}
> For custom (serializable) classes K and V, this compiles fine but fails with an error
> when I drop a new Hadoop sequence file into the directory:
> {quote}
> 15/01/17 09:13:59 ERROR scheduler.JobScheduler: Error generating jobs for time 1421507639000 ms
> java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
> 	at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
> 	at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
> 	at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> 	at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
> 	at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
> 	at scala.Option.orElse(Option.scala:257)
> {quote}
> The same classes K and V work fine for non-streaming Spark:
> {code}
> spark_context.newAPIHadoopFile(path,SequenceFileInputFormat.class,K.class,V.class,conf)
> {code}
> Streaming also works fine for TextInputFormat.
> The issue is that class manifests are erased to Object in the Java file stream constructor, but those are relied on downstream when creating the Hadoop RDD that backs each batch of the file stream.
> https://github.com/apache/spark/blob/v1.2.0/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala#L263
> https://github.com/apache/spark/blob/v1.2.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L753
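
The erasure problem described above can be reproduced without Spark or Hadoop. The following is only a minimal sketch of the mechanism, not the actual Spark code: the InputFormat interface, the SequenceFormat class and all method names are hypothetical stand-ins. It shows that a generic method called with nothing but type parameters can only hand Object.class downstream, whereas a method that takes an explicit Class argument keeps the real type available.

```java
public class ErasureSketch {
    /** Hypothetical stand-in for org.apache.hadoop.mapreduce.InputFormat. */
    interface InputFormat {
        String name();
    }

    /** Hypothetical stand-in for a concrete format such as SequenceFileInputFormat. */
    static class SequenceFormat implements InputFormat {
        public String name() {
            return "sequence";
        }
    }

    /**
     * Mimics a Java-facing method that receives F only as a type parameter.
     * F is erased at runtime, so the only class that can be named here is Object.
     */
    static <F extends InputFormat> Class<?> formatClassWithoutToken() {
        return Object.class;
    }

    /** Mimics a method that takes the class explicitly, so the type survives erasure. */
    static <F extends InputFormat> Class<F> formatClassWithToken(Class<F> fClass) {
        return fClass;
    }

    /** Mimics downstream code that instantiates the format reflectively and casts it. */
    static InputFormat instantiate(Class<?> formatClass) {
        Object instance;
        try {
            instance = formatClass.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + formatClass, e);
        }
        // Throws ClassCastException when formatClass is Object.class.
        return (InputFormat) instance;
    }

    public static void main(String[] args) {
        // With an explicit class token the format can be created.
        System.out.println(instantiate(formatClassWithToken(SequenceFormat.class)).name());

        // With only a type parameter the cast fails, much like the reported stack trace.
        try {
            instantiate(ErasureSketch.<SequenceFormat>formatClassWithoutToken());
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

Passing the key, value and input format classes explicitly, as newAPIHadoopFile already does in the non-streaming case, is the usual way around erasure in a Java API.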



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

