spark-user mailing list archives

From Cody Koeninger <>
Subject Re: Scalable JDBCRDD
Date Mon, 02 Mar 2015 15:51:56 GMT
Have you already tried using the Vertica Hadoop input format with Spark?  I
don't know how it's implemented, but I'd hope that it has some notion of
Vertica-specific shard locality (which JdbcRDD does not).

If you're really constrained to consuming the result set in a single
thread, then presumably whatever processing you're doing on the results is
time-consuming enough that the overhead of distributing it in a Spark job
is still worthwhile?  I guess you might take a look at writing a custom
DStream receiver that iterates over the result set and makes micro-batches
out of it.
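
The micro-batch idea can be sketched in plain Scala, without any Spark
dependency. This is only an illustration under stated assumptions: `Row`,
`MicroBatchSketch`, and `microBatches` are hypothetical names, and an
in-memory iterator stands in for the rows coming off the single JDBC cursor.

```scala
// Hypothetical stand-in for one row read from the single JDBC cursor.
final case class Row(id: Int, payload: String)

object MicroBatchSketch {
  // Lazily groups a single-pass row iterator into fixed-size batches.
  // Only one batch is materialized at a time, so the full result set
  // never has to sit in memory at once.
  def microBatches[T](rows: Iterator[T], batchSize: Int): Iterator[Vector[T]] = {
    require(batchSize > 0, "batchSize must be positive")
    rows.grouped(batchSize).map(_.toVector)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: in real use this iterator would wrap a java.sql.ResultSet.
    val rows = Iterator.tabulate(10)(i => Row(i, s"row-$i"))
    microBatches(rows, batchSize = 4).zipWithIndex.foreach { case (batch, n) =>
      println(s"batch $n: ${batch.map(_.id).mkString(",")}")
    }
  }
}
```

In a custom receiver, each batch would presumably be handed to Spark (for
example through the receiver's `store` method) instead of being printed.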

On Sun, Mar 1, 2015 at 9:59 AM, <> wrote:

> Yes exactly.
> The temp table is an approach but then we need to manage the deletion of
> it etc.
> I'm sure we won't be the only people with this crazy use case.
> If there isn't a feasible way to do this "within the framework" then
> that's okay. But if there is a way we are happy to write the code and PR it
> back :)
> M
> On Mar 1, 2015, at 10:02 AM, eric <> wrote:
> What you're saying is that, due to the intensity of the query, you need to
> run a single query and partition the results, versus running one query for
> each partition.
> I assume it's not viable to throw the query results into another table in
> your database and then query that using the normal approach?
> --eric
> On 3/1/15 4:28 AM, wrote:
> Jorn: Vertica
>  Cody: I posited the limit just as an example of how JdbcRDD could be
> used least invasively. Let's say we used a partition on a time field -- we
> would still need to have N executions of those queries. The queries we have
> are very intense and concurrency is an issue even if the N partitioned
> queries are smaller. Some queries require evaluating the whole data set
> first. If our use case were a simple select * from table, then the partitions
> would be an easier sell if it weren't for the concurrency problem :) Long
> story short -- we need only one execution of the query and would like to
> just divvy up the result set.
>  M
> On Mar 1, 2015, at 5:18 AM, Jörn Franke <> wrote:
>   What database are you using?
> On 28 Feb 2015 18:15, "Michal Klos" <> wrote:
>> Hi Spark community,
>> We have a use case where we need to pull huge amounts of data from a SQL
>> query against a database into Spark. We need to execute the query against
>> our huge database and not a substitute (SparkSQL, Hive, etc) because of a
>> couple of factors including custom functions used in the queries that only
>> our database has.
>>  We started by looking at JDBC RDD, which utilizes a prepared statement
>> with two parameters that are meant to be used to partition the result set
>> to the workers... e.g.:
>>  select * from table limit ?,?
>>  turns into
>>  select * from table limit 1,100 on worker 1
>> select * from table limit 101,200 on worker 2
>>  This will not work for us because our database cannot support multiple
>> executions of these queries without being crippled. But, additionally, our
>> database doesn't support the above LIMIT syntax and we don't have a generic
>> way of partitioning the various queries.
>>  As a result -- we started by forking JdbcRDD and made a version that
>> executes the SQL query once in getPartitions into a Vector and then hands
>> each worker node an index and iterator. Here's a snippet of getPartitions
>> and compute:
>>    override def getPartitions: Array[Partition] = {
>>     //Compute the DB query once here
>>     val results = computeQuery
>>     (0 until numPartitions).map(i => {
>>       // TODO: would be better to do this partitioning when scrolling through
>>       // result set if still loading into memory
>>       val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>>       new DBPartition(i, partitionItems)
>>     }).toArray
>>   }
>>   override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
>>     val part = thePart.asInstanceOf[DBPartition[T]]
>>     //Shift the result vector to our index number and then do a sliding iterator
>>     //over it
>>     val iterator = part.items.iterator
>>     override def getNext : T = {
>>       if (iterator.hasNext) {
>>         iterator.next()
>>       } else {
>>         finished = true
>>         null.asInstanceOf[T]
>>       }
>>     }
>>     override def close: Unit = ()
>>   }
>> This is a little better since we can just execute the query once. However,
>> the result-set needs to fit in memory.
>> We've been trying to brainstorm a way to
>> A) have that result set distribute out to the worker RDD partitions as it's
>> streaming in from the cursor?
>> B) have the result set spill to disk if it exceeds memory and do something
>> clever around the iterators?
>> C) something else?
>> We're not familiar enough yet with all of the workings of Spark to know how
>> to proceed on this.
>> We also thought of the workaround of having the DB query dump to HDFS/S3 and
>> then pick it up from there, but it adds more moving parts and latency to our
>> processing.
>> Does anyone have a clever suggestion? Are we missing something?
>> thanks,
>> Michal
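
The TODO in the quoted getPartitions snippet (partition while scrolling
through the result set) might look roughly like the following. This is a
minimal sketch in plain Scala, not the poster's code: `RoundRobinSketch`,
`dealOut`, and `strided` are hypothetical names, and an in-memory iterator
stands in for the database cursor. It deals rows into buckets in one pass
and produces the same assignment as the quoted
`results.drop(i).sliding(1, numPartitions).flatten` expression.

```scala
import scala.collection.mutable.ArrayBuffer

object RoundRobinSketch {
  // Deals rows into numPartitions buckets in a single pass over the iterator:
  // row k goes to bucket k % numPartitions.
  def dealOut[T](rows: Iterator[T], numPartitions: Int): Vector[Vector[T]] = {
    require(numPartitions > 0, "numPartitions must be positive")
    val buckets = Vector.fill(numPartitions)(ArrayBuffer.empty[T])
    rows.zipWithIndex.foreach { case (row, k) => buckets(k % numPartitions) += row }
    buckets.map(_.toVector)
  }

  // The per-partition expression from the quoted snippet, kept for comparison.
  // It needs the whole result vector up front and is evaluated once per partition.
  def strided[T](results: Vector[T], i: Int, numPartitions: Int): Vector[T] =
    results.drop(i).sliding(1, numPartitions).flatten.toVector

  def main(args: Array[String]): Unit = {
    val data = Vector.range(0, 10)
    val dealt = dealOut(data.iterator, 3)
    dealt.zipWithIndex.foreach { case (bucket, i) =>
      println(s"partition $i: ${bucket.mkString(",")}")
    }
  }
}
```

Note that this only removes the repeated passes over the result vector. All
buckets are still held in memory, so it does not by itself address options
A or B.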
