spark-user mailing list archives

From Mingyu Kim <m...@palantir.com>
Subject An open HDFS connection fails RDD.take()
Date Fri, 10 Jan 2014 00:25:05 GMT
Here's a snippet of code that throws an exception. I create a FileSystem object pointing at an HDFS cluster, then try to read a CSV file from that HDFS as an RDD and call take() on it.

> public static void main(String[] args) throws IOException {
>     Configuration conf = new Configuration(false);
>     conf.set("fs.default.name", "hdfs://localhost:8020");
>     conf.set("fs.hdfs.impl", DistributedFileSystem.class.getCanonicalName());
>     FileSystem fileSystem = FileSystem.get(conf);
>     // fileSystem.close();
>
>     JavaSparkContext sc = new JavaSparkContext("spark://localhost:7077",
>             "MySpark", "/path/to/spark", new String[]{});
>     JavaRDD<String> rdd = sc.textFile("hdfs://localhost:8020/path/to/csv");
>     System.out.println(rdd.take(300));
> }
> 
It throws the following exception.

> Exception in thread "main" java.lang.IllegalStateException: Must not use direct buffers with InputStream API
> at com.google.common.base.Preconditions.checkState(Preconditions.java:149)
> at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:211)
> at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
> at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
> at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:164)
> at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:129)
> at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:559)
> at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:611)
> at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:665)
> at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:706)
> at java.io.DataInputStream.read(DataInputStream.java:100)
> at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
> at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:160)
> at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:38)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:103)
> at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:83)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at scala.collection.Iterator$$anon$19.hasNext(Iterator.scala:400)
> at scala.collection.Iterator$$anon$18.hasNext(Iterator.scala:381)
> at scala.collection.Iterator$class.foreach(Iterator.scala:772)
> at scala.collection.Iterator$$anon$18.foreach(Iterator.scala:379)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:102)
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:250)
> at scala.collection.Iterator$$anon$18.toBuffer(Iterator.scala:379)
> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:237)
> at scala.collection.Iterator$$anon$18.toArray(Iterator.scala:379)
> at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:768)
> at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:768)
> at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
> at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:758)
> at org.apache.spark.scheduler.DAGScheduler.runLocallyWithinThread(DAGScheduler.scala:484)
> at org.apache.spark.scheduler.DAGScheduler$$anon$2.run(DAGScheduler.scala:470)


However, if I uncomment the fileSystem.close() call in the code above, take() finishes successfully.
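
For reference, here is a self-contained sketch of that working variant. It is the same code as above with the close() restored before the SparkContext is created; the class name is arbitrary, and the imports are just what I assume the snippet needs.

> import java.io.IOException;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.FileSystem;
> import org.apache.hadoop.hdfs.DistributedFileSystem;
> import org.apache.spark.api.java.JavaRDD;
> import org.apache.spark.api.java.JavaSparkContext;
>
> public class TakeWithClosedFileSystem {
>     public static void main(String[] args) throws IOException {
>         Configuration conf = new Configuration(false);
>         conf.set("fs.default.name", "hdfs://localhost:8020");
>         conf.set("fs.hdfs.impl", DistributedFileSystem.class.getCanonicalName());
>         FileSystem fileSystem = FileSystem.get(conf);
>
>         // Closing the FileSystem here, before the SparkContext is created,
>         // is the only difference from the failing snippet above.
>         fileSystem.close();
>
>         JavaSparkContext sc = new JavaSparkContext("spark://localhost:7077",
>                 "MySpark", "/path/to/spark", new String[]{});
>         JavaRDD<String> rdd = sc.textFile("hdfs://localhost:8020/path/to/csv");
>         System.out.println(rdd.take(300));
>     }
> }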

This happens not only on my local machine; it also happens on EC2. Is this a bug in Spark, or am I using Spark and HDFS incorrectly?

Thanks,
Mingyu


