kafka-users mailing list archives

From "M. Manna" <manme...@gmail.com>
Subject Re: Long start time for consumer
Date Tue, 29 May 2018 12:46:55 GMT
Have you tried increasing the poll timeout, e.g. to 4000 ms, to see if that
helps?

On 29 May 2018 at 13:44, Shantanu Deshmukh <shantanu88d@gmail.com> wrote:

> Here is the code which is consuming messages
>
> >>>>>>>>
> while(true && startShutdown == false) {
>     Context context = new Context();
>     JSONObject notifJSON = new JSONObject();
>     String notificationMsg = "";
>     NotificationEvent notifEvent = null;
>     initializeContext();
>     try {
>         consumerConnect();
>         ConsumerRecords<String, String> records = consumer.poll(100);
>         if(records.count() == 0) {
>             //logger.trace("No records in topic: "+this.topic);
>             continue;
>         }
>         for(ConsumerRecord<String, String> record : records) {
>             try {
>                 long totalStart = System.currentTimeMillis();
>                 notificationMsg = record.value();
>                 JSONParser jsonParser = new JSONParser();
>                 logger.trace("Kafka-Msg: >>"+notificationMsg);
>                 if(notificationMsg.equals("")) {
>                     continue;
>                 }
>                 Profiler.start(workerId, "json-parse");
>                 notifJSON = (JSONObject)jsonParser.parse(notificationMsg);
>                 Profiler.end(workerId, "json-parse");
>                 notifEvent    = new NotificationEvent(notifJSON);
>                 if( notifEvent.getTransactionID().equals("") == true ) {
>                     notifEvent.generateTransactionID();
>                 }
>                 context.setEventObject(notifEvent);
>                 updateContext(context);
>
>                 //======== Fetch template ==========//
>                 Profiler.start(workerId, "tpl-fetch");
>                 long start = System.currentTimeMillis();
>                 Template template = notifTplMngr.fetchTemplate(notifEvent);
>                 logger.trace("fetch-tpl:"+(System.currentTimeMillis()-start));
>                 Profiler.end(workerId, "tpl-fetch");
>
>                 //======== Personalise template ==========//
>                 Profiler.start(workerId, "personalisation");
>                 start = System.currentTimeMillis();
>                 String message = NotificationTemplatePersonaliser.personaliseAuto(template, notifEvent);
>                 notifEvent.setMaskedMessage(NotificationTemplatePersonaliser.getMaskedContent(template, notifEvent));
>                 logger.trace("personalise:"+(System.currentTimeMillis()-start));
>                 Profiler.end(workerId, "personalisation");
>
>                 context.setEventObject(notifEvent);
>                 updateContext(context);
>
>                 //======== Send notification==========//
>                 Profiler.start(workerId, "notif-dispatch");
>                 postOffice.sendNotification(message, notifEvent);
>                 Profiler.end(workerId, "notif-dispatch");
>
>                 retryCount = 0;
>                 logger.debug("Time to complete notification dispatch :"+(System.currentTimeMillis()-totalStart));
>                 if(startShutdown == true) {
>                     break;
>                 }
>             } catch (Exception ex) {
>                 if(ex instanceof RetriableException) {
>                     kafkaLogger.error(ex);
>                     logger.warn("",ex);
>                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_RETRIABLE_FAILURE);
>                 } else if(ex instanceof InvalidEventException) {
>
>                     JsonLog jsonLog = new JsonLog();
>                     jsonLog.setDescription("Invalid event message. Reason: "+ex.getMessage());
>                     jsonLog.setOriginalPayload(notificationMsg);
>                     jsonLog.setEventType("ERROR");
>                     jsonLog.setCode("InvalidEventException");
>                     jsonLog.setComponent(kafkaLogger.getSourceClass(ex));
>                     jsonLog.setSubComponent(notifEvent.getChannelName());
>                     kafkaLogger.log(jsonLog);
>                     //kafkaLogger.error(ex);
>                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
>                     logger.warn("Invalid event message. Reason: "+ex.getMessage());
>
>                 } else if(ex instanceof EventFailedException) {
>                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
>                     kafkaLogger.error(ex);
>                     logger.warn("Notification event failed. Reason: "+ex.getMessage());
>
>                 } else if(ex instanceof org.json.simple.parser.ParseException) {
>                     kafkaLogger.error("Exception while parsing notification JSON message.");
>                     logger.warn("Exception while parsing notification JSON message.");
>                 } else {
>                     kafkaLogger.error(ex);
>                     addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
>                     logger.warn("",ex);
>                 }
>             } finally {
>                 eventsProcessed++;
>             }
>         }
>     } catch (Exception ex) {
>         kafkaLogger.error(ex);
>         addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
>         logger.warn("",ex);
>     }
> }
> <<<<<<<<<<
>
> And here are server properties.
>
> broker.id=0
> port=9092
> delete.topic.enable=true
> message.max.bytes=1500000
> listeners=SSL://x.x.x.x:9092
> advertised.listeners=SSL://x.x.x.x:9092
> num.network.threads=3
> num.io.threads=8
> socket.send.buffer.bytes=102400
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> log.dirs=/lotus/kafka-logs
> num.partitions=3
> auto.topic.creation.enable=false
> num.recovery.threads.per.data.dir=1
> log.retention.hours=168
> log.segment.bytes=1073741824
> log.retention.check.interval.ms=300000
> ssl.keystore.location=/opt/kafka/certificates/kafka.keystore.jks
> ssl.keystore.password=xxxx
> ssl.key.password=xxxx
> ssl.truststore.location=/opt/kafka/certificates/kafka.truststore.jks
> ssl.truststore.password=xxxx
> security.inter.broker.protocol=SSL
> zookeeper.connect=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
> zookeeper.connection.timeout.ms=6000
>
> On Tue, May 29, 2018 at 5:59 PM M. Manna <manmedia@gmail.com> wrote:
>
> > Thanks..
> >
> > Where is your consumer code that is consuming messages?
> >
> > On 29 May 2018 at 13:18, Shantanu Deshmukh <shantanu88d@gmail.com> wrote:
> >
> > > No problem, here are consumer properties
> > > ---------
> > > auto.commit.interval.ms = 3000
> > > auto.offset.reset = latest
> > > bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
> > > check.crcs = true
> > > client.id =
> > > connections.max.idle.ms = 540000
> > > enable.auto.commit = true
> > > exclude.internal.topics = true
> > > fetch.max.bytes = 52428800
> > > fetch.max.wait.ms = 500
> > > fetch.min.bytes = 1
> > > group.id = otp-notifications-consumer
> > > heartbeat.interval.ms = 3000
> > > interceptor.classes = null
> > > key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> > > max.partition.fetch.bytes = 1048576
> > > max.poll.interval.ms = 300000
> > > max.poll.records = 5
> > > metadata.max.age.ms = 300000
> > > metric.reporters = []
> > > metrics.num.samples = 2
> > > metrics.sample.window.ms = 30000
> > > partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
> > > receive.buffer.bytes = 65536
> > > reconnect.backoff.ms = 50
> > > request.timeout.ms = 305000
> > > retry.backoff.ms = 100
> > > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > > sasl.kerberos.min.time.before.relogin = 60000
> > > sasl.kerberos.service.name = null
> > > sasl.kerberos.ticket.renew.jitter = 0.05
> > > sasl.kerberos.ticket.renew.window.factor = 0.8
> > > sasl.mechanism = GSSAPI
> > > security.protocol = SSL
> > > send.buffer.bytes = 131072
> > > session.timeout.ms = 300000
> > > ssl.cipher.suites = null
> > > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > > ssl.endpoint.identification.algorithm = null
> > > ssl.key.password = null
> > > ssl.keymanager.algorithm = SunX509
> > > ssl.keystore.location = null
> > > ssl.keystore.password = null
> > > ssl.keystore.type = JKS
> > > ssl.protocol = TLS
> > > ssl.provider = null
> > > ssl.secure.random.implementation = null
> > > ssl.trustmanager.algorithm = PKIX
> > > ssl.truststore.location = ****
> > > ssl.truststore.password = [hidden]
> > > ssl.truststore.type = JKS
> > > value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> > > ------------
> > >
> > > On Tue, May 29, 2018 at 5:36 PM M. Manna <manmedia@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > > It's not possible to answer this from the description alone. Please
> > > > > share your consumer.properties and server.properties files, and say
> > > > > exactly what you have changed from the default configuration.
> > > >
> > > >
> > > >
> > > > > On 29 May 2018 at 12:51, Shantanu Deshmukh <shantanu88d@gmail.com> wrote:
> > > >
> > > > > Hello,
> > > > >
> > > > > > We have a 3-broker Kafka 0.10.0.1 cluster. We have 5 topics, each
> > > > > > with 10 partitions. I have an application which consumes from all
> > > > > > these topics by creating multiple consumer processes. All of these
> > > > > > consumers are in the same consumer group. I am noticing that every
> > > > > > time we restart this application, it takes almost 5 minutes for the
> > > > > > consumers to start consuming. What might be going wrong?
> > > > >
> > > >
> > >
> >
>
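
A note on the consumer properties quoted above: session.timeout.ms and
max.poll.interval.ms are both set to 300000 ms, which is the same 5 minutes
as the reported start-up delay. This may be a coincidence. One possible
explanation, which is an assumption and not something confirmed in this
thread, is that consumers stopped without a clean close() remain group
members until the session timeout expires, and the restarted consumers wait
for that rebalance. The sketch below only does the unit conversion on the
posted values; the class name TimeoutCheck and its helper methods are
hypothetical and not part of the poster's application.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class TimeoutCheck {
    // Values copied from the consumer properties quoted in this thread.
    public static final String POSTED =
          "session.timeout.ms = 300000\n"
        + "max.poll.interval.ms = 300000\n"
        + "request.timeout.ms = 305000\n"
        + "heartbeat.interval.ms = 3000\n";

    // Parse "key = value" lines using java.util.Properties.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // A StringReader should not fail, but load() declares IOException.
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Convert a millisecond-valued setting to minutes.
    public static double minutes(String text, String key) {
        String raw = parse(text).getProperty(key);
        return Long.parseLong(raw.trim()) / 60000.0;
    }

    public static void main(String[] args) {
        // Print every posted timeout in minutes (iteration order is unspecified).
        for (String key : parse(POSTED).stringPropertyNames()) {
            System.out.printf("%s = %.2f min%n", key, minutes(POSTED, key));
        }
    }
}
```

If the match is not a coincidence, lowering session.timeout.ms (within the
broker's group.min.session.timeout.ms and group.max.session.timeout.ms
bounds) or closing the consumer on shutdown would be things to try alongside
the longer poll timeout.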
