storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [1/3] storm git commit: STORM-3290: Split configuration for storm-kafka-client Trident and non-Trident spout into two classes
Date Thu, 29 Nov 2018 18:19:10 GMT
Repository: storm
Updated Branches:
  refs/heads/master 52cb6b20a -> 6d871a73e


http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
index 1ccc9a7..059c8c7 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
@@ -45,14 +45,14 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.ConsumerFactory;
 import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
 import org.apache.storm.kafka.spout.subscription.TopicAssigner;
 import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.kafka.spout.trident.config.builder.SingleTopicKafkaTridentSpoutConfiguration;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.topology.TransactionAttempt;
@@ -91,7 +91,7 @@ public class KafkaTridentSpoutEmitterTest {
     @Test
     public void testGetOrderedPartitionsIsConsistent() {
         KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
-            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
                 .build(),
             topologyContextMock,
             config -> consumer, new TopicAssigner());
@@ -129,7 +129,7 @@ public class KafkaTridentSpoutEmitterTest {
             });
         
         KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
-            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mock(TopicFilter.class), partitionerMock, -1)
+            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(mock(TopicFilter.class), partitionerMock, -1)
                 .build(),
             topologyContextMock,
             config -> consumer, new TopicAssigner());
@@ -154,7 +154,7 @@ public class KafkaTridentSpoutEmitterTest {
         TopicAssigner assignerMock = mock(TopicAssigner.class);
         
         KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
-            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
                 .build(),
             topologyContextMock,
             config -> consumer, assignerMock);
@@ -179,7 +179,7 @@ public class KafkaTridentSpoutEmitterTest {
         List<ConsumerRecord<String, String>> records = SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords);
         records.forEach(record -> consumer.addRecord(record));
         return new KafkaTridentSpoutEmitter<>(
-            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
                 .setRecordTranslator(r -> new Values(r.offset()), new Fields("offset"))
                 .setFirstPollOffsetStrategy(firstPollOffsetStrategy)
                 .build(),
@@ -242,8 +242,8 @@ public class KafkaTridentSpoutEmitterTest {
         when(consumerMock.assignment()).thenReturn(Collections.singleton(tp));
         ConsumerFactory<String, String> consumerFactory = spoutConfig -> consumerMock;
         KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
-            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
-                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
+            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.LATEST)
                 .build(),
             mock(TopologyContext.class),
             consumerFactory, new TopicAssigner());

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
index 6f46a36..ab8e3c2 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
@@ -33,10 +33,9 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
 import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.kafka.spout.trident.config.builder.SingleTopicKafkaTridentSpoutConfiguration;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
 import org.junit.Test;
@@ -52,8 +51,8 @@ public class KafkaTridentSpoutOpaqueCoordinatorTest {
         TopicFilter mockFilter = mock(TopicFilter.class);
         when(mockFilter.getAllSubscribedPartitions(any())).thenReturn(Collections.singleton(expectedPartition));
 
-        KafkaSpoutConfig<String, String> spoutConfig = 
-            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
+        KafkaTridentSpoutConfig<String, String> spoutConfig = 
+            SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
                 .build();
         KafkaTridentSpoutCoordinator<String, String> coordinator = new KafkaTridentSpoutCoordinator<>(spoutConfig, ignored -> mockConsumer);
 
@@ -80,8 +79,8 @@ public class KafkaTridentSpoutOpaqueCoordinatorTest {
                 .thenReturn(Collections.singleton(expectedPartition))
                 .thenReturn(allPartitions);
 
-            KafkaSpoutConfig<String, String> spoutConfig = 
-                SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
+            KafkaTridentSpoutConfig<String, String> spoutConfig = 
+                SingleTopicKafkaTridentSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
                     .build();
             KafkaTridentSpoutCoordinator<String, String> coordinator = new KafkaTridentSpoutCoordinator<>(spoutConfig, ignored -> mockConsumer);
 

http://git-wip-us.apache.org/repos/asf/storm/blob/63de17ed/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/config/builder/SingleTopicKafkaTridentSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/config/builder/SingleTopicKafkaTridentSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/config/builder/SingleTopicKafkaTridentSpoutConfiguration.java
new file mode 100644
index 0000000..5adeb1e
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/config/builder/SingleTopicKafkaTridentSpoutConfiguration.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident.config.builder;
+
+import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.EARLIEST;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+public class SingleTopicKafkaTridentSpoutConfiguration {
+
+    public static final String TOPIC = "test";
+
+    public static KafkaTridentSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(int port) {
+        return setCommonSpoutConfig(KafkaTridentSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
+    }
+    
+    public static KafkaTridentSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(TopicFilter topicFilter, ManualPartitioner topicPartitioner, int port) {
+        return setCommonSpoutConfig(new KafkaTridentSpoutConfig.Builder<>("127.0.0.1:" + port, topicFilter, topicPartitioner));
+    }
+
+    public static KafkaTridentSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaTridentSpoutConfig.Builder<String, String> config) {
+        return config.setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()),
+            new Fields("topic", "key", "value"))
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
+            .setFirstPollOffsetStrategy(EARLIEST)
+            .setPollTimeoutMs(1000);
+    }
+}


Mime
View raw message