spark-dev mailing list archives

From Arun Mahadevan <ar...@apache.org>
Subject Re: DataSourceWriter V2 Api questions
Date Tue, 11 Sep 2018 13:39:25 GMT
>Some say it is exactly-once when the output is eventually exactly-once,
>whereas others say there should be no side effects, like a consumer
>shouldn't see a partial write. I guess 2PC is the former, since some
>partitions can commit earlier while other partitions fail to commit for
>some time.
Yes, it's more about guaranteeing atomicity: all partitions eventually
commit or none commits. The visibility of the data to readers is orthogonal
(e.g. setting isolation levels like serializable for XA), and in general
it's difficult to guarantee that data across partitions becomes visible at
once. An approach like a staging table plus a global commit works in a
centralized setup but can be difficult to do in a distributed manner across
partitions (e.g. when each partition's output goes to a different database)
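
For illustration, here is a minimal sketch of that centralized pattern in
plain JDBC/Scala (the table names, columns and JDBC URL are placeholders,
not an actual connector): each task appends to its own staging table and
reports the table name back as its commit message, and the driver makes
everything visible with one final transaction. It only works because every
partition lands in the same database.

    import java.sql.DriverManager

    object StagingCommitSketch {

      // Runs in each writer task: append this partition's rows to a
      // per-partition staging table and return its name as the task's
      // "commit message".
      def writePartition(jdbcUrl: String, partitionId: Int,
                         rows: Iterator[(Int, String)]): String = {
        val staging = s"target_staging_$partitionId"  // hypothetical naming scheme
        val conn = DriverManager.getConnection(jdbcUrl)
        try {
          conn.createStatement().execute(
            s"CREATE TABLE IF NOT EXISTS $staging (id INT, value VARCHAR(255))")
          conn.setAutoCommit(false)
          val stmt = conn.prepareStatement(s"INSERT INTO $staging VALUES (?, ?)")
          rows.foreach { case (id, value) =>
            stmt.setInt(1, id); stmt.setString(2, value); stmt.addBatch()
          }
          stmt.executeBatch()
          conn.commit()  // durable in staging, still invisible in the final table
          staging
        } finally conn.close()
      }

      // Runs on the driver only after every task reported success: a single
      // transaction moves all staged rows, so readers see none or all of them.
      def globalCommit(jdbcUrl: String, stagingTables: Seq[String]): Unit = {
        val conn = DriverManager.getConnection(jdbcUrl)
        try {
          conn.setAutoCommit(false)
          stagingTables.foreach { t =>
            conn.createStatement().execute(s"INSERT INTO target SELECT * FROM $t")
          }
          conn.commit()
          // Drop the staging tables after the commit so the sketch doesn't
          // rely on transactional DDL.
          conn.setAutoCommit(true)
          stagingTables.foreach(t => conn.createStatement().execute(s"DROP TABLE $t"))
        } finally conn.close()
      }
    }

Abort is the mirror image: if any task fails, the driver just drops whatever
staging tables it heard about.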

On Mon, 10 Sep 2018 at 21:23, Jungtaek Lim <kabhwan@gmail.com> wrote:

> IMHO that's up to how strict we want to be about "exactly-once".
>
> Some say it is exactly-once when the output is eventually exactly-once,
> whereas others say there should be no side effects, like a consumer
> shouldn't see a partial write. I guess 2PC is the former, since some
> partitions can commit earlier while other partitions fail to commit for
> some time.
>
> That said, there may be a couple of alternatives other than the contract
> Spark provides/requires, and I'd like to see how the Spark community wants
> to deal with them. Would we want to disallow alternatives, like "replay +
> deduplicate write (per batch/partition)", which ensure "eventually"
> exactly-once but cannot ensure the contract?
>
> Btw, unless achieving exactly-once is cheap enough for a given sink, I
> think the sink should provide both at-least-once (also optimized for that
> semantic) and exactly-once, and let end users pick one.
>
> On Tue, Sep 11, 2018 at 12:57 PM, Russell Spitzer
> <russell.spitzer@gmail.com> wrote:
>
>> Why are atomic operations a requirement? I feel like doubling the amount
>> of writes (with staging tables) is probably a tradeoff that the end user
>> should make.
>>
>> On Mon, Sep 10, 2018, 10:43 PM Wenchen Fan <cloud0fan@gmail.com> wrote:
>>
>>> Regardless of the API, using Spark to write data atomically requires:
>>> 1. Writing data distributedly, with a central coordinator at the Spark
>>> driver.
>>> 2. The distributed writers are not guaranteed to run at the same time.
>>> (This can be relaxed if we can extend the barrier scheduling feature.)
>>> 3. The new data is visible if and only if all distributed writers
>>> succeed.
>>>
>>> Given these requirements, I think using a staging table is the most
>>> common way, and maybe the only way. I'm not sure how 2PC can help; we
>>> don't want users to read partial data, so we need a final step to
>>> commit all the data together.
>>>
>>> For RDBMS data sources, I think a simple solution is to ask users to
>>> coalesce the input RDD/DataFrame into one partition; then we don't need
>>> to care about multi-client transactions. Or use a staging table like
>>> Ryan described before.
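
For illustration, a minimal sketch of that single-partition route using the
built-in JDBC sink (the input path, URL and table name are placeholders, and
a SparkSession named spark is assumed): with one partition there is only one
writing task, i.e. a single database client, so the multi-client
coordination problem disappears.

    // Coalesce to a single partition so exactly one task writes to the RDBMS.
    val df = spark.read.parquet("/path/to/input")   // placeholder input

    df.coalesce(1)                                  // one partition => one writer
      .write
      .mode("append")
      .format("jdbc")
      .option("url", "jdbc:postgresql://db-host:5432/warehouse")
      .option("dbtable", "target")
      .option("user", "spark")
      .save()

The obvious cost is that the write loses all parallelism, so this only makes
sense for modest output sizes.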
>>>
>>>
>>>
>>> On Tue, Sep 11, 2018 at 5:10 AM Jungtaek Lim <kabhwan@gmail.com> wrote:
>>>
>>>> > And regarding the issue that Jungtaek brought up, 2PC doesn't require
>>>> > tasks to be running at the same time, we need a mechanism to take down
>>>> > tasks after they have prepared and bring up the tasks during the
>>>> > commit phase.
>>>>
>>>> I guess we already got into too much detail here, but if it is based on
>>>> client transactions, Spark must assign the "commit" task to the executor
>>>> where the "prepare" task finished, and if it loses that executor it is
>>>> not feasible to force the commit. Staging should come into play for that.
>>>>
>>>> We should also have a mechanism for "recovery": Spark needs to ensure it
>>>> finalizes the "commit" even in case of failures before starting a new
>>>> batch.
>>>>
>>>> So not an easy thing to integrate correctly.
>>>>
>>>> On Tue, Sep 11, 2018 at 6:00 AM, Arun Mahadevan <arunm@apache.org> wrote:
>>>>
>>>>> >Well, with almost all relational databases you can move data in a
>>>>> >transactional way. That’s what transactions are for.
>>>>>
>>>>> It would work, but I suspect in most cases it would involve moving
>>>>> data from temporary tables to the final tables.
>>>>>
>>>>> Right now there's no mechanism to let the individual tasks commit in a
>>>>> two-phase manner (not sure if the CommitCoordinator might help). If
>>>>> such an API is provided, the sources could use it as they wish (e.g.
>>>>> use the XA support provided by MySQL to implement it in a more
>>>>> efficient way than the driver moving data from temp tables to
>>>>> destination tables).
>>>>>
>>>>> Definitely there are complexities involved, but I am not sure if
>>>>> network partitioning comes into play here, since the driver can act as
>>>>> the coordinator and can run in HA mode. And regarding the issue that
>>>>> Jungtaek brought up, 2PC doesn't require tasks to be running at the
>>>>> same time; we need a mechanism to take down tasks after they have
>>>>> prepared and bring up the tasks during the commit phase.
>>>>>
>>>>> Most of the sources would not need any of the above and just need a
>>>>> way to support idempotent writes, and like Ryan suggested we can
>>>>> enable this (if there are gaps in the current APIs).
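
For illustration, a minimal sketch of what such an idempotent write could
look like for a relational sink (PostgreSQL-flavoured SQL; the table,
columns and key scheme are made up): each row is keyed by (epoch/batch id,
partition id, row key), so replaying a failed partition simply skips rows
that already landed.

    import java.sql.DriverManager

    // Assumes the (made-up) target table has a unique constraint on
    // (epoch_id, partition_id, row_key), so a replayed partition is a no-op.
    def writeIdempotently(jdbcUrl: String, epochId: Long, partitionId: Int,
                          rows: Iterator[(String, String)]): Unit = {
      val conn = DriverManager.getConnection(jdbcUrl)
      try {
        conn.setAutoCommit(false)
        val stmt = conn.prepareStatement(
          """INSERT INTO target (epoch_id, partition_id, row_key, value)
            |VALUES (?, ?, ?, ?)
            |ON CONFLICT (epoch_id, partition_id, row_key) DO NOTHING""".stripMargin)
        rows.foreach { case (key, value) =>
          stmt.setLong(1, epochId); stmt.setInt(2, partitionId)
          stmt.setString(3, key);   stmt.setString(4, value)
          stmt.addBatch()
        }
        stmt.executeBatch()
        conn.commit()
      } finally conn.close()
    }

This gives "eventually" exactly-once in Jungtaek's sense: a reader can still
observe a partial epoch while some partitions are retrying, but no row is
ever duplicated.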
>>>>>
>>>>>
>>>>> On Mon, 10 Sep 2018 at 13:43, Reynold Xin <rxin@databricks.com> wrote:
>>>>>
>>>>>> Well, with almost all relational databases you can move data in a
>>>>>> transactional way. That’s what transactions are for.
>>>>>>
>>>>>> For just straight HDFS, the move is a pretty fast operation, so while
>>>>>> it is not completely transactional, the window of potential failure is
>>>>>> pretty short for appends. For writers at the partition level it is
>>>>>> fine because it is just renaming a directory, which is atomic.
>>>>>>
>>>>>> On Mon, Sep 10, 2018 at 1:40 PM Jungtaek Lim <kabhwan@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> When network partitioning happens it is pretty OK for me to see 2PC
>>>>>>> not working, since we are dealing with a global transaction. Recovery
>>>>>>> would be the hard thing to get right, though. I completely agree it
>>>>>>> would require massive changes to Spark.
>>>>>>>
>>>>>>> What I couldn't find for the underlying storages is a way to move
>>>>>>> data from a staging table to the final table transactionally. I'm not
>>>>>>> fully sure, but as far as I'm aware many storages do not support
>>>>>>> moving data, and even the HDFS sink does not do it strictly
>>>>>>> transactionally, since we move multiple files with multiple
>>>>>>> operations. If the coordinator crashes it leaves a partial write, and
>>>>>>> the writers and coordinator need to ensure it is not going to be
>>>>>>> duplicated.
>>>>>>>
>>>>>>> Ryan replied to me that Iceberg and HBase MVCC timestamps can enable
>>>>>>> us to implement "commit" (his reply didn't hit the dev mailing list
>>>>>>> though), but I'm not an expert in either of them and I still couldn't
>>>>>>> imagine how they can deal with the various crash cases.
>>>>>>>
>>>>>>> On Tue, Sep 11, 2018 at 5:17 AM, Reynold Xin <rxin@databricks.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I don't think two-phase commit would work here at all.
>>>>>>>>
>>>>>>>> 1. It'd require massive changes to Spark.
>>>>>>>>
>>>>>>>> 2. Unless the underlying data source can provide an API to
>>>>>>>> coordinate commits (and few data sources I know of provide something
>>>>>>>> like that), 2PC wouldn't work in the presence of network
>>>>>>>> partitioning. You can't defy the laws of physics.
>>>>>>>>
>>>>>>>> Really the most common and simple way I've seen this working is
>>>>>>>> through staging tables and a final transaction to move data from the
>>>>>>>> staging table to the final table.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Sep 10, 2018 at 12:56 PM Jungtaek Lim <kabhwan@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I guess we are all aware of the limitations of the contract on the
>>>>>>>>> DSv2 writer. Actually it can be achieved only with the HDFS sink
>>>>>>>>> (or other filesystem-based sinks); for other external storages it
>>>>>>>>> is normally not feasible to implement, because there's no way to
>>>>>>>>> couple a transaction with multiple clients, and the coordinator
>>>>>>>>> can't take over transactions from the writers to do the final
>>>>>>>>> commit.
>>>>>>>>>
>>>>>>>>> XA is also not trivial to get right with the current execution
>>>>>>>>> model: Spark doesn't require writer tasks to run at the same time,
>>>>>>>>> but to achieve 2PC they would have to run until the end of the
>>>>>>>>> transaction (closing the client before the transaction ends
>>>>>>>>> normally means aborting the transaction). Spark would also need to
>>>>>>>>> integrate 2PC with its checkpointing mechanism to guarantee
>>>>>>>>> completeness of a batch. And it might require a different
>>>>>>>>> integration for continuous mode.
>>>>>>>>>
>>>>>>>>> Jungtaek Lim (HeartSaVioR)
>>>>>>>>>
>>>>>>>>> On Tue, Sep 11, 2018 at 4:37 AM, Arun Mahadevan <arunm@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> In some cases the implementations may be OK with eventual
>>>>>>>>>> consistency (and do not care whether the output is written out
>>>>>>>>>> atomically).
>>>>>>>>>>
>>>>>>>>>> XA can be one option for data sources that support it and require
>>>>>>>>>> atomicity, but I am not sure how one would implement it with the
>>>>>>>>>> current API.
>>>>>>>>>>
>>>>>>>>>> Maybe we need to discuss improvements at the DataSource V2 API
>>>>>>>>>> level (e.g. individual tasks would "prepare" for commit, and once
>>>>>>>>>> the driver receives "prepared" from all the tasks, a "commit"
>>>>>>>>>> would be invoked at each of the individual tasks). Right now the
>>>>>>>>>> responsibility of the final "commit" is with the driver, and it
>>>>>>>>>> may not always be possible for the driver to take over the
>>>>>>>>>> transactions started by the tasks.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, 10 Sep 2018 at 11:48, Dilip Biswal <dbiswal@us.ibm.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> This is a pretty big challenge in general for data sources --
>>>>>>>>>>> for the vast majority of data stores, the boundary of a
>>>>>>>>>>> transaction is per client. That is, you can't have two clients
>>>>>>>>>>> doing writes and coordinating a single transaction. That's
>>>>>>>>>>> certainly the case for almost all relational databases. Spark, on
>>>>>>>>>>> the other hand, will have multiple clients (consider each task a
>>>>>>>>>>> client) writing to the same underlying data store.
>>>>>>>>>>>
>>>>>>>>>>> DB>> Perhaps we can explore the two-phase commit protocol (aka
>>>>>>>>>>> XA) for this? Not sure how easy it is to implement this though
>>>>>>>>>>> :-)
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Dilip Biswal
>>>>>>>>>>> Tel: 408-463-4980
>>>>>>>>>>> dbiswal@us.ibm.com
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ----- Original message -----
>>>>>>>>>>> From: Reynold Xin <rxin@databricks.com>
>>>>>>>>>>> To: Ryan Blue <rblue@netflix.com>
>>>>>>>>>>> Cc: ross.lawley@gmail.com, dev <dev@spark.apache.org>
>>>>>>>>>>> Subject: Re: DataSourceWriter V2 Api questions
>>>>>>>>>>> Date: Mon, Sep 10, 2018 10:26 AM
>>>>>>>>>>>
>>>>>>>>>>> I don't think the problem is just whether we have a starting
>>>>>>>>>>> point for write. As a matter of fact there's always a starting
>>>>>>>>>>> point for write, whether it is explicit or implicit.
>>>>>>>>>>>
>>>>>>>>>>> This is a pretty big challenge in general for data sources --
>>>>>>>>>>> for the vast majority of data stores, the boundary of a
>>>>>>>>>>> transaction is per client. That is, you can't have two clients
>>>>>>>>>>> doing writes and coordinating a single transaction. That's
>>>>>>>>>>> certainly the case for almost all relational databases. Spark, on
>>>>>>>>>>> the other hand, will have multiple clients (consider each task a
>>>>>>>>>>> client) writing to the same underlying data store.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 10, 2018 at 10:19 AM Ryan Blue <rblue@netflix.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Ross, I think the intent is to create a single transaction on
>>>>>>>>>>> the driver, write as part of it in each task, and then commit the
>>>>>>>>>>> transaction once the tasks complete. Is that possible in your
>>>>>>>>>>> implementation?
>>>>>>>>>>>
>>>>>>>>>>> I think that part of this is made more difficult by not having a
>>>>>>>>>>> clear starting point for a write, which we are fixing in the
>>>>>>>>>>> redesign of the v2 API. That will have a method that creates a
>>>>>>>>>>> Write to track the operation. That can create your transaction
>>>>>>>>>>> when it is created and commit the transaction when commit is
>>>>>>>>>>> called on it.
>>>>>>>>>>>
>>>>>>>>>>> rb
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 10, 2018 at 9:05 AM Reynold Xin <rxin@databricks.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Typically people do it via transactions, or staging tables.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 10, 2018 at 2:07 AM Ross Lawley
>>>>>>>>>>> <ross.lawley@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> I've been prototyping an implementation of the DataSource V2
>>>>>>>>>>> writer for the MongoDB Spark Connector and I have a couple of
>>>>>>>>>>> questions about how it's intended to be used with database
>>>>>>>>>>> systems. According to the Javadoc for DataWriter.commit():
>>>>>>>>>>>
>>>>>>>>>>> *"this method should still "hide" the written data and ask the
>>>>>>>>>>> DataSourceWriter at driver side to do the final commit via
>>>>>>>>>>> WriterCommitMessage"*
>>>>>>>>>>>
>>>>>>>>>>> Although MongoDB now has transactions, it doesn't have a way to
>>>>>>>>>>> "hide" the data once it has been written. So as soon as the
>>>>>>>>>>> DataWriter has committed the data, it has been inserted/updated
>>>>>>>>>>> in the collection and is discoverable - thereby breaking the
>>>>>>>>>>> documented contract.
>>>>>>>>>>>
>>>>>>>>>>> I was wondering how other database systems plan to implement this
>>>>>>>>>>> API and meet the contract as per the Javadoc?
>>>>>>>>>>>
>>>>>>>>>>> Many thanks
>>>>>>>>>>>
>>>>>>>>>>> Ross
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Ryan Blue
>>>>>>>>>>> Software Engineer
>>>>>>>>>>> Netflix
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>> To unsubscribe e-mail: dev-unsubscribe@spark.apache.org
>>>>>>>>>>
>>>>>>>>>>
