spark-user mailing list archives

From Leonidas Fegaras <fega...@cse.uta.edu>
Subject Re: Problem with File Streams
Date Sat, 17 Jan 2015 15:38:15 GMT
My key/value classes are custom serializable classes. It looks like a 
bug, so I filed it on JIRA as SPARK-5297.
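In the meantime, here is a rough, untested sketch of a possible workaround on
1.2.0: call the underlying Scala StreamingContext (via the ssc() accessor of
the JavaStreamingContext used below) and pass the ClassTags explicitly, so the
input-format class is not erased to Object on the way in. MyKey, MyValue and
MyInputFormat are hypothetical placeholders for my custom key/value classes
and custom input format:

    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.dstream.InputDStream;
    import scala.Tuple2;
    import scala.reflect.ClassTag;
    import scala.reflect.ClassTag$;

    // Placeholders: MyKey/MyValue are the custom serializable key/value
    // classes; MyInputFormat extends
    // org.apache.hadoop.mapreduce.lib.input.FileInputFormat<MyKey, MyValue>.
    ClassTag<MyKey> kTag = ClassTag$.MODULE$.apply(MyKey.class);
    ClassTag<MyValue> vTag = ClassTag$.MODULE$.apply(MyValue.class);
    ClassTag<MyInputFormat> fTag = ClassTag$.MODULE$.apply(MyInputFormat.class);

    // Call the Scala fileStream with explicit ClassTags so FileInputDStream
    // sees the real InputFormat class instead of Object.
    InputDStream<Tuple2<MyKey, MyValue>> input =
        stream_context.ssc().fileStream(directory, kTag, vTag, fTag);

    // Wrap the result back into the Java pair-DStream API.
    JavaPairDStream<MyKey, MyValue> pairs =
        new JavaPairDStream<MyKey, MyValue>(input, kTag, vTag);

This is untested; it simply bypasses the Java fileStream wrapper, which
appears to be where the type information is lost.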
Thanks
Leonidas

On 01/17/2015 03:07 AM, Akhil Das wrote:
> Try:
>
>     JavaPairDStream<String, String> foo =
>         ssc.<String, String, SequenceFileInputFormat<String, String>>fileStream("/sigmoid/foo");
>
>
> Thanks
> Best Regards
>
> On Sat, Jan 17, 2015 at 4:24 AM, Leonidas Fegaras <fegaras@cse.uta.edu> wrote:
>
>     Dear Spark users,
>     I have a problem using File Streams in Java on Spark 1.2.0. I can
>     process Hadoop files in local mode using:
>
>     spark_context.newAPIHadoopFile(path,F.class,K.class,V.class,conf)
>
>     where F extends
>     org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>. But
>     when I try to do the same thing in Spark Streaming using:
>
>     stream_context.<K,V,F>fileStream(directory)
>         .foreachRDD(new Function<JavaPairRDD<K,V>, Void>() {
>             public Void call(JavaPairRDD<K,V> rdd) throws Exception {
>                 ...
>             }
>         });
>
>     and then drop a new text file into the directory, I get the
>     following error:
>
>     15/01/16 16:29:43 ERROR scheduler.JobScheduler: Error generating jobs for time 1421447383000 ms
>     java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
>         at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
>         at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
>         at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>         at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>         at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
>         at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
>         at scala.Option.orElse(Option.scala:257)
>     I get the same error when I process Hadoop sequence files. I am sure my
>     input format F extends org.apache.hadoop.mapreduce.InputFormat.
>     Any ideas?
>     Thank you
>     Leonidas Fegaras
>
>

