beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Wait for Elements to be fetched in KafkaIO#start
Date Wed, 22 Jun 2016 06:28:39 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 3ff98ea23 -> f4809446b


Wait for Elements to be fetched in KafkaIO#start

This makes it more likely that the reader has elements after the call to
start returns.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15f69edf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15f69edf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15f69edf

Branch: refs/heads/master
Commit: 15f69edf80237152739a737e7e84c9ec933d372c
Parents: 3ff98ea
Author: Thomas Groh <tgroh@google.com>
Authored: Mon Jun 13 15:32:19 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Jun 21 23:28:02 2016 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java  | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/15f69edf/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index d540a8d..3b64bd5 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -759,6 +759,8 @@ public class KafkaIO {
     private Iterator<PartitionState> curBatch = Collections.emptyIterator();
 
     private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
+    // how long to wait for new records from kafka consumer inside start()
+    private static final Duration START_NEW_RECORDS_POLL_TIMEOUT = Duration.standardSeconds(5);
     // how long to wait for new records from kafka consumer inside advance()
     private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
 
@@ -891,12 +893,12 @@ public class KafkaIO {
       LOG.info("{}: Returning from consumer pool loop", this);
     }
 
-    private void nextBatch() {
+    private void nextBatch(Duration timeout) {
       curBatch = Collections.emptyIterator();
 
       ConsumerRecords<byte[], byte[]> records;
       try {
-        records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
+        records = availableRecordsQueue.poll(timeout.getMillis(),
                                              TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -965,6 +967,9 @@ public class KafkaIO {
             }
           }, 0, OFFSET_UPDATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
 
+      // Wait for longer than normal when fetching a batch to improve chances a record is available
+      // when start() returns.
+      nextBatch(START_NEW_RECORDS_POLL_TIMEOUT);
       return advance();
     }
 
@@ -1028,7 +1033,7 @@ public class KafkaIO {
           return true;
 
         } else { // -- (b)
-          nextBatch();
+          nextBatch(NEW_RECORDS_POLL_TIMEOUT);
 
           if (!curBatch.hasNext()) {
             return false;


Mime
View raw message