spark-user mailing list archives

From Fengdong Yu <fengdo...@everstring.com>
Subject Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files
Date Mon, 26 Oct 2015 06:50:22 GMT
How many partitions did you generate?
If millions were generated, the driver will consume a huge amount of memory.
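
For example, you can check before writing (a sketch; df and customer_id stand in for your DataFrame and partition column):

    // one output directory is created per distinct value of the partition column
    val numDirs = df.select("customer_id").distinct().count()
    println("partitionBy(customer_id) will create about " + numDirs + " directories")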





> On Oct 26, 2015, at 10:58 AM, Jerry Lam <chilinglam@gmail.com> wrote:
> 
> Hi guys,
> 
> I mentioned that the partitions are generated, so I tried to read the partition data from
> it. The driver went OOM after a few minutes. The stack trace is below. It looks very
> similar to the jstack above (note the refresh method). Thanks!
> 
> Name: java.lang.OutOfMemoryError
> Message: GC overhead limit exceeded
> StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
> java.lang.StringBuilder.append(StringBuilder.java:132)
> org.apache.hadoop.fs.Path.toString(Path.java:384)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
> org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
> scala.Option.getOrElse(Option.scala:120)
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
> org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:31)
> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)
> 
> On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <chilinglam@gmail.com> wrote:
> Hi Josh,
> 
> No, I don't have speculation enabled. The driver ran for a few hours until it went OOM.
> Interestingly, all partitions were generated successfully (the _SUCCESS file is written
> in the output directory). Is there a reason why the driver needs so much memory? The
> jstack revealed that it called refresh, which lists file statuses. Is there a way to
> avoid the OutputCommitCoordinator using so much memory?
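> 
> For scale, here is a rough sketch (the path is a placeholder) of counting the leaf files
> that refresh() has to list and cache on the driver:
> 
>     import org.apache.hadoop.conf.Configuration
>     import org.apache.hadoop.fs.{FileSystem, Path}
> 
>     // every leaf file under the output path becomes a FileStatus cached on the driver
>     val fs = FileSystem.get(new Configuration())
>     val files = fs.listFiles(new Path("/path/to/output"), true) // recursive listing
>     var n = 0L
>     while (files.hasNext) { files.next(); n += 1 }
>     println("leaf files: " + n)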
> 
> Ultimately, I chose to use partitions because most of the queries I have will execute
> based on the partition field. For example, "SELECT events FROM customer WHERE customer_id
> = 1234". If the data is partitioned by customer_id, all events for a customer can be
> retrieved easily without filtering the entire dataset, which is much more efficient (I
> hope). However, I notice that the implementation of the partition logic does not seem to
> allow this type of use case without using a lot of memory, which is a bit odd in my
> opinion. Any help will be greatly appreciated.
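> 
> To illustrate the read side (a sketch; names and paths are illustrative):
> 
>     // with the data partitioned by customer_id, this query should only touch the
>     // customer_id=1234 directory instead of scanning the entire dataset
>     val events = sqlContext.read.parquet("/path/to/customer")
>       .filter("customer_id = 1234")
>       .select("events")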
> 
> Best Regards,
> 
> Jerry
> 
> 
> 
> On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <rosenville@gmail.com> wrote:
> Hi Jerry,
> 
> Do you have speculation enabled? A write which produces one million files / output
> partitions might be using tons of driver memory via the OutputCommitCoordinator's
> bookkeeping data structures.
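> 
> You can check it with something like this (a sketch; sc is your SparkContext):
> 
>     // speculation is off by default; when on, the coordinator must track every
>     // speculative task attempt per output partition in driver memory
>     val speculation = sc.getConf.getBoolean("spark.speculation", false)
>     println("spark.speculation = " + speculation)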
> 
> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <chilinglam@gmail.com> wrote:
> Hi spark guys,
> 
> I think I hit the same issue as SPARK-8890 (https://issues.apache.org/jira/browse/SPARK-8890).
> It is marked as resolved, but it is not. I have over a million output directories for a
> single column in partitionBy. Not sure if this is a regression? Do I need to set some
> parameters to make it more memory efficient?
> 
> Best Regards,
> 
> Jerry
> 
> 
> 
> 
> On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <chilinglam@gmail.com> wrote:
> Hi guys,
> 
> After waiting for a day, it actually caused an OOM on the Spark driver. I configured the
> driver to have 6GB. Note that I didn't call refresh myself; the method was called when
> saving the dataframe in parquet format. Also, I'm using partitionBy() on the
> DataFrameWriter to generate over 1 million files. Not sure why it OOMs the driver after
> the job is marked _SUCCESS in the output folder.
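> 
> For reference, the shape of the write (a sketch; the column name and path are placeholders):
> 
>     // one output directory per distinct customer_id value; over a million here
>     df.write
>       .partitionBy("customer_id")
>       .parquet("/path/to/output")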
> 
> Best Regards,
> 
> Jerry
> 
> 
> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chilinglam@gmail.com> wrote:
> Hi Spark users and developers,
> 
> Has anyone encountered an issue where a Spark SQL job that produces a lot of files (over
> 1 million) hangs on the refresh method? I'm using Spark 1.5.1. Below is the stack trace.
> I saw that the parquet files were produced, but the driver is doing something very
> intensive (it uses all the CPUs). Does this mean Spark SQL cannot be used to produce over
> 1 million files in a single job?
> 
> Thread 528: (state = BLOCKED)
>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled frame)
>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43, line=130 (Compiled frame)
>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12, line=114 (Compiled
frame)
>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19, line=415 (Compiled
frame)
>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132 (Compiled frame)
>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus)
@bci=4, line=447 (Compiled frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object)
@bci=5, line=447 (Compiled frame)
>  - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=9, line=244
(Compiled frame)
>  - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object) @bci=2, line=244
(Compiled frame)
>  - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
scala.Function1) @bci=22, line=33 (Compiled frame)
>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108
(Compiled frame)
>  - scala.collection.TraversableLike$class.map(scala.collection.TraversableLike, scala.Function1,
scala.collection.generic.CanBuildFrom) @bci=17, line=244 (Compiled frame)
>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1, scala.collection.generic.CanBuildFrom)
@bci=3, line=108 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[])
@bci=279, line=447 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh() @bci=8, line=453
(Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute()
@bci=26, line=465 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache()
@bci=12, line=463 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1, line=540 (Interpreted
frame)
>  - org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh() @bci=1,
line=204 (Interpreted frame)
>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp()
@bci=392, line=152 (Interpreted frame)
>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
@bci=1, line=108 (Interpreted frame)
>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
@bci=1, line=108 (Interpreted frame)
>  - org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext,
org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96, line=56 (Interpreted
frame)
>  - org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext)
@bci=718, line=108 (Interpreted frame)
>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute() @bci=20,
line=57 (Interpreted frame)
>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult() @bci=15, line=57
(Interpreted frame)
>  - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12, line=69 (Interpreted
frame)
>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=11, line=140
(Interpreted frame)
>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply() @bci=1, line=138
(Interpreted frame)
>  - org.apache.spark.rdd.RDDOperationScope$.withScope(org.apache.spark.SparkContext, java.lang.String,
boolean, boolean, scala.Function0) @bci=131, line=147 (Interpreted frame)
>  - org.apache.spark.sql.execution.SparkPlan.execute() @bci=189, line=138 (Interpreted
frame)
>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute() @bci=21, line=933
(Interpreted frame)
>  - org.apache.spark.sql.SQLContext$QueryExecution.toRdd() @bci=13, line=933 (Interpreted
frame)
>  - org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(org.apache.spark.sql.SQLContext,
java.lang.String, java.lang.String[], org.apache.spark.sql.SaveMode, scala.collection.immutable.Map,
org.apache.spark.sql.DataFrame) @bci=293, line=197 (Interpreted frame)
>  - org.apache.spark.sql.DataFrameWriter.save() @bci=64, line=146 (Interpreted frame)
>  - org.apache.spark.sql.DataFrameWriter.save(java.lang.String) @bci=24, line=137 (Interpreted
frame)
>  - org.apache.spark.sql.DataFrameWriter.parquet(java.lang.String) @bci=8, line=304 (Interpreted
frame)
> 
> Best Regards,
> 
> Jerry
> 
> 
> 
> 
> 
> 
> 

