spark-user mailing list archives

From "A.K.M. Ashrafuzzaman" <ashrafuzzaman...@gmail.com>
Subject Re: Connection pool in workers
Date Mon, 02 Mar 2015 08:13:36 GMT
Thanks Chris,
That is what I wanted to know :)

A.K.M. Ashrafuzzaman
Lead Software Engineer
NewsCred

(M) 880-175-5592433
Twitter | Blog | Facebook

Check out The Academy, your #1 source
for free content marketing resources

On Mar 2, 2015, at 2:04 AM, Chris Fregly <chris@fregly.com> wrote:

> hey AKM!
> 
> this is a very common problem.  the streaming programming guide addresses this issue here, actually:  http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
> 
> the tl;dr is this:
> 1) you want to use foreachPartition() to operate on a whole partition at a time, rather than on a single record at a time with rdd.foreach()
> 2) you want to get/release connections from the ConnectionPool within each worker
> 3) make sure you initialize the ConnectionPool first - or do it lazily upon getting the first connection.
> 
> here's the sample code referenced in the link above, with some additional comments for clarity:
> 
> dstream.foreachRDD { rdd =>
>   // everything within here runs on the Driver
> 
>   rdd.foreachPartition { partitionOfRecords =>
>     // everything within here runs on the Worker and operates on a partition of records
> 
>     // ConnectionPool is a static, lazily initialized singleton pool of connections that runs within the Worker JVM
> 
>     // retrieve a connection from the pool
>     val connection = ConnectionPool.getConnection()
> 
>     // perform the application logic here - parse and write to mongodb using the connection
>     partitionOfRecords.foreach(record => connection.send(record))
> 
>     // return to the pool for future reuse
>     ConnectionPool.returnConnection(connection)
>   }
> }
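> 
> the guide doesn't show the ConnectionPool object itself, so here is a minimal sketch of what such a lazily initialized, queue-backed singleton might look like - the Connection type, its send()/close(), and the pool size are placeholders for whatever client the application actually uses (e.g. a mongo client), not something defined by the guide:
> 
> // placeholder for the application's real client - not part of the Spark docs example
> trait Connection {
>   def send(record: String): Unit
>   def close(): Unit
> }
> 
> object ConnectionPool {
>   import java.util.concurrent.ConcurrentLinkedQueue
> 
>   private val maxPoolSize = 10  // placeholder sizing
> 
>   // open the real client here; left abstract in this sketch
>   private def createConnection(): Connection = ???
> 
>   // lazily initialized the first time a connection is requested in the Worker JVM
>   private lazy val pool: ConcurrentLinkedQueue[Connection] = {
>     val q = new ConcurrentLinkedQueue[Connection]()
>     (1 to maxPoolSize).foreach(_ => q.offer(createConnection()))
>     q
>   }
> 
>   // hand out a pooled connection, or open a new one if the pool is drained
>   def getConnection(): Connection =
>     Option(pool.poll()).getOrElse(createConnection())
> 
>   // return the connection so later batches/partitions on this Worker can reuse it
>   def returnConnection(connection: Connection): Unit = pool.offer(connection)
> }
> 
> because it's a singleton object, each Worker JVM gets its own pool, which is what "runs within the Worker JVM" means in the comment above.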
> 
> hope that helps!
> 
> -chris
> 
> 
> 
> 
> On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman <ashrafuzzaman.g2@gmail.com> wrote:
> Sorry guys, my bad,
> Here is a high-level code sample:
> 
> val unionStreams = ssc.union(kinesisStreams)
> unionStreams.foreachRDD(rdd => {
>   rdd.foreach { tweet =>
>     val strTweet = new String(tweet, "UTF-8")
>     val interaction = InteractionParser.parser(strTweet)
>     interactionDAL.insert(interaction)
>   }
> })
> 
> Here I have to close the connection for interactionDAL, otherwise the JVM gives me an error that the connection is open. I tried with a sticky connection as well, with keep_alive true.
> So my guess was that at the point of “unionStreams.foreachRDD” or at “rdd.foreach” the code is marshaled and sent to the workers, and the workers un-marshal and execute the process, which is why the connection is always opened for each RDD. I might be completely wrong. I would love to know what is going on underneath.
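> 
> restructured along the lines of the foreachPartition pattern shown in the reply above, the snippet would look roughly like the following sketch (it reuses the names from the code above; the InteractionDAL constructor and close() call are assumptions about that API, not something shown in this thread):
> 
> val unionStreams = ssc.union(kinesisStreams)
> unionStreams.foreachRDD { rdd =>
>   rdd.foreachPartition { tweets =>
>     // runs on the worker: open (or borrow) one connection per partition
>     // rather than per record, and release it when the partition is done
>     val dal = new InteractionDAL()   // hypothetical constructor for interactionDAL's class
>     tweets.foreach { tweet =>
>       val strTweet = new String(tweet, "UTF-8")
>       val interaction = InteractionParser.parser(strTweet)
>       dal.insert(interaction)
>     }
>     dal.close()                      // hypothetical close; or return it to a pool instead
>   }
> }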

