kafka-users mailing list archives

From Ted Yu <yuzhih...@gmail.com>
Subject Re: Kafka Streams application Unable to Horizontally scale and the application on other instances refusing to start.
Date Sun, 17 Sep 2017 14:30:39 GMT
From StreamThread100.log:

DEBUG | 13:35:32 | clients.NetworkClient (NetworkClient.java:760) -
Initiating connection to node 1 at emspproximitykafka5102.mydomain.com:9092.

Can you take a look at the log on emspproximitykafka5102 to see if there
was some clue around the time of disconnection?

On Sat, Sep 16, 2017 at 10:34 PM, dev loper <sparkemr@gmail.com> wrote:

> Hi Ted/Bill,
>
> I deleted the data directory of the Kafka brokers and restarted the brokers
> yesterday. It has been better since, but the problem still exists. Below
> are my observations.
>
> *----- *The application pauses in between and takes a long time to resume
> on certain instances, for no reason I could identify. During these pauses
> the instance is not processing messages, and processing slows down on the
> other instances. I understand that some kind of rebalance is going on, but
> I am not sure what triggers it. From the application logs I can see that my
> message processing logic itself takes almost no time; it is near zero, and
> I am not doing anything more than a few mathematical computations before
> forwarding the messages to the sink. The only thing I can correlate is that
> a cluster metadata version update happens around the same time, and it
> seems to be another contributing factor for the long pauses.
>
> Below is the sequence of events when this happens:
>
> a)  DEBUG | 23:19:45 | internals.ProcessorStateManager
> (ProcessorStateManager.java:256) - task [0_1] Flushing all stores
> registered in the state manager
> DEBUG | 23:19:45 | internals.RecordCollectorImpl
> (RecordCollectorImpl.java:142) - task [0_1] Flushing producer
> DEBUG | 23:19:45 | internals.StreamTask (StreamTask.java:288) - task [0_1]
> Committing offsets
>
>
>
> *This happens for every task from [0_1] to [0_35], since I have 36 partitions.*
> *b) *Here I can see that the commit interval of 30000 ms was exceeded by
> 62013 ms, but I am not sure why it took that long. Earlier I had found that
> the state store puts I do in punctuate were getting slower over time, so for
> this test I have commented out the portion of the code that writes back to
> the store, to rule that issue out. I have raised my questions about state
> store performance improvement on a separate thread.
>
>  DEBUG | 23:19:51 | internals.ConsumerCoordinator
> (ConsumerCoordinator.java:758) - Group ProximityKafka-proxkafkalivereplicaengine14
> committed offset 2500775 for partition MYTOPIC05SEPT-17
> DEBUG | 23:19:57 | internals.ConsumerCoordinator
> (ConsumerCoordinator.java:758) - Group ProximityKafka-proxkafkalivereplicaengine14
> committed offset 2518966 for partition MYTOPIC05SEPT-35
> DEBUG | 23:19:57 | internals.StreamThread (StreamThread.java:775) -
> stream-thread [ProximityKafka-proxkafkalivereplicaengine14-
> dd702abb-e1ad-4640-aad3-cc5aa922e373-StreamThread-1] Committing all
> active tasks [0_0, 0_1, 0_2, 0_3, 0_4, 0_5, 0_6, 0_7, 0_8, 0_9, 0_10, 0_11,
> 0_12, 0_13, 0_14, 0_15, 0_16, 0_17, 0_18, 0_19, 0_20, 0_21, 0_22, 0_23,
> 0_24, 0_25, 0_26, 0_27, 0_28, 0_29, 0_30, 0_31, 0_32, 0_33, 0_34, 0_35] and
> standby tasks [] because the commit interval 30000ms has elapsed by 62013ms
> DEBUG | 23:19:57 | internals.ProcessorStateManager
> (ProcessorStateManager.java:256) - task [0_0] Flushing all stores
> registered in the state manager
>
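> For reference, the 30000ms in that log line is the Streams commit interval;
> I am not overriding it, so it is the default commit.interval.ms. A minimal
> sketch of how it would be set explicitly (illustration only, the class and
> method names are placeholders and this is not my actual startup code):
>
>     import java.util.Properties;
>     import org.apache.kafka.streams.StreamsConfig;
>
>     public class CommitIntervalConfig {
>         static Properties baseSettings() {
>             Properties settings = new Properties();
>             settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "mykafkastreamsapplication");
>             settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "mykafka5101.mydomain.com:9092");
>             // commit.interval.ms: how often Streams flushes its state stores and
>             // commits consumer offsets. "elapsed by 62013ms" in the log means that
>             // much more than the interval passed since the last commit, not that
>             // the interval itself changed.
>             settings.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30000);
>             return settings;
>         }
>     }
>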
> *c) *Sometimes I observe the error below, but I have no clue why it says
> Node 1 disconnected; I am able to telnet to both Kafka brokers at the same
> time. How can I avoid this error? It is taking a considerable toll on
> application performance.
>
> DEBUG | 00:16:41 | clients.NetworkClient (NetworkClient.java:702) - Node 1
> disconnected.
> DEBUG | 00:16:41 | clients.NetworkClient (NetworkClient.java:889) -
> Sending metadata request (type=MetadataRequest, topics=ProximityVisitEventsProtoBuf)
> to node 0
> DEBUG | 00:16:41 | clients.Metadata (Metadata.java:251) - Updated cluster
> metadata version 14 to Cluster(id = NR-WaCkTRwK9Dl2sNKozPQ, nodes = [
> mykafka5102.mydomain.com:9092 (id: 1 rack: null),
> mykafka5101.mydomain.com:9092 (id: 0 rack: null)], partitions =
> [Partition(topic = ProximityVisitEventsProtoBuf, partition = 0, leader = 0,
> replicas = [0], isr = [0])])
>
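> One thing I still need to rule out here (only a guess on my part): in my
> startup code I set connections.max.idle.ms to 60000 for the consumer, so if
> a broker connection sits idle longer than that, the client itself closes it
> and the next request logs "Node 1 disconnected". If that turns out to be the
> cause, the override would look roughly like this (sketch; 540000 is just the
> consumer default, not a tuned value, and the class name is a placeholder):
>
>     import java.util.Properties;
>     import org.apache.kafka.clients.consumer.ConsumerConfig;
>     import org.apache.kafka.streams.StreamsConfig;
>
>     public class IdleConnectionConfig {
>         static void applyTo(Properties settings) {
>             // connections.max.idle.ms for the consumer that Streams creates
>             // internally; it should be comfortably larger than the longest
>             // gap between requests to a broker.
>             settings.put(
>                 StreamsConfig.consumerPrefix(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
>                 540000);
>         }
>     }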
>
> *d) *After this a new consumer is created and it starts processing
> messages. Then it goes back to step a) and the application pauses for a
> long time again.
>
> *-----* The Kafka rebalance takes a lot of time when I start the instances
> one after another and the gap between the start times of two instances is
> more than a minute.
>
> Below are the hardware configuration and the number of instances we are
> using for this solution. Please let me know if hardware is the limiting
> factor here. We didn't go for a higher configuration since the load average
> on these instances was quite low and I could hardly see any CPU spikes.
>
> Kafka broker machine details: 2 broker instances with the configuration
> below (current CPU usage 2%-8%)
>
>     Instance type: AWS t2.large
>     Machine configuration: 2 vCPUs, 8 GB RAM, storage: EBS
>
> Kafka Streams instances: 3 Kafka Streams application instances (current CPU
> usage 8%-24%)
>
>     Instance type: AWS m4.large
>     Machine configuration: 2 vCPUs, 8 GB RAM, storage: EBS (dedicated EBS
>     bandwidth 450 Mbps)
> Thanks
>
> Dev
>
>
>
> On Sun, Sep 17, 2017 at 10:01 AM, Ted Yu <yuzhihong@gmail.com> wrote:
>
>> I downloaded and expanded StreamThread100.log
>>
>> I found the following line:
>>
>> StreamThread100.log:org.apache.kafka.streams.errors.StreamsException:
>> stream-thread failed to suspend stream tasks
>>
>> Previously I was using this line from your previous email:
>>
>> StreamThread100.log:org.apache.kafka.streams.errors.StreamsException:
>> stream-thread failed to suspend stream tasks
>>
>> The error seems to correspond to the following in
>> ConsumerCoordinator.sendOffsetCommitRequest():
>>
>>     if (generation == null)
>>         return RequestFuture.failure(new CommitFailedException());
>>
>> Let me study the code more.
>>
>> On Sat, Sep 16, 2017 at 9:09 PM, dev loper <sparkemr@gmail.com> wrote:
>>
>>> Hi Ted ,
>>>
>>> You might be looking at the logs which I shared earlier. Later in the mail
>>> chain I provided another set of StreamThread logs. I have re-attached those
>>> logs to this mail for your reference. I have also copied the mail content
>>> below.
>>>
>>> Hi Damian,
>>>
>>> I have repeated my tests with a slight configuration change. The current
>>> logs captured for the "StreamThread" keyword contain more relevant entries
>>> than the logs I shared previously. I started the application on instances
>>> 100, 101 and 102 simultaneously with the configuration below:
>>>
>>> 1) Reduced MAX_POLL_RECORDS_CONFIG to 5000 (previously 50000)
>>> 2) Reduced MAX_POLL_INTERVAL_MS_CONFIG to 60000 (previously Integer.MAX_VALUE)
>>>
>>> When the application started, all three instances began processing and for
>>> the first few minutes everything went well. After that I could see that on
>>> "StreamThread100" the consumer was going for a toss: it kept closing and
>>> re-creating consumers for a while, exactly with the pattern of logs I
>>> mentioned in my previous email. After some time "StreamThread100" stopped
>>> processing messages with the exception below, while the other two instances
>>> continued processing without any issues.
>>>
>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>>> be completed since the group has already rebalanced and assigned the
>>> partitions to another member. This means that the time between subsequent
>>> calls to poll() was longer than the configured max.poll.interval.ms,
>>> which typically implies that the poll loop is spending too much time
>>> message processing. You can address this either by increasing the session
>>> timeout or by reducing the maximum size of batches returned in poll() with
>>> max.poll.records.
>>>
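>>> For reference, two of the settings that exception message mentions are
>>> exactly the ones I changed for this run. Roughly how they are passed in
>>> (sketch only; 5000 and 60000 are just the values from this test, not
>>> recommendations, and the class name is a placeholder):
>>>
>>>     import java.util.Properties;
>>>     import org.apache.kafka.clients.consumer.ConsumerConfig;
>>>
>>>     public class PollTuning {
>>>         static void applyTo(Properties settings) {
>>>             // Fewer records per poll() means less work between two poll() calls.
>>>             settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
>>>             // Maximum allowed gap between two poll() calls before the group
>>>             // kicks this member out; exceeding it is what leads to the
>>>             // CommitFailedException above.
>>>             settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
>>>         }
>>>     }
>>>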
>>> I think that since the consumers kept starting and stopping, no poll() was
>>> made from the application. Because I reduced max.poll.interval.ms to 60000
>>> and the consumers were being closed and re-created, that might have
>>> resulted in the CommitFailedException, as no consumer was available to
>>> poll within the interval.
>>>
>>> After some time the issue propagated to the other servers. I have attached
>>> the relevant logs to this mail; kindly go through them and let me know how
>>> I can solve this issue.
>>>
>>>
>>> On Sun, Sep 17, 2017 at 2:23 AM, Ted Yu <yuzhihong@gmail.com> wrote:
>>>
>>>> I searched for 'stream-thread failed to' in StreamThread100.log but
>>>> didn't find any occurrence:
>>>>
>>>> StreamThread100.log:org.apache.kafka.streams.errors.StreamsException:
>>>> stream-thread failed to suspend stream tasks
>>>>
>>>> Can you double check ?
>>>>
>>>> On Fri, Sep 15, 2017 at 8:09 PM, dev loper <sparkemr@gmail.com> wrote:
>>>>
>>>>> Hi All ,
>>>>>
>>>>> @Bill,
>>>>>
>>>>> I will reduce the MAX_POLL_RECORDS to 500/1000 and I will share the
>>>>> results shortly.
>>>>>
>>>>> @Ted,
>>>>>
>>>>> Yes, I reduced MAX_POLL_RECORDS_CONFIG from 50000 to 5000. It was not a
>>>>> typo. Do you think 50000 is way too high for a Kafka Streams application?
>>>>> The Spark application I am trying to replace with Kafka Streams was
>>>>> processing 250000 messages per 5-second batch; that was the reason I set
>>>>> MAX_POLL_RECORDS_CONFIG to 50000 records.
>>>>>
>>>>> I don't think I have hit KAFKA-5397, since I couldn't find any instance of
>>>>> "org.apache.kafka.streams.errors.LockException" in my logs. I could see
>>>>> the exception below, but these exceptions are triggered long after the
>>>>> application stopped consuming any messages.
>>>>>
>>>>> StreamThread100.log:org.apache.kafka.streams.errors.StreamsException:
>>>>> stream-thread failed to suspend stream tasks
>>>>> User provided listener
>>>>> org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener
>>>>> for group myKafka-kafkareplica101Sept08 failed on partition revocation
>>>>>
>>>>> @Damian, I figured out the pattern below; I don't know whether it helps.
>>>>> Did the StreamThread logs which I shared help?
>>>>>
>>>>> I couldn't figure out why the consumers are getting closed right after
>>>>> being created, and why processing suddenly stops without any reason. If
>>>>> you look at the pattern:
>>>>>
>>>>> 1) Consumer is created with the config values supplied by the application.
>>>>> 2) Adding sensors
>>>>> 3) Fetching API version and initiating connections to the Kafka brokers
>>>>> (NetworkClient.java)
>>>>> 4) Sending metadata request and recorded API version from broker
>>>>> (NetworkClient.java)
>>>>> 5) Updated cluster metadata version (some incremental version) to Cluster
>>>>> 6) Discovered coordinator (one of the Kafka brokers)
>>>>> 7) Initiating connection to coordinator (one of the Kafka brokers)
>>>>> 8) Fetching committed offsets for partitions (ConsumerCoordinator.java:826)
>>>>> 9) Recorded API versions for node
>>>>> 10) Resetting offset for partition (for all the partitions)
>>>>> 11) Handling ListOffsetResponse (for all the partitions)
>>>>> 12) Removing sensors (I couldn't see any exceptions though)
>>>>> 13) consumer.KafkaConsumer (KafkaConsumer.java:1617) - The Kafka consumer
>>>>> has closed. (I couldn't see any exceptions, and I couldn't figure out why
>>>>> the KafkaConsumer closed; no reason is provided in the logs. See the
>>>>> sketch after this list.)
>>>>> 14) The Kafka Streams application goes back to step 1 and creates the
>>>>> consumer again. (The whole process repeats, but 90% of the time the
>>>>> application never recovers.)
>>>>> 15) Since I introduced a limit of 60000 for max.poll.interval.ms
>>>>> (previously Integer.MAX_VALUE), I see the exception below after 60
>>>>> seconds of consumer retries:
>>>>> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
>>>>> completed since the group has already rebalanced and assigned the
>>>>> partitions to another member.
>>>>>
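>>>>> To get more detail on step 13 (the consumer closing with nothing in the
>>>>> logs), I am planning to register the two hooks below before
>>>>> streams.start(). This is only a sketch of the diagnostics I intend to
>>>>> add, not code that is in the application yet, and the class name is a
>>>>> placeholder:
>>>>>
>>>>>     import org.apache.kafka.streams.KafkaStreams;
>>>>>
>>>>>     public class StreamsDiagnostics {
>>>>>         static void install(KafkaStreams streams) {
>>>>>             // Log every state transition (e.g. RUNNING -> REBALANCING) with a
>>>>>             // timestamp, to correlate with the consumer create/close cycle above.
>>>>>             streams.setStateListener((newState, oldState) ->
>>>>>                     System.out.println(System.currentTimeMillis()
>>>>>                             + " streams state " + oldState + " -> " + newState));
>>>>>             // Surface any exception that kills a StreamThread, since nothing
>>>>>             // shows up in my logs when the consumer is closed.
>>>>>             streams.setUncaughtExceptionHandler((thread, throwable) ->
>>>>>                     System.err.println("StreamThread died: " + thread.getName()
>>>>>                             + ": " + throwable));
>>>>>             // Both hooks must be registered before streams.start().
>>>>>         }
>>>>>     }
>>>>>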
>>>>> I am not sure what could be wrong on my side, or how I can resolve this
>>>>> issue so that my application starts processing messages consistently.
>>>>>
>>>>> Thanks
>>>>> Dev
>>>>>
>>>>> On Sat, Sep 16, 2017 at 2:02 AM, Bill Bejeck <bill@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Could you set MAX_POLL_RECORDS to something lower like 500 or 1000
>>>>>> and try
>>>>>> again?
>>>>>>
>>>>>> Thanks,
>>>>>> Bill
>>>>>>
>>>>>> On Fri, Sep 15, 2017 at 3:40 PM, dev loper <sparkemr@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> > Hi Damian,
>>>>>> >
>>>>>> > I have repeated my tests with a slight configuration change. The current
>>>>>> > logs captured for the "StreamThread" keyword contain more relevant
>>>>>> > entries than the logs I shared previously. I started the application on
>>>>>> > instances 100, 101 and 102 simultaneously with the configuration below:
>>>>>> >
>>>>>> > 1) Reduced MAX_POLL_RECORDS_CONFIG to 5000 (previously 50000)
>>>>>> > 2) Reduced MAX_POLL_INTERVAL_MS_CONFIG to 60000 (previously Integer.MAX_VALUE)
>>>>>> >
>>>>>> > When the application started, all three instances began processing and
>>>>>> > for the first few minutes everything went well. After that I could see
>>>>>> > that on "StreamThread100" the consumer was going for a toss: it kept
>>>>>> > closing and re-creating consumers for a while, exactly with the pattern
>>>>>> > of logs I mentioned in my previous email. After some time
>>>>>> > "StreamThread100" stopped processing messages with the exception below,
>>>>>> > while the other two instances continued processing without any issues.
>>>>>> >
>>>>>> > org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot
>>>>>> > be completed since the group has already rebalanced and assigned the
>>>>>> > partitions to another member. This means that the time between
>>>>>> > subsequent calls to poll() was longer than the configured
>>>>>> > max.poll.interval.ms, which typically implies that the poll loop is
>>>>>> > spending too much time message processing. You can address this either
>>>>>> > by increasing the session timeout or by reducing the maximum size of
>>>>>> > batches returned in poll() with max.poll.records.
>>>>>> >
>>>>>> > I think that since the consumers kept starting and stopping, no poll()
>>>>>> > was made from the application. Because I reduced max.poll.interval.ms
>>>>>> > to 60000 and the consumers were being closed and re-created, that might
>>>>>> > have resulted in the CommitFailedException, as no consumer was
>>>>>> > available to poll within the interval.
>>>>>> >
>>>>>> > After some time the issue propagated to the other servers. I have
>>>>>> > attached the relevant logs to this mail; kindly go through them and let
>>>>>> > me know how I can solve this issue.
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>> > On Fri, Sep 15, 2017 at 10:33 PM, dev loper <sparkemr@gmail.com>
>>>>>> wrote:
>>>>>> >
>>>>>> >> Hi Ted,
>>>>>> >>
>>>>>> >> What should I be looking for in the broker logs? I haven't looked at
>>>>>> >> the broker side, since my Spark application, which processes the same
>>>>>> >> topic with a different group id, is able to process well.
>>>>>> >>
>>>>>> >> On Fri, Sep 15, 2017 at 3:30 PM, Ted Yu <yuzhihong@gmail.com>
>>>>>> wrote:
>>>>>> >>
>>>>>> >>> Is there some clue in broker logs ?
>>>>>> >>>
>>>>>> >>> Thanks
>>>>>> >>>
>>>>>> >>> On Thu, Sep 14, 2017 at 11:19 PM, dev loper <sparkemr@gmail.com>
>>>>>> wrote:
>>>>>> >>>
>>>>>> >>>> Dear Kafka Users,
>>>>>> >>>>
>>>>>> >>>> I am fairly new to Kafka Streams. I have deployed two instances of
>>>>>> >>>> Kafka 0.11 brokers on AWS m3.xlarge instances. I have created a topic
>>>>>> >>>> with 36 partitions, and a separate application writes to this topic,
>>>>>> >>>> producing records at a rate of 10000 messages per second. I have
>>>>>> >>>> three AWS m4.xlarge instances where my Kafka Streams application is
>>>>>> >>>> running, which consumes the messages produced by the other
>>>>>> >>>> application. The application starts up and processes messages fine on
>>>>>> >>>> the first instance, but when I start the same application on the
>>>>>> >>>> other instances it does not start properly: even though the process
>>>>>> >>>> is alive, it is not processing messages. I can also see that the
>>>>>> >>>> other instances take a long time to start.
>>>>>> >>>>
>>>>>> >>>> Apart from the first instance, on the other instances I can see the
>>>>>> >>>> consumer getting added and removed repeatedly, and I can't see any
>>>>>> >>>> message processing at all. I have attached detailed logs where this
>>>>>> >>>> behavior is observed.
>>>>>> >>>>
>>>>>> >>>> The consumer is started with the log below on these instances and
>>>>>> >>>> then stopped with the log below (*detailed logs attached*)
>>>>>> >>>>
>>>>>> >>>> INFO  | 21:59:30 | consumer.ConsumerConfig
>>>>>> (AbstractConfig.java:223) -
>>>>>> >>>> ConsumerConfig values:
>>>>>> >>>>     auto.commit.interval.ms = 5000
>>>>>> >>>>     auto.offset.reset = latest
>>>>>> >>>>     bootstrap.servers = [l-mykafkainstancekafka5101:9092,
>>>>>> >>>> l-mykafkainstancekafka5102:9092]
>>>>>> >>>>     check.crcs = true
>>>>>> >>>>     client.id =
>>>>>> >>>>     connections.max.idle.ms = 540000
>>>>>> >>>>     enable.auto.commit = false
>>>>>> >>>>     exclude.internal.topics = true
>>>>>> >>>>     fetch.max.bytes = 52428800
>>>>>> >>>>     fetch.max.wait.ms = 500
>>>>>> >>>>     fetch.min.bytes = 1
>>>>>> >>>>     group.id = myKafka-kafkareplica101Sept08
>>>>>> >>>>     heartbeat.interval.ms = 3000
>>>>>> >>>>     interceptor.classes = null
>>>>>> >>>>     internal.leave.group.on.close = true
>>>>>> >>>>     isolation.level = read_uncommitted
>>>>>> >>>>     key.deserializer = class mx.july.jmx.proximity.kafka.KafkaKryoCodec
>>>>>> >>>>     max.partition.fetch.bytes = 1048576
>>>>>> >>>>     max.poll.interval.ms = 300000
>>>>>> >>>>     max.poll.records = 500
>>>>>> >>>>     metadata.max.age.ms = 300000
>>>>>> >>>>     metric.reporters = []
>>>>>> >>>>     metrics.num.samples = 2
>>>>>> >>>>     metrics.recording.level = INFO
>>>>>> >>>>     metrics.sample.window.ms = 30000
>>>>>> >>>>     partition.assignment.strategy = [class
>>>>>> >>>> org.apache.kafka.clients.consumer.RangeAssignor]
>>>>>> >>>>     receive.buffer.bytes = 65536
>>>>>> >>>>     reconnect.backoff.max.ms = 1000
>>>>>> >>>>     reconnect.backoff.ms = 50
>>>>>> >>>>     request.timeout.ms = 305000
>>>>>> >>>>     retry.backoff.ms = 100
>>>>>> >>>>     sasl.jaas.config = null
>>>>>> >>>>     sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>>>>> >>>>     sasl.kerberos.min.time.before.relogin = 60000
>>>>>> >>>>     sasl.kerberos.service.name = null
>>>>>> >>>>     sasl.kerberos.ticket.renew.jitter = 0.05
>>>>>> >>>>     sasl.kerberos.ticket.renew.window.factor = 0.8
>>>>>> >>>>     sasl.mechanism = GSSAPI
>>>>>> >>>>     security.protocol = PLAINTEXT
>>>>>> >>>>     send.buffer.bytes = 131072
>>>>>> >>>>     session.timeout.ms = 10000
>>>>>> >>>>     ssl.cipher.suites = null
>>>>>> >>>>     ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>>>>> >>>>     ssl.endpoint.identification.algorithm = null
>>>>>> >>>>     ssl.key.password = null
>>>>>> >>>>     ssl.keymanager.algorithm = SunX509
>>>>>> >>>>     ssl.keystore.location = null
>>>>>> >>>>     ssl.keystore.password = null
>>>>>> >>>>     ssl.keystore.type = JKS
>>>>>> >>>>     ssl.protocol = TLS
>>>>>> >>>>     ssl.provider = null
>>>>>> >>>>     ssl.secure.random.implementation = null
>>>>>> >>>>     ssl.trustmanager.algorithm = PKIX
>>>>>> >>>>     ssl.truststore.location = null
>>>>>> >>>>     ssl.truststore.password = null
>>>>>> >>>>     ssl.truststore.type = JKS
>>>>>> >>>>     value.deserializer = class my.dev.MessageUpdateCodec
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>> DEBUG | 21:59:30 | consumer.KafkaConsumer (KafkaConsumer.java:1617) -
>>>>>> >>>> The Kafka consumer has closed.
>>>>>> >>>>
>>>>>> >>>> ... and the whole process repeats.
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>> Below you can find my startup code for Kafka Streams and the
>>>>>> >>>> parameters which I have configured for starting the application.
>>>>>> >>>>
>>>>>> >>>>     private static Properties settings = new Properties();
>>>>>> >>>>
>>>>>> >>>>     settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "mykafkastreamsapplication");
>>>>>> >>>>     settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
>>>>>> >>>>     settings.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
>>>>>> >>>>     settings.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
>>>>>> >>>>     settings.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.MAX_VALUE);
>>>>>> >>>>     settings.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
>>>>>> >>>>     settings.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "60000");
>>>>>> >>>>
>>>>>> >>>>     KStreamBuilder builder = new KStreamBuilder();
>>>>>> >>>>     builder.addSource(.....)
>>>>>> >>>>            .addProcessor(.............)
>>>>>> >>>>            .addProcessor(........)
>>>>>> >>>>            .addStateStore(...................).persistent().build(), "my processor")
>>>>>> >>>>            .addSink(..............)
>>>>>> >>>>            .addSink(..............);
>>>>>> >>>>
>>>>>> >>>>     KafkaStreams streams = new KafkaStreams(builder, settings);
>>>>>> >>>>     streams.start();
>>>>>> >>>>
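>>>>>> >>>> The elided .addStateStore(...) line above builds a persistent store
>>>>>> >>>> named "InfoStore". A rough sketch of that call against the 0.11
>>>>>> >>>> Stores builder, with placeholder serdes instead of my real key/value
>>>>>> >>>> serdes and a placeholder class name:
>>>>>> >>>>
>>>>>> >>>>     import org.apache.kafka.common.serialization.Serdes;
>>>>>> >>>>     import org.apache.kafka.streams.processor.StateStoreSupplier;
>>>>>> >>>>     import org.apache.kafka.streams.state.Stores;
>>>>>> >>>>
>>>>>> >>>>     public class StoreSetup {
>>>>>> >>>>         static StateStoreSupplier infoStore() {
>>>>>> >>>>             // Persistent (RocksDB-backed) store, changelogged to Kafka by default.
>>>>>> >>>>             return Stores.create("InfoStore")
>>>>>> >>>>                     .withKeys(Serdes.String())      // placeholder serde
>>>>>> >>>>                     .withValues(Serdes.ByteArray()) // placeholder serde
>>>>>> >>>>                     .persistent()
>>>>>> >>>>                     .build();
>>>>>> >>>>         }
>>>>>> >>>>     }
>>>>>> >>>>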
>>>>>> >>>> and I am using a simple Processor to process my logic:
>>>>>> >>>>
>>>>>> >>>> public class InfoProcessor extends AbstractProcessor<Key, Update> {
>>>>>> >>>>     private static Logger logger = Logger.getLogger(InfoProcessor.class);
>>>>>> >>>>     private ProcessorContext context;
>>>>>> >>>>     private KeyValueStore<Key, Info> infoStore;
>>>>>> >>>>
>>>>>> >>>>     @Override
>>>>>> >>>>     @SuppressWarnings("unchecked")
>>>>>> >>>>     public void init(ProcessorContext context) {
>>>>>> >>>>         this.context = context;
>>>>>> >>>>         this.context.schedule(Constants.BATCH_DURATION_SECONDS * 1000);
>>>>>> >>>>         infoStore = (KeyValueStore<Key, Info>) context.getStateStore("InfoStore");
>>>>>> >>>>     }
>>>>>> >>>>
>>>>>> >>>>     @Override
>>>>>> >>>>     public void process(Key key, Update update) {
>>>>>> >>>>         try {
>>>>>> >>>>             if (key != null && update != null) {
>>>>>> >>>>                 Info info = infoStore.get(key);
>>>>>> >>>>                 // merge logic
>>>>>> >>>>                 infoStore.put(key, info);
>>>>>> >>>>             }
>>>>>> >>>>         } catch (Exception e) {
>>>>>> >>>>             logger.error(e.getMessage(), e);
>>>>>> >>>>         } finally {
>>>>>> >>>>         }
>>>>>> >>>>         context.commit();
>>>>>> >>>>     }
>>>>>> >>>>
>>>>>> >>>>     @Override
>>>>>> >>>>     public void punctuate(long timestamp) {
>>>>>> >>>>         try {
>>>>>> >>>>             KeyValueIterator<Key, Info> iter = this.infoStore.all();
>>>>>> >>>>             while (iter.hasNext()) {
>>>>>> >>>>                 // processing logic
>>>>>> >>>>             }
>>>>>> >>>>             iter.close();
>>>>>> >>>>             context.commit();
>>>>>> >>>>         } catch (Exception e) {
>>>>>> >>>>             logger.error(e.getMessage(), e);
>>>>>> >>>>         }
>>>>>> >>>>     }
>>>>>> >>>> }
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>>
>>>>>> >>>
>>>>>> >>
>>>>>> >
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
