kafka-users mailing list archives

From dev loper <spark...@gmail.com>
Subject Re: Kafka Streams application unable to horizontally scale; application on other instances refusing to start
Date Fri, 15 Sep 2017 09:31:38 GMT
Dear Kafka Users,


I have updated the question on Stack Overflow. Please let me know about
any possible solution.

https://stackoverflow.com/questions/46233138/kafka-streams-application-unable-to-horizontally-scale-and-the-application-on-ot

On Fri, Sep 15, 2017 at 2:57 PM, dev loper <sparkemr@gmail.com> wrote:

> Hi Damian,
>
> I do have the logs for the other application, but they are huge since it
> is continuously processing. Do you want me to grep for anything specific
> and share it with you?
>
> Thanks
> Dev
>
> On Fri, Sep 15, 2017 at 2:31 PM, Damian Guy <damian.guy@gmail.com> wrote:
>
>> Hi,
>>
>> Do you have the logs for the other instance?
>>
>> Thanks,
>> Damian
>>
>> On Fri, 15 Sep 2017 at 07:19 dev loper <sparkemr@gmail.com> wrote:
>>
>> > Dear Kafka Users,
>> >
>> > I am fairly new to Kafka Streams. I have deployed two Kafka 0.11
>> > brokers on AWS M3.xlarge instances. I have created a topic with 36
>> > partitions, and a separate application writes to this topic at a rate
>> > of 10000 messages per second. I have three AWS M4.xlarge instances
>> > running my Kafka Streams application, which consumes the messages
>> > produced by the other application. The application starts up and
>> > processes messages fine on the first instance, but when I start the
>> > same application on the other instances it does not start: the process
>> > is alive, but it processes no messages. The other instances also take
>> > a long time to start.
>> >
>> > On every instance apart from the first, I can see the consumer being
>> > added and removed repeatedly, and no message processing at all. I have
>> > attached detailed logs where this behavior is observed.
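>> >
>> > A minimal sketch (assuming the KafkaStreams instance from my startup
>> > code further below, here called streams) of a state listener that
>> > would log these transitions, to show whether an instance ever leaves
>> > REBALANCING:
>> >
>> >         final Logger stateLogger = Logger.getLogger("streams-state");
>> >         streams.setStateListener(new KafkaStreams.StateListener() {
>> >             @Override
>> >             public void onChange(KafkaStreams.State newState,
>> >                                  KafkaStreams.State oldState) {
>> >                 // an instance stuck in REBALANCING would match the symptom
>> >                 stateLogger.info("state change: " + oldState + " -> " + newState);
>> >             }
>> >         });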
>> >
>> > On these instances the consumer starts with the log below and is then
>> > stopped with the log shown after it, over and over (detailed logs
>> > attached):
>> >
>> > INFO  | 21:59:30 | consumer.ConsumerConfig (AbstractConfig.java:223) -
>> > ConsumerConfig values:
>> >     auto.commit.interval.ms = 5000
>> >     auto.offset.reset = latest
>> >     bootstrap.servers = [l-mykafkainstancekafka5101:9092,
>> > l-mykafkainstancekafka5102:9092]
>> >     check.crcs = true
>> >     client.id =
>> >     connections.max.idle.ms = 540000
>> >     enable.auto.commit = false
>> >     exclude.internal.topics = true
>> >     fetch.max.bytes = 52428800
>> >     fetch.max.wait.ms = 500
>> >     fetch.min.bytes = 1
>> >     group.id = myKafka-kafkareplica101Sept08
>> >     heartbeat.interval.ms = 3000
>> >     interceptor.classes = null
>> >     internal.leave.group.on.close = true
>> >     isolation.level = read_uncommitted
>> >     key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec
>> >     max.partition.fetch.bytes = 1048576
>> >     max.poll.interval.ms = 300000
>> >     max.poll.records = 500
>> >     metadata.max.age.ms = 300000
>> >     metric.reporters = []
>> >     metrics.num.samples = 2
>> >     metrics.recording.level = INFO
>> >     metrics.sample.window.ms = 30000
>> >     partition.assignment.strategy = [class
>> > org.apache.kafka.clients.consumer.RangeAssignor]
>> >     receive.buffer.bytes = 65536
>> >     reconnect.backoff.max.ms = 1000
>> >     reconnect.backoff.ms = 50
>> >     request.timeout.ms = 305000
>> >     retry.backoff.ms = 100
>> >     sasl.jaas.config = null
>> >     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>> >     sasl.kerberos.min.time.before.relogin = 60000
>> >     sasl.kerberos.service.name = null
>> >     sasl.kerberos.ticket.renew.jitter = 0.05
>> >     sasl.kerberos.ticket.renew.window.factor = 0.8
>> >     sasl.mechanism = GSSAPI
>> >     security.protocol = PLAINTEXT
>> >     send.buffer.bytes = 131072
>> >     session.timeout.ms = 10000
>> >     ssl.cipher.suites = null
>> >     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>> >     ssl.endpoint.identification.algorithm = null
>> >     ssl.key.password = null
>> >     ssl.keymanager.algorithm = SunX509
>> >     ssl.keystore.location = null
>> >     ssl.keystore.password = null
>> >     ssl.keystore.type = JKS
>> >     ssl.protocol = TLS
>> >     ssl.provider = null
>> >     ssl.secure.random.implementation = null
>> >     ssl.trustmanager.algorithm = PKIX
>> >     ssl.truststore.location = null
>> >     ssl.truststore.password = null
>> >     ssl.truststore.type = JKS
>> >     value.deserializer = class my.dev.MessageUpdateCodec
>> >
>> >
>> > DEBUG | 21:59:30 | consumer.KafkaConsumer (KafkaConsumer.java:1617) -
>> > The Kafka consumer has closed.
>> >
>> > After this, the whole process repeats.
>> >
>> >
>> >
>> > Below you can find my Kafka Streams startup code and the parameters I
>> > have configured for the application.
>> >
>> >         private static Properties settings = new Properties();
>> >         settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> >                 "mykafkastreamsapplication");
>> >         settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
>> >         settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
>> >         settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
>> >         settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>> >                 Integer.MAX_VALUE);
>> >         settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
>> >         settings.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "60000");
>> >
>> >         KStreamBuilder builder = new KStreamBuilder();
>> >         builder.addSource(.....
>> >          .addProcessor .............
>> >          .addProcessor ........
>> >          .addStateStore(...................).persistent().build(), "myprocessor")
>> >          .addSink ..............
>> >          .addSink ..............
>> >
>> >         KafkaStreams streams = new KafkaStreams(builder, settings);
>> >         streams.start();
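>> >
>> > All three instances run this same code with the same application.id,
>> > so they should join one consumer group and split the 36 partitions
>> > between them. A minimal sketch of that scaling model as I understand
>> > it (the thread count is only an example, not what I actually run):
>> >
>> >         // same application.id on every instance => one consumer group,
>> >         // so the 36 partitions are distributed over all stream threads
>> >         settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
>> >                 "mykafkastreamsapplication");
>> >         // example: 3 instances x 12 threads = 36 tasks for 36 partitions
>> >         settings.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "12");
>> >
>> >         final KafkaStreams streams = new KafkaStreams(builder, settings);
>> >         streams.start();
>> >         // close cleanly so partitions are released on shutdown
>> >         Runtime.getRuntime().addShutdownHook(new Thread(() -> streams.close()));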
>> >
>> > I am using a simple processor for my logic:
>> >
>> > import org.apache.kafka.streams.processor.AbstractProcessor;
>> > import org.apache.kafka.streams.processor.ProcessorContext;
>> > import org.apache.kafka.streams.state.KeyValueIterator;
>> > import org.apache.kafka.streams.state.KeyValueStore;
>> > import org.apache.log4j.Logger;
>> >
>> > public class InfoProcessor extends AbstractProcessor<Key, Update> {
>> >     private static final Logger logger = Logger.getLogger(InfoProcessor.class);
>> >     private ProcessorContext context;
>> >     private KeyValueStore<Key, Info> infoStore;
>> >
>> >     @Override
>> >     @SuppressWarnings("unchecked")
>> >     public void init(ProcessorContext context) {
>> >         this.context = context;
>> >         // schedule punctuate() every BATCH_DURATION_SECONDS
>> >         this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
>> >         infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
>> >     }
>> >
>> >     @Override
>> >     public void process(Key key, Update update) {
>> >         try {
>> >             if (key != null && update != null) {
>> >                 Info info = infoStore.get(key);
>> >                 // merge logic
>> >                 infoStore.put(key, info);
>> >             }
>> >         } catch (Exception e) {
>> >             logger.error(e.getMessage(), e);
>> >         }
>> >         context.commit();
>> >     }
>> >
>> >     @Override
>> >     public void punctuate(long timestamp) {
>> >         try (KeyValueIterator<Key, Info> iter = this.infoStore.all()) {
>> >             while (iter.hasNext()) {
>> >                 // processing logic
>> >             }
>> >             context.commit();
>> >         } catch (Exception e) {
>> >             logger.error(e.getMessage(), e);
>> >         }
>> >     }
>> > }
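>> >
>> > For completeness, the state store referenced as "InfoStore" is
>> > registered roughly like this (a sketch; keySerde and infoSerde stand
>> > in for my actual Kryo-based serdes):
>> >
>> >         StateStoreSupplier infoStoreSupplier = Stores.create("InfoStore")
>> >                 .withKeys(keySerde)     // Serde<Key>, placeholder
>> >                 .withValues(infoSerde)  // Serde<Info>, placeholder
>> >                 .persistent()           // RocksDB-backed store
>> >                 .build();
>> >         builder.addStateStore(infoStoreSupplier, "myprocessor");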
>> >
>> >
>>
>
>
