One map() operation in my Spark app takes an RDD[A] as input and maps each element of the RDD[A] to an object of type B using a custom mapping function func(x: A): B.
I received lots of OutOfMemory errors, and after some debugging I found that this is because func() requires a significant amount of memory for each input x. Since each node executes multiple mapping tasks (i.e., multiple func() calls) concurrently, the total memory required by those tasks on a node exceeds the node's physical memory.
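To make the memory arithmetic concrete, here is a rough estimate in plain Scala (no Spark dependency). It assumes every concurrently running func() call holds about the same peak memory and ignores Spark's own overhead; the core counts and the 3 GB per call are hypothetical numbers, not measurements from this app. Spark runs at most spark.executor.cores / spark.task.cpus tasks at once on one executor, so peak usage scales with that slot count.

```scala
// Rough per-executor memory estimate for a map() whose function is memory-hungry.
// Assumption: each running func() call needs about perTaskMb at its peak,
// and Spark's own overhead (caching, shuffle buffers) is ignored.
object ConcurrencyEstimate {
  // Number of task slots per executor: executorCores / taskCpus (integer division).
  def maxConcurrentTasks(executorCores: Int, taskCpus: Int): Int = {
    require(executorCores > 0 && taskCpus > 0, "core counts must be positive")
    executorCores / taskCpus
  }

  // Peak memory needed if every task slot is busy running func() at the same time.
  def peakMemoryMb(executorCores: Int, taskCpus: Int, perTaskMb: Long): Long =
    maxConcurrentTasks(executorCores, taskCpus).toLong * perTaskMb

  def main(args: Array[String]): Unit = {
    // Hypothetical numbers: 8 cores per executor, about 3 GB per func() call.
    println(peakMemoryMb(executorCores = 8, taskCpus = 1, perTaskMb = 3072)) // 24576
    println(peakMemoryMb(executorCores = 8, taskCpus = 4, perTaskMb = 3072)) // 6144
  }
}
```

With one core per task, all 8 slots can run func() at once and need about 24 GB; reserving 4 cores per task leaves 2 slots and about 6 GB.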
What I have tried so far:
To solve the problem, I limited the number of concurrent mapping tasks to 2 per executor (node) by calling coalesce() on the RDD[A] before the map and repartition() after it:
val rdd: RDD[A] = sc.textFile(...).flatMap(...)
rdd.coalesce(numNodes * 2).map(func).repartition(300) // numNodes = number of worker nodes
It was also suggested that I set spark.task.cpus to a value larger than 1, but that setting applies globally, and my pipeline involves many other operations that I do not want to limit. Is there a better way to achieve this?
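For reference, this is roughly how the spark.task.cpus suggestion would be applied. It is a sketch only: it assumes spark-core is on the classpath and the master URL comes from spark-submit, and the app name and 8-cores-per-executor value are hypothetical. Because the setting lives in the application's SparkConf, every task in every stage reserves that many cores, which is exactly the global effect described above.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TaskCpusExample {
  def main(args: Array[String]): Unit = {
    // spark.task.cpus is an application-wide setting: every task of every stage
    // reserves this many cores, not only the tasks of map(func).
    val conf = new SparkConf()
      .setAppName("task-cpus-example")  // hypothetical app name
      .set("spark.executor.cores", "8") // assumption: 8 cores per executor
      .set("spark.task.cpus", "4")      // 8 / 4 = at most 2 concurrent tasks per executor
    val sc = new SparkContext(conf)
    try {
      // Build the pipeline here; all of its stages share the 2-tasks-per-executor limit.
    } finally {
      sc.stop()
    }
  }
}
```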