kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lohith Samaga M <Lohith.Sam...@mphasis.com>
Subject RE: New consumer API waits indefinitely
Date Tue, 12 Apr 2016 11:30:20 GMT
Dear All,
	I installed Kafka on a Linux VM.
	Here too:
	1. The producer is able to store messages in the topic (sent from Windows host).
	2. The consumer is unable to read it either from Windows host or from kafka-console-consumer
on the Linux VM console.

	In the logs, I see:
[2016-04-12 16:51:00,672] INFO [GroupCoordinator 0]: Stabilized group console-consumer-39913
generation 1 (kafka.coordinator.GroupCoordinator)
[2016-04-12 16:51:00,676] INFO [GroupCoordinator 0]: Assignment received from leader for group
console-consumer-39913 for generation 1 (kafka.coordinator.GroupCoordinator)
[2016-04-12 16:51:09,638] INFO [GroupCoordinator 0]: Preparing to restabilize group console-consumer-39913
with old generation 1 (kafka.coordinator.GroupCoordinator)
[2016-04-12 16:51:09,640] INFO [GroupCoordinator 0]: Group console-consumer-39913 generation
1 is dead and removed (kafka.coordinator.GroupCoordinator)
[2016-04-12 16:53:08,489] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets
in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)

	When I run my Java code, I still get the exception - org.apache.kafka.clients.consumer.internals.SendFailedException


	So, is it advisable to use the old consumer on Kafka 0.9.0.1?

	Please help.

Thanks in advance.
	

Best regards / Mit freundlichen Grüßen / Sincères salutations
M. Lohith Samaga



-----Original Message-----
From: Lohith Samaga M [mailto:Lohith.Samaga@mphasis.com] 
Sent: Tuesday, April 05, 2016 13.36
To: users@kafka.apache.org
Subject: RE: New consumer API waits indefinitely

Hi Ismael, Niko,
	After cleaning up the zookeeper and kafka logs, I do not get the below server exception anymore.
I think Kafka did not like me opening the .log file in notepad.....

	The only exception that I now get is org.apache.kafka.clients.consumer.internals.SendFailedException
in org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.RequestFutureCompletionHandler.
	After that, the consumer goes into a loop.

Best regards / Mit freundlichen Grüßen / Sincères salutations M. Lohith Samaga



-----Original Message-----
From: Lohith Samaga M [mailto:Lohith.Samaga@mphasis.com]
Sent: Tuesday, April 05, 2016 12.38
To: users@kafka.apache.org
Subject: RE: New consumer API waits indefinitely

Hi Ismael,
	I see the following exception when I (re)start Kafka (even a fresh setup after the previous
one). And where is the configuration to set the data directory for Kafka (not the logs)?

java.io.IOException: The requested operation cannot be performed on a file with a user-mapped
section open
        at java.io.RandomAccessFile.setLength(Native Method)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:285)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetI
ndex.scala:265)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.sc
ala:265)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.sc
ala:265)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
        at kafka.log.LogSegment.recover(LogSegment.scala:199)
        at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
        at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(
TraversableLike.scala:778)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimize
d.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.s
cala:777)
        at kafka.log.Log.loadSegments(Log.scala:160)
        at kafka.log.Log.<init>(Log.scala:90)
        at kafka.log.LogManager.createLog(LogManager.scala:357)
        at kafka.cluster.Partition.getOrCreateReplica(Partition.scala:91)
        at kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.s
cala:173)
        at kafka.cluster.Partition$$anonfun$4$$anonfun$apply$2.apply(Partition.s
cala:173)
        at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
        at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:173)
        at kafka.cluster.Partition$$anonfun$4.apply(Partition.scala:165)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:270)
        at kafka.cluster.Partition.makeLeader(Partition.scala:165)
        at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManag
er.scala:692)
        at kafka.server.ReplicaManager$$anonfun$makeLeaders$4.apply(ReplicaManag
er.scala:691)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.sca
la:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.sca
la:99)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala
:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
        at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:691)
        at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.sca
la:637)
        at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:131)

        at kafka.server.KafkaApis.handle(KafkaApis.scala:72)
        at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
        at java.lang.Thread.run(Thread.java:724)




Best regards / Mit freundlichen Grüßen / Sincères salutations M. Lohith Samaga



-----Original Message-----
From: ismaelj@gmail.com [mailto:ismaelj@gmail.com] On Behalf Of Ismael Juma
Sent: Monday, April 04, 2016 17.21
To: users@kafka.apache.org
Subject: Re: New consumer API waits indefinitely

Hi Lohith,

Are there any errors in your broker logs? I think there may be some issues with compacted
topics on Windows and the new consumer uses a compacted topic to store offsets.

Ismael

On Mon, Apr 4, 2016 at 12:20 PM, Lohith Samaga M <Lohith.Samaga@mphasis.com>
wrote:

> Dear All,
>         The error seems to be NOT_COORDINATOR_FOR_GROUP.
>         The exception thrown in
> org.apache.kafka.clients.consumer.internals.RequestFuture is:
>         org.apache.kafka.common.errors.NotCoordinatorForGroupException:
> This is not the correct coordinator for this group.
>
>         However, this exception is considered RetriableException in 
> org.apache.kafka.clients.consumer.internals.RequestFuture.
>         So, the retry goes on - in a loop.
>
>         It also happens that the Coordinator object becomes null in 
> AbstractCoordinator class.
>
>         Can somebody please help?
>
>
> Best regards / Mit freundlichen Grüßen / Sincères salutations M. 
> Lohith Samaga
>
>
>
>
> -----Original Message-----
> From: Ratha v [mailto:vijayaratha@gmail.com]
> Sent: Monday, April 04, 2016 12.22
> To: users@kafka.apache.org
> Subject: Re: New consumer API waits indefinitely
>
> Still struggling :)
> Check following threads;
>
>    - If my producer producing, then why the consumer couldn't consume? it
>    stuck @ poll()
>    - Consumer thread is waiting forever, not returning any objects
>
>
> I think new APIs are recommended.
>
>
> On 4 April 2016 at 16:37, Lohith Samaga M <Lohith.Samaga@mphasis.com>
> wrote:
>
> > Thanks for letting me know.
> >
> > Is there any work around? A fix?
> >
> > Which set of API is recommended for production use?
> >
> > Best regards / Mit freundlichen Grüßen / Sincères salutations M.
> > Lohith Samaga
> >
> >
> >
> >
> > -----Original Message-----
> > From: Ratha v [mailto:vijayaratha@gmail.com]
> > Sent: Monday, April 04, 2016 11.27
> > To: users@kafka.apache.org
> > Subject: Re: New consumer API waits indefinitely
> >
> > I too face same issue:(
> >
> > On 4 April 2016 at 15:51, Lohith Samaga M 
> > <Lohith.Samaga@mphasis.com>
> > wrote:
> >
> > > HI,
> > >                 Good morning.
> > >
> > >                 I am new to Kafka. So, please bear with me.
> > >                 I am using the new Producer and Consumer API with 
> > > Kafka
> > > 0.9.0.1 running on Windows 7 laptop with zookeeper.
> > >
> > >                 I was able to send messages using the new Producer 
> > > API. I can see the messages in the Kafka data directory.
> > >
> > >                 However, when I run the consumer, it does not 
> > > retrieve the messages. It keeps waiting for the messages indefinitely.
> > >                 My code (taken from Javadoc and modified)  is as below:
> > >
> > >             props.put("bootstrap.servers", "localhost:9092");
> > >             props.put("group.id", "new01");
> > >             props.put("enable.auto.commit", "true");
> > >             props.put("auto.commit.interval.ms", "1000");
> > >             props.put("session.timeout.ms", "30000");
> > >             props.put("key.deserializer", 
> > > "org.apache.kafka.common.serialization.StringDeserializer");
> > >             props.put("value.deserializer", 
> > > "org.apache.kafka.common.serialization.StringDeserializer");
> > >
> > >             KafkaConsumer<String, String> consumer = new 
> > > KafkaConsumer<>(props);
> > >             consumer.subscribe(Arrays.asList("new-producer"));
> > >             while (true) {
> > >                 ConsumerRecords<String, String> records = 
> > > consumer.poll(100);
> > >                 for (ConsumerRecord<String, String> record : records)
> > >                     System.out.printf("offset = %d, key = %s, 
> > > value = %s", record.offset(), record.key(), record.value());
> > >             }
> > >
> > >                 Can anybody please tell me what went wrong?
> > >
> > > Thanks & Regards,
> > > M. Lohith Samaga
> > >
> > > Information transmitted by this e-mail is proprietary to Mphasis, 
> > > its associated companies and/ or its customers and is intended for 
> > > use only by the individual or entity to which it is addressed, and 
> > > may contain information that is privileged, confidential or exempt 
> > > from disclosure under applicable law. If you are not the intended 
> > > recipient or it appears that this mail has been forwarded to you 
> > > without proper authority, you are notified that any use or 
> > > dissemination of this information in any manner is strictly 
> > > prohibited. In such cases, please notify us immediately at 
> > > mailmaster@mphasis.com and delete this mail from your records.
> > >
> >
> >
> >
> > --
> > -Ratha
> > http://vvratha.blogspot.com/
> > Information transmitted by this e-mail is proprietary to Mphasis, 
> > its associated companies and/ or its customers and is intended for 
> > use only by the individual or entity to which it is addressed, and 
> > may contain information that is privileged, confidential or exempt 
> > from disclosure under applicable law. If you are not the intended 
> > recipient or it appears that this mail has been forwarded to you 
> > without proper authority, you are notified that any use or 
> > dissemination of this information in any manner is strictly 
> > prohibited. In such cases, please notify us immediately at 
> > mailmaster@mphasis.com and delete this mail from your records.
> >
>
>
>
> --
> -Ratha
> http://vvratha.blogspot.com/
> Information transmitted by this e-mail is proprietary to Mphasis, its 
> associated companies and/ or its customers and is intended for use 
> only by the individual or entity to which it is addressed, and may 
> contain information that is privileged, confidential or exempt from 
> disclosure under applicable law. If you are not the intended recipient 
> or it appears that this mail has been forwarded to you without proper 
> authority, you are notified that any use or dissemination of this 
> information in any manner is strictly prohibited. In such cases, 
> please notify us immediately at mailmaster@mphasis.com and delete this 
> mail from your records.
>
Information transmitted by this e-mail is proprietary to Mphasis, its associated companies
and/ or its customers and is intended for use only by the individual or entity to which it
is addressed, and may contain information that is privileged, confidential or exempt from
disclosure under applicable law. If you are not the intended recipient or it appears that
this mail has been forwarded to you without proper authority, you are notified that any use
or dissemination of this information in any manner is strictly prohibited. In such cases,
please notify us immediately at mailmaster@mphasis.com and delete this mail from your records.
Information transmitted by this e-mail is proprietary to Mphasis, its associated companies
and/ or its customers and is intended for use only by the individual or entity to which it
is addressed, and may contain information that is privileged, confidential or exempt from
disclosure under applicable law. If you are not the intended recipient or it appears that
this mail has been forwarded to you without proper authority, you are notified that any use
or dissemination of this information in any manner is strictly prohibited. In such cases,
please notify us immediately at mailmaster@mphasis.com and delete this mail from your records.
Information transmitted by this e-mail is proprietary to Mphasis, its associated companies
and/ or its customers and is intended 
for use only by the individual or entity to which it is addressed, and may contain information
that is privileged, confidential or 
exempt from disclosure under applicable law. If you are not the intended recipient or it appears
that this mail has been forwarded 
to you without proper authority, you are notified that any use or dissemination of this information
in any manner is strictly 
prohibited. In such cases, please notify us immediately at mailmaster@mphasis.com and delete
this mail from your records.
Mime
View raw message