storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/8] storm git commit: Revert "Merge branch 'STORM-2236-1.x' of https://github.com/MediaV/storm into STORM-2236-1.x-merge"
Date Wed, 01 Feb 2017 15:56:24 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 83bf0ba8d -> fcdef97fc


Revert "Merge branch 'STORM-2236-1.x' of https://github.com/MediaV/storm into STORM-2236-1.x-merge"

This reverts commit 389966ea4969d76271da1ad80f9f46e1c87cade4, reversing
changes made to 397977797a6ef940403c0ff4c14b834a45b6ae05.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cbffc00d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cbffc00d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cbffc00d

Branch: refs/heads/1.x-branch
Commit: cbffc00d1e84c753f5e94135bceb3f4ecc4b6d7b
Parents: 2a01dbc
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Mon Jan 23 15:11:22 2017 -0600
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Mon Jan 30 14:47:28 2017 -0600

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  95 +++++++++++++---
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  34 ------
 .../kafka/spout/TopicPartitionComparator.java   |  34 ------
 .../storm/kafka/spout/internal/Timer.java       |  75 -------------
 .../fetcher/AutomaticKafkaRecordsFetcher.java   |  67 -----------
 .../internal/fetcher/KafkaRecordsFetcher.java   |  25 -----
 .../internal/fetcher/KafkaRecordsFetchers.java  |  54 ---------
 .../fetcher/ManualKafkaRecordsFetcher.java      | 110 -------------------
 .../partition/KafkaPartitionReader.java         |  28 -----
 .../partition/KafkaPartitionReaders.java        |  45 --------
 .../partition/NamedTopicPartitionReader.java    |  47 --------
 .../partition/WildcardTopicPartitionReader.java |  51 ---------
 12 files changed, 77 insertions(+), 588 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cbffc00d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index c92b6ee..737c810 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -24,14 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.InterruptException;
-import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
-import org.apache.storm.kafka.spout.internal.Timer;
-import org.apache.storm.kafka.spout.internal.fetcher.KafkaRecordsFetcher;
-import org.apache.storm.kafka.spout.internal.fetcher.KafkaRecordsFetchers;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -53,11 +46,16 @@ import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+
+import org.apache.kafka.common.errors.InterruptException;
 
 public class KafkaSpout<K, V> extends BaseRichSpout {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
@@ -65,7 +63,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     // Storm
     protected SpoutOutputCollector collector;
-    private TopologyContext topologyContext;
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
@@ -80,7 +77,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient KafkaSpoutRetryService retryService;              // Class that has
the logic to handle tuple failure
     private transient Timer commitTimer;                                // timer == null
for auto commit mode
     private transient boolean initialized;                              // Flag indicating
that the spout is still undergoing initialization process.
-    private transient KafkaRecordsFetcher<K, V> recordsFetcher;         // Class that
encapsulates the logic of managing partitions and fetching records
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
     private KafkaSpoutStreams kafkaSpoutStreams;                        // Object that wraps
all the logic to declare output fields and emit tuples
@@ -106,9 +102,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         initialized = false;
+
         // Spout internals
         this.collector = collector;
-        this.topologyContext = context;
         maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
         numUncommittedOffsets = 0;
 
@@ -231,11 +227,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 }
 
                 if (poll()) {
-                    try {
-                        setWaitingToEmit(pollKafkaBroker());
-                    } catch (RetriableException e) {
-                        LOG.error("Failed to poll from kafka.", e);
-                    }
+                    setWaitingToEmit(pollKafkaBroker());
                 }
 
                 if (waitingToEmit()) {
@@ -291,7 +283,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private ConsumerRecords<K, V> pollKafkaBroker() {
         doSeekRetriableTopicPartitions();
 
-        final ConsumerRecords<K, V> consumerRecords = recordsFetcher.fetchRecords(kafkaSpoutConfig.getPollTimeoutMs());
+        final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
         final int numPolledRecords = consumerRecords.count();
         LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic
partitions", numPolledRecords, numUncommittedOffsets);
         return consumerRecords;
@@ -418,8 +410,19 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void subscribeKafkaConsumer() {
         kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
-        recordsFetcher = KafkaRecordsFetchers.create(kafkaSpoutConfig, kafkaConsumer, topologyContext,
-            new KafkaSpoutConsumerRebalanceListener());
+
+        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
+            final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
+            kafkaConsumer.subscribe(topics, new KafkaSpoutConsumerRebalanceListener());
+            LOG.info("Kafka consumer subscribed topics {}", topics);
+        } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
+            final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern();
+            kafkaConsumer.subscribe(pattern, new KafkaSpoutConsumerRebalanceListener());
+            LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
+        }
+        // Initial poll to get the consumer registration process going.
+        // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition
registration
+        kafkaConsumer.poll(0);
     }
 
     @Override
@@ -615,4 +618,60 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     '}';
         }
     }
+
+    // =========== Timer ===========
+
+    private class Timer {
+        private final long delay;
+        private final long period;
+        private final TimeUnit timeUnit;
+        private final long periodNanos;
+        private long start;
+
+        /**
+         * Creates a class that mimics a single threaded timer that expires periodically.
If a call to {@link
+         * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was
initiated or reset, this method returns
+         * true. Each time the method returns true the counter is reset. The timer starts
with the specified time delay.
+         *
+         * @param delay    the initial delay before the timer starts
+         * @param period   the period between calls {@link #isExpiredResetOnTrue()}
+         * @param timeUnit the time unit of delay and period
+         */
+        public Timer(long delay, long period, TimeUnit timeUnit) {
+            this.delay = delay;
+            this.period = period;
+            this.timeUnit = timeUnit;
+
+            periodNanos = timeUnit.toNanos(period);
+            start = System.nanoTime() + timeUnit.toNanos(delay);
+        }
+
+        public long period() {
+            return period;
+        }
+
+        public long delay() {
+            return delay;
+        }
+
+        public TimeUnit getTimeUnit() {
+            return timeUnit;
+        }
+
+        /**
+         * Checks if a call to this method occurs later than {@code period} since the timer
was initiated or reset. If that is the
+         * case the method returns true, otherwise it returns false. Each time this method
returns true, the counter is reset
+         * (re-initiated) and a new cycle will start.
+         *
+         * @return true if the time elapsed since the last call returning true is greater
than {@code period}. Returns false
+         * otherwise.
+         */
+        public boolean isExpiredResetOnTrue() {
+            final boolean expired = System.nanoTime() - start > periodNanos;
+            if (expired) {
+                start = System.nanoTime();
+            }
+            return expired;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/cbffc00d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 5daf13f..8aa525b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -36,7 +36,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;   // 30s
     public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     // Retry forever
     public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;    // 10,000,000
records => 80MBs of memory footprint in the worst case
-    public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; // 2s
 
     // Kafka property names
     public interface Consumer {
@@ -77,8 +76,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     private final long offsetCommitPeriodMs;
     private final int maxRetries;
     private final int maxUncommittedOffsets;
-    private final long partitionRefreshPeriodMs;
-    private final boolean manualPartitionAssignment;
     private final FirstPollOffsetStrategy firstPollOffsetStrategy;
     private final KafkaSpoutStreams kafkaSpoutStreams;
     private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
@@ -94,8 +91,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
         this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
         this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
-        this.manualPartitionAssignment = builder.manualPartitionAssignment;
         this.tuplesBuilder = builder.tuplesBuilder;
         this.retryService = builder.retryService;
     }
@@ -118,8 +113,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
         private final KafkaSpoutStreams kafkaSpoutStreams;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
-        private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
-        private boolean manualPartitionAssignment = false;
         private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
         private final KafkaSpoutRetryService retryService;
 
@@ -236,25 +229,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             return this;
         }
 
-        /**
-         * Sets partition refresh period in milliseconds in manual partition assignment model.
Default is 2s.
-         * @param partitionRefreshPeriodMs time in milliseconds
-         */
-        public Builder<K, V> setPartitionRefreshPeriodMs(long partitionRefreshPeriodMs)
{
-            this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
-            return this;
-        }
-
-        /**
-         * Defines whether the consumer manages partition manually.
-         * If set to true, the consumer manage partition manually, otherwise it will rely
on kafka to do partition assignment.
-         * @param manualPartitionAssignment True if using manual partition assignment.
-         */
-        public Builder<K, V> setManualPartitionAssignment(boolean manualPartitionAssignment)
{
-            this.manualPartitionAssignment = manualPartitionAssignment;
-            return this;
-        }
-
         public KafkaSpoutConfig<K,V> build() {
             return new KafkaSpoutConfig<>(this);
         }
@@ -333,14 +307,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return retryService;
     }
 
-    public long getPartitionRefreshPeriodMs() {
-        return partitionRefreshPeriodMs;
-    }
-
-    public boolean isManualPartitionAssignment() {
-        return manualPartitionAssignment;
-    }
-
     @Override
     public String toString() {
         return "KafkaSpoutConfig{" +

http://git-wip-us.apache.org/repos/asf/storm/blob/cbffc00d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
deleted file mode 100644
index b908001..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Comparator;
-
-public class TopicPartitionComparator implements Comparator<TopicPartition> {
-    @Override
-    public int compare(TopicPartition o1, TopicPartition o2) {
-        if (!o1.topic().equals(o2.topic())) {
-            return o1.topic().compareTo(o2.topic());
-        } else {
-            return o1.partition() - o2.partition();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/cbffc00d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
deleted file mode 100644
index 45ae330..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.internal;
-
-import java.util.concurrent.TimeUnit;
-
-public class Timer {
-    private final long delay;
-    private final long period;
-    private final TimeUnit timeUnit;
-    private final long periodNanos;
-    private long start;
-
-    /**
-     * Creates a class that mimics a single threaded timer that expires periodically. If
a call to {@link
-     * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was initiated
or reset, this method returns
-     * true. Each time the method returns true the counter is reset. The timer starts with
the specified time delay.
-     *
-     * @param delay    the initial delay before the timer starts
-     * @param period   the period between calls {@link #isExpiredResetOnTrue()}
-     * @param timeUnit the time unit of delay and period
-     */
-    public Timer(long delay, long period, TimeUnit timeUnit) {
-        this.delay = delay;
-        this.period = period;
-        this.timeUnit = timeUnit;
-
-        periodNanos = timeUnit.toNanos(period);
-        start = System.nanoTime() + timeUnit.toNanos(delay);
-    }
-
-    public long period() {
-        return period;
-    }
-
-    public long delay() {
-        return delay;
-    }
-
-    public TimeUnit getTimeUnit() {
-        return timeUnit;
-    }
-
-    /**
-     * Checks if a call to this method occurs later than {@code period} since the timer was
initiated or reset. If that is the
-     * case the method returns true, otherwise it returns false. Each time this method returns
true, the counter is reset
-     * (re-initiated) and a new cycle will start.
-     *
-     * @return true if the time elapsed since the last call returning true is greater than
{@code period}. Returns false
-     * otherwise.
-     */
-    public boolean isExpiredResetOnTrue() {
-        final boolean expired = System.nanoTime() - start > periodNanos;
-        if (expired) {
-            start = System.nanoTime();
-        }
-        return expired;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/cbffc00d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/AutomaticKafkaRecordsFetcher.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/AutomaticKafkaRecordsFetcher.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/AutomaticKafkaRecordsFetcher.java
deleted file mode 100644
index 8ba7098..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/AutomaticKafkaRecordsFetcher.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.internal.fetcher;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.regex.Pattern;
-
-public class AutomaticKafkaRecordsFetcher<K, V> implements KafkaRecordsFetcher<K,
V> {
-    private static final Logger LOG = LoggerFactory.getLogger(AutomaticKafkaRecordsFetcher.class);
-
-    private final KafkaConsumer<K, V> kafkaConsumer;
-    private final ConsumerRebalanceListener consumerRebalanceListener;
-
-    public AutomaticKafkaRecordsFetcher(KafkaConsumer<K, V> kafkaConsumer,
-                                        ConsumerRebalanceListener consumerRebalanceListener,
-                                        KafkaSpoutStreams kafkaSpoutStreams) {
-        this.kafkaConsumer = kafkaConsumer;
-        this.consumerRebalanceListener = consumerRebalanceListener;
-
-        subscribe(kafkaSpoutStreams);
-    }
-
-    private void subscribe(KafkaSpoutStreams kafkaSpoutStreams) {
-        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
-            final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
-            kafkaConsumer.subscribe(topics, consumerRebalanceListener);
-            LOG.info("Kafka consumer subscribed topics {}", topics);
-        } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
-            final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern();
-            kafkaConsumer.subscribe(pattern, consumerRebalanceListener);
-            LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
-        }
-        // Initial poll to get the consumer registration process going.
-        // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition
registration
-        kafkaConsumer.poll(0);
-    }
-
-    @Override
-    public ConsumerRecords<K, V> fetchRecords(long fetchTimeoutMs) {
-        return kafkaConsumer.poll(fetchTimeoutMs);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/cbffc00d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetcher.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetcher.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetcher.java
deleted file mode 100644
index 47a61c1..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetcher.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.internal.fetcher;
-
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-
-public interface KafkaRecordsFetcher<K, V> {
-    ConsumerRecords<K, V> fetchRecords(long fetchTimeoutMs);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/cbffc00d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java
deleted file mode 100644
index e3b7cd6..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/KafkaRecordsFetchers.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.internal.fetcher;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.internal.Timer;
-import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReader;
-import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReaders;
-import org.apache.storm.task.TopologyContext;
-
-import java.util.concurrent.TimeUnit;
-
-public final class KafkaRecordsFetchers {
-    public static <K, V> KafkaRecordsFetcher<K, V> create(KafkaSpoutConfig kafkaSpoutConfig,
-                                                          KafkaConsumer<K, V> consumer,
-                                                          TopologyContext context,
-                                                          ConsumerRebalanceListener rebalanceListener)
{
-        if (kafkaSpoutConfig.isManualPartitionAssignment()) {
-            int thisTaskIndex = context.getThisTaskIndex();
-            int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
-            KafkaPartitionReader partitionReader = KafkaPartitionReaders.create(
-                kafkaSpoutConfig.getKafkaSpoutStreams());
-            Timer partitionRefreshTimer = new Timer(500,
-                kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
-
-            ManualKafkaRecordsFetcher.PartitionAssignmentChangeListener partitionAssignmentChangeListener
=
-                ManualKafkaRecordsFetcher.listenerOf(rebalanceListener);
-
-            return new ManualKafkaRecordsFetcher<>(consumer, thisTaskIndex, totalTaskCount,
partitionReader,
-                partitionRefreshTimer, partitionAssignmentChangeListener);
-        } else {
-            return new AutomaticKafkaRecordsFetcher<>(consumer, rebalanceListener,
-                kafkaSpoutConfig.getKafkaSpoutStreams());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/cbffc00d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java
deleted file mode 100644
index 124afc4..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/fetcher/ManualKafkaRecordsFetcher.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.internal.fetcher;
-
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.internal.Timer;
-import org.apache.storm.kafka.spout.internal.partition.KafkaPartitionReader;
-import org.apache.storm.kafka.spout.TopicPartitionComparator;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-
-public class ManualKafkaRecordsFetcher<K, V> implements KafkaRecordsFetcher<K, V>
{
-    private static final Comparator<TopicPartition> KAFKA_TOPIC_PARTITION_COMPARATOR
= new TopicPartitionComparator();
-
-    private final KafkaConsumer<K, V> consumer;
-    private final int thisTaskIndex;
-    private final int totalTaskCount;
-    private final KafkaPartitionReader partitionReader;
-    private final Timer partitionRefreshTimer;
-    private final PartitionAssignmentChangeListener partitionAssignmentChangeListener;
-    private Set<TopicPartition> myPartitions = Collections.emptySet();
-
-    public ManualKafkaRecordsFetcher(KafkaConsumer<K, V> consumer,
-                                     int thisTaskIndex,
-                                     int totalTaskCount,
-                                     KafkaPartitionReader partitionReader,
-                                     Timer partitionRefreshTimer,
-                                     PartitionAssignmentChangeListener partitionAssignmentChangeListener)
{
-        this.consumer = consumer;
-        this.thisTaskIndex = thisTaskIndex;
-        this.totalTaskCount = totalTaskCount;
-        this.partitionReader = partitionReader;
-        this.partitionRefreshTimer = partitionRefreshTimer;
-        this.partitionAssignmentChangeListener = partitionAssignmentChangeListener;
-
-        doRefreshMyPartitions();
-    }
-
-    private void refreshMyPartitionsIfNeeded() {
-        if (!partitionRefreshTimer.isExpiredResetOnTrue()) {
-            return;
-        }
-
-        doRefreshMyPartitions();
-    }
-
-    private void doRefreshMyPartitions() {
-        List<TopicPartition> topicPartitions = partitionReader.readPartitions(consumer);
-        Collections.sort(topicPartitions, KAFKA_TOPIC_PARTITION_COMPARATOR);
-
-        Set<TopicPartition> curPartitions = new HashSet<>(topicPartitions.size()/totalTaskCount+1);
-        for (int i=thisTaskIndex; i<topicPartitions.size(); i+=totalTaskCount) {
-            curPartitions.add(topicPartitions.get(i));
-        }
-
-        if (!myPartitions.equals(curPartitions) && myPartitions!=null) {
-            partitionAssignmentChangeListener.onPartitionAssignmentChange(myPartitions, curPartitions);
-        }
-
-        myPartitions = curPartitions;
-
-        consumer.assign(myPartitions);
-    }
-
-    @Override
-    public ConsumerRecords<K, V> fetchRecords(long fetchTimeoutMs) {
-        refreshMyPartitionsIfNeeded();
-
-        return consumer.poll(fetchTimeoutMs);
-    }
-
-    public interface PartitionAssignmentChangeListener {
-        void onPartitionAssignmentChange(Set<TopicPartition> oldPartitions, Set<TopicPartition>
newPartitions);
-    }
-
-    public static PartitionAssignmentChangeListener listenerOf(final ConsumerRebalanceListener
consumerRebalanceListener) {
-        return new PartitionAssignmentChangeListener() {
-            @Override
-            public void onPartitionAssignmentChange(Set<TopicPartition> oldPartitions,
Set<TopicPartition> newPartitions) {
-                consumerRebalanceListener.onPartitionsRevoked(oldPartitions);
-                consumerRebalanceListener.onPartitionsAssigned(newPartitions);
-            }
-        };
-    }
-}
-

http://git-wip-us.apache.org/repos/asf/storm/blob/cbffc00d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReader.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReader.java
deleted file mode 100644
index e3480ac..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReader.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.internal.partition;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.List;
-
-public interface KafkaPartitionReader {
-    List<TopicPartition> readPartitions(KafkaConsumer<?, ?> consumer);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/cbffc00d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReaders.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReaders.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReaders.java
deleted file mode 100644
index 4e51c1e..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/KafkaPartitionReaders.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.internal.partition;
-
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
-
-import java.util.HashSet;
-
-public final class KafkaPartitionReaders {
-    public static KafkaPartitionReader create(KafkaSpoutStreams kafkaSpoutStreams) {
-        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
-            return new NamedTopicPartitionReader(new HashSet<>(
-                KafkaSpoutStreamsNamedTopics.class.cast(kafkaSpoutStreams).getTopics()));
-        } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
-            return new WildcardTopicPartitionReader(
-                KafkaSpoutStreamsWildcardTopics.class.cast(kafkaSpoutStreams).getTopicWildcardPattern());
-        } else {
-            throw new IllegalArgumentException("Unrecognized kafka spout stream: " + kafkaSpoutStreams.getClass());
-        }
-    }
-
-    public static TopicPartition toTopicPartition(PartitionInfo partitionInfo) {
-        return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/cbffc00d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/NamedTopicPartitionReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/NamedTopicPartitionReader.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/NamedTopicPartitionReader.java
deleted file mode 100644
index 41db169..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/NamedTopicPartitionReader.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.internal.partition;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-public class NamedTopicPartitionReader implements KafkaPartitionReader {
-    private final Set<String > topics;
-
-    public NamedTopicPartitionReader(Set<String> topics) {
-        this.topics = topics;
-    }
-
-    @Override
-    public List<TopicPartition> readPartitions(KafkaConsumer<?, ?> consumer)
{
-        List<TopicPartition> topicPartitions = new ArrayList<>();
-        for (String topic : topics) {
-            for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
-                topicPartitions.add(KafkaPartitionReaders.toTopicPartition(partitionInfo));
-            }
-        }
-
-        return topicPartitions;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/cbffc00d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/WildcardTopicPartitionReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/WildcardTopicPartitionReader.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/WildcardTopicPartitionReader.java
deleted file mode 100644
index fcac1c1..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/partition/WildcardTopicPartitionReader.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.internal.partition;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-
-public class WildcardTopicPartitionReader implements KafkaPartitionReader {
-    private final Pattern wildcardTopicPattern;
-
-    public WildcardTopicPartitionReader(Pattern wildcardTopicPattern) {
-        this.wildcardTopicPattern = wildcardTopicPattern;
-    }
-
-    @Override
-    public List<TopicPartition> readPartitions(KafkaConsumer<?, ?> consumer)
{
-        List<TopicPartition> topicPartitions = new ArrayList<>();
-
-        for(Map.Entry<String, List<PartitionInfo>> entry: consumer.listTopics().entrySet())
{
-            if (wildcardTopicPattern.matcher(entry.getKey()).matches()) {
-                for (PartitionInfo partitionInfo: entry.getValue()) {
-                    topicPartitions.add(KafkaPartitionReaders.toTopicPartition(partitionInfo));
-                }
-            }
-        }
-
-        return topicPartitions;
-    }
-}


Mime
View raw message