kafka-users mailing list archives

From dev loper <spark...@gmail.com>
Subject Re: Kafka Streams application Unable to Horizontally scale and the application on other instances refusing to start.
Date Sat, 16 Sep 2017 07:34:34 GMT
Thank you Ted for your inputs. I have also observed the similarity in logs.
But I am not sure how to resolve the issue.

@Bill, I repeated my tests after reducing MAX_POLL_RECORDS to 500. I
could see the consumer starting with 500, but the same problem exists
even after the configuration change. The consumer for one of the
instances goes down, and the whole process I described in my
previous mail repeats.
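
As a rough illustration of how the two settings discussed in this thread interact, the sketch below computes the average time available per record before max.poll.interval.ms would be exceeded when poll() returns a full batch. It uses only java.util.Properties and the literal config key strings. The class and method names are hypothetical, and the 60000 ms interval is an assumption taken from the value mentioned in the quoted messages below, not a confirmed setting of this run.

```java
import java.util.Properties;

public class PollBudget {

    // Builds consumer overrides using the literal Kafka consumer config keys.
    public static Properties consumerOverrides(int maxPollRecords, int maxPollIntervalMs) {
        Properties props = new Properties();
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        props.setProperty("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        return props;
    }

    // Average milliseconds available per record if poll() returns a full batch
    // and the next poll() must happen within max.poll.interval.ms.
    public static double millisPerRecord(Properties props) {
        int records = Integer.parseInt(props.getProperty("max.poll.records"));
        int intervalMs = Integer.parseInt(props.getProperty("max.poll.interval.ms"));
        return (double) intervalMs / records;
    }

    public static void main(String[] args) {
        // Assumed interval of 60000 ms with the two batch sizes tried in this thread.
        System.out.println(millisPerRecord(consumerOverrides(500, 60000)));  // prints 120.0
        System.out.println(millisPerRecord(consumerOverrides(5000, 60000))); // prints 12.0
    }
}
```

This is only a per-record upper bound under those assumptions; it says nothing about time spent in punctuate() or in rebalancing.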

On Sat, Sep 16, 2017 at 9:08 AM, Ted Yu <yuzhihong@gmail.com> wrote:

> Though there was no LockException in your log, there is still enough
> similarity between your log and the one posted by johnchou:
>
> https://issues.apache.org/jira/browse/KAFKA-5397?focusedCommentId=16162689&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16162689
>
> Even the line numbers match: sendOffsetCommitRequest
> (ConsumerCoordinator.java:725)
>
> Reducing MAX_POLL_RECORDS_CONFIG may lessen the symptom but that would
> not meet your throughput goal.
>
> On Fri, Sep 15, 2017 at 8:09 PM, dev loper <sparkemr@gmail.com> wrote:
>
>> Hi All ,
>>
>> @Bill,
>>
>> I will reduce the MAX_POLL_RECORDS to 500/1000 and I will share the
>> results shortly.
>>
>> @Ted,
>>
>> Yes, I reduced MAX_POLL_RECORDS_CONFIG from 50000 to 5000. It was not a
>> typo. Do you think 50000 is way too high for a Kafka Streams application?
>> My Spark application, which I was trying to replace with Kafka Streams,
>> was processing 250000 messages per 5-second batch. That was the reason I
>> set 50000 records for MAX_POLL_RECORDS_CONFIG.
>>
>> I don't think I have hit KAFKA-5397, since I couldn't find any instance of
>> "org.apache.kafka.streams.errors.LockException" in my logs. I could see the
>> exception below, but these exceptions are triggered long after the
>> application stopped consuming any messages.
>>
>> StreamThread100.log:org.apache.kafka.streams.errors.StreamsException:
>> stream-thread failed to suspend stream tasks
>> User provided listener
>> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
>> for group myKafka-kafkareplica101Sept08 failed on partition revocation
>>
>> @Damian, I figured out the pattern below; I don't know whether it
>> helps. Did the StreamThread logs which I shared help?
>>
>> I couldn't figure out why the consumers are getting closed while they
>> are being allocated, and why they suddenly stop without any reason. If you
>> look at the pattern:
>>
>>  1) The consumer is created with the config values supplied by the
>>     application.
>>  2) Adding sensors
>>  3) Fetching API version and initiating connections to the Kafka brokers
>>     (NetworkClient.java)
>>  4) Sending metadata request and recording the API version from the broker
>>     (NetworkClient.java)
>>  5) Updated cluster metadata version (some incremental version) to
>>     Cluster
>>  6) Discovered coordinator (one of the Kafka brokers)
>>  7) Initiating connection to coordinator (one of the Kafka brokers)
>>  8) Fetching committed offsets for partitions
>>     (ConsumerCoordinator.java:826)
>>  9) Recorded API versions for node
>> 10) Resetting offset for partition (for all the partitions)
>> 11) Handling ListOffsetResponse (for all the partitions)
>> 12) Removing sensors (I couldn't see any exceptions, though)
>> 13) consumer.KafkaConsumer (KafkaConsumer.java:1617) - The Kafka consumer
>>     has closed. (I couldn't see any exceptions, and the logs give no
>>     reason why the KafkaConsumer closed.)
>> 14) The Kafka Streams application goes back to step 1 and creates the
>>     consumer again. (The whole process is repeated, but 90% of the time
>>     the application never recovers.)
>> 15) Since I introduced the limit MAX_POLL_INTERVAL_MS_CONFIG = 60000
>>     (previously Integer.MAX_VALUE), I could see the exception below after
>>     60 seconds of consumer retry:
>>     org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>>     be completed since the group has already rebalanced
>> I am not sure what could be wrong on my side and how I can resolve this
>> issue so that my application start processing messages consistently.
>>
>> Thanks
>> Dev
>>
>> On Sat, Sep 16, 2017 at 2:02 AM, Bill Bejeck <bill@confluent.io> wrote:
>>
>>> Hi,
>>>
>>> Could you set MAX_POLL_RECORDS to something lower like 500 or 1000 and
>>> try again?
>>>
>>> Thanks,
>>> Bill
>>>
>>> On Fri, Sep 15, 2017 at 3:40 PM, dev loper <sparkemr@gmail.com> wrote:
>>>
>>> > Hi Damian,
>>> >
>>> > I have repeated my tests with a slight configuration change. The current
>>> > logs captured for the "StreamThread" keyword have more relevant entries
>>> > than the logs which I shared previously. I started the application on
>>> > instances 100, 101 and 102 simultaneously with the configuration below:
>>> >
>>> > 1) Reduced MAX_POLL_RECORDS_CONFIG to 5000 (previously 50000)
>>> > 2) Reduced MAX_POLL_INTERVAL_MS_CONFIG to 60000 (previously
>>> >    Integer.MAX_VALUE)
>>> >
>>> > When the application started, all three instances started processing, and
>>> > for the first few minutes everything went well. After that I could see that
>>> > the "StreamThread100" consumer was going for a toss: it kept closing and
>>> > creating consumers for a while, exactly with the pattern of logs I
>>> > mentioned in my previous email. After some time I could see that
>>> > "StreamThread100" stopped processing messages with the exception below,
>>> > while the other two continued processing messages without any issues.
>>> >
>>> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
>>> > completed since the group has already rebalanced and assigned the
>>> > partitions to another member. This means that the time between subsequent
>>> > calls to poll() was longer than the configured max.poll.interval.ms,
>>> > which typically implies that the poll loop is spending too much time
>>> > message processing. You can address this either by increasing the session
>>> > timeout or by reducing the maximum size of batches returned in poll() with
>>> > max.poll.records.
>>> >
>>> > I think that since the consumers were starting and stopping, no poll was
>>> > made from the system. Since I reduced MAX_POLL_INTERVAL_MS_CONFIG to
>>> > 60000 and the processors were being closed and restarted, this might have
>>> > resulted in the CommitFailedException due to non-availability of
>>> > processors.
>>> >
>>> > After some time the issue propagated to the other servers. I have attached
>>> > the relevant logs with this mail. Kindly go through them and let me know
>>> > how I can solve this issue.
>>> >
>>> >
>>> > On Fri, Sep 15, 2017 at 10:33 PM, dev loper <sparkemr@gmail.com> wrote:
>>> >
>>> >> Hi Ted,
>>> >>
>>> >> What should I be looking for in the broker logs? I haven't looked at the
>>> >> broker side, since my Spark application, which reads from the same topic
>>> >> with a different group id, is able to process well.
>>> >>
>>> >> On Fri, Sep 15, 2017 at 3:30 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>> >>
>>> >>> Is there some clue in broker logs ?
>>> >>>
>>> >>> Thanks
>>> >>>
>>> >>> On Thu, Sep 14, 2017 at 11:19 PM, dev loper <sparkemr@gmail.com> wrote:
>>> >>>
>>> >>>> Dear Kafka Users,
>>> >>>>
>>> >>>> I am fairly new to Kafka Streams. I have deployed two instances of
>>> >>>> Kafka 0.11 brokers on AWS M3.xlarge instances. I have created a topic
>>> >>>> with 36 partitions, and a separate application writes to this topic,
>>> >>>> producing records at the rate of 10000 messages per second. I have three
>>> >>>> AWS M4.xlarge instances where my Kafka Streams application is
>>> >>>> running, consuming the messages produced by the other application.
>>> >>>> The application starts up fine and processes messages on
>>> >>>> the first instance, but when I start the same application on the other
>>> >>>> instances it does not start: even though the process is alive, it is not
>>> >>>> processing messages. I could also see that the other instances take a
>>> >>>> long time to start.
>>> >>>>
>>> >>>> On the instances other than the first, I could see the consumer
>>> >>>> getting added and removed repeatedly, and I couldn't see any message
>>> >>>> processing at all. I have attached the detailed logs where this behavior
>>> >>>> is observed.
>>> >>>>
>>> >>>> On these instances the consumer starts with the log below and then
>>> >>>> stops with the log that follows it (detailed logs attached):
>>> >>>>
>>> >>>> INFO  | 21:59:30 | consumer.ConsumerConfig (AbstractConfig.java:223) -
>>> >>>> ConsumerConfig values:
>>> >>>>     auto.commit.interval.ms = 5000
>>> >>>>     auto.offset.reset = latest
>>> >>>>     bootstrap.servers = [l-mykafkainstancekafka5101:9092,
>>> >>>> l-mykafkainstancekafka5102:9092]
>>> >>>>     check.crcs = true
>>> >>>>     client.id =
>>> >>>>     connections.max.idle.ms = 540000
>>> >>>>     enable.auto.commit = false
>>> >>>>     exclude.internal.topics = true
>>> >>>>     fetch.max.bytes = 52428800
>>> >>>>     fetch.max.wait.ms = 500
>>> >>>>     fetch.min.bytes = 1
>>> >>>>     group.id = myKafka-kafkareplica101Sept08
>>> >>>>     heartbeat.interval.ms = 3000
>>> >>>>     interceptor.classes = null
>>> >>>>     internal.leave.group.on.close = true
>>> >>>>     isolation.level = read_uncommitted
>>> >>>>     key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec
>>> >>>>     max.partition.fetch.bytes = 1048576
>>> >>>>     max.poll.interval.ms = 300000
>>> >>>>     max.poll.records = 500
>>> >>>>     metadata.max.age.ms = 300000
>>> >>>>     metric.reporters = []
>>> >>>>     metrics.num.samples = 2
>>> >>>>     metrics.recording.level = INFO
>>> >>>>     metrics.sample.window.ms = 30000
>>> >>>>     partition.assignment.strategy = [class
>>> >>>> org.apache.kafka.clients.consumer.RangeAssignor]
>>> >>>>     receive.buffer.bytes = 65536
>>> >>>>     reconnect.backoff.max.ms = 1000
>>> >>>>     reconnect.backoff.ms = 50
>>> >>>>     request.timeout.ms = 305000
>>> >>>>     retry.backoff.ms = 100
>>> >>>>     sasl.jaas.config = null
>>> >>>>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>> >>>>     sasl.kerberos.min.time.before.relogin = 60000
>>> >>>>     sasl.kerberos.service.name = null
>>> >>>>     sasl.kerberos.ticket.renew.jitter = 0.05
>>> >>>>     sasl.kerberos.ticket.renew.window.factor = 0.8
>>> >>>>     sasl.mechanism = GSSAPI
>>> >>>>     security.protocol = PLAINTEXT
>>> >>>>     send.buffer.bytes = 131072
>>> >>>>     session.timeout.ms = 10000
>>> >>>>     ssl.cipher.suites = null
>>> >>>>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>> >>>>     ssl.endpoint.identification.algorithm = null
>>> >>>>     ssl.key.password = null
>>> >>>>     ssl.keymanager.algorithm = SunX509
>>> >>>>     ssl.keystore.location = null
>>> >>>>     ssl.keystore.password = null
>>> >>>>     ssl.keystore.type = JKS
>>> >>>>     ssl.protocol = TLS
>>> >>>>     ssl.provider = null
>>> >>>>     ssl.secure.random.implementation = null
>>> >>>>     ssl.trustmanager.algorithm = PKIX
>>> >>>>     ssl.truststore.location = null
>>> >>>>     ssl.truststore.password = null
>>> >>>>     ssl.truststore.type = JKS
>>> >>>>     value.deserializer = class my.dev.MessageUpdateCodec
>>> >>>>
>>> >>>>
>>> >>>> DEBUG | 21:59:30 | consumer.KafkaConsumer (KafkaConsumer.java:1617) -
>>> >>>> The Kafka consumer has closed.
>>> >>>>
>>> >>>> and the whole process repeats.
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>> Below you can find my startup code for Kafka Streams and the parameters
>>> >>>> which I have configured for starting the Kafka Streams application.
>>> >>>>
>>> >>>>         private static Properties settings = new Properties();
>>> >>>>         settings.put(StreamsConfig.APPLICATION_ID_CONFIG,
>>> >>>>             "mykafkastreamsapplication");
>>> >>>>         settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
>>> >>>>         settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
>>> >>>>         settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
>>> >>>>         settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>>> >>>>             Integer.MAX_VALUE);
>>> >>>>         settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
>>> >>>>         settings.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG,
>>> >>>>             "60000");
>>> >>>>
>>> >>>>         KStreamBuilder builder = new KStreamBuilder();
>>> >>>>         KafkaStreams streams = new KafkaStreams(builder, settings);
>>> >>>>         builder.addSource(.....
>>> >>>>          .addProcessor  .............
>>> >>>>          .addProcessor  ........
>>> >>>>
>>> >>>>          .addStateStore(...............
>>> ....).persistent().build(),"my
>>> >>>> processor")
>>> >>>>          .addSink ..............
>>> >>>>          . addSink ..............
>>> >>>>           streams.start();
>>> >>>>
>>> >>>> and I am using a Simple  processor to process my logic ..
>>> >>>>
>>> >>>> public class InfoProcessor extends AbstractProcessor<Key, Update> {
>>> >>>> private static Logger logger = Logger.getLogger(InfoProcessor.class);
>>> >>>> private ProcessorContext context;
>>> >>>> private KeyValueStore<Key, Info> infoStore;
>>> >>>>
>>> >>>> @Override
>>> >>>> @SuppressWarnings("unchecked")
>>> >>>> public void init(ProcessorContext context) {
>>> >>>>     this.context = context;
>>> >>>>     this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
>>> >>>>     infoStore = (KeyValueStore<Key, Info>)
>>> >>>>         context.getStateStore("InfoStore");
>>> >>>> }
>>> >>>>
>>> >>>> @Override
>>> >>>> public void process(Key key, Update update) {
>>> >>>>     try {
>>> >>>>         if (key != null && update != null) {
>>> >>>>             Info info = infoStore.get(key);
>>> >>>>             // merge logic
>>> >>>>             infoStore.put(key, info);
>>> >>>>         }
>>> >>>>
>>> >>>>     } catch (Exception e) {
>>> >>>>         logger.error(e.getMessage(), e);
>>> >>>>     } finally {
>>> >>>>     }
>>> >>>>     context.commit();
>>> >>>> }
>>> >>>>
>>> >>>> @Override
>>> >>>> public void punctuate(long timestamp) {
>>> >>>>     try {
>>> >>>>         KeyValueIterator<Key, Info> iter = this.infoStore.all();
>>> >>>>         while (iter.hasNext()) {
>>> >>>>             // processing logic
>>> >>>>
>>> >>>>         }
>>> >>>>         iter.close();
>>> >>>>         context.commit();
>>> >>>>     } catch (Exception e) {
>>> >>>>         logger.error(e.getMessage(), e);
>>> >>>>     }
>>> >>>> }
>>> >>>> }
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>>
>>> >>>
>>> >>
>>> >
>>>
>>
>>
>
