[ https://issues.apache.org/jira/browse/SPARK-5297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Patrick Wendell updated SPARK-5297:
-----------------------------------
Assignee: Saisai Shao
> File Streams do not work with custom key/values
> -----------------------------------------------
>
> Key: SPARK-5297
> URL: https://issues.apache.org/jira/browse/SPARK-5297
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.2.0
> Reporter: Leonidas Fegaras
> Assignee: Saisai Shao
> Labels: backport-needed
> Fix For: 1.3.0
>
>
> The following code:
> {code}
> stream_context.<K,V,SequenceFileInputFormat<K,V>>fileStream(directory)
>     .foreachRDD(new Function<JavaPairRDD<K,V>, Void>() {
>         public Void call(JavaPairRDD<K,V> rdd) throws Exception {
>             for (Tuple2<K,V> x : rdd.collect())
>                 System.out.println("# " + x._1 + " " + x._2);
>             return null;
>         }
>     });
> stream_context.start();
> stream_context.awaitTermination();
> {code}
> for custom (serializable) classes K and V compiles fine, but it throws the error below when I drop a new Hadoop sequence file into the directory:
> {quote}
> 15/01/17 09:13:59 ERROR scheduler.JobScheduler: Error generating jobs for time 1421507639000 ms
> java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
> at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
> at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
> at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
> at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
> at scala.Option.orElse(Option.scala:257)
> {quote}
> The same classes K and V work fine for non-streaming Spark:
> {code}
> spark_context.newAPIHadoopFile(path, SequenceFileInputFormat.class, K.class, V.class, conf)
> {code}
> Streaming also works fine with TextFileInputFormat.
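> For comparison, a fuller, self-contained sketch of that working non-streaming read. This is illustrative only:
> the path is a placeholder, and Hadoop's Text/LongWritable stand in for the custom key/value classes, which
> would be passed in the same way.
> {code}
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaPairRDD;
> import org.apache.spark.api.java.JavaSparkContext;
> import scala.Tuple2;
>
> public class NonStreamingSequenceFileRead {
>     public static void main(String[] args) {
>         JavaSparkContext sc = new JavaSparkContext(
>             new SparkConf().setMaster("local[*]").setAppName("non-streaming-sequence-file-read"));
>
>         // One unchecked cast so the raw .class literal satisfies the bound F <: InputFormat<K, V>.
>         @SuppressWarnings("unchecked")
>         Class<SequenceFileInputFormat<Text, LongWritable>> format =
>             (Class<SequenceFileInputFormat<Text, LongWritable>>) (Class<?>) SequenceFileInputFormat.class;
>
>         // newAPIHadoopFile receives the format, key and value classes explicitly,
>         // so nothing is erased to Object and the input format can be instantiated.
>         JavaPairRDD<Text, LongWritable> rdd = sc.newAPIHadoopFile(
>             "/data/input", format, Text.class, LongWritable.class, new Configuration());
>
>         for (Tuple2<Text, LongWritable> x : rdd.collect())
>             System.out.println("# " + x._1 + " " + x._2);
>
>         sc.stop();
>     }
> }
> {code}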
> The issue is that the class manifests are erased to Object in the Java fileStream method, but they are relied on downstream when creating the Hadoop RDD that backs each batch of the file stream:
> https://github.com/apache/spark/blob/v1.2.0/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala#L263
> https://github.com/apache/spark/blob/v1.2.0/core/src/main/scala/org/apache/spark/SparkContext.scala#L753
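> The "Fix For: 1.3.0" above suggests the fix adds a fileStream overload on JavaStreamingContext that takes the
> key, value and input-format classes explicitly, so real ClassTags can be built from them rather than erased
> Object tags. A minimal sketch of how a Java caller would use such an overload, assuming the signature
> fileStream(directory, kClass, vClass, fClass); the directory is a placeholder and Text/LongWritable again
> stand in for the custom classes:
> {code}
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
> import org.apache.spark.SparkConf;
> import org.apache.spark.streaming.Duration;
> import org.apache.spark.streaming.api.java.JavaPairInputDStream;
> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>
> public class SequenceFileStream {
>     public static void main(String[] args) throws Exception {
>         JavaStreamingContext jssc = new JavaStreamingContext(
>             new SparkConf().setMaster("local[2]").setAppName("sequence-file-stream"),
>             new Duration(10000));
>
>         // One unchecked cast so the raw .class literal satisfies the bound F <: InputFormat<K, V>.
>         @SuppressWarnings("unchecked")
>         Class<SequenceFileInputFormat<Text, LongWritable>> format =
>             (Class<SequenceFileInputFormat<Text, LongWritable>>) (Class<?>) SequenceFileInputFormat.class;
>
>         // Passing the classes at the call site lets Spark build ClassTags from them,
>         // instead of relying on the type parameters that Java erases to Object.
>         JavaPairInputDStream<Text, LongWritable> stream =
>             jssc.fileStream("/data/incoming", Text.class, LongWritable.class, format);
>
>         stream.print();
>         jssc.start();
>         jssc.awaitTermination();
>     }
> }
> {code}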
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)