tephra-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From go...@apache.org
Subject [3/3] incubator-tephra git commit: TEPHRA-228 Adding the ability to pass-in a clientId during the start of a transaction which is logged when the transaction gets invalidated during time out.
Date Mon, 22 May 2017 23:51:20 GMT
TEPHRA-228 Adding the ability to pass-in a clientId during the start of a transaction which
is logged when the transaction gets invalidated during time out.

This closes #42 from GitHub.

Signed-off-by: Gokul Gunasekaran <gokul@cask.co>


Project: http://git-wip-us.apache.org/repos/asf/incubator-tephra/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tephra/commit/a22c11d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tephra/tree/a22c11d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tephra/diff/a22c11d8

Branch: refs/heads/master
Commit: a22c11d81eb08c4afb0071922aa90c42987ffe59
Parents: 0b209bb
Author: Gokul Gunasekaran <gokul@cask.co>
Authored: Mon May 15 18:06:10 2017 -0700
Committer: Gokul Gunasekaran <gokul@cask.co>
Committed: Mon May 22 16:51:05 2017 -0700

----------------------------------------------------------------------
 .../org/apache/tephra/TransactionContext.java   |    2 +-
 .../org/apache/tephra/TransactionManager.java   |   98 +-
 .../java/org/apache/tephra/TxConstants.java     |    5 +
 .../distributed/TransactionServiceClient.java   |   26 +-
 .../TransactionServiceThriftClient.java         |   51 +
 .../TransactionServiceThriftHandler.java        |   27 +
 .../distributed/thrift/TTransactionServer.java  | 3040 +++++++++++++++++-
 .../runtime/TransactionDistributedModule.java   |   15 +
 .../runtime/TransactionInMemoryModule.java      |   15 +
 .../tephra/runtime/TransactionLocalModule.java  |   14 +
 .../tephra/runtime/TransactionModules.java      |   15 +-
 tephra-core/src/main/thrift/README              |    2 +-
 tephra-core/src/main/thrift/transaction.thrift  |    3 +
 .../apache/tephra/TransactionContextTest.java   |    2 +-
 .../apache/tephra/TransactionExecutorTest.java  |    2 +-
 .../AbstractTransactionStateStorageTest.java    |   11 +-
 .../hbase/coprocessor/TransactionProcessor.java |    3 +-
 17 files changed, 3175 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
index 7ca8f06..0806294 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionContext.java
@@ -79,7 +79,7 @@ public class TransactionContext {
   }
 
   /**
-   * Starts a new transaction.  Calling this will initiate a new transaction using the {@link
TransactionSystemClient},
+   * Starts a new transaction. Calling this will initiate a new transaction using the {@link
TransactionSystemClient},
    * and pass the returned transaction to {@link TransactionAware#startTx(Transaction)} for
each registered
    * TransactionAware.  If an exception is encountered, the transaction will be aborted and
a
    * {@code TransactionFailureException} wrapping the root cause will be thrown.

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
index f2060cd..27b6bc6 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TransactionManager.java
@@ -118,8 +118,11 @@ public class TransactionManager extends AbstractService {
   //poll every 10 second to emit metrics
   private static final long METRICS_POLL_INTERVAL = 10000L;
 
+  //Client id that is used if a client doesn't provide one while starting a transaction.
+  private static final String DEFAULT_CLIENTID = "unknown";
+
   // Transactions that are in progress, with their info.
-  private final NavigableMap<Long, InProgressTx> inProgress = new ConcurrentSkipListMap<Long,
InProgressTx>();
+  private final NavigableMap<Long, InProgressTx> inProgress = new ConcurrentSkipListMap<>();
 
   // the list of transactions that are invalid (not properly committed/aborted, or timed
out)
   private final InvalidTxList invalidTxList = new InvalidTxList();
@@ -127,8 +130,7 @@ public class TransactionManager extends AbstractService {
   // todo: use moving array instead (use Long2ObjectMap<byte[]> in fastutil)
   // todo: should this be consolidated with inProgress?
   // commit time next writePointer -> changes made by this tx
-  private final NavigableMap<Long, Set<ChangeId>> committedChangeSets =
-    new ConcurrentSkipListMap<Long, Set<ChangeId>>();
+  private final NavigableMap<Long, Set<ChangeId>> committedChangeSets = new ConcurrentSkipListMap<>();
   // not committed yet
   private final Map<Long, Set<ChangeId>> committingChangeSets = Maps.newConcurrentMap();
 
@@ -347,15 +349,16 @@ public class TransactionManager extends AbstractService {
         long currentTime = System.currentTimeMillis();
         Map<Long, InProgressType> timedOut = Maps.newHashMap();
         for (Map.Entry<Long, InProgressTx> tx : inProgress.entrySet()) {
-          long expiration = tx.getValue().getExpiration();
+          InProgressTx inProgressTx = tx.getValue();
+          long expiration = inProgressTx.getExpiration();
           if (expiration >= 0L && currentTime > expiration) {
             // timed out, remember tx id (can't remove while iterating over entries)
-            timedOut.put(tx.getKey(), tx.getValue().getType());
-            LOG.info("Tx invalid list: added tx {} because of timeout", tx.getKey());
+            timedOut.put(tx.getKey(), inProgressTx.getType());
+            LOG.info("Tx invalid list: added tx {} belonging to client '{}' because of timeout.",
+                     tx.getKey(), inProgressTx.getClientId());
           } else if (expiration < 0) {
             LOG.warn("Transaction {} has negative expiration time {}. Likely cause is the
transaction was not " +
-                       "migrated correctly, this transaction will be expired immediately",
-                     tx.getKey(), expiration);
+                       "migrated correctly, this transaction will be expired immediately",
tx.getKey(), expiration);
             timedOut.put(tx.getKey(), InProgressType.LONG);
           }
         }
@@ -592,8 +595,8 @@ public class TransactionManager extends AbstractService {
               } else if (type == null) {
                 type = TransactionType.SHORT;
               }
-              addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(),
-                                      expiration, type);
+              // We don't persist the client id.
+              addInProgressAndAdvance(edit.getWritePointer(), edit.getVisibilityUpperBound(),
expiration, type, null);
               break;
             case COMMITTING:
               addCommittingChangeSet(edit.getWritePointer(), edit.getChanges());
@@ -727,10 +730,28 @@ public class TransactionManager extends AbstractService {
   }
 
   /**
+   * Start a short transaction with a client id and default timeout.
+   * @param clientId id of the client requesting a transaction.
+   */
+  public Transaction startShort(String clientId) {
+    return startShort(clientId, defaultTimeout);
+  }
+
+  /**
    * Start a short transaction with a given timeout.
    * @param timeoutInSeconds the time out period in seconds.
    */
   public Transaction startShort(int timeoutInSeconds) {
+    return startShort(DEFAULT_CLIENTID, timeoutInSeconds);
+  }
+
+  /**
+   * Start a short transaction with a given timeout.
+   * @param clientId id of the client requesting a transaction.
+   * @param timeoutInSeconds the time out period in seconds.
+   */
+  public Transaction startShort(String clientId, int timeoutInSeconds) {
+    Preconditions.checkArgument(clientId != null, "clientId must not be null");
     Preconditions.checkArgument(timeoutInSeconds > 0,
                                 "timeout must be positive but is %s seconds", timeoutInSeconds);
     Preconditions.checkArgument(timeoutInSeconds <= maxTimeout,
@@ -738,7 +759,7 @@ public class TransactionManager extends AbstractService {
     txMetricsCollector.rate("start.short");
     Stopwatch timer = new Stopwatch().start();
     long expiration = getTxExpiration(timeoutInSeconds);
-    Transaction tx = startTx(expiration, TransactionType.SHORT);
+    Transaction tx = startTx(expiration, TransactionType.SHORT, clientId);
     txMetricsCollector.histogram("start.short.latency", (int) timer.elapsedMillis());
     return tx;
   }
@@ -763,15 +784,23 @@ public class TransactionManager extends AbstractService {
    * transaction moves it to the invalid list because we assume that its writes cannot be
rolled back.
    */
   public Transaction startLong() {
+    return startLong(DEFAULT_CLIENTID);
+  }
+
+  /**
+   * Starts a long transaction with a client id.
+   */
+  public Transaction startLong(String clientId) {
+    Preconditions.checkArgument(clientId != null, "clientId must not be null");
     txMetricsCollector.rate("start.long");
     Stopwatch timer = new Stopwatch().start();
     long expiration = getTxExpiration(defaultLongTimeout);
-    Transaction tx = startTx(expiration, TransactionType.LONG);
+    Transaction tx = startTx(expiration, TransactionType.LONG, clientId);
     txMetricsCollector.histogram("start.long.latency", (int) timer.elapsedMillis());
     return tx;
   }
 
-  private Transaction startTx(long expiration, TransactionType type) {
+  private Transaction startTx(long expiration, TransactionType type, @Nullable String clientId)
{
     Transaction tx = null;
     long txid;
     // guard against changes to the transaction log while processing
@@ -781,7 +810,7 @@ public class TransactionManager extends AbstractService {
         ensureAvailable();
         txid = getNextWritePointer();
         tx = createTransaction(txid, type);
-        addInProgressAndAdvance(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration,
type);
+        addInProgressAndAdvance(tx.getTransactionId(), tx.getVisibilityUpperBound(), expiration,
type, clientId);
       }
       // appending to WAL out of global lock for concurrent performance
       // we should still be able to arrive at the same state even if log entries are out
of order
@@ -793,13 +822,13 @@ public class TransactionManager extends AbstractService {
   }
 
   private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound,
-                                       long expiration, TransactionType type) {
-    addInProgressAndAdvance(writePointer, visibilityUpperBound, expiration, InProgressType.of(type));
+                                       long expiration, TransactionType type, @Nullable String
clientId) {
+    addInProgressAndAdvance(writePointer, visibilityUpperBound, expiration, InProgressType.of(type),
clientId);
   }
 
   private void addInProgressAndAdvance(long writePointer, long visibilityUpperBound,
-                                       long expiration, InProgressType type) {
-    inProgress.put(writePointer, new InProgressTx(visibilityUpperBound, expiration, type));
+                                       long expiration, InProgressType type, @Nullable String
clientId) {
+    inProgress.put(writePointer, new InProgressTx(clientId, visibilityUpperBound, expiration,
type));
     advanceWritePointer(writePointer);
   }
 
@@ -1026,6 +1055,7 @@ public class TransactionManager extends AbstractService {
     Set<ChangeId> previousChangeSet = committingChangeSets.remove(writePointer);
     // remove from in-progress set, so that it does not get excluded in the future
     InProgressTx previous = inProgress.remove(writePointer);
+
     // This check is to prevent from invalidating committed transactions
     if (previous != null || previousChangeSet != null) {
       // add tx to invalids
@@ -1040,7 +1070,12 @@ public class TransactionManager extends AbstractService {
           inProgress.keySet().removeAll(childWritePointers);
         }
       }
-      LOG.info("Tx invalid list: added tx {} because of invalidate", writePointer);
+
+      String clientId = DEFAULT_CLIENTID;
+      if (previous != null && previous.getClientId() != null) {
+        clientId = previous.getClientId();
+      }
+      LOG.info("Tx invalid list: added tx {} belonging to client '{}' because of invalidate",
writePointer, clientId);
       if (previous != null && !previous.isLongRunning()) {
         // tx was short-running: must move read pointer
         moveReadPointerIfNeeded(writePointer);
@@ -1170,7 +1205,7 @@ public class TransactionManager extends AbstractService {
     InProgressTx existingTx = inProgress.get(parentWritePointer);
     existingTx.addCheckpointWritePointer(newWritePointer);
     addInProgressAndAdvance(newWritePointer, existingTx.getVisibilityUpperBound(), existingTx.getExpiration(),
-                            InProgressType.CHECKPOINT);
+                            InProgressType.CHECKPOINT, existingTx.getClientId());
   }
   
   // hack for exposing important metric
@@ -1378,17 +1413,34 @@ public class TransactionManager extends AbstractService {
     private final long expiration;
     private final InProgressType type;
     private final LongArrayList checkpointWritePointers;
+    // clientId is not part of hashCode computation or equality check since it is not persisted.
Once it is persisted
+    // and restored, we can include it in the above.
+    private final String clientId;
+
+    public InProgressTx(String clientId, long visibilityUpperBound, long expiration, InProgressType
type) {
+      this(clientId, visibilityUpperBound, expiration, type, new LongArrayList());
+    }
 
     public InProgressTx(long visibilityUpperBound, long expiration, InProgressType type)
{
       this(visibilityUpperBound, expiration, type, new LongArrayList());
     }
 
+    public InProgressTx(String clientId, long visibilityUpperBound, long expiration, InProgressType
type,
+                        LongArrayList checkpointWritePointers) {
+      this.visibilityUpperBound = visibilityUpperBound;
+      this.expiration = expiration;
+      this.type = type;
+      this.checkpointWritePointers = checkpointWritePointers;
+      this.clientId = clientId;
+    }
+
     public InProgressTx(long visibilityUpperBound, long expiration, InProgressType type,
                         LongArrayList checkpointWritePointers) {
       this.visibilityUpperBound = visibilityUpperBound;
       this.expiration = expiration;
       this.type = type;
       this.checkpointWritePointers = checkpointWritePointers;
+      this.clientId = null;
     }
 
     // For backwards compatibility when long running txns were represented with -1 expiration
@@ -1410,6 +1462,11 @@ public class TransactionManager extends AbstractService {
       return type;
     }
 
+    @Nullable
+    public String getClientId() {
+      return clientId;
+    }
+
     public boolean isLongRunning() {
       if (type == null) {
         // for backwards compatibility when long running txns were represented with -1 expiration
@@ -1455,6 +1512,7 @@ public class TransactionManager extends AbstractService {
           .add("expiration", expiration)
           .add("type", type)
           .add("checkpointWritePointers", checkpointWritePointers)
+          .add("clientId", clientId)
           .toString();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
index 26a48fb..1dbd3cb 100644
--- a/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
+++ b/tephra-core/src/main/java/org/apache/tephra/TxConstants.java
@@ -120,6 +120,11 @@ public class TxConstants {
   public static final boolean DEFAULT_READ_NON_TX_DATA = false;
 
   /**
+   * Used to inject the name of the client that is starting the transaction.
+   */
+  public static final String CLIENT_ID = "tephra.client.id";
+
+  /**
    * TransactionManager configuration.
    */
   public static final class Manager {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
index 4ee9615..5f7792a 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceClient.java
@@ -23,6 +23,7 @@ import com.google.common.base.Throwables;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
+import com.google.inject.name.Named;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.tephra.InvalidTruncateTimeException;
 import org.apache.tephra.Transaction;
@@ -42,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
+import java.lang.management.ManagementFactory;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Set;
@@ -60,6 +62,9 @@ public class TransactionServiceClient implements TransactionSystemClient
{
   // the retry strategy we will use
   private final RetryStrategyProvider retryStrategyProvider;
 
+  // client id that is used to identify the transactions
+  private final String clientId;
+
   /**
    * Utility to be used for basic verification of transaction system availability and functioning
    * @param args arguments list, accepts single option "-v" that makes it to print out more
details about started tx
@@ -126,14 +131,24 @@ public class TransactionServiceClient implements TransactionSystemClient
{
   }
 
   /**
+   * Create from a configuration. This will first attempt to find a zookeeper for service
discovery.
+   * This constructor can be used if Guice dependency injection is not used. JVM name will
be used for the client id.
+   * @param config a configuration containing the zookeeper properties
+   */
+  public TransactionServiceClient(Configuration config, ThriftClientProvider clientProvider)
{
+    this(config, clientProvider, ManagementFactory.getRuntimeMXBean().getName());
+  }
+
+  /**
    * Create from a configuration. This will first attempt to find a zookeeper
    * for service discovery. Otherwise it will look for the port in the
    * config and use localhost.
    * @param config a configuration containing the zookeeper properties
+   * @param clientId id of the client that identifies it when it starts a transaction
    */
   @Inject
-  public TransactionServiceClient(Configuration config,
-                                  ThriftClientProvider clientProvider) {
+  public TransactionServiceClient(Configuration config, ThriftClientProvider clientProvider,
+                                  @Named(TxConstants.CLIENT_ID) String clientId) {
 
     // initialize the retry logic
     String retryStrat = config.get(
@@ -155,6 +170,7 @@ public class TransactionServiceClient implements TransactionSystemClient
{
     LOG.debug("Retry strategy is " + this.retryStrategyProvider);
 
     this.clientProvider = clientProvider;
+    this.clientId = clientId;
   }
 
   /**
@@ -247,7 +263,7 @@ public class TransactionServiceClient implements TransactionSystemClient
{
           @Override
           public Transaction execute(TransactionServiceThriftClient client)
             throws TException {
-            return client.startLong();
+            return client.startLong(clientId);
           }
         });
     } catch (Exception e) {
@@ -263,7 +279,7 @@ public class TransactionServiceClient implements TransactionSystemClient
{
           @Override
           public Transaction execute(TransactionServiceThriftClient client)
             throws TException {
-            return client.startShort();
+            return client.startShort(clientId);
           }
         });
     } catch (Exception e) {
@@ -279,7 +295,7 @@ public class TransactionServiceClient implements TransactionSystemClient
{
           @Override
           public Transaction execute(TransactionServiceThriftClient client)
             throws TException {
-            return client.startShort(timeout);
+            return client.startShort(clientId, timeout);
           }
         });
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
index b76412f..8ba81e3 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftClient.java
@@ -106,6 +106,23 @@ public class TransactionServiceThriftClient {
     }
   }
 
+  public Transaction startLong(String clientId) throws TException {
+    try {
+      return TransactionConverterUtils.unwrap(client.startLongClientId(clientId));
+    } catch (TGenericException e) {
+      // currently, we only expect IllegalArgumentException here, if the clientId is null
+      if (!IllegalArgumentException.class.getName().equals(e.getOriginalExceptionClass()))
{
+        LOG.trace("Expecting only {} as the original exception class but found {}",
+                  IllegalArgumentException.class.getName(), e.getOriginalExceptionClass());
+        throw e;
+      }
+      throw new IllegalArgumentException(e.getMessage());
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
   public Transaction startShort() throws TException {
     try {
       return TransactionConverterUtils.unwrap(client.startShort());
@@ -132,6 +149,40 @@ public class TransactionServiceThriftClient {
     }
   }
 
+  public Transaction startShort(String clientId) throws TException {
+    try {
+      return TransactionConverterUtils.unwrap(client.startShortClientId(clientId));
+    } catch (TGenericException e) {
+      // currently, we only expect IllegalArgumentException here, if the clientId is null
+      if (!IllegalArgumentException.class.getName().equals(e.getOriginalExceptionClass()))
{
+        LOG.trace("Expecting only {} as the original exception class but found {}",
+                  IllegalArgumentException.class.getName(), e.getOriginalExceptionClass());
+        throw e;
+      }
+      throw new IllegalArgumentException(e.getMessage());
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
+  public Transaction startShort(String clientId, int timeout) throws TException {
+    try {
+      return TransactionConverterUtils.unwrap(client.startShortWithClientIdAndTimeOut(clientId,
timeout));
+    } catch (TGenericException e) {
+      // currently, we only expect IllegalArgumentException here, if the timeout is invalid
or if clientId is null
+      if (!IllegalArgumentException.class.getName().equals(e.getOriginalExceptionClass()))
{
+        LOG.trace("Expecting only {} as the original exception class but found {}",
+                  IllegalArgumentException.class.getName(), e.getOriginalExceptionClass());
+        throw e;
+      }
+      throw new IllegalArgumentException(e.getMessage());
+    } catch (TException e) {
+      isValid.set(false);
+      throw e;
+    }
+  }
+
   public boolean canCommit(Transaction tx, Collection<byte[]> changeIds)
     throws TException, TransactionNotInProgressException {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-tephra/blob/a22c11d8/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
----------------------------------------------------------------------
diff --git a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
index 6552cfe..954ee1d 100644
--- a/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
+++ b/tephra-core/src/main/java/org/apache/tephra/distributed/TransactionServiceThriftHandler.java
@@ -73,11 +73,38 @@ public class TransactionServiceThriftHandler implements TTransactionServer.Iface
   }
 
   @Override
+  public TTransaction startLongClientId(String clientId) throws TException {
+    try {
+      return TransactionConverterUtils.wrap(txManager.startLong(clientId));
+    } catch (IllegalArgumentException ex) {
+      throw new TGenericException(ex.getMessage(), ex.getClass().getName());
+    }
+  }
+
+  @Override
   public TTransaction startShortTimeout(int timeout) throws TException {
     return TransactionConverterUtils.wrap(txManager.startShort(timeout));
   }
 
   @Override
+  public TTransaction startShortClientId(String clientId) throws TException {
+    try {
+      return TransactionConverterUtils.wrap(txManager.startShort(clientId));
+    } catch (IllegalArgumentException ex) {
+      throw new TGenericException(ex.getMessage(), ex.getClass().getName());
+    }
+  }
+
+  @Override
+  public TTransaction startShortWithClientIdAndTimeOut(String clientId, int timeout) throws
TException {
+    try {
+      return TransactionConverterUtils.wrap(txManager.startShort(clientId, timeout));
+    } catch (IllegalArgumentException ex) {
+      throw new TGenericException(ex.getMessage(), ex.getClass().getName());
+    }
+  }
+
+  @Override
   public TTransaction startShortWithTimeout(int timeout) throws TException {
     try {
       return TransactionConverterUtils.wrap(txManager.startShort(timeout));


Mime
View raw message