hey AKM!

this is a very common problem.  the streaming programming guide addresses this issue here, actually:  http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

the tl;dr is this:
1) you want to use foreachPartition() inside foreachRDD() so you operate on a whole partition of records at a time, instead of a single record at a time with foreach()
2) you want to get/release a connection from the ConnectionPool within each worker
3) make sure the ConnectionPool is initialized on the worker - or do it lazily upon getting the first connection (there's a rough sketch of such a pool after the sample code below).

here's the sample code referenced in the link above with some additional comments for clarity:

dstream.foreachRDD { rdd =>
  // everything within here runs on the Driver

  rdd.foreachPartition { partitionOfRecords =>
    // everything within here runs on the Worker and operates on a partition of records

    // ConnectionPool is a static, lazily initialized singleton pool of connections that runs within the Worker JVM

    // retrieve a connection from the pool
    val connection = ConnectionPool.getConnection()

    // perform the application logic here - parse and write to mongodb using the connection
    partitionOfRecords.foreach(record => connection.send(record))

    // return to the pool for future reuse
    ConnectionPool.returnConnection(connection)
  }
}
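
the guide doesn't actually show what ConnectionPool looks like, so here's a rough sketch of the kind of lazily initialized, per-JVM singleton it has in mind - the Connection trait and the createConnection() stub below are just placeholders for whatever client you're actually using (mongodb in your case), not a real API:

import java.util.concurrent.ConcurrentLinkedQueue

// placeholder for whatever client your real connections wrap (e.g. a mongodb client)
trait Connection {
  def send(record: String): Unit
  def close(): Unit
}

// one instance of this object lives inside each Worker JVM; the underlying
// queue is only created on first use (lazy), so nothing is serialized from the Driver
object ConnectionPool {
  private lazy val pool = new ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection =
    // reuse an idle connection if one is available, otherwise open a new one
    Option(pool.poll()).getOrElse(createConnection())

  def returnConnection(connection: Connection): Unit = {
    // hand the connection back for reuse instead of closing it
    pool.offer(connection)
  }

  private def createConnection(): Connection = new Connection {
    // stub - swap in your real client here
    def send(record: String): Unit = println(s"sending: $record")
    def close(): Unit = ()
  }
}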

hope that helps!

-chris

On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman <ashrafuzzaman.g2@gmail.com> wrote:
Sorry guys, my bad.
Here is a high-level code sample:

val unionStreams = ssc.union(kinesisStreams)
unionStreams.foreachRDD(rdd => {
  rdd.foreach { tweet =>
    val strTweet = new String(tweet, "UTF-8")
    val interaction = InteractionParser.parser(strTweet)
    interactionDAL.insert(interaction)
  }
})

Here I have to close the connection for interactionDAL, otherwise the JVM gives me an error that the connection is open. I tried with a sticky connection as well, with keep_alive set to true. So my guess was that at the point of “unionStreams.foreachRDD” or at “rdd.foreach” the code is serialized and sent to the workers, and the workers deserialize and execute it, which is why the connection is always opened for each RDD. I might be completely wrong. I would love to know what is going on underneath.
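
for reference, here's roughly what that snippet looks like reworked along the lines chris describes above - the DAL is created (and closed) on the worker once per partition instead of being captured from the driver. the InteractionDAL constructor and close() call are assumptions here, not taken from the actual code:

val unionStreams = ssc.union(kinesisStreams)

unionStreams.foreachRDD { rdd =>
  rdd.foreachPartition { tweets =>
    // built on the worker, once per partition, so nothing stateful
    // gets serialized from the driver
    val interactionDAL = new InteractionDAL()

    tweets.foreach { tweet =>
      val strTweet = new String(tweet, "UTF-8")
      val interaction = InteractionParser.parser(strTweet)
      interactionDAL.insert(interaction)
    }

    // release the connection once the partition is done
    interactionDAL.close()
  }
}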