spark-user mailing list archives

From Alec Swan <alecs...@gmail.com>
Subject Re: Process large JSON file without causing OOM
Date Tue, 14 Nov 2017 04:07:40 GMT
Hi Joel,

Here are the relevant snippets of my code and the OOM error thrown
by frameWriter.save(..). Surprisingly, the heap dump is pretty small (~60 MB)
even though I am running with -Xmx10G and 4G of executor and driver memory, as
shown below.

        SparkConf sparkConf = new SparkConf()
                .setAppName("My Service")
                .setMaster("local[*]")
                .set("spark.ui.enabled", "true")
                .set("spark.executor.memory", "4G")
                .set("spark.driver.memory", "4G");

        sparkSessionBuilder = SparkSession.builder().config(sparkConf).enableHiveSupport();

        Dataset<Row> events = sparkSession.read()
                .format("json")
                .schema(inputConfig.getSchema())
                .load(inputFile.getPath());

        DataFrameWriter<Row> frameWriter = events
                .selectExpr(JavaConversions.asScalaBuffer(outputSchema.getColumns())) // select "data.customer AS `customer`", ...
                .write()
                .options(outputConfig.getProperties()) // compression=zlib
                .format("orc")
                .partitionBy(JavaConversions.asScalaBuffer(outputSchema.getPartitions())); // partition by "customer"

        frameWriter.save(outputUri.getPath());
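One thing worth noting about the settings above: with master=local[*] the driver
and executors all run in the one JVM my service started, so the -Xmx of that
process is what actually bounds the heap. As far as I know, spark.driver.memory
set on a SparkConf after the JVM is already running has no effect in this mode.
A quick stdlib check (my own snippet, not a Spark API) shows what the process
really has:

```java
public class HeapCheck {
    public static void main(String[] args) {
        // Max heap the running JVM will actually use, as bounded by -Xmx.
        // SparkConf memory settings applied later cannot grow this value.
        long maxBytes = Runtime.getRuntime().maxMemory();
        System.out.println("JVM max heap: " + (maxBytes >> 20) + " MB");
    }
}
```

If this prints something far below 10240 MB, the -Xmx10G flag is not reaching
the JVM that Spark is actually running in.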


Here is the error log I get at runtime:

17/11/14 03:36:16 INFO CodeGenerator: Code generated in 115.616924 ms
17/11/14 03:36:17 INFO CodecPool: Got brand-new decompressor [.snappy]
java.lang.OutOfMemoryError: Java heap space
Dumping heap to java_pid3790.hprof ...
Heap dump file created [62653841 bytes in 2.212 secs]
#
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
#   Executing "kill -9 3790"...


And here is the offending thread from the thread dump that caused the OOM:

"Executor task launch worker for task 0" daemon prio=5 tid=90 RUNNABLE
at java.lang.OutOfMemoryError.<init>(OutOfMemoryError.java:48)
at org.apache.hadoop.io.compress.BlockDecompressorStream.getCompressedData(BlockDecompressorStream.java:123)
at org.apache.hadoop.io.compress.BlockDecompressorStream.decompress(BlockDecompressorStream.java:98)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
   Local Variable: byte[]#3957
   Local Variable: org.apache.hadoop.io.compress.BlockDecompressorStream#1
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
   Local Variable: org.apache.hadoop.mapreduce.lib.input.SplitLineReader#1
   Local Variable: org.apache.hadoop.io.Text#5
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
   Local Variable: org.apache.hadoop.mapreduce.lib.input.LineRecordReader#1
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
   Local Variable: org.apache.spark.sql.execution.datasources.RecordReaderIterator#1
at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:50)
   Local Variable: org.apache.spark.sql.execution.datasources.HadoopFileLinesReader#1
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
   Local Variable: scala.collection.Iterator$$anon$12#1
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
   Local Variable: org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1#1
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:190)
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:108)
   Local Variable: org.apache.spark.sql.execution.UnsafeExternalRowSorter#1
   Local Variable: org.apache.spark.executor.TaskMetrics#2
at org.apache.spark.sql.execution.SortExec$$anonfun$1.apply(SortExec.scala:101)
   Local Variable: org.apache.spark.sql.execution.SortExec$$anonfun$1#2
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
   Local Variable: scala.collection.Iterator$$anon$11#2
   Local Variable: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25#2
   Local Variable: java.lang.Integer#1
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
   Local Variable: org.apache.spark.sql.execution.datasources.FilePartition#2
   Local Variable: org.apache.spark.storage.StorageLevel#1
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
   Local Variable: org.apache.spark.rdd.MapPartitionsRDD#4
   Local Variable: org.apache.spark.serializer.JavaSerializerInstance#4
   Local Variable: scala.Tuple2#1572
   Local Variable: org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$apply$mcV$sp$1#2
   Local Variable: scala.Tuple2#1571
   Local Variable: org.apache.spark.TaskContextImpl#1
at org.apache.spark.scheduler.Task.run(Task.scala:108)
   Local Variable: org.apache.spark.scheduler.ResultTask#2
   Local Variable: org.apache.spark.metrics.MetricsSystem#1
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
   Local Variable: org.apache.spark.serializer.JavaSerializerInstance#5
   Local Variable: org.apache.spark.memory.TaskMemoryManager#1
   Local Variable: sun.management.ThreadImpl#1
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   Local Variable: java.util.concurrent.ThreadPoolExecutor#6
   Local Variable: org.apache.spark.executor.Executor$TaskRunner#1
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   Local Variable: java.util.concurrent.ThreadPoolExecutor$Worker#26
at java.lang.Thread.run(Thread.java:745)



Thanks,

Alec

On Mon, Nov 13, 2017 at 8:30 PM, Joel D <games2013.sam@gmail.com> wrote:

> Have you tried increasing driver and executor memory (and GC overhead too, if required)?
>
> Your code snippet and stack trace would be helpful.
>
> On Mon, Nov 13, 2017 at 7:23 PM Alec Swan <alecswan@gmail.com> wrote:
>
>> Hello,
>>
>> I am using the Spark library to convert JSON/Snappy files to ORC/ZLIB
>> format. Effectively, my Java service starts up an embedded Spark cluster
>> (master=local[*]) and uses Spark SQL to convert JSON to ORC. However, I
>> keep getting OOM errors with large (~1GB) files.
>>
>> I've tried different ways to reduce memory usage, e.g. by partitioning
>> data with dataSet.partitionBy("customer").save(filePath), or capping
>> memory usage by setting spark.executor.memory=1G, but to no avail.
>>
>> I am wondering if there is a way to avoid OOM besides splitting the
>> source JSON file into multiple smaller ones and processing the small ones
>> individually? Does Spark SQL have to read the JSON/Snappy (row-based) file
>> in its entirety before converting it to ORC (columnar)? If so, would it
>> make sense to create a custom receiver that reads the Snappy file and use
>> Spark streaming for ORC conversion?
>>
>> Thanks,
>>
>> Alec
>>
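For what it's worth, the splitting workaround I asked about above can be done
with plain JDK I/O before handing the chunks to Spark. This is only a sketch,
assuming the input is newline-delimited JSON once decompressed; the JsonSplitter
class and its names are my own illustration, not part of any Spark API:

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a newline-delimited JSON file into
// smaller chunk files of at most maxLines lines each, so each chunk
// can be converted to ORC independently.
public class JsonSplitter {
    public static List<Path> split(Path source, Path outDir, int maxLines) throws IOException {
        Files.createDirectories(outDir);
        List<Path> chunks = new ArrayList<>();
        try (BufferedReader in = Files.newBufferedReader(source)) {
            BufferedWriter out = null;
            int count = 0;
            String line;
            while ((line = in.readLine()) != null) {
                // Start a new chunk file when none is open or the cap is hit.
                if (out == null || count == maxLines) {
                    if (out != null) out.close();
                    Path chunk = outDir.resolve("chunk-" + chunks.size() + ".json");
                    chunks.add(chunk);
                    out = Files.newBufferedWriter(chunk);
                    count = 0;
                }
                out.write(line);
                out.newLine();
                count++;
            }
            if (out != null) out.close();
        }
        return chunks;
    }
}
```

Each chunk could then be loaded and converted in its own pass, which would keep
any single task's input bounded regardless of the original file size.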
