spark-user mailing list archives

From Andre Bois-Crettez <andre.b...@kelkoo.com>
Subject Re: Reading from .bz2 files with Spark
Date Thu, 15 May 2014 09:12:47 GMT
We never saw your exception when reading bzip2 files with Spark.

However, when we mistakenly compiled Spark against an older version of Hadoop
(the default in Spark at the time), we ended up reading bzip2 files
sequentially, without taking advantage of block splits to parallelize the work.
Once we compiled Spark with SPARK_HADOOP_VERSION=2.2.0, files were read in
parallel, as expected with a recent Hadoop.

http://spark.apache.org/docs/0.9.1/#a-note-about-hadoop-versions

Make sure Spark is compiled against Hadoop v2.
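
Per the linked docs, building against Hadoop 2.2 looks like:

    SPARK_HADOOP_VERSION=2.2.0 sbt/sbt assembly

And you can confirm which Hadoop version the running assembly actually
bundles, from the spark-shell (this just calls Hadoop's own version API):

    // prints the Hadoop version Spark was linked against, e.g. "2.2.0"
    org.apache.hadoop.util.VersionInfo.getVersion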

André

On 2014-05-13 18:08, Xiangrui Meng wrote:
> Which Hadoop version did you use? I'm not sure whether Hadoop v2 fixes
> the problem you described, but it does contain several fixes to the
> bzip2 format. -Xiangrui
>
> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash <andrew@andrewash.com> wrote:
>> Hi all,
>>
>> Is anyone reading and writing to .bz2 files stored in HDFS from Spark with
>> success?
>>
>>
>> I'm finding the following results on a recent commit (756c96, from 24 hours
>> ago) and CDH 4.4.0:
>>
>> Works: val r = sc.textFile("/user/aa/myfile.bz2").count
>> Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s: String) =>
>> s + "| ").count
>>
>> Specifically, I'm getting an exception out of the bzip2 libraries (see the
>> stack traces below), which is unusual because I'm able to read the same file
>> without issue using the same libraries via Pig. The file was originally
>> created with Pig as well.
>>
>> Digging a little deeper, I found this line in the Javadoc for the bzip2
>> decompressor, CBZip2InputStream:
>>
>> "Instances of this class are not threadsafe." [source]
>>
>>
>> My current working theory is that Spark runs at a much higher level of
>> parallelism than Pig/Hadoop does, and thus I hit these wild IndexOutOfBounds
>> exceptions far more frequently (to the point that I can't finish a run over
>> a small 2M-row file) versus hardly at all in other tools.
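
If the thread-safety theory is right, a rough way to test it is to serialize
the reads, for example by collapsing to a single partition before the map
(a diagnostic only, not a fix; the path and lambda are copied from your
example):

    // one task then iterates over the file's splits one at a time,
    // so only one CBZip2InputStream is active at once
    val r = sc.textFile("/user/aa/myfile.bz2").coalesce(1)
      .map((s: String) => s + "| ").count

If the exceptions disappear with a single concurrent reader, concurrent
decompressor instances are the likely culprit.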
>>
>> The only other reference to the issue I could find was on presto-users, but
>> the recommendation there to abandon .bz2 in favor of .lzo doesn't help when
>> I actually want the higher compression ratios of .bz2.
>>
>>
>> I'd love to hear whether I have some kind of configuration issue, whether
>> there's a bzip2 bug that's fixed in later versions of CDH, or any other
>> thoughts on the issue.
>>
>>
>> Thanks!
>> Andrew
>>
>>
>>
>> Below are examples of some exceptions I'm getting:
>>
>> 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
>> java.lang.ArrayIndexOutOfBoundsException: 65535
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>          at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>          at java.io.InputStream.read(InputStream.java:101)
>>          at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>          at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>          at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>          at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>
>>
>>
>>
>> java.lang.ArrayIndexOutOfBoundsException: 900000
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>          at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>          at java.io.InputStream.read(InputStream.java:101)
>>          at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>          at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>          at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>          at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>          at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>          at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>          at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>          at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>          at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>>
>>
>>
>> java.lang.ArrayIndexOutOfBoundsException: -921878509
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>          at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:432)
>>          at java.io.InputStream.read(InputStream.java:101)
>>          at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>          at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>          at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>          at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>          at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>          at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>          at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>          at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>          at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>>          at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>>          at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>>          at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>>          at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>>
>>
>>
>> java.lang.ArrayIndexOutOfBoundsException: -1321104434
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>>          at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>>          at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>>          at java.io.InputStream.read(InputStream.java:101)
>>          at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>>          at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>>          at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>>          at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>>          at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>>          at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>>          at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>          at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>>          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>          at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>          at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>          at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>          at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)


--
André Bois-Crettez

Software Architect
Big Data Developer
http://www.kelkoo.com/


