spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Akshay Bhardwaj <akshay.bhardwaj1...@gmail.com>
Subject Re: spark structured streaming crash due to decompressing gzip file failure
Date Thu, 07 Mar 2019 13:05:42 GMT
Hi,

In your spark-submit command, try using the below config property and see
if this solves the problem.

--conf spark.sql.files.ignoreCorruptFiles=true

For me this worked to ignore reading empty/partially uploaded gzip files in
s3 bucket.

Akshay Bhardwaj
+91-97111-33849


On Thu, Mar 7, 2019 at 11:28 AM Lian Jiang <jiangok2006@gmail.com> wrote:

> Hi,
>
> I have a structured streaming job which listens to a hdfs folder
> containing jsonl.gz files. The job crashed due to error:
>
> java.io.IOException: incorrect header check
>     at
> org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native
> Method)
>     at
> org.apache.hadoop.io.compress.zlib.ZlibDecompressor.decompress(ZlibDecompressor.java:225)
>     at
> org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:111)
>     at
> org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
>     at java.io.InputStream.read(InputStream.java:101)
>     at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:182)
>     at
> org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:218)
>     at org.apache.hadoop.util.LineReader.readLine(LineReader.java:176)
>     at
> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:152)
>     at
> org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:192)
>     at
> org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>     at
> org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
>     at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>     at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:186)
>     at
> org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:109)
>     at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
> Source)
>     at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:148)
>     at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>     at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>     at org.apache.spark.scheduler.Task.run(Task.scala:109)
>     at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
>
>
> Is there a way to skip the gz files that cannot be decompressed? Exception
> handling seems not help. The only workaround I can think of is to
> decompress the gz files into another folder first and make the spark
> streaming job listen to this new folder. But this workaround may not be
> better compared with the solution using a unstructured streaming job to
> directly decompress the gz file, read jsonl file, validate the records and
> write the validated records into parquet.
>
> Any idea is highly appreciated!
>
>
>
>
>

Mime
View raw message