kafka-users mailing list archives

From Sa Li <sal...@gmail.com>
Subject Re: Number of Consumers Connected
Date Tue, 03 Feb 2015 18:32:55 GMT
Guozhang,

Sorry for leaving this topic for a while. I am still not clear on how to
commit the offset to ZK from the command line. I tried this:

bin/kafka-console-consumer.sh --zookeeper 10.100.71.33:2181 --topic pipe-test-2 --from-beginning --property pipe

It seems to generate a console-consumer-001 in ZK, but when I did that for
other topics, nothing appeared in ZK (I can't read anything from the
consumer group in kafka-web-console). See:
[zk: localhost:2181(CONNECTED) 2] ls /consumers/web-console-consumer-38650
[offsets, owners, ids]
[zk: localhost:2181(CONNECTED) 3] ls /consumers/web-console-consumer-38650/offsets
[PofApiTest-1]
[zk: localhost:2181(CONNECTED) 4] ls /consumers/web-console-consumer-38650/offsets/PofApiTest-1
[3, 2, 1, 0, 7, 6, 5, 4]
[zk: localhost:2181(CONNECTED) 5] ls /consumers/web-console-consumer-38650/offsets/PofApiTest-1/3
[]

Any ideas?

thanks

AL
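
A note on the listing above: in the ZooKeeper-based consumer, the offset for
a partition is stored as the data of the partition znode rather than as a
child node, so an empty "ls" result for .../PofApiTest-1/3 does not by itself
mean that no offset was committed; "get" on the same path shows the stored
value. As a minimal sketch (Python, not part of the original thread), the
helper below parses pasted zkCli.sh "ls" output and reports which partition
znodes are absent, which is the condition behind the ConsumerOffsetChecker
NoNode error quoted further down. The function names and the partition count
of 8 are assumptions for illustration only.

```python
from typing import List


def parse_zk_ls(line: str) -> List[str]:
    """Parse one line of zkCli.sh `ls` output, e.g. "[offsets, owners, ids]"."""
    text = line.strip()
    if not (text.startswith("[") and text.endswith("]")):
        raise ValueError(f"not a zkCli ls result: {line!r}")
    inner = text[1:-1].strip()
    # An empty listing "[]" means the znode has no children.
    return [item.strip() for item in inner.split(",")] if inner else []


def missing_partitions(ls_output: str, partition_count: int) -> List[int]:
    """Return partition ids in 0..partition_count-1 with no znode in the listing."""
    present = {int(p) for p in parse_zk_ls(ls_output)}
    return [p for p in range(partition_count) if p not in present]


# Listing copied from the message above; 8 partitions is an assumption.
listing = "[3, 2, 1, 0, 7, 6, 5, 4]"
print(sorted(int(p) for p in parse_zk_ls(listing)))
print(missing_partitions(listing, 8))
```

If missing_partitions returns a non-empty list for a group, that group has
no offset znode for those partitions, and a tool that reads them directly
can be expected to fail in the way shown in the quoted stack trace.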


On Tue, Jan 20, 2015 at 9:57 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> That does not seem to be the latest version of Kafka. Which version are you using?
>
> On Tue, Jan 20, 2015 at 9:46 AM, Sa Li <salicn@gmail.com> wrote:
>
> > Guozhang
> >
> > Thank you very much for the reply. Here is the output of the
> > kafka-console-consumer.sh help:
> >
> > root@voluminous-mass:/srv/kafka# bin/kafka-console-consumer.sh
> > Missing required argument "[zookeeper]"
> > Option                                  Description
> > ------                                  -----------
> > --autocommit.interval.ms <Integer: ms>  The time interval at which to save the
> >                                           current offset in ms (default: 60000)
> > --blacklist <blacklist>                 Blacklist of topics to exclude from
> >                                           consumption.
> > --consumer-timeout-ms <Integer: prop>   consumer throws timeout exception
> >                                           after waiting this much of time
> >                                           without incoming messages (default:
> >                                           -1)
> > --csv-reporter-enabled                  If set, the CSV metrics reporter will
> >                                           be enabled
> > --fetch-size <Integer: size>            The amount of data to fetch in a
> >                                           single request. (default: 1048576)
> > --formatter <class>                     The name of a class to use for
> >                                           formatting kafka messages for
> >                                           display. (default: kafka.consumer.
> >                                           DefaultMessageFormatter)
> > --from-beginning                        If the consumer does not already have
> >                                           an established offset to consume
> >                                           from, start with the earliest
> >                                           message present in the log rather
> >                                           than the latest message.
> > --group <gid>                           The group id to consume on. (default:
> >                                           console-consumer-85664)
> > --max-messages <Integer: num_messages>  The maximum number of messages to
> >                                           consume before exiting. If not set,
> >                                           consumption is continual.
> > --max-wait-ms <Integer: ms>             The max amount of time each fetch
> >                                           request waits. (default: 100)
> > --metrics-dir <metrics dictory>         If csv-reporter-enable is set, and
> >                                           this parameter isset, the csv
> >                                           metrics will be outputed here
> > --min-fetch-bytes <Integer: bytes>      The min number of bytes each fetch
> >                                           request waits for. (default: 1)
> > --property <prop>
> > --refresh-leader-backoff-ms <Integer:   Backoff time before refreshing
> >   ms>                                     metadata (default: 200)
> > --skip-message-on-error                 If there is an error when processing a
> >                                           message, skip it instead of halt.
> > --socket-buffer-size <Integer: size>    The size of the tcp RECV size.
> >                                           (default: 2097152)
> > --socket-timeout-ms <Integer: ms>       The socket timeout used for the
> >                                           connection to the broker (default:
> >                                           30000)
> > --topic <topic>                         The topic id to consume on.
> > --whitelist <whitelist>                 Whitelist of topics to include for
> >                                           consumption.
> > --zookeeper <urls>                      REQUIRED: The connection string for
> >                                           the zookeeper connection in the form
> >                                           host:port. Multiple URLS can be
> >                                           given to allow fail-over.
> >
> > The --property option has no description. Is there an example of how
> > to use it?
> >
> > thanks
> >
> > AL
> >
> > On Mon, Jan 19, 2015 at 6:30 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > There is a property config you can set via bin/kafka-console-consumer.sh
> > > to commit offsets to ZK. You can use bin/kafka-console-consumer.sh --help
> > > to list all the properties.
> > >
> > > Guozhang
> > >
> > > On Mon, Jan 19, 2015 at 5:15 PM, Sa Li <salicn@gmail.com> wrote:
> > >
> > > > Guozhang,
> > > >
> > > > Currently we are at the stage of testing the producer. Our C# producer
> > > > is sending data to the brokers, and we use the
> > > >
> > > > bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> > > >
> > > > command to produce the messages. We don't have a coded consumer to
> > > > commit offsets; we use the bin/kafka-console-consumer.sh --zookeeper
> > > > command to consume. Is there a command that we can use on the command
> > > > line to create the ZK path?
> > > >
> > > > thanks
> > > >
> > > > AL
> > > >
> > > > On Mon, Jan 19, 2015 at 4:14 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > >
> > > > > Sa,
> > > > >
> > > > > Did your consumer ever commit offsets to Kafka? If not then no
> > > > > corresponding ZK path will be created.
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Mon, Jan 19, 2015 at 3:58 PM, Sa Li <salicn@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >
> > > > > > > I use this tool:
> > > > > > >
> > > > > > > Consumer Offset Checker
> > > > > > >
> > > > > > > Displays the Consumer Group, Topic, Partitions, Offset, logSize, Lag,
> > > > > > > and Owner for the specified set of Topics and Consumer Group
> > > > > > > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker
> > > > > > >
> > > > > > > To find the consumer groups, I ran this in zkCli.sh:
> > > > > >
> > > > > > > [zk: localhost:2181(CONNECTED) 3] ls /
> > > > > > > [transactional, admin, zookeeper, consumers, config, controller, storm,
> > > > > > > brokers, controller_epoch]
> > > > > > > [zk: localhost:2181(CONNECTED) 4] ls /consumers
> > > > > > > [web-console-consumer-99295, web-console-consumer-37853,
> > > > > > > web-console-consumer-30841, perf-consumer-92283, perf-consumer-21631,
> > > > > > > perf-consumer-95281, perf-consumer-59296, web-console-consumer-52126,
> > > > > > > web-console-consumer-89137, perf-consumer-72484, perf-consumer-80363,
> > > > > > > web-console-consumer-47543, web-console-consumer-22509,
> > > > > > > perf-consumer-16954, perf-consumer-53957, perf-consumer-39448,
> > > > > > > web-console-consumer-17021, perf-consumer-88693,
> > > > > > > web-console-consumer-48744, web-console-consumer-82543,
> > > > > > > perf-consumer-89565, web-console-consumer-97959, perf-consumer-40427,
> > > > > > > web-console-consumer-95350, web-console-consumer-26473,
> > > > > > > web-console-consumer-79384, web-console-consumer-87777,
> > > > > > > perf-consumer-91681, web-console-consumer-36136,
> > > > > > > web-console-consumer-86924, perf-consumer-24510, perf-consumer-5888,
> > > > > > > perf-consumer-73534, perf-consumer-92985, perf-consumer-7675,
> > > > > > > perf-consumer-52306, perf-consumer-87352, web-console-consumer-30400]
> > > > > > > [zk: localhost:2181(CONNECTED) 5]
> > > > > >
> > > > > > I then run
> > > > > >
> > > > > > > root@exemplary-birds:/srv/kafka# bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic PofApiTest-1 --group web-console-consumer-48744
> > > > > > > Group           Topic                          Pid Offset          logSize         Lag             Owner
> > > > > > > Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/web-console-consumer-48744/offsets/PofApiTest-1/0
> > > > > > >         at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
> > > > > > >         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
> > > > > > >         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
> > > > > > >         at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
> > > > > > >         at kafka.utils.ZkUtils$.readData(ZkUtils.scala:461)
> > > > > > >         at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processPartition(ConsumerOffsetChecker.scala:59)
> > > > > > >         at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply$mcVI$sp(ConsumerOffsetChecker.scala:89)
> > > > > > >         at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
> > > > > > >         at kafka.tools.ConsumerOffsetChecker$$anonfun$kafka$tools$ConsumerOffsetChecker$$processTopic$1.apply(ConsumerOffsetChecker.scala:89)
> > > > > > >         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> > > > > > >         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> > > > > > >         at kafka.tools.ConsumerOffsetChecker$.kafka$tools$ConsumerOffsetChecker$$processTopic(ConsumerOffsetChecker.scala:88)
> > > > > > >         at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
> > > > > > >         at kafka.tools.ConsumerOffsetChecker$$anonfun$main$3.apply(ConsumerOffsetChecker.scala:153)
> > > > > > >         at scala.collection.immutable.List.foreach(List.scala:318)
> > > > > > >         at kafka.tools.ConsumerOffsetChecker$.main(ConsumerOffsetChecker.scala:152)
> > > > > > >         at kafka.tools.ConsumerOffsetChecker.main(ConsumerOffsetChecker.scala)
> > > > > > > Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/web-console-consumer-48744/offsets/PofApiTest-1/0
> > > > > > >         at org.apache.zookeeper.KeeperException.create(KeeperException.java:102)
> > > > > > >         at org.apache.zookeeper.KeeperException.create(KeeperException.java:42)
> > > > > > >         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:927)
> > > > > > >         at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:956)
> > > > > > >         at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
> > > > > > >         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
> > > > > > >         at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
> > > > > > >         at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
> > > > > > >         ... 15 more
> > > > > >
> > > > > > > So consumer groups are confusing me. I didn't specify a consumer
> > > > > > > group id in the producer; the only place I know of to configure the
> > > > > > > group is consumer.properties:
> > > > > >
> > > > > >
> > > > > > #consumer group id
> > > > > > group.id=test-consumer-group
> > > > > >
> > > > > >
> > > > > > Any hints? Thanks
> > > > > >
> > > > > >
> > > > > > AL
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > > On Mon, Dec 15, 2014 at 6:46 PM, nitin sharma <kumarsharma.nitin@gmail.com>
> > > > > > > wrote:
> > > > > >
> > > > > > > got it ... thanks a lot.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Nitin Kumar Sharma.
> > > > > > >
> > > > > > >
> > > > > > > > On Mon, Dec 15, 2014 at 9:26 PM, Gwen Shapira <gshapira@cloudera.com>
> > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > Hi Nitin,
> > > > > > > > >
> > > > > > > > > Go to where you installed zookeeper and run:
> > > > > > > > >
> > > > > > > > > bin/zkCli.sh -server 127.0.0.1:2181
> > > > > > > > >
> > > > > > > > > On Mon, Dec 15, 2014 at 6:09 PM, nitin sharma
> > > > > > > > > <kumarsharma.nitin@gmail.com> wrote:
> > > > > > > > > > Thanks Neha and Gwen for your responses..
> > > > > > > > > >
> > > > > > > > > > @Gwen -- Kindly explain how to perform the steps you have
> > > > > > > > > > mentioned. how should i connect to a zookeeper..?
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Nitin Kumar Sharma.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Mon, Dec 15, 2014 at 6:36 PM, Neha Narkhede <neha@confluent.io>
> > > > > > > > > > wrote:
> > > > > > > > > >>
> > > > > > > > > >> In addition to Gwen's suggestion, we actually don't have jmx
> > > > > > > > > >> metrics that give you a list of actively consuming processes.
> > > > > > > > > >>
> > > > > > > > > >> On Mon, Dec 15, 2014 at 12:59 PM, Gwen Shapira <gshapira@cloudera.com>
> > > > > > > > > >> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > Currently you can find the number of consumer groups through
> > > > > > > > > >> > ZooKeeper:
> > > > > > > > > >> >
> > > > > > > > > >> > connect to ZK and run
> > > > > > > > > >> > ls /consumers
> > > > > > > > > >> >
> > > > > > > > > >> > and count the number of results
> > > > > > > > > >> >
> > > > > > > > > >> > On Mon, Dec 15, 2014 at 11:34 AM, nitin sharma
> > > > > > > > > >> > <kumarsharma.nitin@gmail.com> wrote:
> > > > > > > > > >> > > Hi Team,
> > > > > > > > > >> > >
> > > > > > > > > >> > > Is it possible to know how many Consumer Group connected to
> > > > > > > > > >> > > kafka broker Ids and as well as how many Instances within a
> > > > > > > > > >> > > Group are fetching messages from Kafka Brokers
> > > > > > > > > >> > >
> > > > > > > > > >> > > Regards,
> > > > > > > > > >> > > Nitin Kumar Sharma.
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >>
> > > > > > > > >> --
> > > > > > > > >> Thanks,
> > > > > > > > >> Neha
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > >
> > > > > > Alec Li
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Alec Li
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
> >
> >
> > --
> >
> > Alec Li
> >
>
>
>
> --
> -- Guozhang
>



-- 

Alec Li
