beam-commits mailing list archives

From lc...@apache.org
Subject [1/2] beam git commit: [BEAM-2005, BEAM-2030, BEAM-2031, BEAM-2032, BEAM-2033, BEAM-2070] Base implementation of HadoopFileSystem.
Date Tue, 02 May 2017 04:34:13 GMT
Repository: beam
Updated Branches:
  refs/heads/master 7fa00647d -> a3ba8e71b


[BEAM-2005, BEAM-2030, BEAM-2031, BEAM-2032, BEAM-2033, BEAM-2070] Base implementation of
HadoopFileSystem.

TODO:
* Add a multiplexing FileSystem that can route requests based upon the base URI when
configured for multiple file systems.
* Take another look at copy/rename to see if we can do better than moving all the bytes through
the local machine.

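As a rough, hypothetical usage sketch (not part of this commit): the new filesystem is wired
into a pipeline through HadoopFileSystemOptions, mirroring what HadoopFileSystemTest does in
the diff below. The cluster address, input path, and class name here are placeholders, and
HadoopFileSystemOptions is assumed to live in org.apache.beam.sdk.io.hdfs alongside the
registrar that reads it.

    import com.google.common.collect.ImmutableList;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.hdfs.HadoopFileSystemOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.hadoop.conf.Configuration;

    public class HadoopFileSystemUsageSketch {
      public static void main(String[] args) {
        // Hadoop configuration pointing at the target cluster (placeholder address).
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://namenode:8020");

        // Hand the configuration to Beam; HadoopFileSystemRegistrar constructs a
        // HadoopFileSystem from it when file systems are registered.
        HadoopFileSystemOptions options =
            PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
        options.setHdfsConfiguration(ImmutableList.of(conf));
        FileSystems.setDefaultConfigInWorkers(options);

        // hdfs:// paths read by the pipeline are now routed to the HadoopFileSystem.
        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.Read.from("hdfs://namenode:8020/path/to/input*"));
        p.run();
      }
    }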

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/23d16f7d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/23d16f7d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/23d16f7d

Branch: refs/heads/master
Commit: 23d16f7deb8130457d7fadab498cebc3c994ad76
Parents: 7fa0064
Author: Luke Cwik <lcwik@google.com>
Authored: Fri Apr 28 19:07:59 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Mon May 1 21:33:16 2017 -0700

----------------------------------------------------------------------
 sdks/java/io/hdfs/pom.xml                       |  44 ++++
 .../beam/sdk/io/hdfs/HadoopFileSystem.java      | 179 +++++++++++++-
 .../sdk/io/hdfs/HadoopFileSystemRegistrar.java  |  27 +-
 .../beam/sdk/io/hdfs/HadoopResourceId.java      |  46 +++-
 .../io/hdfs/HadoopFileSystemRegistrarTest.java  |  39 ++-
 .../beam/sdk/io/hdfs/HadoopFileSystemTest.java  | 244 +++++++++++++++++++
 6 files changed, 555 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/23d16f7d/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 46cf8cf..daa3b26 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -44,6 +44,37 @@
     </plugins>
   </build>
 
+  <properties>
+    <!--
+      This is the version of Hadoop used to compile the hadoop-common module.
+      This dependency is defined with a provided scope.
+      Users must supply their own Hadoop version at runtime.
+    -->
+    <hadoop.version>2.7.3</hadoop.version>
+  </properties>
+
+  <dependencyManagement>
+    <!--
+       We define dependencies here instead of sdks/java/io because
+       of a version mismatch between this Hadoop version and other
+       Hadoop versions declared in other io submodules.
+    -->
+    <dependencies>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-hdfs</artifactId>
+        <classifier>tests</classifier>
+        <version>${hadoop.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-minicluster</artifactId>
+        <version>${hadoop.version}</version>
+      </dependency>
+    </dependencies>
+  </dependencyManagement>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.beam</groupId>
@@ -131,6 +162,19 @@
 
     <!-- test dependencies -->
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-direct-java</artifactId>
       <scope>test</scope>

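The provided-scope note in the pom.xml above means a user's pipeline must bring its own Hadoop
jars at runtime; a minimal, hypothetical snippet for a user pom (artifact choice and version are
illustrative, not part of this commit) might look like:

    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
      <scope>runtime</scope>
    </dependency>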
http://git-wip-us.apache.org/repos/asf/beam/blob/23d16f7d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
index a8bdd44..154a818 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java
@@ -17,65 +17,224 @@
  */
 package org.apache.beam.sdk.io.hdfs;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
 import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Adapts {@link org.apache.hadoop.fs.FileSystem} connectors to be used as
  * Apache Beam {@link FileSystem FileSystems}.
+ *
+ * <p>The following HDFS FileSystem(s) are known to be unsupported:
+ * <ul>
+ *   <li>FTPFileSystem: Missing seek support within FTPInputStream</li>
+ * </ul>
+ *
+ * <p>This implementation assumes that the underlying Hadoop {@link FileSystem} is seek
+ * efficient when reading. The following {@link FSInputStream} implementations
+ * (as of Hadoop 2.7.1) provide seek implementations:
+ * <ul>
+ *   <li>HarFsInputStream</li>
+ *   <li>S3InputStream</li>
+ *   <li>DFSInputStream</li>
+ *   <li>SwiftNativeInputStream</li>
+ *   <li>NativeS3FsInputStream</li>
+ *   <li>LocalFSFileInputStream</li>
+ *   <li>NativeAzureFsInputStream</li>
+ *   <li>S3AInputStream</li>
+ * </ul>
  */
 class HadoopFileSystem extends FileSystem<HadoopResourceId> {
+  @VisibleForTesting
+  final org.apache.hadoop.fs.FileSystem fileSystem;
 
-  HadoopFileSystem() {}
+  HadoopFileSystem(Configuration configuration) throws IOException {
+    this.fileSystem = org.apache.hadoop.fs.FileSystem.newInstance(configuration);
+  }
 
   @Override
-  protected List<MatchResult> match(List<String> specs) throws IOException {
-    throw new UnsupportedOperationException();
+  protected List<MatchResult> match(List<String> specs) {
+    ImmutableList.Builder<MatchResult> resultsBuilder = ImmutableList.builder();
+    for (String spec : specs) {
+      try {
+        FileStatus[] fileStatuses = fileSystem.globStatus(new Path(spec));
+        List<Metadata> metadata = new ArrayList<>();
+        for (FileStatus fileStatus : fileStatuses) {
+          if (fileStatus.isFile()) {
+            metadata.add(Metadata.builder()
+                .setResourceId(new HadoopResourceId(fileStatus.getPath().toUri()))
+                .setIsReadSeekEfficient(true)
+                .setSizeBytes(fileStatus.getLen())
+                .build());
+          }
+        }
+        resultsBuilder.add(MatchResult.create(Status.OK, metadata));
+      } catch (IOException e) {
+        resultsBuilder.add(MatchResult.create(Status.ERROR, e));
+      }
+    }
+    return resultsBuilder.build();
   }
 
   @Override
   protected WritableByteChannel create(HadoopResourceId resourceId, CreateOptions createOptions)
       throws IOException {
-    throw new UnsupportedOperationException();
+    return Channels.newChannel(fileSystem.create(resourceId.toPath()));
   }
 
   @Override
   protected ReadableByteChannel open(HadoopResourceId resourceId) throws IOException {
-    throw new UnsupportedOperationException();
+    FileStatus fileStatus = fileSystem.getFileStatus(resourceId.toPath());
+    return new HadoopSeekableByteChannel(fileStatus, fileSystem.open(resourceId.toPath()));
   }
 
   @Override
   protected void copy(
       List<HadoopResourceId> srcResourceIds,
       List<HadoopResourceId> destResourceIds) throws IOException {
-    throw new UnsupportedOperationException();
+    for (int i = 0; i < srcResourceIds.size(); ++i) {
+      // Unfortunately HDFS FileSystems don't support a native copy operation, so we are forced
+      // to use the inefficient implementation found in FileUtil, which copies all the bytes
+      // through the local machine.
+      //
+      // HDFS FileSystem does define a concat method, but we could only find DFSFileSystem
+      // implementing it, and it implements concat by deleting the srcs afterwards, which is not
+      // what we want. Also, all the other FileSystem implementations we saw threw
+      // UnsupportedOperationException within concat.
+      FileUtil.copy(
+          fileSystem, srcResourceIds.get(i).toPath(),
+          fileSystem, destResourceIds.get(i).toPath(),
+          false,
+          true,
+          fileSystem.getConf());
+    }
   }
 
   @Override
   protected void rename(
       List<HadoopResourceId> srcResourceIds,
       List<HadoopResourceId> destResourceIds) throws IOException {
-    throw new UnsupportedOperationException();
+    for (int i = 0; i < srcResourceIds.size(); ++i) {
+      fileSystem.rename(
+          srcResourceIds.get(i).toPath(),
+          destResourceIds.get(i).toPath());
+    }
   }
 
   @Override
   protected void delete(Collection<HadoopResourceId> resourceIds) throws IOException {
-    throw new UnsupportedOperationException();
+    for (HadoopResourceId resourceId : resourceIds) {
+      fileSystem.delete(resourceId.toPath(), false);
+    }
   }
 
   @Override
   protected HadoopResourceId matchNewResource(String singleResourceSpec, boolean isDirectory) {
-    throw new UnsupportedOperationException();
+    try {
+      if (singleResourceSpec.endsWith("/") && !isDirectory) {
+        throw new IllegalArgumentException(String.format(
+            "Expected file path but received directory path %s", singleResourceSpec));
+      }
+      return !singleResourceSpec.endsWith("/") && isDirectory
+          ? new HadoopResourceId(new URI(singleResourceSpec + "/"))
+          : new HadoopResourceId(new URI(singleResourceSpec));
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(
+          String.format("Invalid spec %s directory %s", singleResourceSpec, isDirectory),
+          e);
+    }
   }
 
   @Override
   protected String getScheme() {
-    return "hdfs";
+    return fileSystem.getScheme();
+  }
+
+  /** An adapter around {@link FSDataInputStream} that implements {@link SeekableByteChannel}. */
+  private static class HadoopSeekableByteChannel implements SeekableByteChannel {
+    private final FileStatus fileStatus;
+    private final FSDataInputStream inputStream;
+    private boolean closed;
+
+    private HadoopSeekableByteChannel(FileStatus fileStatus, FSDataInputStream inputStream) {
+      this.fileStatus = fileStatus;
+      this.inputStream = inputStream;
+      this.closed = false;
+    }
+
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+      if (closed) {
+        throw new IOException("Channel is closed");
+      }
+      return inputStream.read(dst);
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long position() throws IOException {
+      if (closed) {
+        throw new IOException("Channel is closed");
+      }
+      return inputStream.getPos();
+    }
+
+    @Override
+    public SeekableByteChannel position(long newPosition) throws IOException {
+      if (closed) {
+        throw new IOException("Channel is closed");
+      }
+      inputStream.seek(newPosition);
+      return this;
+    }
+
+    @Override
+    public long size() throws IOException {
+      if (closed) {
+        throw new IOException("Channel is closed");
+      }
+      return fileStatus.getLen();
+    }
+
+    @Override
+    public SeekableByteChannel truncate(long size) throws IOException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isOpen() {
+      return !closed;
+    }
+
+    @Override
+    public void close() throws IOException {
+      closed = true;
+      inputStream.close();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/23d16f7d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
index cc22f4f..9159df3 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrar.java
@@ -17,12 +17,18 @@
  */
 package org.apache.beam.sdk.io.hdfs;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.auto.service.AutoService;
 import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import javax.annotation.Nonnull;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.FileSystemRegistrar;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.hadoop.conf.Configuration;
 
 /**
  * {@link AutoService} registrar for the {@link HadoopFileSystem}.
@@ -32,6 +38,25 @@ public class HadoopFileSystemRegistrar implements FileSystemRegistrar {
 
   @Override
   public Iterable<FileSystem> fromOptions(@Nonnull PipelineOptions options) {
-    return ImmutableList.<FileSystem>of(new HadoopFileSystem());
+    List<Configuration> configurations =
+        options.as(HadoopFileSystemOptions.class).getHdfsConfiguration();
+    if (configurations == null) {
+      configurations = Collections.emptyList();
+    }
+    checkArgument(configurations.size() <= 1,
+        String.format(
+            "The %s currently only supports at most a single Hadoop configuration.",
+            HadoopFileSystemRegistrar.class.getSimpleName()));
+
+    ImmutableList.Builder<FileSystem> builder = ImmutableList.builder();
+    for (Configuration configuration : configurations) {
+      try {
+        builder.add(new HadoopFileSystem(configuration));
+      } catch (IOException e) {
+        throw new IllegalArgumentException(String.format(
+            "Failed to construct Hadoop filesystem with configuration %s", configuration), e);
+      }
+    }
+    return builder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/23d16f7d/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
index 8e0b58c..e570864 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopResourceId.java
@@ -17,35 +17,65 @@
  */
 package org.apache.beam.sdk.io.hdfs;
 
+import java.net.URI;
+import java.util.Objects;
 import org.apache.beam.sdk.io.fs.ResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.hadoop.fs.Path;
 
 /**
  * {@link ResourceId} implementation for the {@link HadoopFileSystem}.
  */
-public class HadoopResourceId implements ResourceId {
+class HadoopResourceId implements ResourceId {
+  private final URI uri;
+
+  HadoopResourceId(URI uri) {
+    this.uri = uri;
+  }
 
   @Override
   public ResourceId resolve(String other, ResolveOptions resolveOptions) {
-    throw new UnsupportedOperationException();
+    return new HadoopResourceId(uri.resolve(other));
   }
 
   @Override
   public ResourceId getCurrentDirectory() {
-    throw new UnsupportedOperationException();
+    return new HadoopResourceId(uri.getPath().endsWith("/") ? uri : uri.resolve("."));
+  }
+
+  public boolean isDirectory() {
+    return uri.getPath().endsWith("/");
+  }
+
+  @Override
+  public String getFilename() {
+    return new Path(uri).getName();
   }
 
   @Override
   public String getScheme() {
-    throw new UnsupportedOperationException();
+    return uri.getScheme();
   }
 
   @Override
-  public String getFilename() {
-    throw new UnsupportedOperationException();
+  public String toString() {
+    return uri.toString();
   }
 
-  public boolean isDirectory() {
-    throw new UnsupportedOperationException();
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof HadoopResourceId)) {
+      return false;
+    }
+    return Objects.equals(uri, ((HadoopResourceId) obj).uri);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(uri);
+  }
+
+  Path toPath() {
+    return new Path(uri);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/23d16f7d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
index c332af5..96f7102 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemRegistrarTest.java
@@ -17,17 +17,24 @@
  */
 package org.apache.beam.sdk.io.hdfs;
 
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import java.net.URI;
 import java.util.ServiceLoader;
 import org.apache.beam.sdk.io.FileSystem;
 import org.apache.beam.sdk.io.FileSystemRegistrar;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -37,13 +44,35 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class HadoopFileSystemRegistrarTest {
 
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  private Configuration configuration;
+  private MiniDFSCluster hdfsCluster;
+  private URI hdfsClusterBaseUri;
+
+  @Before
+  public void setUp() throws Exception {
+    configuration = new Configuration();
+    configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
+    hdfsCluster = builder.build();
+    hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    hdfsCluster.shutdown();
+  }
+
   @Test
   public void testServiceLoader() {
+    HadoopFileSystemOptions options = PipelineOptionsFactory.as(HadoopFileSystemOptions.class);
+    options.setHdfsConfiguration(ImmutableList.of(configuration));
     for (FileSystemRegistrar registrar
         : Lists.newArrayList(ServiceLoader.load(FileSystemRegistrar.class).iterator())) {
       if (registrar instanceof HadoopFileSystemRegistrar) {
-        Iterable<FileSystem> fileSystems = registrar.fromOptions(PipelineOptionsFactory.create());
-        assertThat(fileSystems, contains(instanceOf(HadoopFileSystem.class)));
+        Iterable<FileSystem> fileSystems = registrar.fromOptions(options);
+        assertEquals(hdfsClusterBaseUri.getScheme(),
+            ((HadoopFileSystem) Iterables.getOnlyElement(fileSystems)).getScheme());
         return;
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/23d16f7d/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
new file mode 100644
index 0000000..6cb5326
--- /dev/null
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystemTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.hdfs;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.io.ByteStreams;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.List;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.MimeTypes;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link HadoopFileSystem}.
+ */
+@RunWith(JUnit4.class)
+public class HadoopFileSystemTest {
+
+  @Rule public TestPipeline p = TestPipeline.create();
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private Configuration configuration;
+  private MiniDFSCluster hdfsCluster;
+  private URI hdfsClusterBaseUri;
+  private HadoopFileSystem fileSystem;
+
+  @Before
+  public void setUp() throws Exception {
+    configuration = new Configuration();
+    configuration.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, tmpFolder.getRoot().getAbsolutePath());
+    MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(configuration);
+    hdfsCluster = builder.build();
+    hdfsClusterBaseUri = new URI(configuration.get("fs.defaultFS") + "/");
+    fileSystem = new HadoopFileSystem(configuration);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    hdfsCluster.shutdown();
+  }
+
+  @Test
+  public void testCreateAndReadFile() throws Exception {
+    create("testFile", "testData".getBytes());
+    assertArrayEquals("testData".getBytes(), read("testFile"));
+  }
+
+  @Test
+  public void testCopy() throws Exception {
+    create("testFileA", "testDataA".getBytes());
+    create("testFileB", "testDataB".getBytes());
+    fileSystem.copy(
+        ImmutableList.of(
+            testPath("testFileA"),
+            testPath("testFileB")),
+        ImmutableList.of(
+            testPath("copyTestFileA"),
+            testPath("copyTestFileB")));
+    assertArrayEquals("testDataA".getBytes(), read("testFileA"));
+    assertArrayEquals("testDataB".getBytes(), read("testFileB"));
+    assertArrayEquals("testDataA".getBytes(), read("copyTestFileA"));
+    assertArrayEquals("testDataB".getBytes(), read("copyTestFileB"));
+  }
+
+  @Test
+  public void testDelete() throws Exception {
+    create("testFileA", "testDataA".getBytes());
+    create("testFileB", "testDataB".getBytes());
+    create("testFileC", "testDataC".getBytes());
+
+    // ensure files exist
+    assertArrayEquals("testDataA".getBytes(), read("testFileA"));
+    assertArrayEquals("testDataB".getBytes(), read("testFileB"));
+    assertArrayEquals("testDataC".getBytes(), read("testFileC"));
+
+    fileSystem.delete(ImmutableList.of(
+        testPath("testFileA"),
+        testPath("testFileC")));
+
+    List<MatchResult> results =
+        fileSystem.match(ImmutableList.of(testPath("testFile*").toString()));
+    assertThat(results, contains(MatchResult.create(Status.OK, ImmutableList.of(
+        Metadata.builder()
+            .setResourceId(testPath("testFileB"))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes("testDataB".getBytes().length)
+            .build()))));
+  }
+
+  @Test
+  public void testMatch() throws Exception {
+    create("testFileAA", "testDataAA".getBytes());
+    create("testFileA", "testDataA".getBytes());
+    create("testFileB", "testDataB".getBytes());
+
+    // ensure files exist
+    assertArrayEquals("testDataAA".getBytes(), read("testFileAA"));
+    assertArrayEquals("testDataA".getBytes(), read("testFileA"));
+    assertArrayEquals("testDataB".getBytes(), read("testFileB"));
+
+    List<MatchResult> results =
+        fileSystem.match(ImmutableList.of(testPath("testFileA*").toString()));
+    assertEquals(Status.OK, Iterables.getOnlyElement(results).status());
+    assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder(
+        Metadata.builder()
+            .setResourceId(testPath("testFileAA"))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes("testDataAA".getBytes().length)
+            .build(),
+        Metadata.builder()
+            .setResourceId(testPath("testFileA"))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes("testDataA".getBytes().length)
+            .build()));
+  }
+
+  @Test
+  public void testRename() throws Exception {
+    create("testFileA", "testDataA".getBytes());
+    create("testFileB", "testDataB".getBytes());
+
+    // ensure files exist
+    assertArrayEquals("testDataA".getBytes(), read("testFileA"));
+    assertArrayEquals("testDataB".getBytes(), read("testFileB"));
+
+    fileSystem.rename(
+        ImmutableList.of(
+            testPath("testFileA"), testPath("testFileB")),
+        ImmutableList.of(
+            testPath("renameFileA"), testPath("renameFileB")));
+
+    List<MatchResult> results =
+        fileSystem.match(ImmutableList.of(testPath("*").toString()));
+    assertEquals(Status.OK, Iterables.getOnlyElement(results).status());
+    assertThat(Iterables.getOnlyElement(results).metadata(), containsInAnyOrder(
+        Metadata.builder()
+            .setResourceId(testPath("renameFileA"))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes("testDataA".getBytes().length)
+            .build(),
+        Metadata.builder()
+            .setResourceId(testPath("renameFileB"))
+            .setIsReadSeekEfficient(true)
+            .setSizeBytes("testDataB".getBytes().length)
+            .build()));
+
+    // ensure files exist
+    assertArrayEquals("testDataA".getBytes(), read("renameFileA"));
+    assertArrayEquals("testDataB".getBytes(), read("renameFileB"));
+  }
+
+  @Test
+  public void testMatchNewResource() throws Exception {
+    // match file spec
+    assertEquals(testPath("file"),
+        fileSystem.matchNewResource(testPath("file").toString(), false));
+    // match dir spec missing '/'
+    assertEquals(testPath("dir/"),
+        fileSystem.matchNewResource(testPath("dir").toString(), true));
+    // match dir spec with '/'
+    assertEquals(testPath("dir/"),
+        fileSystem.matchNewResource(testPath("dir/").toString(), true));
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Expected file path but received directory path");
+    fileSystem.matchNewResource(testPath("dir/").toString(), false);
+  }
+
+  @Test
+  public void testReadPipeline() throws Exception {
+    create("testFileA", "testDataA".getBytes());
+    create("testFileB", "testDataB".getBytes());
+    create("testFileC", "testDataC".getBytes());
+
+    HadoopFileSystemOptions options = TestPipeline.testingPipelineOptions()
+        .as(HadoopFileSystemOptions.class);
+    options.setHdfsConfiguration(ImmutableList.of(fileSystem.fileSystem.getConf()));
+    FileSystems.setDefaultConfigInWorkers(options);
+    PCollection<String> pc = p.apply(TextIO.Read.from(testPath("testFile*").toString()));
+    PAssert.that(pc).containsInAnyOrder("testDataA", "testDataB", "testDataC");
+    p.run();
+  }
+
+  private void create(String relativePath, byte[] contents) throws Exception {
+    try (WritableByteChannel channel = fileSystem.create(
+        testPath(relativePath),
+        StandardCreateOptions.builder().setMimeType(MimeTypes.BINARY).build())) {
+      channel.write(ByteBuffer.wrap(contents));
+    }
+  }
+
+  private byte[] read(String relativePath) throws Exception {
+    try (ReadableByteChannel channel = fileSystem.open(testPath(relativePath))) {
+      return ByteStreams.toByteArray(Channels.newInputStream(channel));
+    }
+  }
+
+  private HadoopResourceId testPath(String relativePath) {
+    return new HadoopResourceId(hdfsClusterBaseUri.resolve(relativePath));
+  }
+}

