kafka-users mailing list archives

From Bill Bejeck <b...@confluent.io>
Subject Re: Improving Kafka State Store performance
Date Mon, 18 Sep 2017 14:05:10 GMT
Understood, but since we haven't updated to RocksDB 5.7.3 yet, I think it's
best to test against the version that is currently deployed.

Thanks.

On Mon, Sep 18, 2017 at 9:56 AM, Ted Yu <yuzhihong@gmail.com> wrote:

> We're using RocksDB 5.3.6.
>
> It would make more sense to run the next round of experiments using
> RocksDB 5.7.3, which is the latest.
>
> Cheers
>
> On Mon, Sep 18, 2017 at 5:00 AM, Bill Bejeck <bill@confluent.io> wrote:
>
> > I'm following up here from your other thread as well. Thanks for the
> > info above; that is helpful.
> >
> > I think the AWS instance type might be a factor here, but let's do some
> > more homework first.
> >
> > As a next step, we could enable logging for RocksDB so we can observe
> > its performance.
> >
> > Here is some sample code that enables RocksDB logging at the INFO level
> > and also prints statistics (using RocksDB's internal stats) every 15
> > minutes.
> >
> > Would you mind reverting your Streams application to use a persistent
> > store again?
> >
> > Then let it run until you observe the behavior you described before and,
> > if you don't mind, share the logs with me so we can look them over.
> > Thanks!
> >
> > import org.apache.kafka.streams.state.RocksDBConfigSetter;
> > import org.rocksdb.InfoLogLevel;
> > import org.rocksdb.Options;
> >
> > import java.util.Map;
> >
> > public class RocksDbLogsConfig implements RocksDBConfigSetter {
> >
> >     @Override
> >     public void setConfig(String storeName, Options options, Map<String, Object> configs) {
> >         // Write RocksDB's own log messages at INFO level
> >         options.setInfoLogLevel(InfoLogLevel.INFO_LEVEL);
> >         // Collect RocksDB internal statistics
> >         options.createStatistics();
> >         // Dump the statistics to the RocksDB log every 900 s (15 minutes)
> >         options.setStatsDumpPeriodSec(900);
> >         // Directory where RocksDB writes its LOG files
> >         options.setDbLogDir("UPDATE WITH PATH WHERE YOU WANT LOG FILES");
> >     }
> > }
> >
> > To use the RocksDbLogsConfig class, you'll need to update your Streams
> > configs like so:
> >
> >   props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, RocksDbLogsConfig.class);
> >
> >
> >
> > Thanks
> > Bill
> >
> > On Sat, Sep 16, 2017 at 11:22 PM, dev loper <sparkemr@gmail.com> wrote:
> >
> > > Hi Bill.
> > >
> > > Thank you for pointing that out, but in the actual code I am calling
> > > iter.close() in a finally block if the iterator is not null. I don't see
> > > any issues when running with light traffic; as soon as I switch to
> > > production traffic, I start seeing these issues.
> > >
> > > Below are additional details about our current application. If you need
> > > specific logs or details, please let me know and I will capture them.
> > >
> > > In the production environment I am receiving 10,000 messages per second.
> > > The topic has 36 partitions, and there are around 2,500 unique entities
> > > per partition for which I have to maintain state.
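A rough load estimate from these figures may help frame the discussion. The sketch below assumes (hypothetically) that the 36 partitions are spread evenly over the 3 Streams instances and that every punctuate() run re-stores every live entry once, as the processor pseudocode further down in this thread does. The class and method names are illustrative only.

```java
// Back-of-the-envelope load model for the figures quoted in this thread.
// Assumptions (hypothetical): partitions are spread evenly over the Streams
// instances, each message causes one put in process(), and each punctuate()
// run re-stores every live entry once.
final class LoadEstimate {
    static double messagesPerPartitionPerSec(double msgsPerSec, int partitions) {
        return msgsPerSec / partitions;
    }

    static int partitionsPerInstance(int partitions, int instances) {
        return partitions / instances;
    }

    static int entitiesPerInstance(int entitiesPerPartition, int partitions, int instances) {
        return entitiesPerPartition * partitionsPerInstance(partitions, instances);
    }

    // Store puts per second caused by punctuate() alone.
    static double punctuatePutsPerSec(int entities, double intervalSec) {
        return entities / intervalSec;
    }

    public static void main(String[] args) {
        int partitions = 36, instances = 3, entitiesPerPartition = 2500;
        double msgsPerSec = 10_000, punctuateIntervalSec = 5;

        int perInstance = entitiesPerInstance(entitiesPerPartition, partitions, instances);
        System.out.printf("messages/s per partition:      %.1f%n",
                messagesPerPartitionPerSec(msgsPerSec, partitions));
        System.out.printf("entities per instance:         %d%n", perInstance);
        System.out.printf("punctuate puts/s per instance: %.0f%n",
                punctuatePutsPerSec(perInstance, punctuateIntervalSec));
        System.out.printf("process() puts/s per instance: %.0f%n", msgsPerSec / instances);
    }
}
```

With these inputs each instance holds about 30,000 entries (12 partitions of 2,500 entities) and issues roughly 6,000 puts per second from punctuate() on top of about 3,300 puts per second from process().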
> > >
> > > Below are the hardware configuration and the number of instances we are
> > > using for this solution. Please let me know if hardware is the limiting
> > > factor here. We didn't choose a larger configuration because the load
> > > average on these instances was quite low and I could hardly see any CPU
> > > spikes.
> > >
> > >
> > > Kafka brokers: 2 broker instances with the configuration below
> > > (current CPU usage 2%-8%)
> > >
> > >   Instance type: AWS T2 Large
> > >   Machine configuration: 2 vCPUs, 8 GB RAM, storage: EBS
> > >
> > > Kafka Streams: 3 Kafka Streams application instances (current CPU usage
> > > 8%-24%)
> > >
> > >   Instance type: AWS M4 Large
> > >   Machine configuration: 2 vCPUs, 8 GB RAM, storage: EBS (dedicated EBS
> > >   bandwidth 450 Mbps)
> > >
> > >
> > >
> > > On Sat, Sep 16, 2017 at 10:05 PM, Bill Bejeck <bill@confluent.io> wrote:
> > >
> > > > Hi,
> > > >
> > > > It's hard to say exactly without a little more information.
> > > >
> > > > On a side note, I don't see where you are closing the KeyValueIterator
> > > > in the code above. Not closing a KeyValueIterator on a persistent state
> > > > store can cause a resource leak over time, so I'd add `iter.close()`
> > > > right before your `logger.info` call. It might be worth retrying at
> > > > that point.
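To release the iterator even if the loop throws, the close can go in a finally block (as described in the reply above) or, more compactly, in a try-with-resources statement, since Kafka Streams' KeyValueIterator is Closeable. The sketch below uses a hypothetical `TrackedIterator` as a JDK-only stand-in for KeyValueIterator so the pattern can be run without Kafka on the classpath; it is an illustration of the pattern, not the Streams API itself.

```java
import java.io.Closeable;
import java.util.Iterator;

// Hypothetical JDK-only stand-in for Kafka Streams' KeyValueIterator
// (an Iterator that is also Closeable). close() here only records the call;
// a real store iterator would release its underlying RocksDB resources.
final class TrackedIterator<T> implements Iterator<T>, Closeable {
    private final Iterator<T> delegate;
    boolean closed = false;

    TrackedIterator(Iterable<T> source) {
        this.delegate = source.iterator();
    }

    @Override public boolean hasNext() { return delegate.hasNext(); }
    @Override public T next() { return delegate.next(); }
    @Override public void close() { closed = true; }
}

final class IteratorClosing {
    // try-with-resources closes the iterator on normal exit and on exceptions,
    // unlike a close() call placed only after the loop.
    static int countAll(TrackedIterator<String> source) {
        int counter = 0;
        try (TrackedIterator<String> iter = source) {
            while (iter.hasNext()) {
                iter.next();
                counter++;
            }
        }
        return counter;
    }

    // Same pattern, but the loop fails part-way through.
    static void failOn(TrackedIterator<String> source, String poison) {
        try (TrackedIterator<String> iter = source) {
            while (iter.hasNext()) {
                if (iter.next().equals(poison)) {
                    throw new IllegalStateException("simulated failure while iterating");
                }
            }
        }
    }
}
```

A finally block with a null check gives the same guarantee; try-with-resources simply removes the null check and the explicit close() call.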
> > > >
> > > > Thanks,
> > > > Bill
> > > >
> > > > On Sat, Sep 16, 2017 at 9:14 AM, dev loper <sparkemr@gmail.com> wrote:
> > > >
> > > > > Hi Kafka Streams Users,
> > > > >
> > > > > I am trying to improve the performance of a persistent state store in
> > > > > Kafka Streams. Our application uses the Kafka Streams Processor API
> > > > > with a persistent state store. When the application starts up it
> > > > > performs well, but over time the performance deteriorates. I compute
> > > > > certain results in the computeAnalytics method, and this method takes
> > > > > almost no time. It is called from both process and punctuate, and I
> > > > > store the updated object back into the store. Over time, punctuate
> > > > > takes a very long time to complete, and I can see that most of that
> > > > > time is spent storing and iterating over the records. There are only
> > > > > 2,500 records per partition. I am not sure where I am going wrong or
> > > > > how I can improve the performance.
> > > > >
> > > > > Below is one such sample log record.
> > > > >
> > > > > INFO  | 07:59:58 | processors.MyProcessor (MyProcessor.java:123) - Time
> > > > > Metrics for punctuate  for TimeStamp :: 1505564655878 processed Records ::
> > > > > 2109 totalTimeTakenToProcessRecords :: 2 totalTimeTakenToStoreRecord ::
> > > > > 27605toal time Taken to retrieve Records :: 12787 Total time Taken :: 40394
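The numbers in that log line can be broken down per record. In the processor code below, the "retrieve" figure is not timed directly: it is total time minus process and store time, so it also absorbs loop and iterator overhead. The sketch below (class name hypothetical) recomputes the per-record costs from the logged values and compares the run time with the 5-second punctuate schedule.

```java
// Per-record breakdown of the punctuate log line quoted above.
// Inputs are copied from that log; the class itself is only illustrative.
final class PunctuateBreakdown {
    static final long RECORDS = 2109;
    static final long PROCESS_MS = 2;
    static final long STORE_MS = 27_605;
    static final long RETRIEVE_MS = 12_787;          // derived in the log: total - process - store
    static final long TOTAL_MS = 40_394;
    static final long PUNCTUATE_INTERVAL_MS = 5_000; // "Every 5 seconds" in the processor code

    static double perRecordMs(long phaseMs, long records) {
        return (double) phaseMs / records;
    }

    static double shareOfTotal(long phaseMs, long totalMs) {
        return (double) phaseMs / totalMs;
    }

    public static void main(String[] args) {
        System.out.printf("components sum to total: %b%n",
                PROCESS_MS + STORE_MS + RETRIEVE_MS == TOTAL_MS);
        System.out.printf("store:    %.2f ms/record, %.1f%% of total%n",
                perRecordMs(STORE_MS, RECORDS), 100 * shareOfTotal(STORE_MS, TOTAL_MS));
        System.out.printf("retrieve: %.2f ms/record, %.1f%% of total%n",
                perRecordMs(RETRIEVE_MS, RECORDS), 100 * shareOfTotal(RETRIEVE_MS, TOTAL_MS));
        System.out.printf("run time / schedule interval: %.2f%n",
                (double) TOTAL_MS / PUNCTUATE_INTERVAL_MS);
    }
}
```

This works out to roughly 13 ms per put and 6 ms per retrieved record, with a single punctuate() run taking about 8 times its 5-second interval.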
> > > > >
> > > > > Below is pseudocode for my processor; it closely resembles the code I
> > > > > am using in my application.
> > > > >
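> > > > > import org.apache.kafka.streams.KeyValue;
> > > > > import org.apache.kafka.streams.processor.Processor;
> > > > > import org.apache.kafka.streams.processor.ProcessorContext;
> > > > > import org.apache.kafka.streams.state.KeyValueIterator;
> > > > > import org.apache.kafka.streams.state.KeyValueStore;
> > > > > import org.slf4j.Logger;
> > > > > import org.slf4j.LoggerFactory;
> > > > >
> > > > > // Key, Update and AnalyticeObj are the application's own classes.
> > > > > public class MyProcessor implements Processor<Key, Update> {
> > > > >
> > > > >     private static final Logger logger = LoggerFactory.getLogger(MyProcessor.class);
> > > > >
> > > > >     private ProcessorContext context;
> > > > >     private KeyValueStore<Key, AnalyticeObj> store;
> > > > >
> > > > >     @Override
> > > > >     @SuppressWarnings("unchecked")
> > > > >     public void init(ProcessorContext context) {
> > > > >         this.context = context;
> > > > >         // "analytics-store" is a placeholder for the store name registered in the topology
> > > > >         this.store = (KeyValueStore<Key, AnalyticeObj>) context.getStateStore("analytics-store");
> > > > >         context.schedule(5000); // call punctuate() every 5 seconds
> > > > >     }
> > > > >
> > > > >     @Override
> > > > >     public void process(Key objectkey, Update eventupdate) {
> > > > >         long timestamp = context.timestamp();
> > > > >         AnalyticeObj storeobj = store.get(objectkey);
> > > > >         if (storeobj == null) {
> > > > >             storeobj = new AnalyticeObj(objectkey, eventupdate, timestamp);
> > > > >         } else {
> > > > >             storeobj.update(eventupdate, timestamp);
> > > > >         }
> > > > >         storeobj = storeobj.computeAnalytics();
> > > > >         store.put(objectkey, storeobj);
> > > > >         context.commit(); // only requests a commit
> > > > >     }
> > > > >
> > > > >     @Override
> > > > >     public void punctuate(long timestamp) {
> > > > >         long startTime = System.currentTimeMillis();
> > > > >         long totalTimeTakenToProcessRecords = 0;
> > > > >         long totalTimeTakenToStoreRecords = 0;
> > > > >         long counter = 0;
> > > > >         KeyValueIterator<Key, AnalyticeObj> iter = store.all();
> > > > >         try {
> > > > >             while (iter.hasNext()) {
> > > > >                 KeyValue<Key, AnalyticeObj> entry = iter.next();
> > > > >                 if (entry.value.hasExpired(timestamp)) {
> > > > >                     store.delete(entry.key); // KeyValueStore uses delete(), not remove()
> > > > >                 } else {
> > > > >                     long processStartTime = System.currentTimeMillis();
> > > > >                     AnalyticeObj storeobj = entry.value.computeAnalytics(timestamp);
> > > > >                     totalTimeTakenToProcessRecords += System.currentTimeMillis() - processStartTime;
> > > > >
> > > > >                     long storeStartTime = System.currentTimeMillis();
> > > > >                     store.put(entry.key, storeobj);
> > > > >                     totalTimeTakenToStoreRecords += System.currentTimeMillis() - storeStartTime;
> > > > >                 }
> > > > >                 counter++;
> > > > >             }
> > > > >         } finally {
> > > > >             iter.close(); // release the store iterator even if the loop throws
> > > > >         }
> > > > >         long totalTime = System.currentTimeMillis() - startTime;
> > > > >         // "retrieve" time is whatever remains after processing and storing
> > > > >         long retrieveTime = totalTime - totalTimeTakenToProcessRecords - totalTimeTakenToStoreRecords;
> > > > >         logger.info("Time Metrics for punctuate for TimeStamp :: " + timestamp
> > > > >                 + " processed Records :: " + counter
> > > > >                 + " totalTimeTakenToProcessRecords :: " + totalTimeTakenToProcessRecords
> > > > >                 + " totalTimeTakenToStoreRecord :: " + totalTimeTakenToStoreRecords
> > > > >                 + " total time Taken to retrieve Records :: " + retrieveTime
> > > > >                 + " Total time Taken :: " + totalTime);
> > > > >     }
> > > > >
> > > > >     @Override
> > > > >     public void close() {
> > > > >     }
> > > > > }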
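For finer-grained numbers than the millisecond timers above, System.nanoTime() is the JDK clock intended for elapsed-time measurement; System.currentTimeMillis() is wall-clock time with OS-dependent granularity. The sketch below applies nanoTime to the same punctuate() structure. It assumes (hypothetically) a plain Map standing in for the RocksDB-backed store, values that are expiry timestamps, and a placeholder `computeAnalytics`. Keys are snapshotted before the loop so that removals cannot disturb the iteration. It illustrates the instrumentation only and says nothing about actual RocksDB performance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch: a punctuate()-style loop timed with System.nanoTime().
// Assumptions (hypothetical): a Map stands in for the state store, values are
// expiry timestamps, and computeAnalytics is a placeholder computation.
final class PunctuateTiming {
    long computeNanos, storeNanos, otherNanos, totalNanos;
    int processed, expired;

    // Placeholder for the application's real computation.
    static long computeAnalytics(long value) {
        return value;
    }

    void punctuate(Map<String, Long> store, long now) {
        computeNanos = storeNanos = 0;
        processed = expired = 0;
        long start = System.nanoTime();
        // Snapshot the keys so store.remove() below cannot invalidate the iteration.
        List<String> keys = new ArrayList<>(store.keySet());
        for (String key : keys) {
            long expiresAt = store.get(key);
            long t0 = System.nanoTime();
            if (expiresAt <= now) {
                store.remove(key);
                storeNanos += System.nanoTime() - t0;
                expired++;
            } else {
                long updated = computeAnalytics(expiresAt);
                long t1 = System.nanoTime();
                store.put(key, updated);
                computeNanos += t1 - t0;
                storeNanos += System.nanoTime() - t1;
            }
            processed++;
        }
        totalNanos = System.nanoTime() - start;
        // Everything not timed explicitly: key snapshot, lookups, loop overhead.
        otherNanos = totalNanos - computeNanos - storeNanos;
    }

    String summary() {
        return String.format("processed=%d expired=%d compute=%.3fms store=%.3fms other=%.3fms total=%.3fms",
                processed, expired, computeNanos / 1e6, storeNanos / 1e6, otherNanos / 1e6, totalNanos / 1e6);
    }
}
```

Calling punctuate() and then summary() yields one line per run, in the same spirit as the log line quoted earlier, but with sub-millisecond resolution for each phase.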
> > > > >
> > > >
> > >
> >
>
