spark-user mailing list archives

From Yuanzhe Yang <yyz1...@gmail.com>
Subject Re: [Spark Structured Streaming]: Is it possible to ingest data from a jdbc data source incrementally?
Date Tue, 03 Jan 2017 16:46:45 GMT
Hi Ayan,

Yeah, I understand your proposal, but according to
http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases,
it says:

Notice that lowerBound and upperBound are just used to decide the partition
stride, not for filtering the rows in table. So all rows in the table will
be partitioned and returned. This option applies only to reading.

So my interpretation is that all rows in the table are ingested, and
"lowerBound" and "upperBound" only define the span of each partition. Well, I
am not a native English speaker, so maybe it means something different?
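
For concreteness, here is a rough sketch of how I understand the API (table,
column and connection details below are made up): the bounds only shape the
partition predicates, so every row still comes back; to actually restrict the
rows, the filter has to go into the query itself, for example as a subquery
passed as the "dbtable".

import java.util.Properties

val props = new Properties()
props.setProperty("user", "reader")       // hypothetical credentials
props.setProperty("password", "secret")

// All rows of "readings" come back; "id" (a numeric column), 0, 1000000 and 8
// only control how the scan is split into 8 parallel partitions.
val whole = spark.read.jdbc(
  "jdbc:mysql://dbhost:3306/sensors",     // made-up connection URL
  "readings",                             // made-up table name
  "id", 0L, 1000000L, 8, props)

// To filter rows, the predicate has to be pushed down as a subquery instead.
val delta = spark.read.jdbc(
  "jdbc:mysql://dbhost:3306/sensors",
  "(select * from readings where inserted_on > '2017-01-01 00:00:00') t",
  props)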

Best regards,
Yang

2017-01-03 17:23 GMT+01:00 ayan guha <guha.ayan@gmail.com>:

> Hi
>
> You need to store and capture the max of the column you intend to use for
> identifying new records (e.g. INSERTED_ON) after every successful run of
> your job. Then, use that value in the lowerBound option.
>
> Essentially, you want to create a query like
>
> select * from table where INSERTED_ON > lowerBound and
> INSERTED_ON < upperBound
>
> every time you run the job....
>
>
>
> On Wed, Jan 4, 2017 at 2:13 AM, Yuanzhe Yang <yyz1989@gmail.com> wrote:
>
>> Hi Ayan,
>>
>> Thanks a lot for your suggestion. I am currently looking into sqoop.
>>
>> Concerning your suggestion for Spark, it is indeed parallelized with
>> multiple workers, but the job is one-off and cannot keep streaming.
>> Moreover, I cannot specify any "start row" in the job; it will always
>> ingest the entire table. So I also cannot simulate a streaming process by
>> starting the job at fixed intervals...
>>
>> Best regards,
>> Yang
>>
>> 2017-01-03 15:06 GMT+01:00 ayan guha <guha.ayan@gmail.com>:
>>
>>> Hi
>>>
>>> While the solutions provided by others look promising and I'd like to
>>> try out a few of them, our old pal sqoop already "does" the job. It has an
>>> incremental mode where you can provide a --check-column and
>>> --last-value combination to grab the data - and yes, sqoop
>>> essentially does it by running a map-only job which spawns a number of
>>> parallel map tasks to grab data from the DB.
>>>
>>> In Spark, you can use the sqlContext.load function for JDBC and use
>>> partitionColumn and numPartitions to define the parallelism of the
>>> connections.
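>>>
>>> A rough sketch of that call (option values below are made up; in recent
>>> Spark versions the same options go through sqlContext.read.format("jdbc")
>>> rather than the older sqlContext.load):
>>>
>>> val df = sqlContext.read.format("jdbc").options(Map(
>>>   "url" -> "jdbc:mysql://dbhost:3306/sensors",  // hypothetical URL
>>>   "dbtable" -> "readings",                      // hypothetical table
>>>   "partitionColumn" -> "id",  // numeric column used to split the scan
>>>   "lowerBound" -> "0",
>>>   "upperBound" -> "1000000",
>>>   "numPartitions" -> "8"      // 8 concurrent JDBC connections
>>> )).load()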
>>>
>>> Best
>>> Ayan
>>>
>>> On Tue, Jan 3, 2017 at 10:49 PM, Yuanzhe Yang <yyz1989@gmail.com> wrote:
>>>
>>>> Hi Ayan,
>>>>
>>>> Thanks a lot for such a detailed response. I really appreciate it!
>>>>
>>>> I think this use case can be generalized, because the data is immutable
>>>> and append-only. We only need to find one column or timestamp to track the
>>>> last row consumed in the previous ingestion. This pattern should be common
>>>> when storing sensor data. If the data is mutable, then the solution will
>>>> surely be difficult and vendor-specific, as you said.
>>>>
>>>> The workflow you proposed is very useful. The difficult part is how to
>>>> parallelize the ingestion task. With Spark, when I have multiple workers
>>>> working on the same job, I don't know whether and how to dynamically
>>>> change the row range each worker should process in real time...
>>>>
>>>> I tried to find a candidate available out of the box, instead of
>>>> reinventing the wheel. At this moment I have not discovered any existing
>>>> tool that can parallelize ingestion tasks on one database. Is Sqoop a
>>>> suitable candidate, to your knowledge?
>>>>
>>>> Thank you again and have a nice day.
>>>>
>>>> Best regards,
>>>> Yang
>>>>
>>>>
>>>>
>>>> 2016-12-30 8:28 GMT+01:00 ayan guha <guha.ayan@gmail.com>:
>>>>
>>>>>
>>>>>
>>>>> "If data ingestion speed is faster than data production speed, then
>>>>> eventually the entire database will be harvested and those workers will
>>>>> start to "tail" the database for new data streams and the processing
>>>>> becomes real time."
>>>>>
>>>>> This part is really database dependent, so it will be hard to
>>>>> generalize. For example, say you have a batch interval of 10
>>>>> secs... what happens if you get more than one update on the same row
>>>>> within 10 secs? You will get a snapshot every 10 secs. Now, different
>>>>> databases provide different mechanisms to expose all DML changes: MySQL
>>>>> has binlogs, Oracle has log shipping, CDC, GoldenGate and so on...
>>>>> typically it requires a new product or new licenses and most likely a
>>>>> new component installation on the production db :)
>>>>>
>>>>> So, if we keep real CDC solutions out of scope, a simple snapshot
>>>>> solution can be achieved fairly easily by:
>>>>>
>>>>> 1. Adding INSERTED_ON and UPDATED_ON columns on the source table(s).
>>>>> 2. Keeping a simple table-level checkpoint (TABLENAME, TS_MAX).
>>>>> 3. Running an extraction/load mechanism which takes data from the DB
>>>>> (where INSERTED_ON > TS_MAX or UPDATED_ON > TS_MAX) and puts it into
>>>>> HDFS. This can be sqoop, Spark, or an ETL tool like Informatica, ODI,
>>>>> SAP etc. In addition, you can write directly to Kafka as well. Sqoop and
>>>>> Spark support Kafka, and most of the ETL tools would too...
>>>>> 4. Finally, updating the checkpoint...
>>>>>
>>>>> You may "determine" the checkpoint from the data you already have in
>>>>> HDFS if you create a Hive structure on it.
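>>>>>
>>>>> A bare-bones Spark sketch of steps 2-4 (paths, table and column names
>>>>> and the checkpoint layout are all made up; a real job would also handle
>>>>> empty extracts and failures before advancing the checkpoint):
>>>>>
>>>>> import java.util.Properties
>>>>> import org.apache.spark.sql.functions.max
>>>>>
>>>>> val url = "jdbc:mysql://dbhost:3306/prod"  // hypothetical source DB
>>>>> val props = new Properties()               // user/password etc.
>>>>>
>>>>> // 2. last successful TS_MAX for this table, kept as a tiny parquet file
>>>>> val tsMax = spark.read.parquet("/checkpoints/events").head().getString(0)
>>>>>
>>>>> // 3. pull only new or updated rows by pushing the predicate into the query
>>>>> val delta = spark.read.jdbc(url,
>>>>>   s"(select * from events where INSERTED_ON > '$tsMax'" +
>>>>>   s" or UPDATED_ON > '$tsMax') q", props)
>>>>> delta.write.mode("append").parquet("/data/events")
>>>>>
>>>>> // 4. advance the checkpoint only after the load above has succeeded
>>>>> delta.agg(max("UPDATED_ON").cast("string")).write
>>>>>   .mode("overwrite").parquet("/checkpoints/events")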
>>>>>
>>>>> Best
>>>>> Ayan
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Dec 30, 2016 at 4:45 PM, 任弘迪 <ryan.hd.ren@gmail.com> wrote:
>>>>>
>>>>>> Why not sync the binlog of MySQL (hopefully the data is immutable and
>>>>>> the table is append-only), send the log through Kafka, and then consume
>>>>>> it with Spark Streaming?
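>>>>>>
>>>>>> (On the consume side, a minimal Structured Streaming read of such a
>>>>>> Kafka topic might look like this; broker and topic names are made up,
>>>>>> and the spark-sql-kafka-0-10 package has to be on the classpath.)
>>>>>>
>>>>>> val changes = spark.readStream
>>>>>>   .format("kafka")
>>>>>>   .option("kafka.bootstrap.servers", "broker1:9092")  // hypothetical
>>>>>>   .option("subscribe", "mysql-binlog")                // hypothetical topic
>>>>>>   .load()
>>>>>>   .selectExpr("CAST(value AS STRING) AS event")       // decode payload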
>>>>>>
>>>>>> On Fri, Dec 30, 2016 at 9:01 AM, Michael Armbrust <
>>>>>> michael@databricks.com> wrote:
>>>>>>
>>>>>>> We don't support this yet, but I've opened this JIRA as it sounds
>>>>>>> generally useful: https://issues.apache.org/jira/browse/SPARK-19031
>>>>>>>
>>>>>>> In the meantime you could try implementing your own Source, but
>>>>>>> that is pretty low-level and is not yet a stable API.
>>>>>>>
>>>>>>> On Thu, Dec 29, 2016 at 4:05 AM, "Yuanzhe Yang (杨远哲)" <
>>>>>>> yyz1989@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> Thanks a lot for your contributions to bring us new technologies.
>>>>>>>>
>>>>>>>> I don't want to waste your time, so before writing to you, I
>>>>>>>> googled and checked Stack Overflow and the mailing list archive with
>>>>>>>> the keywords "streaming" and "jdbc", but I was not able to find any
>>>>>>>> solution to my use case. I hope I can get some clarification from you.
>>>>>>>>
>>>>>>>> The use case is quite straightforward: I need to harvest a
>>>>>>>> relational database via JDBC, do something with the data, and store
>>>>>>>> the result in Kafka. I am stuck at the first step, and the difficulty
>>>>>>>> is as follows:
>>>>>>>>
>>>>>>>> 1. The database is too large to ingest with one thread.
>>>>>>>> 2. The database is dynamic and time-series data comes in constantly.
>>>>>>>>
>>>>>>>> Then an ideal workflow is that multiple workers process partitions
>>>>>>>> of data incrementally according to a time window. For example, the
>>>>>>>> processing starts from the earliest data with each batch containing
>>>>>>>> data for one hour. If data ingestion speed is faster than data
>>>>>>>> production speed, then eventually the entire database will be
>>>>>>>> harvested and those workers will start to "tail" the database for new
>>>>>>>> data streams and the processing becomes real time.
>>>>>>>>
>>>>>>>> With Spark SQL I can ingest data from a JDBC source with partitions
>>>>>>>> divided by time windows, but how can I dynamically increment the time
>>>>>>>> windows during execution? Assume that there are two workers ingesting
>>>>>>>> the data of 2017-01-01 and 2017-01-02; the one which finishes quicker
>>>>>>>> gets the next task, for 2017-01-03. But I am not able to find out how
>>>>>>>> to increment those values during execution.
>>>>>>>>
>>>>>>>> Then I looked into Structured Streaming. It looks much more
>>>>>>>> promising because window operations based on event time are
>>>>>>>> supported during streaming, which could be the solution to my use
>>>>>>>> case. However, from the documentation and code examples I did not
>>>>>>>> find anything related to streaming data from a growing database. Is
>>>>>>>> there anything I can read to achieve my goal?
>>>>>>>>
>>>>>>>> Any suggestion is highly appreciated. Thank you very much and have
>>>>>>>> a nice day.
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>> Yang
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Ayan Guha
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Ayan Guha
>>>
>>
>>
>
>
> --
> Best Regards,
> Ayan Guha
>
