storm-user mailing list archives

From Andrea Gazzarini
Subject KafkaSpout (idle) generates a huge network traffic
Date Fri, 19 Aug 2016 06:05:01 GMT
Hi guys,
I'm retrying here after no luck on Stack Overflow (I'm not sure whether
this is a question about Storm, Kafka, or both).

After developing and running my Storm (1.0.1) topology with a
KafkaSpout and a couple of bolts, I noticed heavy network traffic even
when the topology is idle (no messages in Kafka, no processing done in
the bolts). So I commented out my topology piece by piece to find the
cause, and I ended up with only the KafkaSpout in my main:
final SpoutConfig spoutConfig = new SpoutConfig(
        new ZkHosts(zkHosts, "/brokers"),
        "files-topic", // topic
        "/kafka", // ZK chroot
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
        new KafkaSpout(config),
When this (useless) topology runs, even in local mode and even the very
first time, the network traffic always grows a lot. In Activity Monitor
I see:

  * an average of 432 KB of data received per second
  * more or less the same average of data sent per second
  * after the topology has been running (idle) for a couple of hours,
    data received is 1.26 GB and data sent is 1 GB
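
As a rough cross-check of the figures above, the sketch below converts
an average rate into a cumulative total and back. It assumes decimal
units (1 KB = 1000 bytes, 1 GB = 10^9 bytes) and takes "a couple of
hours" to mean exactly two hours; both are assumptions, and the class
and method names are purely illustrative.

```java
public class TrafficEstimate {
    static final double BYTES_PER_KB = 1_000.0;          // assumption: decimal KB
    static final double BYTES_PER_GB = 1_000_000_000.0;  // assumption: decimal GB

    /** Total volume in GB produced by a constant rate (KB/s) over a duration in seconds. */
    static double totalGb(double rateKbPerSec, double seconds) {
        return rateKbPerSec * BYTES_PER_KB * seconds / BYTES_PER_GB;
    }

    /** Average rate in KB/s implied by a total volume (GB) over a duration in seconds. */
    static double impliedRateKbPerSec(double totalGb, double seconds) {
        return totalGb * BYTES_PER_GB / seconds / BYTES_PER_KB;
    }

    public static void main(String[] args) {
        double twoHours = 2 * 3600.0; // assumption: "a couple of hours" = 2 h
        System.out.println("GB at a steady 432 KB/s for 2 h: " + totalGb(432, twoHours));
        System.out.println("KB/s implied by 1.26 GB in 2 h:  " + impliedRateKbPerSec(1.26, twoHours));
    }
}
```

If the two results differ noticeably, the rate was probably not constant
over the whole run, or the interval was not exactly two hours.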

(Important: Kafka (0.10) is not running as a cluster; it is a single
instance running on the same machine, with a single topic and a single
partition. I just downloaded Kafka on my machine, started it and created
a simple topic. When I put a message in the topic, everything in the
topology works without any problem at all.)

Obviously, the reason is in the KafkaSpout.nextTuple() method (below),
but I don't understand why, without any message in Kafka, I should see
such traffic. Is there something I didn't consider? Is this the expected
behaviour? I had a look at the Kafka, ZK and Storm logs and found
nothing. I also cleaned up the Kafka and ZK data: still the same
behaviour.

public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {

        try {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            if (state != EmitState.EMITTED_MORE_LEFT) {
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        } catch (FailedFetchException e) {
            LOG.warn("Fetch failed", e);
            _coordinator.refresh();
        }
    }

    long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;

    /*
         As far as the System.currentTimeMillis() is dependent on System clock,
         additional check on negative value of diffWithNow in case of external changes.
     */
    if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
        commit();
    }
}
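
One way to reason about the idle traffic: Storm calls nextTuple() in a
loop, and if every idle call ends up doing a fetch round trip to the
broker, the traffic scales with the call rate rather than with the
number of messages. The sketch below is only a back-of-the-envelope
model. It does not use Storm or Kafka, and the call rates it tries are
hypothetical inputs, not measured or documented values; only the
432 KB/s figure comes from the observation above.

```java
public class IdlePollModel {
    /**
     * Bytes each idle call would have to move for a given call rate
     * to add up to the observed throughput (decimal KB assumed).
     */
    static double bytesPerCall(double observedKbPerSec, double callsPerSec) {
        return observedKbPerSec * 1000.0 / callsPerSec;
    }

    /** Throughput in KB/s produced by a call rate and a per-call payload. */
    static double kbPerSecond(double callsPerSec, double bytesPerCall) {
        return callsPerSec * bytesPerCall / 1000.0;
    }

    public static void main(String[] args) {
        double observedKbPerSec = 432.0;              // figure reported in this message
        double[] hypotheticalRates = {100, 500, 1000}; // calls/s, illustrative only
        for (double rate : hypotheticalRates) {
            System.out.println(rate + " calls/s -> "
                    + bytesPerCall(observedKbPerSec, rate) + " bytes per call");
        }
    }
}
```

If the per-call size that comes out is in the range of a small
request/response pair, frequent empty fetches would be enough to explain
the numbers without any message flowing through the topic.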

Many thanks in advance
