fluo-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] keith-turner commented on a change in pull request #1001: [WIP] - Issue 978
Date Thu, 01 Jan 1970 00:00:00 GMT
keith-turner commented on a change in pull request #1001: [WIP] - Issue 978
URL: https://github.com/apache/fluo/pull/1001#discussion_r163413493
 
 

 ##########
 File path: modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
 ##########
 @@ -1141,178 +1210,340 @@ private void beginSecondCommitPhase(CommitData cd, Stamp commitStamp)
throws Exc
       //
       // Its very important the notifications which trigger an observer are deleted after
the 2nd
       // phase of commit finishes.
-      getStats().setCommitTs(commitStamp.getTxTimestamp());
-      writeNotificationsAsync(cd, commitStamp.getTxTimestamp());
+      return env.getSharedResources().getOracleClient().getStampAsync().thenApply(commitStamp
-> {
+        if (startTs < commitStamp.getGcTimestamp()) {
+          return false;
+        } else {
+          getStats().setCommitTs(commitStamp.getTxTimestamp());
+          return true;
+        }
+      });
     }
+
+    @Override
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      return CompletableFuture.supplyAsync(() -> {
+        try {
+          rollbackLocks(cd);
+        } catch (Exception e) {
+          throw new CompletionException(e);
+        }
+        return null;
+      }, env.getSharedResources().getSyncCommitExecutor());
+    }
+
   }
 
-  private void writeNotificationsAsync(CommitData cd, final long commitTs) {
+  class GetCommitStampStepTest extends GetCommitStampStep {
+    private Stamp testStamp;
 
-    HashMap<Bytes, Mutation> mutations = new HashMap<>();
+    public GetCommitStampStepTest(Stamp testStamp) {
+      this.testStamp = testStamp;
+    }
 
-    if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval))
{
-      Flutation m = new Flutation(env, cd.prow);
-      Notification.put(env, m, cd.pcol, commitTs);
-      mutations.put(cd.prow, m);
+    @Override
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      return CompletableFuture.completedFuture(testStamp).thenApply(commitStamp -> {
+        if (startTs < commitStamp.getGcTimestamp()) {
+          return false;
+        } else {
+          getStats().setCommitTs(commitStamp.getTxTimestamp());
+          return true;
+        }
+      });
     }
 
-    for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet()) {
+  }
+
+  class WriteNotificationsStep extends BatchWriterStep {
 
-      for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
-        if (observedColumns.contains(colUpdates.getKey())) {
-          Bytes val = colUpdates.getValue();
-          if (isWrite(val) && !isDelete(val)) {
-            Mutation m = mutations.get(rowUpdates.getKey());
-            if (m == null) {
-              m = new Flutation(env, rowUpdates.getKey());
-              mutations.put(rowUpdates.getKey(), m);
+    @Override
+    public Collection<Mutation> createMutations(CommitData cd) {
+      long commitTs = getStats().getCommitTs();
+      HashMap<Bytes, Mutation> mutations = new HashMap<>();
+
+      if (observedColumns.contains(cd.pcol) && isWrite(cd.pval) && !isDelete(cd.pval))
{
+        Flutation m = new Flutation(env, cd.prow);
+        Notification.put(env, m, cd.pcol, commitTs);
+        mutations.put(cd.prow, m);
+      }
+
+      for (Entry<Bytes, Map<Column, Bytes>> rowUpdates : updates.entrySet())
{
+
+        for (Entry<Column, Bytes> colUpdates : rowUpdates.getValue().entrySet()) {
+          if (observedColumns.contains(colUpdates.getKey())) {
+            Bytes val = colUpdates.getValue();
+            if (isWrite(val) && !isDelete(val)) {
+              Mutation m = mutations.get(rowUpdates.getKey());
+              if (m == null) {
+                m = new Flutation(env, rowUpdates.getKey());
+                mutations.put(rowUpdates.getKey(), m);
+              }
+              Notification.put(env, m, colUpdates.getKey(), commitTs);
             }
-            Notification.put(env, m, colUpdates.getKey(), commitTs);
           }
         }
       }
-    }
 
-    for (Entry<Bytes, Set<Column>> entry : weakNotifications.entrySet()) {
-      Mutation m = mutations.get(entry.getKey());
-      if (m == null) {
-        m = new Flutation(env, entry.getKey());
-        mutations.put(entry.getKey(), m);
-      }
-      for (Column col : entry.getValue()) {
-        Notification.put(env, m, col, commitTs);
+      for (Entry<Bytes, Set<Column>> entry : weakNotifications.entrySet()) {
+        Mutation m = mutations.get(entry.getKey());
+        if (m == null) {
+          m = new Flutation(env, entry.getKey());
+          mutations.put(entry.getKey(), m);
+        }
+        for (Column col : entry.getValue()) {
+          Notification.put(env, m, col, commitTs);
+        }
       }
+      return mutations.values();
     }
 
-    CompletableFuture<Void> cfuture =
-        env.getSharedResources().getBatchWriter().writeMutationsAsyncFuture(mutations.values());
-    addCallback(cfuture, cd, result -> commmitPrimary(cd, commitTs));
   }
 
-  private void commmitPrimary(CommitData cd, final long commitTs) {
-    // try to delete lock and add write for primary column
-    IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
-    PrewriteIterator.setSnaptime(iterConf, startTs);
-    boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn());
+  class CommitPrimaryStep extends ConditionalStep {
 
-    Condition lockCheck =
-        new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(LockValue.encode(cd.prow,
-            cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger, getTransactorID()));
-    final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow, lockCheck);
+    @Override
+    public Collection<ConditionalMutation> createMutations(CommitData cd) {
+      long commitTs = getStats().getCommitTs();
+      IteratorSetting iterConf = new IteratorSetting(10, PrewriteIterator.class);
+      PrewriteIterator.setSnaptime(iterConf, startTs);
+      boolean isTrigger = isTriggerRow(cd.prow) && cd.pcol.equals(notification.getColumn());
 
-    ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
-        isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
+      Condition lockCheck =
+          new FluoCondition(env, cd.pcol).setIterators(iterConf).setValue(LockValue.encode(cd.prow,
+              cd.pcol, isWrite(cd.pval), isDelete(cd.pval), isTrigger, getTransactorID()));
+      final ConditionalMutation delLockMutation = new ConditionalFlutation(env, cd.prow,
lockCheck);
 
-    CompletableFuture<Iterator<Result>> cfuture =
-        cd.acw.apply(Collections.singletonList(delLockMutation));
-    addCallback(cfuture, cd, result -> handleUnkownStatsAfterPrimary(cd, commitTs, delLockMutation,
-        Iterators.getOnlyElement(result)));
-  }
+      ColumnUtil.commitColumn(env, isTrigger, true, cd.pcol, isWrite(cd.pval), isDelete(cd.pval),
+          isReadLock(cd.pval), startTs, commitTs, observedColumns, delLockMutation);
 
-  private void handleUnkownStatsAfterPrimary(CommitData cd, final long commitTs,
-      final ConditionalMutation delLockMutation, Result result) throws Exception {
+      return Collections.singletonList(delLockMutation);
+    }
 
-    final Status mutationStatus = result.getStatus();
-    if (mutationStatus == Status.UNKNOWN) {
+    @Override
+    public Iterator<Result> handleUnknown(CommitData cd, Iterator<Result> results)
+        throws Exception {
       // the code for handing this is synchronous and needs to be handled in another thread
pool
-      Runnable task = new SynchronousCommitTask(cd) {
-        @Override
-        protected void runCommitStep(CommitData cd) throws Exception {
+      // TODO - how do we do the above without return a CF?
+      long commitTs = getStats().getCommitTs();
+      Result result = Iterators.getOnlyElement(results);
+      Status ms = result.getStatus();
 
-          Status ms = mutationStatus;
+      while (ms == Status.UNKNOWN) {
 
-          while (ms == Status.UNKNOWN) {
+        // TODO async
+        TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
 
-            // TODO async
-            TxInfo txInfo = TxInfo.getTransactionInfo(env, cd.prow, cd.pcol, startTs);
-
-            switch (txInfo.status) {
-              case COMMITTED:
-                if (txInfo.commitTs != commitTs) {
-                  throw new IllegalStateException(
-                      cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs);
-                }
-                ms = Status.ACCEPTED;
-                break;
-              case LOCKED:
-                // TODO async
-                ms = cd.cw.write(delLockMutation).getStatus();
-                break;
-              default:
-                ms = Status.REJECTED;
+        switch (txInfo.status) {
+          case COMMITTED:
+            if (txInfo.commitTs != commitTs) {
+              throw new IllegalStateException(
+                  cd.prow + " " + cd.pcol + " " + txInfo.commitTs + "!=" + commitTs);
             }
-          }
-
-          postCommitPrimary(cd, commitTs, ms);
+            ms = Status.ACCEPTED;
+            break;
+          case LOCKED:
+            // TODO async
+            ConditionalMutation delLockMutation = result.getMutation();
+            ms = cd.cw.write(delLockMutation).getStatus();
+            break;
+          default:
+            ms = Status.REJECTED;
         }
-      };
+      }
+      Result newResult = new Result(ms, result.getMutation(), result.getTabletServer());
+      return Collections.singletonList(newResult).iterator();
+    }
 
-      env.getSharedResources().getSyncCommitExecutor().execute(task);
-    } else {
-      postCommitPrimary(cd, commitTs, mutationStatus);
+    @Override
+    public boolean processResults(CommitData cd, Iterator<Result> results) throws Exception
{
+      Result result = Iterators.getOnlyElement(results);
+      return result.getStatus() == Status.ACCEPTED;
     }
+
+    @Override
+    CompletableFuture<Void> getFailureOp(CommitData cd) {
+      return CompletableFuture.runAsync(() -> {
+        cd.commitObserver.commitFailed(cd.getShortCollisionMessage());
 
 Review comment:
   This operation should not block, so probably do not need to run it in the sync commit executor.
  Could probably just call commit failed and return a completed future.
    

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message