spark-user mailing list archives

From Jörn Franke <jornfra...@gmail.com>
Subject Re: Scalable JDBCRDD
Date Sun, 01 Mar 2015 10:18:57 GMT
What database are you using?
On Feb 28, 2015 18:15, "Michal Klos" <michal.klos81@gmail.com> wrote:

> Hi Spark community,
>
> We have a use case where we need to pull huge amounts of data from a SQL
> query against a database into Spark. We need to execute the query against
> our huge database and not a substitute (SparkSQL, Hive, etc) because of a
> couple of factors including custom functions used in the queries that only
> our database has.
>
> We started by looking at JDBC RDD, which utilizes a prepared statement
> with two parameters that are meant to be used to partition the result set
> to the workers... e.g.:
>
> select * from table limit ?,?
>
> turns into
>
> select * from table limit 1,100 on worker 1
> select * from table limit 101,200 on worker 2
>
> This will not work for us because our database cannot support multiple
> execution of these queries without being crippled. But, additionally, our
> database doesn't support the above LIMIT syntax and we don't have a generic
> way of partitioning the various queries.
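>
> (For context, the stock JdbcRDD is normally driven by numeric key bounds
> rather than LIMIT; a minimal sketch of that usage is below. The connection
> URL, table, and key column are made up for illustration, and sc is an
> existing SparkContext.)
>
>   import java.sql.{DriverManager, ResultSet}
>   import org.apache.spark.rdd.JdbcRDD
>
>   // The two '?' placeholders are bound per partition to a sub-range of
>   // [lowerBound, upperBound] on a numeric key.
>   val rdd = new JdbcRDD(
>     sc,
>     () => DriverManager.getConnection("jdbc:mysql://host/db"),
>     "SELECT * FROM some_table WHERE ? <= id AND id <= ?",
>     1, 1000000, 10,                       // lowerBound, upperBound, numPartitions
>     (rs: ResultSet) => rs.getString(1))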
>
> As a result -- we started by forking JDBCRDD and made a version that
> executes the SQL query once in getPartitions into a Vector and then hands
> each worker node an index and iterator. Here's a snippet of getPartitions
> and compute:
>
>   override def getPartitions: Array[Partition] = {
>     // Compute the DB query once here
>     val results = computeQuery
>
>     (0 until numPartitions).map(i => {
>       // TODO: would be better to do this partitioning when scrolling through the
>       // result set, if still loading into memory
>       val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>       new DBPartition(i, partitionItems)
>     }).toArray
>   }
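>
> (A quick illustration of the drop/sliding slicing above, with toy data and
> numPartitions = 3 -- it deals rows out round-robin, partition i getting every
> numPartitions-th row starting at offset i:)
>
>   val results = Vector("r0", "r1", "r2", "r3", "r4", "r5", "r6")
>   results.drop(0).sliding(1, 3).flatten.toVector   // Vector(r0, r3, r6)
>   results.drop(1).sliding(1, 3).flatten.toVector   // Vector(r1, r4)
>   results.drop(2).sliding(1, 3).flatten.toVector   // Vector(r2, r5)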
>
>   override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
>     val part = thePart.asInstanceOf[DBPartition[T]]
>
>     // Iterate over the slice of the result vector that was assigned to this partition
>     val iterator = part.items.iterator
>
>     override def getNext(): T = {
>       if (iterator.hasNext) {
>         iterator.next()
>       } else {
>         finished = true
>         null.asInstanceOf[T]
>       }
>     }
>
>     override def close(): Unit = ()
>   }
>
> This is a little better since we can just execute the query once. However,
> the result-set needs to fit in memory.
>
> We've been trying to brainstorm a way to
>
> A) have that result set distribute out to the worker RDD partitions as it's
> streaming in from the cursor? (see the sketch just after this list)
> B) have the result set spill to disk if it exceeds memory and do something
> clever around the iterators?
> C) something else?
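>
> (A very rough sketch of what we mean by (A), assuming the driver streams the
> cursor and ships fixed-size batches to the cluster as they arrive. This is
> not the real JdbcRDD API; sql, batchSize, mapRow and the connection URL are
> hypothetical, and sc is an existing SparkContext:)
>
>   import java.sql.{DriverManager, ResultSet}
>   import scala.collection.mutable.ArrayBuffer
>   import scala.reflect.ClassTag
>   import org.apache.spark.rdd.RDD
>
>   def streamQueryToRdd[T: ClassTag](sql: String, batchSize: Int)
>                                    (mapRow: ResultSet => T): RDD[T] = {
>     val conn = DriverManager.getConnection("jdbc:mysql://host/db")
>     val rs = conn.createStatement().executeQuery(sql)
>     val batches = ArrayBuffer[RDD[T]]()
>     val buf = ArrayBuffer[T]()
>     while (rs.next()) {
>       buf += mapRow(rs)
>       if (buf.size == batchSize) {
>         batches += sc.parallelize(buf.toVector)   // ship this batch out
>         buf.clear()
>       }
>     }
>     if (buf.nonEmpty) batches += sc.parallelize(buf.toVector)
>     conn.close()
>     sc.union(batches.toSeq)                       // one RDD over all batches
>   }
>
> (The caveat is that parallelize still keeps each batch referenced on the
> driver until the lineage is broken, e.g. by checkpointing or persisting each
> batch, so on its own this only smooths the loading rather than capping driver
> memory.)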
>
> We're not familiar enough yet with all of the workings of Spark to know how
> to proceed on this.
>
> We also thought of the work-around of having the DB query dump to HDFS/S3 and
> then picking it up from there, but it adds more moving parts and latency to
> our processing.
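>
> (On the Spark side that work-around is at least simple to consume; a minimal
> sketch, assuming the query result is dumped as tab-delimited text under a
> hypothetical HDFS path:)
>
>   // Pick the dumped result back up; path and delimiter are placeholders
>   val dumped = sc.textFile("hdfs:///tmp/query_dump/")
>   val rows = dumped.map(_.split('\t'))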
>
> Does anyone have a clever suggestion? Are we missing something?
>
> thanks,
> Michal
>
>
