storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2281: Running Multiple Kafka Spouts (Trident) Throws Illegal State Exception
Date Wed, 08 Feb 2017 02:11:36 GMT
Repository: storm
Updated Branches:
  refs/heads/master d3250b22f -> 3e232e2fc


STORM-2281: Running Multiple Kafka Spouts (Trident) Throws Illegal State Exception

   - Assign topic partitions to the tasks running the Kafka consumer instance that has been assigned
the same list of topic partitions
   - Improve logging
   - Minor code refactoring


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/07cf86e9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/07cf86e9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/07cf86e9

Branch: refs/heads/master
Commit: 07cf86e95a66e47cac41f67f5b269ab24758d8cb
Parents: d3250b2
Author: Hugo Louro <hmclouro@gmail.com>
Authored: Mon Jan 30 00:02:42 2017 -0800
Committer: Hugo Louro <hmclouro@gmail.com>
Committed: Tue Feb 7 15:43:45 2017 -0800

----------------------------------------------------------------------
 .../TridentKafkaClientWordCountNamedTopics.java |  44 ++--
 .../storm/kafka/trident/LocalSubmitter.java     |   7 +-
 .../trident/TridentKafkaConsumerTopology.java   |  20 +-
 .../trident/KafkaTridentSpoutBatchMetadata.java |   8 +-
 .../spout/trident/KafkaTridentSpoutEmitter.java | 206 +++++++++++++------
 .../spout/trident/KafkaTridentSpoutManager.java |  85 ++++----
 .../spout/trident/KafkaTridentSpoutOpaque.java  |  17 +-
 .../KafkaTridentSpoutOpaqueCoordinator.java     |   4 +-
 .../KafkaTridentSpoutTopicPartition.java        |   2 +-
 ...KafkaTridentSpoutTopicPartitionRegistry.java |   4 +-
 10 files changed, 237 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
index 2d08f6c..edd1f09 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
@@ -18,16 +18,13 @@
 
 package org.apache.storm.kafka.trident;
 
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.AlreadyAliveException;
 import org.apache.storm.generated.AuthorizationException;
 import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.kafka.spout.Func;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
@@ -36,6 +33,13 @@ import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
 public class TridentKafkaClientWordCountNamedTopics {
     private static final String TOPIC_1 = "test-trident";
     private static final String TOPIC_2 = "test-trident-1";
@@ -45,11 +49,23 @@ public class TridentKafkaClientWordCountNamedTopics {
         return new KafkaTridentSpoutOpaque<>(newKafkaSpoutConfig());
     }
 
+    private static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC
= new JustValueFunc();
+
+    /**
+     * Needs to be serializable
+     */
+    private static class JustValueFunc implements Func<ConsumerRecord<String, String>,
List<Object>>, Serializable {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.value());
+        }
+    }
+
     protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
         return KafkaSpoutConfig.builder(KAFKA_LOCAL_BROKER, TOPIC_1, TOPIC_2)
-                .setGroupId("kafkaSpoutTestGroup")
+                .setGroupId("kafkaSpoutTestGroup_" + System.nanoTime())
                 .setMaxPartitionFectchBytes(200)
-                .setRecordTranslator((r) -> new Values(r.value()), new Fields("str"))
+                .setRecordTranslator(JUST_VALUE_FUNC, new Fields("str"))
                 .setRetry(newRetryService())
                 .setOffsetCommitPeriodMs(10_000)
                 .setFirstPollOffsetStrategy(EARLIEST)
@@ -77,7 +93,7 @@ public class TridentKafkaClientWordCountNamedTopics {
 
             System.out.printf("Running with broker_url: [%s], topics: [%s, %s]\n", brokerUrl,
topic1, topic2);
 
-            Config tpConf = LocalSubmitter.defaultConfig();
+            Config tpConf = LocalSubmitter.defaultConfig(true);
 
             if (args.length == 4) { //Submit Remote
                 // Producers
@@ -102,11 +118,15 @@ public class TridentKafkaClientWordCountNamedTopics {
                     localSubmitter.submit(topic1Tp, tpConf, KafkaProducerTopology.newTopology(brokerUrl,
topic1));
                     localSubmitter.submit(topic2Tp, tpConf, KafkaProducerTopology.newTopology(brokerUrl,
topic2));
                     // Consumer
-                    localSubmitter.submit(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(
-                            localSubmitter.getDrpc(), newKafkaTridentSpoutOpaque()));
+                    try {
+                        localSubmitter.submit(consTpName, tpConf, TridentKafkaConsumerTopology.newTopology(
+                                localSubmitter.getDrpc(), newKafkaTridentSpoutOpaque()));
+                        // print
+                        localSubmitter.printResults(15, 1, TimeUnit.SECONDS);
+                    } catch (Exception e) {
+                        e.printStackTrace();
+                    }
 
-                    // print
-                    new DrpcResultsPrinter(localSubmitter.getDrpc()).printResults(60, 1,
TimeUnit.SECONDS);
                 } finally {
                     // kill
                     localSubmitter.kill(topic1Tp);

http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
index 54ec99c..9666695 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/LocalSubmitter.java
@@ -47,10 +47,13 @@ public class LocalSubmitter {
     }
 
     public static Config defaultConfig() {
+        return defaultConfig(false);
+    }
+
+    public static Config defaultConfig(boolean debug) {
         final Config conf = new Config();
         conf.setMaxSpoutPending(20);
-        conf.setMaxTaskParallelism(1);
-        conf.setNumWorkers(1);
+        conf.setDebug(debug);
         return conf;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
index 4669f52..a39eba1 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
@@ -18,9 +18,7 @@
 
 package org.apache.storm.kafka.trident;
 
-import org.apache.storm.Config;
 import org.apache.storm.LocalDRPC;
-import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.starter.trident.DebugMemoryMapState;
 import org.apache.storm.trident.Stream;
@@ -42,14 +40,6 @@ import org.slf4j.LoggerFactory;
 public class TridentKafkaConsumerTopology {
     protected static final Logger LOG = LoggerFactory.getLogger(TridentKafkaConsumerTopology.class);
 
-    public static void submitRemote(String name, ITridentDataSource tridentSpout) {
-        try {
-            StormSubmitter.submitTopology(name, newTpConfig(), newTopology(null, tridentSpout));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     /**
      * See {@link TridentKafkaConsumerTopology#newTopology(LocalDRPC, ITridentDataSource)}
      */
@@ -85,19 +75,11 @@ public class TridentKafkaConsumerTopology {
     }
 
     private static TridentState addTridentState(TridentTopology tridentTopology, ITridentDataSource
tridentSpout) {
-        final Stream spoutStream = tridentTopology.newStream("spout1", tridentSpout).parallelismHint(1);
+        final Stream spoutStream = tridentTopology.newStream("spout1", tridentSpout).parallelismHint(2);
 
         return spoutStream.each(spoutStream.getOutputFields(), new Debug(true))
                 .each(new Fields("str"), new Split(), new Fields("word"))
                 .groupBy(new Fields("word"))
                 .persistentAggregate(new DebugMemoryMapState.Factory(), new Count(), new
Fields("count"));
     }
-
-    private static Config newTpConfig() {
-        Config conf = new Config();
-        conf.setMaxSpoutPending(20);
-        conf.setMaxTaskParallelism(1);
-        return conf;
-    }
-    
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
index 2faf52a..18a2246 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
@@ -34,8 +34,8 @@ public class KafkaTridentSpoutBatchMetadata<K,V> implements Serializable
{
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutBatchMetadata.class);
 
     private TopicPartition topicPartition;  // topic partition of this batch
-    private long firstOffset;   // first offset of this batch
-    private long lastOffset;    // last offset of this batch
+    private long firstOffset;               // first offset of this batch
+    private long lastOffset;                // last offset of this batch
 
     public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, long firstOffset,
long lastOffset) {
         this.topicPartition = topicPartition;
@@ -74,8 +74,8 @@ public class KafkaTridentSpoutBatchMetadata<K,V> implements Serializable
{
 
     @Override
     public String toString() {
-        return "KafkaTridentSpoutBatchMetadata{" +
-                "topicPartition=" + topicPartition +
+        return super.toString() +
+                "{topicPartition=" + topicPartition +
                 ", firstOffset=" + firstOffset +
                 ", lastOffset=" + lastOffset +
                 '}';

http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 19b4f01..79dfc60 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -18,20 +18,6 @@
 
 package org.apache.storm.kafka.spout.trident;
 
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -47,9 +33,28 @@ import org.apache.storm.trident.topology.TransactionAttempt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentSpout.Emitter<List<TopicPartition>,
KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>>, Serializable
{
-    private static final long serialVersionUID = -7343927794834130435L;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
+public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
+        List<TopicPartition>,
+        KafkaTridentSpoutTopicPartition,
+        KafkaTridentSpoutBatchMetadata<K, V>>,
+        Serializable {
+
+    private static final long serialVersionUID = -7343927794834130435L;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
 
     // Kafka
@@ -57,60 +62,81 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
 
     // Bookkeeping
     private final KafkaTridentSpoutManager<K, V> kafkaManager;
+
     // Declare some KafkaTridentSpoutManager references for convenience
     private final long pollTimeoutMs;
     private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
     private final RecordTranslator<K, V> translator;
     private final Timer refreshSubscriptionTimer;
 
-    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext
context) {
-        this.kafkaManager = kafkaManager;
-        this.kafkaManager.subscribeKafkaConsumer(context);
-        refreshSubscriptionTimer = new Timer(500, kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(),
TimeUnit.MILLISECONDS);
+    private TopologyContext topologyContext;
 
-        //must subscribeKafkaConsumer before this line
-        kafkaConsumer = kafkaManager.getKafkaConsumer();
-        translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
+    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext
topologyContext, Timer refreshSubscriptionTimer) {
+        this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
+        this.kafkaManager = kafkaManager;
+        this.topologyContext = topologyContext;
+        this.refreshSubscriptionTimer = refreshSubscriptionTimer;
+        this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
 
         final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
-        pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
-        firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
+        this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
+        this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
         LOG.debug("Created {}", this);
     }
 
+    /**
+     * Creates instance of this class with default 500 millisecond refresh subscription timer
+     */
+    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext
topologyContext) {
+        this(kafkaManager, topologyContext, new Timer(500,
+                kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
+    }
+
     @Override
     public KafkaTridentSpoutBatchMetadata<K, V> emitPartitionBatch(TransactionAttempt
tx, TridentCollector collector,
-            KafkaTridentSpoutTopicPartition partitionTs, KafkaTridentSpoutBatchMetadata<K,
V> lastBatch) {
-        LOG.debug("Emitting batch: [transaction = {}], [partition = {}], [collector = {}],
[lastBatchMetadata = {}]",
-                tx, partitionTs, collector, lastBatch);
+            KafkaTridentSpoutTopicPartition currBatchPartition, KafkaTridentSpoutBatchMetadata<K,
V> lastBatch) {
 
-        final TopicPartition topicPartition = partitionTs.getTopicPartition();
+        LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata
= {}], [collector = {}]",
+                tx, currBatchPartition, lastBatch, collector);
+
+        final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
+        final Set<TopicPartition> assignments = kafkaConsumer.assignment();
         KafkaTridentSpoutBatchMetadata<K, V> currentBatch = lastBatch;
         Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
 
-        try {
-            // pause other topic partitions to only poll from current topic partition
-            pausedTopicPartitions = pauseTopicPartitions(topicPartition);
+        if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition()))
{
+            LOG.warn("SKIPPING processing batch: [transaction = {}], [currBatchPartition
= {}], [lastBatchMetadata = {}], " +
+                            "[collector = {}] because it is not assigned {} to consumer instance
[{}] of consumer group [{}]",
+                    tx, currBatchPartition, lastBatch, collector, assignments, kafkaConsumer,
+                    kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
+        } else {
+            try {
+                // pause other topic-partitions to only poll from current topic-partition
+                pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
 
-            seek(topicPartition, lastBatch);
+                seek(currBatchTp, lastBatch);
 
-            // poll
-            if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
-                kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
-            }
-            final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
-            LOG.debug("Polled [{}] records from Kafka.", records.count());
+                // poll
+                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
+                    kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
+                }
+
+                final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
+                LOG.debug("Polled [{}] records from Kafka.", records.count());
 
-            if (!records.isEmpty()) {
-                emitTuples(collector, records);
-                // build new metadata
-                currentBatch = new KafkaTridentSpoutBatchMetadata<>(topicPartition,
records, lastBatch);
+                if (!records.isEmpty()) {
+                    emitTuples(collector, records);
+                    // build new metadata
+                    currentBatch = new KafkaTridentSpoutBatchMetadata<>(currBatchTp,
records, lastBatch);
+                }
+            } finally {
+                kafkaConsumer.resume(pausedTopicPartitions);
+                LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
             }
-        } finally {
-            kafkaConsumer.resume(pausedTopicPartitions);
-            LOG.trace("Resumed topic partitions [{}]", pausedTopicPartitions);
+            LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata
= {}], " +
+                    "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition,
lastBatch, currentBatch, collector);
         }
-        LOG.debug("Current batch metadata {}", currentBatch);
+
         return currentBatch;
     }
 
@@ -118,7 +144,7 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
         for (ConsumerRecord<K, V> record : records) {
             final List<Object> tuple = translator.apply(record);
             collector.emit(tuple);
-            LOG.debug("Emitted tuple [{}] for record: [{}]", tuple, record);
+            LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
         }
     }
 
@@ -135,7 +161,6 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
         if (lastBatchMeta != null) {
             kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset
after last offset from previous batch
             LOG.debug("Seeking fetch offset to next offset after last offset from previous
batch");
-
         } else {
             LOG.debug("Seeking fetch offset from firstPollOffsetStrategy and last commit
to Kafka");
             final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
@@ -161,31 +186,86 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
         return fetchOffset;
     }
 
-    // returns paused topic partitions
+    // returns paused topic-partitions.
     private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp)
{
-        final Set<TopicPartition> pausedTopicPartitions  = new HashSet<>(kafkaConsumer.assignment());
-        LOG.debug("Currently assigned topic partitions [{}]", pausedTopicPartitions);
+        final Set<TopicPartition> pausedTopicPartitions = new HashSet<>(kafkaConsumer.assignment());
+        LOG.debug("Currently assigned topic-partitions {}", pausedTopicPartitions);
         pausedTopicPartitions.remove(excludedTp);
         kafkaConsumer.pause(pausedTopicPartitions);
-        LOG.trace("Paused topic partitions [{}]", pausedTopicPartitions);
+        LOG.debug("Paused topic-partitions {}", pausedTopicPartitions);
         return pausedTopicPartitions;
     }
 
     @Override
     public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities)
{
-        LOG.debug("Refreshing topic partitions [{}]", partitionResponsibilities);
+        LOG.trace("Refreshing of topic-partitions handled by Kafka. " +
+                "No action taken by this method for topic partitions {}", partitionResponsibilities);
     }
 
+    /**
+     * Computes ordered list of topic-partitions for this task taking into consideration
that topic-partitions
+     * for this task must be assigned to the Kafka consumer running on this task.
+     * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
+     * @return ordered list of topic partitions for this task
+     */
     @Override
-    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(List<TopicPartition>
allPartitionInfo) {
-        final List<KafkaTridentSpoutTopicPartition> topicPartitionsTrident = new ArrayList<>(allPartitionInfo
== null ? 0 : allPartitionInfo.size());
-        if (allPartitionInfo != null) {
-            for (TopicPartition topicPartition : allPartitionInfo) {
-                topicPartitionsTrident.add(new KafkaTridentSpoutTopicPartition(topicPartition));
+    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<TopicPartition>
allPartitionInfo) {
+        final int numTopicPartitions = allPartitionInfo == null ? 0 : allPartitionInfo.size();
+        final int taskIndex = topologyContext.getThisTaskIndex();
+        final int numTasks = topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
+
+        LOG.debug("Computing task ordered list of topic-partitions from all partitions list
{}, " +
+                "for task with index [{}] of total tasks [{}] ", allPartitionInfo, taskIndex,
numTasks);
+
+        final Set<TopicPartition> assignment = kafkaConsumer.assignment();
+        LOG.debug("Consumer [{}] has assigned topic-partitions {}", kafkaConsumer, assignment);
+
+        List<KafkaTridentSpoutTopicPartition> taskOrderedTps = new ArrayList<>(numTopicPartitions);
+
+        if (numTopicPartitions > 0) {
+            final KafkaTridentSpoutTopicPartition[] tps = new KafkaTridentSpoutTopicPartition[numTopicPartitions];
+            int tpTaskComputedIdx = taskIndex;
+            /*
+             * Put this task's Kafka consumer assigned topic-partitions in the right index
locations such
+             * that distribution by OpaquePartitionedTridentSpoutExecutor can be done correctly.
This algorithm
+             * does the distribution in exactly the same way as the one used in OpaquePartitionedTridentSpoutExecutor
+             */
+            for (TopicPartition assignedTp : assignment) {
+                if (tpTaskComputedIdx >= numTopicPartitions) {
+                    LOG.warn("Ignoring attempt to add consumer [{}] assigned topic-partition
[{}] to index [{}], " +
+                            "out of bounds [{}]. ", kafkaConsumer, assignedTp, tpTaskComputedIdx,
numTopicPartitions);
+                    break;
+                }
+                tps[tpTaskComputedIdx] = new KafkaTridentSpoutTopicPartition(assignedTp);
+                LOG.debug("Added consumer assigned topic-partition [{}] to position [{}]
for task with index [{}]",
+                        assignedTp, tpTaskComputedIdx, taskIndex);
+                tpTaskComputedIdx += numTasks;
+            }
+
+            // Put topic-partitions assigned to consumer instances running in different tasks
in the empty slots
+            int i = 0;
+            for (TopicPartition tp : allPartitionInfo) {
+                /*
+                 * Topic-partition not assigned to the Kafka consumer associated with this
emitter task, hence not yet
+                 * added to the list of task ordered partitions. To be processed next.
+                 */
+                if (!assignment.contains(tp)) {
+                    for (; i < numTopicPartitions; i++) {
+                        if (tps[i] == null) {   // find empty slot to put the topic-partition
+                            tps[i] = new KafkaTridentSpoutTopicPartition(tp);
+                            LOG.debug("Added to position [{}] topic-partition [{}], which
is assigned to a consumer " +
+                                    "running on a task other than task with index [{}] ",
i, tp, taskIndex);
+                            i++;
+                            break;
+                        }
+                    }
+                }
             }
+            taskOrderedTps = Arrays.asList(tps);
         }
-        LOG.debug("OrderedPartitions = {}", topicPartitionsTrident);
-        return topicPartitionsTrident;
+        LOG.debug("Returning ordered list of topic-partitions {} for task with index [{}],
of total tasks [{}] ",
+                taskOrderedTps, taskIndex, numTasks);
+        return taskOrderedTps;
     }
 
     @Override
@@ -196,8 +276,8 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
 
     @Override
     public String toString() {
-        return "KafkaTridentSpoutEmitter{" +
-                ", kafkaManager=" + kafkaManager +
+        return super.toString() +
+                "{kafkaManager=" + kafkaManager +
                 '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
index 4b60f33..4054b49 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
@@ -18,22 +18,20 @@
 
 package org.apache.storm.kafka.spout.trident;
 
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.RecordTranslator;
-import org.apache.storm.kafka.spout.internal.Timer;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+
 public class KafkaTridentSpoutManager<K, V> implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutManager.class);
 
@@ -43,50 +43,20 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
     // Bookkeeping
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
     // Declare some KafkaSpoutConfig references for convenience
-    private final Fields fields;
+    private Fields fields;
 
     public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;
-        RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
-        Fields fields = null;
-        for (String stream: translator.streams()) {
-            if (fields == null) {
-                fields = translator.getFieldsFor(stream);
-            } else {
-                if (!fields.equals(translator.getFieldsFor(stream))) {
-                    throw new IllegalArgumentException("Trident Spouts do not support multiple output Fields");
-                }
-            }
-        }
-        this.fields = fields;
+        this.fields = getFields();
         LOG.debug("Created {}", this);
     }
 
-    void subscribeKafkaConsumer(TopologyContext context) {
+    KafkaConsumer<K,V> createAndSubscribeKafkaConsumer(TopologyContext context) {
         kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
                 kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
 
         kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
-
-        // Initial poll to get the consumer registration process going.
-        // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
-        kafkaConsumer.poll(0);
-    }
-
-    private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
-        @Override
-        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
-                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
-            KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions);
-        }
-
-        @Override
-        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions);
-            LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
-                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
-        }
+        return kafkaConsumer;
     }
 
     KafkaConsumer<K, V> getKafkaConsumer() {
@@ -96,8 +64,23 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
     Set<TopicPartition> getTopicPartitions() {
         return KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions();
     }
-    
+
     Fields getFields() {
+        if (fields == null) {
+            RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
+            Fields fs = null;
+            for (String stream : translator.streams()) {
+                if (fs == null) {
+                    fs = translator.getFieldsFor(stream);
+                } else {
+                    if (!fs.equals(translator.getFieldsFor(stream))) {
+                        throw new IllegalArgumentException("Trident Spouts do not support multiple output Fields");
+                    }
+                }
+            }
+            fields = fs;
+        }
+        LOG.debug("OutputFields = {}", fields);
         return fields;
     }
 
@@ -107,9 +90,25 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
 
     @Override
     public String toString() {
-        return "KafkaTridentSpoutManager{" +
-                "kafkaConsumer=" + kafkaConsumer +
+        return super.toString() +
+                "{kafkaConsumer=" + kafkaConsumer +
                 ", kafkaSpoutConfig=" + kafkaSpoutConfig +
                 '}';
     }
+
+    private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+            KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions);
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions);
+            LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
index 5c5856c..18d37d9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -34,7 +34,7 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
 
-    private KafkaTridentSpoutManager<K, V> kafkaManager;
+    private final KafkaTridentSpoutManager<K, V> kafkaManager;
     private KafkaTridentSpoutEmitter<K, V> kafkaTridentSpoutEmitter;
     private KafkaTridentSpoutOpaqueCoordinator<K, V> coordinator;
 
@@ -50,19 +50,12 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
 
     @Override
     public Emitter<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>> getEmitter(Map conf, TopologyContext context) {
-        // Instance is created on first call rather than in constructor to avoid NotSerializableException caused by KafkaConsumer
-        if (kafkaTridentSpoutEmitter == null) {
-            kafkaTridentSpoutEmitter = new KafkaTridentSpoutEmitter<>(kafkaManager, context);
-        }
-        return kafkaTridentSpoutEmitter;
+        return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
     }
 
     @Override
     public Coordinator<List<TopicPartition>> getCoordinator(Map conf, TopologyContext context) {
-        if (coordinator == null) {
-            coordinator = new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
-        }
-        return coordinator;
+        return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
     }
 
     @Override
@@ -79,8 +72,8 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
 
     @Override
     public String toString() {
-        return "KafkaTridentSpoutOpaque{" +
-                "kafkaManager=" + kafkaManager +
+        return super.toString() +
+                "{kafkaManager=" + kafkaManager +
                 ", kafkaTridentSpoutEmitter=" + kafkaTridentSpoutEmitter +
                 ", coordinator=" + coordinator +
                 '}';

http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
index 6e85735..7898b6e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
@@ -57,8 +57,8 @@ public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartition
 
     @Override
     public String toString() {
-        return "KafkaTridentSpoutOpaqueCoordinator{" +
-                "kafkaManager=" + kafkaManager +
+        return super.toString() +
+                "{kafkaManager=" + kafkaManager +
                 '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
index ba6126c..b020bea 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
@@ -43,7 +43,7 @@ public class KafkaTridentSpoutTopicPartition implements ISpoutPartition, Serializable {
 
     @Override
     public String getId() {
-        return topicPartition.topic() + "/" + topicPartition.partition();
+        return topicPartition.topic() + "@" + topicPartition.partition();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/07cf86e9/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
index ee5220e..2d50ca7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.TopicPartition;
 
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.LinkedHashSet;
 import java.util.Set;
 
 public enum KafkaTridentSpoutTopicPartitionRegistry {
@@ -31,7 +31,7 @@ public enum KafkaTridentSpoutTopicPartitionRegistry {
     private Set<TopicPartition> topicPartitions;
 
     KafkaTridentSpoutTopicPartitionRegistry() {
-        this.topicPartitions = new HashSet<>();
+        this.topicPartitions = new LinkedHashSet<>();
     }
 
     public Set<TopicPartition> getTopicPartitions() {


Mime
View raw message