spark-user mailing list archives

From Chris Fregly <ch...@fregly.com>
Subject Re: Connection pool in workers
Date Sun, 01 Mar 2015 20:04:09 GMT
hey AKM!

this is a very common problem.  the streaming programming guide addresses
this issue here, actually:
http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

the tl;dr is this:
1) inside foreachRDD(), you want to use foreachPartition() to operate on a
whole partition at a time instead of foreach() on each individual record
2) you want to get/release each connection from the pool within the worker,
not on the driver
3) make sure you initialize the ConnectionPool first - or do it lazily upon
getting the first connection (there's a sketch of that after the sample
code below).

here's the sample code referenced in the link above with some additional
comments for clarity:

dstream.foreachRDD { rdd =>
  // everything within here runs on the Driver

  rdd.foreachPartition { partitionOfRecords =>
    // everything within here runs on a Worker and operates on one
    // partition of records

    // ConnectionPool is a static, lazily initialized singleton pool of
    // connections that lives within the Worker JVM

    // retrieve a connection from the pool
    val connection = ConnectionPool.getConnection()

    // perform the application logic here - parse and write to mongodb
    // using the connection
    partitionOfRecords.foreach(record => connection.send(record))

    // return the connection to the pool for future reuse
    ConnectionPool.returnConnection(connection)
  }
}
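
the guide doesn't show ConnectionPool itself - that part is up to you. here's
a minimal sketch in scala, assuming a mongodb-style client (MongoConnection
and its send() method are just placeholders for whatever driver you use):

import scala.collection.mutable

// stand-in connection type - swap in your real mongodb driver's client.
// send() is a placeholder for your actual write call
class MongoConnection {
  def send(record: String): Unit = { /* write to mongodb here */ }
}

// a static, lazily initialized singleton pool that lives in each Worker JVM.
// scala objects are initialized on first access, which gives you the lazy
// initialization from point 3 for free
object ConnectionPool {
  private val pool = mutable.Queue[MongoConnection]()

  // hand out a pooled connection, creating one if the pool is empty
  def getConnection(): MongoConnection = synchronized {
    if (pool.isEmpty) new MongoConnection() else pool.dequeue()
  }

  // return the connection for reuse by the next task on this executor
  def returnConnection(connection: MongoConnection): Unit = synchronized {
    pool.enqueue(connection)
  }
}

since the object lives in the Worker JVM, each executor builds its own pool
the first time a task asks for a connection - nothing gets serialized from
the Driver.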

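applied to your kinesis snippet below, it would look something like this
(note: i'm guessing at InteractionDAL's api here - the point is just to
create and close it per partition, on the worker):

val unionStreams = ssc.union(kinesisStreams)
unionStreams.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfTweets =>
    // one DAL (and its underlying connection) per partition, created on
    // the worker so nothing needs to be serialized from the driver
    val dal = new InteractionDAL()
    partitionOfTweets.foreach { tweet =>
      val strTweet = new String(tweet, "UTF-8")
      val interaction = InteractionParser.parser(strTweet)
      dal.insert(interaction)
    }
    // assumed close() - release the connection once the partition is done
    dal.close()
  }
}
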
hope that helps!

-chris

On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman <ashrafuzzaman.g2@gmail.com> wrote:

> Sorry guys, my bad,
> Here is a high level code sample,
>
> val unionStreams = ssc.union(kinesisStreams)
> unionStreams.foreachRDD(rdd => {
>   rdd.foreach { tweet =>
>     val strTweet = new String(tweet, "UTF-8")
>     val interaction = InteractionParser.parser(strTweet)
>     interactionDAL.insert(interaction)
>   }
> })
>
> Here I have to close the connection for interactionDAL, otherwise the JVM
> gives me an error that the connection is open. I tried with a sticky
> connection as well, with keep_alive true. So my guess was that at the point
> of “unionStreams.foreachRDD” or at “rdd.foreach” the code is marshaled and
> sent to the workers, and the workers unmarshal and execute it, which is
> why the connection is always opened for each RDD. I might be completely
> wrong. I would love to know what is going on underneath.
