spark-issues mailing list archives

From "Jaeboo Jung (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-19623) Take rows from DataFrame with empty first partition
Date Fri, 17 Feb 2017 02:16:41 GMT

    [ https://issues.apache.org/jira/browse/SPARK-19623?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15870990#comment-15870990 ]

Jaeboo Jung edited comment on SPARK-19623 at 2/17/17 2:15 AM:
--------------------------------------------------------------

Increasing driver memory does not resolve this issue, because memory consumption grows in proportion
to the number of partitions. For example, with 5g of driver memory, 1000 partitions are processed
correctly, but an OutOfMemoryError (OOME) occurs with 3000 partitions.
{code}
import org.apache.spark.sql._
import org.apache.spark.sql.types._
// 100,000,000 rows of 100 Int columns spread over 3000 partitions
val rdd = sc.parallelize(1 to 100000000, 3000).map(i => Row.fromSeq(Array.fill(100)(i)))
val schema = StructType(for (i <- 1 to 100) yield {
  StructField("COL" + i, IntegerType, true)
})
// Empty out partitions 0 and 1 so that the first partition has no rows
val rdd2 = rdd.mapPartitionsWithIndex((idx, iter) =>
  if (idx == 0 || idx == 1) Iterator[Row]() else iter)
val df2 = sqlContext.createDataFrame(rdd2, schema)
df2.rdd.take(1000) // OK
df2.take(1000)     // OOME
{code}
I think this issue comes from a difference in how RDD and DataFrame take rows.
RDD.take uses its own internal method, whereas DataFrame.take goes through the SQL Limit
operator. The strange part is that the DataFrame scans all partitions when the first
partition is empty. This may be related to SPARK-3211.
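A Spark-free Scala sketch can illustrate the suspected mechanism. It assumes, without checking against the Spark 1.6 source, that RDD.take widens its scan geometrically after an empty round, while the DataFrame Limit path tries all remaining partitions at once, and that every scanned partition sends back up to the requested number of rows before the driver trims the result. `TakeScanSim`, `Strategy`, `growGeometrically`, and `scanAllRemaining` are hypothetical names; partitions are modeled only by their row counts.

```scala
// Hypothetical, Spark-free simulation of two "take n rows" scan strategies.
// Partitions are modeled only by their row counts; no real Spark API is used.
object TakeScanSim {
  // kept: rows returned to the caller; shipped: rows sent to the driver;
  // partsScanned: partitions touched before the loop stopped.
  final case class TakeStats(kept: Int, shipped: Long, partsScanned: Int)

  // How many partitions to try next, given (partitions scanned so far,
  // rows found so far, total partitions). The first round always tries 1.
  type Strategy = (Int, Int, Int) => Int

  // Assumed RDD-like behavior (simplified): quadruple the range while nothing
  // has been found, otherwise double it.
  val growGeometrically: Strategy = (scanned, found, _) =>
    if (found == 0) scanned * 4 else scanned

  // Assumed behavior behind the reported OOME: after an empty first round,
  // try every remaining partition at once.
  val scanAllRemaining: Strategy = (scanned, found, total) =>
    if (found == 0) total - scanned else scanned

  def take(partSizes: IndexedSeq[Int], n: Int, strategy: Strategy): TakeStats = {
    val total = partSizes.length
    var kept = 0
    var shipped = 0L
    var scanned = 0
    while (kept < n && scanned < total) {
      val toTry = if (scanned == 0) 1 else math.max(1, strategy(scanned, kept, total))
      val left = n - kept
      val end = math.min(scanned + toTry, total)
      // Each partition in this round independently returns up to `left` rows,
      // so the driver receives all of them before trimming to n.
      val returned = (scanned until end).map(i => math.min(partSizes(i), left).toLong).sum
      shipped += returned
      kept = math.min(n.toLong, kept + returned).toInt
      scanned = end
    }
    TakeStats(kept, shipped, scanned)
  }

  // Layout from the report: `parts` partitions, the first `emptyPrefix` of them empty.
  def layout(parts: Int, rowsPerPart: Int, emptyPrefix: Int): IndexedSeq[Int] =
    IndexedSeq.tabulate(parts)(i => if (i < emptyPrefix) 0 else rowsPerPart)
}
```

With the 3000-partition layout from the comment (first two partitions empty, about 33,333 rows in each of the others) and n = 1000, the geometric strategy ships 3000 rows to the driver, while scanning all remaining partitions ships 2,998,000 rows; with 1000 partitions the latter drops to 998,000. Under these assumptions the driver-side volume grows linearly with the partition count, which would match the observation that more driver memory only moves the threshold.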



> Take rows from DataFrame with empty first partition
> ---------------------------------------------------
>
>                 Key: SPARK-19623
>                 URL: https://issues.apache.org/jira/browse/SPARK-19623
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.6.2
>            Reporter: Jaeboo Jung
>            Priority: Minor
>
> I use Spark 1.6.2 with 1 master and 6 workers. When the first partition of a DataFrame is
> empty, the DataFrame and its RDD behave differently when taking rows from it. Taking only
> 1000 rows from the DataFrame causes an OOME, but taking them from its RDD works.
> In detail:
> DataFrame without an empty first partition => OK
> DataFrame with an empty first partition => OOME
> RDD of a DataFrame with an empty first partition => OK
> The code below reproduces this error.
> {code}
> import org.apache.spark.sql._
> import org.apache.spark.sql.types._
> val rdd = sc.parallelize(1 to 100000000,1000).map(i => Row.fromSeq(Array.fill(100)(i)))
> val schema = StructType(for(i <- 1 to 100) yield {
> StructField("COL"+i,IntegerType, true)
> })
> val rdd2 = rdd.mapPartitionsWithIndex((idx, iter) => if (idx == 0 || idx == 1) Iterator[Row]() else iter)
> val df1 = sqlContext.createDataFrame(rdd,schema)
> df1.take(1000) // OK
> val df2 = sqlContext.createDataFrame(rdd2,schema)
> df2.rdd.take(1000) // OK
> df2.take(1000) // OOME
> {code}
> I tested it on Spark 1.6.2 with 2 GB of driver memory and 5 GB of executor memory.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


