kafka-users mailing list archives

From Ajit Koti <ajitk...@gmail.com>
Subject Re: Fetch Latest messages from Kafka
Date Thu, 13 Sep 2012 03:16:47 GMT
Hi Jun,

I was trying out this last option; otherwise I would use the SimpleConsumer.

I am trying to set an offset of my choice using

getZkClient().createPersistent("/consumers/test_group/offsets/golf4/0-0",
"22274");

or

ZkUtils.updatePersistentPath(getZkClient(),
"/consumers/test_group/offsets/golf4/0-0", "22274");

But I was getting this exception:

ZookeeperConsumerConnector:66 - test_group_ajits-machine-1347505711889-9df96cdb exception during rebalance
java.lang.NumberFormatException: For input string: "��
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:441)
    at java.lang.Long.parseLong(Long.java:483)
    at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:232)
    at scala.collection.immutable.StringOps.toLong(StringOps.scala:31)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$addPartitionTopicInfo(ZookeeperConsumerConnector.scala:644)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11$$anonfun$apply$13.apply(ZookeeperConsumerConnector.scala:523)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11$$anonfun$apply$13.apply(ZookeeperConsumerConnector.scala:520)
    at scala.collection.immutable.Range.foreach(Range.scala:78)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11.apply(ZookeeperConsumerConnector.scala:520)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2$$anonfun$apply$11.apply(ZookeeperConsumerConnector.scala:507)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:72)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:507)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance$2.apply(ZookeeperConsumerConnector.scala:494)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:93)
    at scala.collection.Iterator$class.foreach(Iterator.scala:652)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:43)
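The unprintable input string suggests (my reading of the trace, not something confirmed in the thread) that what ended up in the offset znode was not the ASCII text "22274" but Java-serialized bytes: ZkClient's default serializer uses java.io object serialization, while Kafka's consumer reads the znode data as a plain string and calls toLong on it. A self-contained sketch of the mismatch:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class OffsetBytesDemo {

    // The bytes Kafka's consumer expects at the offset znode: plain ASCII digits.
    static byte[] plainBytes(String offset) {
        return offset.getBytes(StandardCharsets.UTF_8);
    }

    // The bytes a Java-serialization-based serializer writes: an object stream,
    // which starts with the stream magic 0xAC 0xED plus type information.
    static byte[] javaSerializedBytes(String offset) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(offset);
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // The plain form parses fine, the way the consumer expects.
        System.out.println(Long.parseLong(
                new String(plainBytes("22274"), StandardCharsets.UTF_8)));

        // The serialized form decodes to replacement characters and blows up
        // in Long.parseLong, mirroring the NumberFormatException above.
        String garbled = new String(javaSerializedBytes("22274"), StandardCharsets.UTF_8);
        try {
            Long.parseLong(garbled);
        } catch (NumberFormatException e) {
            System.out.println("NumberFormatException for: " + e.getMessage());
        }
    }
}
```

The 0xAC 0xED magic bytes are not valid UTF-8, which would account for the replacement characters in the exception message.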



Is there anything else I could do here, or another way to resolve this issue?
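One possible fix, assuming the bad znode bytes come from ZkClient's default Java-serialization serializer (my guess), is to construct the ZkClient with a serializer that stores strings as plain UTF-8 bytes; Kafka's own kafka.utils.ZKStringSerializer does exactly this. A minimal sketch (the class name is mine, and it is kept standalone here so the round trip can be shown; in real use it would implement org.I0Itec.zkclient.serialize.ZkSerializer and be passed to the ZkClient constructor):

```java
import java.nio.charset.StandardCharsets;

// Stores znode data as plain UTF-8 text, so "22274" is written as the five
// ASCII digits that the consumer later parses with Long.parseLong.
public class ZkStringSerializer {

    public byte[] serialize(Object data) {
        return data == null ? null : data.toString().getBytes(StandardCharsets.UTF_8);
    }

    public Object deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ZkStringSerializer s = new ZkStringSerializer();
        byte[] stored = s.serialize("22274");
        // Round-trips to a parseable offset rather than serialized-object bytes.
        System.out.println(Long.parseLong((String) s.deserialize(stored)));
    }
}
```

With something like this in place, getZkClient() would return roughly new ZkClient(zkConnect, 30000, 30000, new ZkStringSerializer()), and createPersistent would then store offsets the rebalance code can parse.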

Thanks
Ajit Koti


On Wed, Sep 12, 2012 at 9:50 AM, Jun Rao <junrao@gmail.com> wrote:

> If you also need this for Hadoop, then it's more difficult. Unfortunately,
> we don't have an api for changing offsets programmatically right now. We
> will revisit this post 0.8.0.
>
> Thanks,
>
> Jun
>
> On Tue, Sep 11, 2012 at 8:32 AM, Ajit Koti <ajitkoti@gmail.com> wrote:
>
> > Hi Jun,
> >
> > Keeping the log segment size and the log retention size at the same value
> > means I will lose messages, right? I don't want that to happen,
> > as the Hadoop consumer would not yet have consumed the messages
> > and persisted them into the database.
> >
> > I was trying the following: set autooffset.reset to largest and delete the
> > corresponding offsets, as described here:
> >
> >
> >
> http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201202.mbox/%3CCALxO6KitYw_xTbVxKKV2X31wYnxfWOz47O-e6dbfO21mSmQxVQ@mail.gmail.com%3E
> >
> > I am not particularly sure how to delete the offsets. Using ZkUtils, I was
> > trying to delete this path before consuming any messages:
> > /consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]
> >
> > I am not sure this is the right approach.
> >
> >
> > Thanks
> > Ajit
> >
> > On Tue, Sep 11, 2012 at 8:05 PM, Jun Rao <junrao@gmail.com> wrote:
> >
> > > That's an interesting use case, and it is not what the ZK consumer is
> > > really designed for. First, having a topic per user may not be ideal,
> > > especially if you have millions of users. Second, the ZK consumer is
> > > designed for continuously consuming data, not for skipping data all the
> > > time.
> > >
> > > With that said, you can probably try the following trick and see if it
> > > works. Set both the log segment size and the log retention size to a
> > > value that matches the size of about 100 messages. This should ensure
> > > that at any given point in time, only the most recent 100 messages are
> > > kept on the broker. Set autooffset.reset to earliest. This will ensure
> > > that if the consumer's offset is out of range, the consumer will
> > > automatically switch to the current smallest offset.
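> > > As a sketch, these knobs map onto 0.7-era property names roughly as
> > > follows (the names and sizes are illustrative assumptions, and in 0.7
> > > the value meaning "earliest" is spelled smallest):
> > >
> > > ```properties
> > > # Broker: keep only about 100 messages' worth of log (assuming ~1 KB each)
> > > log.file.size=102400
> > > log.retention.size=102400
> > >
> > > # Consumer: fall back to the oldest available offset when out of range
> > > autooffset.reset=smallest
> > > ```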
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Mon, Sep 10, 2012 at 9:07 PM, Ajit Koti <ajitkoti@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > No, the consumers are not falling behind.
> > > > What happens is that the users of the consumers don't fetch messages for
> > > > a certain period of time, and then they want to fetch the latest
> > > > messages from the topic.
> > > >
> > > > For example, in the case of displaying a news feed, each user has his
> > > > own topic. If a user doesn't log in for a couple of days,
> > > > he wouldn't consume the messages from the topic. But when he logs in, he
> > > > wants the latest updates, for which we have to fetch the latest messages
> > > > from the topic.
> > > >
> > > > So is it possible to use the ZK consumer, set the offset to the last 100
> > > > messages, and then fetch them?
> > > >
> > > > Thanks
> > > > Ajit Koti
> > > >
> > > >
> > > > On Tue, Sep 11, 2012 at 9:04 AM, Jun Rao <junrao@gmail.com> wrote:
> > > >
> > > > > Is consumption falling behind? Can you have more consumers in the same
> > > > > group to increase the consumption throughput?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Mon, Sep 10, 2012 at 12:22 PM, Ajit Koti <ajitkoti@gmail.com> wrote:
> > > > >
> > > > > > Hi All ,
> > > > > >
> > > > > > I was happily consuming messages with the consumer.
> > > > > > Recently I got a new requirement where I always have to process the
> > > > > > latest messages.
> > > > > > Is there any way I can fetch the latest 100 messages? I know that,
> > > > > > apart from the SimpleConsumer, I cannot specify the offset.
> > > > > > But I am still wondering whether there is any way to start consuming
> > > > > > from a particular offset.
> > > > > >
> > > > > >
> > > > > > Note: I have only one consumer per topic.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Thanks
> > > > > > Ajit
> > > > > >
> > > > >
> > > >
> > >
> >
>
