kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sabarish Sasidharan <sabarish....@gmail.com>
Subject Re: Improving Kafka State Store performance
Date Sun, 17 Sep 2017 09:55:19 GMT
T2 instances are credit based instances that do not provide constant
throughout in cpu and io. You would want to reconsider using t2.

That said we too face issues with scaling rocksdb. Although I don't
remember the performance benchmark numbers but I do remember it was at
least 3x slower when compared to hashmap storage. Every lookup, every write
contributed to the degradation. Changelogging had negligible overhead btw.

Regards
Sab

On 17 Sep 2017 8:52 am, "dev loper" <sparkemr@gmail.com> wrote:

> Hi Bill.
>
> Thank you pointing out, But in actual code I am calling iter.close() in the
> finally block if the iterator is not null. I don't see any issues when I am
> running it on light traffic. As soon as I switch to production traffic I
> start seeing these issues.
>
> Below I have provided additional details about our current application. If
> you are looking for specific logs or details , please let me know. I will
> get the details captured.
>
> In production environment I am receiving 10,000 messages per second. There
> are 36 partitions  for the topic and there are around 2500 unique entities
> per partition for which I have to maintain the state.
>
> Below I have mentioned the hardware configuration and number of instances
> we are using for this solution. Please let me know if hardware is the
> limiting factor here. We didn't go for higher configuration since the load
> average on these instances were quite low and I could hardly see any CPU
> spikes .
>
>
> Kafka Machine Machine Details: - 2 Broker Instances with below
> Configuration ,  (Current CPU Usage 2%- 8%)
>
>  Instance Type : AWS T2 Large
>   Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS
>
> Kafka Streams Instance : 3 Kafka Streams  Application Instances (Current
> CPU Usage 8%- 24%)
>
>     Instance Type : AWS M4 Large
>     Machine Configuration : 2 VCPU;s, 8gb Ram, Storage : EBS (Dedicated EBS
> bandwidth 450 mbps)
>
>
>
> On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <bill@confluent.io> wrote:
>
> > Hi,
> >
> > It's hard to say exactly without a little more information.
> >
> > On a side note, I don't see where you are closing the KeyValueIterator in
> > the code above. Not closing a KeyValueIterator on a Permanent State Store
> > can cause a resource leak over time, so I'd add `iter.close()` right
> before
> > your `logger.info` call.  It might be worth retrying at that point.
> >
> > Thanks,
> > Bill
> >
> > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sparkemr@gmail.com> wrote:
> >
> > > Hi Kafka Streams Users,
> > >
> > > I am trying to improve the performance of Kafka Streams State Store
> > > Persistent Store. In our application we are using Kafka Streams
> Processor
> > > API  and using Persistent State Store.. My application when starts up
> it
> > > performing well but over a period of time the performance
> deteriorated. I
> > > am computing certain results in computeAnalytics method and this method
> > is
> > > not taking time at all. This method is being called within both process
> > and
> > > punctuate and I am storing the updated object back to store. Over the
> > > period of time its taking huge time for completing the punctuate
> process
> > > and I could see majority of the time is spent in storing the records
> and
> > > Iterating the records. The record size is just 2500 per partition. I am
> > not
> > > where I am going wrong and how can I improve the performance.
> > >
> > > Below is one such sample log record.
> > >
> > > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> > > Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records
> > ::
> > > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> > > 27605toal time Taken to retrieve Records :: 12787 Total time Taken ::
> > 40394
> > >
> > > Below I have given my pseudo code for my processor which exactly
> > resembles
> > > the code which I am using in my application.
> > >
> > > MyProcessor(){
> > >
> > >   process(Key objectkey, Update eventupdate){
> > >    long timestamp=context.timestamp();
> > >    AnalyticeObj storeobj=store.get(objectkey);
> > >
> > >    if( storeobj ===null)
> > >          {
> > >           storeobj=new  AnalyticeObj(objectkey,eventupdate,timestamp)
> > >          }
> > >          else
> > >         {
> > >            storeobj.update(eventupdate,timestamp)
> > >         }
> > >      storeobj=storeobj.computeAnalytics();
> > >
> > >    store.put(objectkey,storeobj);
> > >   context.commit();
> > > }
> > > // Every 5 seconds
> > > punctuate(long timestamp)
> > > {
> > >  long startTime = System.currentTimeMillis();
> > > long totalTimeTakenToProcessRecords=0;
> > > long totalTimeTakenToStoreRecords=0;
> > > long counter=0;
> > > KeyValueIterator iter=this.visitStore.all();
> > > while (iter.hasNext()) {
> > > KeyValue<Key, AnalyticeObj> entry = iter.next();
> > >
> > >     if(AnalyticeObj.hasExpired(timestamp)
> > >          store.remove(entry.key)
> > >       else
> > >       {
> > >         long processStartTime=System.currentTimeMillis();
> > >          AnalyticeObj storeobj= entry.value.computeAnalytics(
> timestamp);
> > >
> > > totalTimeTakenToProcessRecords=totalTimeTakenToProcessRecords
> > > +(System.currentTimeMillis()-processStartTime);
> > >
> > >          long storeStartTime=System.currentTimeMillis();
> > >           store.put(entry.key,storeobj);
> > >
> > > totalTimeTakenToStoreRecords=totalTimeTakenToStoreRecords+(
> > > System.currentTimeMillis()-storeStartTime);
> > >        }
> > >    counter++;
> > > }
> > >      logger.info(" Time Metrics for punctuate  "
> > >                     " for TimeStamp :: " + "" + timestamp + " processed
> > > Records :: "
> > >                     + counter +" totalTimeTakenToProcessRecords ::
> > > "+totalTimeTakenToProcessRecords +" totalTimeTakenToStoreRecord ::
> > > "+totalTimeTakenToStoreRecords
> > >                     +"toal time Taken to retrieve Records :: "+
> > > (System.currentTimeMillis() -
> > > (startTime+totalTimeTakenToProcessRecords
> +totalTimeTakenToStoreRecords)
> > )+"
> > > Total time Taken :: " + (System.currentTimeMillis() - startTime));
> > > }
> > > }
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message