kafka-users mailing list archives

From dev loper <spark...@gmail.com>
Subject Re: Improving Kafka State Store performance
Date Thu, 21 Sep 2017 03:28:16 GMT
Hi Bill,

I will repeat my tests with RocksDB enabled and get back to you with the
details. It might take me 1-2 days since I am traveling, but I will try my
best to get it done tonight.

On Mon, Sep 18, 2017 at 5:30 PM, Bill Bejeck <bill@confluent.io> wrote:

> I'm following up from your other thread as well here.  Thanks for the info
> above, that is helpful.
>
> I think the AWS instance type might be a factor here, but let's do some
> more homework first.
>
> For a next step, we could enable logging for RocksDB so we can observe the
> performance.
>
> Here is some sample code that will allow logging at the INFO level as well
> as print out statistics (using RocksDB internal stats) every 15 minutes.
>
> Would you mind reverting your Streams application to use a persistent store
> again?
>
> Then let it run until you observe the behavior you described before and, if
> you don't mind, share the logs with me so we can look them over.  Thanks!
>
> import org.apache.kafka.streams.state.RocksDBConfigSetter;
> import org.rocksdb.InfoLogLevel;
> import org.rocksdb.Options;
>
> import java.util.Map;
>
> public class RocksDbLogsConfig implements RocksDBConfigSetter {
>
>     @Override
>     public void setConfig(String storeName, Options options, Map<String, Object> configs) {
>         // Write RocksDB's own log at INFO level.
>         options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
>         // Collect internal statistics and dump them to the log every 900 s (15 minutes).
>         options.createStatistics();
>         options.setStatsDumpPeriodSec(900);
>         options.setDbLogDir("UPDATE WITH PATH WHERE YOU WANT LOG FILES");
>     }
> }
>
> To use the RocksDbLogsConfig class, you'll need to update your Streams
> configs like so:
>
>   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDbLogsConfig.class);
>
>
>
> Thanks
> Bill
>
> On Sat, Sep 16, 2017 at 11:22 PM, dev loper <sparkemr@gmail.com> wrote:
>
> > Hi Bill.
> >
> > Thank you for pointing that out, but in the actual code I am calling
> > iter.close() in the finally block if the iterator is not null. I don't see
> > any issues when I am running it on light traffic. As soon as I switch to
> > production traffic, I start seeing these issues.
> >
> > Below I have provided additional details about our current application. If
> > you are looking for specific logs or details, please let me know and I will
> > capture them.
> >
> > In the production environment I am receiving 10,000 messages per second.
> > There are 36 partitions for the topic and around 2500 unique entities per
> > partition for which I have to maintain state.
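
To put these numbers in perspective, the following is a rough back-of-the-envelope sketch, not a measurement. It assumes that messages are spread evenly over the 36 partitions, that the partitions are spread evenly over the three Streams instances listed further down in this message, and that punctuate runs every 5 seconds and rewrites every entry, as in the pseudocode of the original post quoted below. The class and method names are hypothetical.

```java
// Illustrative estimate only; names are hypothetical, inputs are taken from the thread.
final class LoadEstimate {
    static final int MESSAGES_PER_SEC = 10_000;     // production traffic
    static final int PARTITIONS = 36;               // partitions of the topic
    static final int ENTITIES_PER_PARTITION = 2_500;
    static final int STREAMS_INSTANCES = 3;         // Kafka Streams application instances
    static final int PUNCTUATE_INTERVAL_SEC = 5;    // "Every 5 seconds" in the pseudocode

    // Assumes partitions are balanced evenly across instances.
    static int partitionsPerInstance() {
        return PARTITIONS / STREAMS_INSTANCES;
    }

    // Assumes messages are balanced evenly across partitions.
    static double messagesPerSecPerPartition() {
        return (double) MESSAGES_PER_SEC / PARTITIONS;
    }

    // Number of state entries each instance has to hold.
    static int entitiesPerInstance() {
        return partitionsPerInstance() * ENTITIES_PER_PARTITION;
    }

    // Upper bound on puts per second caused by punctuate rewriting every entry.
    static double punctuatePutsPerSecPerInstance() {
        return (double) entitiesPerInstance() / PUNCTUATE_INTERVAL_SEC;
    }

    // Puts per second caused by process(), one put per incoming message.
    static double processPutsPerSecPerInstance() {
        return (double) MESSAGES_PER_SEC / STREAMS_INSTANCES;
    }

    public static void main(String[] args) {
        System.out.println("partitions per instance: " + partitionsPerInstance());
        System.out.println("entities per instance:   " + entitiesPerInstance());
        System.out.println("punctuate puts/s:        " + punctuatePutsPerSecPerInstance());
        System.out.println("process puts/s:          " + processPutsPerSecPerInstance());
    }
}
```

Under these assumptions each instance owns 12 partitions and 30,000 entries, and would issue up to 6,000 puts per second from punctuate on top of roughly 3,300 puts per second from process.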
> >
> > Below are the hardware configuration and the number of instances we are
> > using for this solution. Please let me know if hardware is the limiting
> > factor here. We didn't go for a higher configuration since the load average
> > on these instances was quite low and I could hardly see any CPU spikes.
> >
> >
> > Kafka brokers: 2 broker instances with the configuration below (current CPU
> > usage 2%-8%)
> >
> >     Instance type: AWS T2 Large
> >     Machine configuration: 2 vCPUs, 8 GB RAM, storage: EBS
> >
> > Kafka Streams: 3 Kafka Streams application instances (current CPU usage
> > 8%-24%)
> >
> >     Instance type: AWS M4 Large
> >     Machine configuration: 2 vCPUs, 8 GB RAM, storage: EBS (dedicated EBS
> >     bandwidth 450 Mbps)
> >
> >
> >
> > On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <bill@confluent.io> wrote:
> >
> > > Hi,
> > >
> > > It's hard to say exactly without a little more information.
> > >
> > > On a side note, I don't see where you are closing the KeyValueIterator in
> > > the code above. Not closing a KeyValueIterator on a persistent state store
> > > can cause a resource leak over time, so I'd add `iter.close()` right before
> > > your `logger.info` call.  It might be worth retrying at that point.
> > >
> > > Thanks,
> > > Bill
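
One way to make sure an iterator is always closed, even when the loop body throws, is try-with-resources. The sketch below illustrates the pattern with plain JDK classes only: `TrackedIterator` is a hypothetical stand-in written for this example and is not part of Kafka Streams. `KeyValueIterator` is, to my knowledge, also both an `Iterator` and a `Closeable`, so the same construct should apply to the result of `store.all()`.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a store iterator that holds resources until closed.
final class TrackedIterator<T> implements Iterator<T>, Closeable {
    private final Iterator<T> delegate;
    private boolean closed = false;

    TrackedIterator(List<T> items) {
        this.delegate = items.iterator();
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public T next() {
        return delegate.next();
    }

    // A real store iterator would release its underlying resources here.
    @Override
    public void close() {
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

final class IteratorCloseDemo {
    // Counts the entries; the iterator is closed when the try block exits,
    // whether it exits normally or through an exception.
    static <T> int countAndClose(TrackedIterator<T> iter) {
        int counter = 0;
        try (TrackedIterator<T> it = iter) {
            while (it.hasNext()) {
                it.next();
                counter++;
            }
        }
        return counter;
    }

    public static void main(String[] args) {
        TrackedIterator<String> iter =
                new TrackedIterator<>(java.util.Arrays.asList("a", "b", "c"));
        System.out.println("entries: " + countAndClose(iter));
        System.out.println("closed:  " + iter.isClosed());
    }
}
```

This is equivalent to calling `close()` in a `finally` block, with the null check and the close call handled by the language.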
> > >
> > > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sparkemr@gmail.com> wrote:
> > >
> > > > Hi Kafka Streams Users,
> > > >
> > > > I am trying to improve the performance of a Kafka Streams persistent
> > > > state store. In our application we are using the Kafka Streams Processor
> > > > API with a persistent state store. My application performs well when it
> > > > starts up, but over a period of time the performance deteriorates. I am
> > > > computing certain results in the computeAnalytics method, and this method
> > > > takes almost no time. It is called within both process and punctuate, and
> > > > I store the updated object back to the store. Over time, punctuate takes a
> > > > very long time to complete, and I can see that the majority of the time is
> > > > spent storing and iterating the records. There are only about 2500
> > > > records per partition. I am not sure where I am going wrong or how I can
> > > > improve the performance.
> > > >
> > > > Below is one such sample log record.
> > > >
> > > > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records :: 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord :: 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394
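
The figures in this log line can be checked with simple arithmetic. The sketch below is only an illustration: the class and method names are made up, all times are assumed to be milliseconds (they come from `System.currentTimeMillis()` in the pseudocode), and the 5000 ms interval is taken from the "Every 5 seconds" comment in the same pseudocode. It verifies that the three components add up to the reported total and derives the average cost per record.

```java
// Illustrative only: names are hypothetical, values are copied from the log line.
final class PunctuateTimings {
    static final long RECORDS = 2109;               // processed Records
    static final long PROCESS_MS = 2;               // totalTimeTakenToProcessRecords
    static final long STORE_MS = 27605;             // totalTimeTakenToStoreRecord
    static final long RETRIEVE_MS = 12787;          // time taken to retrieve Records
    static final long TOTAL_MS = 40394;             // Total time Taken
    static final long PUNCTUATE_INTERVAL_MS = 5000; // "Every 5 seconds" in the pseudocode

    // Sum of the three measured components.
    static long sumOfParts() {
        return PROCESS_MS + STORE_MS + RETRIEVE_MS;
    }

    // Average time per processed record for one component.
    static double avgMsPerRecord(long componentMs) {
        return (double) componentMs / RECORDS;
    }

    // How many punctuate intervals a single punctuate call spans.
    static double intervalsConsumed() {
        return (double) TOTAL_MS / PUNCTUATE_INTERVAL_MS;
    }

    public static void main(String[] args) {
        System.out.println("parts add up to total: " + (sumOfParts() == TOTAL_MS));
        System.out.println("avg put ms/record:     " + avgMsPerRecord(STORE_MS));
        System.out.println("avg iterate ms/record: " + avgMsPerRecord(RETRIEVE_MS));
        System.out.println("intervals consumed:    " + intervalsConsumed());
    }
}
```

The components do sum to 40394 ms. That works out to roughly 13 ms per put and 6 ms per iterated record, and a single punctuate call spans about eight of the 5-second intervals, so punctuate cannot keep up with its own schedule.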
> > > >
> > > > Below is pseudocode for my processor, which mirrors the code I am using
> > > > in my application.
> > > >
> > > > MyProcessor(){
> > > >
> > > >   process(Key objectkey, Update eventupdate) {
> > > >     long timestamp = context.timestamp();
> > > >     AnalyticeObj storeobj = store.get(objectkey);
> > > >
> > > >     if (storeobj == null) {
> > > >       // First event for this key: create a new state object.
> > > >       storeobj = new AnalyticeObj(objectkey, eventupdate, timestamp);
> > > >     } else {
> > > >       storeobj.update(eventupdate, timestamp);
> > > >     }
> > > >     storeobj = storeobj.computeAnalytics();
> > > >
> > > >     store.put(objectkey, storeobj);
> > > >     context.commit();
> > > >   }
> > > >   // Every 5 seconds
> > > >   punctuate(long timestamp) {
> > > >     long startTime = System.currentTimeMillis();
> > > >     long totalTimeTakenToProcessRecords = 0;
> > > >     long totalTimeTakenToStoreRecords = 0;
> > > >     long counter = 0;
> > > >     KeyValueIterator<Key, AnalyticeObj> iter = store.all();
> > > >     while (iter.hasNext()) {
> > > >       KeyValue<Key, AnalyticeObj> entry = iter.next();
> > > >
> > > >       if (entry.value.hasExpired(timestamp)) {
> > > >         store.delete(entry.key);
> > > >       } else {
> > > >         long processStartTime = System.currentTimeMillis();
> > > >         AnalyticeObj storeobj = entry.value.computeAnalytics(timestamp);
> > > >         totalTimeTakenToProcessRecords += System.currentTimeMillis() - processStartTime;
> > > >
> > > >         long storeStartTime = System.currentTimeMillis();
> > > >         store.put(entry.key, storeobj);
> > > >         totalTimeTakenToStoreRecords += System.currentTimeMillis() - storeStartTime;
> > > >       }
> > > >       counter++;
> > > >     }
> > > >     long totalTime = System.currentTimeMillis() - startTime;
> > > >     // Time not spent in compute or put is attributed to iterating the store.
> > > >     long retrieveTime = totalTime - totalTimeTakenToProcessRecords - totalTimeTakenToStoreRecords;
> > > >     logger.info(" Time Metrics for punctuate  for TimeStamp :: " + timestamp
> > > >         + " processed Records :: " + counter
> > > >         + " totalTimeTakenToProcessRecords :: " + totalTimeTakenToProcessRecords
> > > >         + " totalTimeTakenToStoreRecord :: " + totalTimeTakenToStoreRecords
> > > >         + "toal time Taken to retrieve Records :: " + retrieveTime
> > > >         + " Total time Taken :: " + totalTime);
> > > >   }
> > > > }
> > > >
> > >
> >
>
