storm-user mailing list archives

From Andrea Gazzarini <gxs...@gmail.com>
Subject KafkaSpout (idle) generates a huge network traffic
Date Fri, 19 Aug 2016 06:05:01 GMT
Hi guys,
I'm retrying here after no luck on Stack Overflow (I'm not sure whether 
this is a question about Storm, Kafka, or both).

After developing and running my Storm (1.0.1) topology with a 
KafkaSpout and a couple of Bolts, I noticed heavy network traffic even 
when the topology is idle (no messages on Kafka, no processing done in 
the bolts). So I commented out my topology piece by piece to find the 
cause, and I ended up with only the KafkaSpout in my main:
....
final SpoutConfig spoutConfig = new SpoutConfig(
        new ZkHosts(zkHosts, "/brokers"),
        "files-topic", // topic
        "/kafka", // ZK chroot
        "consumer-group-name");
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime = OffsetRequest.LatestTime();
topologyBuilder.setSpout(
        "kafka-spout-id",
        new KafkaSpout(spoutConfig),
        1);
....
When this (useless) topology runs, even in local mode and even on the 
very first run, the network traffic grows a lot. In Activity Monitor I 
see:

  * an average of 432 KB of data received per second
  * more or less the same average of data sent per second
  * after the topology has been running (idle) for a couple of hours,
    data received is 1.26 GB and data sent is 1 GB

(Important: Kafka 0.10 is not running as a cluster. It is a single 
instance on the same machine, with a single topic and a single 
partition. I just downloaded Kafka, started it and created a simple 
topic. When I put a message in the topic, everything in the topology 
works without any problem.)

The cause must be in the KafkaSpout.nextTuple() method (below), but I 
don't understand why there should be this much traffic when there are 
no messages in Kafka. Is there something I didn't consider? Is this the 
expected behaviour? I looked at the Kafka, ZK and Storm logs and found 
nothing. I also cleaned up the Kafka and ZK data, and the behaviour is 
still the same.

@Override
public void nextTuple() {
    List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
    for (int i = 0; i < managers.size(); i++) {

        try {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            if (state != EmitState.EMITTED_MORE_LEFT) {
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        } catch (FailedFetchException e) {
            LOG.warn("Fetch failed", e);
            _coordinator.refresh();
        }
    }

    long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;

    /*
         As far as the System.currentTimeMillis() is dependent on System clock,
         additional check on negative value of diffWithNow in case of external changes.
     */
    if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
        commit();
    }
}
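
To reason about the idle case, the loop above can be reduced to a small 
standalone sketch. Everything in it is a hypothetical stand-in 
(IdleSpoutSketch, IdlePartition, totalNextCalls); none of it is 
storm-kafka code. It only shows that, when every partition reports 
NO_EMITTED, each nextTuple() call still invokes next() once per managed 
partition. Whether the real PartitionManager.next() issues a network 
fetch on each of those calls is an assumption here, not something the 
code above confirms.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical reduction of the round-robin loop in nextTuple(); not storm-kafka code.
final class IdleSpoutSketch {

    enum EmitState { EMITTED_MORE_LEFT, EMITTED_END, NO_EMITTED }

    // Stand-in for PartitionManager: the partition is always empty.
    static final class IdlePartition {
        long nextCalls = 0;

        EmitState next() {
            // Assumption: the real next() may contact the broker at this point.
            nextCalls++;
            return EmitState.NO_EMITTED;
        }
    }

    private final List<IdlePartition> managers = new ArrayList<>();
    private int currPartitionIndex = 0;

    IdleSpoutSketch(int partitions) {
        for (int i = 0; i < partitions; i++) {
            managers.add(new IdlePartition());
        }
    }

    // Same control flow as the loop above, without the fetch-failure handling and commit.
    void nextTuple() {
        for (int i = 0; i < managers.size(); i++) {
            currPartitionIndex = currPartitionIndex % managers.size();
            EmitState state = managers.get(currPartitionIndex).next();
            if (state != EmitState.EMITTED_MORE_LEFT) {
                currPartitionIndex = (currPartitionIndex + 1) % managers.size();
            }
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        }
    }

    // Calls nextTuple() repeatedly and sums the next() calls over all partitions.
    static long totalNextCalls(int partitions, int nextTupleCalls) {
        IdleSpoutSketch spout = new IdleSpoutSketch(partitions);
        for (int i = 0; i < nextTupleCalls; i++) {
            spout.nextTuple();
        }
        long total = 0;
        for (IdlePartition p : spout.managers) {
            total += p.nextCalls;
        }
        return total;
    }

    public static void main(String[] args) {
        // One idle partition, 1000 nextTuple() calls.
        System.out.println(totalNextCalls(1, 1000));
    }
}
```

With a single idle partition, as in my setup, the number of next() calls 
equals the number of nextTuple() calls, so the traffic would scale with 
how often Storm calls nextTuple() if each next() does reach the broker.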


Many thanks in advance
Andrea
