Thanks for the suggestion Ryan, I will convert it to singleton and see if it solves the problem.
If a code/object is created in the driver (in this case a connection object is passed from a pool of objects created in the driver) and is passed to executors or workers, why would a new object be created in each executor?
Also would this new object be created every micro-batch?

I'm sorry I might not understand what is going on properly so wanted to ask.

Thanks
Nipun



On Tue, Jan 31, 2017 at 8:28 PM Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:
The KafkaProducerPool instance is created in the driver. Right? What's I was saying is when a Spark job runs, it will serialize KafkaProducerPool and create a new instance in the executor side.

You can use the singleton pattern to make sure one JVM process has only one KafkaProducerPool instance.

On Tue, Jan 31, 2017 at 3:32 PM, Nipun Arora <nipunarora2512@gmail.com> wrote:
It's a producer pool, the borrow object takes an existing kafka producer object if it is free, or creates one if all are being used.
Shouldn't we re-use kafka producer objects for writing to Kafka.

@ryan- can you suggest a good solution for writing a dstream to kafka which can be used in production?

I am attaching the Kafka producer pool class, where would one issue a call to close():
public class KafkaProducerPool implements Serializable {

private static final long serialVersionUID = -1913028296093224674L;

private transient ConcurrentLinkedQueue<KafkaProducer<String, String>> pool;

private ScheduledExecutorService executorService;

private final Properties properties;

private final int minIdle;

/**
* Creates the pool.
*
* @param minIdle
* minimum number of objects residing in the pool
*/
public KafkaProducerPool(final int minIdle, final Properties properties) {
// initialize pool
this.properties = properties;
this.minIdle = minIdle;
initialize();

}

/**
* Creates the pool.
*
* @param minIdle
* minimum number of objects residing in the pool
* @param maxIdle
* maximum number of objects residing in the pool
* @param validationInterval
* time in seconds for periodical checking of minIdle / maxIdle
* conditions in a separate thread. When the number of objects is
* less than minIdle, missing instances will be created. When the
* number of objects is greater than maxIdle, too many instances
* will be removed.
*/
public KafkaProducerPool(final int minIdle, final int maxIdle,
final long validationInterval, final Properties properties) {
// initialize pool
this.properties = properties;
this.minIdle = minIdle;
initialize();

// check pool conditions in a separate thread
executorService = Executors.newSingleThreadScheduledExecutor();
executorService.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
int size = pool.size();
if (size < minIdle) {
int sizeToBeAdded = minIdle - size;
for (int i = 0; i < sizeToBeAdded; i++) {
pool.add(createProducer());
}
} else if (size > maxIdle) {
int sizeToBeRemoved = size - maxIdle;
for (int i = 0; i < sizeToBeRemoved; i++) {
pool.poll();
}
}
}
}, validationInterval, validationInterval, TimeUnit.SECONDS);
}

/**
* Gets the next free object from the pool. If the pool doesn't contain any
* objects, a new object will be created and given to the caller of this
* method back.
*
* @return T borrowed object
*/
public synchronized KafkaProducer<String, String> borrowProducer() {
if (pool == null)
initialize();
KafkaProducer<String, String> object;
if ((object = pool.poll()) == null) {
object = createProducer();
}

return object;
}

/**
* Returns object back to the pool.
*
* object to be returned
*/
public void returnProducer(KafkaProducer<String, String> producer) {
if (producer == null) {
return;
}
this.pool.offer(producer);
}

/**
* Shutdown this pool.
*/
public void shutdown() {
if (executorService != null) {
KafkaProducer<String, String> producer;
while ((producer = pool.poll()) != null) {
producer.close();
}
executorService.shutdown();
}
}

/**
* Creates a new producer.
*
* @return T new object
*/
private KafkaProducer<String, String> createProducer() {
KafkaProducer<String, String> producer = new KafkaProducer<String,String>(properties);
return producer;
}

private void initialize() {
pool = new ConcurrentLinkedQueue<KafkaProducer<String, String>>();

for (int i = 0; i < minIdle; i++) {
pool.add(createProducer());
}
}

public void closeAll() {
KafkaProducer<String, String> object;
while ((object = pool.poll()) != null) {
//object.flush();
object.close();
}
}
}
Thanks
Nipun

On Tue, Jan 31, 2017 at 6:09 PM Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:
Looks like you create KafkaProducerPool in the driver. So when the task is running in the executor, it will always see an new empty KafkaProducerPool and create KafkaProducers. But nobody closes these KafkaProducers.

On Tue, Jan 31, 2017 at 3:02 PM, Nipun Arora <nipunarora2512@gmail.com> wrote:

Sorry for not writing the patch number, it's spark 1.6.1.
The relevant code is here inline.

Please have a look and let me know if there is a resource leak. 
Please also let me know if you need any more details.

Thanks
Nipun


The JavaRDDKafkaWriter code is here inline:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Iterator;

public class JavaRDDStringKafkaWriter implements Serializable, VoidFunction<JavaRDD<String>> {

private static final long serialVersionUID = -865193912367180261L;
private final KafkaProducerPool pool;
private final String topic;
private final Boolean kafkaAsync;

public JavaRDDStringKafkaWriter(final KafkaProducerPool pool, String topic, Boolean kafkaAsync) {
this.pool = pool;
this.topic = topic;
this.kafkaAsync = kafkaAsync;
}

@Override
public void call(JavaRDD<String> stringJavaRDD) throws Exception {
stringJavaRDD.foreachPartition(new PartitionVoidFunction(
new RDDKafkaWriter(pool,kafkaAsync), topic));
}

private class PartitionVoidFunction implements
VoidFunction<Iterator<String>> {

private static final long serialVersionUID = 8726871215617446598L;
private final RDDKafkaWriter kafkaWriter;
private final String topic;

public PartitionVoidFunction(RDDKafkaWriter kafkaWriter, String topic) {
this.kafkaWriter = kafkaWriter;
this.topic = topic;
}

@Override
public void call(Iterator<String> iterator) throws Exception {
while (iterator.hasNext()) {
kafkaWriter.writeToKafka(topic, iterator.next());
}
}
}
}

The RDDKafkaWriter is here:

import java.io.Serializable;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import scala.Tuple2;

public class RDDKafkaWriter implements Serializable {

private static final long serialVersionUID = 7374381310562055607L;
private final KafkaProducerPool pool;
private final Boolean async;

public RDDKafkaWriter(final KafkaProducerPool pool, Boolean async) {
this.pool = pool;
this.async = async;

}

public void writeToKafka(String topic, Tuple2<String, String> message) {
KafkaProducer<String, String> producer = pool.borrowProducer();
ProducerRecord<String, String> record = new ProducerRecord<String, String>(
topic, message._1(), message._2());
if (async) {
producer.send(record);
} else {
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
}
pool.returnProducer(producer);
}

public void writeToKafka(String topic, String message) {

KafkaProducer<String, String> producer = pool.borrowProducer();
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);

if (async) {
producer.send(record);
} else {
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
}
pool.returnProducer(producer);
}


}




On Tue, Jan 31, 2017 at 5:20 PM Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:
Please also include the patch version, such as 1.6.0, 1.6.1. Could you also post the JAVARDDKafkaWriter codes. It's also possible that it leaks resources.

On Tue, Jan 31, 2017 at 2:12 PM, Nipun Arora <nipunarora2512@gmail.com> wrote:
It is spark 1.6

Thanks
Nipun

On Tue, Jan 31, 2017 at 1:45 PM Shixiong(Ryan) Zhu <shixiong@databricks.com> wrote:
Could you provide your Spark version please?

On Tue, Jan 31, 2017 at 10:37 AM, Nipun Arora <nipunarora2512@gmail.com> wrote:
Hi,

I get a resource leak, where the number of file descriptors in spark streaming keeps increasing. We end up with a "too many file open" error eventually through an exception caused in:

JAVARDDKafkaWriter, which is writing a spark JavaDStream<String>

The exception is attached inline. Any help will be greatly appreciated.

Thanks
Nipun

-------------------------------------------
Time: 1485762530000 ms
-------------------------------------------

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 85968.0 failed 1 times, most recent failure: Lost task 0.0 in stage 85968.0 (TID 29562, localhost): java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084 (too many open files)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)
at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:181)
at org.apache.spark.util.collection.WritablePartitionedPairCollection$$anon$1.writeNext(WritablePartitionedPairCollection.scala:56)
at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:659)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:72)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1870)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1883)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1954)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:920)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:918)
at org.apache.spark.api.java.JavaRDDLike$class.foreachPartition(JavaRDDLike.scala:225)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:46)
at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:25)
at org.necla.ngla.kafka.JavaRDDStringKafkaWriter.call(JavaRDDStringKafkaWriter.java:10)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:229)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:229)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:228)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: /tmp/blockmgr-1b3ddc44-f9a4-42cd-977c-532cb962d7d3/3e/shuffle_10625_0_0.data.4651a131-6072-460b-b150-2b3080902084 (too many open files)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:221)
at org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:88)