tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From go...@apache.org
Subject [1/3] incubator-tephra git commit: TEPHRA-228 Adding the ability to pass-in a clientId during the start of a transaction which is logged when the transaction gets invalidated during time out.
Date Mon, 22 May 2017 23:51:18 GMT
Repository: incubator-tephra
Updated Branches:
  refs/heads/master 0b209bb41 -> a22c11d81


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
index ff796c1..787832b 100644
--- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionDistributedModule.java
@@ -28,6 +28,7 @@ import org.apache.tephra.TransactionExecutor;
 import org.apache.tephra.TransactionExecutorFactory;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
 import org.apache.tephra.distributed.TransactionServiceClient;
 import org.apache.tephra.metrics.DefaultMetricsCollector;
 import org.apache.tephra.metrics.MetricsCollector;
@@ -35,11 +36,23 @@ import org.apache.tephra.persist.HDFSTransactionStateStorage;
 import org.apache.tephra.persist.TransactionStateStorage;
 import org.apache.tephra.snapshot.SnapshotCodecProvider;
 
+import java.lang.management.ManagementFactory;
+
 /**
  * Guice bindings for running in distributed mode on a cluster.
  */
 final class TransactionDistributedModule extends AbstractModule {
 
+  private final String clientId;
+
+  public TransactionDistributedModule() {
+    this(ManagementFactory.getRuntimeMXBean().getName());
+  }
+
+  public TransactionDistributedModule(String clientId) {
+    this.clientId = clientId;
+  }
+
   @Override
   protected void configure() {
     // some of these classes need to be non-singleton in order to create a new instance during leader() in
@@ -48,6 +61,8 @@ final class TransactionDistributedModule extends AbstractModule {
     bind(TransactionStateStorage.class).annotatedWith(Names.named("persist")).to(HDFSTransactionStateStorage.class);
     bind(TransactionStateStorage.class).toProvider(TransactionStateStorageProvider.class);
 
+    bindConstant().annotatedWith(Names.named(TxConstants.CLIENT_ID)).to(clientId);
+
     // to catch issues during configure time
     bind(TransactionManager.class);
     bind(TransactionSystemClient.class).to(TransactionServiceClient.class).in(Scopes.SINGLETON);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
index 0c5a3f3..f5f864b 100644
--- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionInMemoryModule.java
@@ -21,11 +21,13 @@ package org.apache.tephra.runtime;
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
 import com.google.inject.assistedinject.FactoryModuleBuilder;
+import com.google.inject.name.Names;
 import org.apache.tephra.DefaultTransactionExecutor;
 import org.apache.tephra.TransactionExecutor;
 import org.apache.tephra.TransactionExecutorFactory;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
 import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import org.apache.tephra.metrics.MetricsCollector;
 import org.apache.tephra.metrics.TxMetricsCollector;
@@ -33,11 +35,22 @@ import org.apache.tephra.persist.NoOpTransactionStateStorage;
 import org.apache.tephra.persist.TransactionStateStorage;
 import org.apache.tephra.snapshot.SnapshotCodecProvider;
 
+import java.lang.management.ManagementFactory;
+
 /**
  * Guice bindings for running completely in-memory (no persistence).  This should only be used for
  * test classes, as the transaction state cannot be recovered in the case of a failure.
  */
 public class TransactionInMemoryModule extends AbstractModule {
+  private final String clientId;
+
+  public TransactionInMemoryModule() {
+    this(ManagementFactory.getRuntimeMXBean().getName());
+  }
+
+  public TransactionInMemoryModule(String clientId) {
+    this.clientId = clientId;
+  }
 
   @Override
   protected void configure() {
@@ -48,6 +61,8 @@ public class TransactionInMemoryModule extends AbstractModule {
     // no metrics output for in-memory
     bind(MetricsCollector.class).to(TxMetricsCollector.class);
 
+    bindConstant().annotatedWith(Names.named(TxConstants.CLIENT_ID)).to(clientId);
+
     install(new FactoryModuleBuilder()
               .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
               .build(TransactionExecutorFactory.class));

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
index 0dba09e..213abb0 100644
--- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionLocalModule.java
@@ -28,6 +28,7 @@ import org.apache.tephra.TransactionExecutor;
 import org.apache.tephra.TransactionExecutorFactory;
 import org.apache.tephra.TransactionManager;
 import org.apache.tephra.TransactionSystemClient;
+import org.apache.tephra.TxConstants;
 import org.apache.tephra.inmemory.InMemoryTxSystemClient;
 import org.apache.tephra.metrics.DefaultMetricsCollector;
 import org.apache.tephra.metrics.MetricsCollector;
@@ -35,10 +36,21 @@ import org.apache.tephra.persist.LocalFileTransactionStateStorage;
 import org.apache.tephra.persist.TransactionStateStorage;
 import org.apache.tephra.snapshot.SnapshotCodecProvider;
 
+import java.lang.management.ManagementFactory;
+
 /**
  * Guice bindings for running in single-node mode (persistence to local disk and in-memory client).
  */
 final class TransactionLocalModule extends AbstractModule {
+  private final String clientId;
+
+  public TransactionLocalModule() {
+    this(ManagementFactory.getRuntimeMXBean().getName());
+  }
+
+  public TransactionLocalModule(String clientId) {
+    this.clientId = clientId;
+  }
 
   @Override
   protected void configure() {
@@ -51,6 +63,8 @@ final class TransactionLocalModule extends AbstractModule {
     bind(TransactionSystemClient.class).to(InMemoryTxSystemClient.class).in(Singleton.class);
     bind(MetricsCollector.class).to(DefaultMetricsCollector.class);
 
+    bindConstant().annotatedWith(Names.named(TxConstants.CLIENT_ID)).to(clientId);
+
     install(new FactoryModuleBuilder()
               .implement(TransactionExecutor.class, DefaultTransactionExecutor.class)
               .build(TransactionExecutorFactory.class));

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java
index a3fe1c1..1e021d3 100644
--- a/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java
+++ b/tephra-core/src/main/java/org/apache/tephra/runtime/TransactionModules.java
@@ -20,22 +20,31 @@ package org.apache.tephra.runtime;
 
 import com.google.inject.Module;
 
+import java.lang.management.ManagementFactory;
+
 /**
  * Provides access to Google Guice modules for in-memory, single-node, and distributed operation.
  */
 public class TransactionModules {
+  private final String clientId;
+
+  public TransactionModules(String clientId) {
+    this.clientId = clientId;
+  }
+
   public TransactionModules() {
+    this(ManagementFactory.getRuntimeMXBean().getName());
   }
 
   public Module getInMemoryModules() {
-    return new TransactionInMemoryModule();
+    return new TransactionInMemoryModule(clientId);
   }
 
   public Module getSingleNodeModules() {
-    return new TransactionLocalModule();
+    return new TransactionLocalModule(clientId);
   }
 
   public Module getDistributedModules() {
-    return new TransactionDistributedModule();
+    return new TransactionDistributedModule(clientId);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/thrift/README
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/thrift/README b/tephra-core/src/main/thrift/README
index a4962be..3bb7a62 100644
--- a/tephra-core/src/main/thrift/README
+++ b/tephra-core/src/main/thrift/README
@@ -19,6 +19,6 @@
 To generate thrift classes:
 	thrift --gen java --out ../java/ transaction.thrift
 
-To add the Apache license header to the generated fiels:
+To add the Apache license header to the generated files:
   for f in ../java/org/apache/tephra/distributed/thrift/T*.java; do mv $f nn; cat header nn > $f; rm -f nn; done
 

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/thrift/transaction.thrift
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/thrift/transaction.thrift b/tephra-core/src/main/thrift/transaction.thrift
index 454450e..f4460e5 100644
--- a/tephra-core/src/main/thrift/transaction.thrift
+++ b/tephra-core/src/main/thrift/transaction.thrift
@@ -69,8 +69,11 @@ service TTransactionServer {
   // temporary tx2 stuff
   TTransaction startLong(),
   TTransaction startShort(),
+  TTransaction startLongClientId(1: string clientId) throws (1: TGenericException e),
   // TODO remove this as it was replaced with startShortWithTimeout in 0.10
   TTransaction startShortTimeout(1: i32 timeout),
+  TTransaction startShortClientId(1: string clientId) throws (1: TGenericException e),
+  TTransaction startShortWithClientIdAndTimeOut(1: string clientId, 2: i32 timeout) throws (1:TGenericException e),
   TTransaction startShortWithTimeout(1: i32 timeout) throws (1:TGenericException e),
   TBoolean canCommitTx(1: TTransaction tx, 2: set<binary> changes) throws (1:TTransactionNotInProgressException e),
   TBoolean commitTx(1: TTransaction tx) throws (1:TTransactionNotInProgressException e),

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
index 20f6944..56a9076 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionContextTest.java
@@ -61,7 +61,7 @@ public class TransactionContextTest {
       new ConfigModule(conf),
       new DiscoveryModules().getInMemoryModules(),
       Modules.override(
-        new TransactionModules().getInMemoryModules()).with(new AbstractModule() {
+        new TransactionModules("clientA").getInMemoryModules()).with(new AbstractModule() {
         @Override
         protected void configure() {
           TransactionManager txManager = new TransactionManager(conf);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
index 28ccc6e..b96b779 100644
--- a/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/TransactionExecutorTest.java
@@ -63,7 +63,7 @@ public class TransactionExecutorTest {
       new ConfigModule(conf),
       new DiscoveryModules().getInMemoryModules(),
       Modules.override(
-        new TransactionModules().getInMemoryModules()).with(new AbstractModule() {
+        new TransactionModules("clientB").getInMemoryModules()).with(new AbstractModule() {
         @Override
         protected void configure() {
           TransactionManager txManager = new TransactionManager(conf);

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
index ec06528..971c93c 100644
--- a/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
+++ b/tephra-core/src/test/java/org/apache/tephra/persist/AbstractTransactionStateStorageTest.java
@@ -131,25 +131,24 @@ public abstract class AbstractTransactionStateStorageTest {
     TransactionStateStorage storage3 = null;
     try {
       storage = getStorage(conf);
-      TransactionManager txManager = new TransactionManager
-        (conf, storage, new TxMetricsCollector());
+      TransactionManager txManager = new TransactionManager(conf, storage, new TxMetricsCollector());
       txManager.startAndWait();
 
       // TODO: replace with new persistence tests
       final byte[] a = { 'a' };
       final byte[] b = { 'b' };
       // Start and invalidate a transaction
-      Transaction invalid = txManager.startShort();
+      Transaction invalid = txManager.startShort("clientTx");
       txManager.invalidate(invalid.getTransactionId());
       // start a tx1, add a change A and commit
-      Transaction tx1 = txManager.startShort();
+      Transaction tx1 = txManager.startShort("client1");
       Assert.assertTrue(txManager.canCommit(tx1, Collections.singleton(a)));
       Assert.assertTrue(txManager.commit(tx1));
       // start a tx2 and add a change B
-      Transaction tx2 = txManager.startShort();
+      Transaction tx2 = txManager.startShort("client2");
       Assert.assertTrue(txManager.canCommit(tx2, Collections.singleton(b)));
       // start a tx3
-      Transaction tx3 = txManager.startShort();
+      Transaction tx3 = txManager.startShort("client3");
       // restart
       txManager.stopAndWait();
       TransactionSnapshot origState = txManager.getCurrentState();

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
----------------------------------------------------------------------
diff --git a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
index 30b69a1..17d55a4 100644
--- a/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
+++ b/tephra-hbase-compat-0.98/src/main/java/org/apache/tephra/hbase/coprocessor/TransactionProcessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.tephra.hbase.coprocessor;
 
-import com.google.common.base.Supplier;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.logging.Log;
@@ -127,7 +126,7 @@ public class TransactionProcessor extends BaseRegionObserver {
   public void start(CoprocessorEnvironment e) throws IOException {
     if (e instanceof RegionCoprocessorEnvironment) {
       RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
-      Supplier<TransactionStateCache> cacheSupplier = getTransactionStateCacheSupplier(env);
+      this.cacheSupplier = getTransactionStateCacheSupplier(env);
       this.cache = cacheSupplier.get();
 
       HTableDescriptor tableDesc = env.getRegion().getTableDesc();


Mime
View raw message