kafka-users mailing list archives

From Snehalata Nagaje <snehalata.nag...@harbingergroup.com>
Subject how to fetch old message from kafka
Date Mon, 02 Feb 2015 09:04:08 GMT

Hi,


We are using Kafka to store messages in a chat application.

Currently, each topic is divided into multiple partitions, and each partition stores the data
for one customer who uses the application.

Right now, on the very first request, the application fetches the log from Kafka starting at
the earliest valid offset, up to a maximum of 100000 bytes. Hence it reads all messages for the
given topic and partition.

Now we want to apply pagination, as LinkedIn and Facebook do: only the latest 10-15 messages
should be displayed at first, and the next set of older messages should be fetched when the
user scrolls down. We are using SimpleConsumer to fetch messages.

Can you please guide us on this?


Thanks,
Snehalata

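One possible way to approach the pagination described above is to page backwards by offset.
The sketch below is only an illustration and is not from this thread: `OffsetPager` and
`PageWindow` are hypothetical names, and it assumes that offsets in the partition are
contiguous (no log compaction) and that the earliest and latest offsets have already been
looked up, for example with an offset request through SimpleConsumer. It only computes which
offset range each page would cover. Since a fetch request is bounded by bytes rather than by
message count, the caller would still have to discard any messages outside that range.

```java
// Sketch only: computes which offset range to fetch for each "page" of chat history.
// OffsetPager and PageWindow are hypothetical names, not part of any Kafka API.
final class OffsetPager {

    /** Half-open offset range [start, end) to request from one partition. */
    static final class PageWindow {
        final long start;
        final long end;

        PageWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean isEmpty() {
            return start >= end;
        }
    }

    /**
     * @param earliest earliest valid offset in the partition
     * @param cursor   offset of the oldest message already shown;
     *                 pass the log-end offset for the very first page
     * @param pageSize number of messages per page (assumes one message per offset)
     */
    static PageWindow previousPage(long earliest, long cursor, int pageSize) {
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize must be positive");
        }
        // Never page past the earliest offset that is still retained.
        long end = Math.max(cursor, earliest);
        long start = Math.max(earliest, end - pageSize);
        return new PageWindow(start, end);
    }

    public static void main(String[] args) {
        long earliest = 0L;
        long logEnd = 37L; // made-up values for the demonstration
        PageWindow page = previousPage(earliest, logEnd, 15);
        while (!page.isEmpty()) {
            System.out.println("fetch offsets [" + page.start + ", " + page.end + ")");
            page = previousPage(earliest, page.start, 15);
        }
    }
}
```

For the first page, pass the log-end offset as the cursor; on each scroll, pass the `start` of
the previous window. With compressed message sets a fetch can also return messages from before
the requested offset, so those would need to be skipped as well.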

----- Original Message -----
From: "Jaikiran Pai" <jai.forums2013@gmail.com>
To: users@kafka.apache.org
Sent: Monday, February 2, 2015 12:47:19 PM
Subject: Re: Increased CPU usage with 0.8.2-beta

Hi Mathias,

Looking at that thread dump, I think the potential culprit is this one:

TRACE 303545: (thread=200049)
    sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:Unknown line)
    sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
    sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:79)
    sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
    sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
    sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:221)
    sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
    java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385)
    kafka.utils.Utils$.read(Utils.scala:380)
    kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:67)
    kafka.network.Receive$class.readCompletely(Transmission.scala:56)
    kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
    kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
    kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:72)
    kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:69)
    kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:113)
    kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
    kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:113)
    kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:112)
    kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
    kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:112)
    kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
    kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:111)
    kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:97)
    kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
    kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)


I see many such threads all triggered through the SimpleConsumer and 
ending up polling. Looking at the code, in theory, I can see why there 
might be a busy CPU loop generated by that code path. If my guess is 
right, it could be because of an issue in the implementation of how data 
is read off a channel in a blocking manner and I think this patch might 
help overcome that problem:

diff --git a/core/src/main/scala/kafka/network/Transmission.scala b/core/src/main/scala/kafka/network/Transmission.scala
index 2827103..0bab9ed 100644
--- a/core/src/main/scala/kafka/network/Transmission.scala
+++ b/core/src/main/scala/kafka/network/Transmission.scala
@@ -54,8 +54,15 @@ trait Receive extends Transmission {
     var totalRead = 0
     while(!complete) {
       val read = readFrom(channel)
-      trace(read + " bytes read.")
-      totalRead += read
+      if (read > 0) {
+        trace(read + " bytes read.")
+        totalRead += read
+      } else if (read == 0) {
+        // it's possible that nothing was read (see javadoc of ReadableByteChannel#read), from the backing channel,
+        // so we wait for a while before polling again, so that we don't end up with a busy CPU loop
+        // TODO: For now, this 30 milli seconds is a random value.
+        Thread.sleep(30)
+      }
     }
     totalRead
   }

Is this something that you would be able to apply against the latest 
0.8.2 branch of Kafka, build the Kafka binary, try it out and see if it 
improves the situation?

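The idea behind the patch above can be illustrated outside of Kafka. The following Java sketch
is an illustration under stated assumptions, not Kafka code: `BackoffReader` and
`StutteringChannel` are hypothetical names. It relies only on the documented contract of
`ReadableByteChannel#read`, which may return 0 when no bytes are currently available and -1 at
end of stream, and shows a read loop that sleeps briefly after a 0-byte read instead of
retrying immediately.

```java
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Sketch only: BackoffReader and StutteringChannel are hypothetical names used for
// illustration; they are not Kafka classes.
final class BackoffReader {

    /** Fills target from channel, sleeping briefly whenever a read returns 0 bytes. */
    static int readCompletely(ReadableByteChannel channel, ByteBuffer target, long backoffMillis)
            throws IOException, InterruptedException {
        int totalRead = 0;
        while (target.hasRemaining()) {
            int read = channel.read(target);
            if (read > 0) {
                totalRead += read;
            } else if (read == 0) {
                // Nothing available right now: back off instead of spinning on the CPU.
                Thread.sleep(backoffMillis);
            } else {
                // read == -1 means end of stream; handled explicitly in this sketch.
                throw new EOFException("channel closed after " + totalRead + " bytes");
            }
        }
        return totalRead;
    }

    /** Test double: returns 0 for the first few reads, then delivers one byte per call. */
    static final class StutteringChannel implements ReadableByteChannel {
        private int emptyReadsLeft;
        int emptyReadsSeen = 0;

        StutteringChannel(int emptyReads) {
            this.emptyReadsLeft = emptyReads;
        }

        @Override
        public int read(ByteBuffer dst) {
            if (emptyReadsLeft > 0) {
                emptyReadsLeft--;
                emptyReadsSeen++;
                return 0;
            }
            if (!dst.hasRemaining()) {
                return 0;
            }
            dst.put((byte) 1);
            return 1;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    public static void main(String[] args) throws Exception {
        StutteringChannel channel = new StutteringChannel(3);
        int n = readCompletely(channel, ByteBuffer.allocate(4), 5L);
        System.out.println(n + " bytes read after " + channel.emptyReadsSeen + " empty reads");
    }
}
```

A fixed sleep trades a little latency for lower CPU usage. The 5 ms used in `main` is
arbitrary, just as the 30 ms in the patch is.
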
-Jaikiran

On Monday 26 January 2015 11:35 PM, Mathias Söderberg wrote:
> Hi Neha,
>
> I sent an e-mail earlier today, but noticed now that it didn't 
> actually go through.
>
> Anyhow, I've attached two files, one with output from a 10 minute run 
> and one with output from a 30 minute run. Realized that maybe I 
> should've done one or two runs with 0.8.1.1 as well, but nevertheless.
>
> I upgraded our staging cluster to 0.8.2.0-rc2, and I'm seeing the same 
> CPU usage as with the beta version (basically pegging all cores). If I 
> manage to find the time I'll do another run with hprof on the rc2 
> version later today.
>
> Best regards,
> Mathias
>
> On Tue Dec 09 2014 at 10:08:21 PM Neha Narkhede <neha@confluent.io> wrote:
>
>     The following should be sufficient
>
>     java -agentlib:hprof=cpu=samples,depth=100,interval=20,lineno=y,thread=y,file=kafka.hprof <classname>
>
>     You would need to start the Kafka server with the settings above and let
>     it run for some time, until you observe the problem.
>
>     On Tue, Dec 9, 2014 at 3:47 AM, Mathias Söderberg
>     <mathias.soederberg@gmail.com> wrote:
>
>     > Hi Neha,
>     >
>     > Yeah sure. I'm not familiar with hprof, so any particular options I
>     > should include or just run with defaults?
>     >
>     > Best regards,
>     > Mathias
>     >
>     > On Mon Dec 08 2014 at 7:41:32 PM Neha Narkhede <neha@confluent.io> wrote:
>     >
>     > > Thanks for reporting the issue. Would you mind running hprof and
>     > > sending the output?
>     > >
>     > > On Mon, Dec 8, 2014 at 1:25 AM, Mathias Söderberg
>     > > <mathias.soederberg@gmail.com> wrote:
>     > >
>     > > > Good day,
>     > > >
>     > > > I upgraded a Kafka cluster from v0.8.1.1 to v0.8.2-beta and noticed
>     > > > that the CPU usage on the broker machines went up by roughly 40%,
>     > > > from ~60% to ~100% and am wondering if anyone else has experienced
>     > > > something similar? The load average also went up by 2x-3x.
>     > > >
>     > > > We're running on EC2 and the cluster currently consists of four
>     > > > m1.xlarge, with roughly 1100 topics / 4000 partitions. Using Java 7
>     > > > (1.7.0_65 to be exact) and Scala 2.9.2. Configurations can be found
>     > > > over here:
>     > > > https://gist.github.com/mthssdrbrg/7df34a795e07eef10262.
>     > > >
>     > > > I'm assuming that this is not expected behaviour for 0.8.2-beta?
>     > > >
>     > > > Best regards,
>     > > > Mathias
>     > > >
>     > >
>     > > --
>     > > Thanks,
>     > > Neha
>     > >
>     >
>
>     --
>     Thanks,
>     Neha
>

