spark-user mailing list archives

From Yuanzhe Yang <yyz1...@gmail.com>
Subject Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?
Date Tue, 03 Jan 2017 21:39:24 GMT
Hi Ayan,

This "inline view" idea is really awesome and very enlightening! Finally I have
a plan to move forward. I greatly appreciate your help!

Best regards,
Yang

2017-01-03 18:14 GMT+01:00 ayan guha <guha.ayan@gmail.com>:

> Ahh, I see what you mean... I confused two concepts, because we were
> talking about partitioning and then changed the topic to identifying
> changed data...
>
> For that, you can "construct" the dbtable option as an inline view:
>
> # The derived table needs an alias (here "t") in most databases.
> viewSQL = ("(select * from table where <column> > '<last_modified_value>') t"
>            .replace("<column>", "inserted_on")
>            .replace("<last_modified_value>", checkPointedValue))
> dbtable = viewSQL
>
> See this blog post:
> <http://www.sparkexpert.com/2015/03/28/loading-database-data-into-spark-using-data-sources-api/>
>
> So, in summary, you have two things to solve:
>
> 1. Identifying changed data: my suggestion is to use dbtable with an inline view.
> 2. Parallelism: use numPartitions, lowerBound and upperBound (together with
> partitionColumn) to generate the number of partitions.
>
> HTH....
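The two points above can be combined in a minimal Python sketch, assuming PySpark's JDBC reader: the inline view does the incremental filtering, while partitionColumn, lowerBound, upperBound and numPartitions only split the already-filtered rows into parallel reads. The helper name `build_jdbc_options`, the table `readings`, the alias `src` and the JDBC URL are hypothetical. The checkpoint value is interpolated directly into SQL, so it should come from a trusted source.

```python
def build_jdbc_options(url, table, check_column, last_value,
                       partition_column, lower_bound, upper_bound,
                       num_partitions):
    """Build a hypothetical option dict for spark.read.format("jdbc").

    The inline view filters to rows newer than the checkpoint; the
    partitioning options only decide how those rows are split across reads.
    """
    # Derived table with an alias ("src"), as most databases require.
    # last_value is interpolated as-is: it must be a trusted checkpoint value.
    inline_view = (
        f"(select * from {table} "
        f"where {check_column} > '{last_value}') src"
    )
    # Spark JDBC options are passed as strings.
    return {
        "url": url,
        "dbtable": inline_view,
        "partitionColumn": partition_column,
        "lowerBound": str(lower_bound),
        "upperBound": str(upper_bound),
        "numPartitions": str(num_partitions),
    }


opts = build_jdbc_options(
    "jdbc:postgresql://dbhost/sensors",  # hypothetical URL
    "readings", "inserted_on", "2017-01-03 00:00:00",
    "id", 1, 1000000, 8,
)
print(opts["dbtable"])
```

Given an existing SparkSession `spark` and the database's JDBC driver on the classpath, the dict could then be passed as `spark.read.format("jdbc").options(**opts).load()`.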
>
>
>
> On Wed, Jan 4, 2017 at 3:46 AM, Yuanzhe Yang <yyz1989@gmail.com> wrote:
>
>> Hi Ayan,
>>
>> Yeah, I understand your proposal, but according to
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases,
>> it says:
>>
>> Notice that lowerBound and upperBound are just used to decide the
>> partition stride, not for filtering the rows in table. So all rows in the
>> table will be partitioned and returned. This option applies only to reading.
>>
>> So my interpretation is that all rows in the table are ingested, and that
>> "lowerBound" and "upperBound" only determine the span of each partition. Well,
>> I am not a native English speaker, so maybe it means something different?
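A simplified Python model of how the documented options turn into per-partition WHERE clauses, written to check the interpretation above. It approximates the stride logic of Spark 2.x as we understand it (it is not Spark's actual code) and assumes non-negative integer bounds; the column name `id` is hypothetical. The first partition has no lower limit and the last has no upper limit, which is why rows outside the bounds are still returned.

```python
def partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Simplified model of Spark's JDBC partition predicates.

    Assumes non-negative integer bounds. Returns one WHERE clause per
    partition; None means the partition has no filter at all.
    """
    if num_partitions <= 1:
        # A single partition reads the whole table without a WHERE clause.
        return [None]
    # The bounds only determine the stride (width) of each partition.
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        # The first partition has no lower limit.
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        # The last partition has no upper limit.
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper is None:
            predicates.append(lower)
        elif lower is None:
            # NULLs in the partition column also land in the first partition.
            predicates.append(f"{upper} OR {column} IS NULL")
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


for clause in partition_predicates("id", 0, 100, 4):
    print(clause)
# prints:
# id < 25 OR id IS NULL
# id >= 25 AND id < 50
# id >= 50 AND id < 75
# id >= 75
```

Because every row matches exactly one of these clauses, the bounds cannot act as a filter; any incremental filter has to live in the query itself.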
>>
>> Best regards,
>> Yang
>>
>> 2017-01-03 17:23 GMT+01:00 ayan guha <guha.ayan@gmail.com>:
>>
>>> Hi
>>>
>>> You need to capture and store the max of the column you intend to use
>>> for identifying new records (e.g. INSERTED_ON) after every successful run of
>>> your job. Then use that value in the lowerBound option.
>>>
>>> Essentially, you want to create a query like
>>>
>>> select * from table where INSERTED_ON > lowerBound and INSERTED_ON < upperBound
>>>
>>> every time you run the job.
>>>
>>>
>>>
>>> On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang <yyz1989@gmail.com> wrote:
>>>
>>>> Hi Ayan,
>>>>
>>>> Thanks a lot for your suggestion. I am currently looking into Sqoop.
>>>>
>>>> Concerning your suggestion for Spark: it is indeed parallelized across
>>>> multiple workers, but the job is one-off and cannot keep streaming.
>>>> Moreover, I cannot specify any "start row" in the job; it will always
>>>> ingest the entire table. So I also cannot simulate a streaming process by
>>>> starting the job at fixed intervals...
>>>>
>>>> Best regards,
>>>> Yang
>>>>
>>>> 2017-01-03 15:06 GMT+01:00 ayan guha <guha.ayan@gmail.com>:
>>>>
>>>>> Hi
>>>>>
>>>>> While the solutions provided by others look promising and I'd like to
>>>>> try out a few of them, our old pal Sqoop already "does" the job. It has an
>>>>> incremental mode where you can provide a --check-column and
>>>>> --last-value combination (with --incremental) to grab the data. And yes,
>>>>> Sqoop essentially does it by running a map-only job which spawns a number
>>>>> of parallel map tasks to grab data from the DB.
>>>>>
>>>>> In Spark, you can use the sqlContext.load function for JDBC and use
>>>>> partitionColumn and numPartitions to define the parallelism of the connection.
>>>>>
>>>>> Best
>>>>> Ayan
>>>>>
>>>>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1989@gmail.com> wrote:
>>>>>
>>>>>> Hi Ayan,
>>>>>>
>>>>>> Thanks a lot for such a detailed response. I really appreciate it!
>>>>>>
>>>>>> I think this use case can be generalized, because the data is
>>>>>> immutable and append-only. We only need to find one column or timestamp to
>>>>>> track the last row consumed in the previous ingestion. This pattern should
>>>>>> be common when storing sensor data. If the data is mutable, then the
>>>>>> solution will surely be difficult and vendor-specific, as you said.
>>>>>>
>>>>>> The workflow you proposed is very useful. The difficult part is how
>>>>>> to parallelize the ingestion task. With Spark, when I have multiple workers
>>>>>> working on the same job, I don't know whether (and how) it is possible to
>>>>>> dynamically change the row range each worker should process in real time...
>>>>>>
>>>>>> I tried to find out whether any candidate is available out of the
>>>>>> box, instead of reinventing the wheel. So far I have not discovered
>>>>>> any existing tool that can parallelize ingestion tasks on one database.
>>>>>> Is Sqoop a proper candidate, to your knowledge?
>>>>>>
>>>>>> Thank you again and have a nice day.
>>>>>>
>>>>>> Best regards,
>>>>>> Yang
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2016-12-30 8:28 GMT+01:00 ayan guha <guha.ayan@gmail.com>:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> "If data ingestion speed is faster than data production speed, then
>>>>>>> eventually the entire database will be harvested and those workers will
>>>>>>> start to "tail" the database for new data streams and the processing
>>>>>>> becomes real time."
>>>>>>>
>>>>>>> This part is really database dependent, so it will be hard to
>>>>>>> generalize. For example, say you have a batch interval of 10
>>>>>>> secs... what happens if you get more than one update on the same row
>>>>>>> within 10 secs? You will only get a snapshot every 10 secs. Now, different
>>>>>>> databases provide different mechanisms to expose all DML changes: MySQL has
>>>>>>> binlogs, Oracle has log shipping, CDC, GoldenGate and so on. Typically it
>>>>>>> requires a new product or new licenses, and most likely a new component
>>>>>>> installed on the production DB :)
>>>>>>>
>>>>>>> So, if we keep real CDC solutions out of scope, a simple snapshot
>>>>>>> solution can be achieved fairly easily by:
>>>>>>>
>>>>>>> 1. Adding INSERTED_ON and UPDATED_ON columns to the source table(s).
>>>>>>> 2. Keeping a simple table-level checkpoint (TABLENAME, TS_MAX).
>>>>>>> 3. Running an extraction/load mechanism which takes data from the DB
>>>>>>> (where INSERTED_ON > TS_MAX or UPDATED_ON > TS_MAX) and puts it into
>>>>>>> HDFS. This can be Sqoop, Spark, or an ETL tool like Informatica, ODI,
>>>>>>> SAP etc. In addition, you can write directly to Kafka as well. Sqoop and
>>>>>>> Spark support Kafka, and most of the ETL tools would too.
>>>>>>> 4. Finally, updating the checkpoint.
>>>>>>>
>>>>>>> You may "determine" the checkpoint from the data you already have in
>>>>>>> HDFS if you create a Hive table on top of it.
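A runnable sketch of the four steps above, using Python's built-in sqlite3 module as a stand-in for the source database. The table `READINGS`, its columns and the `CHECKPOINT` table layout are assumptions for illustration; a real job would write the extracted rows to HDFS or Kafka instead of returning them.

```python
import sqlite3


def incremental_extract(conn, table):
    """One run of the checkpointed snapshot extraction (steps 2 to 4 above)."""
    cur = conn.cursor()
    # Step 2: read the table-level checkpoint; fall back to the epoch on the first run.
    row = cur.execute(
        "SELECT TS_MAX FROM CHECKPOINT WHERE TABLENAME = ?", (table,)
    ).fetchone()
    ts_max = row[0] if row else "1970-01-01 00:00:00"

    # Step 3: pull rows inserted or updated after the checkpoint.
    # `table` is interpolated into SQL, so it must be a trusted identifier.
    rows = cur.execute(
        f"SELECT ID, VALUE, INSERTED_ON, UPDATED_ON FROM {table} "
        "WHERE INSERTED_ON > ? OR UPDATED_ON > ? ORDER BY ID",
        (ts_max, ts_max),
    ).fetchall()

    # Step 4: advance the checkpoint to the newest timestamp seen in this run.
    # ISO-8601 timestamp strings compare correctly as text.
    if rows:
        new_max = max(max(r[2], r[3] or r[2]) for r in rows)
        cur.execute(
            "INSERT OR REPLACE INTO CHECKPOINT (TABLENAME, TS_MAX) VALUES (?, ?)",
            (table, new_max),
        )
    conn.commit()
    return rows


# Step 1: a source table with INSERTED_ON/UPDATED_ON, plus the checkpoint table.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE READINGS (ID INTEGER PRIMARY KEY, VALUE REAL,
                           INSERTED_ON TEXT, UPDATED_ON TEXT);
    CREATE TABLE CHECKPOINT (TABLENAME TEXT PRIMARY KEY, TS_MAX TEXT);
    INSERT INTO READINGS VALUES (1, 0.5, '2017-01-01 10:00:00', NULL);
    INSERT INTO READINGS VALUES (2, 0.7, '2017-01-01 11:00:00', NULL);
    """
)

first = incremental_extract(conn, "READINGS")   # rows 1 and 2
conn.execute("INSERT INTO READINGS VALUES (3, 0.9, '2017-01-01 12:00:00', NULL)")
second = incremental_extract(conn, "READINGS")  # only row 3
```

An update that bumps UPDATED_ON past the stored TS_MAX makes the row reappear on the next run, which is the snapshot behaviour described above.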
>>>>>>>
>>>>>>> Best
>>>>>>> Ayan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪 <ryan.hd.ren@gmail.com> wrote:
>>>>>>>
>>>>>>>> Why not sync the MySQL binlog (hopefully the data is immutable and
>>>>>>>> the table is append-only), send the log through Kafka and then consume it
>>>>>>>> with Spark Streaming?
>>>>>>>>
>>>>>>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <michael@databricks.com> wrote:
>>>>>>>>
>>>>>>>>> We don't support this yet, but I've opened this JIRA as it sounds
>>>>>>>>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>>>>>>>
>>>>>>>>> In the meantime you could try implementing your own Source, but
>>>>>>>>> that is pretty low level and is not yet a stable API.
>>>>>>>>>
>>>>>>>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <yyz1989@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> Thanks a lot for your contributions in bringing us new technologies.
>>>>>>>>>>
>>>>>>>>>> I don't want to waste your time, so before writing to you I
>>>>>>>>>> googled and checked Stack Overflow and the mailing list archive with
>>>>>>>>>> the keywords "streaming" and "jdbc". But I was not able to find any
>>>>>>>>>> solution for my use case. I hope I can get some clarification from you.
>>>>>>>>>>
>>>>>>>>>> The use case is quite straightforward: I need to harvest a
>>>>>>>>>> relational database via JDBC, do something with the data, and store the
>>>>>>>>>> results in Kafka. I am stuck at the first step, and the difficulty is as
>>>>>>>>>> follows:
>>>>>>>>>>
>>>>>>>>>> 1. The database is too large to ingest with one thread.
>>>>>>>>>> 2. The database is dynamic and time-series data comes in
>>>>>>>>>> constantly.
>>>>>>>>>>
>>>>>>>>>> Then an ideal workflow is that multiple workers process
>>>>>>>>>> partitions of data incrementally according to a time window. For example,
>>>>>>>>>> the processing starts from the earliest data, with each batch containing
>>>>>>>>>> data for one hour. If data ingestion speed is faster than data production
>>>>>>>>>> speed, then eventually the entire database will be harvested and those
>>>>>>>>>> workers will start to "tail" the database for new data streams and the
>>>>>>>>>> processing becomes real time.
>>>>>>>>>>
>>>>>>>>>> With Spark SQL I can ingest data from a JDBC source with
>>>>>>>>>> partitions divided by time windows, but how can I dynamically increment
>>>>>>>>>> the time windows during execution? Assume that there are two workers
>>>>>>>>>> ingesting data for 2017-01-01 and 2017-01-02; the one that finishes first
>>>>>>>>>> gets the next task, for 2017-01-03. But I have not been able to find out
>>>>>>>>>> how to increment those values during execution.
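A sketch of the dynamic assignment described above, using a plain Python thread pool as a work queue rather than any Spark feature: windows are queued, and whichever worker becomes free takes the next one. `ingest_window` is a hypothetical placeholder for the per-window JDBC read, and the one-day window size follows the example in the text.

```python
from concurrent.futures import ThreadPoolExecutor
from datetime import date, timedelta


def day_windows(start, end):
    """Yield consecutive one-day (start, end) windows; `end` is exclusive."""
    current = start
    while current < end:
        yield current, current + timedelta(days=1)
        current += timedelta(days=1)


def ingest_window(window):
    """Hypothetical placeholder for reading one window over JDBC.

    A real version might run a query filtered on
    ts >= window_start AND ts < window_end.
    """
    window_start, window_end = window
    return f"{window_start.isoformat()}..{window_end.isoformat()}"


def ingest_all(start, end, workers=2):
    # pool.map queues every window; each worker thread takes the next queued
    # window as soon as it finishes its current one, so a fast worker picks up
    # 2017-01-03 while the slow one is still busy. Results come back in order.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(ingest_window, day_windows(start, end)))


print(ingest_all(date(2017, 1, 1), date(2017, 1, 4)))
```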
>>>>>>>>>>
>>>>>>>>>> Then I looked into Structured Streaming. It looks much more
>>>>>>>>>> promising, because window operations based on event time are supported
>>>>>>>>>> during streaming, which could be the solution to my use case. However, in
>>>>>>>>>> the documentation and code examples I did not find anything related to
>>>>>>>>>> streaming data from a growing database. Is there anything I can read to
>>>>>>>>>> achieve my goal?
>>>>>>>>>>
>>>>>>>>>> Any suggestion is highly appreciated. Thank you very much and
>>>>>>>>>> have a nice day.
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>> Yang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best Regards,
>>>>>>> Ayan Guha
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>
>
>
