spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jerry Lam <chiling...@gmail.com>
Subject Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files
Date Mon, 26 Oct 2015 02:58:16 GMT
Hi guys,

I mentioned that the partitions are generated so I tried to read the
partition data from it. The driver is OOM after few minutes. The stack
trace is below. It looks very similar to the the jstack above (note on the
refresh method). Thanks!

Name: java.lang.OutOfMemoryError
Message: GC overhead limit exceeded
StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
java.lang.StringBuilder.append(StringBuilder.java:132)
org.apache.hadoop.fs.Path.toString(Path.java:384)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:31)
org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)


On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <chilinglam@gmail.com> wrote:

> Hi Josh,
>
> No I don't have speculation enabled. The driver took about few hours until
> it was OOM. Interestingly, all partitions are generated successfully
> (_SUCCESS file is written in the output directory). Is there a reason why
> the driver needs so much memory? The jstack revealed that it called refresh
> some file statuses. Is there a way to avoid OutputCommitCoordinator to
> use so much memory?
>
> Ultimately, I choose to use partitions because most of the queries I have
> will execute based the partition field. For example, "SELECT events from
> customer where customer_id = 1234". If the partition is based on
> customer_id, all events for a customer can be easily retrieved without
> filtering the entire dataset which is much more efficient (I hope).
> However, I notice that the implementation of the partition logic does not
> seem to allow this type of use cases without using a lot of memory which is
> a bit odd in my opinion. Any help will be greatly appreciated.
>
> Best Regards,
>
> Jerry
>
>
>
> On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <rosenville@gmail.com> wrote:
>
>> Hi Jerry,
>>
>> Do you have speculation enabled? A write which produces one million files
>> / output partitions might be using tons of driver memory via the
>> OutputCommitCoordinator's bookkeeping data structures.
>>
>> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <chilinglam@gmail.com> wrote:
>>
>>> Hi spark guys,
>>>
>>> I think I hit the same issue SPARK-8890
>>> https://issues.apache.org/jira/browse/SPARK-8890. It is marked as
>>> resolved. However it is not. I have over a million output directories for 1
>>> single column in partitionBy. Not sure if this is a regression issue? Do I
>>> need to set some parameters to make it more memory efficient?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>>
>>>
>>> On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <chilinglam@gmail.com> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> After waiting for a day, it actually causes OOM on the spark driver. I
>>>> configure the driver to have 6GB. Note that I didn't call refresh myself.
>>>> The method was called when saving the dataframe in parquet format. Also I'm
>>>> using partitionBy() on the DataFrameWriter to generate over 1 million
>>>> files. Not sure why it OOM the driver after the job is marked _SUCCESS in
>>>> the output folder.
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>>
>>>> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chilinglam@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Spark users and developers,
>>>>>
>>>>> Does anyone encounter any issue when a spark SQL job produces a lot of
>>>>> files (over 1 millions), the job hangs on the refresh method? I'm using
>>>>> spark 1.5.1. Below is the stack trace. I saw the parquet files are produced
>>>>> but the driver is doing something very intensively (it uses all the cpus).
>>>>> Does it mean Spark SQL cannot be used to produce over 1 million files
in a
>>>>> single job?
>>>>>
>>>>> Thread 528: (state = BLOCKED)
>>>>>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled
>>>>> frame)
>>>>>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43,
>>>>> line=130 (Compiled frame)
>>>>>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int)
>>>>> @bci=12, line=114 (Compiled frame)
>>>>>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19,
>>>>> line=415 (Compiled frame)
>>>>>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132
>>>>> (Compiled frame)
>>>>>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled
>>>>> frame)
>>>>>  -
>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus)
>>>>> @bci=4, line=447 (Compiled frame)
>>>>>  -
>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object)
>>>>> @bci=5, line=447 (Compiled frame)
>>>>>  -
>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>>> @bci=9, line=244 (Compiled frame)
>>>>>  -
>>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>>> @bci=2, line=244 (Compiled frame)
>>>>>  -
>>>>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>>>>> @bci=2, line=108 (Compiled frame)
>>>>>  -
>>>>> scala.collection.TraversableLike$class.map(scala.collection.TraversableLike,
>>>>> scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244
>>>>> (Compiled frame)
>>>>>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1,
>>>>> scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted
frame)
>>>>>  -
>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[])
>>>>> @bci=279, line=447 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh()
>>>>> @bci=8, line=453 (Interpreted frame)
>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute()
>>>>> @bci=26, line=465 (Interpreted frame)
>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache()
>>>>> @bci=12, line=463 (Interpreted frame)
>>>>>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1,
>>>>> line=540 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh()
>>>>> @bci=1, line=204 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp()
>>>>> @bci=392, line=152 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>>>>> @bci=1, line=108 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>>>>> @bci=1, line=108 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext,
>>>>> org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96,
>>>>> line=56 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext)
>>>>> @bci=718, line=108 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute()
>>>>> @bci=20, line=57 (Interpreted frame)
>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult()
>>>>> @bci=15, line=57 (Interpreted frame)
>>>>>  - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12,
>>>>> line=69 (Interpreted frame)
>>>>>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply()
>>>>> @bci=11, line=140 (Interpreted frame)
>>>>>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply()
>>>>> @bci=1, line=138 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext,
>>>>> java.lang.String, boolean, boolean, scala.Function0) @bci=131, line=147
>>>>> (Interpreted frame)
>>>>>  - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189,
>>>>> line=138 (Interpreted frame)
>>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute()
>>>>> @bci=21, line=933 (Interpreted frame)
>>>>>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13,
>>>>> line=933 (Interpreted frame)
>>>>>  -
>>>>> org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext,
>>>>> java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode,
>>>>> scala.collection.immutable.Map, org.apache.spark.sql.DataFrame) @bci=293,
>>>>> line=197 (Interpreted frame)
>>>>>  - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146
>>>>> (Interpreted frame)
>>>>>  - org.apache.spark.sql.DataFrameWriter.save(java.lang.String)
>>>>> @bci=24, line=137 (Interpreted frame)
>>>>>  - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String)
>>>>> @bci=8, line=304 (Interpreted frame)
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> Jerry
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message