spark-user mailing list archives

From Leonidas Fegaras <fega...@cse.uta.edu>
Subject Problem with File Streams
Date Fri, 16 Jan 2015 22:54:59 GMT
Dear Spark users,
I have a problem using file streams in Java on Spark 1.2.0. I can 
process Hadoop files in local mode using:

spark_context.newAPIHadoopFile(path,F.class,K.class,V.class,conf)

where F extends 
org.apache.hadoop.mapreduce.lib.input.FileInputFormat<K,V>. But when I 
try to do the same thing in Spark Streaming using:

stream_context.<K,V,F>fileStream(directory)
    .foreachRDD(new Function<JavaPairRDD<K,V>,Void>() {
        public Void call ( JavaPairRDD<K,V> rdd ) throws Exception {
            ...
        }
    });

and when I drop a new text file in the directory I get the following error:

15/01/16 16:29:43 ERROR scheduler.JobScheduler: Error generating jobs for time 1421447383000 ms
java.lang.ClassCastException: java.lang.Object cannot be cast to org.apache.hadoop.mapreduce.InputFormat
     at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:91)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
     at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:203)
     at scala.Option.getOrElse(Option.scala:120)
     at org.apache.spark.rdd.RDD.partitions(RDD.scala:203)
     at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:236)
     at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$3.apply(FileInputDStream.scala:234)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
     at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
     at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:234)
     at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:128)
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:296)
     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:288)
     at scala.Option.orElse(Option.scala:257)

I get the same error when I process Hadoop sequence files. I am sure my 
input format F extends org.apache.hadoop.mapreduce.InputFormat. Any ideas?
Thank you
Leonidas Fegaras

