ignite-dev mailing list archives

From Vladimir Ozerov <voze...@gridgain.com>
Subject Re: Historical rebalance
Date Mon, 03 Dec 2018 10:46:19 GMT
Roman,

We already track updates on a per-transaction basis. The only difference is
that instead of doing a single "increment(1)" per transaction we do
"increment(X)", where X is the number of updates in the given transaction.

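To make the "increment(1)" versus "increment(X)" distinction concrete, here is a minimal Java sketch. It assumes a per-partition counter backed by `java.util.concurrent.atomic.AtomicLong`; the class and method names (`PartitionUpdateCounter`, `reserve`) are hypothetical and are not Ignite's actual update counter API. The only point shown is that a transaction with X updates can reserve X consecutive counter values in one atomic step.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-partition update counter (illustrative, not Ignite's class). */
final class PartitionUpdateCounter {
    private final AtomicLong cntr = new AtomicLong();

    /** Per-update style, i.e. "increment(1)": one counter value per call. */
    long increment() {
        return cntr.incrementAndGet();
    }

    /**
     * Per-transaction style, i.e. "increment(X)": atomically reserves X consecutive
     * values for a transaction with X updates and returns the first one.
     * The transaction then owns [first, first + X - 1].
     */
    long reserve(int updatesInTx) {
        if (updatesInTx <= 0)
            throw new IllegalArgumentException("updatesInTx must be positive: " + updatesInTx);
        return cntr.getAndAdd(updatesInTx) + 1;
    }

    /** Highest value handed out so far. */
    long get() {
        return cntr.get();
    }
}
```

For example, a transaction with three updates calling `reserve(3)` on a fresh counter owns values 1 to 3, and the next single update gets 4.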
On Mon, Dec 3, 2018 at 1:16 PM Roman Kondakov <kondakov87@mail.ru.invalid>
wrote:

> Igor, Vladimir, Ivan,
>
> perhaps we are focusing too much on update counters. This feature was
> designed for continuous queries, and it may not be well suited for
> historical rebalance. What if we tracked updates on a per-transaction
> basis instead of a per-update basis? Let's consider two counters, a
> low-water mark (LWM) and a high-water mark (HWM), which should be added
> to each partition. They have the following properties:
>
> * HWM is a plain atomic counter. When a Tx makes its first write on the
> primary node, it does incrementAndGet on this counter and remembers the
> obtained value in its context. This counter can be considered the tx
> id within the current partition: transactions should maintain a
> per-partition map of their HWM ids. The WAL pointer to the first record
> should be remembered in this map as well. This id should also be
> recorded in WAL data records.
>
> When a Tx sends updates to backups, it sends the Tx HWM too. When a backup
> receives this message from the primary node, it takes the HWM and does
> setIfGreater on the local HWM counter.
>
> * LWM is a plain atomic counter. When a Tx terminates (either with
> commit or rollback), it updates the local LWM the same way update
> counters do, using hole tracking. For example, if the partition's
> LWM = 10 now and the tx with id (HWM id) = 12 commits, we do not update
> the partition LWM until the tx with id = 11 is committed. When id = 11 is
> committed, LWM is set to 12. If LWM == N, all transactions with
> id <= N have terminated for the current partition and all their data is
> already recorded in the local partition.
>
> Brief summary for both counters: HWM means that the partition has already
> seen at least one update of transactions with id <= HWM. LWM means that the
> partition has all updates made by transactions with id <= LWM.
>
> LWM is always <= HWM.
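A minimal Java sketch of the two counters described above, assuming one object per partition and WAL pointers modelled as plain `long`s. All names (`PartitionWatermarks`, `TxPartitionMap`, `nextTxId`, `onRemoteTxId`, `onTxTerminated`) are hypothetical, not Ignite classes. "setIfGreater" is expressed with `AtomicLong.accumulateAndGet(x, Math::max)`, and hole tracking keeps terminated ids above the current LWM in a sorted set until the gap below them closes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical per-partition HWM/LWM pair (illustrative names only). */
final class PartitionWatermarks {
    private final AtomicLong hwm = new AtomicLong();
    private long lwm;                                      // every tx id <= lwm has terminated here
    private final TreeSet<Long> pending = new TreeSet<>(); // terminated ids sitting above a hole

    /** Primary: first write of a tx to this partition; the result serves as the tx id. */
    long nextTxId() {
        return hwm.incrementAndGet();
    }

    /** Backup: "setIfGreater" with the HWM id carried by the primary's message. */
    void onRemoteTxId(long txId) {
        hwm.accumulateAndGet(txId, Math::max);
    }

    /** Commit or rollback of tx txId on this node: LWM advances only across closed holes. */
    synchronized void onTxTerminated(long txId) {
        if (txId <= lwm)
            return;
        pending.add(txId);
        while (!pending.isEmpty() && pending.first() == lwm + 1)
            lwm = pending.pollFirst();
    }

    long hwm() {
        return hwm.get();
    }

    synchronized long lwm() {
        return lwm;
    }

    /** Ids not yet merged into LWM (the optional extra checkpoint payload). */
    synchronized TreeSet<Long> pendingLwms() {
        return new TreeSet<>(pending);
    }
}

/** Hypothetical tx-side map: partition -> (HWM id, WAL pointer of the tx's first record there). */
final class TxPartitionMap {
    static final class Entry {
        final long txId;
        final long firstWalPtr;

        Entry(long txId, long firstWalPtr) {
            this.txId = txId;
            this.firstWalPtr = firstWalPtr;
        }
    }

    private final Map<Integer, Entry> byPartition = new HashMap<>();

    /** Only the first write to a partition obtains an id; later writes reuse the entry. */
    Entry onWrite(int partId, PartitionWatermarks part, long walPtr) {
        return byPartition.computeIfAbsent(partId, p -> new Entry(part.nextTxId(), walPtr));
    }
}
```

With LWM = 10, calling `onTxTerminated(12)` leaves LWM at 10 with 12 pending; a subsequent `onTxTerminated(11)` moves LWM to 12, matching the example in the text. The sorted set makes closing a hole a simple loop over its smallest elements.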
>
> On checkpoint we should store only these two counters in the checkpoint
> record. As an optimization, we can also store the list of pending LWMs:
> ids which haven't been merged into LWM because of holes in the sequence.
>
> Historical rebalance:
>
> 1. The demander knows its LWM: all updates before it have been applied.
> The demander sends its LWM to the supplier.
>
> 2. The supplier finds the earliest checkpoint where HWM(supplier) <= LWM
> (demander).
>
> 3. The supplier moves forward through the WAL until it finds the first data
> record with HWM id = LWM (demander). From this point the WAL can be
> rebalanced to the demander.
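A sketch of step 2 on the supplier side, assuming checkpoints are kept in chronological order together with the HWM and LWM stored in their records. It reads "earliest checkpoint" as the first qualifying checkpoint met when walking back from the newest one (Vladimir's message further down the thread calls it "the closest checkpoint in the past"); that reading is an assumption. Step 3, the forward WAL scan, is not sketched. `CheckpointMark` and `HistoricalRebalance` are hypothetical names.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical checkpoint marker: counters from the checkpoint record plus its WAL position. */
final class CheckpointMark {
    final long hwm;
    final long lwm;
    final long walPtr;

    CheckpointMark(long hwm, long lwm, long walPtr) {
        this.hwm = hwm;
        this.lwm = lwm;
        this.walPtr = walPtr;
    }
}

final class HistoricalRebalance {
    /**
     * Step 2 on the supplier: walk checkpoints from newest to oldest and return the
     * first one whose HWM does not exceed the demander's LWM. An empty result means
     * no checkpoint qualifies, so the WAL is supplied from the beginning.
     */
    static Optional<CheckpointMark> startCheckpoint(List<CheckpointMark> chronological, long demanderLwm) {
        for (int i = chronological.size() - 1; i >= 0; i--) {
            CheckpointMark cp = chronological.get(i);
            if (cp.hwm <= demanderLwm)
                return Optional.of(cp);
        }
        return Optional.empty();
    }
}
```

For the primary in the first example below (checkpoints cp(1,0) and cp(2,2)), a demander LWM of 0 yields no checkpoint (supply from the beginning), LWM = 1 yields cp(1,0), and LWM = 2 yields cp(2,2).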
>
> In this approach, updates and checkpoints on the primary and backup can be
> reordered in any way, but we can always find a proper point to read the
> WAL from.
>
> Let's consider a couple of examples. In these examples, transaction
> updates are marked as w1(a) - transaction 1 updates key=a; c1 - transaction
> 1 is committed; cp(1, 0) - checkpoint with HWM=1 and LWM=0; (HWM,LWM) -
> current counters after the operation; (HWM,LWM[hole1, hole2]) - counters
> with holes in LWM.
>
>
> 1. Simple case with no reordering:
>
> PRIMARY
> -----w1(a)---cp(1,0)---w2(b)----w1(c)--------------c1--------c2-----cp(2,2)
> (HWM,LWM)    (1,0)             (2,0)    (2,0)             (2,1)     (2,2)
>                |                  |        |                 |        |
> BACKUP
> ------w1(a)-------------w2(b)----w1(c)---cp(2,0)----c1--------c2-----cp(2,2)
> (HWM,LWM)    (1,0)             (2,0)    (2,0)             (2,1)     (2,2)
>
>
> In this case, if the backup fails before c1, it will receive all updates
> from the beginning (HWM=0).
> If it fails between c1 and c2, it will receive the WAL from the primary's
> cp(1,0), because the tx with id=1 is fully processed on the backup:
> HWM(supplier cp(1,0))=1 == LWM(demander)=1.
> If the backup fails after c2, it will receive nothing because it has all
> updates: HWM(supplier)=2 == LWM(demander)=2.
>
>
>
> 2. Case with reordering
>
> PRIMARY
> -----w1(a)---cp(1,0)---w2(b)------cp(2,0)----------w1(c)------c1-----c2-----------cp(2,2)
> (HWM,LWM)    (1,0)     (2,0)                       (2,0)      (2,1)  (2,2)
>        \                 |                           |         \     |
>         \________________|________                   |          \____|_______
>                          |        \                  |               |       \
>                          |         \                 |               |        \
> BACKUP
> -------------------------w2(b)---w1(a)----cp(2,0)---w1(c)------------c2-------c1-----cp(2,2)
> (HWM,LWM)                (2,0)   (2,0)              (2,0)            (2,0[2]) (2,2)
>
>
> Note that here we have a hole on the backup: tx2 committed earlier than
> tx1, so LWM did not change at that moment.
>
> In the last case, if the backup fails before c1, the entire WAL will be
> supplied because LWM=0 until that moment.
> If the backup fails after c1, there is nothing to rebalance, because
> HWM(supplier)=2 == LWM(demander)=2.
>
>
> What do you think?
>
>
> --
> Kind Regards
> Roman Kondakov
>
> On 30.11.2018 2:01, Seliverstov Igor wrote:
> > Vladimir,
> >
> > Look at my example:
> >
> > One active transaction (Tx1, which does opX ops) while another tx (Tx2,
> > which does opX' ops) finishes with uc4:
> >
> >
> > ----uc1--op1----op2---uc2--op1'----uc3--uc4---op3------------X-------------   Node1
> >
> > ----uc1----op1----uc2----op2----uc3--op3----------uc4----cp1----        Tx1 -
> >                                                                             |- Node2
> > ----uc1-------------uc2-------------uc3--------op1'----uc4----cp1----   Tx2 -
> >
> >
> > state on Node2: tx1 -> op3 -> uc2
> >                            cp1 [current=uc4, backpointer=uc2]
> >
> > Here op2 was acknowledged by op3, and op3 was applied before op1'
> > (linearized by the WAL).
> >
> > All nodes having uc4 must have op1', because uc4 cannot be obtained
> > earlier than the prepare stage, and the prepare stage happens after all
> > updates, so *op1' happens before uc4* regardless of whether Tx2 was
> > committed or rolled back.
> >
> > This means *op2 happens before uc4* (uc4 cannot be earlier than op2 on
> > any node, because on Node2 op2 was already finished (acknowledged by
> > op3) when op1' happened).
> >
> > That was my idea, which is easy to prove.
> >
> > You used a different approach, but yes, it has to work.
> >
> > On Thu, Nov 29, 2018 at 22:19, Vladimir Ozerov <vozerov@gridgain.com> wrote:
> >
> >> "If more recent WAL records will contain *ALL* updates of the transaction"
> >> -> "More recent WAL records will contain *ALL* updates of the transaction"
> >>
> >> On Thu, Nov 29, 2018 at 10:15 PM Vladimir Ozerov <vozerov@gridgain.com>
> >> wrote:
> >>
> >>> Igor,
> >>>
> >>> Yes, I tried to draw different configurations, and it really seems to
> >>> work, despite being very hard to prove due to non-intuitive HB edges.
> >>> So let me try to spell out the algorithm once again to make sure that
> >>> we are on the same page here.
> >>>
> >>> 1) There are two nodes - primary (P) and backup (B)
> >>> 2) There are three types of events: small transactions which possibly
> >>> increment the update counter (ucX), one long active transaction which
> >>> is split into multiple operations (opX), and checkpoints (cpX)
> >>> 3) Every node always has a current update counter. When a transaction
> >>> commits, it may or may not shift this counter further, depending on
> >>> whether there are holes behind. But we have a strict rule that it always
> >>> grows. Higher counters synchronize with smaller ones. Possible cases:
> >>> ----uc1----uc2----uc3----
> >>> ----uc1--------uc3------- // uc2 missing due to reorder, but it is ok
> >>>
> >>> 4) Operations within a single transaction are always applied
> >>> sequentially, and hence also have an HB edge:
> >>> ----op1----op2----op3----
> >>>
> >>> 5) When a transaction operation happens, we save in memory the current
> >>> update counter available at that moment. I.e. we have a map from
> >>> transaction ID to the update counter which was relevant by the time the
> >>> last *completed* operation *started*. This is very important: we
> >>> remember the counter when the operation starts, but update the map only
> >>> when it finishes. This is needed for the situation when the update
> >>> counter is bumped in the middle of a long operation.
> >>> ----uc1----op1----op2----uc2----uc3----op3----
> >>>              |      |                    |
> >>>             uc1    uc1                  uc3
> >>>
> >>> state: tx1 -> op3 -> uc3
> >>>
> >>> 6) Whenever a checkpoint occurs, we save two counters: "current" and
> >>> "backpointer". The latter is the smallest update counter associated
> >>> with active transactions. If there are no active transactions, the
> >>> current update counter is used.
> >>>
> >>> Example 1: no active transactions.
> >>> ----uc1----cp1----
> >>>       ^      |
> >>>       --------
> >>>
> >>> state: cp1 [current=uc1, backpointer=uc1]
> >>>
> >>> Example 2: one active transaction:
> >>>                                   ---------------
> >>>                                   |             |
> >>> ----uc1----op1----uc2----op2----op3----uc3----cp1----
> >>>                     ^             |
> >>>                     --------------
> >>>
> >>> state: tx1 -> op3 -> uc2
> >>>         cp1 [current=uc3, backpointer=uc2]
> >>>
> >>> 7) Historical rebalance:
> >>> 7.1) The demander finds the latest checkpoint, gets its backpointer and
> >>> sends it to the supplier.
> >>> 7.2) The supplier finds the earliest checkpoint where
> >>> [supplier(current) <= demander(backpointer)]
> >>> 7.3) The supplier reads the checkpoint backpointer and finds the
> >>> associated WAL record. This is where we start.
> >>>
> >>> So in terms of WAL we have:
> >>> supplier[uc_backpointer <- cp(uc_current <= demander_uc_backpointer)] <- demander[uc_backpointer <- cp(last)]
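The bookkeeping in steps 5 to 7 can be sketched in Java as follows. This is a hypothetical single-threaded model (`BackpointerTracker` and its methods are illustrative names, not Ignite code) in which update counters are plain `long`s and a checkpoint is represented as a `{current, backpointer}` pair. In `supplierStartUc`, "earliest" in step 7.2 is read as the first checkpoint satisfying the condition when walking back from the newest one, which is an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical node-local state for steps 5-7 (illustrative, single-threaded). */
final class BackpointerTracker {
    private long currentUc;                                  // step 3: only ever grows
    private final Map<Long, Long> txToUc = new HashMap<>();  // step 5: tx -> uc at start of last completed op

    void onUpdateCounter(long uc) {
        currentUc = Math.max(currentUc, uc);
    }

    /** Step 5: capture the counter when an operation starts... */
    long onOperationStart() {
        return currentUc;
    }

    /** ...but publish it only once that operation has finished. */
    void onOperationFinish(long txId, long ucAtStart) {
        txToUc.put(txId, ucAtStart);
    }

    void onTxFinish(long txId) {
        txToUc.remove(txId);
    }

    /** Step 6: {current, backpointer}; backpointer = smallest uc of active txs, else current. */
    long[] checkpoint() {
        long backpointer = currentUc;
        for (long uc : txToUc.values())
            backpointer = Math.min(backpointer, uc);
        return new long[] {currentUc, backpointer};
    }

    /**
     * Steps 7.2-7.3 on the supplier: checkpoints are oldest first, each {current, backpointer}.
     * Walk back to the first one with current <= demander's backpointer and return that
     * checkpoint's backpointer; -1 means no checkpoint qualifies.
     */
    static long supplierStartUc(List<long[]> checkpoints, long demanderBackpointer) {
        for (int i = checkpoints.size() - 1; i >= 0; i--) {
            long[] cp = checkpoints.get(i);
            if (cp[0] <= demanderBackpointer)
                return cp[1];
        }
        return -1;
    }
}
```

Replaying Example 2 (op1 at uc1, op2 and op3 at uc2, then uc3) gives a checkpoint of {3, 2}, i.e. [current=uc3, backpointer=uc2]; with no active transactions the backpointer equals the current counter, as in Example 1.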
> >>>
> >>> Now the most important part - why it works :-)
> >>> 1) Transaction operations are sequential, so at the time of a crash,
> >>> nodes are *at most one operation ahead* of each other
> >>> 2) The demander goes to the past and finds the update counter which was
> >>> current at the time of the last completed TX operation
> >>> 3) The supplier goes to the closest checkpoint in the past where this
> >>> update counter either doesn't exist or has just appeared
> >>> 4) The transaction cannot be committed on the supplier at this
> >>> checkpoint, as that would violate the UC happens-before rule
> >>> 5) The transaction may have not started yet on the supplier at this
> >>> point. If more recent WAL records will contain *ALL* updates of the
> >>> transaction
> >>> 6) The transaction may exist on the supplier at this checkpoint. Thanks
> >>> to p.1 we must skip at most one operation. A jump back through the
> >>> supplier's checkpoint backpointer is guaranteed to do this.
> >>>
> >>> Igor, do we have the same understanding here?
> >>>
> >>> Vladimir.
> >>>
> >>> On Thu, Nov 29, 2018 at 2:47 PM Seliverstov Igor <gvvinblade@gmail.com>
> >>> wrote:
> >>>
> >>>> Ivan,
> >>>>
> >>>> different transactions may be applied in a different order on backup
> >>>> nodes. That's why we need an active tx set and some sorting by their
> >>>> update times. The idea is to identify a point in time starting from
> >>>> which we may lose some updates.
> >>>> This point:
> >>>>     1) is the last update on the timeline acknowledged by all backups
> >>>> (including a possible future demander);
> >>>>     2) has a specific update counter (aka back-counter) which we are
> >>>> going to start iteration from.
> >>>>
> >>>> After some additional thinking, I've identified a rule:
> >>>>
> >>>> There are two fences:
> >>>>    1) update counter (UC) - this means that all updates with a smaller
> >>>> UC than the applied one were applied on the node having this UC.
> >>>>    2) update in scope of a TX - all updates are applied one by one
> >>>> sequentially; this means that the fact of an update guarantees that the
> >>>> previous update (statement) was finished on all TX participants.
> >>>>
> >>>> Combining them, we can say the following:
> >>>>
> >>>> All updates that were acknowledged at the time the last update of the
> >>>> tx which updated the UC was applied are guaranteed to be present on a
> >>>> node having such UC.
> >>>>
> >>>> We can use this rule to find an iterator start pointer.
> >>>>
> >>>> On Wed, Nov 28, 2018 at 20:26, Павлухин Иван <vololo100@gmail.com> wrote:
> >>>>
> >>>>> Guys,
> >>>>>
> >>>>> One more idea. We can introduce an additional update counter which is
> >>>>> incremented by MVCC transactions right after executing an operation
> >>>>> (as is done for classic transactions). And we can use that counter for
> >>>>> searching for the needed WAL records. Could that do the trick?
> >>>>>
> >>>>> P.S. Mentally I am trying to separate the facilities providing
> >>>>> transactions and durability. And it seems to me that those facilities
> >>>>> are in different dimensions.
> >>>>> On Wed, Nov 28, 2018 at 16:26, Павлухин Иван <vololo100@gmail.com> wrote:
> >>>>>> Sorry, if it was stated that a SINGLE transaction's updates are
> >>>>>> applied in the same order on all replicas, then I have no questions
> >>>>>> so far. I was thinking about reordering of updates coming from
> >>>>>> different transactions.
> >>>>>>> I don't see why we can assume that reordering is not possible.
> >>>>>>> What have I missed?
> >>>>>> On Wed, Nov 28, 2018 at 13:26, Павлухин Иван <vololo100@gmail.com> wrote:
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Regarding Vladimir's new idea.
> >>>>>>>> We assume that a transaction can be represented as a set of
> >>>>>>>> independent operations, which are applied in the same order on
> >>>>>>>> both primary and backup nodes.
> >>>>>>> I don't see why we can assume that reordering is not possible.
> >>>>>>> What have I missed?
> >>>>>>> On Tue, Nov 27, 2018 at 14:42, Seliverstov Igor <gvvinblade@gmail.com> wrote:
> >>>>>>>> Vladimir,
> >>>>>>>>
> >>>>>>>> I think I got your point.
> >>>>>>>>
> >>>>>>>> It should work if we do the following:
> >>>>>>>> introduce two structures: an active list (txs) and a candidate
> >>>>>>>> list (updCntr -> txn pairs).
> >>>>>>>>
> >>>>>>>> Track active txs, mapping them to the actual update counter at
> >>>>>>>> update time.
> >>>>>>>> On each next update, put the update counter associated with the
> >>>>>>>> previous update into the candidate list, possibly overwriting an
> >>>>>>>> existing value (checking txn).
> >>>>>>>> On tx finish, remove the tx from the active list only if the
> >>>>>>>> appropriate update counter (associated with the finished tx) is
> >>>>>>>> applied.
> >>>>>>>> On update counter update, set the minimal update counter from the
> >>>>>>>> candidate list as the back-counter, clear the candidate list and
> >>>>>>>> remove the associated tx from the active list if present.
> >>>>>>>> Use the back-counter instead of the actual update counter in the
> >>>>>>>> demand message.
> >>>>>>>> On Tue, Nov 27, 2018 at 12:56, Seliverstov Igor <gvvinblade@gmail.com> wrote:
> >>>>>>>>> Ivan,
> >>>>>>>>>
> >>>>>>>>> 1) The list is saved wholly on each checkpoint (all transactions
> >>>>>>>>> in active state at checkpoint begin).
> >>>>>>>>> We need the whole list to get the oldest transaction, because
> >>>>>>>>> after the previous oldest tx finishes, we need to get the
> >>>>>>>>> following one.
> >>>>>>>>> 2) I guess there is a description of how persistent storage works
> >>>>>>>>> and how it restores [1]
> >>>>>>>>>
> >>>>>>>>> Vladimir,
> >>>>>>>>>
> >>>>>>>>> the whole list of what we are going to store on checkpoint
> >>>>>>>>> (updated):
> >>>>>>>>> 1) Partition counter low watermark (LWM)
> >>>>>>>>> 2) WAL pointer of the earliest active transaction write to the
> >>>>>>>>> partition at the time the checkpoint started
> >>>>>>>>> 3) List of prepared txs with acquired partition counters (which
> >>>>>>>>> were acquired but not applied yet)
> >>>>>>>>>
> >>>>>>>>> This way we don't need any additional info in the demand message.
> >>>>>>>>> The start point can be easily determined using the stored WAL
> >>>>>>>>> "back-pointer".
> >>>>>>>>>
> >>>>>>>>> [1]
> >>>>>>>>> https://cwiki.apache.org/confluence/display/IGNITE/Ignite+Persistent+Store+-+under+the+hood#IgnitePersistentStore-underthehood-LocalRecoveryProcess
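The three items Igor lists could be captured at checkpoint begin roughly as below. This is a sketch under stated assumptions: WAL pointers are modelled as monotonically increasing `long`s (so the earliest write is the minimum), transaction ids are `long`s, and `CheckpointTxState.capture` with its input maps is a hypothetical helper rather than an Ignite API. The message does not say what the back-pointer is when no transaction is active, so the sketch simply leaves it empty.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical per-partition checkpoint payload following the three listed items. */
final class CheckpointTxState {
    final long lwm;                          // 1) partition counter low watermark
    final OptionalLong earliestActiveWalPtr; // 2) earliest active tx write ("back-pointer"), if any
    final Map<Long, Long> preparedCounters;  // 3) prepared tx id -> acquired, not yet applied counter

    private CheckpointTxState(long lwm, OptionalLong earliestActiveWalPtr, Map<Long, Long> preparedCounters) {
        this.lwm = lwm;
        this.earliestActiveWalPtr = earliestActiveWalPtr;
        this.preparedCounters = preparedCounters;
    }

    /**
     * Builds the payload at checkpoint begin from two hypothetical node-local views:
     * active tx id -> WAL pointer of its first write to the partition, and
     * prepared tx id -> acquired partition counter.
     */
    static CheckpointTxState capture(long lwm, Map<Long, Long> activeFirstWrites, Map<Long, Long> prepared) {
        OptionalLong earliest = activeFirstWrites.values().stream().mapToLong(Long::longValue).min();
        return new CheckpointTxState(lwm, earliest, Collections.unmodifiableMap(new HashMap<>(prepared)));
    }
}
```

Copying the prepared map keeps the stored payload independent of later changes to the live transaction state.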
> >>>>>>>>>
> >>>>>>>>> On Tue, Nov 27, 2018 at 11:19, Vladimir Ozerov <vozerov@gridgain.com> wrote:
> >>>>>>>>>> Igor,
> >>>>>>>>>>
> >>>>>>>>>> Could you please elaborate: what is the whole set of information
> >>>>>>>>>> we are going to save at checkpoint time? From what I understand,
> >>>>>>>>>> this should be:
> >>>>>>>>>> 1) List of active transactions with WAL pointers of their first
> >>>>>>>>>> writes
> >>>>>>>>>> 2) List of prepared transactions with their update counters
> >>>>>>>>>> 3) Partition counter low watermark (LWM) - the smallest partition
> >>>>>>>>>> counter before which there are no prepared transactions.
> >>>>>>>>>>
> >>>>>>>>>> And then we send the supplier node a message: "Give me all
> >>>>>>>>>> updates starting from that LWM plus data for the transactions
> >>>>>>>>>> which were active when I failed".
> >>>>>>>>>>
> >>>>>>>>>> Am I right?
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Nov 23, 2018 at 11:22 AM Seliverstov Igor <gvvinblade@gmail.com>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Igniters,
> >>>>>>>>>>>
> >>>>>>>>>>> Currently I'm working on possible approaches to implementing
> >>>>>>>>>>> historical rebalance (delta rebalance using a WAL iterator) over
> >>>>>>>>>>> MVCC caches.
> >>>>>>>>>>>
> >>>>>>>>>>> The main difficulty is that MVCC writes changes during the tx
> >>>>>>>>>>> active phase, while the partition update version, aka update
> >>>>>>>>>>> counter, is applied on tx finish. This means we cannot start
> >>>>>>>>>>> iterating over the WAL right from the pointer where the update
> >>>>>>>>>>> counter was updated, but should include the updates made by the
> >>>>>>>>>>> transaction that updated the counter.
> >>>>>>>>>>>
> >>>>>>>>>>> These updates may be much earlier than the point where the
> >>>>>>>>>>> update counter was updated, so we have to be able to identify
> >>>>>>>>>>> the point where the first update happened.
> >>>>>>>>>>>
> >>>>>>>>>>> The proposed approach includes:
> >>>>>>>>>>>
> >>>>>>>>>>> 1) preserve the list of active txs, sorted by the time of their
> >>>>>>>>>>> first update (using the WAL ptr of the first WAL record in the tx)
> >>>>>>>>>>>
> >>>>>>>>>>> 2) persist this list on each checkpoint (together with TxLog,
> >>>>>>>>>>> for example)
> >>>>>>>>>>>
> >>>>>>>>>>> 3) send the whole active tx list (transactions which were in
> >>>>>>>>>>> active state at the time the node crashed; an empty list in case
> >>>>>>>>>>> of a graceful node stop) as a part of the partition demand
> >>>>>>>>>>> message.
> >>>>>>>>>>>
> >>>>>>>>>>> 4) find a checkpoint where the earliest tx exists in the
> >>>>>>>>>>> persisted txs and use the saved WAL ptr as the start point, or
> >>>>>>>>>>> apply the current approach in case the active tx list (sent on
> >>>>>>>>>>> the previous step) is empty
> >>>>>>>>>>>
> >>>>>>>>>>> 5) start iteration.
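A minimal Java sketch of steps 1 to 4 of this proposal, assuming WAL pointers and tx ids are plain `long`s and a checkpoint snapshot is a copy of the active-tx map. `ActiveTxList`, `demandList` and `startPointer` are hypothetical names. A `LinkedHashMap` keeps transactions in the order of their first update because each tx is inserted at its first WAL record. Returning an empty result when the earliest tx is found in none of the supplier's snapshots is an assumption; the proposal only says to fall back to the current approach when the demand list is empty.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical active-tx list for one partition (illustrative names only). */
final class ActiveTxList {
    // Insertion order == order of first update, since a tx is added at its first WAL record.
    private final LinkedHashMap<Long, Long> firstWalPtrByTx = new LinkedHashMap<>();

    /** Step 1: record the WAL pointer of the tx's first record; later writes keep the position. */
    void onFirstWrite(long txId, long walPtr) {
        firstWalPtrByTx.putIfAbsent(txId, walPtr);
    }

    void onTxFinish(long txId) {
        firstWalPtrByTx.remove(txId);
    }

    /** Step 2: copy persisted with each checkpoint. */
    LinkedHashMap<Long, Long> snapshot() {
        return new LinkedHashMap<>(firstWalPtrByTx);
    }

    /** Step 3 (demander): tx ids ordered by first update; empty after a graceful stop. */
    List<Long> demandList() {
        return new ArrayList<>(firstWalPtrByTx.keySet());
    }

    /**
     * Step 4 (supplier): find a checkpoint snapshot containing the demander's earliest
     * active tx and return its saved WAL pointer. Empty means: use the current approach.
     */
    static OptionalLong startPointer(List<? extends Map<Long, Long>> checkpointSnapshots, List<Long> demandList) {
        if (demandList.isEmpty())
            return OptionalLong.empty();
        long earliestTx = demandList.get(0);
        for (Map<Long, Long> snapshot : checkpointSnapshots) {
            Long ptr = snapshot.get(earliestTx);
            if (ptr != null)
                return OptionalLong.of(ptr);
        }
        return OptionalLong.empty();
    }
}
```

Because a transaction's first-write pointer does not change between checkpoints, any snapshot that contains the earliest tx yields the same start pointer; the loop simply takes the first one found.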
> >>>>>>>>>>>
> >>>>>>>>>>> Your thoughts?
> >>>>>>>>>>>
> >>>>>>>>>>> Regards,
> >>>>>>>>>>> Igor
> >>>>>>>
> >>>>>>>
> >>>>>>> --
> >>>>>>> Best regards,
> >>>>>>> Ivan Pavlukhin
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> Best regards,
> >>>>>> Ivan Pavlukhin
> >>>>>
> >>>>>
> >>>>> --
> >>>>> Best regards,
> >>>>> Ivan Pavlukhin
> >>>>>
>
