spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Michal Klos <>
Subject Re: Scalable JDBCRDD
Date Mon, 02 Mar 2015 16:07:12 GMT
Hi Cody,

Thanks for the reply. Yea, we thought of possibly doing this in a UDX in
Vertica somehow to get the lower level co-operation but its a bit daunting.
We want to do this because there are things we want to do with the
result-set in Spark that are not possible in Vertica. The DStream receiver
is a good thought.

I think right now, we are learning towards eric's suggestion -- where we
run the big query once somewhere (getPartitions maybe) in Vertica and dumps
into a temporary table with an additional generated partition_key field.
Then we have the workers issue N "select * from temp-table where
partition_key = ?" queries that are hopefully lightweight. The temporary
table we are hoping will just clean itself up so we don't need to handle
that mess. It seems like the most sane approach today ;]


On Mon, Mar 2, 2015 at 10:51 AM, Cody Koeninger <> wrote:

> Have you already tried using the Vertica hadoop input format with spark?
> I don't know how it's implemented, but I'd hope that it has some notion of
> vertica-specific shard locality (which JdbcRDD does not).
> If you're really constrained to consuming the result set in a single
> thread, whatever processing you're doing of the results must be
> time-consuming enough to make the overhead of distributing it in a spark
> job still worthwhile?   I guess you might take a look at doing a custom
> DStream receiver that iterates over the result set and makes micro-batches
> out of it.
> On Sun, Mar 1, 2015 at 9:59 AM, <
>> wrote:
>> Yes exactly.
>> The temp table is an approach but then we need to manage the deletion of
>> it etc.
>> I'm sure we won't be the only people with this crazy use case.
>> If there isn't a feasible way to do this "within the framework" then
>> that's okay. But if there is a way we are happy to write the code and PR it
>> back :)
>> M
>> On Mar 1, 2015, at 10:02 AM, eric <> wrote:
>> What you're saying is that, due to the intensity of the query, you need
>> to run a single query and partition the results, versus running one query
>> for each partition.
>> I assume it's not viable to throw the query results into another table in
>> your database and then query that using the normal approach?
>> --eric
>> On 3/1/15 4:28 AM, wrote:
>> Jorn: Vertica
>>  Cody: I posited the limit just as an example of how jdbcrdd could be
>> used least invasively. Let's say we used a partition on a time field -- we
>> would still need to have N executions of those queries. The queries we have
>> are very intense and concurrency is an issue even if the the N partitioned
>> queries are smaller. Some queries require evaluating the whole data set
>> first. If our use case a simple select * from table.. Then the partitions
>> would be an easier sell if it wasn't for the concurrency problem :) Long
>> story short -- we need only one execution of the query and would like to
>> just divy out the result set.
>>  M
>> On Mar 1, 2015, at 5:18 AM, Jörn Franke <> wrote:
>>   What database are you using?
>> Le 28 févr. 2015 18:15, "Michal Klos" <> a écrit :
>>> Hi Spark community,
>>> We have a use case where we need to pull huge amounts of data from a SQL
>>> query against a database into Spark. We need to execute the query against
>>> our huge database and not a substitute (SparkSQL, Hive, etc) because of a
>>> couple of factors including custom functions used in the queries that only
>>> our database has.
>>>  We started by looking at JDBC RDD, which utilizes a prepared statement
>>> with two parameters that are meant to be used to partition the result set
>>> to the workers... e.g.:
>>>  select * from table limit ?,?
>>>  turns into
>>>  select * from table limit 1,100 on worker 1
>>> select * from table limit 101,200 on worker 2
>>>  This will not work for us because our database cannot support multiple
>>> execution of these queries without being crippled. But, additionally, our
>>> database doesn't support the above LIMIT syntax and we don't have a generic
>>> way of partitioning the various queries.
>>>  As a result -- we stated by forking JDBCRDD and made a version that
>>> executes the SQL query once in getPartitions into a Vector and then hands
>>> each worker node an index and iterator. Here's a snippet of getPartitions
>>> and compute:
>>>    override def getPartitions: Array[Partition] = {
>>>     //Compute the DB query once here
>>>     val results = computeQuery
>>>     (0 until numPartitions).map(i => {
>>>       // TODO: would be better to do this partitioning when scrolling through
result set if still loading into memory
>>>       val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>>>       new DBPartition(i, partitionItems)
>>>     }).toArray
>>>   }
>>>   override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T]
>>>     val part = thePart.asInstanceOf[DBPartition[T]]
>>>     //Shift the result vector to our index number and then do a sliding iterator
over it
>>>     val iterator = part.items.iterator
>>>     override def getNext : T = {
>>>       if (iterator.hasNext) {
>>>       } else {
>>>         finished = true
>>>         null.asInstanceOf[T]
>>>       }
>>>     }
>>>     override def close: Unit = ()
>>>   }
>>> This is a little better since we can just execute the query once. However, the
result-set needs to fit in memory.
>>> We've been trying to brainstorm a way to
>>> A) have that result set distribute out to the worker RDD partitions as it's streaming
in from the cursor?
>>> B) have the result set spill to disk if it exceeds memory and do something clever
around the iterators?
>>> C) something else?
>>> We're not familiar enough yet with all of the workings of Spark to know how to
proceed on this.
>>> We also thought of the worker-around of having the DB query dump to HDFS/S3 and
then pick it up for there, but it adds more moving parts and latency to our processing.
>>> Does anyone have a clever suggestion? Are we missing something?
>>> thanks,
>>> Michal

View raw message