What we’ve observed
Increasing the number of partitions (and thus decreasing the partition size) seems to reliably help avoid OOM errors. To demonstrate this, we used a single executor, loaded a small table into a DataFrame, persisted it with MEMORY_AND_DISK, repartitioned it, and joined it to itself. Varying the number of partitions reveals a threshold: above it the join completes, below it the job fails with an OOM error.

    from pyspark import StorageLevel

    # converter (parses one line of lineitem.tbl), schema and partition_count
    # are defined elsewhere; sc and sqlContext are the usual shell objects.
    lineitem = sc.textFile('lineitem.tbl').map(converter)
    lineitem = sqlContext.createDataFrame(lineitem, schema)
    lineitem.persist(StorageLevel.MEMORY_AND_DISK)
    repartitioned = lineitem.repartition(partition_count)
    # No join condition, so every row is paired with every row.
    joined = repartitioned.join(repartitioned)
    joined.show()

Questions
Generally, what influences the space complexity of Spark operations? Is it the case that a single partition of each operand’s data set plus a single partition of the resulting data set must all fit in memory at the same time? We can see where the transformations (for joins, say) are implemented in the source code (for the example above, BroadcastNestedLoopJoin), but they seem to be based on virtualized iterators. Where in the code is the partition data for the inputs and outputs actually materialized?
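The iterator question can be made concrete with a toy model. This is plain Python, not Spark code, and every name in it (CountingSource, filter_op, nested_loop_join) is hypothetical. It assumes pull-based operators chained as generators: a streaming operator such as a filter holds one row at a time, while a nested-loop join must be able to rescan one side for every row of the other, so this sketch buffers that side in a list. Pulling only the first few output rows, loosely like `show()`, shows which inputs actually get read.

```python
from itertools import islice
from typing import Callable, Iterable, Iterator, List, Tuple

Row = Tuple[int, str]


class CountingSource:
    """Hypothetical scan: yields rows lazily and counts how many it produced."""

    def __init__(self, n: int):
        self.n = n
        self.produced = 0

    def __iter__(self) -> Iterator[Row]:
        for i in range(self.n):
            self.produced += 1
            yield (i, f"row-{i}")


def filter_op(rows: Iterable[Row], pred: Callable[[Row], bool]) -> Iterator[Row]:
    # Streaming operator: holds at most one input row at a time.
    for r in rows:
        if pred(r):
            yield r


def nested_loop_join(streamed: Iterable[Row],
                     build: Iterable[Row]) -> Iterator[Tuple[Row, Row]]:
    # The build side is rescanned for every streamed row, so this sketch
    # buffers it fully: memory grows with the size of the build side.
    buffered: List[Row] = list(build)
    for left in streamed:
        for right in buffered:
            # Output pairs are yielded one at a time, never collected.
            yield (left, right)


left = CountingSource(1000)
right = CountingSource(1000)
evens = filter_op(left, lambda r: r[0] % 2 == 0)
first_five = list(islice(nested_loop_join(evens, right), 5))
print(left.produced, right.produced, len(first_five))  # prints "1 1000 5"
```

In this model the join output is never materialized and only one streamed row is in flight, but the whole build side sits in memory. If the real operator behaves similarly, the size of the buffered side rather than the output would dominate its footprint.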
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.