crunch-commits mailing list archives

From mkw...@apache.org
Subject [crunch] branch master updated: CRUNCH-680: Kafka Source should split very large partitions
Date Fri, 01 Mar 2019 16:40:52 GMT
This is an automated email from the ASF dual-hosted git repository.

mkwhit pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/crunch.git


The following commit(s) were added to refs/heads/master by this push:
     new 21cb425  CRUNCH-680: Kafka Source should split very large partitions
     new 9b8849c  Merge pull request #21 from noslowerdna/CRUNCH-680
21cb425 is described below

commit 21cb425610676c0ec05627d21f8dc01d51904589
Author: Andrew Olson <aolson1@cerner.com>
AuthorDate: Fri Feb 22 13:34:32 2019 -0600

    CRUNCH-680: Kafka Source should split very large partitions
---
 .../crunch/kafka/record/KafkaInputFormat.java      |  30 +++-
 .../crunch/kafka/record/KafkaRecordReader.java     |  36 ++++-
 .../apache/crunch/kafka/record/KafkaSource.java    |   3 +
 .../crunch/kafka/record/KafkaInputFormatTest.java  | 141 +++++++++++++++++
 .../crunch/kafka/record/KafkaRecordReaderTest.java | 169 +++++++++++++++++++++
 .../apache/crunch/kafka/record/KafkaSourceIT.java  |  46 +++++-
 6 files changed, 416 insertions(+), 9 deletions(-)
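
For reference, a minimal sketch (not part of the commit) of how a job could opt into the new per-split limit. Only the KAFKA_MAX_RECORDS_PER_SPLIT constant comes from this change; the wrapper class and the 1,000,000 value are illustrative.

    import org.apache.crunch.kafka.record.KafkaInputFormat;
    import org.apache.hadoop.conf.Configuration;

    public class SplitSizeConfigSketch {
      public static Configuration configure() {
        Configuration conf = new Configuration();
        // Cap each Kafka input split at 1,000,000 records instead of the 5,000,000 default.
        conf.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 1_000_000L);
        return conf;
      }
    }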

diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
index 4e378ce..2f1d139 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
@@ -89,6 +89,17 @@ public class KafkaInputFormat extends InputFormat<ConsumerRecord<BytesWritable,
   private static final String KAFKA_CONNECTION_PROPERTY_BASE = "org.apache.crunch.kafka.connection.properties";
 
   /**
+   * Configuration property for the maximum number of records per input split. Partitions with more qualifying records than this
+   * limit will be divided into multiple splits.
+   */
+  public static final String KAFKA_MAX_RECORDS_PER_SPLIT = "org.apache.crunch.kafka.split.max";
+
+  /**
+   * Default value for {@link #KAFKA_MAX_RECORDS_PER_SPLIT}
+   */
+  public static final long DEFAULT_KAFKA_MAX_RECORDS_PER_SPLIT = 5000000L;
+
+  /**
   * Regex to discover all of the defined Kafka connection properties which should be passed to the ConsumerConfig.
    */
   private static final Pattern CONNECTION_PROPERTY_REGEX = Pattern
@@ -98,16 +109,27 @@ public class KafkaInputFormat extends InputFormat<ConsumerRecord<BytesWritable,
 
   @Override
  public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
-    Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(getConf());
+    Configuration conf = getConf();
+    long maxRecordsPerSplit = conf.getLong(KAFKA_MAX_RECORDS_PER_SPLIT, DEFAULT_KAFKA_MAX_RECORDS_PER_SPLIT);
+    if (maxRecordsPerSplit < 1L) {
+      throw new IllegalArgumentException("Invalid " + KAFKA_MAX_RECORDS_PER_SPLIT + " value [" + maxRecordsPerSplit + "]");
+    }
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(conf);
     List<InputSplit> splits = new LinkedList<>();
     for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
       TopicPartition topicPartition = entry.getKey();
 
       long start = entry.getValue().first();
       long end = entry.getValue().second();
-      if (start != end) {
-        splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), entry.getValue().first(),
-            entry.getValue().second()));
+
+      // Chop up any excessively large partitions into multiple splits for more balanced map task durations. This will
+      // also exclude any partitions with no records to read (where the start offset equals the end offset).
+      long splitStart = start;
+      while (splitStart < end) {
+        long splitEnd = Math.min(splitStart + maxRecordsPerSplit, end);
+        splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), splitStart, splitEnd));
+        splitStart = splitEnd;
       }
     }
 
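To make the chopping logic above concrete, a rough sketch of the split count it yields per partition (an illustrative helper, not code from the commit): one split per maxRecordsPerSplit qualifying records, rounded up, and none for a partition whose start offset equals its end offset.

    // Illustrative only: the number of splits the loop above produces for one partition.
    final class SplitMathSketch {
      static long expectedSplits(long start, long end, long maxRecordsPerSplit) {
        long records = end - start;                                      // qualifying records in the partition
        return (records + maxRecordsPerSplit - 1) / maxRecordsPerSplit;  // ceiling division; 0 when start == end
      }
    }
    // e.g. SplitMathSketch.expectedSplits(300L, 1000L, 2L) == 350, matching the (700/2 + 700%2)
    // term asserted in the new KafkaInputFormatTest below.
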
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
index 754c9ce..3b7ac88 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaRecordReader.java
@@ -75,7 +75,7 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
 
     kafkaConnectionProperties = filterConnectionProperties(getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
 
-    consumer = new KafkaConsumer<>(kafkaConnectionProperties);
+    consumer = buildConsumer(kafkaConnectionProperties);
 
     KafkaInputSplit split = (KafkaInputSplit) inputSplit;
     topicPartition = split.getTopicPartition();
@@ -102,6 +102,26 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
     maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT);
   }
 
+  /**
+   * Builds a new kafka consumer
+   *
+   * @param properties
+   *      the properties to configure the consumer
+   * @return a new kafka consumer
+   */
+  // Visible for testing
+  protected KafkaConsumer<K, V> buildConsumer(Properties properties) {
+    return new KafkaConsumer<>(properties);
+  }
+
+  /**
+   * @return the current offset for the reader
+   */
+  // Visible for testing
+  protected long getCurrentOffset() {
+    return currentOffset;
+  }
+
   @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
     if (hasPendingData()) {
@@ -173,9 +193,18 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
   }
 
   /**
+   * @return the record iterator used by the reader
+   */
+  // Visible for testing
+  protected Iterator<ConsumerRecord<K, V>> getRecordIterator() {
+    return recordIterator;
+  }
+
+  /**
    * Loads new records into the record iterator
    */
-  private void loadRecords() {
+  // Visible for testing
+  protected void loadRecords() {
     if ((recordIterator == null) || !recordIterator.hasNext()) {
       ConsumerRecords<K, V> records = null;
       int numTries = 0;
@@ -212,7 +241,8 @@ public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>,
       if ((records == null) || records.isEmpty()){
         LOG.info("No records retrieved from Kafka partition {} therefore nothing to iterate
over", topicPartition);
       } else{
-        LOG.info("Retrieved records {} from Kafka partition {} to iterate over", records.count(),
topicPartition);
+        LOG.info("Retrieved {} records from Kafka partition {} to iterate over starting from
offset {}", new Object[] {
+            records.count(), topicPartition, records.iterator().next().offset()});
       }
 
       recordIterator = records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator();
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java
index 4d59edd..98964d8 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaSource.java
@@ -163,6 +163,9 @@ public class KafkaSource
     props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
     props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
 
+    //disable automatic committing of consumer offsets
+    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.toString(false));
+
     return props;
   }
 
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaInputFormatTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaInputFormatTest.java
new file mode 100644
index 0000000..2062058
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaInputFormatTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+
+public class KafkaInputFormatTest {
+
+  @Test
+  public void generateConnectionPropertyKeyTest() {
+    String propertyName = "some.property";
+    String actual = KafkaInputFormat.generateConnectionPropertyKey(propertyName);
+    String expected = "org.apache.crunch.kafka.connection.properties.some.property";
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void getConnectionPropertyFromKeyTest() {
+    String prefixedConnectionProperty = "org.apache.crunch.kafka.connection.properties.some.property";
+    String actual = KafkaInputFormat.getConnectionPropertyFromKey(prefixedConnectionProperty);
+    String expected = "some.property";
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void writeConnectionPropertiesToBundleTest() {
+    FormatBundle<KafkaInputFormat> actual = FormatBundle.forInput(KafkaInputFormat.class);
+    Properties connectionProperties = new Properties();
+    connectionProperties.put("key1", "value1");
+    connectionProperties.put("key2", "value2");
+    KafkaInputFormat.writeConnectionPropertiesToBundle(connectionProperties, actual);
+
+    FormatBundle<KafkaInputFormat> expected = FormatBundle.forInput(KafkaInputFormat.class);
+    expected.set("org.apache.crunch.kafka.connection.properties.key1", "value1");
+    expected.set("org.apache.crunch.kafka.connection.properties.key2", "value2");
+
+    assertEquals(expected, actual);
+  }
+
+  @Test
+  public void filterConnectionPropertiesTest() {
+    Properties props = new Properties();
+    props.put("org.apache.crunch.kafka.connection.properties.key1", "value1");
+    props.put("org.apache.crunch.kafka.connection.properties.key2", "value2");
+    props.put("org_apache_crunch_kafka_connection_properties.key3", "value3");
+    props.put("org.apache.crunch.another.prefix.properties.key4", "value4");
+
+    Properties actual = KafkaInputFormat.filterConnectionProperties(props);
+    Properties expected = new Properties();
+    expected.put("key1", "value1");
+    expected.put("key2", "value2");
+
+    assertEquals(expected, actual);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void getSplitsInvalidMaxRecords() throws IOException, InterruptedException {
+    KafkaInputFormat kafkaInputFormat = new KafkaInputFormat();
+    Configuration conf = new Configuration(false);
+    conf.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 0L);
+    kafkaInputFormat.setConf(conf);
+    kafkaInputFormat.getSplits(mock(JobContext.class));
+  }
+
+  @Test
+  public void getSplitsConfiguredMaxRecords() throws IOException, InterruptedException {
+    KafkaInputFormat kafkaInputFormat = new KafkaInputFormat();
+    Configuration conf = new Configuration(false);
+    conf.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 2L);
+
+    conf.set("org.apache.crunch.kafka.offsets.topic.abc.partitions", "0,1");
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start", 300L);
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end", 1000L);
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.1.start", 30L);
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.1.end", 100L);
+
+    conf.set("org.apache.crunch.kafka.offsets.topic.xyz.partitions", "0");
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.xyz.partitions.0.start", 3L);
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.xyz.partitions.0.end", 10L);
+
+    kafkaInputFormat.setConf(conf);
+    List<InputSplit> splits = kafkaInputFormat.getSplits(mock(JobContext.class));
+    assertThat(splits.size(), is((700/2 + 700%2) + (70/2 + 70%2) + (7/2 + 7%2)));
+  }
+
+  @Test
+  public void getSplitsDefaultMaxRecords() throws IOException, InterruptedException {
+    KafkaInputFormat kafkaInputFormat = new KafkaInputFormat();
+    Configuration conf = new Configuration(false);
+
+    conf.set("org.apache.crunch.kafka.offsets.topic.abc.partitions", "0");
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start", 0L);
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end", 5234567L);
+
+    kafkaInputFormat.setConf(conf);
+    List<InputSplit> splits = kafkaInputFormat.getSplits(mock(JobContext.class));
+    assertThat(splits.size(), is(2));
+  }
+
+  @Test
+  public void getSplitsNoRecords() throws IOException, InterruptedException {
+    KafkaInputFormat kafkaInputFormat = new KafkaInputFormat();
+    Configuration conf = new Configuration(false);
+
+    conf.set("org.apache.crunch.kafka.offsets.topic.abc.partitions", "0");
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start", 5L);
+    conf.setLong("org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end", 5L);
+
+    kafkaInputFormat.setConf(conf);
+    List<InputSplit> splits = kafkaInputFormat.getSplits(mock(JobContext.class));
+    assertThat(splits.size(), is(0));
+  }
+}
\ No newline at end of file
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordReaderTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordReaderTest.java
new file mode 100644
index 0000000..fd8cd8e
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordReaderTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the
+ * Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
+ */
+package org.apache.crunch.kafka.record;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.DisconnectException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaRecordReaderTest {
+
+  @Mock
+  private KafkaConsumer<String, String> consumer;
+
+  @Mock
+  private TaskAttemptContext taskAttemptContext;
+
+  private TopicPartition topicPartition;
+  private long startOffset;
+  private long endOffset;
+  private KafkaInputSplit inputSplit;
+
+  private ConsumerRecords<String, String> records;
+
+  private KafkaRecordReader<String, String> reader;
+
+  @Before
+  public void before() throws IOException, InterruptedException {
+    when(taskAttemptContext.getConfiguration()).thenReturn(new Configuration(false));
+    startOffset = 0L;
+    endOffset = 100L;
+    topicPartition = new TopicPartition("topic", 0);
+
+    inputSplit = new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), startOffset, endOffset);
+
+    when(consumer.beginningOffsets(Collections.singleton(inputSplit.getTopicPartition()))).thenReturn(
+        Collections.singletonMap(inputSplit.getTopicPartition(), 0L));
+
+    records = new ConsumerRecords<>(Collections.singletonMap(inputSplit.getTopicPartition(),
+        Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, "key", "value"))));
+
+    when(consumer.poll(anyLong())).thenReturn(records);
+
+    reader = new KafkaRecordReaderTester();
+    reader.initialize(inputSplit, taskAttemptContext);
+  }
+
+  @Test
+  public void getRecords_consumerPollThrowsException_thenReturnsMessage() {
+    // DisconnectException is retriable
+    when(consumer.poll(anyLong())).thenThrow(new DisconnectException()).thenReturn(records);
+
+    reader.loadRecords();
+    Iterator<ConsumerRecord<String, String>> iterator = reader.getRecordIterator();
+    assertThat(iterator.hasNext(), is(true));
+    assertThat(iterator.next(), is(records.records(topicPartition).get(0)));
+  }
+
+  @Test
+  public void getRecords_consumerPollEmpty_thenReturnsMessage() {
+    // An empty first poll should be retried
+    when(consumer.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition,
+        List<ConsumerRecord<String, String>>> emptyMap())).thenReturn(records);
+
+    reader.loadRecords();
+    Iterator<ConsumerRecord<String, String>> iterator = reader.getRecordIterator();
+    assertThat(iterator.hasNext(), is(true));
+    assertThat(iterator.next(), is(records.records(topicPartition).get(0)));
+  }
+
+  @Test
+  public void nextKeyValue() throws IOException, InterruptedException {
+    assertThat(reader.nextKeyValue(), is(true));
+    assertThat(reader.getCurrentKey(), is(records.records(topicPartition).get(0)));
+    assertThat(reader.getCurrentOffset(), is(0L));
+  }
+
+  @Test
+  public void nextKeyValue_recordOffsetAheadOfExpected() throws IOException, InterruptedException {
+    records = new ConsumerRecords<>(Collections.singletonMap(inputSplit.getTopicPartition(),
+        Collections.singletonList(new ConsumerRecord<>("topic", 0, 10L, "key", "value"))));
+
+    when(consumer.poll(anyLong())).thenReturn(records);
+
+    assertThat(reader.nextKeyValue(), is(true));
+    assertThat(reader.getCurrentKey(), is(records.records(topicPartition).get(0)));
+    assertThat(reader.getCurrentOffset(), is(10L));
+  }
+
+  @Test
+  public void nextKeyValue_noRecord_emptyPartition() throws IOException, InterruptedException {
+    when(consumer.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition,
+        List<ConsumerRecord<String, String>>> emptyMap()));
+
+    when(consumer.beginningOffsets(Collections.singletonList(topicPartition))).thenReturn(
+        Collections.singletonMap(topicPartition, endOffset));
+
+    assertThat(reader.nextKeyValue(), is(false));
+  }
+
+  @Test(expected = IOException.class)
+  public void nextKeyValue_noRecord_nonEmptyPartition() throws IOException, InterruptedException {
+    when(consumer.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.<TopicPartition,
+        List<ConsumerRecord<String, String>>> emptyMap()));
+
+    reader.nextKeyValue();
+  }
+
+  @Test
+  public void nextKeyValue_recordIsBeyondEndOffset() throws IOException, InterruptedException {
+    records = new ConsumerRecords<>(Collections.singletonMap(inputSplit.getTopicPartition(),
+        Collections.singletonList(new ConsumerRecord<>("topic", 0, 100L, "key", "value"))));
+
+    when(consumer.poll(anyLong())).thenReturn(records);
+
+    assertThat(reader.nextKeyValue(), is(false));
+  }
+
+  @Test
+  public void getEarliestOffset_noOffsetFound() {
+    when(consumer.beginningOffsets(Collections.singletonList(inputSplit.getTopicPartition()))).thenReturn(
+        Collections.<TopicPartition, Long> emptyMap());
+    assertThat(reader.getEarliestOffset(), is(0L));
+  }
+
+  @Test
+  public void getEarliestOffset() {
+    when(consumer.beginningOffsets(Collections.singletonList(inputSplit.getTopicPartition()))).thenReturn(
+        Collections.singletonMap(inputSplit.getTopicPartition(), 100L));
+    assertThat(reader.getEarliestOffset(), is(100L));
+  }
+
+  private class KafkaRecordReaderTester extends KafkaRecordReader<String, String> {
+    @Override
+    protected KafkaConsumer<String, String> buildConsumer(Properties properties) {
+      return consumer;
+    }
+  }
+}
\ No newline at end of file
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
index 56241ae..af03d64 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
@@ -27,8 +27,8 @@ import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.From;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.To;
-import org.apache.crunch.kafka.*;
-import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.crunch.kafka.KafkaUtils;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.conf.Configuration;
@@ -197,6 +197,48 @@ public class KafkaSourceIT {
     pipeline.done();
   }
 
+  @Test
+  public void sourceReadDataThroughPipelineMultipleSplitsPerPartition() {
+    Configuration config = ClusterTest.getConf();
+
+    config.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 7L);
+
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
+    pipeline.enableDebug();
+
+    ReadableSource<ConsumerRecord<BytesWritable, BytesWritable>> kafkaSource = new KafkaSource(consumerProps, offsets);
+
+    PCollection<ConsumerRecord<BytesWritable, BytesWritable>> read = pipeline.read(kafkaSource);
+    Path out = path.getPath("out");
+    read.parallelDo(new KafkaSourceIT.SimpleConvertFn(), Avros.strings()).write(To.textFile(out));
+
+    pipeline.run();
+
+    PCollection<String> persistedKeys = pipeline.read(From.textFile(out));
+
+    Set<String> keysRead = new HashSet<>();
+    int numRecordsFound = 0;
+    for (String value : persistedKeys.materialize()) {
+      assertThat(keys, hasItem(value));
+      numRecordsFound++;
+      keysRead.add(value);
+    }
+
+    assertThat(numRecordsFound, is(keys.size()));
+    assertThat(keysRead.size(), is(keys.size()));
+
+    pipeline.done();
+  }
 
  private static class SimpleConvertFn extends MapFn<ConsumerRecord<BytesWritable, BytesWritable>, String> {
     @Override

