kafka-users mailing list archives

From dev loper <spark...@gmail.com>
Subject Kafka Streams application Unable to Horizontally scale and the application on other instances refusing to start.
Date Fri, 15 Sep 2017 06:19:01 GMT
Dear Kafka Users,

I am fairly new to Kafka Streams. I have deployed two Kafka 0.11 brokers on
AWS m3.xlarge instances. I have created a topic with 36 partitions, and a
separate application writes to this topic at a rate of 10000 messages per
second. My Kafka Streams application, which consumes these messages, runs on
three AWS m4.xlarge instances. The application starts up fine and processes
messages on the first instance, but when I start the same application on the
other instances it never begins processing: the process is alive, yet it
handles no messages. The other instances also take a long time to start.
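
For reference, the split one would hope to see once all three instances have joined can be sketched with plain arithmetic. This is only an illustration: it assumes the 36 partitions are balanced as evenly as possible across running instances, and `PartitionSpread` and `perInstance` are hypothetical names, not part of the Kafka API.

```java
import java.util.Arrays;

// Hypothetical helper (not a Kafka class): spreads a number of partitions
// over a number of instances as evenly as possible.
final class PartitionSpread {

    // Returns how many partitions each instance would own under an even split.
    static int[] perInstance(int partitions, int instances) {
        if (partitions < 0 || instances <= 0) {
            throw new IllegalArgumentException("partitions must be >= 0 and instances > 0");
        }
        int[] counts = new int[instances];
        int base = partitions / instances;   // every instance gets at least this many
        int extra = partitions % instances;  // the first 'extra' instances get one more
        for (int i = 0; i < instances; i++) {
            counts[i] = base + (i < extra ? 1 : 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Only the first instance running: it owns everything.
        System.out.println(Arrays.toString(perInstance(36, 1))); // [36]
        // All three instances running: an even split.
        System.out.println(Arrays.toString(perInstance(36, 3))); // [12, 12, 12]
    }
}
```

Under that assumption, each of the three instances would end up with 12 of the 36 partitions, instead of the first instance keeping all 36.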

On every instance other than the first, I can see the consumer being added
and removed repeatedly, and no messages are processed at all. I have attached
the detailed logs where this behavior is observed.

On these instances the consumer starts with the first log entry below and
stops with the second (* detailed logs attached *):

INFO  | 21:59:30 | consumer.ConsumerConfig (AbstractConfig.java:223) - ConsumerConfig values:
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [l-mykafkainstancekafka5101:9092, l-mykafkainstancekafka5102:9092]
    check.crcs = true
    client.id =
    connections.max.idle.ms = 540000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = myKafka-kafkareplica101Sept08
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class my.dev.MessageUpdateCodec


DEBUG | 21:59:30 | consumer.KafkaConsumer (KafkaConsumer.java:1617) - The Kafka consumer has closed.

After this, the whole process repeats.



Below is my startup code for Kafka Streams, together with the parameters I
have configured for the application.

        private static Properties settings = new Properties();
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "mykafkastreamsapplication");
        settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
        settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
        settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
        settings.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "60000");

        KStreamBuilder builder = new KStreamBuilder();
        KafkaStreams streams = new KafkaStreams(builder, settings);
        builder.addSource(.....
         .addProcessor  .............
         .addProcessor  ........
         .addStateStore(...................).persistent().build(),"myprocessor")
         .addSink ..............
         .addSink ..............
        streams.start();
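
As a quick sanity check on the timeout values above, here is a minimal sketch in plain Java with no Kafka dependency. It tests the guideline from the Kafka consumer documentation that `heartbeat.interval.ms` should typically be no higher than one third of `session.timeout.ms`. The class `TimeoutCheck` and its methods are hypothetical names used only for illustration, and the values are copied from the settings shown in this message.

```java
import java.util.Properties;

// Hypothetical helper (not a Kafka class): checks consumer timeout settings
// expressed as plain string properties.
final class TimeoutCheck {

    // True when the heartbeat interval is at most one third of the session timeout.
    static boolean heartbeatWithinThird(Properties p) {
        long heartbeat = Long.parseLong(p.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(p.getProperty("session.timeout.ms"));
        return heartbeat * 3 <= session;
    }

    // The timeout-related values from the startup code, stored as strings.
    static Properties postedSettings() {
        Properties p = new Properties();
        p.setProperty("heartbeat.interval.ms", "10000");
        p.setProperty("session.timeout.ms", "30000");
        p.setProperty("max.poll.interval.ms", String.valueOf(Integer.MAX_VALUE));
        p.setProperty("max.poll.records", "10000");
        return p;
    }

    public static void main(String[] args) {
        // 10000 * 3 <= 30000, so the posted values sit exactly on the guideline.
        System.out.println(heartbeatWithinThird(postedSettings())); // true
    }
}
```

The posted values pass this check, so the sketch only rules out one possible misconfiguration; it says nothing about the other settings.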

I am using a simple processor for my logic:

public class InfoProcessor extends AbstractProcessor<Key, Update> {
    private static Logger logger = Logger.getLogger(InfoProcessor.class);
    private ProcessorContext context;
    private KeyValueStore<Key, Info> infoStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        // Request punctuate() calls; the interval is given in milliseconds.
        this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
        infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
    }

    @Override
    public void process(Key key, Update update) {
        try {
            if (key != null && update != null) {
                Info info = infoStore.get(key);
                // merge logic
                infoStore.put(key, info);
            }

        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
        }
        // A commit is requested after every record.
        context.commit();
    }

    @Override
    public void punctuate(long timestamp) {
        try {
            // Iterates over the entire state store on each punctuation.
            KeyValueIterator<Key, Info> iter = this.infoStore.all();
            while (iter.hasNext()) {
                // processing logic

            }
            iter.close();
            context.commit();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }
}
