sqoop-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jarek Jarcec Cecho <jar...@apache.org>
Subject Re: incremental import from database in direct mode
Date Thu, 13 Jun 2013 16:52:42 GMT
Hi Tim,
Thank you very much for reporting the bug and providing a fix for it; it is greatly appreciated!
Would you mind creating a JIRA issue [1] and attaching your patch there? Unfortunately, due to legal
restrictions, we can't accept patches submitted by email.

Jarcec

Links:
1: https://issues.apache.org/jira/browse/SQOOP

On Thu, Jun 13, 2013 at 12:46:48PM -0400, Tim Howe wrote:
> Hello all,
> 
> I was recently tasked with looking into a problem using Sqoop's
> incremental import on our installation, namely that any imports after
> the first would report success but the data would never appear.  A
> temporary file was created on HDFS with the data but deleted upon
> completion rather than being moved into place.
> 
> It turned out to be a conflict between the "direct mode" database
> manager (for PostgreSQL, in this case) and "incremental mode" import.
> Ordinarily Sqoop ends up creating files named part-m-nnnnn where nnnnn
> is an incrementing file partition number.  However the direct mode
> importer creates files of the form data-nnnnn.  This poses a problem
> because AppendUtils, which is used to move files into place at the end
> of a direct import, only copies files which match that part-m-nnnnn
> format and discards the rest.
> 
> I've written a patch which causes direct imports to use the same naming
> convention used elsewhere.  Attached please also find some changes to
> AppendUtils which improve resiliency especially if there happen to be
> multiple concurrent operations on the same table.  This patch is against
> sqoop-1.3.0-cdh3u3 but seems to apply and build with minimal changes
> across the whole 1.x series.
> 
> Please let me know if anyone finds this useful or if you have any
> further suggestions.  In particular I am curious where the
> "part-m-nnnnn" naming comes from and if the "-m" signifies anything.  I
> did hunt around in order to find the code which creates those files but
> with no luck.
> 
> Thanks and regards,
> -- 
> Tim Howe
> Data Warehouse
> TripAdvisor

> diff -ru unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java
devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java
> --- unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java	2012-01-26
13:42:29.000000000 -0500
> +++ devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/AppendUtils.java	2013-06-13
11:31:22.476128082 -0400
> @@ -51,6 +51,8 @@
>    private static final String FILEPART_SEPARATOR = "-";
>    private static final String FILEEXT_SEPARATOR = ".";
>  
> +  private static final Pattern DATA_PART_PATTERN = Pattern.compile("part.*-([0-9]{"
+ PARTITION_DIGITS + "}+).*");
> +
>    private ImportJobContext context = null;
>  
>    public AppendUtils(ImportJobContext context) {
> @@ -118,11 +120,10 @@
>      int nextPartition = 0;
>      FileStatus[] existingFiles = fs.listStatus(targetDir);
>      if (existingFiles != null && existingFiles.length > 0) {
> -      Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
>        for (FileStatus fileStat : existingFiles) {
>          if (!fileStat.isDir()) {
>            String filename = fileStat.getPath().getName();
> -          Matcher mat = patt.matcher(filename);
> +          Matcher mat = DATA_PART_PATTERN.matcher(filename);
>            if (mat.matches()) {
>              int thisPart = Integer.parseInt(mat.group(1));
>              if (thisPart >= nextPartition) {
> @@ -142,52 +143,94 @@
>    }
>  
>    /**
> -   * Move files from source to target using a specified starting partition.
> +   * Move selected files from source to target using a specified starting partition.
> +   *
> +   * Directories are moved without restriction.  Note that the serial
> +   * number of directories bears no relation to the file partition
> +   * numbering.
>     */
>    private void moveFiles(FileSystem fs, Path sourceDir, Path targetDir,
>        int partitionStart) throws IOException {
>  
> -    NumberFormat numpart = NumberFormat.getInstance();
> -    numpart.setMinimumIntegerDigits(PARTITION_DIGITS);
> -    numpart.setGroupingUsed(false);
> -    Pattern patt = Pattern.compile("part.*-([0-9][0-9][0-9][0-9][0-9]).*");
> -    FileStatus[] tempFiles = fs.listStatus(sourceDir);
> +    /* list files in the source dir and check for errors */
> +
> +    FileStatus[] sourceFiles = fs.listStatus(sourceDir);
>  
> -    if (null == tempFiles) {
> +    if (null == sourceFiles) {
>        // If we've already checked that the dir exists, and now it can't be
>        // listed, this is a genuine error (permissions, fs integrity, or other).
>        throw new IOException("Could not list files from " + sourceDir);
>      }
>  
> -    // Move and rename files & directories from temporary to target-dir thus
> -    // appending file's next partition
> -    for (FileStatus fileStat : tempFiles) {
> -      if (!fileStat.isDir()) {
> -        // Move imported data files
> -        String filename = fileStat.getPath().getName();
> -        Matcher mat = patt.matcher(filename);
> -        if (mat.matches()) {
> -          String name = getFilename(filename);
> -          String fileToMove = name.concat(numpart.format(partitionStart++));
> -          String extension = getFileExtension(filename);
> -          if (extension != null) {
> -            fileToMove = fileToMove.concat(extension);
> -          }
> -          LOG.debug("Filename: " + filename + " repartitioned to: "
> -              + fileToMove);
> -          fs.rename(fileStat.getPath(), new Path(targetDir, fileToMove));
> -        }
> -      } else {
> -        // Move directories (_logs & any other)
> -        String dirName = fileStat.getPath().getName();
> -        Path path = new Path(targetDir, dirName);
> -        int dirNumber = 0;
> -        while (fs.exists(path)) {
> -          path = new Path(targetDir, dirName.concat("-").concat(
> -              numpart.format(dirNumber++)));
> +
> +    /* state used throughout the entire move operation */
> +
> +    // pad the data partition number thusly
> +    NumberFormat partFormat = NumberFormat.getInstance();
> +    partFormat.setMinimumIntegerDigits(PARTITION_DIGITS);
> +    partFormat.setGroupingUsed(false);
> +
> +    // where the data partitioning is currently at
> +    int dataPart = partitionStart;
> +
> +
> +    /* loop through all top-level files and copy matching ones */
> +
> +    for (FileStatus fileStatus : sourceFiles) {
> +      String        sourceFilename = fileStatus.getPath().getName();
> +      StringBuilder destFilename   = new StringBuilder();
> +
> +      if (fileStatus.isDir()) {    // move all subdirectories
> +        // pass target dir as initial dest to prevent nesting inside preexisting dir
> +        if (fs.rename(fileStatus.getPath(), targetDir)) {    
> +          LOG.debug("Directory: " + sourceFilename + " renamed to: " + sourceFilename);
> +        } else {
> +          int dirNumber = 0;
> +          Path destPath;
> +          do {
> +            // clear the builder in case this isn't the first iteration
> +            destFilename.setLength(0);
> +
> +            // name-nnnnn?
> +            destFilename
> +              .append(sourceFilename)
> +              .append("-")
> +              .append(partFormat.format(dirNumber++));
> +
> +            destPath = new Path(targetDir, destFilename.toString());
> +            if (fs.exists(destPath))
> +              continue;
> +
> +            /*
> +             * There's still a race condition right here if an
> +             * identically-named directory is created concurrently.
> +             * It can be avoided by creating a parent dir for all
> +             * migrated dirs, or by an intermediate rename.
> +             */
> +
> +          } while (!fs.rename(fileStatus.getPath(), destPath));
> +
> +          LOG.debug("Directory: " + sourceFilename + " renamed to: " + destPath.getName());
>          }
> -        LOG.debug("Directory: " + dirName + " renamed to: " + path.getName());
> -        fs.rename(fileStat.getPath(), path);
> +      } else if (DATA_PART_PATTERN.matcher(sourceFilename).matches()) {    // move only
matching top-level files
> +        do {
> +          // clear the builder in case this isn't the first iteration
> +          destFilename.setLength(0);
> +
> +          // name-nnnnn
> +          destFilename
> +            .append(getFilename(sourceFilename))
> +            .append(partFormat.format(dataPart++));
> +
> +          // .ext?
> +          String extension = getFileExtension(sourceFilename);
> +          if (extension != null)
> +            destFilename.append(getFileExtension(sourceFilename));
> +        } while (!fs.rename(fileStatus.getPath(), new Path(targetDir, destFilename.toString())));
> +
> +        LOG.debug("Filename: " + sourceFilename + " repartitioned to: " + destFilename.toString());
> +      } else {    // ignore everything else
> +        LOG.debug("Filename: " + sourceFilename + " ignored");
>        }
>      }
>    }
> diff -ru unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java
devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java
> --- unpack/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java
2012-01-26 13:42:29.000000000 -0500
> +++ devel/sqoop-1.3.0-cdh3u3/src/java/com/cloudera/sqoop/util/DirectImportUtils.java
2013-06-13 11:31:22.475128082 -0400
> @@ -88,7 +88,7 @@
>  
>      // This Writer will be closed by the caller.
>      return new SplittableBufferedWriter(
> -        new SplittingOutputStream(conf, destDir, "data-",
> +        new SplittingOutputStream(conf, destDir, "part-m-",
>          options.getDirectSplitSize(), getCodec(conf, options)));
>    }
>  


Mime
View raw message