fluo-notifications mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] jkosh44 commented on a change in pull request #1001: [WIP] - Issue 978
Date Thu, 01 Jan 1970 00:00:00 GMT
jkosh44 commented on a change in pull request #1001: [WIP] - Issue 978
URL: https://github.com/apache/fluo/pull/1001#discussion_r163444041
 
 

 ##########
 File path: modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java
 ##########
 @@ -872,262 +847,356 @@ public int getSize() {
     return size;
   }
 
-  private <V> void addCallback(CompletableFuture<V> cfuture, CommitData cd,
-      OnSuccessInterface<V> onSuccessInterface) {
-    cfuture.handleAsync((result, exception) -> {
-      if (exception != null) {
-        cd.commitObserver.failed(exception);
-        return null;
-      } else {
-        try {
-          onSuccessInterface.onSuccess(result);
-          return null;
-        } catch (Exception e) {
-          cd.commitObserver.failed(e);
-          return null;
-        }
-      }
-    }, env.getSharedResources().getAsyncCommitExecutor());
-  }
+  // TODO exception handling!!!! How?????
+  abstract class CommitStep {
+    private CommitStep nextStep;
 
-  @Override
-  public synchronized void commitAsync(AsyncCommitObserver commitCallback) {
+    // the boolean indicates if the operation was successful.
+    abstract CompletableFuture<Boolean> getMainOp(CommitData cd);
 
-    checkIfOpen();
-    status = TxStatus.COMMIT_STARTED;
-    commitAttempted = true;
+    // create and run this op in the event that the main op was a failure
+    abstract CompletableFuture<Void> getFailureOp(CommitData cd);
 
-    try {
-      CommitData cd = createCommitData();
-      beginCommitAsync(cd, commitCallback, null);
-    } catch (Exception e) {
-      e.printStackTrace();
-      commitCallback.failed(e);
+    // set the next step to run if this step is successful
+    CommitStep andThen(CommitStep next) {
+      this.nextStep = next;
+      return next;
     }
-  }
 
-  private void beginCommitAsync(CommitData cd, AsyncCommitObserver commitCallback,
-      RowColumn primary) {
 
-    if (updates.size() == 0) {
-      // TODO do async
-      deleteWeakRow();
-      commitCallback.committed();
-      return;
+    CompletableFuture<Void> compose(CommitData cd) {
+      return getMainOp(cd).thenComposeAsync(successful -> {
+        if (successful) {
+          if (nextStep != null) {
+            return nextStep.compose(cd);
+          } else {
+            return CompletableFuture.completedFuture(null);
+          }
+        } else {
+          return getFailureOp(cd);
+        }
+      }, env.getSharedResources().getAsyncCommitExecutor());
     }
 
-    for (Map<Column, Bytes> cols : updates.values()) {
-      stats.incrementEntriesSet(cols.size());
-    }
+  }
 
-    Bytes primRow = null;
-    Column primCol = null;
+  abstract class ConditionalStep extends CommitStep {
 
-    if (primary != null) {
-      primRow = primary.getRow();
-      primCol = primary.getColumn();
-      if (notification != null && !primary.equals(notification.getRowColumn())) {
-        throw new IllegalArgumentException("Primary must be notification");
-      }
-    } else if (notification != null) {
-      primRow = notification.getRow();
-      primCol = notification.getColumn();
-    } else {
+    CommitData cd;
 
-      outer: for (Entry<Bytes, Map<Column, Bytes>> entry : updates.entrySet())
{
-        for (Entry<Column, Bytes> entry2 : entry.getValue().entrySet()) {
-          if (!isReadLock(entry2.getValue())) {
-            primRow = entry.getKey();
-            primCol = entry2.getKey();
-            break outer;
-          }
-        }
-      }
+    public abstract Collection<ConditionalMutation> createMutations(CommitData cd);
 
-      if (primRow == null) {
-        // there are only read locks, so nothing to write
-        deleteWeakRow();
-        commitCallback.committed();
-        return;
-      }
-    }
+    public abstract Iterator<Result> handleUnknown(CommitData cd, Iterator<Result>
results)
+        throws Exception;
 
-    // get a primary column
-    cd.prow = primRow;
-    Map<Column, Bytes> colSet = updates.get(cd.prow);
-    cd.pcol = primCol;
-    cd.pval = colSet.remove(primCol);
-    if (colSet.size() == 0) {
-      updates.remove(cd.prow);
+    public abstract boolean processResults(CommitData cd, Iterator<Result> results)
+        throws Exception;
+
+    public AsyncConditionalWriter getACW(CommitData cd) {
+      return cd.acw;
     }
 
-    cd.commitObserver = commitCallback;
+    @Override
+    CompletableFuture<Boolean> getMainOp(CommitData cd) {
+      // TODO not sure threading is correct
+      Executor ace = env.getSharedResources().getAsyncCommitExecutor();
+      return getACW(cd).apply(createMutations(cd)).thenCompose(results -> {
+        // ugh icky that this is an iterator, forces copy to inspect.. could refactor async
CW to
+        // return collection
+        ArrayList<Result> resultsList = new ArrayList<>();
+        Iterators.addAll(resultsList, results);
+        boolean containsUknown = false;
+        for (Result result : resultsList) {
+          try {
+            containsUknown |= result.getStatus() == Status.UNKNOWN;
+          } catch (Exception e) {
 
 Review comment:
   I agree; however, when I try to do that, mvn errors out saying that there's an unhandled
exception, and my IDE complains as well. I also tried removing the `try` blocks and appending an
`.exceptionally` to the end, and that didn't work either. I'm not quite sure what's going on, but
it may not be worth the headache.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message