tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jamestay...@apache.org
Subject [2/3] incubator-tephra git commit: TEPHRA-294 Update TransactionAwareTable to implement interface of Table for 1.3 and 1.4
Date Wed, 15 Aug 2018 15:11:09 GMT
TEPHRA-294 Update TransactionAwareTable to implement interface of Table for 1.3 and 1.4


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/52fd15f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/52fd15f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/52fd15f0

Branch: refs/heads/master
Commit: 52fd15f0ab45529f8b0ba808e5de2a909f6638a4
Parents: d0a1c4c
Author: James Taylor <jamestaylor@apache.org>
Authored: Sun Aug 12 20:47:10 2018 -0700
Committer: James Taylor <jamestaylor@apache.org>
Committed: Sun Aug 12 20:54:02 2018 -0700

----------------------------------------------------------------------
 tephra-examples/hbase-1.3/pom.xml               |   4 +-
 .../apache/tephra/examples/BalanceBooks.java    | 341 +++++++++++++++++++
 .../apache/tephra/examples/package-info.java    |  40 +++
 .../tephra/examples/BalanceBooksTest.java       | 137 ++++++++
 tephra-examples/hbase-1.4/pom.xml               |   4 +-
 .../apache/tephra/examples/BalanceBooks.java    | 341 +++++++++++++++++++
 .../apache/tephra/examples/package-info.java    |  40 +++
 .../tephra/examples/BalanceBooksTest.java       | 137 ++++++++
 .../tephra/hbase/TransactionAwareHTable.java    |  92 +----
 .../tephra/hbase/AbstractHBaseTableTest.java    |   9 +-
 .../hbase/TransactionAwareHTableTest.java       |  60 ++--
 .../hbase/txprune/DataJanitorStateTest.java     |   3 +-
 .../hbase/txprune/InvalidListPruneTest.java     |   7 +-
 .../txprune/InvalidListPruningDebugTest.java    |   3 +-
 .../tephra/hbase/TransactionAwareHTable.java    | 135 ++------
 .../tephra/hbase/AbstractHBaseTableTest.java    |   9 +-
 .../hbase/TransactionAwareHTableTest.java       |  60 ++--
 .../hbase/txprune/DataJanitorStateTest.java     |   3 +-
 .../hbase/txprune/InvalidListPruneTest.java     |   7 +-
 .../txprune/InvalidListPruningDebugTest.java    |   3 +-
 20 files changed, 1171 insertions(+), 264 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.3/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.3/pom.xml b/tephra-examples/hbase-1.3/pom.xml
index de1c84a..865541a 100644
--- a/tephra-examples/hbase-1.3/pom.xml
+++ b/tephra-examples/hbase-1.3/pom.xml
@@ -34,8 +34,8 @@
   </properties>
 
   <build>
-    <sourceDirectory>../src/main/java</sourceDirectory>
-    <testSourceDirectory>../src/test/java</testSourceDirectory>
+    <sourceDirectory>src/main/java</sourceDirectory>
+    <testSourceDirectory>src/test/java</testSourceDirectory>
   </build>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/BalanceBooks.java
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/BalanceBooks.java b/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/BalanceBooks.java
new file mode 100644
index 0000000..b970598
--- /dev/null
+++ b/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/BalanceBooks.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.examples;
+
+import com.google.common.io.Closeables;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.tephra.util.ConfigurationFactory;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Simple example application that launches a number of concurrent clients, one per "account".  Each client attempts to
+ * make withdrawals from other clients, and deposit the same amount to its own account in a single transaction.
+ * Since this means the client will be updating both its own row and the withdrawee's row, this will naturally lead to
+ * transaction conflicts.  All clients will run for a specified number of iterations.  When the processing is complete,
+ * the total sum of all rows should be zero, if transactional integrity was maintained.
+ *
+ * <p>
+ *   You can run the BalanceBooks application with the following command:
+ *   <pre>
+ *     ./bin/tephra run org.apache.tephra.examples.BalanceBooks [num clients] [num iterations]
+ *   </pre>
+ *   where <code>[num clients]</code> is the number of concurrent client threads to use, and
+ *   <code>[num iterations]</code> is the number of "transfer" operations to perform per client thread.
+ * </p>
+ */
+public class BalanceBooks implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(BalanceBooks.class);
+
+  private static final int MAX_AMOUNT = 100;
+  private static final byte[] TABLE = Bytes.toBytes("testbalances");
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] COL = Bytes.toBytes("b");
+
+  private final int totalClients;
+  private final int iterations;
+
+  private Configuration conf;
+  private ZKClientService zkClient;
+  private TransactionServiceClient txClient;
+  private Connection conn;
+
+  public BalanceBooks(int totalClients, int iterations) {
+    this(totalClients, iterations, new ConfigurationFactory().get());
+  }
+
+  public BalanceBooks(int totalClients, int iterations, Configuration conf) {
+    this.totalClients = totalClients;
+    this.iterations = iterations;
+    this.conf = conf;
+  }
+
+  /**
+   * Sets up common resources required by all clients.
+   */
+  public void init() throws IOException {
+    Injector injector = Guice.createInjector(
+        new ConfigModule(conf),
+        new ZKModule(),
+        new DiscoveryModules().getDistributedModules(),
+        new TransactionModules().getDistributedModules(),
+        new TransactionClientModule()
+    );
+
+    zkClient = injector.getInstance(ZKClientService.class);
+    zkClient.startAndWait();
+    txClient = injector.getInstance(TransactionServiceClient.class);
+    conn = ConnectionFactory.createConnection(conf);
+    createTableIfNotExists(conf, TABLE, new byte[][]{ FAMILY });
+  }
+
+  /**
+   * Runs all clients and waits for them to complete.
+   */
+  public void run() throws IOException, InterruptedException {
+    List<Client> clients = new ArrayList<>(totalClients);
+    for (int i = 0; i < totalClients; i++) {
+      Client c = new Client(i, totalClients, iterations);
+      c.init(txClient, conn.getTable(TableName.valueOf(TABLE)));
+      c.start();
+      clients.add(c);
+    }
+
+    for (Client c : clients) {
+      c.join();
+      Closeables.closeQuietly(c);
+    }
+  }
+
+  /**
+   * Validates the current state of the data stored at the end of the test.  Each update by a client consists of two
+   * parts: a withdrawal of a random amount from a randomly selected other account, and a corresponding deposit to
+   * the client's own account.  So, if all the updates were performed consistently (no partial updates or partial
+   * rollbacks), then the total sum of all balances at the end should be 0.
+   */
+  public boolean verify() {
+    boolean success = false;
+    try {
+      TransactionAwareHTable table = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TABLE)));
+      TransactionContext context = new TransactionContext(txClient, table);
+
+      LOG.info("VERIFYING BALANCES");
+      context.start();
+      long totalBalance = 0;
+
+      try (ResultScanner scanner = table.getScanner(new Scan())) {
+        for (Result r : scanner) {
+          if (!r.isEmpty()) {
+            int rowId = Bytes.toInt(r.getRow());
+            long balance = Bytes.toLong(r.getValue(FAMILY, COL));
+            totalBalance += balance;
+            LOG.info("Client #{}: balance = ${}", rowId, balance);
+          }
+        }
+      }
+      if (totalBalance == 0) {
+        LOG.info("PASSED!");
+        success = true;
+      } else {
+        LOG.info("FAILED! Total balance should be 0 but was {}", totalBalance);
+      }
+      context.finish();
+    } catch (Exception e) {
+      LOG.error("Failed verification check", e);
+    }
+    return success;
+  }
+
+  /**
+   * Frees up the underlying resources common to all clients.
+   */
+  public void close() {
+    try {
+      if (conn != null) {
+        conn.close();
+      }
+    } catch (IOException ignored) { }
+
+    if (zkClient != null) {
+      zkClient.stopAndWait();
+    }
+  }
+
+  protected void createTableIfNotExists(Configuration conf, byte[] tableName, byte[][] columnFamilies)
+      throws IOException {
+    try (Admin admin = this.conn.getAdmin()) {
+      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+      for (byte[] family : columnFamilies) {
+        HColumnDescriptor columnDesc = new HColumnDescriptor(family);
+        columnDesc.setMaxVersions(Integer.MAX_VALUE);
+        desc.addFamily(columnDesc);
+      }
+      desc.addCoprocessor(TransactionProcessor.class.getName());
+      admin.createTable(desc);
+    }
+  }
+
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      System.err.println("Usage: java " + BalanceBooks.class.getName() + " <num clients> <iterations>");
+      System.err.println("\twhere <num clients> >= 2");
+      System.exit(1);
+    }
+
+    try (BalanceBooks bb = new BalanceBooks(Integer.parseInt(args[0]), Integer.parseInt(args[1]))) {
+      bb.init();
+      bb.run();
+      bb.verify();
+    } catch (Exception e) {
+      LOG.error("Failed during BalanceBooks run", e);
+    }
+  }
+
+  /**
+   * Represents a single client actor in the test.  Each client runs as a separate thread.
+   *
+   * For the given number of iterations, the client will:
+   * <ol>
+   *   <li>select a random other client from which to withdraw</li>
+   *   <li>select a random amount from 0 to MAX_AMOUNT</li>
+   *   <li>start a new transaction and: deduct the amount from the other client's account, and deposit
+   *       the same amount to its own account.</li>
+   * </ol>
+   *
+   * Since multiple clients operate concurrently and contend over a set of constrained resources
+   * (the client accounts), it is expected that a portion of the attempted transactions will encounter
+   * conflicts, due to a simultaneous deduction from or deposit to one of the same accounts which has successfully
+   * committed first.  In this case, the updates from the transaction encountering the conflict should be completely
+   * rolled back, leaving the data in a consistent state.
+   */
+  private static class Client extends Thread implements Closeable {
+    private final int id;
+    private final int totalClients;
+    private final int iterations;
+
+    private final Random random = new Random();
+
+    private TransactionContext txContext;
+    private TransactionAwareHTable txTable;
+
+
+    public Client(int id, int totalClients, int iterations) {
+      this.id = id;
+      this.totalClients = totalClients;
+      this.iterations = iterations;
+    }
+
+    /**
+     * Sets up any resources needed by the individual client.
+     *
+     * @param txClient the transaction client to use in accessing the transaction service
+     * @param table the HBase table instance to use for accessing storage
+     */
+    public void init(TransactionSystemClient txClient, Table table) {
+      txTable = new TransactionAwareHTable(table);
+      txContext = new TransactionContext(txClient, txTable);
+    }
+
+    public void run() {
+      try {
+        for (int i = 0; i < iterations; i++) {
+          runOnce();
+        }
+      } catch (TransactionFailureException e) {
+        LOG.error("Client #{}: Failed on exception", id, e);
+      }
+    }
+
+    /**
+     * Runs a single iteration of the client logic.
+     */
+    private void runOnce() throws TransactionFailureException {
+      int withdrawee = getNextWithdrawee();
+      int amount = getAmount();
+
+      try {
+        txContext.start();
+        long withdraweeBalance = getCurrentBalance(withdrawee);
+        long ownBalance = getCurrentBalance(id);
+        long withdraweeNew = withdraweeBalance - amount;
+        long ownNew = ownBalance + amount;
+
+        setBalance(withdrawee, withdraweeNew);
+        setBalance(id, ownNew);
+        LOG.debug("Client #{}: Withdrew ${} from #{}; withdrawee old={}, new={}; own old={}, new={}",
+            id, amount, withdrawee, withdraweeBalance, withdraweeNew, ownBalance, ownNew);
+        txContext.finish();
+
+      } catch (IOException ioe) {
+        LOG.error("Client #{}: Unhandled client failure", id, ioe);
+        txContext.abort();
+      } catch (TransactionConflictException tce) {
+        LOG.debug("CONFLICT: client #{} attempting to withdraw from #{}", id, withdrawee);
+        txContext.abort(tce);
+      } catch (TransactionFailureException tfe) {
+        LOG.error("Client #{}: Unhandled transaction failure", id, tfe);
+        txContext.abort(tfe);
+      }
+    }
+
+    private long getCurrentBalance(int id) throws IOException {
+      Result r = txTable.get(new Get(Bytes.toBytes(id)));
+      byte[] balanceBytes = r.getValue(FAMILY, COL);
+      if (balanceBytes == null) {
+        return 0;
+      }
+      return Bytes.toLong(balanceBytes);
+    }
+
+    private void setBalance(int id, long balance) throws IOException {
+      txTable.put(new Put(Bytes.toBytes(id)).addColumn(FAMILY, COL, Bytes.toBytes(balance)));
+    }
+
+    private int getNextWithdrawee() {
+      int next;
+      do {
+        next = random.nextInt(totalClients);
+      } while (next == id);
+      return next;
+    }
+
+    private int getAmount() {
+      return random.nextInt(MAX_AMOUNT);
+    }
+
+    public void close() throws IOException {
+      txTable.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/package-info.java b/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/package-info.java
new file mode 100644
index 0000000..a0e67d5
--- /dev/null
+++ b/tephra-examples/hbase-1.3/src/main/java/org/apache/tephra/examples/package-info.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains example applications for Tephra designed to illustrate sample Tephra usage
+ * and provide out-of-the-box sample applications which can be run to test cluster functionality.
+ *
+ * <p>Currently the following applications are provided:
+ *
+ * <ul>
+ *   <li><strong>BalanceBooks</strong> - this application runs a specified number of concurrent clients in separate
+ *     threads, which perform transactions to make withdrawals from each other's accounts and deposits to their own
+ *     accounts.  At the end of the test, the total value of all account balances is verified to be equal to zero,
+ *     which confirms that transactional integrity was not violated.
+ *   </li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ *   Note that, for simplicity, the examples package is currently hardcoded to compile against a specific HBase
+ *   version (currently 1.3).  In the future, we should provide Maven profiles to allow compiling the examples
+ *   against each of the supported HBase versions.
+ * </p>
+ */
+package org.apache.tephra.examples;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.3/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.3/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java b/tephra-examples/hbase-1.3/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
new file mode 100644
index 0000000..4dfe107
--- /dev/null
+++ b/tephra-examples/hbase-1.3/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.examples;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.tephra.util.Tests;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link BalanceBooks} program.
+ */
+public class BalanceBooksTest {
+  private static final Logger LOG = LoggerFactory.getLogger(BalanceBooksTest.class);
+  private static HBaseTestingUtility testUtil;
+  private static TransactionService txService;
+  private static ZKClientService zkClientService;
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    testUtil = new HBaseTestingUtility();
+    Configuration conf = testUtil.getConfiguration();
+    conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+
+    // Tune down the connection thread pool size
+    conf.setInt("hbase.hconnection.threads.core", 5);
+    conf.setInt("hbase.hconnection.threads.max", 10);
+    // Tune down handler threads in regionserver
+    conf.setInt("hbase.regionserver.handler.count", 10);
+
+    // Set to random port
+    conf.setInt("hbase.master.port", 0);
+    conf.setInt("hbase.master.info.port", 0);
+    conf.setInt("hbase.regionserver.port", 0);
+    conf.setInt("hbase.regionserver.info.port", 0);
+
+    testUtil.startMiniCluster();
+
+    String zkClusterKey = testUtil.getClusterKey(); // hostname:clientPort:parentZnode
+    String zkQuorum = zkClusterKey.substring(0, zkClusterKey.lastIndexOf(':'));
+    LOG.info("Zookeeper Quorum is running at {}", zkQuorum);
+    conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkQuorum);
+
+    Injector injector = Guice.createInjector(
+        new ConfigModule(conf),
+        new ZKModule(),
+        new DiscoveryModules().getDistributedModules(),
+        Modules.override(new TransactionModules().getDistributedModules())
+            .with(new AbstractModule() {
+              @Override
+              protected void configure() {
+                bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+              }
+            }),
+        new TransactionClientModule()
+    );
+
+    zkClientService = injector.getInstance(ZKClientService.class);
+    zkClientService.startAndWait();
+
+    // start a tx server
+    txService = injector.getInstance(TransactionService.class);
+    try {
+      LOG.info("Starting transaction service");
+      txService.startAndWait();
+    } catch (Exception e) {
+      LOG.error("Failed to start service: ", e);
+      throw e;
+    }
+
+    Tests.waitForTxReady(injector.getInstance(TransactionSystemClient.class));
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (txService != null) {
+      txService.stopAndWait();
+    }
+    if (zkClientService != null) {
+      zkClientService.stopAndWait();
+    }
+    testUtil.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testBalanceBooks() throws Exception {
+    try (BalanceBooks bb = new BalanceBooks(5, 100, testUtil.getConfiguration())) {
+      bb.init();
+      bb.run();
+      assertTrue(bb.verify());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.4/pom.xml
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.4/pom.xml b/tephra-examples/hbase-1.4/pom.xml
index 3286c5c..3de31b4 100644
--- a/tephra-examples/hbase-1.4/pom.xml
+++ b/tephra-examples/hbase-1.4/pom.xml
@@ -34,8 +34,8 @@
   </properties>
 
   <build>
-    <sourceDirectory>../src/main/java</sourceDirectory>
-    <testSourceDirectory>../src/test/java</testSourceDirectory>
+    <sourceDirectory>src/main/java</sourceDirectory>
+    <testSourceDirectory>src/test/java</testSourceDirectory>
   </build>
 
   <dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/BalanceBooks.java
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/BalanceBooks.java b/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/BalanceBooks.java
new file mode 100644
index 0000000..b970598
--- /dev/null
+++ b/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/BalanceBooks.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.examples;
+
+import com.google.common.io.Closeables;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.tephra.TransactionConflictException;
+import org.apache.tephra.TransactionContext;
+import org.apache.tephra.TransactionFailureException;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.distributed.TransactionServiceClient;
+import org.apache.tephra.hbase.TransactionAwareHTable;
+import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.tephra.util.ConfigurationFactory;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Simple example application that launches a number of concurrent clients, one per "account".  Each client attempts to
+ * make withdrawals from other clients, and deposit the same amount to its own account in a single transaction.
+ * Since this means the client will be updating both its own row and the withdrawee's row, this will naturally lead to
+ * transaction conflicts.  All clients will run for a specified number of iterations.  When the processing is complete,
+ * the total sum of all rows should be zero, if transactional integrity was maintained.
+ *
+ * <p>
+ *   You can run the BalanceBooks application with the following command:
+ *   <pre>
+ *     ./bin/tephra run org.apache.tephra.examples.BalanceBooks [num clients] [num iterations]
+ *   </pre>
+ *   where <code>[num clients]</code> is the number of concurrent client threads to use, and
+ *   <code>[num iterations]</code> is the number of "transfer" operations to perform per client thread.
+ * </p>
+ */
+public class BalanceBooks implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(BalanceBooks.class);
+
+  private static final int MAX_AMOUNT = 100;
+  private static final byte[] TABLE = Bytes.toBytes("testbalances");
+  private static final byte[] FAMILY = Bytes.toBytes("f");
+  private static final byte[] COL = Bytes.toBytes("b");
+
+  private final int totalClients;
+  private final int iterations;
+
+  private Configuration conf;
+  private ZKClientService zkClient;
+  private TransactionServiceClient txClient;
+  private Connection conn;
+
+  public BalanceBooks(int totalClients, int iterations) {
+    this(totalClients, iterations, new ConfigurationFactory().get());
+  }
+
+  public BalanceBooks(int totalClients, int iterations, Configuration conf) {
+    this.totalClients = totalClients;
+    this.iterations = iterations;
+    this.conf = conf;
+  }
+
+  /**
+   * Sets up common resources required by all clients.
+   */
+  public void init() throws IOException {
+    Injector injector = Guice.createInjector(
+        new ConfigModule(conf),
+        new ZKModule(),
+        new DiscoveryModules().getDistributedModules(),
+        new TransactionModules().getDistributedModules(),
+        new TransactionClientModule()
+    );
+
+    zkClient = injector.getInstance(ZKClientService.class);
+    zkClient.startAndWait();
+    txClient = injector.getInstance(TransactionServiceClient.class);
+    conn = ConnectionFactory.createConnection(conf);
+    createTableIfNotExists(conf, TABLE, new byte[][]{ FAMILY });
+  }
+
+  /**
+   * Runs all clients and waits for them to complete.
+   */
+  public void run() throws IOException, InterruptedException {
+    List<Client> clients = new ArrayList<>(totalClients);
+    for (int i = 0; i < totalClients; i++) {
+      Client c = new Client(i, totalClients, iterations);
+      c.init(txClient, conn.getTable(TableName.valueOf(TABLE)));
+      c.start();
+      clients.add(c);
+    }
+
+    for (Client c : clients) {
+      c.join();
+      Closeables.closeQuietly(c);
+    }
+  }
+
+  /**
+   * Validates the current state of the data stored at the end of the test.  Each update by a client consists of two
+   * parts: a withdrawal of a random amount from a randomly selected other account, and a corresponding deposit to
+   * the client's own account.  So, if all the updates were performed consistently (no partial updates or partial
+   * rollbacks), then the total sum of all balances at the end should be 0.
+   */
+  public boolean verify() {
+    boolean success = false;
+    try {
+      TransactionAwareHTable table = new TransactionAwareHTable(conn.getTable(TableName.valueOf(TABLE)));
+      TransactionContext context = new TransactionContext(txClient, table);
+
+      LOG.info("VERIFYING BALANCES");
+      context.start();
+      long totalBalance = 0;
+
+      try (ResultScanner scanner = table.getScanner(new Scan())) {
+        for (Result r : scanner) {
+          if (!r.isEmpty()) {
+            int rowId = Bytes.toInt(r.getRow());
+            long balance = Bytes.toLong(r.getValue(FAMILY, COL));
+            totalBalance += balance;
+            LOG.info("Client #{}: balance = ${}", rowId, balance);
+          }
+        }
+      }
+      if (totalBalance == 0) {
+        LOG.info("PASSED!");
+        success = true;
+      } else {
+        LOG.info("FAILED! Total balance should be 0 but was {}", totalBalance);
+      }
+      context.finish();
+    } catch (Exception e) {
+      LOG.error("Failed verification check", e);
+    }
+    return success;
+  }
+
+  /**
+   * Frees up the underlying resources common to all clients.
+   */
+  public void close() {
+    try {
+      if (conn != null) {
+        conn.close();
+      }
+    } catch (IOException ignored) { }
+
+    if (zkClient != null) {
+      zkClient.stopAndWait();
+    }
+  }
+
+  protected void createTableIfNotExists(Configuration conf, byte[] tableName, byte[][] columnFamilies)
+      throws IOException {
+    try (Admin admin = this.conn.getAdmin()) {
+      HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
+      for (byte[] family : columnFamilies) {
+        HColumnDescriptor columnDesc = new HColumnDescriptor(family);
+        columnDesc.setMaxVersions(Integer.MAX_VALUE);
+        desc.addFamily(columnDesc);
+      }
+      desc.addCoprocessor(TransactionProcessor.class.getName());
+      admin.createTable(desc);
+    }
+  }
+
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      System.err.println("Usage: java " + BalanceBooks.class.getName() + " <num clients> <iterations>");
+      System.err.println("\twhere <num clients> >= 2");
+      System.exit(1);
+    }
+
+    try (BalanceBooks bb = new BalanceBooks(Integer.parseInt(args[0]), Integer.parseInt(args[1]))) {
+      bb.init();
+      bb.run();
+      bb.verify();
+    } catch (Exception e) {
+      LOG.error("Failed during BalanceBooks run", e);
+    }
+  }
+
+  /**
+   * Represents a single client actor in the test.  Each client runs as a separate thread.
+   *
+   * For the given number of iterations, the client will:
+   * <ol>
+   *   <li>select a random other client from which to withdraw</li>
+   *   <li>select a random amount from 0 to MAX_AMOUNT</li>
+   *   <li>start a new transaction and: deduct the amount from the other client's account, and deposit
+   *       the same amount to its own account.</li>
+   * </ol>
+   *
+   * Since multiple clients operate concurrently and contend over a set of constrained resources
+   * (the client accounts), it is expected that a portion of the attempted transactions will encounter
+   * conflicts, due to a simultaneous deduction from or deposit to one of the same accounts which has successfully
+   * committed first.  In this case, the updates from the transaction encountering the conflict should be completely
+   * rolled back, leaving the data in a consistent state.
+   */
+  private static class Client extends Thread implements Closeable {
+    private final int id;
+    private final int totalClients;
+    private final int iterations;
+
+    private final Random random = new Random();
+
+    private TransactionContext txContext;
+    private TransactionAwareHTable txTable;
+
+
+    public Client(int id, int totalClients, int iterations) {
+      this.id = id;
+      this.totalClients = totalClients;
+      this.iterations = iterations;
+    }
+
+    /**
+     * Sets up any resources needed by the individual client.
+     *
+     * @param txClient the transaction client to use in accessing the transaction service
+     * @param table the HBase table instance to use for accessing storage
+     */
+    public void init(TransactionSystemClient txClient, Table table) {
+      txTable = new TransactionAwareHTable(table);
+      txContext = new TransactionContext(txClient, txTable);
+    }
+
+    public void run() {
+      try {
+        for (int i = 0; i < iterations; i++) {
+          runOnce();
+        }
+      } catch (TransactionFailureException e) {
+        LOG.error("Client #{}: Failed on exception", id, e);
+      }
+    }
+
+    /**
+     * Runs a single iteration of the client logic.
+     */
+    private void runOnce() throws TransactionFailureException {
+      int withdrawee = getNextWithdrawee();
+      int amount = getAmount();
+
+      try {
+        txContext.start();
+        long withdraweeBalance = getCurrentBalance(withdrawee);
+        long ownBalance = getCurrentBalance(id);
+        long withdraweeNew = withdraweeBalance - amount;
+        long ownNew = ownBalance + amount;
+
+        setBalance(withdrawee, withdraweeNew);
+        setBalance(id, ownNew);
+        LOG.debug("Client #{}: Withdrew ${} from #{}; withdrawee old={}, new={}; own old={}, new={}",
+            id, amount, withdrawee, withdraweeBalance, withdraweeNew, ownBalance, ownNew);
+        txContext.finish();
+
+      } catch (IOException ioe) {
+        LOG.error("Client #{}: Unhandled client failure", id, ioe);
+        txContext.abort();
+      } catch (TransactionConflictException tce) {
+        LOG.debug("CONFLICT: client #{} attempting to withdraw from #{}", id, withdrawee);
+        txContext.abort(tce);
+      } catch (TransactionFailureException tfe) {
+        LOG.error("Client #{}: Unhandled transaction failure", id, tfe);
+        txContext.abort(tfe);
+      }
+    }
+
+    private long getCurrentBalance(int id) throws IOException {
+      Result r = txTable.get(new Get(Bytes.toBytes(id)));
+      byte[] balanceBytes = r.getValue(FAMILY, COL);
+      if (balanceBytes == null) {
+        return 0;
+      }
+      return Bytes.toLong(balanceBytes);
+    }
+
+    private void setBalance(int id, long balance) throws IOException {
+      txTable.put(new Put(Bytes.toBytes(id)).addColumn(FAMILY, COL, Bytes.toBytes(balance)));
+    }
+
+    private int getNextWithdrawee() {
+      int next;
+      do {
+        next = random.nextInt(totalClients);
+      } while (next == id);
+      return next;
+    }
+
+    private int getAmount() {
+      return random.nextInt(MAX_AMOUNT);
+    }
+
+    public void close() throws IOException {
+      txTable.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/package-info.java
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/package-info.java b/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/package-info.java
new file mode 100644
index 0000000..a0e67d5
--- /dev/null
+++ b/tephra-examples/hbase-1.4/src/main/java/org/apache/tephra/examples/package-info.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * This package contains example applications for Tephra designed to illustrate sample Tephra usage
+ * and provide out-of-the-box sample applications which can be run to test cluster functionality.
+ *
+ * <p>Currently the following applications are provided:
+ *
+ * <ul>
+ *   <li><strong>BalanceBooks</strong> - this application runs a specified number of concurrent clients in separate
+ *     threads, which perform transactions to make withdrawals from each other's accounts and deposits to their own
+ *     accounts.  At the end of the test, the total value of all account balances is verified to be equal to zero,
+ *     which confirms that transactional integrity was not violated.
+ *   </li>
+ * </ul>
+ * </p>
+ *
+ * <p>
+ *   Note that, for simplicity, the examples package is currently hardcoded to compile against a specific HBase
+ *   version (currently 1.4).  In the future, we should provide Maven profiles to allow compiling the examples
+ *   against each of the supported HBase versions.
+ * </p>
+ */
+package org.apache.tephra.examples;

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-examples/hbase-1.4/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
----------------------------------------------------------------------
diff --git a/tephra-examples/hbase-1.4/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java b/tephra-examples/hbase-1.4/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
new file mode 100644
index 0000000..4dfe107
--- /dev/null
+++ b/tephra-examples/hbase-1.4/src/test/java/org/apache/tephra/examples/BalanceBooksTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tephra.examples;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.Scopes;
+import com.google.inject.util.Modules;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
+import org.apache.tephra.distributed.TransactionService;
+import org.apache.tephra.persist.InMemoryTransactionStateStorage;
+import org.apache.tephra.persist.TransactionStateStorage;
+import org.apache.tephra.runtime.ConfigModule;
+import org.apache.tephra.runtime.DiscoveryModules;
+import org.apache.tephra.runtime.TransactionClientModule;
+import org.apache.tephra.runtime.TransactionModules;
+import org.apache.tephra.runtime.ZKModule;
+import org.apache.tephra.util.Tests;
+import org.apache.twill.zookeeper.ZKClientService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the {@link BalanceBooks} program.
+ */
+public class BalanceBooksTest {
+  private static final Logger LOG = LoggerFactory.getLogger(BalanceBooksTest.class);
+  private static HBaseTestingUtility testUtil;
+  private static TransactionService txService;
+  private static ZKClientService zkClientService;
+
+  @ClassRule
+  public static TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    testUtil = new HBaseTestingUtility();
+    Configuration conf = testUtil.getConfiguration();
+    conf.setBoolean(TxConstants.Manager.CFG_DO_PERSIST, false);
+    conf.set(TxConstants.Manager.CFG_TX_SNAPSHOT_DIR, tmpFolder.newFolder().getAbsolutePath());
+
+    // Tune down the connection thread pool size
+    conf.setInt("hbase.hconnection.threads.core", 5);
+    conf.setInt("hbase.hconnection.threads.max", 10);
+    // Tune down handler threads in regionserver
+    conf.setInt("hbase.regionserver.handler.count", 10);
+
+    // Set to random port
+    conf.setInt("hbase.master.port", 0);
+    conf.setInt("hbase.master.info.port", 0);
+    conf.setInt("hbase.regionserver.port", 0);
+    conf.setInt("hbase.regionserver.info.port", 0);
+
+    testUtil.startMiniCluster();
+
+    String zkClusterKey = testUtil.getClusterKey(); // hostname:clientPort:parentZnode
+    String zkQuorum = zkClusterKey.substring(0, zkClusterKey.lastIndexOf(':'));
+    LOG.info("Zookeeper Quorum is running at {}", zkQuorum);
+    conf.set(TxConstants.Service.CFG_DATA_TX_ZOOKEEPER_QUORUM, zkQuorum);
+
+    Injector injector = Guice.createInjector(
+        new ConfigModule(conf),
+        new ZKModule(),
+        new DiscoveryModules().getDistributedModules(),
+        Modules.override(new TransactionModules().getDistributedModules())
+            .with(new AbstractModule() {
+              @Override
+              protected void configure() {
+                bind(TransactionStateStorage.class).to(InMemoryTransactionStateStorage.class).in(Scopes.SINGLETON);
+              }
+            }),
+        new TransactionClientModule()
+    );
+
+    zkClientService = injector.getInstance(ZKClientService.class);
+    zkClientService.startAndWait();
+
+    // start a tx server
+    txService = injector.getInstance(TransactionService.class);
+    try {
+      LOG.info("Starting transaction service");
+      txService.startAndWait();
+    } catch (Exception e) {
+      LOG.error("Failed to start service: ", e);
+      throw e;
+    }
+
+    Tests.waitForTxReady(injector.getInstance(TransactionSystemClient.class));
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (txService != null) {
+      txService.stopAndWait();
+    }
+    if (zkClientService != null) {
+      zkClientService.stopAndWait();
+    }
+    testUtil.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testBalanceBooks() throws Exception {
+    try (BalanceBooks bb = new BalanceBooks(5, 100, testUtil.getConfiguration())) {
+      bb.init();
+      bb.run();
+      assertTrue(bb.verify());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index af4b350..18886c7 100644
--- a/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.3/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.filter.CompareFilter;
@@ -65,17 +65,17 @@ import java.util.Set;
  * was started.
  */
 public class TransactionAwareHTable extends AbstractTransactionAwareTable
-    implements HTableInterface, TransactionAware {
+    implements Table, TransactionAware {
 
   private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
-  private final HTableInterface hTable;
+  private final Table hTable;
 
   /**
    * Create a transactional aware instance of the passed HTable
    *
    * @param hTable underlying HBase table to use
    */
-  public TransactionAwareHTable(HTableInterface hTable) {
+  public TransactionAwareHTable(Table hTable) {
     this(hTable, false);
   }
 
@@ -85,7 +85,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
    * @param hTable underlying HBase table to use
    * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
    */
-  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
+  public TransactionAwareHTable(Table hTable, TxConstants.ConflictDetection conflictLevel) {
     this(hTable, conflictLevel, false);
   }
 
@@ -96,7 +96,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
    * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
    *                              will be available, though non-transactional
    */
-  public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) {
+  public TransactionAwareHTable(Table hTable, boolean allowNonTransactional) {
     this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional);
   }
 
@@ -108,7 +108,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
    * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
    *                              will be available, though non-transactional
    */
-  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel,
+  public TransactionAwareHTable(Table hTable, TxConstants.ConflictDetection conflictLevel,
                                 boolean allowNonTransactional) {
     super(conflictLevel, allowNonTransactional, 
         hTable.getConfiguration().getBoolean(
@@ -121,12 +121,11 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
 
   @Override
   protected byte[] getTableKey() {
-    return getTableName();
+    return hTable.getName().getName();
   }
 
   @Override
   protected boolean doCommit() throws IOException {
-    hTable.flushCommits();
     return true;
   }
 
@@ -169,22 +168,12 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
       hTable.delete(rollbackDeletes);
       return true;
     } finally {
-      try {
-        hTable.flushCommits();
-      } catch (Exception e) {
-        LOG.error("Could not flush HTable commits", e);
-      }
       tx = null;
       changeSets.clear();
     }
   }
 
-  /* HTableInterface implementation */
-
-  @Override
-  public byte[] getTableName() {
-    return hTable.getTableName();
-  }
+  /* Table implementation */
 
   @Override
   public TableName getName() {
@@ -210,18 +199,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public Boolean[] exists(List<Get> gets) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    List<Get> transactionalizedGets = new ArrayList<>(gets.size());
-    for (Get get : gets) {
-      transactionalizedGets.add(transactionalizeAction(get));
-    }
-    return hTable.exists(transactionalizedGets);
-  }
-
-  @Override
   public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
     if (tx == null) {
       throw new IOException("Transaction not started");
@@ -276,15 +253,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
-    if (allowNonTransactional) {
-      return hTable.getRowOrBefore(row, family);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
   public ResultScanner getScanner(Scan scan) throws IOException {
     if (tx == null) {
       throw new IOException("Transaction not started");
@@ -472,7 +440,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
+      throws IOException {
     if (allowNonTransactional) {
       return hTable.incrementColumnValue(row, family, qualifier, amount);
     } else {
@@ -481,8 +450,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
-    throws IOException {
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
+      Durability durability) throws IOException {
     if (allowNonTransactional) {
       return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
     } else {
@@ -491,26 +460,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
-    throws IOException {
-    if (allowNonTransactional) {
-      return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public boolean isAutoFlush() {
-    return hTable.isAutoFlush();
-  }
-
-  @Override
-  public void flushCommits() throws IOException {
-    hTable.flushCommits();
-  }
-
-  @Override
   public void close() throws IOException {
     hTable.close();
   }
@@ -549,21 +498,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public void setAutoFlush(boolean autoFlush) {
-    setAutoFlushTo(autoFlush);
-  }
-
-  @Override
-  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
-    hTable.setAutoFlush(autoFlush, clearBufferOnFail);
-  }
-
-  @Override
-  public void setAutoFlushTo(boolean autoFlush) {
-    hTable.setAutoFlushTo(autoFlush);
-  }
-
-  @Override
   public long getWriteBufferSize() {
     return hTable.getWriteBufferSize();
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
index 560b0fe..179b22e 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/AbstractHBaseTableTest.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.tephra.TxConstants;
 import org.apache.tephra.hbase.coprocessor.TransactionProcessor;
 import org.junit.AfterClass;
@@ -60,7 +60,6 @@ public abstract class AbstractHBaseTableTest {
     conf.setInt("hbase.master.info.port", 0);
     conf.setInt("hbase.regionserver.port", 0);
     conf.setInt("hbase.regionserver.info.port", 0);
-
     testUtil.startMiniCluster();
     hBaseAdmin = testUtil.getHBaseAdmin();
   }
@@ -76,12 +75,12 @@ public abstract class AbstractHBaseTableTest {
     }
   }
 
-  protected static HTable createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
+  protected static Table createTable(byte[] tableName, byte[][] columnFamilies) throws Exception {
     return createTable(tableName, columnFamilies, false,
                        Collections.singletonList(TransactionProcessor.class.getName()));
   }
 
-  protected static HTable createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
+  protected static Table createTable(byte[] tableName, byte[][] columnFamilies, boolean existingData,
                                       List<String> coprocessors) throws Exception {
     HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
     for (byte[] family : columnFamilies) {
@@ -101,6 +100,6 @@ public abstract class AbstractHBaseTableTest {
     }
     hBaseAdmin.createTable(desc);
     testUtil.waitTableAvailable(tableName, 5000);
-    return new HTable(testUtil.getConfiguration(), tableName);
+    return testUtil.getConnection().getTable(TableName.valueOf(tableName));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
index 73f9d45..51fbbda 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/TransactionAwareHTableTest.java
@@ -26,17 +26,17 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -99,7 +99,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   static TransactionManager txManager;
   private TransactionContext transactionContext;
   private TransactionAwareHTable transactionAwareHTable;
-  private HTable hTable;
+  private Table hTable;
   
   @ClassRule
   public static TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -254,7 +254,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
    */
   @Test
   public void testValidTransactionalDelete() throws Exception {
-    try (HTable hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
+    try (Table hTable = createTable(Bytes.toBytes("TestValidTransactionalDelete"),
                                      new byte[][]{TestBytes.family, TestBytes.family2})) {
       TransactionAwareHTable txTable = new TransactionAwareHTable(hTable);
       TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -394,7 +394,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
    */
   @Test
   public void testAttributesPreserved() throws Exception {
-    HTable hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
+    Table hTable = createTable(Bytes.toBytes("TestAttributesPreserved"),
         new byte[][]{TestBytes.family, TestBytes.family2}, false,
         Lists.newArrayList(TransactionProcessor.class.getName(), TestRegionObserver.class.getName()));
     try {
@@ -436,7 +436,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   
   @Test
   public void testFamilyDeleteWithCompaction() throws Exception {
-    HTable hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
+    Table hTable = createTable(Bytes.toBytes("TestFamilyDeleteWithCompaction"),
                                 new byte[][]{TestBytes.family, TestBytes.family2});
     try {
       TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, ConflictDetection.ROW);
@@ -536,7 +536,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
 
   private void testDeleteRollback(TxConstants.ConflictDetection conflictDetection) throws Exception {
     String tableName = String.format("%s%s", "TestColFamilyDelete", conflictDetection);
-    HTable hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
+    Table hTable = createTable(Bytes.toBytes(tableName), new byte[][]{TestBytes.family});
     try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, conflictDetection)) {
       TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
       txContext.start();
@@ -572,7 +572,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
 
   @Test
   public void testMultiColumnFamilyRowDeleteRollback() throws Exception {
-    HTable hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
+    Table hTable = createTable(Bytes.toBytes("TestMultColFam"), new byte[][] {TestBytes.family, TestBytes.family2});
     try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
       TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
       txContext.start();
@@ -604,7 +604,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
 
   @Test
   public void testRowDelete() throws Exception {
-    HTable hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
+    Table hTable = createTable(Bytes.toBytes("TestRowDelete"), new byte[][]{TestBytes.family, TestBytes.family2});
     try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
       TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
 
@@ -687,7 +687,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
                     .add(TestBytes.family2, TestBytes.qualifier, TestBytes.value)
                     .add(TestBytes.family2, TestBytes.qualifier2, TestBytes.value2));
       txContext.finish();
-
+      
       txContext.start();
       txTable.delete(new Delete(TestBytes.row2).deleteFamily(TestBytes.family));
       txContext.finish();
@@ -781,12 +781,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   @Test
   public void testReadYourWrites() throws Exception {
     // In-progress tx1: started before our main transaction
-    HTable hTable1 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+    Table hTable1 = testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table));
     TransactionAwareHTable txHTable1 = new TransactionAwareHTable(hTable1);
     TransactionContext inprogressTxContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable1);
 
     // In-progress tx2: started while our main transaction is running
-    HTable hTable2 = new HTable(testUtil.getConfiguration(), TestBytes.table);
+    Table hTable2 = testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table));
     TransactionAwareHTable txHTable2 = new TransactionAwareHTable(hTable2);
     TransactionContext inprogressTxContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txHTable2);
 
@@ -838,11 +838,13 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
 
   @Test
   public void testRowLevelConflictDetection() throws Exception {
-    TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+    TransactionAwareHTable txTable1 = new TransactionAwareHTable(
+            testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)),
         TxConstants.ConflictDetection.ROW);
     TransactionContext txContext1 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable1);
 
-    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+    TransactionAwareHTable txTable2 = new TransactionAwareHTable(
+            testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)),
         TxConstants.ConflictDetection.ROW);
     TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
 
@@ -949,11 +951,13 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
   @Test
   public void testNoneLevelConflictDetection() throws Exception {
     InMemoryTxSystemClient txClient = new InMemoryTxSystemClient(txManager);
-    TransactionAwareHTable txTable1 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+    TransactionAwareHTable txTable1 = new TransactionAwareHTable(
+            testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)),
         TxConstants.ConflictDetection.NONE);
     TransactionContext txContext1 = new TransactionContext(txClient, txTable1);
 
-    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table),
+    TransactionAwareHTable txTable2 = new TransactionAwareHTable(
+            testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)),
         TxConstants.ConflictDetection.NONE);
     TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
 
@@ -1088,7 +1092,8 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     transactionAwareHTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, TestBytes.value2));
 
     // check that writes are still not visible to other clients
-    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+    TransactionAwareHTable txTable2 = new TransactionAwareHTable(
+            testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)));
     TransactionContext txContext2 = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable2);
 
     txContext2.start();
@@ -1147,7 +1152,8 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     txClient.invalidate(transactionContext.getCurrentTransaction().getTransactionId());
 
     // check that writes are not visible
-    TransactionAwareHTable txTable2 = new TransactionAwareHTable(new HTable(conf, TestBytes.table));
+    TransactionAwareHTable txTable2 = new TransactionAwareHTable(
+            testUtil.getConnection().getTable(TableName.valueOf(TestBytes.table)));
     TransactionContext txContext2 = new TransactionContext(txClient, txTable2);
     txContext2.start();
     Transaction newTx = txContext2.getCurrentTransaction();
@@ -1183,7 +1189,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
 
     // Add some pre-existing, non-transactional data
-    HTable nonTxTable = new HTable(testUtil.getConfiguration(), txTable.getTableName());
+    Table nonTxTable = testUtil.getConnection().getTable(txTable.getName());
     nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier, val11));
     nonTxTable.put(new Put(TestBytes.row).add(TestBytes.family, TestBytes.qualifier2, val12));
     nonTxTable.put(new Put(TestBytes.row2).add(TestBytes.family, TestBytes.qualifier, val21));
@@ -1191,7 +1197,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TxConstants.FAMILY_DELETE_QUALIFIER,
                                                HConstants.EMPTY_BYTE_ARRAY));
     nonTxTable.put(new Put(TestBytes.row4).add(TestBytes.family, TestBytes.qualifier, HConstants.EMPTY_BYTE_ARRAY));
-    nonTxTable.flushCommits();
 
     // Add transactional data
     txContext.start();
@@ -1282,15 +1287,15 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     txContext.finish();
   }
 
-  private void verifyRow(HTableInterface table, byte[] rowkey, byte[] expectedValue) throws Exception {
+  private void verifyRow(Table table, byte[] rowkey, byte[] expectedValue) throws Exception {
     verifyRow(table, new Get(rowkey), expectedValue);
   }
 
-  private void verifyRow(HTableInterface table, Get get, byte[] expectedValue) throws Exception {
+  private void verifyRow(Table table, Get get, byte[] expectedValue) throws Exception {
     verifyRows(table, get, expectedValue == null ? null : ImmutableList.of(expectedValue));
   }
 
-  private void verifyRows(HTableInterface table, Get get, List<byte[]> expectedValues) throws Exception {
+  private void verifyRows(Table table, Get get, List<byte[]> expectedValues) throws Exception {
     Result result = table.get(get);
     if (expectedValues == null) {
       assertTrue(result.isEmpty());
@@ -1310,12 +1315,12 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     }
   }
 
-  private Cell[] getRow(HTableInterface table, Get get) throws Exception {
+  private Cell[] getRow(Table table, Get get) throws Exception {
     Result result = table.get(get);
     return result.rawCells();
   }
 
-  private void verifyScan(HTableInterface table, Scan scan, List<KeyValue> expectedCells) throws Exception {
+  private void verifyScan(Table table, Scan scan, List<KeyValue> expectedCells) throws Exception {
     List<Cell> actualCells = new ArrayList<>();
     try (ResultScanner scanner = table.getScanner(scan)) {
       Result[] results = scanner.next(expectedCells.size() + 1);
@@ -1328,7 +1333,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
 
   @Test
   public void testVisibilityAll() throws Exception {
-    HTable nonTxTable =
+    Table nonTxTable =
       createTable(Bytes.toBytes("testVisibilityAll"), new byte[][]{TestBytes.family, TestBytes.family2},
                   true, Collections.singletonList(TransactionProcessor.class.getName()));
     TransactionAwareHTable txTable =
@@ -1500,7 +1505,6 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
     // to prevent Tephra from replacing delete with delete marker
     deleteFamily.setAttribute(TxConstants.TX_ROLLBACK_ATTRIBUTE_KEY, new byte[0]);
     nonTxTable.delete(deleteFamily);
-    nonTxTable.flushCommits();
 
     txContext.start();
     txContext.getCurrentTransaction().setVisibility(Transaction.VisibilityLevel.SNAPSHOT_ALL);
@@ -1712,7 +1716,7 @@ public class TransactionAwareHTableTest extends AbstractHBaseTableTest {
    * Represents older transaction clients
    */
   private static class OldTransactionAwareHTable extends TransactionAwareHTable {
-    public OldTransactionAwareHTable(HTableInterface hTable) {
+    public OldTransactionAwareHTable(Table hTable) {
       super(hTable);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
index 2e9dc17..031b0f3 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/DataJanitorStateTest.java
@@ -23,7 +23,6 @@ package org.apache.tephra.hbase.txprune;
 import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.collect.ImmutableSortedSet;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.TxConstants;
@@ -53,7 +52,7 @@ public class DataJanitorStateTest extends AbstractHBaseTableTest {
   public void beforeTest() throws Exception {
     pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                                  TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
-    HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+    Table table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
                                // Prune state table is a non-transactional table, hence no transaction co-processor
                                Collections.<String>emptyList());
     table.close();

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
index 55348b0..e92b4cf 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruneTest.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -90,7 +89,7 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
 
     // Do some transactional data operations
     txDataTable1 = TableName.valueOf("invalidListPruneTestTable1");
-    HTable hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false,
+    Table hTable = createTable(txDataTable1.getName(), new byte[][]{family}, false,
                                 Collections.singletonList(TestTransactionProcessor.class.getName()));
     try (TransactionAwareHTable txTable = new TransactionAwareHTable(hTable, TxConstants.ConflictDetection.ROW)) {
       TransactionContext txContext = new TransactionContext(new InMemoryTxSystemClient(txManager), txTable);
@@ -129,7 +128,7 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
   }
 
   private void createPruneStateTable() throws Exception {
-    HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+    Table table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
                                // Prune state table is a non-transactional table, hence no transaction co-processor
                                Collections.<String>emptyList());
     table.close();
@@ -311,7 +310,7 @@ public class InvalidListPruneTest extends AbstractHBaseTableTest {
 
     // Create an empty table
     TableName txEmptyTable = TableName.valueOf("emptyPruneTestTable");
-    HTable emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
+    Table emptyHTable = createTable(txEmptyTable.getName(), new byte[][]{family}, false,
                                      Collections.singletonList(TestTransactionProcessor.class.getName()));
 
     TransactionPruningPlugin transactionPruningPlugin = new TestTransactionPruningPlugin();

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
index 1476906..aa669e5 100644
--- a/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
+++ b/tephra-hbase-compat-1.3/src/test/java/org/apache/tephra/hbase/txprune/InvalidListPruningDebugTest.java
@@ -30,7 +30,6 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.tephra.TxConstants;
@@ -104,7 +103,7 @@ public class InvalidListPruningDebugTest extends AbstractHBaseTableTest {
   public static void addData() throws Exception {
     pruneStateTable = TableName.valueOf(conf.get(TxConstants.TransactionPruning.PRUNE_STATE_TABLE,
                                                  TxConstants.TransactionPruning.DEFAULT_PRUNE_STATE_TABLE));
-    HTable table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
+    Table table = createTable(pruneStateTable.getName(), new byte[][]{DataJanitorState.FAMILY}, false,
                                // Prune state table is a non-transactional table, hence no transaction co-processor
                                Collections.<String>emptyList());
     table.close();

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/52fd15f0/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java b/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
index d6eb30e..e3ef374 100644
--- a/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
+++ b/tephra-hbase-compat-1.4/src/main/java/org/apache/tephra/hbase/TransactionAwareHTable.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.OperationWithAttributes;
@@ -41,6 +40,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.filter.CompareFilter;
@@ -58,6 +58,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 /**
  * A Transaction Aware HTable implementation for HBase 1.3. Operations are committed as usual,
@@ -65,17 +66,17 @@ import java.util.Set;
  * was started.
  */
 public class TransactionAwareHTable extends AbstractTransactionAwareTable
-    implements HTableInterface, TransactionAware {
+    implements Table, TransactionAware {
 
   private static final Logger LOG = LoggerFactory.getLogger(TransactionAwareHTable.class);
-  private final HTableInterface hTable;
+  private final Table hTable;
 
   /**
    * Create a transactional aware instance of the passed HTable
    *
    * @param hTable underlying HBase table to use
    */
-  public TransactionAwareHTable(HTableInterface hTable) {
+  public TransactionAwareHTable(Table hTable) {
     this(hTable, false);
   }
 
@@ -85,7 +86,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
    * @param hTable underlying HBase table to use
    * @param conflictLevel level of conflict detection to perform (defaults to {@code COLUMN})
    */
-  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel) {
+  public TransactionAwareHTable(Table hTable, TxConstants.ConflictDetection conflictLevel) {
     this(hTable, conflictLevel, false);
   }
 
@@ -96,7 +97,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
    * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
    *                              will be available, though non-transactional
    */
-  public TransactionAwareHTable(HTableInterface hTable, boolean allowNonTransactional) {
+  public TransactionAwareHTable(Table hTable, boolean allowNonTransactional) {
     this(hTable, TxConstants.ConflictDetection.COLUMN, allowNonTransactional);
   }
 
@@ -108,7 +109,7 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
    * @param allowNonTransactional if true, additional operations (checkAndPut, increment, checkAndDelete)
    *                              will be available, though non-transactional
    */
-  public TransactionAwareHTable(HTableInterface hTable, TxConstants.ConflictDetection conflictLevel,
+  public TransactionAwareHTable(Table hTable, TxConstants.ConflictDetection conflictLevel,
                                 boolean allowNonTransactional) {
     super(conflictLevel, allowNonTransactional, 
         hTable.getConfiguration().getBoolean(
@@ -121,12 +122,11 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
 
   @Override
   protected byte[] getTableKey() {
-    return getTableName();
+    return hTable.getName().getName();
   }
 
   @Override
   protected boolean doCommit() throws IOException {
-    hTable.flushCommits();
     return true;
   }
 
@@ -169,22 +169,12 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
       hTable.delete(rollbackDeletes);
       return true;
     } finally {
-      try {
-        hTable.flushCommits();
-      } catch (Exception e) {
-        LOG.error("Could not flush HTable commits", e);
-      }
       tx = null;
       changeSets.clear();
     }
   }
 
-  /* HTableInterface implementation */
-
-  @Override
-  public byte[] getTableName() {
-    return hTable.getTableName();
-  }
+  /* Table implementation */
 
   @Override
   public TableName getName() {
@@ -210,18 +200,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public Boolean[] exists(List<Get> gets) throws IOException {
-    if (tx == null) {
-      throw new IOException("Transaction not started");
-    }
-    List<Get> transactionalizedGets = new ArrayList<>(gets.size());
-    for (Get get : gets) {
-      transactionalizedGets.add(transactionalizeAction(get));
-    }
-    return hTable.exists(transactionalizedGets);
-  }
-
-  @Override
   public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
     if (tx == null) {
       throw new IOException("Transaction not started");
@@ -276,15 +254,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
-    if (allowNonTransactional) {
-      return hTable.getRowOrBefore(row, family);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
   public ResultScanner getScanner(Scan scan) throws IOException {
     if (tx == null) {
       throw new IOException("Transaction not started");
@@ -438,6 +407,28 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
+  public int getReadRpcTimeout() {
+    return hTable.getReadRpcTimeout();
+  }
+
+  @Override
+  public void setReadRpcTimeout(int readRpcTimeout) {
+    hTable.setReadRpcTimeout(readRpcTimeout);
+
+  }
+
+  @Override
+  public int getWriteRpcTimeout() {
+    return hTable.getWriteRpcTimeout();
+  }
+
+  @Override
+  public void setWriteRpcTimeout(int writeRpcTimeout) {
+    hTable.setWriteRpcTimeout(writeRpcTimeout);
+
+  }
+
+  @Override
   public void mutateRow(RowMutations rm) throws IOException {
     if (tx == null) {
       throw new IOException("Transaction not started");
@@ -472,7 +463,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
+      throws IOException {
     if (allowNonTransactional) {
       return hTable.incrementColumnValue(row, family, qualifier, amount);
     } else {
@@ -481,8 +473,8 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
-    throws IOException {
+  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount,
+      Durability durability) throws IOException {
     if (allowNonTransactional) {
       return hTable.incrementColumnValue(row, family, qualifier, amount, durability);
     } else {
@@ -491,26 +483,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL)
-    throws IOException {
-    if (allowNonTransactional) {
-      return hTable.incrementColumnValue(row, family, qualifier, amount, writeToWAL);
-    } else {
-      throw new UnsupportedOperationException("Operation is not supported transactionally");
-    }
-  }
-
-  @Override
-  public boolean isAutoFlush() {
-    return hTable.isAutoFlush();
-  }
-
-  @Override
-  public void flushCommits() throws IOException {
-    hTable.flushCommits();
-  }
-
-  @Override
   public void close() throws IOException {
     hTable.close();
   }
@@ -549,21 +521,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
   }
 
   @Override
-  public void setAutoFlush(boolean autoFlush) {
-    setAutoFlushTo(autoFlush);
-  }
-
-  @Override
-  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
-    hTable.setAutoFlush(autoFlush, clearBufferOnFail);
-  }
-
-  @Override
-  public void setAutoFlushTo(boolean autoFlush) {
-    hTable.setAutoFlushTo(autoFlush);
-  }
-
-  @Override
   public long getWriteBufferSize() {
     return hTable.getWriteBufferSize();
   }
@@ -573,26 +530,6 @@ public class TransactionAwareHTable extends AbstractTransactionAwareTable
     hTable.setWriteBufferSize(writeBufferSize);
   }
 
-  @Override
-  public int getReadRpcTimeout() {
-    return hTable.getReadRpcTimeout();
-  }
-
-  @Override
-  public void setReadRpcTimeout(int timeout) {
-    hTable.setReadRpcTimeout(timeout);
-  }
-
-  @Override
-  public int getWriteRpcTimeout() {
-    return hTable.getWriteRpcTimeout();
-  }
-
-  @Override
-  public void setWriteRpcTimeout(int timeout) {
-    hTable.setWriteRpcTimeout(timeout);
-  }
-
   // Helpers to get copies of objects with the timestamp set to the current transaction timestamp.
 
   private Get transactionalizeAction(Get get) throws IOException {



Mime
View raw message