kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shantanu Deshmukh <shantanu...@gmail.com>
Subject Re: Long start time for consumer
Date Tue, 29 May 2018 12:44:07 GMT
Here is the code which is consuming messages

>>>>>>>>
// Main consume loop: polls Kafka until startShutdown is set and pushes every
// record through JSON parse -> template fetch -> personalisation -> dispatch.
// Failures are classified by exception type and, except for JSON parse
// errors, handed to addToFailedQueue() with a retriable/permanent code.
// NOTE(review): consumerConnect() is invoked on every iteration; presumably it
// is a no-op once the consumer is connected/subscribed -- confirm.
while (!startShutdown) {
    Context context = new Context();
    JSONObject notifJSON = new JSONObject();
    String notificationMsg = "";
    NotificationEvent notifEvent = null;
    initializeContext();
    try {
        consumerConnect();
        ConsumerRecords<String, String> records = consumer.poll(100);
        if (records.count() == 0) {
            //logger.trace("No records in topic: "+this.topic);
            continue;
        }
        for (ConsumerRecord<String, String> record : records) {
            // Reset per-record state so that a record failing early is never
            // reported or queued with the previous record's payload/event.
            notifJSON = new JSONObject();
            notifEvent = null;
            try {
                long totalStart = System.currentTimeMillis();
                notificationMsg = record.value();
                JSONParser jsonParser = new JSONParser();
                logger.trace("Kafka-Msg: >>" + notificationMsg);
                // A record may carry a null value; treat it like an empty
                // message instead of letting equals() throw.
                if (notificationMsg == null || notificationMsg.equals("")) {
                    continue;
                }
                Profiler.start(workerId, "json-parse");
                notifJSON = (JSONObject) jsonParser.parse(notificationMsg);
                Profiler.end(workerId, "json-parse");
                notifEvent = new NotificationEvent(notifJSON);
                if (notifEvent.getTransactionID().equals("")) {
                    notifEvent.generateTransactionID();
                }
                context.setEventObject(notifEvent);
                updateContext(context);

                //======== Fetch template ==========//
                Profiler.start(workerId, "tpl-fetch");
                long start = System.currentTimeMillis();
                Template template = notifTplMngr.fetchTemplate(notifEvent);
                logger.trace("fetch-tpl:" + (System.currentTimeMillis() - start));
                Profiler.end(workerId, "tpl-fetch");

                //======== Personalise template ==========//
                Profiler.start(workerId, "personalisation");
                start = System.currentTimeMillis();
                String message = NotificationTemplatePersonaliser.personaliseAuto(template, notifEvent);
                notifEvent.setMaskedMessage(
                        NotificationTemplatePersonaliser.getMaskedContent(template, notifEvent));
                logger.trace("personalise:" + (System.currentTimeMillis() - start));
                Profiler.end(workerId, "personalisation");

                context.setEventObject(notifEvent);
                updateContext(context);

                //======== Send notification==========//
                Profiler.start(workerId, "notif-dispatch");
                postOffice.sendNotification(message, notifEvent);
                Profiler.end(workerId, "notif-dispatch");

                retryCount = 0;
                logger.debug("Time to complete notification dispatch :"
                        + (System.currentTimeMillis() - totalStart));
                // Stop working through the current batch once shutdown is requested.
                if (startShutdown) {
                    break;
                }
            } catch (Exception ex) {
                // The instanceof chain is kept (rather than separate catch
                // clauses) because the exception hierarchy is not visible here.
                if (ex instanceof RetriableException) {
                    kafkaLogger.error(ex);
                    logger.warn("", ex);
                    addToFailedQueue(notifJSON, ex.getMessage(), CODE_RETRIABLE_FAILURE);
                } else if (ex instanceof InvalidEventException) {
                    JsonLog jsonLog = new JsonLog();
                    jsonLog.setDescription("Invalid event message. Reason: " + ex.getMessage());
                    jsonLog.setOriginalPayload(notificationMsg);
                    jsonLog.setEventType("ERROR");
                    jsonLog.setCode("InvalidEventException");
                    jsonLog.setComponent(kafkaLogger.getSourceClass(ex));
                    // notifEvent is still null when the failure happened before
                    // the event object was built; dereferencing it here would
                    // throw out of this handler and abort the whole batch.
                    if (notifEvent != null) {
                        jsonLog.setSubComponent(notifEvent.getChannelName());
                    }
                    kafkaLogger.log(jsonLog);
                    //kafkaLogger.error(ex);
                    addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
                    logger.warn("Invalid event message. Reason: " + ex.getMessage());
                } else if (ex instanceof EventFailedException) {
                    addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
                    kafkaLogger.error(ex);
                    logger.warn("Notification event failed. Reason: " + ex.getMessage());
                } else if (ex instanceof org.json.simple.parser.ParseException) {
                    // Unparseable payloads are only logged, not queued.
                    kafkaLogger.error("Exception while parsing notification JSON message.");
                    logger.warn("Exception while parsing notification JSON message.");
                } else {
                    kafkaLogger.error(ex);
                    addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
                    logger.warn("", ex);
                }
            } finally {
                // Counted for every record, including skipped and failed ones.
                eventsProcessed++;
            }
        }
    } catch (Exception ex) {
        // Reached for failures outside per-record handling (e.g. connect/poll).
        kafkaLogger.error(ex);
        addToFailedQueue(notifJSON, ex.getMessage(), CODE_PERMANENT_FAILURE);
        logger.warn("", ex);
    }
}
<<<<<<<<<<

And here are server properties.

broker.id=0
port=9092
delete.topic.enable=true
message.max.bytes=1500000
listeners=SSL://x.x.x.x:9092
advertised.listeners=SSL://x.x.x.x:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/lotus/kafka-logs
num.partitions=3
auto.topic.creation.enable=false
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
ssl.keystore.location=/opt/kafka/certificates/kafka.keystore.jks
ssl.keystore.password=xxxx
ssl.key.password=xxxx
ssl.truststore.location=/opt/kafka/certificates/kafka.truststore.jks
ssl.truststore.password=xxxx
security.inter.broker.protocol=SSL
zookeeper.connect=x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181
zookeeper.connection.timeout.ms=6000

On Tue, May 29, 2018 at 5:59 PM M. Manna <manmedia@gmail.com> wrote:

> Thanks..
>
> Where is your consumer code that is consuming messages?
>
> On 29 May 2018 at 13:18, Shantanu Deshmukh <shantanu88d@gmail.com> wrote:
>
> > No problem, here are consumer properties
> > ---------
> > auto.commit.interval.ms = 3000
> > auto.offset.reset = latest
> > bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
> > check.crcs = true
> > client.id =
> > connections.max.idle.ms = 540000
> > enable.auto.commit = true
> > exclude.internal.topics = true
> > fetch.max.bytes = 52428800
> > fetch.max.wait.ms = 500
> > fetch.min.bytes = 1
> > group.id = otp-notifications-consumer
> > heartbeat.interval.ms = 3000
> > interceptor.classes = null
> > key.deserializer = class
> > org.apache.kafka.common.serialization.StringDeserializer
> > max.partition.fetch.bytes = 1048576
> > max.poll.interval.ms = 300000
> > max.poll.records = 5
> > metadata.max.age.ms = 300000
> > metric.reporters = []
> > metrics.num.samples = 2
> > metrics.sample.window.ms = 30000
> > partition.assignment.strategy = [class
> > org.apache.kafka.clients.consumer.RangeAssignor]
> > receive.buffer.bytes = 65536
> > reconnect.backoff.ms = 50
> > request.timeout.ms = 305000
> > retry.backoff.ms = 100
> > sasl.kerberos.kinit.cmd = /usr/bin/kinit
> > sasl.kerberos.min.time.before.relogin = 60000
> > sasl.kerberos.service.name = null
> > sasl.kerberos.ticket.renew.jitter = 0.05
> > sasl.kerberos.ticket.renew.window.factor = 0.8
> > sasl.mechanism = GSSAPI
> > security.protocol = SSL
> > send.buffer.bytes = 131072
> > session.timeout.ms = 300000
> > ssl.cipher.suites = null
> > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> > ssl.endpoint.identification.algorithm = null
> > ssl.key.password = null
> > ssl.keymanager.algorithm = SunX509
> > ssl.keystore.location = null
> > ssl.keystore.password = null
> > ssl.keystore.type = JKS
> > ssl.protocol = TLS
> > ssl.provider = null
> > ssl.secure.random.implementation = null
> > ssl.trustmanager.algorithm = PKIX
> > ssl.truststore.location = ****
> > ssl.truststore.password = [hidden]
> > ssl.truststore.type = JKS
> > value.deserializer = class
> > org.apache.kafka.common.serialization.StringDeserializer
> > ------------
> >
> > On Tue, May 29, 2018 at 5:36 PM M. Manna <manmedia@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > It's not possible to answer questions based on text. You need to share
> > your
> > > consumer.properties, and server.properties file, and also, what exactly
> > you
> > > have changed from default configuration.
> > >
> > >
> > >
> > > On 29 May 2018 at 12:51, Shantanu Deshmukh <shantanu88d@gmail.com>
> > wrote:
> > >
> > > > Hello,
> > > >
> > > > We have 3 broker Kafka 0.10.0.1 cluster. We have 5 topics, each with
> 10
> > > > partitions. I have an application which consumes from all these
> topics
> > by
> > > > creating multiple consumer processes. All of these consumers are
> under
> > a
> > > > same consumer group. I am noticing that every time we restart this
> > > > application, it takes almost 5 minutes for consumers to start
> > consuming.
> > > > What might be going wrong?
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message