fluo-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] keith-turner commented on a change in pull request #1001: [WIP] - Issue 978
Date Thu, 01 Jan 1970 00:00:00 GMT
keith-turner commented on a change in pull request #1001: [WIP] - Issue 978
URL: https://github.com/apache/fluo/pull/1001#discussion_r162724373
 
 

 ##########
 File path: modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
 ##########
 @@ -872,447 +846,694 @@ public int getSize() {
     return size;
   }
 
-  private <V> void addCallback(CompletableFuture<V> cfuture, CommitData cd,
-      OnSuccessInterface<V> onSuccessInterface) {
-    cfuture.handleAsync((result, exception) -> {
-      if (exception != null) {
-        cd.commitObserver.failed(exception);
+  // TODO exception handling!!!! How?????
+  abstract class CommitStep {
+    private CommitStep nextStep;
+
+    // the boolean indicates if the operation was successful.
+    abstract CompletableFuture<Boolean> getMainOp(CommitData cd);
+
+    // create and run this op in the event that the main op was a failure
+    abstract CompletableFuture<Void> getFailureOp(CommitData cd);
+
+    // set the next step to run if this step is successful
+    CommitStep andThen(CommitStep next) {
+      this.nextStep = next;
+      return next;
+    }
+
+
+    CompletableFuture<Void> compose(CommitData cd) {
+      try {
+        return getMainOp(cd).thenComposeAsync(successful -> {
+          if (successful) {
+            if (nextStep != null) {
+              return nextStep.compose(cd);
+            } else {
+              return CompletableFuture.completedFuture(null);
+            }
+          } else {
+            return getFailureOp(cd);
+          }
+        }, env.getSharedResources().getAsyncCommitExecutor());
+      } catch (Exception e) {
+        cd.commitObserver.failed(e);
         return null;
-      } else {
+      }
+    }
+
+  }
+
+  abstract class ConditionalStep extends CommitStep {
+
+    CommitData cd;
+
+    public abstract Collection<ConditionalMutation> createMutations(CommitData cd);
+
+    public abstract Iterator<Result> handleUnknown(CommitData cd, Iterator<Result>
results)
+        throws Exception;
+
+    public abstract boolean processResults(CommitData cd, Iterator<Result> results)
+        throws Exception;
+
+    public AsyncConditionalWriter getACW(CommitData cd) {
+      return cd.acw;
+    }
+
+    @Override
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      // TODO not sure threading is correct
+      Executor ace = env.getSharedResources().getAsyncCommitExecutor();
+      return getACW(cd).apply(createMutations(cd)).thenCompose(results -> {
+        // ugh icky that this is an iterator, forces copy to inspect.. could refactor async
CW to
+        // return collection
+        ArrayList<Result> resultsList = new ArrayList<>();
+        Iterators.addAll(resultsList, results);
+        boolean containsUknown = false;
+        for (Result result : resultsList) {
+          try {
+            containsUknown |= result.getStatus() == Status.UNKNOWN;
+          } catch (Exception e) {
+            cd.commitObserver.failed(e);
+          }
+        }
+        if (containsUknown) {
+          // process unknown in sync executor
+          Executor se = env.getSharedResources().getSyncCommitExecutor();
+          return CompletableFuture.supplyAsync(() -> {
+            try {
+              return handleUnknown(cd, resultsList.iterator());
+            } catch (Exception e) {
+              cd.commitObserver.failed(e);
+              // TODO - what should we return? How do we stop processResults from running?
+              return null;
+            }
+          }, se);
+        } else {
+          return CompletableFuture.completedFuture(resultsList.iterator());
+        }
+      }).thenApplyAsync(results -> {
         try {
-          onSuccessInterface.onSuccess(result);
-          return null;
+          return processResults(cd, results);
         } catch (Exception e) {
           cd.commitObserver.failed(e);
-          return null;
+          return false;
         }
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
+      }, ace);
+    }
+
+
   }
 
-  @Override
-  public synchronized void commitAsync(AsyncCommitObserver commitCallback) {
+  class LockPrimaryStep extends ConditionalStep {
 
-    checkIfOpen();
-    status = TxStatus.COMMIT_STARTED;
-    commitAttempted = true;
+    @Override
+    public Collection<ConditionalMutation> createMutations(CommitData cd) {
+      return Collections
+          .singleton(prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, isTriggerRow(cd.prow)));
+    }
 
-    try {
-      CommitData cd = createCommitData();
-      beginCommitAsync(cd, commitCallback, null);
-    } catch (Exception e) {
-      e.printStackTrace();
-      commitCallback.failed(e);
+    @Override
+    public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
+        throws Exception {
+
+      Result result = Iterators.getOnlyElement(results);
+      Status mutationStatus = result.getStatus();
+      // TODO convert this code to async
+      while (mutationStatus == Status.UNKNOWN) {
+        TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
+
+        switch (txInfo.status) {
+          case LOCKED:
+            return Collections
+                .singleton(
+                    new Result(Status.ACCEPTED, result.getMutation(), result.getTabletServer()))
+                .iterator();
+          case ROLLED_BACK:
+            return Collections
+                .singleton(
+                    new Result(Status.REJECTED, result.getMutation(), result.getTabletServer()))
+                .iterator();
+          case UNKNOWN:
+            // TODO async
+            Result newResult = cd.cw.write(result.getMutation());
+            mutationStatus = newResult.getStatus();
+            if (mutationStatus != Status.UNKNOWN) {
+              return Collections.singleton(newResult).iterator();
+            }
+            // TODO handle case were data other tx has lock
+            break;
+          case COMMITTED:
+          default:
+            throw new IllegalStateException(
+                "unexpected tx state " + txInfo.status + " " + cd.prow + " " + cd.pcol);
+
+        }
+      }
+
+      // TODO
+      throw new IllegalStateException();
     }
-  }
 
-  private void beginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback,
-      RowColumn primary) {
+    @Override
+    public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception
{
+      Result result = Iterators.getOnlyElement(results);
+      return result.getStatus() == Status.ACCEPTED;
+    }
 
-    if (updates.size() == 0) {
-      // TODO do async
-      deleteWeakRow();
-      commitCallback.committed();
-      return;
+    @Override
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      // TODO can this be simplified by pushing some code to the superclass?
+      return CompletableFuture.supplyAsync(() -> {
+        final ConditionalMutation pcm = Iterables.getOnlyElement(createMutations(cd));
+
+        cd.addPrimaryToRejected();
+        getStats().setRejected(cd.getRejected());
+        // TODO do async
+        try {
+          checkForOrphanedLocks(cd);
+        } catch (Exception e) {
+          cd.commitObserver.failed(e);
+          return null;
+        }
+        if (checkForAckCollision(pcm)) {
+          cd.commitObserver.alreadyAcknowledged();
+        } else {
+          cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
+        }
+
+        return null;
+      }, env.getSharedResources().getSyncCommitExecutor());
     }
 
-    for (Map<Column, Bytes> cols : updates.values()) {
-      stats.incrementEntriesSet(cols.size());
+  }
+
+  class LockOtherStep extends ConditionalStep {
+
+    @Override
+    public AsyncConditionalWriter getACW(CommitData cd) {
+      return cd.bacw;
     }
 
-    Bytes primRow = null;
-    Column primCol = null;
 
-    if (primary != null) {
-      primRow = primary.getRow();
-      primCol = primary.getColumn();
-      if (notification != null && !primary.equals(notification.getRowColumn())) {
-        throw new IllegalArgumentException("Primary must be notification");
-      }
-    } else if (notification != null) {
-      primRow = notification.getRow();
-      primCol = notification.getColumn();
-    } else {
+    @Override
+    public Collection<ConditionalMutation> createMutations(CommitData cd) {
 
-      outer: for (Entry<Bytes, Map<Column, Bytes>> entry : updates.entrySet())
{
-        for (Entry<Column, Bytes> entry2 : entry.getValue().entrySet()) {
-          if (!isReadLock(entry2.getValue())) {
-            primRow = entry.getKey();
-            primCol = entry2.getKey();
-            break outer;
+      ArrayList<ConditionalMutation> mutations = new ArrayList<>();
+
+      for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet())
{
+        ConditionalFlutation cm = null;
+
+        for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
+          if (cm == null) {
+            cm = prewrite(rowUpdates.getKey(), colUpdates.getKey(), colUpdates.getValue(),
cd.prow,
+                cd.pcol, false);
+          } else {
+            prewrite(cm, colUpdates.getKey(), colUpdates.getValue(), cd.prow, cd.pcol, false);
           }
         }
-      }
 
-      if (primRow == null) {
-        // there are only read locks, so nothing to write
-        deleteWeakRow();
-        commitCallback.committed();
-        return;
+        mutations.add(cm);
       }
-    }
 
-    // get a primary column
-    cd.prow = primRow;
-    Map<Column, Bytes> colSet = updates.get(cd.prow);
-    cd.pcol = primCol;
-    cd.pval = colSet.remove(primCol);
-    if (colSet.size() == 0) {
-      updates.remove(cd.prow);
-    }
+      cd.acceptedRows = new HashSet<>();
 
-    cd.commitObserver = commitCallback;
+      return mutations;
+    }
 
-    // try to lock primary column
-    final ConditionalMutation pcm =
-        prewrite(cd.prow, cd.pcol, cd.pval, cd.prow, cd.pcol, isTriggerRow(cd.prow));
+    @Override
+    public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
{
+      // TODO this step does not currently handle unknown
+      return results;
+    }
 
-    CompletableFuture<Iterator<Result>> cfuture = cd.acw.apply(Collections.singletonList(pcm));
-    addCallback(cfuture, cd, result -> postLockPrimary(cd, pcm, Iterators.getOnlyElement(result)));
-  }
+    @Override
+    public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception
{
+
+      while (results.hasNext()) {
+        Result result = results.next();
+        // TODO handle unknown?
+        Bytes row = Bytes.of(result.getMutation().getRow());
+        if (result.getStatus() == Status.ACCEPTED) {
+          cd.acceptedRows.add(row);
+        } else {
+          cd.addToRejected(row, updates.get(row).keySet());
+        }
+      }
 
-  private void postLockPrimary(final CommitData cd, final ConditionalMutation pcm, Result
result)
-      throws Exception {
-    final Status mutationStatus = result.getStatus();
+      return cd.getRejected().size() == 0;
+    }
 
-    if (mutationStatus == Status.ACCEPTED) {
-      lockOtherColumns(cd);
-    } else {
-      env.getSharedResources().getSyncCommitExecutor().execute(new SynchronousCommitTask(cd)
{
-        @Override
-        protected void runCommitStep(CommitData cd) throws Exception {
-          synchronousPostLockPrimary(cd, pcm, mutationStatus);
+    @Override
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      return CompletableFuture.supplyAsync(() -> {
+        getStats().setRejected(cd.getRejected());
+        try {
+          // Does this need to be async?
+          checkForOrphanedLocks(cd);
+        } catch (Exception e) {
+          cd.commitObserver.failed(e);
+        }
+        return null;
+      }, env.getSharedResources().getSyncCommitExecutor()).thenCompose(v -> {
+        try {
+          return rollbackLocks(cd);
+        } catch (Exception e) {
+          cd.commitObserver.failed(e);
+          return null;
         }
       });
     }
   }
 
-  private void synchronousPostLockPrimary(CommitData cd, ConditionalMutation pcm,
-      Status mutationStatus) throws AccumuloException, AccumuloSecurityException, Exception
{
-    // TODO convert this code to async
-    while (mutationStatus == Status.UNKNOWN) {
-      TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
-
-      switch (txInfo.status) {
-        case LOCKED:
-          mutationStatus = Status.ACCEPTED;
-          break;
-        case ROLLED_BACK:
-          mutationStatus = Status.REJECTED;
-          break;
-        case UNKNOWN:
-          // TODO async
-          mutationStatus = cd.cw.write(pcm).getStatus();
-          // TODO handle case were data other tx has lock
-          break;
-        case COMMITTED:
-        default:
-          throw new IllegalStateException(
-              "unexpected tx state " + txInfo.status + " " + cd.prow + " " + cd.pcol);
+  abstract class BatchWriterStep extends CommitStep {
+    public abstract Collection<Mutation> createMutations(CommitData cd);
 
-      }
+    @Override
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      return env.getSharedResources().getBatchWriter()
+          .writeMutationsAsyncFuture(createMutations(cd)).thenApply(v -> true);
     }
 
-    if (mutationStatus != Status.ACCEPTED) {
-      cd.addPrimaryToRejected();
-      getStats().setRejected(cd.getRejected());
-      // TODO do async
-      checkForOrphanedLocks(cd);
-      if (checkForAckCollision(pcm)) {
-        cd.commitObserver.alreadyAcknowledged();
-      } else {
-        cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
-      }
-      return;
+    @Override
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      throw new IllegalStateException("Failure not expected");
     }
+  }
+
+
+
+  private CompletableFuture<Void> rollbackLocks(CommitData cd) throws Exception {
+    CommitStep firstStep = new RollbackOtherLocks();
+    firstStep.andThen(new RollbackPrimaryLock());
+
+    return firstStep.compose(cd)
+        .thenAccept(v -> cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
 
-    lockOtherColumns(cd);
   }
 
-  private void lockOtherColumns(CommitData cd) {
-    ArrayList<ConditionalMutation> mutations = new ArrayList<>();
 
-    for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
-      ConditionalFlutation cm = null;
+  class RollbackOtherLocks extends BatchWriterStep {
 
-      for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
-        if (cm == null) {
-          cm = prewrite(rowUpdates.getKey(), colUpdates.getKey(), colUpdates.getValue(),
cd.prow,
-              cd.pcol, false);
-        } else {
-          prewrite(cm, colUpdates.getKey(), colUpdates.getValue(), cd.prow, cd.pcol, false);
+    @Override
+    public Collection<Mutation> createMutations(CommitData cd) {
+      // roll back locks
+
+      // TODO let rollback be done lazily? this makes GC more difficult
+
+      Flutation m;
+
+      ArrayList<Mutation> mutations = new ArrayList<>(cd.acceptedRows.size());
+      for (Bytes row : cd.acceptedRows) {
+        m = new Flutation(env, row);
+        for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
+          if (isReadLock(entry.getValue())) {
+            m.put(entry.getKey(),
+                ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs, true),
+                DelReadLockValue.encodeRollback());
+          } else {
+            m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
+                DelLockValue.encodeRollback(false, true));
+          }
         }
+        mutations.add(m);
       }
 
-      mutations.add(cm);
+      return mutations;
     }
+  }
 
-    cd.acceptedRows = new HashSet<>();
+  class RollbackPrimaryLock extends BatchWriterStep {
 
-    CompletableFuture<Iterator<Result>> cfuture = cd.bacw.apply(mutations);
-    addCallback(cfuture, cd, results -> postLockOther(cd, results));
+    @Override
+    public Collection<Mutation> createMutations(CommitData cd) {
+      // mark transaction as complete for garbage collection purposes
+      Flutation m = new Flutation(env, cd.prow);
+
+      m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs,
+          DelLockValue.encodeRollback(startTs, true, true));
+      m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
+
+      return Collections.singletonList(m);
+    }
   }
 
-  private void postLockOther(final CommitData cd, Iterator<Result> results) throws
Exception {
-    while (results.hasNext()) {
-      Result result = results.next();
-      // TODO handle unknown?
-      Bytes row = Bytes.of(result.getMutation().getRow());
-      if (result.getStatus() == Status.ACCEPTED) {
-        cd.acceptedRows.add(row);
-      } else {
-        cd.addToRejected(row, updates.get(row).keySet());
-      }
+  @VisibleForTesting
+  public boolean commitPrimaryColumn(CommitData cd, Stamp commitStamp) {
+
+    SyncCommitObserver sco = new SyncCommitObserver();
+    cd.commitObserver = sco;
+    try {
+      GetCommitStampStepTest firstStep = new GetCommitStampStepTest();
+      firstStep.setTestStamp(commitStamp);
+
+      firstStep.andThen(new WriteNotificationsStep()).andThen(new CommitPrimaryStep());
+
+      firstStep.compose(cd).thenAccept(v -> cd.commitObserver.committed());
+      sco.waitForCommit();
+    } catch (CommitException e) {
+      return false;
+    } catch (Exception e) {
+      throw new FluoException(e);
     }
+    return true;
+  }
 
-    if (cd.getRejected().size() > 0) {
-      getStats().setRejected(cd.getRejected());
-      env.getSharedResources().getSyncCommitExecutor().execute(new SynchronousCommitTask(cd)
{
-        @Override
-        protected void runCommitStep(CommitData cd) throws Exception {
-          checkForOrphanedLocks(cd);
-          rollbackOtherLocks(cd);
+  class GetCommitStampStep extends CommitStep {
+
+    @Override
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      // TODO Auto-generated method stub
+      // TODO set commitTs on commit data
+      return env.getSharedResources().getOracleClient().getStampAsync().thenApply(commitStamp
-> {
+        if (startTs < commitStamp.getGcTimestamp()) {
+          return false;
+        } else {
+          getStats().setCommitTs(commitStamp.getTxTimestamp());
+          return true;
         }
       });
-    } else if (stopAfterPreCommit) {
-      cd.commitObserver.committed();
-    } else {
-      CompletableFuture<Stamp> cfuture = env.getSharedResources().getOracleClient().getStampAsync();
-      addCallback(cfuture, cd, stamp -> beginSecondCommitPhase(cd, stamp));
     }
-  }
 
-  private void rollbackOtherLocks(CommitData cd) throws Exception {
-    // roll back locks
+    @Override
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          rollbackLocks(cd);
+        } catch (Exception e) {
+          cd.commitObserver.failed(e);
+        }
+        return null;
+      }, env.getSharedResources().getSyncCommitExecutor());
+    }
+
+  }
 
-    // TODO let rollback be done lazily? this makes GC more difficult
+  class GetCommitStampStepTest extends GetCommitStampStep {
+    private Stamp testStamp;
 
-    Flutation m;
+    public void setTestStamp(Stamp testStamp) {
+      this.testStamp = testStamp;
+    }
 
-    ArrayList<Mutation> mutations = new ArrayList<>(cd.acceptedRows.size());
-    for (Bytes row : cd.acceptedRows) {
-      m = new Flutation(env, row);
-      for (Entry<Column, Bytes> entry : updates.get(row).entrySet()) {
-        if (isReadLock(entry.getValue())) {
-          m.put(entry.getKey(), ColumnConstants.RLOCK_PREFIX | ReadLockUtil.encodeTs(startTs,
true),
-              DelReadLockValue.encodeRollback());
+    @Override
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      // TODO Auto-generated method stub
+      // TODO set commitTs on commit data
+      return CompletableFuture.supplyAsync(() -> testStamp).thenApply(commitStamp ->
{
+        if (startTs < commitStamp.getGcTimestamp()) {
+          return false;
         } else {
-          m.put(entry.getKey(), ColumnConstants.DEL_LOCK_PREFIX | startTs,
-              DelLockValue.encodeRollback(false, true));
+          getStats().setCommitTs(commitStamp.getTxTimestamp());
+          return true;
         }
-      }
-      mutations.add(m);
+      });
     }
 
-    CompletableFuture<Void> cfuture =
-        env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations);
-    addCallback(cfuture, cd, result -> rollbackPrimaryLock(cd));
   }
 
-  private void rollbackPrimaryLock(CommitData cd) throws Exception {
-
-    // mark transaction as complete for garbage collection purposes
-    Flutation m = new Flutation(env, cd.prow);
+  class WriteNotificationsStep extends BatchWriterStep {
 
-    m.put(cd.pcol, ColumnConstants.DEL_LOCK_PREFIX | startTs,
-        DelLockValue.encodeRollback(startTs, true, true));
-    m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | startTs, EMPTY);
+    @Override
+    public Collection<Mutation> createMutations(CommitData cd) {
+      long commitTs = getStats().getCommitTs();
+      HashMap<Bytes, Mutation> mutations = new HashMap<>();
+
+      if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval))
{
+        Flutation m = new Flutation(env, cd.prow);
+        Notification.put(env, m, cd.pcol, commitTs);
+        mutations.put(cd.prow, m);
+      }
 
-    CompletableFuture<Void> cfuture =
-        env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(m);
-    addCallback(cfuture, cd,
-        result -> cd.commitObserver.commitFailed(cd.getShortCollisionMessage()));
-  }
+      for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet())
{
+
+        for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
+          if (observedColumns.contains(colUpdates.getKey())) {
+            Bytes val = colUpdates.getValue();
+            if (isWrite(val) && !isDelete(val)) {
+              Mutation m = mutations.get(rowUpdates.getKey());
+              if (m == null) {
+                m = new Flutation(env, rowUpdates.getKey());
+                mutations.put(rowUpdates.getKey(), m);
+              }
+              Notification.put(env, m, colUpdates.getKey(), commitTs);
+            }
+          }
+        }
+      }
 
-  private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp) throws Exception
{
-    if (startTs < commitStamp.getGcTimestamp()) {
-      rollbackOtherLocks(cd);
-    } else {
-      // Notification are written here for the following reasons :
-      // * At this point all columns are locked, this guarantees that anything triggering
as a
-      // result of this transaction will see all of this transactions changes.
-      // * The transaction is not yet committed. If the process dies at this point whatever
-      // was running this transaction should rerun and recreate all of the notifications.
-      // The next transactions will rerun because this transaction will have to be rolled
back.
-      // * If notifications are written in the 2nd phase of commit, then when the 2nd phase
-      // partially succeeds notifications may never be written. Because in the case of failure
-      // notifications would not be written until a column is read and it may never be read.
-      // See https://github.com/fluo-io/fluo/issues/642
-      //
-      // Its very important the notifications which trigger an observer are deleted after
the 2nd
-      // phase of commit finishes.
-      getStats().setCommitTs(commitStamp.getTxTimestamp());
-      writeNotificationsAsync(cd, commitStamp.getTxTimestamp());
+      for (Entry<Bytes, Set<Column>> entry : weakNotifications.entrySet()) {
+        Mutation m = mutations.get(entry.getKey());
+        if (m == null) {
+          m = new Flutation(env, entry.getKey());
+          mutations.put(entry.getKey(), m);
+        }
+        for (Column col : entry.getValue()) {
+          Notification.put(env, m, col, commitTs);
+        }
+      }
+      return mutations.values();
     }
+
   }
 
-  private void writeNotificationsAsync(CommitData cd, final long commitTs) {
+  class CommitPrimaryStep extends ConditionalStep {
 
-    HashMap<Bytes, Mutation> mutations = new HashMap<>();
+    @Override
+    public Collection<ConditionalMutation> createMutations(CommitData cd) {
+      long commitTs = getStats().getCommitTs();
+      IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
+      PrewriteIterator.setSnaptime(iterConf, startTs);
+      boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn());
 
-    if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval))
{
-      Flutation m = new Flutation(env, cd.prow);
-      Notification.put(env, m, cd.pcol, commitTs);
-      mutations.put(cd.prow, m);
+      Condition lockCheck =
+          new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(LockValue.encode(cd.prow,
+              cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger, getTransactorID()));
+      final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow,
lockCheck);
+
+      ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
+          isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
+
+      return Collections.singletonList(delLockMutation);
     }
 
-    for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
+    @Override
+    public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
+        throws Exception {
+      // the code for handing this is synchronous and needs to be handled in another thread
pool
+      // TODO - how do we do the above without return a CF?
+      long commitTs = getStats().getCommitTs();
+      Result result = Iterators.getOnlyElement(results);
+      Status ms = result.getStatus();
+
+      while (ms == Status.UNKNOWN) {
+
+        // TODO async
+        TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
 
-      for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
-        if (observedColumns.contains(colUpdates.getKey())) {
-          Bytes val = colUpdates.getValue();
-          if (isWrite(val) && !isDelete(val)) {
-            Mutation m = mutations.get(rowUpdates.getKey());
-            if (m == null) {
-              m = new Flutation(env, rowUpdates.getKey());
-              mutations.put(rowUpdates.getKey(), m);
+        switch (txInfo.status) {
+          case COMMITTED:
+            if (txInfo.commitTs != commitTs) {
+              throw new IllegalStateException(
+                  cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs);
             }
-            Notification.put(env, m, colUpdates.getKey(), commitTs);
-          }
+            ms = Status.ACCEPTED;
+            break;
+          case LOCKED:
+            // TODO async
+            ConditionalMutation delLockMutation = result.getMutation();
+            ms = cd.cw.write(delLockMutation).getStatus();
+            break;
+          default:
+            ms = Status.REJECTED;
         }
       }
+      Result newResult = new Result(ms, result.getMutation(), result.getTabletServer());
+      return Collections.singletonList(newResult).iterator();
     }
 
-    for (Entry<Bytes, Set<Column>> entry : weakNotifications.entrySet()) {
-      Mutation m = mutations.get(entry.getKey());
-      if (m == null) {
-        m = new Flutation(env, entry.getKey());
-        mutations.put(entry.getKey(), m);
-      }
-      for (Column col : entry.getValue()) {
-        Notification.put(env, m, col, commitTs);
-      }
+    @Override
+    public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception
{
+      Result result = Iterators.getOnlyElement(results);
+      return result.getStatus() == Status.ACCEPTED;
+    }
+
+    @Override
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      return CompletableFuture.runAsync(() -> {
+        cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
+      }, env.getSharedResources().getSyncCommitExecutor());
     }
 
-    CompletableFuture<Void> cfuture =
-        env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values());
-    addCallback(cfuture, cd, result -> commmitPrimary(cd, commitTs));
   }
 
-  private void commmitPrimary(CommitData cd, final long commitTs) {
-    // try to delete lock and add write for primary column
-    IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
-    PrewriteIterator.setSnaptime(iterConf, startTs);
-    boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn());
+  @VisibleForTesting
+  public boolean finishCommit(CommitData cd, Stamp commitStamp)
+      throws TableNotFoundException, MutationsRejectedException {
+    getStats().setCommitTs(commitStamp.getTxTimestamp());
+
+    CommitStep firstStep = new DeleteLocksStep();
+    firstStep.andThen(new FinishCommitStep());
+    firstStep.compose(cd);
+
+    return true;
+  }
+
 
-    Condition lockCheck =
-        new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(LockValue.encode(cd.prow,
-            cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger, getTransactorID()));
-    final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow, lockCheck);
+  class DeleteLocksStep extends BatchWriterStep {
 
-    ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
-        isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
+    @Override
+    public Collection<Mutation> createMutations(CommitData cd) {
+      long commitTs = getStats().getCommitTs();
+      ArrayList<Mutation> mutations = new ArrayList<>(updates.size() + 1);
+      for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet())
{
+        Flutation m = new Flutation(env, rowUpdates.getKey());
+        boolean isTriggerRow = isTriggerRow(rowUpdates.getKey());
+        for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
+          ColumnUtil.commitColumn(env,
+              isTriggerRow && colUpdates.getKey().equals(notification.getColumn()),
false,
+              colUpdates.getKey(), isWrite(colUpdates.getValue()), isDelete(colUpdates.getValue()),
+              isReadLock(colUpdates.getValue()), startTs, commitTs, observedColumns, m);
+        }
+
+        mutations.add(m);
+      }
+
+      return mutations;
+    }
 
-    CompletableFuture<Iterator<Result>> cfuture =
-        cd.acw.apply(Collections.singletonList(delLockMutation));
-    addCallback(cfuture, cd, result -> handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation,
-        Iterators.getOnlyElement(result)));
   }
 
-  private void handleUnkownStatsAfterPrimary(CommitData cd, final long commitTs,
-      final ConditionalMutation delLockMutation, Result result) throws Exception {
+  class FinishCommitStep extends BatchWriterStep {
 
-    final Status mutationStatus = result.getStatus();
-    if (mutationStatus == Status.UNKNOWN) {
-      // the code for handing this is synchronous and needs to be handled in another thread
pool
-      Runnable task = new SynchronousCommitTask(cd) {
-        @Override
-        protected void runCommitStep(CommitData cd) throws Exception {
+    @Override
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      return env.getSharedResources().getBatchWriter()
+          .writeMutationsAsyncFuture(createMutations(cd)).thenApply(v -> {
+            cd.commitObserver.committed();
+            return true;
+          });
+    }
 
-          Status ms = mutationStatus;
+    @Override
+    public Collection<Mutation> createMutations(CommitData cd) {
+      long commitTs = getStats().getCommitTs();
+      ArrayList<Mutation> afterFlushMutations = new ArrayList<>(2);
 
-          while (ms == Status.UNKNOWN) {
+      Flutation m = new Flutation(env, cd.prow);
+      // mark transaction as complete for garbage collection purposes
+      m.put(cd.pcol, ColumnConstants.TX_DONE_PREFIX | commitTs, EMPTY);
+      afterFlushMutations.add(m);
 
-            // TODO async
-            TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
-
-            switch (txInfo.status) {
-              case COMMITTED:
-                if (txInfo.commitTs != commitTs) {
-                  throw new IllegalStateException(
-                      cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs);
-                }
-                ms = Status.ACCEPTED;
-                break;
-              case LOCKED:
-                // TODO async
-                ms = cd.cw.write(delLockMutation).getStatus();
-                break;
-              default:
-                ms = Status.REJECTED;
-            }
-          }
+      if (weakNotification != null) {
+        afterFlushMutations.add(weakNotification.newDelete(env, startTs));
+      }
 
-          postCommitPrimary(cd, commitTs, ms);
-        }
-      };
+      if (notification != null) {
+        afterFlushMutations.add(notification.newDelete(env, startTs));
+      }
 
-      env.getSharedResources().getSyncCommitExecutor().execute(task);
-    } else {
-      postCommitPrimary(cd, commitTs, mutationStatus);
+      return afterFlushMutations;
     }
+
   }
 
-  private void postCommitPrimary(CommitData cd, long commitTs, Status mutationStatus)
-      throws Exception {
-    if (mutationStatus != Status.ACCEPTED) {
-      cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
-    } else {
-      if (stopAfterPrimaryCommit) {
-        cd.commitObserver.committed();
-      } else {
-        deleteLocks(cd, commitTs);
+  @Override
+  public synchronized void commitAsync(AsyncCommitObserver commitCallback) {
+
+    checkIfOpen();
+    status = TxStatus.COMMIT_STARTED;
+    commitAttempted = true;
+
+    try {
+      CommitData cd = createCommitData();
+      cd = setUpBeginCommitAsync(cd, commitCallback, null);
+      if (cd != null) {
+        beginCommitAsync(cd);
       }
+    } catch (Exception e) {
+      e.printStackTrace();
+      commitCallback.failed(e);
     }
   }
 
-  private void deleteLocks(CommitData cd, final long commitTs) {
-    // delete locks and add writes for other columns
-    ArrayList<Mutation> mutations = new ArrayList<>(updates.size() + 1);
-    for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
-      Flutation m = new Flutation(env, rowUpdates.getKey());
-      boolean isTriggerRow = isTriggerRow(rowUpdates.getKey());
-      for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
-        ColumnUtil.commitColumn(env,
-            isTriggerRow && colUpdates.getKey().equals(notification.getColumn()),
false,
-            colUpdates.getKey(), isWrite(colUpdates.getValue()), isDelete(colUpdates.getValue()),
-            isReadLock(colUpdates.getValue()), startTs, commitTs, observedColumns, m);
+  private CommitData setUpBeginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback,
 
 Review comment:
   I like how this code was pulled into this method.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message