trafodion-codereview mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From selvaganesang <...@git.apache.org>
Subject [GitHub] incubator-trafodion pull request #581: Cherry-pick changes for JIRA 2095
Date Mon, 11 Jul 2016 17:43:20 GMT
Github user selvaganesang commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/581#discussion_r70303183
  
    --- Diff: core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/transactional/RMInterface.java ---
    @@ -137,6 +163,130 @@ public RMInterface() throws IOException {
     
         }
     
    +    public void pushRegionEpoch (HTableDescriptor desc, final TransactionState ts) throws
IOException {
    +       LOG.info("pushRegionEpoch start; transId: " + ts.getTransactionId());
    +
    +       TransactionalTable ttable1 = new TransactionalTable(Bytes.toBytes(desc.getNameAsString()));
    +       HConnection connection = ttable1.getConnection();
    +       long lvTransid = ts.getTransactionId();
    +       RegionLocator rl = connection.getRegionLocator(desc.getTableName());
    +       List<HRegionLocation> regionList = rl.getAllRegionLocations();
    +
    +       boolean complete = false;
    +       int loopCount = 0;
    +       int result = 0;
    +
    +       for (HRegionLocation location : regionList) {
    +          final byte[] regionName = location.getRegionInfo().getRegionName();
    +          if (compPool == null){
    +              LOG.info("pushRegionEpoch compPool is null");
    +              threadPool = Executors.newFixedThreadPool(intThreads);
    +              compPool = new ExecutorCompletionService<Integer>(threadPool);
    +          }
    +
    +          final HRegionLocation lv_location = location;
    +          final HConnection lv_connection = connection;
    +          compPool.submit(new RMCallable2(ts, lv_location, lv_connection ) {
    +             public Integer call() throws IOException {
    +                return pushRegionEpochX(ts, lv_location, lv_connection);
    +             }
    +          });
    +          try {
    +            result = compPool.take().get();
    +          } catch(Exception ex) {
    +            throw new IOException(ex);
    +          }
    +          if ( result != 0 ){
    +             LOG.error("pushRegionEpoch result " + result + " returned from region "
    +                          + location.getRegionInfo().getRegionName());
    +             throw new IOException("pushRegionEpoch result " + result + " returned from
region "
    +                      + location.getRegionInfo().getRegionName());
    +          }
    +       }
    +       if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpoch end transid: " + ts.getTransactionId());
    +       return;
    +    }
    +
    +    private abstract class RMCallable2 implements Callable<Integer>{
    +       TransactionState transactionState;
    +       HRegionLocation  location;
    +       HConnection connection;
    +       HTable table;
    +       byte[] startKey;
    +       byte[] endKey_orig;
    +       byte[] endKey;
    +
    +       RMCallable2(TransactionState txState, HRegionLocation location, HConnection connection)
{
    +          this.transactionState = txState;
    +          this.location = location;
    +          this.connection = connection;
    +          try {
    +             table = new HTable(location.getRegionInfo().getTable(), connection);
    +          } catch(IOException e) {
    +             LOG.error("Error obtaining HTable instance " + e);
    +             table = null;
    +          }
    +          startKey = location.getRegionInfo().getStartKey();
    +          endKey_orig = location.getRegionInfo().getEndKey();
    +          endKey = TransactionManager.binaryIncrementPos(endKey_orig, -1);
    +
    +       }
    +
    +       public Integer pushRegionEpochX(final TransactionState txState,
    +        		           final HRegionLocation location, HConnection connection) throws IOException
{
    +          if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- Entry txState: " +
txState
    +                   + " location: " + location);
    +        	
    +          Batch.Call<TrxRegionService, PushEpochResponse> callable =
    +              new Batch.Call<TrxRegionService, PushEpochResponse>() {
    +                 ServerRpcController controller = new ServerRpcController();
    +                 BlockingRpcCallback<PushEpochResponse> rpcCallback =
    +                    new BlockingRpcCallback<PushEpochResponse>();
    +
    +                 @Override
    +                 public PushEpochResponse call(TrxRegionService instance) throws IOException
{
    +                    org.apache.hadoop.hbase.coprocessor.transactional.generated.TrxRegionProtos.PushEpochRequest.Builder
    +                    builder = PushEpochRequest.newBuilder();
    +                    builder.setTransactionId(txState.getTransactionId());
    +                    builder.setEpoch(txState.getStartEpoch());
    +                    builder.setRegionName(ByteString.copyFromUtf8(Bytes.toString(location.getRegionInfo().getRegionName())));
    +                    instance.pushOnlineEpoch(controller, builder.build(), rpcCallback);
    +                    return rpcCallback.get();
    +                 }
    +              };
    +
    +              Map<byte[], PushEpochResponse> result = null;
    +              try {
    +                 if (LOG.isTraceEnabled()) LOG.trace("pushRegionEpochX -- before coprocessorService:
startKey: "
    +                     + new String(startKey, "UTF-8") + " endKey: " + new String(endKey,
"UTF-8"));
    +                 result = table.coprocessorService(TrxRegionService.class, startKey,
endKey, callable);
    +              } catch (Throwable e) {
    --- End diff --
    
    ditto


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message