spark-issues mailing list archives

From "Patrick Wendell (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-5297) File Streams do not work with custom key/values
Date Wed, 21 Jan 2015 07:39:38 GMT

     [ https://issues.apache.org/jira/browse/SPARK-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell updated SPARK-5297:
-----------------------------------
    Labels: backport-needed  (was: )

> File Streams do not work with custom key/values
> -----------------------------------------------
>
>                 Key: SPARK-5297
>                 URL: https://issues.apache.org/jira/browse/SPARK-5297
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.2.0
>            Reporter: Leonidas Fegaras
>            Assignee: Saisai Shao
>              Labels: backport-needed
>             Fix For: 1.3.0
>
>
> The following code:
> {code}
> stream_context.<K,V,SequenceFileInputFormat<K,V>>fileStream(directory)
>     .foreachRDD(new Function<JavaPairRDD<K,V>, Void>() {
>         public Void call(JavaPairRDD<K,V> rdd) throws Exception {
>             for (Tuple2<K,V> x : rdd.collect())
>                 System.out.println("# " + x._1 + " " + x._2);
>             return null;
>         }
>     });
> stream_context.start();
> stream_context.awaitTermination();
> {code}
> compiles fine for custom (serializable) classes K and V, but gives an error
> when I drop a new Hadoop sequence file into the directory:
> {quote}
> 15/01/17 09:13:59 ERROR scheduler.JobScheduler: Error generating jobs for time 1421507639000 ms
> java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
> 	at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> 	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
> 	at scala.Option.getOrElse(Option.scala:120)
> 	at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
> 	at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
> 	at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> 	at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
> 	at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
> 	at scala.Option.orElse(Option.scala:257)
> {quote}
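> The trace is consistent with the erasure diagnosis below: NewHadoopRDD instantiates the
> input format class it recovers from a class tag, and when that tag was built from an
> erased Java type parameter its runtime class is java.lang.Object, so the cast to
> InputFormat fails. A minimal, self-contained Java illustration of the erasure itself
> (a demonstration only, not Spark code):
> {code}
> import java.lang.reflect.Method;
>
> public class ErasureDemo {
>     // K and V exist only in the source; after compilation the JVM sees Object.
>     static <K, V> void process(K key, V value) { }
>
>     public static void main(String[] args) throws Exception {
>         Method m = ErasureDemo.class.getDeclaredMethod(
>             "process", Object.class, Object.class);
>         // Prints "class java.lang.Object" twice: K and V have been erased,
>         // which is all a manifest built on the Java side can recover.
>         for (Class<?> p : m.getParameterTypes())
>             System.out.println(p);
>     }
> }
> {code}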
> The same classes K and V work fine for non-streaming Spark:
> {code}
> spark_context.newAPIHadoopFile(path, SequenceFileInputFormat.class, K.class, V.class, conf)
> {code}
> Streaming also works fine for TextInputFormat.
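> For reference, a self-contained sketch of the working batch call above, assuming
> hypothetical custom classes MyKey and MyValue (placeholders, not names from this report):
> {code}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import scala.Tuple2;
>
> public class BatchRepro {
>     @SuppressWarnings("unchecked")
>     public static void main(String[] args) {
>         JavaSparkContext spark_context = new JavaSparkContext(
>             new SparkConf().setAppName("batch-repro").setMaster("local[2]"));
>         // Concrete Class objects are passed in, so Spark can build real
>         // class tags for the key, value, and input format types.
>         JavaPairRDD<MyKey, MyValue> rdd = spark_context.newAPIHadoopFile(
>             args[0],                        // path to the sequence file
>             (Class<SequenceFileInputFormat<MyKey, MyValue>>)
>                 (Class<?>) SequenceFileInputFormat.class,
>             MyKey.class,
>             MyValue.class,
>             new Configuration());
>         for (Tuple2<MyKey, MyValue> x : rdd.collect())
>             System.out.println("# " + x._1 + " " + x._2);
>         spark_context.stop();
>     }
> }
> {code}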
> The issue is that the class manifests are erased to Object in the Java fileStream
> constructor, but those are relied on downstream when creating the Hadoop RDD that
> backs each batch of the file stream:
> https://github.com/apache/spark/blob/v1.2.0/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala#L263
> https://github.com/apache/spark/blob/v1.2.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L753
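> The fix targeted at 1.3.0 (see Fix For above) adds a fileStream overload that takes
> explicit Class objects, so the key, value, and input format classes survive erasure.
> A sketch of the call against that overload, under the same MyKey/MyValue assumptions
> as above (the exact signature is an assumption from the 1.3 JavaStreamingContext):
> {code}
> // Assumed overload: fileStream(directory, kClass, vClass, fClass).
> JavaPairInputDStream<MyKey, MyValue> stream =
>     stream_context.<MyKey, MyValue, SequenceFileInputFormat<MyKey, MyValue>>fileStream(
>         directory,
>         MyKey.class,      // key class, kept at runtime
>         MyValue.class,    // value class, kept at runtime
>         (Class<SequenceFileInputFormat<MyKey, MyValue>>)
>             (Class<?>) SequenceFileInputFormat.class);
> {code}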


