Hello,

We are currently running our data pipeline on Spark, with Cassandra as the data source.

We are facing an issue at the step where we create an RDD over a Cassandra table and then run "flatMapToPair" to transform the data: the job fails with "Too many open files". I have already raised the open-file limit to 65k on all the worker and master nodes by editing /etc/security/limits.conf, but that did not help.
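
For reference, this is the kind of entry we added (the user name "sparkuser" is a placeholder for whatever account runs the Spark daemons):

        # /etc/security/limits.conf -- raise the per-process open-file limit
        # for the account that runs the Spark master/worker processes
        sparkuser  soft  nofile  65536
        sparkuser  hard  nofile  65536

One thing we are not sure about: limits.conf only applies to new login sessions via PAM, so daemons started at boot may not pick it up. We plan to verify with "ulimit -n" from the same shell that launches the executors.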

Is there a setting we can use to limit the number of files the shuffle opens?
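
From the stack trace this is the hash shuffle writer, which as far as we understand creates one file per mapper x reducer pair. Based on the Spark configuration docs, these look like the relevant properties -- is either of these the right knob?

        # conf/spark-defaults.conf
        # Merge shuffle outputs per core instead of per map task (hash shuffle)
        spark.shuffle.consolidateFiles  true
        # Or switch to the sort-based shuffle, which writes one file per map task
        spark.shuffle.manager           sort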

Also, the log4j.properties file we placed in the conf folder does not seem to be picked up -- our log statements are not being emitted.
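
For context, our file is along the lines of the standard Spark template below -- is anything else needed for Spark to pick it up?

        # conf/log4j.properties -- console appender, roughly the Spark default template
        log4j.rootCategory=INFO, console
        log4j.appender.console=org.apache.log4j.ConsoleAppender
        log4j.appender.console.target=System.err
        log4j.appender.console.layout=org.apache.log4j.PatternLayout
        log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n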

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 20 in stage 1.0 failed 4 times, most recent failure: Lost task 20.3 in stage 1.0 (TID 51, ip-10-87-36-147.us-west-2.aws.neustar.com): java.io.FileNotFoundException: /tmp/spark-local-20150107203209-9333/2f/shuffle_0_20_1017 (Too many open files)
        java.io.FileOutputStream.open(Native Method)
        java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
        org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
        org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
        org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)



Thanks & Regards
Ankur