Repository: incubator-beam
Updated Branches:
refs/heads/BEAM-357_windows-build-fails [created] 460d21cb7
fixing build on windows
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/41883300
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/41883300
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/41883300
Branch: refs/heads/BEAM-357_windows-build-fails
Commit: 418833001fe6dd581f42f7fcc3c35ef36f292007
Parents: 0e4d0a9
Author: Romain manni-Bucau <rmannibucau@gmail.com>
Authored: Sun Jun 19 21:19:57 2016 +0200
Committer: Romain manni-Bucau <rmannibucau@gmail.com>
Committed: Sun Jun 19 21:19:57 2016 +0200
----------------------------------------------------------------------
.../beam/runners/flink/WriteSinkITCase.java | 13 +
.../beam/runners/spark/SimpleWordCountTest.java | 8 +
.../beam/runners/spark/io/AvroPipelineTest.java | 7 +
.../beam/runners/spark/io/NumShardsTest.java | 7 +
.../io/hadoop/HadoopFileFormatPipelineTest.java | 7 +
.../translation/TransformTranslatorTest.java | 7 +
.../src/main/resources/beam/checkstyle.xml | 4 +-
.../org/apache/beam/sdk/io/FileBasedSink.java | 7 +-
.../beam/sdk/testing/HadoopWorkarounds.java | 129 +++++++++
sdks/java/io/hdfs/pom.xml | 9 +
.../beam/sdk/io/hdfs/HDFSFileSourceTest.java | 264 ++++++++++---------
sdks/java/maven-archetypes/starter/pom.xml | 3 +
12 files changed, 334 insertions(+), 131 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 36d3aef..1a56350 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.test.util.JavaProgramTestBase;
import java.io.File;
+import java.io.IOException;
import java.io.PrintWriter;
import java.net.URI;
@@ -75,6 +76,18 @@ public class WriteSinkITCase extends JavaProgramTestBase {
p.run();
}
+
+ @Override
+ public void stopCluster() throws Exception {
+ try {
+ super.stopCluster();
+ } catch (final IOException ioe) {
+ if (ioe.getMessage() == null || !ioe.getMessage().startsWith("Unable to delete file")) {
+ throw ioe;
+ }
+ // otherwise this is fine for the test itself; the OS is just interfering with the cleanup phase
+ }
+ }
+
/**
* Simple custom sink which writes to a file.
*/
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index 2b4464d..4980995 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
@@ -40,11 +41,13 @@ import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import java.io.File;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
@@ -61,6 +64,11 @@ public class SimpleWordCountTest {
private static final Set<String> EXPECTED_COUNT_SET =
ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
+ @BeforeClass
+ public static void initWin() throws IOException {
+ HadoopWorkarounds.winTests();
+ }
+
@Test
public void testInMem() throws Exception {
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index f358878..f6d0d55 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -25,6 +25,7 @@ import org.apache.beam.runners.spark.SparkPipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
import org.apache.beam.sdk.values.PCollection;
import com.google.common.collect.Lists;
@@ -38,6 +39,7 @@ import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -58,6 +60,11 @@ public class AvroPipelineTest {
@Rule
public final TemporaryFolder tmpDir = new TemporaryFolder();
+ @BeforeClass
+ public static void initWin() throws IOException {
+ HadoopWorkarounds.winTests();
+ }
+
@Before
public void setUp() throws IOException {
inputFile = tmpDir.newFile("test.avro");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 23d4592..8a864c4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -29,6 +29,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
@@ -38,6 +39,7 @@ import com.google.common.collect.Sets;
import com.google.common.io.Files;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -64,6 +66,11 @@ public class NumShardsTest {
@Rule
public final TemporaryFolder tmpDir = new TemporaryFolder();
+ @BeforeClass
+ public static void initWin() throws IOException {
+ HadoopWorkarounds.winTests();
+ }
+
@Before
public void setUp() throws IOException {
outputDir = tmpDir.newFolder("out");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index eaa508c..767682e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.runners.spark.coders.WritableCoder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -58,6 +60,11 @@ public class HadoopFileFormatPipelineTest {
@Rule
public final TemporaryFolder tmpDir = new TemporaryFolder();
+ @BeforeClass
+ public static void initWin() throws IOException {
+ HadoopWorkarounds.winTests();
+ }
+
@Before
public void setUp() throws IOException {
inputFile = tmpDir.newFile("test.seq");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index b593316..fec0dc9 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -28,10 +28,12 @@ import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
import org.apache.beam.sdk.values.PCollection;
import com.google.common.base.Charsets;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -51,6 +53,11 @@ import java.util.List;
public class TransformTranslatorTest {
@Rule public TemporaryFolder tmp = new TemporaryFolder();
+ @BeforeClass
+ public static void initWin() throws IOException {
+ HadoopWorkarounds.winTests();
+ }
+
/**
* Builds a simple pipeline with TextIO.Read and TextIO.Write, runs the pipeline
* in DirectRunner and on SparkPipelineRunner, with the mapped dataflow-to-spark
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
----------------------------------------------------------------------
diff --git a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
index 311f599..457675a 100644
--- a/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
+++ b/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml
@@ -29,7 +29,9 @@ page at http://checkstyle.sourceforge.net/config.html -->
<!-- Checks that there are no tab characters in the file. -->
</module>
- <module name="NewlineAtEndOfFile"/>
+ <module name="NewlineAtEndOfFile">
+ <property name="lineSeparator" value="lf" />
+ </module>
<module name="RegexpSingleline">
<!-- Checks that TODOs don't have stuff in parenthesis, e.g., username. -->
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 521f54b..045d6ad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -35,6 +35,7 @@ import com.google.common.collect.Ordering;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.WritableByteChannel;
@@ -645,7 +646,11 @@ public abstract class FileBasedSink<T> extends Sink<T> {
private void copyOne(String source, String destination) throws IOException {
try {
// Copy the source file, replacing the existing destination.
- Files.copy(Paths.get(source), Paths.get(destination), StandardCopyOption.REPLACE_EXISTING);
+ // Paths.get(x) does not work on Windows because of the ":" after the drive letter
+ Files.copy(
+ new File(source).toPath(),
+ new File(destination).toPath(),
+ StandardCopyOption.REPLACE_EXISTING);
} catch (NoSuchFileException e) {
LOG.debug("{} does not exist.", source);
// Suppress exception if file does not exist.
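
For context, the hunk above swaps Paths.get(String) for a conversion through java.io.File. A minimal standalone sketch of the resulting copy logic (the class name CopyOneSketch is only illustrative, not part of the commit):

  import java.io.File;
  import java.io.IOException;
  import java.nio.file.Files;
  import java.nio.file.NoSuchFileException;
  import java.nio.file.StandardCopyOption;

  class CopyOneSketch {
    // Build the Paths via java.io.File rather than Paths.get(String), which the
    // commit reports as failing on Windows because of the drive-letter colon.
    static void copyOne(String source, String destination) throws IOException {
      try {
        Files.copy(
            new File(source).toPath(),
            new File(destination).toPath(),
            StandardCopyOption.REPLACE_EXISTING);
      } catch (NoSuchFileException e) {
        // a missing source file is ignored, mirroring FileBasedSink#copyOne
      }
    }
  }
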
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
new file mode 100644
index 0000000..ee2e135
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/HadoopWorkarounds.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.commons.compress.utils.IOUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+
+/**
+ * A simple class ensuring winutils.exe can be found by the JVM.
+ */
+public class HadoopWorkarounds {
+ /**
+ * In practice this method only needs to be called once per JVM
+ * since Hadoop stores the value in static variables.
+ *
+ * Note: ensure this is invoked before Hadoop reads the property
+ * and that the folder survives the tests
+ * (avoid temporary folders since tests can share them).
+ *
+ * @param hadoopHome where to fake the Hadoop home.
+ */
+ public static void win(final File hadoopHome) {
+ // if (Shell.osType != Shell.OSType.OS_TYPE_WIN) { // not used, to avoid loading the Shell class yet
+ if (!System.getProperty("os.name", "").startsWith("Windows")
+ || System.getProperty("hadoop.home.dir") != null) {
+ return;
+ }
+
+ // hadoop doesn't ship winutils.exe: https://issues.apache.org/jira/browse/HADOOP-10051
+ // so use this GitHub repo temporarily; later just use the main tar.gz
+ /*
+ String hadoopVersion = VersionInfo.getVersion();
+ final URL url = new URL("https://archive.apache.org/dist/hadoop/common/
+ hadoop-" + hadoopVersion + "/hadoop-" + hadoopVersion + ".tar.gz");
+ final File hadoopTar = tmpFolder.newFile();
+ try (final InputStream is = new GZIPInputStream(url.openStream());
+ final OutputStream os = new FileOutputStream(hadoopTar)) {
+ System.out.println("Downloading Hadoop in " + hadoopTar + ", " +
+ "this can take a while, if you have it locally " +
+ "maybe set \"hadoop.home.dir\" system property");
+ IOUtils.copyLarge(is, os, new byte[1024 * 1024]);
+ }
+
+ final File hadoopHome = tmpFolder.newFolder();
+ try (final ArchiveInputStream stream = new TarArchiveInputStream(
+ new FileInputStream(hadoopTar))) {
+ ArchiveEntry entry;
+ while ((entry = stream.getNextEntry()) != null) {
+ if (entry.isDirectory()) {
+ FileUtils.forceMkdir(new File(hadoopHome, entry.getName()));
+ continue;
+ }
+ final File out = new File(hadoopHome, entry.getName());
+ FileUtils.forceMkdir(out.getParentFile());
+ try (final OutputStream os = new FileOutputStream(out)) {
+ IOUtils.copy(stream, os);
+ }
+ }
+ }
+
+ final String hadoopRoot = "hadoop-" + hadoopVersion;
+ final File[] files = hadoopHome.listFiles(new FileFilter() {
+ @Override
+ public boolean accept(final File pathname) {
+ return pathname.isDirectory() && pathname.getName().equals(hadoopRoot);
+ }
+ });
+ if (files == null || files.length != 1) {
+ throw new IllegalStateException("Didn't find hadoop in " + hadoopHome);
+ }
+ System.setProperty("hadoop.home.dir", files[0].getAbsolutePath());
+ */
+
+ System.out.println("You are on windows (sorry) and you don't set "
+ + "-Dhadoop.home.dir so we'll download winutils.exe");
+
+ new File(hadoopHome, "bin").mkdirs();
+ final URL url;
+ try {
+ url = new URL("https://github.com/steveloughran/winutils/"
+ + "raw/master/hadoop-2.7.1/bin/winutils.exe");
+ } catch (final MalformedURLException e) { // unlikely
+ throw new IllegalArgumentException(e);
+ }
+ try {
+ try (final InputStream is = url.openStream();
+ final OutputStream os = new FileOutputStream(
+ new File(hadoopHome, "bin/winutils.exe"))) {
+ try {
+ IOUtils.copy(is, os, 1024 * 1024);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ } catch (final IOException e) {
+ throw new IllegalStateException(e);
+ }
+ System.setProperty("hadoop.home.dir", hadoopHome.getAbsolutePath());
+ }
+
+ /**
+ * Just a convenient win(File) invocation for tests.
+ */
+ public static void winTests() {
+ win(new File("target/hadoop-win"));
+ }
+}
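
For reference, the @BeforeClass hooks added to the runner tests above all follow the same pattern. A minimal sketch of a test class wiring in the workaround (the test class and test method names are hypothetical):

  import org.apache.beam.sdk.testing.HadoopWorkarounds;
  import org.junit.BeforeClass;
  import org.junit.Test;

  public class SomeHadoopBackedTest {
    @BeforeClass
    public static void initWin() {
      // no-op on non-Windows hosts or when -Dhadoop.home.dir is already set;
      // otherwise downloads winutils.exe into target/hadoop-win and sets hadoop.home.dir
      HadoopWorkarounds.winTests();
    }

    @Test
    public void readsThroughHadoopApis() {
      // test body exercising Hadoop file system APIs goes here
    }
  }
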
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/io/hdfs/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/pom.xml b/sdks/java/io/hdfs/pom.xml
index 9c30792..f8e3c14 100644
--- a/sdks/java/io/hdfs/pom.xml
+++ b/sdks/java/io/hdfs/pom.xml
@@ -83,5 +83,14 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+
+ <!-- see HDFSFileSourceTest commented block
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>1.9</version>
+ <scope>test</scope>
+ </dependency>
+ -->
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
index 67df7bc..2ce1af7 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSourceTest.java
@@ -28,9 +28,9 @@ import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.HadoopWorkarounds;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.values.KV;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
@@ -38,6 +38,7 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -53,138 +54,143 @@ import java.util.Random;
*/
public class HDFSFileSourceTest {
- Random random = new Random(0L);
-
- @Rule
- public TemporaryFolder tmpFolder = new TemporaryFolder();
-
- @Test
- public void testFullyReadSingleFile() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
- File file = createFileWithData("tmp.seq", expectedResults);
-
- HDFSFileSource<IntWritable, Text> source =
- HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
- IntWritable.class, Text.class);
-
- assertEquals(file.length(), source.getEstimatedSizeBytes(null));
-
- assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
- }
-
- @Test
- public void testFullyReadFilePattern() throws IOException {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
- File file1 = createFileWithData("file1", data1);
-
- List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
- createFileWithData("file2", data2);
-
- List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
- createFileWithData("file3", data3);
-
- List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
- createFileWithData("otherfile", data4);
-
- HDFSFileSource<IntWritable, Text> source =
- HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
- SequenceFileInputFormat.class, IntWritable.class, Text.class);
- List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
- expectedResults.addAll(data1);
- expectedResults.addAll(data2);
- expectedResults.addAll(data3);
- assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
- }
-
- @Test
- public void testCloseUnstartedFilePatternReader() throws IOException {
- PipelineOptions options = PipelineOptionsFactory.create();
- List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
- File file1 = createFileWithData("file1", data1);
-
- List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
- createFileWithData("file2", data2);
-
- List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
- createFileWithData("file3", data3);
-
- List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
- createFileWithData("otherfile", data4);
-
- HDFSFileSource<IntWritable, Text> source =
- HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
- SequenceFileInputFormat.class, IntWritable.class, Text.class);
- Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
- // Closing an unstarted FilePatternReader should not throw an exception.
- try {
- reader.close();
- } catch (Exception e) {
- fail("Closing an unstarted FilePatternReader should not throw an exception");
+ Random random = new Random(0L);
+
+ @Rule
+ public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setUpOnWinWithMissingHadoopHome() throws IOException {
+ HadoopWorkarounds.winTests();
+ }
+
+ @Test
+ public void testFullyReadSingleFile() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10, 0);
+ File file = createFileWithData("tmp.seq", expectedResults);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+ IntWritable.class, Text.class);
+
+ assertEquals(file.length(), source.getEstimatedSizeBytes(null));
+
+ assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
}
- }
-
- @Test
- public void testSplits() throws Exception {
- PipelineOptions options = PipelineOptionsFactory.create();
-
- List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
- File file = createFileWithData("tmp.avro", expectedResults);
-
- HDFSFileSource<IntWritable, Text> source =
- HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
- IntWritable.class, Text.class);
-
- // Assert that the source produces the expected records
- assertEquals(expectedResults, readFromSource(source, options));
-
- // Split with a small bundle size (has to be at least size of sync interval)
- List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
- .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
- assertTrue(splits.size() > 2);
- SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
- int nonEmptySplits = 0;
- for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
- if (readFromSource(subSource, options).size() > 0) {
- nonEmptySplits += 1;
- }
+
+ @Test
+ public void testFullyReadFilePattern() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+ File file1 = createFileWithData("file1", data1);
+
+ List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+ createFileWithData("file2", data2);
+
+ List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+ createFileWithData("file3", data3);
+
+ List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+ createFileWithData("otherfile", data4);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+ SequenceFileInputFormat.class, IntWritable.class, Text.class);
+ List<KV<IntWritable, Text>> expectedResults = new ArrayList<>();
+ expectedResults.addAll(data1);
+ expectedResults.addAll(data2);
+ expectedResults.addAll(data3);
+ assertThat(expectedResults, containsInAnyOrder(readFromSource(source, options).toArray()));
+ }
+
+ @Test
+ public void testCloseUnstartedFilePatternReader() throws IOException {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ List<KV<IntWritable, Text>> data1 = createRandomRecords(3, 10, 0);
+ File file1 = createFileWithData("file1", data1);
+
+ List<KV<IntWritable, Text>> data2 = createRandomRecords(3, 10, 10);
+ createFileWithData("file2", data2);
+
+ List<KV<IntWritable, Text>> data3 = createRandomRecords(3, 10, 20);
+ createFileWithData("file3", data3);
+
+ List<KV<IntWritable, Text>> data4 = createRandomRecords(3, 10, 30);
+ createFileWithData("otherfile", data4);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(new File(file1.getParent(), "file*").toString(),
+ SequenceFileInputFormat.class, IntWritable.class, Text.class);
+ Source.Reader<KV<IntWritable, Text>> reader = source.createReader(options);
+ // Closing an unstarted FilePatternReader should not throw an exception.
+ try {
+ reader.close();
+ } catch (Exception e) {
+ fail("Closing an unstarted FilePatternReader should not throw an exception");
+ }
+ }
+
+ @Test
+ public void testSplits() throws Exception {
+ PipelineOptions options = PipelineOptionsFactory.create();
+
+ List<KV<IntWritable, Text>> expectedResults = createRandomRecords(3, 10000, 0);
+ File file = createFileWithData("tmp.avro", expectedResults);
+
+ HDFSFileSource<IntWritable, Text> source =
+ HDFSFileSource.from(file.toString(), SequenceFileInputFormat.class,
+ IntWritable.class, Text.class);
+
+ // Assert that the source produces the expected records
+ assertEquals(expectedResults, readFromSource(source, options));
+
+ // Split with a small bundle size (has to be at least size of sync interval)
+ List<? extends BoundedSource<KV<IntWritable, Text>>> splits = source
+ .splitIntoBundles(SequenceFile.SYNC_INTERVAL, options);
+ assertTrue(splits.size() > 2);
+ SourceTestUtils.assertSourcesEqualReferenceSource(source, splits, options);
+ int nonEmptySplits = 0;
+ for (BoundedSource<KV<IntWritable, Text>> subSource : splits) {
+ if (readFromSource(subSource, options).size() > 0) {
+ nonEmptySplits += 1;
+ }
+ }
+ assertTrue(nonEmptySplits > 2);
}
- assertTrue(nonEmptySplits > 2);
- }
-
- private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
- throws IOException {
- File tmpFile = tmpFolder.newFile(filename);
- try (Writer writer = SequenceFile.createWriter(new Configuration(),
- Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
- Writer.file(new Path(tmpFile.toURI())))) {
-
- for (KV<IntWritable, Text> record : records) {
- writer.append(record.getKey(), record.getValue());
- }
+
+ private File createFileWithData(String filename, List<KV<IntWritable, Text>> records)
+ throws IOException {
+ File tmpFile = tmpFolder.newFile(filename);
+ try (Writer writer = SequenceFile.createWriter(new Configuration(),
+ Writer.keyClass(IntWritable.class), Writer.valueClass(Text.class),
+ Writer.file(new Path(tmpFile.toURI())))) {
+
+ for (KV<IntWritable, Text> record : records) {
+ writer.append(record.getKey(), record.getValue());
+ }
+ }
+ return tmpFile;
}
- return tmpFile;
- }
-
- private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
- int numItems, int offset) {
- List<KV<IntWritable, Text>> records = new ArrayList<>();
- for (int i = 0; i < numItems; i++) {
- IntWritable key = new IntWritable(i + offset);
- Text value = new Text(createRandomString(dataItemLength));
- records.add(KV.of(key, value));
+
+ private List<KV<IntWritable, Text>> createRandomRecords(int dataItemLength,
+ int numItems, int offset) {
+ List<KV<IntWritable, Text>> records = new ArrayList<>();
+ for (int i = 0; i < numItems; i++) {
+ IntWritable key = new IntWritable(i + offset);
+ Text value = new Text(createRandomString(dataItemLength));
+ records.add(KV.of(key, value));
+ }
+ return records;
}
- return records;
- }
-
- private String createRandomString(int length) {
- char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
- StringBuilder builder = new StringBuilder();
- for (int i = 0; i < length; i++) {
- builder.append(chars[random.nextInt(chars.length)]);
+
+ private String createRandomString(int length) {
+ char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < length; i++) {
+ builder.append(chars[random.nextInt(chars.length)]);
+ }
+ return builder.toString();
}
- return builder.toString();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/41883300/sdks/java/maven-archetypes/starter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/starter/pom.xml b/sdks/java/maven-archetypes/starter/pom.xml
index 5b6cb2a..9fb21e9 100644
--- a/sdks/java/maven-archetypes/starter/pom.xml
+++ b/sdks/java/maven-archetypes/starter/pom.xml
@@ -60,6 +60,9 @@
<goals>
<goal>integration-test</goal>
</goals>
+ <configuration>
+ <ignoreEOLStyle>true</ignoreEOLStyle> <!-- for win -->
+ </configuration>
</execution>
</executions>
</plugin>