spark-user mailing list archives

From Andrew Ash <and...@andrewash.com>
Subject Re: Reading from .bz2 files with Spark
Date Tue, 20 May 2014 03:40:01 GMT
Hi Xiangrui, many thanks to you and Sandy for fixing this issue!


On Fri, May 16, 2014 at 10:23 PM, Xiangrui Meng <mengxr@gmail.com> wrote:

> Hi Andrew,
>
> I submitted a patch and verified it solves the problem. You can
> download the patch from
> https://issues.apache.org/jira/browse/HADOOP-10614 .
>
> Best,
> Xiangrui
>
> On Fri, May 16, 2014 at 6:48 PM, Xiangrui Meng <mengxr@gmail.com> wrote:
> > Hi Andrew,
> >
> > This is the JIRA I created:
> > https://issues.apache.org/jira/browse/MAPREDUCE-5893 . Hopefully
> > someone wants to work on it.
> >
> > Best,
> > Xiangrui
> >
> > On Fri, May 16, 2014 at 6:47 PM, Xiangrui Meng <mengxr@gmail.com> wrote:
> >> Hi Andrew,
> >>
> >> I could reproduce the bug with Hadoop 2.2.0. Some older versions of
> >> Hadoop do not support splittable compression, so you ended up with
> >> sequential reads. It is easy to reproduce the bug with the following
> >> setup:
> >>
> >> 1) Workers are configured with multiple cores.
> >> 2) BZip2 files are big enough or minPartitions is large enough when
> >> you load the file via sc.textFile(), so that one worker has more than
> >> one task.
> >>
> >> Best,
> >> Xiangrui
> >>
> >> On Fri, May 16, 2014 at 4:06 PM, Andrew Ash <andrew@andrewash.com> wrote:
> >>> Hi Xiangrui,
> >>>
> >>> // FYI I'm getting your emails late due to the Apache mailing list outage
> >>>
> >>> I'm using CDH4.4.0, which I think uses the MapReduce v2 API.  The .jars are
> >>> named like this: hadoop-hdfs-2.0.0-cdh4.4.0.jar
> >>>
> >>> I'm also glad you were able to reproduce!  Please paste a link to the Hadoop
> >>> bug you file so I can follow along.
> >>>
> >>> Thanks!
> >>> Andrew
> >>>
> >>>
> >>> On Tue, May 13, 2014 at 9:08 AM, Xiangrui Meng <mengxr@gmail.com> wrote:
> >>>>
> >>>> Which hadoop version did you use? I'm not sure whether Hadoop v2 fixes
> >>>> the problem you described, but it does contain several fixes to bzip2
> >>>> format. -Xiangrui
> >>>>
> >>>> On Wed, May 7, 2014 at 9:19 PM, Andrew Ash <andrew@andrewash.com> wrote:
> >>>> > Hi all,
> >>>> >
> >>>> > Is anyone reading and writing to .bz2 files stored in HDFS from Spark
> >>>> > with success?
> >>>> >
> >>>> >
> >>>> > I'm finding the following results on a recent commit (756c96 from 24hr
> >>>> > ago) and CDH 4.4.0:
> >>>> >
> >>>> > Works: val r = sc.textFile("/user/aa/myfile.bz2").count
> >>>> > Doesn't work: val r = sc.textFile("/user/aa/myfile.bz2").map((s:String) => s+"| " ).count
> >>>> >
> >>>> > Specifically, I'm getting an exception coming out of the bzip2 libraries
> >>>> > (see below stacktraces), which is unusual because I'm able to read from
> >>>> > that file without an issue using the same libraries via Pig.  It was
> >>>> > originally created from Pig as well.
> >>>> >
> >>>> > Digging a little deeper I found this line in the .bz2 decompressor's
> >>>> > javadoc for CBZip2InputStream:
> >>>> >
> >>>> > "Instances of this class are not threadsafe." [source]
> >>>> >
> >>>> >
> >>>> > My current working theory is that Spark has a much higher level of
> >>>> > parallelism than Pig/Hadoop does and thus I get these wild
> >>>> > IndexOutOfBounds exceptions much more frequently (as in can't finish
> >>>> > a run over a little 2M row file) vs hardly at all in other libraries.
> >>>> >
> >>>> > The only other reference I could find to the issue was in presto-users,
> >>>> > but the recommendation to leave .bz2 for .lzo doesn't help if I actually
> >>>> > do want the higher compression levels of .bz2.
> >>>> >
> >>>> >
> >>>> > Would love to hear if I have some kind of configuration issue or if
> >>>> > there's a bug in .bz2 that's fixed in later versions of CDH, or
> >>>> > generally any other thoughts on the issue.
> >>>> >
> >>>> >
> >>>> > Thanks!
> >>>> > Andrew
> >>>> >
> >>>> >
> >>>> >
> >>>> > Below are examples of some exceptions I'm getting:
> >>>> >
> >>>> > 14/05/07 15:09:49 WARN scheduler.TaskSetManager: Loss was due to java.lang.ArrayIndexOutOfBoundsException
> >>>> > java.lang.ArrayIndexOutOfBoundsException: 65535
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.hbCreateDecodeTables(CBZip2InputStream.java:663)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.createHuffmanDecodingTables(CBZip2InputStream.java:790)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.recvDecodingTables(CBZip2InputStream.java:762)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:798)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
> >>>> >         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
> >>>> >         at java.io.InputStream.read(InputStream.java:101)
> >>>> >         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
> >>>> >         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
> >>>> >
> >>>> >
> >>>> >
> >>>> >
> >>>> > java.lang.ArrayIndexOutOfBoundsException: 900000
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:900)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
> >>>> >         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
> >>>> >         at java.io.InputStream.read(InputStream.java:101)
> >>>> >         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
> >>>> >         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
> >>>> >         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
> >>>> >         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
> >>>> >         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>>> >         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>>> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>>> >         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
> >>>> >
> >>>> >
> >>>> >
> >>>> > java.lang.ArrayIndexOutOfBoundsException: -921878509
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
> >>>> >         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:432)
> >>>> >         at java.io.InputStream.read(InputStream.java:101)
> >>>> >         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
> >>>> >         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
> >>>> >         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
> >>>> >         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
> >>>> >         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>>> >         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>>> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>>> >         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
> >>>> >         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
> >>>> >         at org.apache.spark.rdd.RDD$$anonfun$24.apply(RDD.scala:879)
> >>>> >         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
> >>>> >         at org.apache.spark.rdd.RDD$$anonfun$12.apply(RDD.scala:548)
> >>>> >
> >>>> >
> >>>> >
> >>>> > java.lang.ArrayIndexOutOfBoundsException: -1321104434
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode0(CBZip2InputStream.java:1011)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.getAndMoveToFrontDecode(CBZip2InputStream.java:826)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.initBlock(CBZip2InputStream.java:502)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.changeStateToProcessABlock(CBZip2InputStream.java:333)
> >>>> >         at org.apache.hadoop.io.compress.bzip2.CBZip2InputStream.read(CBZip2InputStream.java:397)
> >>>> >         at org.apache.hadoop.io.compress.BZip2Codec$BZip2CompressionInputStream.read(BZip2Codec.java:426)
> >>>> >         at java.io.InputStream.read(InputStream.java:101)
> >>>> >         at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:209)
> >>>> >         at org.apache.hadoop.util.LineReader.readLine(LineReader.java:173)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:203)
> >>>> >         at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:43)
> >>>> >         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
> >>>> >         at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
> >>>> >         at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> >>>> >         at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:35)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> >>>> >         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> >>>> >         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> >>>> >         at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$countPartition$1(RDD.scala:868)
> >>>
> >>>
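
For readers who want to try the reproduction setup described in the quoted thread (multiple cores per worker, more than one task per worker), here is a minimal Scala sketch. It is an illustration under stated assumptions, not code from the thread: the object name Bzip2Repro, the helper appendMarker, the local[4] master (several task threads in one JVM, standing in for a multi-core worker) and the minPartitions value of 8 are all hypothetical choices. The default path is the one from the original report and has to be replaced with a real .bz2 file.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object Bzip2Repro {
  // Same transformation as the failing example in the original report.
  def appendMarker(s: String): String = s + "| "

  def main(args: Array[String]): Unit = {
    // Assumption: path taken from the original report; pass your own as args(0).
    val path = if (args.nonEmpty) args(0) else "/user/aa/myfile.bz2"

    // Assumption: local[4] runs up to four tasks concurrently in one JVM,
    // which approximates "workers are configured with multiple cores".
    val conf = new SparkConf().setAppName("bzip2-repro").setMaster("local[4]")
    val sc = new SparkContext(conf)
    try {
      // Second argument is minPartitions; 8 is an arbitrary value chosen so
      // that more than one task is scheduled on the same worker.
      val lines = sc.textFile(path, 8).map(appendMarker).count()
      println(s"lines: $lines")
    } finally {
      sc.stop()
    }
  }
}
```

On an affected Hadoop version a run like this may fail intermittently with an ArrayIndexOutOfBoundsException like those in the traces above. Changing the master to local[1] keeps a single task running at a time, which should avoid the concurrent-decode condition at the cost of parallelism.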
>
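
The javadoc remark quoted in the thread ("Instances of this class are not threadsafe") is about sharing one instance across threads. The thread does not say what exactly went wrong inside the decompressor, so the following Scala sketch is only a generic illustration of one way separate instances can still interfere with each other: state held JVM-wide instead of per instance. All names (SharedStateSketch, Scratch, SharedDecoder, IsolatedDecoder) are hypothetical, and the interleaving is replayed on a single thread so the result is deterministic.

```scala
object SharedStateSketch {
  // JVM-wide scratch buffer: every SharedDecoder instance writes to this array.
  object Scratch { val buf: Array[Int] = new Array[Int](4) }

  // Hypothetical decoder whose working state lives outside the instance.
  final class SharedDecoder(seed: Int) {
    def load(): Unit = for (i <- Scratch.buf.indices) Scratch.buf(i) = seed + i
    def sum(): Int = Scratch.buf.sum
  }

  // Same logic, but each instance owns its buffer.
  final class IsolatedDecoder(seed: Int) {
    private val buf = new Array[Int](4)
    def load(): Unit = for (i <- buf.indices) buf(i) = seed + i
    def sum(): Int = buf.sum
  }

  // Replays the interleaving "A loads, B loads, A reads" and returns what A sees.
  def sharedResult(): Int = {
    val a = new SharedDecoder(0)
    val b = new SharedDecoder(100)
    a.load(); b.load()
    a.sum()
  }

  def isolatedResult(): Int = {
    val a = new IsolatedDecoder(0)
    val b = new IsolatedDecoder(100)
    a.load(); b.load()
    a.sum()
  }

  def main(args: Array[String]): Unit = {
    println(sharedResult())   // prints 406: A reads the data B just loaded
    println(isolatedResult()) // prints 6: A reads its own data
  }
}
```

With one task per JVM the "B loads" step never happens in between, which is consistent with the observation in the thread that the errors show up when one worker runs more than one task.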
