storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [storm] branch master updated: STORM-2720 : Add TIMESTAMP option for FirstPollOffset for Kafka Trident spout (#2911)
Date Fri, 17 May 2019 12:42:10 GMT
This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 1453009  STORM-2720 : Add TIMESTAMP option for FirstPollOffset for Kafka Trident spout (#2911)
1453009 is described below

commit 14530099b2accd462f3af815eee82d3d1b407c0a
Author: Janith <janithkv@gmail.com>
AuthorDate: Fri May 17 05:42:01 2019 -0700

    STORM-2720 : Add TIMESTAMP option for FirstPollOffset for Kafka Trident spout (#2911)
---
 .../storm/kafka/spout/FirstPollOffsetStrategy.java | 12 +++-
 .../spout/internal/CommonKafkaSpoutConfig.java     | 22 ++++++-
 .../spout/trident/KafkaTridentSpoutEmitter.java    | 24 +++++++
 .../trident/KafkaTridentSpoutEmitterEmitTest.java  | 73 ++++++++++++++++++----
 4 files changed, 116 insertions(+), 15 deletions(-)

diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java
index f2e14cb..9184bc3 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/FirstPollOffsetStrategy.java
@@ -32,11 +32,21 @@ public enum FirstPollOffsetStrategy {
      */
     LATEST,
     /**
+     * The kafka spout polls records starting at the earliest offset whose timestamp is greater than or equal to the given startTimestamp.
+     * This setting only takes effect on topology deployment. This option is currently available only for the Trident Spout
+     */
+    TIMESTAMP,
+    /**
      * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST
      */
     UNCOMMITTED_EARLIEST,
     /**
      * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST
      */
-    UNCOMMITTED_LATEST;
+    UNCOMMITTED_LATEST,
+    /**
+     * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as TIMESTAMP.
+     * This option is currently available only for the Trident Spout
+     */
+    UNCOMMITTED_TIMESTAMP;
 }
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java
index 4e1160f..14c6a02 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/CommonKafkaSpoutConfig.java
@@ -45,7 +45,8 @@ import org.slf4j.LoggerFactory;
 public abstract class CommonKafkaSpoutConfig<K, V> implements Serializable {
     public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
     public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
-
+    // Earliest start
+    public static final long DEFAULT_START_TS = 0L;
     public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
 
     public static final Logger LOG = LoggerFactory.getLogger(CommonKafkaSpoutConfig.class);
@@ -60,7 +61,8 @@ public abstract class CommonKafkaSpoutConfig<K, V> implements Serializable
{
     private final RecordTranslator<K, V> translator;
     private final FirstPollOffsetStrategy firstPollOffsetStrategy;
     private final long partitionRefreshPeriodMs;
-    
+    private final long startTimeStamp;
+
     /**
      * Creates a new CommonKafkaSpoutConfig using a Builder.
      *
@@ -74,6 +76,7 @@ public abstract class CommonKafkaSpoutConfig<K, V> implements Serializable
{
         this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
         this.pollTimeoutMs = builder.pollTimeoutMs;
         this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+        this.startTimeStamp = builder.startTimeStamp;
     }
 
     public abstract static class Builder<K, V, T extends Builder<K, V, T>> {
@@ -85,6 +88,7 @@ public abstract class CommonKafkaSpoutConfig<K, V> implements Serializable
{
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY;
         private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
+        private long startTimeStamp = DEFAULT_START_TS;
 
         public Builder(String bootstrapServers, String... topics) {
             this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
@@ -207,6 +211,15 @@ public abstract class CommonKafkaSpoutConfig<K, V> implements Serializable
{
             this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
             return (T)this;
         }
+
+        /**
+         * Specifies the startTimeStamp if the first poll strategy is TIMESTAMP or UNCOMMITTED_TIMESTAMP.
+         * @param startTimeStamp time in ms
+         */
+        public T setStartTimeStamp(long startTimeStamp) {
+            this.startTimeStamp = startTimeStamp;
+            return (T)this;
+        }
         
         protected Map<String, Object> getKafkaProps() {
             return kafkaProps;
@@ -248,6 +261,10 @@ public abstract class CommonKafkaSpoutConfig<K, V> implements Serializable
{
         return partitionRefreshPeriodMs;
     }
 
+    public long getStartTimeStamp() {
+        return startTimeStamp;
+    }
+
     @Override
     public String toString() {
         return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
@@ -257,6 +274,7 @@ public abstract class CommonKafkaSpoutConfig<K, V> implements Serializable
{
             .append("topicFilter", topicFilter)
             .append("topicPartitioner", topicPartitioner)
             .append("translator", translator)
+            .append("startTimeStamp", startTimeStamp)
             .toString();
     }
 }
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index aee8987..69df461 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -20,8 +20,10 @@ package org.apache.storm.kafka.spout.trident;
 
 import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
 import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.TIMESTAMP;
 import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
 import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.UNCOMMITTED_TIMESTAMP;
 
 import com.google.common.annotations.VisibleForTesting;
 import java.io.Serializable;
@@ -34,10 +36,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.RecordTranslator;
@@ -69,6 +73,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable
{
     private final RecordTranslator<K, V> translator;
     private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
     private final TopologyContext topologyContext;
+    private final long startTimeStamp;
 
     /**
      * Create a new Kafka spout emitter.
@@ -90,6 +95,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable
{
         this.topicAssigner = topicAssigner;
         this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
         this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
+        this.startTimeStamp = kafkaSpoutConfig.getStartTimeStamp();
         LOG.debug("Created {}", this.toString());
     }
 
@@ -249,6 +255,9 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable
{
             } else if (firstPollOffsetStrategy == LATEST && isFirstPollSinceTopologyWasDeployed) {
                 LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
                 consumer.seekToEnd(Collections.singleton(tp));
+            } else if (firstPollOffsetStrategy == TIMESTAMP && isFirstPollSinceTopologyWasDeployed) {
+                LOG.debug("First poll for topic partition [{}], seeking to partition based on startTimeStamp", tp);
+                seekOffsetByStartTimeStamp(tp);
             } else if (lastBatchMeta != null) {
                 LOG.debug("First poll for topic partition [{}], using last batch metadata",
tp);
                 consumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset
after last offset from previous batch
@@ -258,6 +267,10 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable
{
             } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
                 LOG.debug("First poll for topic partition [{}] with no last batch metadata,
seeking to partition end", tp);
                 consumer.seekToEnd(Collections.singleton(tp));
+            } else if (firstPollOffsetStrategy == UNCOMMITTED_TIMESTAMP) {
+                LOG.debug("First poll for topic partition [{}] with no last batch metadata, "
+                        + "seeking to partition based on startTimeStamp", tp);
+                seekOffsetByStartTimeStamp(tp);
             }
             tpToFirstSeekOffset.put(tp, consumer.position(tp));
         } else if (lastBatchMeta != null) {
@@ -279,6 +292,17 @@ public class KafkaTridentSpoutEmitter<K, V> implements Serializable
{
         return fetchOffset;
     }
 
+    /**
+     * Seek the consumer to offset corresponding to startTimeStamp.
+     */
+    private void seekOffsetByStartTimeStamp(TopicPartition tp) {
+        Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = consumer.offsetsForTimes(Collections.singletonMap(tp, startTimeStamp));
+        OffsetAndTimestamp startOffsetAndTimeStamp = offsetsForTimes.get(tp);
+        long startTimeStampOffset = startOffsetAndTimeStamp.offset();
+        LOG.debug("First poll for topic partition [{}], seeking to partition from startTimeStamp [{}]", tp, startTimeStamp);
+        consumer.seek(tp, startTimeStampOffset);
+    }
+
     private boolean isFirstPollSinceExecutorStarted(TopicPartition tp) {
         return !tpToFirstSeekOffset.containsKey(tp);
     }
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
index 3692087..c18289e 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterEmitTest.java
@@ -26,10 +26,16 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
@@ -50,6 +56,7 @@ import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.jupiter.MockitoExtension;
 
 @ExtendWith(MockitoExtension.class)
@@ -70,6 +77,7 @@ public class KafkaTridentSpoutEmitterEmitTest {
     private final long firstOffsetInKafka = 0;
     private final int recordsInKafka = 100;
     private final long lastOffsetInKafka = 99;
+    private final long startTimeStamp = 1557214606103L;
 
     @BeforeEach
     public void setUp() {
@@ -81,15 +89,20 @@ public class KafkaTridentSpoutEmitterEmitTest {
         records.forEach(record -> consumer.addRecord(record));
     }
 
-    private KafkaTridentSpoutEmitter<String, String> createEmitter(FirstPollOffsetStrategy
firstPollOffsetStrategy) {
+    private KafkaTridentSpoutEmitter<String, String> createEmitter(Consumer<String,String>
kafkaConsumer, FirstPollOffsetStrategy firstPollOffsetStrategy) {
         return new KafkaTridentSpoutEmitter<>(
-            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
-                .setRecordTranslator(r -> new Values(r.offset()), new Fields("offset"))
-                .setFirstPollOffsetStrategy(firstPollOffsetStrategy)
-                .setPollTimeoutMs(1)
-                .build(),
-            topologyContextMock,
-            config -> consumer, new TopicAssigner());
+                SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+                        .setRecordTranslator(r -> new Values(r.offset()), new Fields("offset"))
+                        .setFirstPollOffsetStrategy(firstPollOffsetStrategy)
+                        .setPollTimeoutMs(1)
+                        .setStartTimeStamp(startTimeStamp)
+                        .build(),
+                topologyContextMock,
+                config -> kafkaConsumer, new TopicAssigner());
+    }
+
+    private KafkaTridentSpoutEmitter<String, String> createEmitter(FirstPollOffsetStrategy
firstPollOffsetStrategy) {
+        return createEmitter(consumer,firstPollOffsetStrategy);
     }
 
     private Map<String, Object> doEmitNewBatchTest(FirstPollOffsetStrategy firstPollOffsetStrategy,
TridentCollector collectorMock, TopicPartition tp, Map<String, Object> previousBatchMeta)
{
@@ -126,7 +139,7 @@ public class KafkaTridentSpoutEmitterEmitTest {
     }
 
     @ParameterizedTest
-    @EnumSource(value = FirstPollOffsetStrategy.class, names = {"EARLIEST", "LATEST"})
+    @EnumSource(value = FirstPollOffsetStrategy.class, names = {"EARLIEST", "LATEST", "TIMESTAMP"})
     public void testEmitNewBatchWithPreviousMeta(FirstPollOffsetStrategy firstPollOffsetStrategy)
{
         //Check that non-null meta makes the spout seek according to the provided metadata,
and that the returned meta is correct
         long firstExpectedEmittedOffset = 50;
@@ -239,11 +252,11 @@ public class KafkaTridentSpoutEmitterEmitTest {
     }
 
     @ParameterizedTest
-    @EnumSource(value = FirstPollOffsetStrategy.class, names = {"EARLIEST", "LATEST"})
+    @EnumSource(value = FirstPollOffsetStrategy.class, names = {"EARLIEST", "LATEST", "TIMESTAMP"})
     public void testUnconditionalStrategyWhenSpoutWorkerIsRestarted(FirstPollOffsetStrategy
firstPollOffsetStrategy) {
         /**
-         * EARLIEST/LATEST should act like UNCOMMITTED_EARLIEST/LATEST if the emitter is
new but the topology has not restarted (storm id
-         * has not changed)
+         * EARLIEST/LATEST/TIMESTAMP should act like UNCOMMITTED_EARLIEST/LATEST/TIMESTAMP
if the emitter is new but the
+         * topology has not restarted (storm id has not changed)
          */
         long preRestartEmittedOffset = 20;
         int lastBatchEmittedRecords = 10;
@@ -303,4 +316,40 @@ public class KafkaTridentSpoutEmitterEmitTest {
         verify(collectorMock, never()).emit(anyList());
     }
 
+    @Test
+    public void testTimeStampStrategyWhenTopologyIsRedeployed() {
+        /**
+         * TIMESTAMP strategy should be applied if the emitter is new and the topology has
been redeployed (storm id has changed)
+         * Offset should be reset according to the offset corresponding to startTimeStamp
+         */
+        long preRestartEmittedOffset = 20;
+        int preRestartEmittedRecords = 10;
+        long timeStampStartOffset = 2L;
+        long pollTimeout = 1L;
+        KafkaTridentSpoutBatchMetadata preExecutorRestartLastMeta = new KafkaTridentSpoutBatchMetadata(preRestartEmittedOffset,
preRestartEmittedOffset + preRestartEmittedRecords - 1, "Some older topology");
+        
+        KafkaConsumer<String, String> kafkaConsumer = Mockito.mock(KafkaConsumer.class);
+        when(kafkaConsumer.assignment()).thenReturn(Collections.singleton(partition));
+        OffsetAndTimestamp offsetAndTimestamp = new OffsetAndTimestamp(timeStampStartOffset,
startTimeStamp);
+        HashMap<TopicPartition, OffsetAndTimestamp> map = new HashMap<>();
+        map.put(partition, offsetAndTimestamp);
+        when(kafkaConsumer.offsetsForTimes(Collections.singletonMap(partition, startTimeStamp))).thenReturn(map);
+        HashMap<TopicPartition, List<ConsumerRecord<String, String>>> topicPartitionMap
= new HashMap<>();
+        List<ConsumerRecord<String, String>> newRecords = SpoutWithMockedConsumerSetupHelper.createRecords(partition,
timeStampStartOffset, recordsInKafka);
+        topicPartitionMap.put(partition, newRecords);
+        when(kafkaConsumer.poll(pollTimeout)).thenReturn(new ConsumerRecords<>(topicPartitionMap));
+
+        KafkaTridentSpoutEmitter<String, String> emitter = createEmitter(kafkaConsumer,
FirstPollOffsetStrategy.TIMESTAMP);
+        TransactionAttempt txid = new TransactionAttempt(0L, 0);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(partition);
+        Map<String, Object> meta = emitter.emitPartitionBatchNew(txid, collectorMock,
kttp, preExecutorRestartLastMeta.toMap());
+
+        verify(collectorMock, times(recordsInKafka)).emit(emitCaptor.capture());
+        verify(kafkaConsumer, times(1)).seek(partition, timeStampStartOffset);
+        List<List<Object>> emits = emitCaptor.getAllValues();
+        assertThat(emits.get(0).get(0), is(timeStampStartOffset));
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(meta);
+        assertThat("The batch should start at the first offset for startTimestamp", deserializedMeta.getFirstOffset(),
is(timeStampStartOffset));
+    }
+
 }


Mime
View raw message