trafodion-codereview mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From DaveBirdsall <...@git.apache.org>
Subject [GitHub] incubator-trafodion pull request: [TRAFODION-1421] Implement paral...
Date Fri, 15 Apr 2016 21:42:05 GMT
Github user DaveBirdsall commented on a diff in the pull request:

    https://github.com/apache/incubator-trafodion/pull/422#discussion_r59945379
  
    --- Diff: core/sqf/src/seatrans/hbase-trx/src/main/java/org/apache/hadoop/hbase/client/ClientScanner98.java.tmpl
---
    @@ -0,0 +1,490 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.hadoop.hbase.client;
    +
    +import java.io.IOException;
    +import java.util.LinkedList;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.classification.InterfaceAudience;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.hbase.Cell;
    +import org.apache.hadoop.hbase.DoNotRetryIOException;
    +import org.apache.hadoop.hbase.HBaseConfiguration;
    +import org.apache.hadoop.hbase.HConstants;
    +import org.apache.hadoop.hbase.HRegionInfo;
    +import org.apache.hadoop.hbase.KeyValueUtil;
    +import org.apache.hadoop.hbase.NotServingRegionException;
    +import org.apache.hadoop.hbase.TableName;
    +import org.apache.hadoop.hbase.UnknownScannerException;
    +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
    +import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
    +import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
    +import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
    +import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
    +import org.apache.hadoop.hbase.util.Bytes;
    +
    +/**
    + * Implements the scanner interface for the HBase client.
    + * If there are multiple regions in a table, this scanner will iterate
    + * through them all.
    + */
    +@InterfaceAudience.Public
    +@InterfaceStability.Stable
    +public class ClientScanner98 extends AbstractClientScanner {
    +    private final Log LOG = LogFactory.getLog(this.getClass());
    +    protected Scan scan;
    +    protected boolean closed = false;
    +    // Current region scanner is against.  Gets cleared if current region goes
    +    // wonky: e.g. if it splits on us.
    +    protected HRegionInfo currentRegion = null;
    +    protected ScannerCallable callable = null;
    +    protected final LinkedList<Result> cache = new LinkedList<Result>();
    +    protected final int caching;
    +    protected long lastNext;
    +    // Keep lastResult returned successfully in case we have to reset scanner.
    +    protected Result lastResult = null;
    +    protected final long maxScannerResultSize;
    +    private final HConnection connection;
    +    private final TableName tableName;
    +    protected final int scannerTimeout;
    +    protected boolean scanMetricsPublished = false;
    +    protected RpcRetryingCaller<Result []> caller;
    +    protected RpcControllerFactory rpcControllerFactory;
    +
    +    /**
    +     * Create a new ClientScanner for the specified table. An HConnection will be
    +     * retrieved using the passed Configuration.
    + * Note that the passed {@link Scan}'s start row may be changed.
    +     *
    +     * @param conf The {@link Configuration} to use.
    +     * @param scan {@link Scan} to use in this scanner
    +     * @param tableName The table that we wish to scan
    +     * @throws IOException
    +     */
    +    @Deprecated
    +    public ClientScanner98(final Configuration conf, final Scan scan,
    +        final TableName tableName) throws IOException {
    +      this(conf, scan, tableName, HConnectionManager.getConnection(conf));
    +    }
    +
    +    /**
    +     * @deprecated Use {@link #ClientScanner98(Configuration, Scan, TableName)}
    +     */
    +    @Deprecated
    +    public ClientScanner98(final Configuration conf, final Scan scan,
    +        final byte [] tableName) throws IOException {
    +      this(conf, scan, TableName.valueOf(tableName));
    +    }
    +
    +
    +    /**
    +     * Create a new ClientScanner for the specified table
    +     * Note that the passed {@link Scan}'s start row may be changed.
    +     *
    +     * @param conf The {@link Configuration} to use.
    +     * @param scan {@link Scan} to use in this scanner
    +     * @param tableName The table that we wish to scan
    +     * @param connection Connection identifying the cluster
    +     * @throws IOException
    +     */
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection) throws IOException {
    +    this(conf, scan, tableName, connection, RpcRetryingCallerFactory.instantiate(conf),
    +        RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * @deprecated Use {@link #ClientScanner98(Configuration, Scan, TableName, HConnection)}
    +   */
    +  @Deprecated
    +  public ClientScanner98(final Configuration conf, final Scan scan, final byte [] tableName,
    +      HConnection connection) throws IOException {
    +    this(conf, scan, TableName.valueOf(tableName), connection, new RpcRetryingCallerFactory(conf),
    +        RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * @deprecated Use
    +   *             {@link #ClientScanner98(Configuration, Scan, TableName, HConnection,
    +   *             RpcRetryingCallerFactory, RpcControllerFactory)}
    +   *             instead
    +   */
    +  @Deprecated
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection, RpcRetryingCallerFactory rpcFactory) throws IOException
{
    +    this(conf, scan, tableName, connection, rpcFactory, RpcControllerFactory.instantiate(conf));
    +  }
    +
    +  /**
    +   * Create a new ClientScanner for the specified table. Note that the passed {@link Scan}'s
    +   * start row may be changed.
    +   * @param conf The {@link Configuration} to use.
    +   * @param scan {@link Scan} to use in this scanner
    +   * @param tableName The table that we wish to scan
    +   * @param connection Connection identifying the cluster
    +   * @throws IOException
    +   */
    +  public ClientScanner98(final Configuration conf, final Scan scan, final TableName tableName,
    +      HConnection connection, RpcRetryingCallerFactory rpcFactory,
    +      RpcControllerFactory controllerFactory) throws IOException {
    +      if (LOG.isTraceEnabled()) {
    +        LOG.trace("Scan table=" + tableName
    +            + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
    +      } 
    +      this.scan = scan;
    +      this.tableName = tableName;
    +      this.lastNext = System.currentTimeMillis();
    +      this.connection = connection;
    +      if (scan.getMaxResultSize() > 0) {
    +        this.maxScannerResultSize = scan.getMaxResultSize();
    +      } else {
    +        this.maxScannerResultSize = conf.getLong(
    +          HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
    +          HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
    +      }
    +      this.scannerTimeout = HBaseConfiguration.getInt(conf,
    +        HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
    +        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
    +        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
    +
    +      // check if application wants to collect scan metrics
    +      initScanMetrics(scan);
    +
    +      // Use the caching from the Scan.  If not set, use the default cache setting for
this table.
    +      if (this.scan.getCaching() > 0) {
    +        this.caching = this.scan.getCaching();
    +      } else {
    +        this.caching = conf.getInt(
    +            HConstants.HBASE_CLIENT_SCANNER_CACHING,
    +            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
    +      }
    +
    +      this.caller = rpcFactory.<Result[]> newCaller();
    +      this.rpcControllerFactory = controllerFactory;
    +
    +      initializeScannerInConstruction();
    +    }
    +
    +    protected void initializeScannerInConstruction() throws IOException{
    +      // initialize the scanner
    +      nextScanner(this.caching, false);
    +    }
    +
    +    protected HConnection getConnection() {
    +      return this.connection;
    +    }
    +
    +    /**
    +     * @return Table name
    +     * @deprecated Since 0.96.0; use {@link #getTable()}
    +     */
    +    @Deprecated
    +    protected byte [] getTableName() {
    +      return this.tableName.getName();
    +    }
    +
    +    protected TableName getTable() {
    +      return this.tableName;
    +    }
    +
    +    protected Scan getScan() {
    +      return scan;
    +    }
    +
    +    protected long getTimestamp() {
    +      return lastNext;
    +    }
    +
    +    // returns true if the scan's stop row is at or before the passed region endKey
    +    protected boolean checkScanStopRow(final byte [] endKey) {
    +      if (this.scan.getStopRow().length > 0) {
    +        // there is a stop row, check to see if we are past it.
    +        byte [] stopRow = scan.getStopRow();
    +        int cmp = Bytes.compareTo(stopRow, 0, stopRow.length,
    +          endKey, 0, endKey.length);
    +        if (cmp <= 0) {
    +          // stopRow <= endKey (endKey is equal to or larger than stopRow)
    +          // This is a stop.
    +          return true;
    +        }
    +      }
    +      return false; //unlikely.
    --- End diff --
    
    Why is false unlikely? Are we past the end key almost all the time? (Seems counter-intuitive.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message