spark-dev mailing list archives

From Ryan Blue <rb...@netflix.com.INVALID>
Subject Re: DataSourceWriter V2 Api questions
Date Mon, 10 Sep 2018 21:04:36 GMT
I'm not sure if it was Sqoop or another similar system, but I've heard
about at least one Hadoop to RDBMS system working by writing to temporary
staging tables in the database. The commit then runs a big INSERT INTO
that selects from all the temporary tables and cleans up after by deleting
them. That way, the database can optimize the real insert by copying the
underlying files from one table into the other table.
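
A minimal sketch of that staging-table commit, assuming a JDBC-accessible
database; the connection URL and table names below are made up for
illustration, and each task is assumed to have already written its rows into
its own staging table:

    import java.sql.DriverManager

    // Hypothetical staging tables, one per task; names are illustrative only.
    val stagingTables = Seq("events_stage_0", "events_stage_1", "events_stage_2")
    val finalTable    = "events"

    val conn = DriverManager.getConnection(
      "jdbc:postgresql://db:5432/warehouse", "user", "secret")
    try {
      conn.setAutoCommit(false)                 // one transaction for the whole move
      val stmt = conn.createStatement()
      stagingTables.foreach { stage =>
        stmt.executeUpdate(s"INSERT INTO $finalTable SELECT * FROM $stage")
      }
      stagingTables.foreach(stage => stmt.executeUpdate(s"DROP TABLE $stage"))
      conn.commit()                             // data becomes visible atomically here
    } catch {
      case e: Exception => conn.rollback(); throw e
    } finally {
      conn.close()
    }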

On Mon, Sep 10, 2018 at 2:00 PM Arun Mahadevan <arunm@apache.org> wrote:

> >Well, in almost all relational databases you can move data in a
> >transactional way. That’s what transactions are for.
>
> It would work, but I suspect in most cases it would involve moving data
> from temporary tables to the final tables.
>
> Right now there's no mechanism to let the individual tasks commit in a
> two-phase manner (not sure if the CommitCoordinator might help). If such an
> API were provided, the sources could use it as they wish (e.g. use the XA
> support provided by MySQL to implement it more efficiently than the driver
> moving data from temp tables to destination tables).
>
> There are definitely complexities involved, but I am not sure network
> partitioning comes into play here, since the driver can act as the
> coordinator and can run in HA mode. And regarding the issue that Jungtaek
> brought up, 2PC doesn't require tasks to be running at the same time; we
> would need a mechanism to take down tasks after they have prepared and bring
> them back up during the commit phase.
>
> Most sources would not need any of the above and just need a way to support
> idempotent writes; as Ryan suggested, we can enable this (if there are gaps
> in the current APIs).
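
For a source that does expose XA, that split (tasks prepare, driver commits)
could look roughly like the sketch below. It uses only the standard
javax.sql / javax.transaction.xa interfaces; the Xid scheme, the table name,
and the plumbing for shipping prepared Xids from tasks back to the driver are
all invented for illustration:

    import javax.sql.XAConnection
    import javax.transaction.xa.{XAResource, Xid}

    // Illustrative Xid: one transaction branch per (query, partition).
    case class BranchXid(queryId: Long, partitionId: Int) extends Xid {
      override def getFormatId: Int = 0x5350
      override def getGlobalTransactionId: Array[Byte] = BigInt(queryId).toByteArray
      override def getBranchQualifier: Array[Byte] = BigInt(partitionId).toByteArray
    }

    // Task side: write the partition's rows, then prepare -- but do not commit.
    def prepareOnTask(xaConn: XAConnection, xid: Xid, rows: Seq[String]): Unit = {
      val xa = xaConn.getXAResource
      xa.start(xid, XAResource.TMNOFLAGS)
      val stmt = xaConn.getConnection.createStatement()
      rows.foreach(r => stmt.executeUpdate(s"INSERT INTO events VALUES ($r)"))
      xa.end(xid, XAResource.TMSUCCESS)
      xa.prepare(xid) // phase one: the branch is durable but not yet visible
    }

    // Driver side: once every task has reported "prepared", commit each branch.
    def commitOnDriver(xaConn: XAConnection, xids: Seq[Xid]): Unit = {
      val xa = xaConn.getXAResource
      xids.foreach(xid => xa.commit(xid, /* onePhase = */ false))
    }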
>
>
> On Mon, 10 Sep 2018 at 13:43, Reynold Xin <rxin@databricks.com> wrote:
>
>> Well, in almost all relational databases you can move data in a
>> transactional way. That’s what transactions are for.
>>
>> For just straight HDFS, the move is a pretty fast operation, so while it
>> is not completely transactional, the window of potential failure is pretty
>> short for appends. For writers at the partition level it is fine because it
>> is just renaming a directory, which is atomic.
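
For the partition-level case, a sketch of such a rename against the Hadoop
FileSystem API (the paths are made up):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(new Configuration())
    val staging = new Path("/warehouse/events/_temporary/attempt_001/dt=2018-09-10")
    val target  = new Path("/warehouse/events/dt=2018-09-10")

    // In HDFS the rename is a single atomic namespace operation: the partition
    // either appears under the final path or it doesn't.
    if (!fs.rename(staging, target)) {
      throw new java.io.IOException(s"Failed to commit $staging to $target")
    }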
>>
>> On Mon, Sep 10, 2018 at 1:40 PM Jungtaek Lim <kabhwan@gmail.com> wrote:
>>
>>> When network partitioning happens it is OK with me for 2PC not to work,
>>> since we are dealing with a global transaction. Recovery would be a hard
>>> thing to get right, though. I completely agree it would require massive
>>> changes to Spark.
>>>
>>> What I couldn't find for the underlying storages is a way to move data
>>> from a staging table to the final table transactionally. I'm not fully
>>> sure, but as far as I'm aware many storages do not support moving data, and
>>> even for the HDFS sink it is not strictly done in a transactional way,
>>> since we move multiple files with multiple operations. If the coordinator
>>> crashes it leaves a partial write, and the writers and the coordinator need
>>> to ensure it will not end up duplicated.
>>>
>>> Ryan replied to me that Iceberg and HBase MVCC timestamps can enable us
>>> to implement "commit" (his reply didn't hit the dev mailing list though),
>>> but I'm not an expert in either and I still can't see how they would handle
>>> the various crash cases.
>>>
>>> On Tue, Sep 11, 2018 at 5:17 AM, Reynold Xin <rxin@databricks.com> wrote:
>>>
>>>> I don't think two phase commit would work here at all.
>>>>
>>>> 1. It'd require massive changes to Spark.
>>>>
>>>> 2. Unless the underlying data source can provide an API to coordinate
>>>> commits (and few data sources I know of provide something like that), 2PC
>>>> wouldn't work in the presence of network partitioning. You can't defy the
>>>> laws of physics.
>>>>
>>>> Really, the most common and simplest way I've seen this work is through
>>>> staging tables and a final transaction to move data from the staging
>>>> tables to the final table.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim <kabhwan@gmail.com>
>>>> wrote:
>>>>
>>>>> I guess we are all aware of the limitation of the contract on the DSv2
>>>>> writer. In practice it can be achieved only with the HDFS sink (or other
>>>>> filesystem-based sinks); other external storages are normally not able to
>>>>> implement it, because there is no way to couple a transaction across
>>>>> multiple clients, and the coordinator can't take over transactions from
>>>>> the writers to do the final commit.
>>>>>
>>>>> XA is also not trivial to get right with the current execution model:
>>>>> Spark doesn't require writer tasks to run at the same time, but to
>>>>> achieve 2PC they would have to run until the end of the transaction
>>>>> (closing a client before the transaction ends normally means aborting the
>>>>> transaction). Spark would also need to integrate 2PC with its
>>>>> checkpointing mechanism to guarantee the completeness of a batch. And it
>>>>> might require a different integration for continuous mode.
>>>>>
>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>
>>>>> On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan <arunm@apache.org> wrote:
>>>>>
>>>>>> In some cases the implementations may be OK with eventual consistency
>>>>>> (and do not care whether the output is written out atomically).
>>>>>>
>>>>>> XA can be one option for data sources that support it and require
>>>>>> atomicity, but I am not sure how one would implement it with the current
>>>>>> API.
>>>>>>
>>>>>> Maybe we need to discuss improvements at the DataSource V2 API level
>>>>>> (e.g. individual tasks would "prepare" for commit, and once the driver
>>>>>> receives "prepared" from all the tasks, a "commit" would be invoked at
>>>>>> each of the individual tasks). Right now the responsibility of the final
>>>>>> "commit" is with the driver, and it may not always be possible for the
>>>>>> driver to take over the transactions started by the tasks.
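
To make the shape of that proposal concrete, here is a purely hypothetical
sketch of a task-side interface; none of these names exist in the current
DataSource V2 API:

    // Hypothetical only -- not part of DSv2 today.
    trait PreparedMessage extends Serializable

    trait TwoPhaseDataWriter[T] {
      def write(record: T): Unit
      // Phase one, on the task: make the writes durable but not yet visible,
      // and report back to the driver.
      def prepare(): PreparedMessage
      // Phase two, invoked once the driver has collected "prepared" from
      // every partition (possibly in a freshly launched task).
      def commit(message: PreparedMessage): Unit
      // Called instead of commit if any partition failed to prepare.
      def abort(message: PreparedMessage): Unit
    }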
>>>>>>
>>>>>>
>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal <dbiswal@us.ibm.com>
>>>>>> wrote:
>>>>>>
>>>>>>> This is a pretty big challenge in general for data sources -- for the
>>>>>>> vast majority of data stores, the boundary of a transaction is per
>>>>>>> client. That is, you can't have two clients doing writes and
>>>>>>> coordinating a single transaction. That's certainly the case for almost
>>>>>>> all relational databases. Spark, on the other hand, will have multiple
>>>>>>> clients (consider each task a client) writing to the same underlying
>>>>>>> data store.
>>>>>>>
>>>>>>> DB>> Perhaps we can explore the two-phase commit protocol (aka XA) for
>>>>>>> this? Not sure how easy it would be to implement, though :-)
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dilip Biswal
>>>>>>> Tel: 408-463-4980
>>>>>>> dbiswal@us.ibm.com
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ----- Original message -----
>>>>>>> From: Reynold Xin <rxin@databricks.com>
>>>>>>> To: Ryan Blue <rblue@netflix.com>
>>>>>>> Cc: ross.lawley@gmail.com, dev <dev@spark.apache.org>
>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>
>>>>>>> I don't think the problem is just whether we have a starting point
>>>>>>> for a write. As a matter of fact there's always a starting point for a
>>>>>>> write, whether it is explicit or implicit.
>>>>>>>
>>>>>>> This is a pretty big challenge in general for data sources -- for the
>>>>>>> vast majority of data stores, the boundary of a transaction is per
>>>>>>> client. That is, you can't have two clients doing writes and
>>>>>>> coordinating a single transaction. That's certainly the case for almost
>>>>>>> all relational databases. Spark, on the other hand, will have multiple
>>>>>>> clients (consider each task a client) writing to the same underlying
>>>>>>> data store.
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue <rblue@netflix.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Ross, I think the intent is to create a single transaction on the
>>>>>>> driver, write as part of it in each task, and then commit the
>>>>>>> transaction once the tasks complete. Is that possible in your
>>>>>>> implementation?
>>>>>>>
>>>>>>> I think that part of this is made more difficult by not having a
>>>>>>> clear starting point for a write, which we are fixing in the redesign
>>>>>>> of the v2 API. That will have a method that creates a Write to track
>>>>>>> the operation. That can create your transaction when it is created and
>>>>>>> commit the transaction when commit is called on it.
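
As a rough illustration of that lifecycle (the redesign was not finalized at
the time of this thread, so the names below are placeholders, not the actual
API):

    // Hypothetical shape of the redesigned write path described above.
    trait CommitMessage extends Serializable

    trait Write {
      // Creating the Write is the explicit start of the operation -- a
      // connector could open its transaction (or session) here.
      def createWriterFactory(): Serializable
      // Driver-side: called once all tasks succeed; commit the transaction
      // opened at creation time.
      def commit(messages: Array[CommitMessage]): Unit
      // Driver-side: called on failure; roll the transaction back.
      def abort(messages: Array[CommitMessage]): Unit
    }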
>>>>>>>
>>>>>>> rb
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin <rxin@databricks.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley <ross.lawley@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I've been prototyping an implementation of the DataSource V2 writer
>>>>>>> for the MongoDB Spark Connector and I have a couple of questions about
>>>>>>> how it's intended to be used with database systems. According to the
>>>>>>> Javadoc for DataWriter.commit():
>>>>>>>
>>>>>>> *"this method should still "hide" the written data and ask the
>>>>>>> DataSourceWriter at driver side to do the final commit via
>>>>>>> WriterCommitMessage"*
>>>>>>>
>>>>>>> Although MongoDB now has transactions, it doesn't have a way to
>>>>>>> "hide" the data once it has been written. So as soon as the DataWriter
>>>>>>> has committed the data, it has been inserted/updated in the collection
>>>>>>> and is discoverable - thereby breaking the documented contract.
>>>>>>>
>>>>>>> I was wondering how other database systems plan to implement this API
>>>>>>> and meet the contract as per the Javadoc?
>>>>>>>
>>>>>>> Many thanks
>>>>>>>
>>>>>>> Ross
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> ---------------------------------------------------------------------
>>>>>>> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
>>>>>>
>>>>>>

-- 
Ryan Blue
Software Engineer
Netflix
