spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Scalable JDBCRDD
Date Mon, 02 Mar 2015 15:51:56 GMT
Have you already tried using the Vertica Hadoop input format with Spark? I
don't know how it's implemented, but I'd hope that it has some notion of
Vertica-specific shard locality (which JdbcRDD does not).

If you're really constrained to consuming the result set in a single
thread, then whatever processing you're doing on the results must be
time-consuming enough to make the overhead of distributing it in a Spark
job worthwhile? If so, you might take a look at writing a custom
DStream receiver that iterates over the result set and makes micro-batches
out of it.
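A minimal sketch of the micro-batching idea, written in plain Scala rather than against the Spark Streaming API. The result set is modeled as a single-pass `Iterator` (an assumption; a real receiver would wrap a JDBC `ResultSet` cursor), and `MicroBatches.batches` is a hypothetical helper. In an actual custom receiver, each batch would presumably be handed to Spark through one of the receiver's `store` methods.

```scala
// Hypothetical sketch: chunk a single-pass row iterator (standing in for a
// JDBC ResultSet cursor) into fixed-size micro-batches.
object MicroBatches {
  // grouped() is lazy: it pulls at most batchSize rows from the source before
  // emitting a batch, so the iterator itself buffers only one batch at a time.
  def batches[T](rows: Iterator[T], batchSize: Int): Iterator[Seq[T]] = {
    require(batchSize > 0, "batchSize must be positive")
    rows.grouped(batchSize)
  }
}
```

Feeding it the rows 0 through 6 with a batch size of 3 yields the batches [0, 1, 2], [3, 4, 5] and [6]. The cursor is still read by one thread; only the downstream processing of each batch is distributed.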

On Sun, Mar 1, 2015 at 9:59 AM, michal.klos81@gmail.com <
michal.klos81@gmail.com> wrote:

> Yes exactly.
>
> The temp table is an approach but then we need to manage the deletion of
> it etc.
>
> I'm sure we won't be the only people with this crazy use case.
>
> If there isn't a feasible way to do this "within the framework" then
> that's okay. But if there is a way we are happy to write the code and PR it
> back :)
>
> M
>
>
>
> On Mar 1, 2015, at 10:02 AM, eric <eric@ericjbell.com> wrote:
>
> What you're saying is that, due to the intensity of the query, you need to
> run a single query and partition the results, versus running one query for
> each partition.
>
> I assume it's not viable to throw the query results into another table in
> your database and then query that using the normal approach?
>
> --eric
>
> On 3/1/15 4:28 AM, michal.klos81@gmail.com wrote:
>
> Jörn: Vertica
>
> Cody: I posited the LIMIT just as an example of how JdbcRDD could be
> used least invasively. Let's say we partitioned on a time field -- we
> would still need N executions of those queries. The queries we have
> are very intense, and concurrency is an issue even if the N partitioned
> queries are smaller. Some queries require evaluating the whole data set
> first. If our use case were a simple select * from table, then partitioning
> would be an easier sell, if it weren't for the concurrency problem :) Long
> story short -- we need only one execution of the query and would like to
> just divvy out the result set.
>
>  M
>
>
>
> On Mar 1, 2015, at 5:18 AM, Jörn Franke <jornfranke@gmail.com> wrote:
>
>   What database are you using?
> On 28 Feb 2015 18:15, "Michal Klos" <michal.klos81@gmail.com> wrote:
>
>> Hi Spark community,
>>
>> We have a use case where we need to pull huge amounts of data from a SQL
>> query against a database into Spark. We need to execute the query against
>> our huge database and not a substitute (Spark SQL, Hive, etc.) because of a
>> couple of factors, including custom functions used in the queries that only
>> our database has.
>>
>> We started by looking at JdbcRDD, which uses a prepared statement
>> with two parameters that are meant to partition the result set
>> across the workers, e.g.:
>>
>>     select * from table limit ?,?
>>
>> turns into
>>
>>     select * from table limit 1,100 on worker 1
>>     select * from table limit 101,200 on worker 2
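For comparison, JdbcRDD binds each partition's inclusive start and end values to the query's two `?` placeholders, so the intended pattern is a range predicate such as `... WHERE ? <= id AND id <= ?` rather than LIMIT/OFFSET. The sketch below uses a hypothetical `RangeSplit` helper modeled on that behavior (it is not the Spark source) to show how `lowerBound`, `upperBound` and `numPartitions` turn into per-partition ranges. Each range still means a separate execution of the query, which is the concurrency problem described here.

```scala
// Sketch modeled on JdbcRDD's range splitting (not the Spark source):
// split the inclusive range [lowerBound, upperBound] into numPartitions
// contiguous inclusive sub-ranges, one per query execution.
object RangeSplit {
  def bounds(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
    require(numPartitions > 0 && upperBound >= lowerBound, "invalid bounds")
    val lo = BigInt(lowerBound)
    // Both bounds are inclusive, hence the + 1 here and the - 1 on each end.
    // BigInt avoids overflow when the range spans most of the Long domain.
    val length = BigInt(upperBound) - lo + BigInt(1)
    (0 until numPartitions).map { i =>
      val start = lo + BigInt(i) * length / BigInt(numPartitions)
      val end = lo + BigInt(i + 1) * length / BigInt(numPartitions) - BigInt(1)
      (start.toLong, end.toLong)
    }
  }
}
```

With bounds 1 to 200 and two partitions this yields (1, 100) and (101, 200), matching the two ranges in the example above, with each pair used in its own execution of the query.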
>>
>> This will not work for us because our database would be crippled by
>> multiple executions of these queries. Additionally, our database doesn't
>> support the LIMIT syntax above, and we don't have a generic way of
>> partitioning our various queries.
>>
>> As a result, we started by forking JdbcRDD and writing a version that
>> executes the SQL query once in getPartitions, collects the results into a
>> Vector, and then hands each partition an index and an iterator over its
>> share of the rows. Here's a snippet of getPartitions and compute:
>>
>>   override def getPartitions: Array[Partition] = {
>>     // Compute the DB query once here, on the driver
>>     val results = computeQuery
>>
>>     (0 until numPartitions).map(i => {
>>       // Deal rows round-robin: skip i rows, then take every numPartitions-th row.
>>       // TODO: would be better to do this partitioning while scrolling through
>>       // the result set, if we're still loading it into memory
>>       val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>>       new DBPartition(i, partitionItems)
>>     }).toArray
>>   }
>>
>>   override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
>>     val part = thePart.asInstanceOf[DBPartition[T]]
>>
>>     // Each partition already holds its slice of the results; just iterate over it
>>     val iterator = part.items.iterator
>>
>>     override def getNext(): T = {
>>       if (iterator.hasNext) {
>>         iterator.next()
>>       } else {
>>         // Tell NextIterator there is nothing left; the returned value is ignored
>>         finished = true
>>         null.asInstanceOf[T]
>>       }
>>     }
>>
>>     override def close(): Unit = ()
>>   }
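The slicing expression in getPartitions deals rows round-robin: partition i receives rows i, i + numPartitions, i + 2 * numPartitions, and so on. A small plain-Scala check of that reading follows. The `RoundRobin` object is hypothetical, and `viaModulo` is just an equivalent formulation, not taken from the original code.

```scala
// Hypothetical check of the round-robin slicing used in the fork's
// getPartitions, plus an equivalent index-modulo formulation.
object RoundRobin {
  // The fork's expression: skip i rows, then take one row out of every n.
  def viaSliding[T](results: Vector[T], i: Int, n: Int): Vector[T] =
    results.drop(i).sliding(1, n).flatten.toVector

  // Alternative: row j belongs to partition j % n.
  def viaModulo[T](results: Vector[T], i: Int, n: Int): Vector[T] =
    results.indices.filter(j => j % n == i).map(j => results(j)).toVector
}
```

For ten rows dealt across three partitions, both versions give partition 0 the rows 0, 3, 6, 9, partition 1 the rows 1, 4, 7 and partition 2 the rows 2, 5, 8. Either way, each DBPartition carries its rows, so the full result set is materialized on the driver and shipped to executors inside the partition objects that travel with the tasks.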
>> This is a little better, since we can just execute the query once. However,
>> the result set needs to fit in memory.
>> We've been trying to brainstorm a way to:
>> A) have that result set distribute out to the worker RDD partitions as it
>> streams in from the cursor?
>> B) have the result set spill to disk if it exceeds memory, and do something
>> clever around the iterators?
>> C) something else?
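One way option B could look, as a hedged sketch: read the cursor exactly once and deal rows round-robin into one spill file per partition, so memory use stays bounded regardless of result size. The `SpillDealer` object, the one-row-per-line text encoding and the file naming are all assumptions for illustration. In a Spark job the files would have to live on storage every executor can read (a shared mount, HDFS, S3), since the driver's local disk is generally not visible to executors, so in practice this ends up close to a dump-to-HDFS/S3 approach.

```scala
import java.io.BufferedWriter
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import scala.io.Source

// Hypothetical sketch of option B: one pass over the cursor, rows dealt
// round-robin into one spill file per partition. Assumes each row is already
// serialized as a single line of text.
object SpillDealer {
  def spill(rows: Iterator[String], numPartitions: Int, dir: Path): IndexedSeq[Path] = {
    require(numPartitions > 0, "numPartitions must be positive")
    val paths = (0 until numPartitions).map(i => dir.resolve(s"part-$i.txt"))
    val writers: IndexedSeq[BufferedWriter] =
      paths.map(p => Files.newBufferedWriter(p, StandardCharsets.UTF_8))
    try {
      var target = 0 // index of the partition file that receives the next row
      rows.foreach { row =>
        writers(target).write(row)
        writers(target).newLine()
        target = (target + 1) % numPartitions
      }
    } finally {
      writers.foreach(_.close())
    }
    paths
  }

  // What a partition's compute() might do with its file. This reads eagerly
  // for brevity; a real implementation would stream the lines instead.
  def read(path: Path): List[String] = {
    val src = Source.fromFile(path.toFile, "UTF-8")
    try src.getLines().toList finally src.close()
  }
}
```

Dealing the rows a to e across two partitions leaves a, c, e in the first file and b, d in the second.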
>> We're not familiar enough yet with all of the workings of Spark to know how
>> to proceed on this.
>> We also thought of the workaround of having the DB query dump to HDFS/S3 and
>> then picking it up from there, but it adds more moving parts and latency to
>> our processing.
>> Does anyone have a clever suggestion? Are we missing something?
>> thanks,
>> Michal
>>
>>
>
