spark-user mailing list archives

From eric <e...@ericjbell.com>
Subject Re: Scalable JDBCRDD
Date Sun, 01 Mar 2015 15:02:33 GMT
What you're saying is that, because the query is so intensive, you need 
to run a single query and partition the results, rather than running 
one query for each partition.

I assume it's not viable to throw the query results into another table 
in your database and then query that using the normal approach?
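
If it is viable, here's a minimal sketch of that approach. The JDBC URL,
credentials, and table names are hypothetical, and the exact CREATE
TABLE AS / ROW_NUMBER() syntax varies by database:

import java.sql.DriverManager

val jdbcUrl = "jdbc:vertica://host:5433/mydb"  // hypothetical URL
val conn = DriverManager.getConnection(jdbcUrl, "user", "pass")
try {
  // Run the expensive query exactly once, materializing the result
  // with a synthetic row number so the staging table can be
  // range-partitioned cheaply afterwards.
  conn.createStatement().execute(
    """CREATE TABLE staging AS
      |SELECT ROW_NUMBER() OVER (ORDER BY 1) AS rn, t.*
      |FROM ( /* the expensive query */ SELECT * FROM big_table ) t""".stripMargin)
} finally {
  conn.close()
}

Each worker would then run a cheap "SELECT * FROM staging WHERE
rn >= ? AND rn <= ?" instead of re-running the expensive query, as in
the JdbcRDD example further down the thread.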

--eric

On 3/1/15 4:28 AM, michal.klos81@gmail.com wrote:
> Jorn: Vertica
>
> Cody: I posited the LIMIT just as an example of how JdbcRDD could be 
> used least invasively. Say we partitioned on a time field instead -- 
> we would still need N executions of those queries. The queries we 
> have are very intense, and concurrency is an issue even if the N 
> partitioned queries are smaller. Some queries require evaluating the 
> whole data set first. If our use case were a simple select * from 
> table, then the partitions would be an easier sell, if it weren't for 
> the concurrency problem :) Long story short -- we need only one 
> execution of the query and would like to just divvy out the result set.
>
> M
>
>
>
> On Mar 1, 2015, at 5:18 AM, Jörn Franke <jornfranke@gmail.com> wrote:
>
>> What database are you using?
>>
>> On Feb 28, 2015, at 18:15, "Michal Klos" <michal.klos81@gmail.com> wrote:
>>
>>     Hi Spark community,
>>
>>     We have a use case where we need to pull huge amounts of data
>>     from a SQL query against a database into Spark. We need to
>>     execute the query against our huge database and not a substitute
>>     (SparkSQL, Hive, etc.) because of a couple of factors, including
>>     custom functions used in the queries that only our database has.
>>
>>     We started by looking at JdbcRDD, which utilizes a prepared
>>     statement with two parameters that are meant to be used to
>>     partition the result set across the workers... e.g.:
>>
>>     select * from table limit ?,?
>>
>>     turns into
>>
>>     select * from table limit 1,100 on worker 1
>>     select * from table limit 101,200 on worker 2
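>>
>>     (For reference, a minimal sketch of driving the stock
>>     org.apache.spark.rdd.JdbcRDD, shown with the canonical range
>>     predicate rather than LIMIT, since Spark binds each partition's
>>     lower and upper bound into the two placeholders. The URL, bounds,
>>     and columns are hypothetical:)
>>
>>     import java.sql.{DriverManager, ResultSet}
>>     import org.apache.spark.rdd.JdbcRDD
>>
>>     // Spark splits [lowerBound, upperBound] into numPartitions
>>     // ranges and binds each range's endpoints into the two '?'
>>     // placeholders, so every partition runs its own copy of the query.
>>     val rdd = new JdbcRDD(
>>       sc,  // SparkContext assumed in scope
>>       () => DriverManager.getConnection("jdbc:vertica://host:5433/db"),
>>       "SELECT * FROM table WHERE ? <= id AND id <= ?",
>>       lowerBound = 1L,
>>       upperBound = 100000L,
>>       numPartitions = 10,
>>       mapRow = (rs: ResultSet) => rs.getString(1)  // pull out what you need
>>     )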
>>
>>     This will not work for us because our database cannot support
>>     multiple executions of these queries without being crippled.
>>     Additionally, our database doesn't support the above LIMIT syntax,
>>     and we don't have a generic way of partitioning the various queries.
>>
>>     As a result -- we started by forking JdbcRDD and made a version
>>     that executes the SQL query once in getPartitions into a Vector
>>     and then hands each worker node an index and an iterator. Here's a
>>     snippet of getPartitions and compute:
>>
>>     override def getPartitions: Array[Partition] = {
>>       // Compute the DB query once here
>>       val results = computeQuery
>>       (0 until numPartitions).map(i => {
>>         // TODO: would be better to do this partitioning when scrolling
>>         // through the result set, if still loading into memory
>>         val partitionItems =
>>           results.drop(i).sliding(1, numPartitions).flatten.toVector
>>         new DBPartition(i, partitionItems)
>>       }).toArray
>>     }
>>
>>     override def compute(thePart: Partition, context: TaskContext) =
>>       new NextIterator[T] {
>>         val part = thePart.asInstanceOf[DBPartition[T]]
>>         // The result vector was already shifted to our index number in
>>         // getPartitions; here we just walk our slice of it
>>         val iterator = part.items.iterator
>>
>>         override def getNext(): T = {
>>           if (iterator.hasNext) {
>>             iterator.next()
>>           } else {
>>             finished = true
>>             null.asInstanceOf[T]
>>           }
>>         }
>>
>>         override def close(): Unit = ()
>>       }
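>>
>>     (DBPartition isn't shown above; a minimal definition consistent
>>     with how it's used -- hypothetical, reconstructed from the
>>     snippet -- might be:)
>>
>>     class DBPartition[T](val index: Int, val items: Vector[T])
>>       extends Partition
>>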
>>     This is a little better since we can execute the query just once.
>>     However, the result set needs to fit in memory.
>>     We've been trying to brainstorm a way to:
>>     A) distribute that result set out to the worker RDD partitions
>>     as it's streaming in from the cursor?
>>     B) have the result set spill to disk if it exceeds memory, and do
>>     something clever around the iterators?
>>     C) something else?
>>     We're not familiar enough yet with all of the workings of Spark
>>     to know how to proceed on this.
>>     We also thought of the work-around of having the DB query dump
>>     to HDFS/S3 and then picking it up from there, but it adds more
>>     moving parts and latency to our processing.
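>>
>>     (A minimal sketch of that work-around, with a hypothetical query,
>>     staging path, and tab-delimited format; the cursor streams through
>>     one process to HDFS once, and Spark then reads it in parallel:)
>>
>>     import java.sql.DriverManager
>>     import org.apache.hadoop.conf.Configuration
>>     import org.apache.hadoop.fs.Path
>>
>>     val conn = DriverManager.getConnection("jdbc:vertica://host:5433/mydb")
>>     val rs = conn.createStatement().executeQuery(
>>       "SELECT a, b FROM big_table")  // the expensive query, run once
>>     val path = new Path("hdfs:///staging/query-dump.tsv")
>>     val out = path.getFileSystem(new Configuration()).create(path)
>>     while (rs.next()) {  // only one row held in memory at a time here
>>       out.write(s"${rs.getString(1)}\t${rs.getString(2)}\n".getBytes("UTF-8"))
>>     }
>>     out.close(); conn.close()
>>
>>     // Workers then pick it up in parallel as an ordinary text RDD.
>>     val rdd = sc.textFile("hdfs:///staging/query-dump.tsv").map(_.split('\t'))
>>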
>>     Does anyone have a clever suggestion? Are we missing something?
>>     Thanks,
>>     Michal
>>

