Blocking operators like Sort, Join or Aggregate will put all of the data for a whole partition into a hash table or array.  However, if you are running Spark 1.5+ we should be spilling to disk.  In Spark 1.6 if you are seeing OOMs for SQL operations you should report it as a bug.

On Thu, Mar 31, 2016 at 9:26 AM, Steve Johnston <sjohnston@algebraixdata.com> wrote:
What we’ve observed

Increasing the number of partitions (and thus decreasing the partition size) seems to reliably help avoid OOM errors. To demonstrate this we used a single executor and loaded a small table into a DataFrame, persisted it with MEMORY_AND_DISK, repartitioned it and joined it to itself. Varying the number of partitions identifies a threshold between completing the join and incurring an OOM error.


lineitem = sc.textFile('lineitem.tbl').map(converter)
lineitem = sqlContext.createDataFrame(lineitem, schema)
lineitem.persist(StorageLevel.MEMORY_AND_DISK)
repartitioned = lineitem.repartition(partition_count)
joined = repartitioned.join(repartitioned)
joined.show()
 
Questions

Generally, what influences the space complexity of Spark operations? Is it the case that a single partition of each operand’s data set + a single partition of the resulting data set all need to fit in memory at the same time? We can see where the transformations (for say joins) are implemented in the source code (for the example above BroadcastNestedLoopJoin), but they seem to be based on virtualized iterators; where in the code is the partition data for the inputs and outputs actually materialized?


View this message in context: What influences the space complexity of Spark operations?
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.