spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Xiangrui Meng <men...@gmail.com>
Subject Re: Reading from .bz2 files with Spark
Date Sat, 17 May 2014 01:47:35 GMT
Hi Andre,

I could reproduce the bug with Hadoop 2.2.0. Some older version of
Hadoop do not support splittable compression, so you ended up with
sequential reads. It is easy to reproduce the bug with the following
setup:

1) Workers are configured with multiple cores.
2) BZip2 files are big enough or minPartitions is large enough when
you load the file via sc.textFile(), so that one worker has more than
one tasks.

Best,
Xiangrui

On Fri, May 16, 2014 at 4:06 PM, Andrew Ash <andrew@andrewash.com> wrote:
> Hi Xiangrui,
>
> // FYI I'm getting your emails late due to the Apache mailing list outage
>
> I'm using CDH4.4.0, which I think uses the MapReduce v2 API.  The .jars are
> named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar
>
> I'm also glad you were able to reproduce!  Please paste a link to the Hadoop
> bug you file so I can follow along.
>
> Thanks!
> Andrew
>
>
> On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng <mengxr@gmail.com> wrote:
>>
>> Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
>> the problem you described, but it does contain several fixes to bzip2
>> format. -Xiangrui
>>
>> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash <andrew@andrewash.com> wrote:
>> > Hi all,
>> >
>> > Is anyone reading and writing to .bz2 files stored in HDFS from Spark
>> > with
>> > success?
>> >
>> >
>> > I'm finding the following results on a recent commit (756c96 from 24hr
>> > ago)
>> > and CDH 4.4.0:
>> >
>> > Works: val r = sc.textFile("/user/aa/myfile.bz2").count
>> > Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String)
>> > =>
>> > s+"| " ).count
>> >
>> > Specifically, I'm getting an exception coming out of the bzip2 libraries
>> > (see below stacktraces), which is unusual because I'm able to read from
>> > that
>> > file without an issue using the same libraries via Pig.  It was
>> > originally
>> > created from Pig as well.
>> >
>> > Digging a little deeper I found this line in the .bz2 decompressor's
>> > javadoc
>> > for CBZip2InputStream:
>> >
>> > "Instances of this class are not threadsafe." [source]
>> >
>> >
>> > My current working theory is that Spark has a much higher level of
>> > parallelism than Pig/Hadoop does and thus I get these wild
>> > IndexOutOfBounds
>> > exceptions much more frequently (as in can't finish a run over a little
>> > 2M
>> > row file) vs hardly at all in other libraries.
>> >
>> > The only other reference I could find to the issue was in presto-users,
>> > but
>> > the recommendation to leave .bz2 for .lzo doesn't help if I actually do
>> > want
>> > the higher compression levels of .bz2.
>> >
>> >
>> > Would love to hear if I have some kind of configuration issue or if
>> > there's
>> > a bug in .bz2 that's fixed in later versions of CDH, or generally any
>> > other
>> > thoughts on the issue.
>> >
>> >
>> > Thanks!
>> > Andrew
>> >
>> >
>> >
>> > Below are examples of some exceptions I'm getting:
>> >
>> > 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to
>> > java.lang.ArrayIndexOutOfBoundsException
>> > java.lang.ArrayIndexOutOfBoundsException: 65535
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>> >         at java.io.InputStream.read(InputStream.java:101)
>> >         at
>> > org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>> >         at
>> > org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>> >         at
>> >
>> > org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>> >         at
>> > org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>> >
>> >
>> >
>> >
>> > java.lang.ArrayIndexOutOfBoundsException: 900000
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>> >         at java.io.InputStream.read(InputStream.java:101)
>> >         at
>> > org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>> >         at
>> > org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>> >         at
>> >
>> > org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>> >         at
>> > org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>> >         at
>> > org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>> >         at
>> > org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>> >         at
>> > org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>> >         at
>> >
>> > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>> >         at
>> > scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> >         at
>> > scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> >         at
>> > scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >         at
>> > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >         at
>> >
>> > org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>> >
>> >
>> >
>> > java.lang.ArrayIndexOutOfBoundsException: -921878509
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:432)
>> >         at java.io.InputStream.read(InputStream.java:101)
>> >         at
>> > org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>> >         at
>> > org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>> >         at
>> >
>> > org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>> >         at
>> > org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>> >         at
>> > org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>> >         at
>> > org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>> >         at
>> > org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>> >         at
>> >
>> > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>> >         at
>> > scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> >         at
>> > scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> >         at
>> > scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >         at
>> > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >         at
>> >
>> > org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>> >         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>> >         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
>> >         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>> >         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
>> >
>> >
>> >
>> > java.lang.ArrayIndexOutOfBoundsException: -1321104434
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
>> >         at
>> >
>> > org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
>> >         at java.io.InputStream.read(InputStream.java:101)
>> >         at
>> > org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
>> >         at
>> > org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
>> >         at
>> >
>> > org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
>> >         at
>> > org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
>> >         at
>> > org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
>> >         at
>> > org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
>> >         at
>> > org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>> >         at
>> >
>> > org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
>> >         at
>> > scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> >         at
>> > scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> >         at
>> > scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> >         at
>> > scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> >         at
>> >
>> > org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
>
>

Mime
View raw message