beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Add SortValues
Date Tue, 01 Nov 2016 17:40:24 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master afb1e91f5 -> fab7b2402


Add SortValues

An extension that provides a PTransform which performs
local (non-distributed) sorting. It will sort in memory until the buffer
is full, then flush to disk and use external sorting.

Consumes a PCollection of KVs from primary key to iterable of secondary
key and value KVs and sorts the iterables. Would probably be called
after a GroupByKey. Uses coders to convert secondary keys and values
into byte arrays and does a lexicographical comparison on the secondary
keys.

Uses Hadoop as an external sorting library.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/49ee9299
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/49ee9299
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/49ee9299

Branch: refs/heads/master
Commit: 49ee9299636715403d1b0c17119373fc360f660b
Parents: afb1e91
Author: Mitch Shanklin <mshanklin@google.com>
Authored: Tue Oct 25 16:17:01 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Nov 1 10:40:20 2016 -0700

----------------------------------------------------------------------
 sdks/java/extensions/pom.xml                    |   1 +
 sdks/java/extensions/sorter/README.md           |  42 ++++
 sdks/java/extensions/sorter/pom.xml             | 167 ++++++++++++++
 .../sorter/BufferedExternalSorter.java          | 125 +++++++++++
 .../sdk/extensions/sorter/ExternalSorter.java   | 225 +++++++++++++++++++
 .../sdk/extensions/sorter/InMemorySorter.java   | 166 ++++++++++++++
 .../beam/sdk/extensions/sorter/SortValues.java  | 213 ++++++++++++++++++
 .../beam/sdk/extensions/sorter/Sorter.java      |  47 ++++
 .../sdk/extensions/sorter/package-info.java     |  23 ++
 .../sorter/BufferedExternalSorterTest.java      | 177 +++++++++++++++
 .../extensions/sorter/ExternalSorterTest.java   |  87 +++++++
 .../extensions/sorter/InMemorySorterTest.java   | 144 ++++++++++++
 .../sdk/extensions/sorter/SortValuesTest.java   | 128 +++++++++++
 .../sdk/extensions/sorter/SorterTestUtils.java  | 129 +++++++++++
 14 files changed, 1674 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/pom.xml b/sdks/java/extensions/pom.xml
index 4328d3d..fc90edf 100644
--- a/sdks/java/extensions/pom.xml
+++ b/sdks/java/extensions/pom.xml
@@ -33,6 +33,7 @@
 
   <modules>
     <module>join-library</module>
+    <module>sorter</module>
   </modules>
 
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/README.md
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/README.md b/sdks/java/extensions/sorter/README.md
new file mode 100644
index 0000000..80d2a40
--- /dev/null
+++ b/sdks/java/extensions/sorter/README.md
@@ -0,0 +1,42 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Sorter
+This module provides the SortValues transform, which takes a `PCollection<KV<K, Iterable<KV<K2, V>>>>` and produces a `PCollection<KV<K, Iterable<KV<K2, V>>>>` where, for each primary key `K`, the paired `Iterable<KV<K2, V>>` has been sorted by the byte encoding of the secondary key (`K2`). It will efficiently and scalably sort the iterables, even if they are large (do not fit in memory).
+
+## Caveats
+* This transform performs value-only sorting; the iterable accompanying each key is sorted, but *there is no relationship between different keys*, as Beam does not support any defined relationship between different elements in a PCollection.
+* Each `Iterable<KV<K2, V>>` is sorted on a single worker using local memory and disk. This means that `SortValues` may be a performance and/or scalability bottleneck when used in different pipelines. For example, users are discouraged from using `SortValues` on a `PCollection` of a single element to globally sort a large `PCollection`.
+
+## Options
+* The user can customize the temporary location used if sorting requires spilling to disk and the maximum amount of memory to use by creating a custom instance of `BufferedExternalSorter.Options` to pass into `SortValues.create`.
+
+## Using `SortValues`
+~~~~
+PCollection<KV<String, KV<String, Integer>>> input = ...
+
+// Group by primary key, bringing <SecondaryKey, Value> pairs for the same key together.
+PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
+    input.apply(GroupByKey.<String, KV<String, Integer>>create());
+
+// For every primary key, sort the iterable of <SecondaryKey, Value> pairs by secondary key.
+PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
+    grouped.apply(
+        SortValues.<String, String, Integer>create(new BufferedExternalSorter.Options()));
+~~~~
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/pom.xml b/sdks/java/extensions/sorter/pom.xml
new file mode 100644
index 0000000..e77d5ad
--- /dev/null
+++ b/sdks/java/extensions/sorter/pom.xml
@@ -0,0 +1,167 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.beam</groupId>
+    <artifactId>beam-sdks-java-extensions-parent</artifactId>
+    <version>0.4.0-incubating-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-sdks-java-extensions-sorter</artifactId>
+  <name>Apache Beam :: SDKs :: Java :: Extensions :: Sorter</name>
+  
+  <properties>
+    <hadoop.version>2.7.1</hadoop.version>
+  </properties>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <beamUseDummyRunner>false</beamUseDummyRunner>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-source-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+      </plugin>
+
+      <!-- Shading Hadoop dependency so that users may use their own version 
+           of Hadoop without interference from this module. -->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>bundle-and-repackage</id>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadeTestJar>true</shadeTestJar>
+              <artifactSet>
+                <includes>
+                  <include>org.apache.hadoop:hadoop-mapreduce-client-core</include>
+                  <include>org.apache.hadoop:hadoop-common</include>
+                  <include>com.google.guava:guava</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <relocations>
+                <relocation>
+                  <pattern>org.apache.hadoop</pattern>
+                  <shadedPattern>org.apache.beam.repackaged.org.apache.hadoop</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.common</pattern>
+                  <shadedPattern>org.apache.beam.repackaged.com.google.common</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.thirdparty</pattern>
+                  <shadedPattern>org.apache.beam.repackaged.com.google.thirdparty</shadedPattern>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+
+
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <!-- test dependencies -->
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-direct-java</artifactId>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
new file mode 100644
index 0000000..0f89e30
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sorter;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * {@link Sorter} that buffers records in an {@link InMemorySorter} until that sorter reports it
+ * has no more room, and from then on delegates every record (the buffered ones included) to an
+ * {@link ExternalSorter}.
+ */
+public class BufferedExternalSorter implements Sorter {
+  /** Contains configuration for the sorter. */
+  public static class Options implements Serializable {
+    private String tempLocation = "/tmp";
+    private int memoryMB = 100;
+
+    /**
+     * Sets the path to a temporary location where the sorter writes intermediate files.
+     *
+     * @throws IllegalArgumentException if {@code tempLocation} starts with {@code gs://}
+     */
+    public void setTempLocation(String tempLocation) {
+      checkArgument(
+          !tempLocation.startsWith("gs://"),
+          "BufferedExternalSorter does not support GCS temporary location");
+
+      this.tempLocation = tempLocation;
+    }
+
+    /** Returns the configured temporary location. */
+    public String getTempLocation() {
+      return tempLocation;
+    }
+
+    /**
+     * Sets the size of the memory buffer in megabytes. This controls both the buffer for initial in
+     * memory sorting and the buffer used when external sorting. Must be greater than zero and less
+     * than 2048.
+     *
+     * @throws IllegalArgumentException if {@code memoryMB} is outside that range
+     */
+    public void setMemoryMB(int memoryMB) {
+      checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
+      // The external sorter turns this into a byte count using int arithmetic
+      // (memoryMB * 1024 * 1024), which overflows at 2048 MB, so reject such values up front.
+      checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
+      this.memoryMB = memoryMB;
+    }
+
+    /** Returns the configured size of the memory buffer. */
+    public int getMemoryMB() {
+      return memoryMB;
+    }
+  }
+
+  /** Receives all records once the in memory buffer has filled up. */
+  private final ExternalSorter externalSorter;
+
+  /** Buffers records until full; set to {@code null} after its contents have been transferred. */
+  private InMemorySorter inMemorySorter;
+
+  /** Whether the in memory sorter has rejected a record and been flushed to the external one. */
+  boolean inMemorySorterFull;
+
+  /** Creates a sorter that delegates to the two given sorters. */
+  BufferedExternalSorter(ExternalSorter externalSorter, InMemorySorter inMemorySorter) {
+    this.externalSorter = externalSorter;
+    this.inMemorySorter = inMemorySorter;
+  }
+
+  /**
+   * Returns a {@link BufferedExternalSorter} whose in memory and external delegates are both
+   * configured from the given {@link Options}.
+   */
+  public static BufferedExternalSorter create(Options options) {
+    ExternalSorter.Options externalSorterOptions = new ExternalSorter.Options();
+    externalSorterOptions.setMemoryMB(options.getMemoryMB());
+    externalSorterOptions.setTempLocation(options.getTempLocation());
+
+    InMemorySorter.Options inMemorySorterOptions = new InMemorySorter.Options();
+    inMemorySorterOptions.setMemoryMB(options.getMemoryMB());
+
+    return new BufferedExternalSorter(
+        ExternalSorter.create(externalSorterOptions), InMemorySorter.create(inMemorySorterOptions));
+  }
+
+  /**
+   * Adds a record, keeping it in memory while there is room and spilling to the external sorter
+   * otherwise.
+   */
+  @Override
+  public void add(KV<byte[], byte[]> record) throws IOException {
+    if (!inMemorySorterFull) {
+      if (inMemorySorter.addIfRoom(record)) {
+        return;
+      } else {
+        // Flushing contents of in memory sorter to external sorter so we can rely on external
+        // from here on out
+        inMemorySorterFull = true;
+        transferToExternalSorter();
+      }
+    }
+
+    // In memory sorter is full, so put in external sorter instead
+    externalSorter.add(record);
+  }
+
+  /**
+   * Transfers all of the records loaded so far into the in memory sorter over to the external
+   * sorter.
+   */
+  private void transferToExternalSorter() throws IOException {
+    for (KV<byte[], byte[]> record : inMemorySorter.sort()) {
+      externalSorter.add(record);
+    }
+    // Allow in memory sorter and its contents to be garbage collected
+    inMemorySorter = null;
+  }
+
+  /**
+   * Returns the sorted records from the in memory sorter if it never filled up, and from the
+   * external sorter otherwise.
+   */
+  @Override
+  public Iterable<KV<byte[], byte[]>> sort() throws IOException {
+    if (!inMemorySorterFull) {
+      return inMemorySorter.sort();
+    } else {
+      return externalSorter.sort();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
new file mode 100644
index 0000000..3cf0cc0
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/ExternalSorter.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sorter;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import org.apache.beam.sdk.values.KV;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.io.SequenceFile.Sorter.RawKeyValueIterator;
+import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.mapred.JobConf;
+
+/** Does an external sort of the provided values using Hadoop's {@link SequenceFile}. */
+class ExternalSorter implements Sorter {
+  private final Options options;
+
+  /** Whether {@link #sort()} was already called. */
+  private boolean sortCalled = false;
+
+  /** SequenceFile Writer for writing all input data to a file. */
+  private Writer writer;
+
+  /** Sorter used to sort the input file. */
+  private SequenceFile.Sorter sorter;
+
+  /** Temporary directory for input and intermediate files. */
+  private Path tempDir;
+
+  /** The list of input files to be sorted. */
+  private Path[] paths;
+
+  /** Whether {@link #initHadoopSorter()} has already set up the writer and sorter. */
+  private boolean initialized = false;
+
+  /** {@link Options} contains configuration of the sorter. */
+  public static class Options implements Serializable {
+    private String tempLocation = "/tmp";
+    private int memoryMB = 100;
+
+    /**
+     * Sets the path to a temporary location where the sorter writes intermediate files.
+     *
+     * @throws IllegalArgumentException if {@code tempLocation} starts with {@code gs://}
+     */
+    public void setTempLocation(String tempLocation) {
+      checkArgument(
+          !tempLocation.startsWith("gs://"), "Sorter doesn't support GCS temporary location.");
+
+      this.tempLocation = tempLocation;
+    }
+
+    /** Returns the configured temporary location. */
+    public String getTempLocation() {
+      return tempLocation;
+    }
+
+    /**
+     * Sets the size of the memory buffer in megabytes. Must be greater than zero and less than
+     * 2048.
+     *
+     * @throws IllegalArgumentException if {@code memoryMB} is outside that range
+     */
+    public void setMemoryMB(int memoryMB) {
+      checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
+      // The byte count handed to the Hadoop sorter is computed as memoryMB * 1024 * 1024 in int
+      // arithmetic, which overflows at 2048 MB.
+      checkArgument(memoryMB < 2048, "memoryMB must be less than 2048");
+      this.memoryMB = memoryMB;
+    }
+
+    /** Returns the configured size of the memory buffer. */
+    public int getMemoryMB() {
+      return memoryMB;
+    }
+  }
+
+  /** Returns a {@link Sorter} configured with the given {@link Options}. */
+  public static ExternalSorter create(Options options) {
+    return new ExternalSorter(options);
+  }
+
+  /**
+   * Appends the record to the unsorted input file, initializing the Hadoop writer and sorter on
+   * first use.
+   *
+   * @throws IllegalStateException if {@link #sort()} was already called
+   */
+  @Override
+  public void add(KV<byte[], byte[]> record) throws IOException {
+    checkState(!sortCalled, "Records can only be added before sort()");
+
+    initHadoopSorter();
+
+    BytesWritable key = new BytesWritable(record.getKey());
+    BytesWritable value = new BytesWritable(record.getValue());
+
+    writer.append(key, value);
+  }
+
+  /**
+   * Closes the input file and returns an {@link Iterable} over the sorted records. The sort itself
+   * runs when an iterator is requested from the returned {@link Iterable}.
+   *
+   * @throws IllegalStateException if called more than once
+   */
+  @Override
+  public Iterable<KV<byte[], byte[]>> sort() throws IOException {
+    checkState(!sortCalled, "sort() can only be called once.");
+    sortCalled = true;
+
+    // Also needed when no record was ever added, so that the writer and sorter exist.
+    initHadoopSorter();
+
+    writer.close();
+
+    return new SortedRecordsIterable();
+  }
+
+  private ExternalSorter(Options options) {
+    this.options = options;
+  }
+
+  /**
+   * Initializes the hadoop sorter. Does some local file system setup, and is somewhat expensive
+   * (~20 ms on local machine). Only executed when necessary.
+   */
+  private void initHadoopSorter() throws IOException {
+    if (!initialized) {
+      // A random directory name keeps separate sorter instances from sharing files.
+      tempDir = new Path(options.getTempLocation(), "tmp" + UUID.randomUUID().toString());
+      paths = new Path[] {new Path(tempDir, "test.seq")};
+
+      JobConf conf = new JobConf();
+      writer =
+          SequenceFile.createWriter(
+              conf,
+              Writer.valueClass(BytesWritable.class),
+              Writer.keyClass(BytesWritable.class),
+              Writer.file(paths[0]),
+              Writer.compression(CompressionType.NONE));
+
+      FileSystem fs = FileSystem.getLocal(conf);
+      sorter =
+          new SequenceFile.Sorter(
+              fs, new BytesWritable.Comparator(), BytesWritable.class, BytesWritable.class, conf);
+      // Options.setMemoryMB keeps memoryMB below 2048, so this int product cannot overflow.
+      sorter.setMemory(options.getMemoryMB() * 1024 * 1024);
+
+      initialized = true;
+    }
+  }
+
+  /** An {@link Iterable} producing the iterators over sorted data. */
+  private class SortedRecordsIterable implements Iterable<KV<byte[], byte[]>> {
+    @Override
+    public Iterator<KV<byte[], byte[]>> iterator() {
+      return new SortedRecordsIterator();
+    }
+  }
+
+  /** An {@link Iterator} producing the sorted data. */
+  private class SortedRecordsIterator implements Iterator<KV<byte[], byte[]>> {
+    private RawKeyValueIterator iterator;
+
+    /** Next {@link KV} to return from {@link #next()}. */
+    private KV<byte[], byte[]> nextKV = null;
+
+    /** Runs the sort over the input file and pre-loads the first record. */
+    SortedRecordsIterator() {
+      try {
+        this.iterator = sorter.sortAndIterate(paths, tempDir, false);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      nextKV = KV.of(null, null); // A dummy value that will be overwritten by next().
+      next();
+    }
+
+    @Override
+    public boolean hasNext() {
+      return nextKV != null;
+    }
+
+    /**
+     * Returns the pre-loaded record and reads the following one, if any, from the underlying
+     * Hadoop iterator.
+     */
+    @Override
+    public KV<byte[], byte[]> next() {
+      if (nextKV == null) {
+        throw new NoSuchElementException();
+      }
+
+      KV<byte[], byte[]> current = nextKV;
+
+      try {
+        if (iterator.next()) {
+          // Parse key from DataOutputBuffer.
+          ByteArrayInputStream keyStream = new ByteArrayInputStream(iterator.getKey().getData());
+          BytesWritable key = new BytesWritable();
+          key.readFields(new DataInputStream(keyStream));
+
+          // Parse value from ValueBytes.
+          ByteArrayOutputStream valOutStream = new ByteArrayOutputStream();
+          iterator.getValue().writeUncompressedBytes(new DataOutputStream(valOutStream));
+          ByteArrayInputStream valInStream = new ByteArrayInputStream(valOutStream.toByteArray());
+          BytesWritable value = new BytesWritable();
+          value.readFields(new DataInputStream(valInStream));
+
+          nextKV = KV.of(key.copyBytes(), value.copyBytes());
+        } else {
+          nextKV = null;
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+
+      return current;
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Iterator does not support remove");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/InMemorySorter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/InMemorySorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/InMemorySorter.java
new file mode 100644
index 0000000..9203130
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/InMemorySorter.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sorter;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.primitives.UnsignedBytes;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Sorts {@code <key, value>} pairs in memory. Based on the configured size of the memory buffer,
+ * will reject additional pairs.
+ */
+class InMemorySorter implements Sorter {
+  /** {@code Options} contains configuration of the sorter. */
+  public static class Options implements Serializable {
+    private int memoryMB = 100;
+
+    /**
+     * Sets the size of the memory buffer in megabytes. Must be greater than zero.
+     *
+     * @throws IllegalArgumentException if {@code memoryMB} is not positive
+     */
+    public void setMemoryMB(int memoryMB) {
+      checkArgument(memoryMB > 0, "memoryMB must be greater than zero");
+      this.memoryMB = memoryMB;
+    }
+
+    /** Returns the configured size of the memory buffer. */
+    public int getMemoryMB() {
+      return memoryMB;
+    }
+  }
+
+  /** The comparator to use to sort the records by key. */
+  private static final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
+
+  /** How many bytes per word in the running JVM. Assumes 64 bit/8 bytes if unknown. */
+  private static final int NUM_BYTES_PER_WORD = getNumBytesPerWord();
+
+  /**
+   * Estimate of memory overhead per KV record in bytes not including memory associated with keys
+   * and values.
+   *
+   * <ul>
+   *   <li> Object reference within {@link ArrayList} (1 word),
+   *   <li> A {@link KV} (2 words),
+   *   <li> Two byte arrays (2 words for array lengths),
+   *   <li> Per-object overhead (JVM-specific, guessing 2 words * 3 objects)
+   * </ul>
+   */
+  private static final int RECORD_MEMORY_OVERHEAD_ESTIMATE = 11 * NUM_BYTES_PER_WORD;
+
+  /** Maximum size of the buffer in bytes. Held as a long so that 2048 MB and above fit. */
+  private final long maxBufferSize;
+
+  /** Current number of stored bytes. Including estimated overhead bytes. */
+  private long numBytes;
+
+  /** Whether sort has been called. */
+  private boolean sortCalled;
+
+  /** The stored records to be sorted. */
+  private final ArrayList<KV<byte[], byte[]>> records = new ArrayList<KV<byte[], byte[]>>();
+
+  /** Private constructor. */
+  private InMemorySorter(Options options) {
+    // Multiply as long: the int product memoryMB * 1024 * 1024 overflows at 2048 MB.
+    maxBufferSize = options.getMemoryMB() * 1024L * 1024L;
+  }
+
+  /** Create a new sorter from provided options. */
+  public static InMemorySorter create(Options options) {
+    return new InMemorySorter(options);
+  }
+
+  /**
+   * Adds the record to the buffer.
+   *
+   * @throws IllegalStateException if there is no room left or {@link #sort()} was already called
+   */
+  @Override
+  public void add(KV<byte[], byte[]> record) {
+    checkState(addIfRoom(record), "No space remaining for in memory sorting");
+  }
+
+  /**
+   * Adds the record if there is room and returns true. Otherwise returns false.
+   *
+   * @throws IllegalStateException if {@link #sort()} was already called
+   */
+  public boolean addIfRoom(KV<byte[], byte[]> record) {
+    checkState(!sortCalled, "Records can only be added before sort()");
+
+    long recordBytes = estimateRecordBytes(record);
+    // Summed as long so that a large record cannot wrap the total negative and slip past the
+    // capacity check.
+    if (roomInBuffer(numBytes + recordBytes, records.size() + 1L)) {
+      records.add(record);
+      numBytes += recordBytes;
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Sorts the stored records by key, comparing keys lexicographically as unsigned bytes, and
+   * returns them as an unmodifiable view.
+   *
+   * @throws IllegalStateException if called more than once
+   */
+  @Override
+  public Iterable<KV<byte[], byte[]>> sort() {
+    checkState(!sortCalled, "sort() can only be called once.");
+
+    sortCalled = true;
+
+    Comparator<KV<byte[], byte[]>> kvComparator =
+        new Comparator<KV<byte[], byte[]>>() {
+
+          @Override
+          public int compare(KV<byte[], byte[]> o1, KV<byte[], byte[]> o2) {
+            return COMPARATOR.compare(o1.getKey(), o2.getKey());
+          }
+        };
+    Collections.sort(records, kvComparator);
+    return Collections.unmodifiableList(records);
+  }
+
+  /**
+   * Estimate the number of additional bytes required to store this record. Including the key, the
+   * value and any overhead for objects and references.
+   */
+  private long estimateRecordBytes(KV<byte[], byte[]> record) {
+    return (long) RECORD_MEMORY_OVERHEAD_ESTIMATE
+        + record.getKey().length
+        + record.getValue().length;
+  }
+
+  /**
+   * Check whether we have room to store the provided total number of bytes and total number of
+   * records.
+   */
+  private boolean roomInBuffer(long numBytes, long numRecords) {
+    // Collections.sort may allocate up to n/2 extra object references.
+    // Also, ArrayList grows by a factor of 1.5x, so there might be up to n/2 null object
+    // references in the backing array.
+    // And finally, in Java 7, Collections.sort performs a defensive copy to an array in case the
+    // input list is a LinkedList.
+    // So assume we need an additional overhead of two words per record in the worst case
+    return (numBytes + (numRecords * NUM_BYTES_PER_WORD * 2)) < maxBufferSize;
+  }
+
+  /**
+   * Returns the number of bytes in a word according to the JVM. Defaults to 8 for 64 bit if answer
+   * unknown.
+   */
+  private static int getNumBytesPerWord() {
+    String bitsPerWord = System.getProperty("sun.arch.data.model");
+
+    try {
+      return Integer.parseInt(bitsPerWord) / 8;
+    } catch (NumberFormatException e) {
+      // Property missing or not numeric: can't determine whether 32 or 64 bit, so assume 64
+      return 8;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
new file mode 100644
index 0000000..3bd9afa
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/SortValues.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sorter;
+
+import java.io.IOException;
+import java.util.Iterator;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * {@code SortValues<PrimaryKeyT, SecondaryKeyT, ValueT>} takes a {@code PCollection<KV<PrimaryKeyT,
+ * Iterable<KV<SecondaryKeyT, ValueT>>>>} with elements consisting of a primary key and iterables
+ * over {@code <secondary key, value>} pairs, and returns a {@code PCollection<KV<PrimaryKeyT,
+ * Iterable<KV<SecondaryKeyT, ValueT>>>>} of the same elements but with values sorted by a
+ * secondary key.
+ *
+ * <p>This transform ignores the primary key; there is no guarantee of any relationship between
+ * elements with different primary keys. The primary key is explicit here only because this
+ * transform is typically used on a result of a {@link GroupByKey} transform.
+ *
+ * <p>This transform sorts by lexicographic comparison of the byte representations of the secondary
+ * keys and may write secondary key-value pairs to disk. In order to retrieve the byte
+ * representations it requires the input PCollection to use a {@link KvCoder} for its input, an
+ * {@link IterableCoder} for its input values and a {@link KvCoder} for its secondary key-value
+ * pairs.
+ */
+public class SortValues<PrimaryKeyT, SecondaryKeyT, ValueT>
+    extends PTransform<
+        PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>>,
+        PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>>> {
+
+  /** Options handed to {@link BufferedExternalSorter#create} for every processed element. */
+  private final BufferedExternalSorter.Options sorterOptions;
+
+  private SortValues(BufferedExternalSorter.Options sorterOptions) {
+    this.sorterOptions = sorterOptions;
+  }
+
+  /**
+   * Returns a {@code SortValues<PrimaryKeyT, SecondaryKeyT, ValueT>} {@link PTransform}.
+   *
+   * @param sorterOptions the options used to create the sorter for each input element
+   * @param <PrimaryKeyT> the type of the primary keys of the input and output {@code PCollection}s
+   * @param <SecondaryKeyT> the type of the secondary (sort) keys of the input and output {@code
+   *     PCollection}s
+   * @param <ValueT> the type of the values of the input and output {@code PCollection}s
+   */
+  public static <PrimaryKeyT, SecondaryKeyT, ValueT>
+      SortValues<PrimaryKeyT, SecondaryKeyT, ValueT> create(
+          BufferedExternalSorter.Options sorterOptions) {
+    return new SortValues<>(sorterOptions);
+  }
+
+  /**
+   * Applies a {@link ParDo} that sorts the iterable of every input element, using the secondary
+   * key and value coders extracted from the coder of {@code input}.
+   */
+  @Override
+  public PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> apply(
+      PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> input) {
+    return input.apply(
+        ParDo.of(
+            new SortValuesDoFn<PrimaryKeyT, SecondaryKeyT, ValueT>(
+                sorterOptions,
+                getSecondaryKeyCoder(input.getCoder()),
+                getValueCoder(input.getCoder()))));
+  }
+
+  /** The output uses the same coder as the input, since the element type is unchanged. */
+  @Override
+  protected Coder<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> getDefaultOutputCoder(
+      PCollection<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> input) {
+    return input.getCoder();
+  }
+
+  /**
+   * Retrieves the {@link Coder} for the secondary key-value pairs.
+   *
+   * @throws IllegalStateException if the input coder is not a {@link KvCoder}, if its value coder
+   *     is not an {@link IterableCoder}, or if the element coder of that iterable coder is not a
+   *     {@link KvCoder}
+   */
+  // Each cast below is guarded by the instanceof check that immediately precedes it.
+  @SuppressWarnings("unchecked")
+  private static <PrimaryKeyT, SecondaryKeyT, ValueT>
+      KvCoder<SecondaryKeyT, ValueT> getSecondaryKeyValueCoder(
+          Coder<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> inputCoder) {
+    if (!(inputCoder instanceof KvCoder)) {
+      throw new IllegalStateException("SortValues requires its input to use KvCoder");
+    }
+    KvCoder<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>> kvCoder =
+        (KvCoder<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>) (inputCoder);
+
+    if (!(kvCoder.getValueCoder() instanceof IterableCoder)) {
+      throw new IllegalStateException(
+          "SortValues requires the values be encoded with IterableCoder");
+    }
+    IterableCoder<KV<SecondaryKeyT, ValueT>> iterableCoder =
+        (IterableCoder<KV<SecondaryKeyT, ValueT>>) (kvCoder.getValueCoder());
+
+    if (!(iterableCoder.getElemCoder() instanceof KvCoder)) {
+      throw new IllegalStateException(
+          "SortValues requires the secondary key-value pairs to use KvCoder");
+    }
+    return (KvCoder<SecondaryKeyT, ValueT>) (iterableCoder.getElemCoder());
+  }
+
+  /** Retrieves the {@link Coder} for the secondary keys. */
+  private static <PrimaryKeyT, SecondaryKeyT, ValueT> Coder<SecondaryKeyT> getSecondaryKeyCoder(
+      Coder<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> inputCoder) {
+    return getSecondaryKeyValueCoder(inputCoder).getKeyCoder();
+  }
+
+  /** Returns the {@code Coder} of the values associated with the secondary keys. */
+  private static <PrimaryKeyT, SecondaryKeyT, ValueT> Coder<ValueT> getValueCoder(
+      Coder<KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> inputCoder) {
+    return getSecondaryKeyValueCoder(inputCoder).getValueCoder();
+  }
+
+  /**
+   * {@link DoFn} that encodes the secondary keys and values of one element to byte arrays, sorts
+   * them with a {@link BufferedExternalSorter} and outputs an iterable that decodes the sorted
+   * records again.
+   */
+  private static class SortValuesDoFn<PrimaryKeyT, SecondaryKeyT, ValueT>
+      extends DoFn<
+          KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>,
+          KV<PrimaryKeyT, Iterable<KV<SecondaryKeyT, ValueT>>>> {
+    private final BufferedExternalSorter.Options sorterOptions;
+    private final Coder<SecondaryKeyT> keyCoder;
+    private final Coder<ValueT> valueCoder;
+
+    SortValuesDoFn(
+        BufferedExternalSorter.Options sorterOptions,
+        Coder<SecondaryKeyT> keyCoder,
+        Coder<ValueT> valueCoder) {
+      this.sorterOptions = sorterOptions;
+      this.keyCoder = keyCoder;
+      this.valueCoder = valueCoder;
+    }
+
+    /**
+     * Sorts the secondary key-value pairs of the current element and outputs them under the same
+     * primary key. Any {@link IOException} raised while encoding or sorting is rethrown wrapped
+     * in a {@link RuntimeException}.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      Iterable<KV<SecondaryKeyT, ValueT>> records = c.element().getValue();
+
+      try {
+        // A fresh sorter per element: a Sorter can be sorted at most once.
+        Sorter sorter = BufferedExternalSorter.create(sorterOptions);
+        for (KV<SecondaryKeyT, ValueT> record : records) {
+          sorter.add(
+              KV.of(
+                  CoderUtils.encodeToByteArray(keyCoder, record.getKey()),
+                  CoderUtils.encodeToByteArray(valueCoder, record.getValue())));
+        }
+
+        // The explicitly typed local makes KV.of infer Iterable (not DecodingIterable) as the
+        // value type, which is what the output type of this DoFn requires.
+        Iterable<KV<SecondaryKeyT, ValueT>> sortedRecords = new DecodingIterable(sorter.sort());
+        c.output(KV.of(c.element().getKey(), sortedRecords));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    /** Iterable view over sorted encoded records that decodes each record on iteration. */
+    private class DecodingIterable implements Iterable<KV<SecondaryKeyT, ValueT>> {
+      private final Iterable<KV<byte[], byte[]>> iterable;
+
+      DecodingIterable(Iterable<KV<byte[], byte[]>> iterable) {
+        this.iterable = iterable;
+      }
+
+      @Override
+      public Iterator<KV<SecondaryKeyT, ValueT>> iterator() {
+        return new DecodingIterator(iterable.iterator());
+      }
+    }
+
+    /** Iterator that decodes encoded records using the key and value coders of the DoFn. */
+    private class DecodingIterator implements Iterator<KV<SecondaryKeyT, ValueT>> {
+      private final Iterator<KV<byte[], byte[]>> iterator;
+
+      DecodingIterator(Iterator<KV<byte[], byte[]>> iterator) {
+        this.iterator = iterator;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return iterator.hasNext();
+      }
+
+      /** Decodes the next record; a decoding {@link IOException} is rethrown unchecked. */
+      @Override
+      public KV<SecondaryKeyT, ValueT> next() {
+        KV<byte[], byte[]> next = iterator.next();
+        try {
+          return KV.of(
+              CoderUtils.decodeFromByteArray(keyCoder, next.getKey()),
+              CoderUtils.decodeFromByteArray(valueCoder, next.getValue()));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException("Iterator does not support remove");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/Sorter.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/Sorter.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/Sorter.java
new file mode 100644
index 0000000..c93a6c1
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/Sorter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sorter;
+
+import java.io.IOException;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Interface for classes which can sort {@code <key, value>} pairs by the key.
+ *
+ * <p>Both the key and the value of a record are byte arrays.
+ *
+ * <p>Records must first be added by calling {@link #add(KV)}. Then {@link #sort()} can be called at
+ * most once.
+ *
+ * <p>TODO: Support custom comparison functions.
+ */
+interface Sorter {
+
+  /**
+   * Adds a given record to the sorter.
+   *
+   * <p>Records can only be added before calling {@link #sort()}.
+   *
+   * @param record the {@code <key, value>} pair to add
+   * @throws IOException declared so that implementations may perform I/O while adding a record
+   */
+  void add(KV<byte[], byte[]> record) throws IOException;
+
+  /**
+   * Sorts the added elements and returns an {@link Iterable} over the sorted elements.
+   *
+   * <p>Can be called at most once.
+   *
+   * @return an {@link Iterable} over the added records in sorted order
+   * @throws IOException declared so that implementations may perform I/O while sorting
+   */
+  Iterable<KV<byte[], byte[]>> sort() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/package-info.java b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/package-info.java
new file mode 100644
index 0000000..494ec14
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/main/java/org/apache/beam/sdk/extensions/sorter/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Utility for performing local sort of potentially large sets of values. Will sort in memory and
+ * spill to disk for external sorting if necessary.
+ */
+package org.apache.beam.sdk.extensions.sorter;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
new file mode 100644
index 0000000..63dbedf
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/BufferedExternalSorterTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sorter;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.extensions.sorter.SorterTestUtils.SorterGenerator;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link BufferedExternalSorter}. */
+@RunWith(JUnit4.class)
+public class BufferedExternalSorterTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  /** Builds the three fixed records shared by the mock-based tests. */
+  @SuppressWarnings("unchecked")
+  private static KV<byte[], byte[]>[] newRecords() {
+    return new KV[] {
+      KV.of(new byte[] {0}, new byte[] {}),
+      KV.of(new byte[] {0, 1}, new byte[] {}),
+      KV.of(new byte[] {1}, new byte[] {})
+    };
+  }
+
+  /** Creates a {@link BufferedExternalSorter} configured with default options. */
+  private static Sorter newDefaultSorter() throws Exception {
+    return BufferedExternalSorter.create(new BufferedExternalSorter.Options());
+  }
+
+  /** Returns a generator that hands out a new default-configured sorter on every call. */
+  private static SorterGenerator defaultSorterGenerator() {
+    return new SorterGenerator() {
+      @Override
+      public Sorter generateSorter() throws Exception {
+        return newDefaultSorter();
+      }
+    };
+  }
+
+  /** When every record fits in memory, the external sorter must never be touched. */
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNoFallback() throws Exception {
+    ExternalSorter externalMock = mock(ExternalSorter.class);
+    InMemorySorter inMemoryMock = mock(InMemorySorter.class);
+    BufferedExternalSorter sorterUnderTest =
+        new BufferedExternalSorter(externalMock, inMemoryMock);
+
+    KV<byte[], byte[]>[] records = newRecords();
+
+    // The in-memory sorter accepts every record.
+    for (KV<byte[], byte[]> record : records) {
+      when(inMemoryMock.addIfRoom(record)).thenReturn(true);
+    }
+    when(inMemoryMock.sort()).thenReturn(Arrays.asList(records[0], records[1], records[2]));
+
+    for (KV<byte[], byte[]> record : records) {
+      sorterUnderTest.add(record);
+    }
+
+    assertEquals(Arrays.asList(records[0], records[1], records[2]), sorterUnderTest.sort());
+
+    // The external sorter must not have been used at all.
+    verify(externalMock, never()).add(any(KV.class));
+    verify(externalMock, never()).sort();
+  }
+
+  /** When the in-memory sorter rejects a record, all records must reach the external sorter. */
+  @Test
+  public void testFallback() throws Exception {
+    ExternalSorter externalMock = mock(ExternalSorter.class);
+    InMemorySorter inMemoryMock = mock(InMemorySorter.class);
+    BufferedExternalSorter sorterUnderTest =
+        new BufferedExternalSorter(externalMock, inMemoryMock);
+
+    KV<byte[], byte[]>[] records = newRecords();
+
+    // The first two records fit in memory, the third one does not.
+    when(inMemoryMock.addIfRoom(records[0])).thenReturn(true);
+    when(inMemoryMock.addIfRoom(records[1])).thenReturn(true);
+    when(inMemoryMock.addIfRoom(records[2])).thenReturn(false);
+    when(inMemoryMock.sort()).thenReturn(Arrays.asList(records[0], records[1]));
+    when(externalMock.sort()).thenReturn(Arrays.asList(records[0], records[1], records[2]));
+
+    for (KV<byte[], byte[]> record : records) {
+      sorterUnderTest.add(record);
+    }
+
+    assertEquals(Arrays.asList(records[0], records[1], records[2]), sorterUnderTest.sort());
+
+    // Every record must have been handed to the external sorter exactly once.
+    for (KV<byte[], byte[]> record : records) {
+      verify(externalMock, times(1)).add(record);
+    }
+  }
+
+  @Test
+  public void testEmpty() throws Exception {
+    SorterTestUtils.testEmpty(newDefaultSorter());
+  }
+
+  @Test
+  public void testSingleElement() throws Exception {
+    SorterTestUtils.testSingleElement(newDefaultSorter());
+  }
+
+  @Test
+  public void testEmptyKeyValueElement() throws Exception {
+    SorterTestUtils.testEmptyKeyValueElement(newDefaultSorter());
+  }
+
+  @Test
+  public void testMultipleIterations() throws Exception {
+    SorterTestUtils.testMultipleIterations(newDefaultSorter());
+  }
+
+  @Test
+  public void testManySortersFewRecords() throws Exception {
+    SorterTestUtils.testRandom(defaultSorterGenerator(), 1000000, 10);
+  }
+
+  @Test
+  public void testOneSorterManyRecords() throws Exception {
+    SorterTestUtils.testRandom(defaultSorterGenerator(), 1, 1000000);
+  }
+
+  /** The shared helper is expected to throw, so the trailing {@code fail()} must not be reached. */
+  @Test
+  public void testAddAfterSort() throws Exception {
+    SorterTestUtils.testAddAfterSort(newDefaultSorter(), thrown);
+    fail();
+  }
+
+  /** The shared helper is expected to throw, so the trailing {@code fail()} must not be reached. */
+  @Test
+  public void testSortTwice() throws Exception {
+    SorterTestUtils.testSortTwice(newDefaultSorter(), thrown);
+    fail();
+  }
+
+  /** Setting a negative memory limit must be rejected with an IllegalArgumentException. */
+  @Test
+  public void testNegativeMemory() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be greater than zero");
+    BufferedExternalSorter.Options options = new BufferedExternalSorter.Options();
+    options.setMemoryMB(-1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
new file mode 100644
index 0000000..9232b62
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/ExternalSorterTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sorter;
+
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.extensions.sorter.SorterTestUtils.SorterGenerator;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ExternalSorter}. */
+@RunWith(JUnit4.class)
+public class ExternalSorterTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  /** Creates an {@link ExternalSorter} configured with default options. */
+  private static Sorter newDefaultSorter() throws Exception {
+    return ExternalSorter.create(new ExternalSorter.Options());
+  }
+
+  @Test
+  public void testEmpty() throws Exception {
+    SorterTestUtils.testEmpty(newDefaultSorter());
+  }
+
+  @Test
+  public void testSingleElement() throws Exception {
+    SorterTestUtils.testSingleElement(newDefaultSorter());
+  }
+
+  @Test
+  public void testEmptyKeyValueElement() throws Exception {
+    SorterTestUtils.testEmptyKeyValueElement(newDefaultSorter());
+  }
+
+  @Test
+  public void testMultipleIterations() throws Exception {
+    SorterTestUtils.testMultipleIterations(newDefaultSorter());
+  }
+
+  @Test
+  public void testRandom() throws Exception {
+    SorterGenerator generator =
+        new SorterGenerator() {
+          @Override
+          public Sorter generateSorter() throws Exception {
+            return newDefaultSorter();
+          }
+        };
+    SorterTestUtils.testRandom(generator, 1, 1000000);
+  }
+
+  /** The shared helper is expected to throw, so the trailing {@code fail()} must not be reached. */
+  @Test
+  public void testAddAfterSort() throws Exception {
+    SorterTestUtils.testAddAfterSort(newDefaultSorter(), thrown);
+    fail();
+  }
+
+  /** The shared helper is expected to throw, so the trailing {@code fail()} must not be reached. */
+  @Test
+  public void testSortTwice() throws Exception {
+    SorterTestUtils.testSortTwice(newDefaultSorter(), thrown);
+    fail();
+  }
+
+  /** Setting a negative memory limit must be rejected with an IllegalArgumentException. */
+  @Test
+  public void testNegativeMemory() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be greater than zero");
+    ExternalSorter.Options options = new ExternalSorter.Options();
+    options.setMemoryMB(-1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/InMemorySorterTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/InMemorySorterTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/InMemorySorterTest.java
new file mode 100644
index 0000000..867390b
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/InMemorySorterTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sorter;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.extensions.sorter.SorterTestUtils.SorterGenerator;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link InMemorySorter}. */
+@RunWith(JUnit4.class)
+public class InMemorySorterTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  /** Creates an {@link InMemorySorter} configured with default options. */
+  private static InMemorySorter newDefaultSorter() throws Exception {
+    return InMemorySorter.create(new InMemorySorter.Options());
+  }
+
+  /** Creates an {@link InMemorySorter} whose options were given a one megabyte memory limit. */
+  private static InMemorySorter newOneMegabyteSorter() throws Exception {
+    InMemorySorter.Options options = new InMemorySorter.Options();
+    options.setMemoryMB(1);
+    return InMemorySorter.create(options);
+  }
+
+  @Test
+  public void testEmpty() throws Exception {
+    SorterTestUtils.testEmpty(newDefaultSorter());
+  }
+
+  @Test
+  public void testSingleElement() throws Exception {
+    SorterTestUtils.testSingleElement(newDefaultSorter());
+  }
+
+  @Test
+  public void testEmptyKeyValueElement() throws Exception {
+    SorterTestUtils.testEmptyKeyValueElement(newDefaultSorter());
+  }
+
+  @Test
+  public void testMultipleIterations() throws Exception {
+    SorterTestUtils.testMultipleIterations(newDefaultSorter());
+  }
+
+  @Test
+  public void testManySorters() throws Exception {
+    SorterGenerator generator =
+        new SorterGenerator() {
+          @Override
+          public Sorter generateSorter() throws Exception {
+            return newDefaultSorter();
+          }
+        };
+    SorterTestUtils.testRandom(generator, 1000000, 10);
+  }
+
+  /** The shared helper is expected to throw, so the trailing {@code fail()} must not be reached. */
+  @Test
+  public void testAddAfterSort() throws Exception {
+    SorterTestUtils.testAddAfterSort(newDefaultSorter(), thrown);
+    fail();
+  }
+
+  /** The shared helper is expected to throw, so the trailing {@code fail()} must not be reached. */
+  @Test
+  public void testSortTwice() throws Exception {
+    SorterTestUtils.testSortTwice(newDefaultSorter(), thrown);
+    fail();
+  }
+
+  /** Verify an exception is thrown when the in memory sorter runs out of space. */
+  @Test
+  public void testOutOfSpace() throws Exception {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(is("No space remaining for in memory sorting"));
+    SorterGenerator generator =
+        new SorterGenerator() {
+          @Override
+          public Sorter generateSorter() throws Exception {
+            return newOneMegabyteSorter();
+          }
+        };
+    SorterTestUtils.testRandom(generator, 1, 10000000);
+  }
+
+  /** A record just under the buffer limit fits once but not twice. */
+  @Test
+  public void testAddIfRoom() throws Exception {
+    InMemorySorter sorter = newOneMegabyteSorter();
+
+    // Key and value together should be a few kb less than what the total buffer supports.
+    byte[] key = new byte[1024 * 500];
+    byte[] value = new byte[1024 * 500];
+    KV<byte[], byte[]> bigRecord = KV.of(key, value);
+
+    // First add should succeed, second add should fail due to insufficient room.
+    boolean firstAdded = sorter.addIfRoom(bigRecord);
+    Assert.assertTrue(firstAdded);
+    boolean secondAdded = sorter.addIfRoom(bigRecord);
+    Assert.assertFalse(secondAdded);
+  }
+
+  /** Records without payload must still exhaust the buffer because of per-record overhead. */
+  @Test
+  public void testAddIfRoomOverhead() throws Exception {
+    InMemorySorter sorter = newOneMegabyteSorter();
+
+    // No bytes within record, should still run out of room due to memory overhead of record.
+    KV<byte[], byte[]> tinyRecord = KV.of(new byte[0], new byte[0]);
+
+    // Verify we can't insert one million records into this one megabyte buffer.
+    boolean stillRoom = true;
+    int attempts = 0;
+    while (stillRoom && attempts < 1000000) {
+      stillRoom = sorter.addIfRoom(tinyRecord);
+      attempts++;
+    }
+
+    Assert.assertFalse(stillRoom);
+  }
+
+  /** Setting a negative memory limit must be rejected with an IllegalArgumentException. */
+  @Test
+  public void testNegativeMemory() throws Exception {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("memoryMB must be greater than zero");
+    InMemorySorter.Options options = new InMemorySorter.Options();
+    options.setMemoryMB(-1);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
new file mode 100644
index 0000000..ebfbd0e
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SortValuesTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sorter;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SortValues} transform. */
+@RunWith(JUnit4.class)
+public class SortValuesTest {
+
+  /** Runs GroupByKey followed by SortValues and checks each key's values come out sorted. */
+  @Test
+  public void testSecondaryKeySorting() throws Exception {
+    Pipeline p = TestPipeline.create();
+
+    // Create a PCollection of <Key, <SecondaryKey, Value>> pairs.
+    PCollection<KV<String, KV<String, Integer>>> input =
+        p.apply(
+            Create.of(
+                Arrays.asList(
+                    KV.of("key1", KV.of("secondaryKey2", 20)),
+                    KV.of("key2", KV.of("secondaryKey2", 200)),
+                    KV.of("key1", KV.of("secondaryKey3", 30)),
+                    KV.of("key1", KV.of("secondaryKey1", 10)),
+                    KV.of("key2", KV.of("secondaryKey1", 100)))));
+
+    // Group by Key, bringing <SecondaryKey, Value> pairs for the same Key together.
+    PCollection<KV<String, Iterable<KV<String, Integer>>>> grouped =
+        input.apply(GroupByKey.<String, KV<String, Integer>>create());
+
+    // For every Key, sort the iterable of <SecondaryKey, Value> pairs by SecondaryKey.
+    PCollection<KV<String, Iterable<KV<String, Integer>>>> groupedAndSorted =
+        grouped.apply(
+            SortValues.<String, String, Integer>create(new BufferedExternalSorter.Options()));
+
+    PAssert.that(groupedAndSorted)
+        .satisfies(new AssertThatHasExpectedContentsForTestSecondaryKeySorting());
+
+    p.run();
+  }
+
+  /** Expects the keys in any order, with each key's values sorted by secondary key. */
+  static class AssertThatHasExpectedContentsForTestSecondaryKeySorting
+      implements SerializableFunction<Iterable<KV<String, Iterable<KV<String, Integer>>>>, Void> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public Void apply(Iterable<KV<String, Iterable<KV<String, Integer>>>> actual) {
+      assertThat(
+          actual,
+          containsInAnyOrder(
+              KvMatcher.isKv(
+                  is("key1"),
+                  contains(
+                      KvMatcher.isKv(is("secondaryKey1"), is(10)),
+                      KvMatcher.isKv(is("secondaryKey2"), is(20)),
+                      KvMatcher.isKv(is("secondaryKey3"), is(30)))),
+              KvMatcher.isKv(
+                  is("key2"),
+                  contains(
+                      KvMatcher.isKv(is("secondaryKey1"), is(100)),
+                      KvMatcher.isKv(is("secondaryKey2"), is(200))))));
+      return null;
+    }
+  }
+
+  /** Matcher for KVs. Forked from Beam's org/apache/beam/sdk/TestUtils.java */
+  public static class KvMatcher<K, V> extends TypeSafeMatcher<KV<? extends K, ? extends V>> {
+    final Matcher<? super K> keyMatcher;
+    final Matcher<? super V> valueMatcher;
+
+    /** Returns a matcher accepting KVs whose key and value satisfy the given matchers. */
+    public static <K, V> KvMatcher<K, V> isKv(Matcher<K> keyMatcher, Matcher<V> valueMatcher) {
+      return new KvMatcher<>(keyMatcher, valueMatcher);
+    }
+
+    public KvMatcher(Matcher<? super K> keyMatcher, Matcher<? super V> valueMatcher) {
+      this.keyMatcher = keyMatcher;
+      this.valueMatcher = valueMatcher;
+    }
+
+    @Override
+    public boolean matchesSafely(KV<? extends K, ? extends V> kv) {
+      return keyMatcher.matches(kv.getKey()) && valueMatcher.matches(kv.getValue());
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      // Embed the nested matchers' own descriptions instead of <bracketed> raw values.
+      description.appendText("a KV(").appendDescriptionOf(keyMatcher);
+      description.appendText(", ").appendDescriptionOf(valueMatcher).appendText(")");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/49ee9299/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SorterTestUtils.java
----------------------------------------------------------------------
diff --git a/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SorterTestUtils.java b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SorterTestUtils.java
new file mode 100644
index 0000000..3ab8b34
--- /dev/null
+++ b/sdks/java/extensions/sorter/src/test/java/org/apache/beam/sdk/extensions/sorter/SorterTestUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.extensions.sorter;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.primitives.UnsignedBytes;
+import java.util.Random;
+import org.apache.beam.sdk.values.KV;
+import org.junit.rules.ExpectedException;
+
+/** Reusable test scenarios that can be run against any {@link Sorter} implementation. */
+public class SorterTestUtils {
+
+  /** Checks that sorting with no records added yields an empty iterable. */
+  public static void testEmpty(Sorter sorter) throws Exception {
+    assertThat(sorter.sort(), is(emptyIterable()));
+  }
+
+  /** Checks that a single added record is exactly what {@code sort()} returns. */
+  @SuppressWarnings("unchecked")
+  public static void testSingleElement(Sorter sorter) throws Exception {
+    KV<byte[], byte[]> kv = KV.of(new byte[] {4, 7}, new byte[] {1, 2});
+    sorter.add(kv);
+    assertThat(sorter.sort(), contains(kv));
+  }
+
+  /** Checks that a record with a zero-length key and value survives {@code sort()}. */
+  @SuppressWarnings("unchecked")
+  public static void testEmptyKeyValueElement(Sorter sorter) throws Exception {
+    KV<byte[], byte[]> kv = KV.of(new byte[] {}, new byte[] {});
+    sorter.add(kv);
+    assertThat(sorter.sort(), contains(kv));
+  }
+
+  /** Checks that the iterable returned by {@code sort()} can be traversed more than once. */
+  @SuppressWarnings("unchecked")
+  public static void testMultipleIterations(Sorter sorter) throws Exception {
+    KV<byte[], byte[]> first = KV.of(new byte[] {0}, new byte[] {});
+    KV<byte[], byte[]> second = KV.of(new byte[] {0, 1}, new byte[] {});
+    KV<byte[], byte[]> third = KV.of(new byte[] {1}, new byte[] {});
+    sorter.add(second);
+    sorter.add(third);
+    sorter.add(first);
+    Iterable<KV<byte[], byte[]>> sorted = sorter.sort();
+    assertThat(sorted, contains(first, second, third));
+    // Iterate second time.
+    assertThat(sorted, contains(first, second, third));
+  }
+
+  /** Factory for new sorters. Used when performance testing multiple sorter creation. */
+  interface SorterGenerator {
+    Sorter generateSorter() throws Exception;
+  }
+
+  /**
+   * Adds numRecordsPerSorter random records to each of numSorters sorters and asserts that every
+   * sorter returns keys in non-decreasing unsigned byte order. Prints timing to stdout.
+   */
+  public static void testRandom(
+      SorterGenerator sorterGenerator, int numSorters, int numRecordsPerSorter) throws Exception {
+    long start = System.currentTimeMillis();
+    for (int i = 0; i < numSorters; ++i) {
+      Sorter sorter = sorterGenerator.generateSorter();
+      // Fixed seed: every sorter receives the same reproducible sequence of records.
+      Random rnd = new Random(0L);
+      for (int j = 0; j < numRecordsPerSorter; ++j) {
+        byte[] key = new byte[8];
+        byte[] value = new byte[8];
+        rnd.nextBytes(key);
+        rnd.nextBytes(value);
+        sorter.add(KV.of(key, value));
+      }
+
+      byte[] prev = null;
+      for (KV<byte[], byte[]> record : sorter.sort()) {
+        byte[] key = record.getKey();
+        // Equal neighbouring keys are still valid sorted output, hence <= rather than <.
+        assertTrue(
+            prev == null || UnsignedBytes.lexicographicalComparator().compare(prev, key) <= 0);
+        prev = key;
+      }
+    }
+    long elapsedMillis = System.currentTimeMillis() - start;
+    // Widen to double before multiplying: numRecordsPerSorter * numSorters can overflow an int.
+    double recordsPerSecond = (double) numRecordsPerSorter * numSorters * 1000.0 / elapsedMillis;
+    System.out.println("Took " + elapsedMillis + "ms for " + recordsPerSecond + " records/s");
+  }
+
+  /** Tests that calling add after sort throws an {@link IllegalStateException}. */
+  public static void testAddAfterSort(Sorter sorter, ExpectedException thrown) throws Exception {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(is("Records can only be added before sort()"));
+    KV<byte[], byte[]> kv = KV.of(new byte[] {4, 7}, new byte[] {1, 2});
+    sorter.add(kv);
+    sorter.sort();
+    sorter.add(kv);
+  }
+
+  /** Tests that calling sort twice throws an {@link IllegalStateException}. */
+  public static void testSortTwice(Sorter sorter, ExpectedException thrown) throws Exception {
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(is("sort() can only be called once."));
+    KV<byte[], byte[]> kv = KV.of(new byte[] {4, 7}, new byte[] {1, 2});
+    sorter.add(kv);
+    sorter.sort();
+    sorter.sort();
+  }
+}


Mime
View raw message