spark-issues mailing list archives

From "Joseph K. Bradley (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-15419) monotonicallyIncreasingId should use less memory with multiple partitions
Date Thu, 19 May 2016 21:51:12 GMT

     [ https://issues.apache.org/jira/browse/SPARK-15419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joseph K. Bradley updated SPARK-15419:
--------------------------------------
    Description: 
When monotonicallyIncreasingId is used on a DataFrame with many partitions, it uses a very
large amount of memory.

Consider this code:
{code}
import org.apache.spark.sql.functions._

// JMAP1: run jmap -histo:live [PID]

val numPartitions = 1000
val df = spark.range(0, 1000000, 1, numPartitions).toDF("vtx")
df.cache().count()

// JMAP2

val df2 = df.withColumn("id", monotonicallyIncreasingId())
df2.cache().count()

// JMAP3

df2.select(col("id") + 1).count()

// JMAP4
{code}

Here's how memory usage progresses:
* JMAP1: This is just for calibration.
* JMAP2: No significant change from JMAP1.
* JMAP3: Massive jump: 3048895 Longs, 1039638 Objects, 2007427 Integers, and 1002000 org.apache.spark.sql.catalyst.expressions.GenericInternalRow (note that 1002000 = 1000000 rows + 2 x 1000 partitions, i.e., roughly one object per cached row).
** None of these classes had significant instance counts in JMAP1/2.
* JMAP4: The object counts roughly double; i.e., even after caching, every use of df2 generates new objects. (A helper for diffing these histograms is sketched below.)
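
For convenience, a minimal sketch of such a histogram-diffing helper, runnable from the Scala shell. It assumes jmap is on the PATH and that jmap's histogram columns are rank, instance count, bytes, and class name; the helper name histoCounts is hypothetical. Note that -histo:live forces a full GC before counting.
{code}
// Sketch: query `jmap -histo:live` for this JVM and sum the instance counts
// of a few classes of interest. Hypothetical helper; assumes jmap is on the PATH.
import java.lang.management.ManagementFactory
import scala.sys.process._

def histoCounts(classNames: Seq[String]): Map[String, Long] = {
  // RuntimeMXBean names look like "12345@hostname"; take the PID part.
  val pid = ManagementFactory.getRuntimeMXBean.getName.split("@")(0)
  val histo = Seq("jmap", "-histo:live", pid).!!  // ":live" forces a full GC first
  classNames.map { cn =>
    // Histogram rows look like: "  42:   1002000   24048000  some.ClassName"
    val count = histo.split("\n")
      .filter(_.trim.endsWith(" " + cn))
      .map(_.trim.split("\\s+")(1).toLong)
      .sum
    cn -> count
  }.toMap
}

// e.g. histoCounts(Seq("org.apache.spark.sql.catalyst.expressions.GenericInternalRow"))
{code}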

When the indexed DataFrame is used repeatedly afterwards, the driver memory usage keeps increasing
and eventually blows up in my application.

I wrote "with multiple partitions" because this issue goes away when numPartitions is small
(1 or 2).

Presumably this memory usage could be reduced.
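
For comparison (not a fix), here is a minimal, untested sketch that assigns the same documented ID layout as monotonicallyIncreasingId (partition ID in the upper 31 bits, per-partition record number in the lower 33 bits) by hand via mapPartitionsWithIndex. The helper name withMonotonicId is hypothetical, and whether this allocates less than the built-in expression has not been measured.
{code}
// Untested sketch: reproduce monotonicallyIncreasingId's documented layout,
// id = (partitionId << 33) + recordNumberWithinPartition, by hand.
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

def withMonotonicId(df: DataFrame, idCol: String = "id"): DataFrame = {
  val rdd = df.rdd.mapPartitionsWithIndex { (pid, iter) =>
    var recordNumber = -1L
    iter.map { row =>
      recordNumber += 1
      Row.fromSeq(row.toSeq :+ ((pid.toLong << 33) + recordNumber))
    }
  }
  val schema = StructType(df.schema.fields :+ StructField(idCol, LongType, nullable = false))
  df.sqlContext.createDataFrame(rdd, schema)
}
{code}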

Note: I also tested a custom indexing approach using RDD.zipWithIndex, and it is even worse in terms of object creation (about 2x worse):
{code}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

def zipWithUniqueIdFrom0(df: DataFrame): DataFrame = {
  val sqlContext = df.sqlContext
  val schema = df.schema
  val outputSchema = StructType(Seq(
    StructField("row", schema, false), StructField("id", DataTypes.IntegerType, false)))
  // zipWithIndex assigns consecutive 0-based ids (it runs an extra job to
  // count per-partition sizes first)
  val rdd = df.rdd.zipWithIndex().map { case (row: Row, id: Long) => Row(row, id.toInt) }
  sqlContext.createDataFrame(rdd, outputSchema)
}

// val df2 = df.withColumn("id", monotonicallyIncreasingId())
val df2 = zipWithUniqueIdFrom0(df)
df2.cache().count()
{code}
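
A variant of the helper above, also untested: RDD.zipWithUniqueId skips the extra Spark job that zipWithIndex runs to compute per-partition offsets, at the cost of IDs that are unique but not consecutive (element i of partition k gets i * numPartitions + k). Whether it improves the object-creation profile has not been measured; zipWithUniqueIdCol is a hypothetical name.
{code}
// Untested sketch: like zipWithUniqueIdFrom0, but using zipWithUniqueId, which
// avoids zipWithIndex's extra counting job; IDs are unique, not consecutive.
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

def zipWithUniqueIdCol(df: DataFrame): DataFrame = {
  val outputSchema = StructType(Seq(
    StructField("row", df.schema, false),
    StructField("id", DataTypes.LongType, false)))
  val rdd = df.rdd.zipWithUniqueId().map { case (row, id) => Row(row, id) }
  df.sqlContext.createDataFrame(rdd, outputSchema)
}
{code}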


> monotonicallyIncreasingId should use less memory with multiple partitions
> -------------------------------------------------------------------------
>
>                 Key: SPARK-15419
>                 URL: https://issues.apache.org/jira/browse/SPARK-15419
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.0.0
>         Environment: branch-2.0, 1 worker
>            Reporter: Joseph K. Bradley
>


