sqoop-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Jong Ho Lee (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SQOOP-1312) One of mappers does not load data from mySql if double column is used as split key
Date Tue, 12 Aug 2014 04:27:13 GMT

    [ https://issues.apache.org/jira/browse/SQOOP-1312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14093711#comment-14093711
] 

Jong Ho Lee edited comment on SQOOP-1312 at 8/12/14 4:25 AM:
-------------------------------------------------------------

BigDecimalSplitter.java

{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// modified by Jongho Lee at Samsung SDS. (jongho.lee84@gmail.com)
package org.apache.sqoop.mapreduce.db;

import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;

import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.mapreduce.db.DBSplitter;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;

/**
 * Implement DBSplitter over BigDecimal values.
 */
public class BigDecimalSplitter implements DBSplitter  {
  private static final Log LOG = LogFactory.getLog(BigDecimalSplitter.class);

  /**
   * Lower limit for the distance between two consecutive split points. A
   * smaller (or zero) computed split size is replaced by this value so that
   * the boundary walk in {@link #split(BigDecimal, BigDecimal, BigDecimal)}
   * always moves forward.
   */
  private static final BigDecimal MIN_INCREMENT =
      new BigDecimal(10000 * Double.MIN_VALUE);

  /**
   * Builds the input splits for {@code colName}. The first column of the
   * current row of {@code results} is read as the minimum and the second
   * column as the maximum of the split column.
   *
   * @param conf job configuration; the requested number of map tasks is read
   *     from it through {@code ConfigurationHelper.getConfNumMaps}
   * @param results result set whose current row holds the two bounds
   * @param colName name of the split column, used verbatim in the clauses
   * @return one split per interval; a single {@code IS NULL} split when both
   *     bounds are null; {@code null} when exactly one bound is null
   * @throws SQLException if the bounds cannot be read from {@code results}
   */
  public List<InputSplit> split(Configuration conf, ResultSet results,
      String colName) throws SQLException {

    BigDecimal minVal = results.getBigDecimal(1);
    BigDecimal maxVal = results.getBigDecimal(2);

    String lowClausePrefix = colName + " >= ";
    String highClausePrefix = colName + " < ";

    BigDecimal numSplits = new BigDecimal(
        ConfigurationHelper.getConfNumMaps(conf));

    if (minVal == null && maxVal == null) {
      // Range is null to null. Return a null split accordingly.
      List<InputSplit> splits = new ArrayList<InputSplit>();
      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " IS NULL", colName + " IS NULL"));
      return splits;
    }

    if (minVal == null || maxVal == null) {
      // Don't know what is a reasonable min/max value for interpolation. Fail.
      // The null return is kept as-is because callers may test for it.
      LOG.error("Cannot find a range for NUMERIC or DECIMAL "
          + "fields with one end NULL.");
      return null;
    }

    // Get all the split points together.
    List<BigDecimal> splitPoints = split(numSplits, minVal, maxVal);
    List<InputSplit> splits = new ArrayList<InputSplit>();

    // Turn the split points into a set of intervals.
    BigDecimal start = splitPoints.get(0);
    for (int i = 1; i < splitPoints.size(); i++) {
      BigDecimal end = splitPoints.get(i);

      if (i == splitPoints.size() - 1) {
        // This is the last one; use a closed interval.
        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + start.toString(),
            colName + " <= " + end.toString()));
      } else {
        // Normal open-interval case.
        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + start.toString(),
            highClausePrefix + end.toString()));
      }

      start = end;
    }

    return splits;
  }

  /**
   * Divide numerator by denominator. If impossible in exact mode, use rounding.
   *
   * <p>The rounding fallback keeps the scale of {@code numerator}, so the
   * quotient of two scale-0 values is rounded to a whole number.
   *
   * @param numerator value to divide
   * @param denominator value to divide by; must not be zero
   * @return the exact quotient when it is representable, otherwise the
   *     quotient rounded half-up at the scale of {@code numerator}
   */
  protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
    try {
      return numerator.divide(denominator);
    } catch (ArithmeticException ae) {
      // Non-terminating decimal expansion: fall back to a rounded quotient.
      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
    }
  }

  /**
   * Returns a list of BigDecimals one element longer than the list of input
   * splits.  This represents the boundaries between input splits.  All splits
   * are open on the top end, except the last one.
   *
   * So the list [0, 5, 8, 12, 18] would represent splits capturing the
   * intervals:
   *
   * [0, 5)
   * [5, 8)
   * [8, 12)
   * [12, 18] note the closed interval for the last split.
   *
   * <p>At most {@code numSplits} lower bounds are generated, followed by
   * {@code maxVal}, so accumulated rounding can never create more intervals
   * than were requested.
   *
   * @param numSplits requested number of intervals; values below one are
   *     treated as one
   * @param minVal smallest value of the split column
   * @param maxVal largest value of the split column
   * @return the boundary points, starting at {@code minVal} and ending at
   *     {@code maxVal}
   * @throws SQLException declared for subclasses; not thrown here
   */
  protected List<BigDecimal> split(BigDecimal numSplits, BigDecimal minVal,
      BigDecimal maxVal) throws SQLException {

    List<BigDecimal> splits = new ArrayList<BigDecimal>();

    // A zero or negative request would make the division below fail, so
    // fall back to a single split in that case.
    BigDecimal divisor = numSplits.signum() > 0 ? numSplits : BigDecimal.ONE;

    // BigDecimal has no arithmetic operators; convert once to get the loop
    // bound. One lower bound is needed per requested interval.
    int maxLowerBounds = Math.max(1, divisor.intValue());

    BigDecimal splitSize = tryDivide(maxVal.subtract(minVal), divisor);
    if (splitSize.compareTo(MIN_INCREMENT) < 0) {
      splitSize = MIN_INCREMENT;
      LOG.warn("Set BigDecimal splitSize to MIN_INCREMENT");
    }

    BigDecimal curVal = minVal;

    // Bounded loop instead of "while (curVal <= maxVal)": rounding of
    // splitSize must not be able to add extra boundaries.
    for (int i = 0; i < maxLowerBounds; i++) {
      if (curVal.compareTo(maxVal) > 0) {
        break;
      }
      splits.add(curVal);
      curVal = curVal.add(splitSize);
    }

    // Check the size first so an empty or single-point list is never indexed
    // out of range and always receives its closing boundary.
    if (splits.size() <= 1
        || splits.get(splits.size() - 1).compareTo(maxVal) != 0) {
      // We didn't end on the maxVal. Add that to the end of the list.
      splits.add(maxVal);
    }

    return splits;
  }
}
{code}

FloatSplitter.java

{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// modified by Jongho Lee at Samsung SDS. (jongho.lee84@gmail.com)
package org.apache.sqoop.mapreduce.db;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;

import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.mapreduce.db.DBSplitter;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;

/**
 * Implement DBSplitter over floating-point values.
 */
public class FloatSplitter implements DBSplitter  {

  private static final Log LOG = LogFactory.getLog(FloatSplitter.class);

  /** Lower limit applied to the step between two split boundaries. */
  private static final double MIN_INCREMENT = 10000 * Double.MIN_VALUE;

  /**
   * Builds range splits over {@code colName}. The first column of the current
   * row of {@code results} is read as the minimum and the second column as
   * the maximum of the split column.
   *
   * @param conf job configuration; the requested number of map tasks is read
   *     from it through {@code ConfigurationHelper.getConfNumMaps}
   * @param results result set whose current row holds the two bounds
   * @param colName name of the split column, used verbatim in the clauses
   * @return the generated splits; an extra {@code IS NULL} split is appended
   *     when at least one bound is null
   * @throws SQLException if the bounds cannot be read from {@code results}
   */
  public List<InputSplit> split(Configuration conf, ResultSet results,
      String colName) throws SQLException {

    LOG.warn("Generating splits for a floating-point index column. Due to the");
    LOG.warn("imprecise representation of floating-point values in Java, this");
    LOG.warn("may result in an incomplete import.");
    LOG.warn("You are strongly encouraged to choose an integral split column.");

    final List<InputSplit> result = new ArrayList<InputSplit>();

    // Both bounds null: the only thing left to select is the null rows.
    if (results.getString(1) == null && results.getString(2) == null) {
      result.add(nullSplit(colName));
      return result;
    }

    final double lowest = results.getDouble(1);
    final double highest = results.getDouble(2);

    // The map count is a hint for the step width and an upper limit for the
    // number of splits generated below.
    final int mapCount = ConfigurationHelper.getConfNumMaps(conf);
    final double step =
        Math.max((highest - lowest) / (double) mapCount, MIN_INCREMENT);

    double lower = lowest;
    double upper = lower + step;

    // Emit at most mapCount - 1 half-open ranges; stop early once the next
    // upper boundary would reach the maximum.
    int openRangesLeft = mapCount - 1;
    while (openRangesLeft > 0) {
      if (upper >= highest) {
        break;
      }
      result.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " >= " + Double.toString(lower),
          colName + " < " + Double.toString(upper)));
      lower = upper;
      upper += step;
      openRangesLeft--;
    }

    // The final range is closed so that the maximum itself is included.
    if (lower <= highest || result.size() == 1) {
      result.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " >= " + Double.toString(lower),
          colName + " <= " + Double.toString(highest)));
    }

    // Exactly one bound null (both-null returned above): add a null split.
    if (results.getString(1) == null || results.getString(2) == null) {
      result.add(nullSplit(colName));
    }

    return result;
  }

  /** Creates the split whose lower and upper clause are both "IS NULL". */
  private static InputSplit nullSplit(String colName) {
    return new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
        colName + " IS NULL", colName + " IS NULL");
  }
}
{code}

IntegerSplitter.java
{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// modified by Jongho Lee at Samsung SDS. (jongho.lee84@gmail.com)
package org.apache.sqoop.mapreduce.db;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;

import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.mapreduce.db.DBSplitter;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;

/**
 * Implement DBSplitter over integer values.
 */
public class IntegerSplitter implements DBSplitter  {
  public static final Log LOG =
      LogFactory.getLog(IntegerSplitter.class.getName());

  /**
   * Builds range splits over {@code colName}. The first column of the current
   * row of {@code results} is read as the minimum and the second column as
   * the maximum of the split column.
   *
   * @param conf job configuration; the requested number of map tasks is read
   *     from it through {@code ConfigurationHelper.getConfNumMaps} and raised
   *     to one when it is smaller
   * @param results result set whose current row holds the two bounds
   * @param colName name of the split column, used verbatim in the clauses
   * @return the generated splits; an extra {@code IS NULL} split is appended
   *     when at least one bound is null
   * @throws SQLException if the bounds cannot be read from {@code results}
   */
  public List<InputSplit> split(Configuration conf, ResultSet results,
      String colName) throws SQLException {

    final long lowest = results.getLong(1);
    final long highest = results.getLong(2);

    final int mapCount =
        Math.max(ConfigurationHelper.getConfNumMaps(conf), 1);

    final List<InputSplit> result = new ArrayList<InputSplit>();

    // Both bounds null: the only thing left to select is the null rows.
    if (results.getString(1) == null && results.getString(2) == null) {
      result.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " IS NULL", colName + " IS NULL"));
      return result;
    }

    // Compute every boundary first, then pair neighbours into intervals.
    final List<Long> boundaries = split(mapCount, lowest, highest);
    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format("Splits: [%,28d to %,28d] into %d parts",
          lowest, highest, mapCount));
      for (Long boundary : boundaries) {
        LOG.debug(String.format("%,28d", boundary));
      }
    }

    long lower = boundaries.get(0);
    final int lastIndex = boundaries.size() - 1;
    for (int idx = 1; idx <= lastIndex; idx++) {
      final long upper = boundaries.get(idx);
      // Only the last interval is closed at its upper end.
      final String upperOperator = (idx == lastIndex) ? " <= " : " < ";
      result.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " >= " + Long.toString(lower),
          colName + upperOperator + Long.toString(upper)));
      lower = upper;
    }

    // Exactly one bound null (both-null returned above): add a null split.
    if (results.getString(1) == null || results.getString(2) == null) {
      result.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " IS NULL", colName + " IS NULL"));
    }

    return result;
  }

  /**
   * Computes the boundaries between input splits. The returned list is one
   * element longer than the number of splits it describes, and every split
   * except the last is open at its upper end.
   *
   * For example the list [0, 5, 8, 12, 18] describes the intervals:
   *
   * [0, 5)
   * [5, 8)
   * [8, 12)
   * [12, 18] the last interval is closed.
   *
   * @param numSplits requested number of intervals; must not be zero
   * @param minVal smallest value of the split column
   * @param maxVal largest value of the split column
   * @return the boundary points, starting at {@code minVal}
   * @throws SQLException declared for subclasses; not thrown here
   */
  public List<Long> split(long numSplits, long minVal, long maxVal)
      throws SQLException {

    final List<Long> boundaries = new ArrayList<Long>();

    // Integer division gives the base width of an interval; the remainder is
    // spread by widening the first "extra" intervals by one, so the widths
    // add up to the whole span without going past maxVal.
    final long span = maxVal - minVal;
    final long width = span / numSplits;
    final long extra = span % numSplits;

    // Produces numSplits intervals as long as width > 0; with width == 0
    // only "extra" intervals come out of this loop.
    long point = minVal;
    int index = 0;
    while (index <= numSplits) {
      boundaries.add(point);
      if (point >= maxVal) {
        break;
      }
      point += (index < extra) ? width + 1 : width;
      index++;
    }

    // Two cases need one more maxVal at the end:
    //  * a single boundary, which needs a partner to form a valid split;
    //  * fewer distinct values than requested splits (strictly '<', not
    //    '<='). Without it the interval [1, 5] cut into 5 splits would be
    //      1 <= x < 2, 2 <= x < 3, 3 <= x < 4, 4 <= x <= 5
    //    leaving the last split with twice the data of the others. With the
    //    extra boundary it becomes
    //      1 <= x < 2, 2 <= x < 3, 3 <= x < 4, 4 <= x < 5, 5 <= x <= 5
    if (boundaries.size() == 1 || span < numSplits) {
      boundaries.add(maxVal);
    }

    return boundaries;
  }
}
{code}


was (Author: jongho):
BigDecimalSplitter.java

{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// modified by Jongho Lee at Samsung SDS. (jongho.lee84@gmail.com)
package org.apache.sqoop.mapreduce.db;

import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;

import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.mapreduce.db.DBSplitter;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;

/**
 * Implement DBSplitter over BigDecimal values.
 */
public class BigDecimalSplitter implements DBSplitter  {
  private static final Log LOG = LogFactory.getLog(BigDecimalSplitter.class);

  /**
   * Lower limit for the distance between two consecutive split points. A
   * smaller (or zero) computed split size is replaced by this value so that
   * the boundary walk in {@link #split(BigDecimal, BigDecimal, BigDecimal)}
   * always moves forward.
   */
  private static final BigDecimal MIN_INCREMENT =
      new BigDecimal(10000 * Double.MIN_VALUE);

  /**
   * Builds the input splits for {@code colName}. The first column of the
   * current row of {@code results} is read as the minimum and the second
   * column as the maximum of the split column.
   *
   * @param conf job configuration; the requested number of map tasks is read
   *     from it through {@code ConfigurationHelper.getConfNumMaps}
   * @param results result set whose current row holds the two bounds
   * @param colName name of the split column, used verbatim in the clauses
   * @return one split per interval; a single {@code IS NULL} split when both
   *     bounds are null; {@code null} when exactly one bound is null
   * @throws SQLException if the bounds cannot be read from {@code results}
   */
  public List<InputSplit> split(Configuration conf, ResultSet results,
      String colName) throws SQLException {

    BigDecimal minVal = results.getBigDecimal(1);
    BigDecimal maxVal = results.getBigDecimal(2);

    String lowClausePrefix = colName + " >= ";
    String highClausePrefix = colName + " < ";

    BigDecimal numSplits = new BigDecimal(
        ConfigurationHelper.getConfNumMaps(conf));

    if (minVal == null && maxVal == null) {
      // Range is null to null. Return a null split accordingly.
      List<InputSplit> splits = new ArrayList<InputSplit>();
      splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " IS NULL", colName + " IS NULL"));
      return splits;
    }

    if (minVal == null || maxVal == null) {
      // Don't know what is a reasonable min/max value for interpolation. Fail.
      // The null return is kept as-is because callers may test for it.
      LOG.error("Cannot find a range for NUMERIC or DECIMAL "
          + "fields with one end NULL.");
      return null;
    }

    // Get all the split points together.
    List<BigDecimal> splitPoints = split(numSplits, minVal, maxVal);
    List<InputSplit> splits = new ArrayList<InputSplit>();

    // Turn the split points into a set of intervals.
    BigDecimal start = splitPoints.get(0);
    for (int i = 1; i < splitPoints.size(); i++) {
      BigDecimal end = splitPoints.get(i);

      if (i == splitPoints.size() - 1) {
        // This is the last one; use a closed interval.
        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + start.toString(),
            colName + " <= " + end.toString()));
      } else {
        // Normal open-interval case.
        splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
            lowClausePrefix + start.toString(),
            highClausePrefix + end.toString()));
      }

      start = end;
    }

    return splits;
  }

  /**
   * Divide numerator by denominator. If impossible in exact mode, use rounding.
   *
   * <p>The rounding fallback keeps the scale of {@code numerator}, so the
   * quotient of two scale-0 values is rounded to a whole number.
   *
   * @param numerator value to divide
   * @param denominator value to divide by; must not be zero
   * @return the exact quotient when it is representable, otherwise the
   *     quotient rounded half-up at the scale of {@code numerator}
   */
  protected BigDecimal tryDivide(BigDecimal numerator, BigDecimal denominator) {
    try {
      return numerator.divide(denominator);
    } catch (ArithmeticException ae) {
      // Non-terminating decimal expansion: fall back to a rounded quotient.
      return numerator.divide(denominator, BigDecimal.ROUND_HALF_UP);
    }
  }

  /**
   * Returns a list of BigDecimals one element longer than the list of input
   * splits.  This represents the boundaries between input splits.  All splits
   * are open on the top end, except the last one.
   *
   * So the list [0, 5, 8, 12, 18] would represent splits capturing the
   * intervals:
   *
   * [0, 5)
   * [5, 8)
   * [8, 12)
   * [12, 18] note the closed interval for the last split.
   *
   * <p>At most {@code numSplits} lower bounds are generated, followed by
   * {@code maxVal}, so accumulated rounding can never create more intervals
   * than were requested.
   *
   * @param numSplits requested number of intervals; values below one are
   *     treated as one
   * @param minVal smallest value of the split column
   * @param maxVal largest value of the split column
   * @return the boundary points, starting at {@code minVal} and ending at
   *     {@code maxVal}
   * @throws SQLException declared for subclasses; not thrown here
   */
  protected List<BigDecimal> split(BigDecimal numSplits, BigDecimal minVal,
      BigDecimal maxVal) throws SQLException {

    List<BigDecimal> splits = new ArrayList<BigDecimal>();

    // A zero or negative request would make the division below fail, so
    // fall back to a single split in that case.
    BigDecimal divisor = numSplits.signum() > 0 ? numSplits : BigDecimal.ONE;

    // BigDecimal has no arithmetic operators; convert once to get the loop
    // bound. One lower bound is needed per requested interval.
    int maxLowerBounds = Math.max(1, divisor.intValue());

    BigDecimal splitSize = tryDivide(maxVal.subtract(minVal), divisor);
    if (splitSize.compareTo(MIN_INCREMENT) < 0) {
      splitSize = MIN_INCREMENT;
      LOG.warn("Set BigDecimal splitSize to MIN_INCREMENT");
    }

    BigDecimal curVal = minVal;

    // Bounded loop instead of "while (curVal <= maxVal)": rounding of
    // splitSize must not be able to add extra boundaries.
    for (int i = 0; i < maxLowerBounds; i++) {
      if (curVal.compareTo(maxVal) > 0) {
        break;
      }
      splits.add(curVal);
      curVal = curVal.add(splitSize);
    }

    // Check the size first so an empty or single-point list is never indexed
    // out of range and always receives its closing boundary.
    if (splits.size() <= 1
        || splits.get(splits.size() - 1).compareTo(maxVal) != 0) {
      // We didn't end on the maxVal. Add that to the end of the list.
      splits.add(maxVal);
    }

    return splits;
  }
}
{code}

FloatSplitter.java

{code}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
// modified by Jongho Lee at Samsung SDS. (jongho.lee84@gmail.com)
package org.apache.sqoop.mapreduce.db;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;

import com.cloudera.sqoop.config.ConfigurationHelper;
import com.cloudera.sqoop.mapreduce.db.DBSplitter;
import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;

/**
 * Implement DBSplitter over floating-point values.
 */
public class FloatSplitter implements DBSplitter  {

  private static final Log LOG = LogFactory.getLog(FloatSplitter.class);

  /** Lower limit applied to the step between two split boundaries. */
  private static final double MIN_INCREMENT = 10000 * Double.MIN_VALUE;

  /**
   * Builds range splits over {@code colName}. The first column of the current
   * row of {@code results} is read as the minimum and the second column as
   * the maximum of the split column.
   *
   * @param conf job configuration; the requested number of map tasks is read
   *     from it through {@code ConfigurationHelper.getConfNumMaps}
   * @param results result set whose current row holds the two bounds
   * @param colName name of the split column, used verbatim in the clauses
   * @return the generated splits; an extra {@code IS NULL} split is appended
   *     when at least one bound is null
   * @throws SQLException if the bounds cannot be read from {@code results}
   */
  public List<InputSplit> split(Configuration conf, ResultSet results,
      String colName) throws SQLException {

    LOG.warn("Generating splits for a floating-point index column. Due to the");
    LOG.warn("imprecise representation of floating-point values in Java, this");
    LOG.warn("may result in an incomplete import.");
    LOG.warn("You are strongly encouraged to choose an integral split column.");

    final List<InputSplit> result = new ArrayList<InputSplit>();

    // Both bounds null: the only thing left to select is the null rows.
    if (results.getString(1) == null && results.getString(2) == null) {
      result.add(nullSplit(colName));
      return result;
    }

    final double lowest = results.getDouble(1);
    final double highest = results.getDouble(2);

    // The map count is a hint for the step width and an upper limit for the
    // number of splits generated below.
    final int mapCount = ConfigurationHelper.getConfNumMaps(conf);
    final double step =
        Math.max((highest - lowest) / (double) mapCount, MIN_INCREMENT);

    double lower = lowest;
    double upper = lower + step;

    // Emit at most mapCount - 1 half-open ranges; stop early once the next
    // upper boundary would reach the maximum.
    int openRangesLeft = mapCount - 1;
    while (openRangesLeft > 0) {
      if (upper >= highest) {
        break;
      }
      result.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " >= " + Double.toString(lower),
          colName + " < " + Double.toString(upper)));
      lower = upper;
      upper += step;
      openRangesLeft--;
    }

    // The final range is closed so that the maximum itself is included.
    if (lower <= highest || result.size() == 1) {
      result.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
          colName + " >= " + Double.toString(lower),
          colName + " <= " + Double.toString(highest)));
    }

    // Exactly one bound null (both-null returned above): add a null split.
    if (results.getString(1) == null || results.getString(2) == null) {
      result.add(nullSplit(colName));
    }

    return result;
  }

  /** Creates the split whose lower and upper clause are both "IS NULL". */
  private static InputSplit nullSplit(String colName) {
    return new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
        colName + " IS NULL", colName + " IS NULL");
  }
}
{code}

> One of mappers does not load data from mySql if double column is used as split key
> ----------------------------------------------------------------------------------
>
>                 Key: SQOOP-1312
>                 URL: https://issues.apache.org/jira/browse/SQOOP-1312
>             Project: Sqoop
>          Issue Type: Bug
>    Affects Versions: 1.4.4
>            Reporter: Jong Ho Lee
>              Labels: sourcecode
>
> When we used Sqoop at Samsung SDS to load data from MySQL with a double column as the
> split key, the last mapper did not load any data from MySQL at all.
>   The number of mappers was also sometimes increased by 1.
>   I think both problems are caused by bugs in FloatSplitter.java.
>   For the last split, the lower bound lowClausePrefix + Double.toString(curUpper) should be
>   lowClausePrefix + Double.toString(curLower).
>   In the while (curUpper < maxVal) loop, because of round-off error,
>   minVal + splitSize * numSplits can be smaller than maxVal, which produces an extra split.
>   Therefore, using a for-loop would be better.
>   Attached is a proposed new FloatSplitter.java
> {code}
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.  See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership.  The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> // modified by Jongho Lee at Samsung SDS.
> package org.apache.sqoop.mapreduce.db;
> import java.sql.ResultSet;
> import java.sql.SQLException;
> import java.util.ArrayList;
> import java.util.List;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.mapreduce.InputSplit;
> import com.cloudera.sqoop.config.ConfigurationHelper;
> import com.cloudera.sqoop.mapreduce.db.DBSplitter;
> import com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat;
> /**
>  * Implement DBSplitter over floating-point values.
>  */
> public class FloatSplitter implements DBSplitter  {
>   private static final Log LOG = LogFactory.getLog(FloatSplitter.class);
>   private static final double MIN_INCREMENT = 10000 * Double.MIN_VALUE; // lower limit for the step between split boundaries
>   public List<InputSplit> split(Configuration conf, ResultSet results,
>       String colName) throws SQLException { // builds range splits over colName; column 1 of results is read as the minimum, column 2 as the maximum
>     LOG.warn("Generating splits for a floating-point index column. Due to the");
>     LOG.warn("imprecise representation of floating-point values in Java, this");
>     LOG.warn("may result in an incomplete import.");
>     LOG.warn("You are strongly encouraged to choose an integral split column.");
>     List<InputSplit> splits = new ArrayList<InputSplit>();
>     if (results.getString(1) == null && results.getString(2) == null) {
>       // Range is null to null. Return a single null split accordingly.
>       splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
>           colName + " IS NULL", colName + " IS NULL"));
>       return splits;
>     }
>     double minVal = results.getDouble(1);
>     double maxVal = results.getDouble(2);
>     // Use the configured number of maps as a hint for the step width; it
>     // also bounds the number of half-open splits emitted by the loop below.
>     int numSplits = ConfigurationHelper.getConfNumMaps(conf);
>     double splitSize = (maxVal - minVal) / (double) numSplits;
>     if (splitSize < MIN_INCREMENT) { // clamp the step so it is never below MIN_INCREMENT
>       splitSize = MIN_INCREMENT;
>     }
>     String lowClausePrefix = colName + " >= ";
>     String highClausePrefix = colName + " < ";
>     double curLower = minVal;
>     double curUpper = curLower + splitSize;
>     for (int i = 0; i < numSplits - 1; i++) { // emits numSplits - 1 half-open splits (none when numSplits <= 1)
>       // Previously "while (curUpper < maxVal) {"; replaced by the bounded for loop.
>       splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
>           lowClausePrefix + Double.toString(curLower),
>           highClausePrefix + Double.toString(curUpper)));
>       curLower = curUpper; // the next split starts where this one ended
>       curUpper += splitSize;
>     }
>     // Catch any overage and create the closed interval for the last split.
>     if (curLower <= maxVal || splits.size() == 1) {
>       splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
>           lowClausePrefix + Double.toString(curLower),
>           colName + " <= " + Double.toString(maxVal)));
>       // The lower bound above was changed from curUpper to curLower.
>     }
>     if (results.getString(1) == null || results.getString(2) == null) {
>       // At least one extrema is null; add a null split.
>       splits.add(new DataDrivenDBInputFormat.DataDrivenDBInputSplit(
>           colName + " IS NULL", colName + " IS NULL"));
>     }
>     return splits;
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message