kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: consumer can't consume data any more when change the number of kafka partitions
Date Wed, 05 Jun 2013 14:35:54 GMT
Have you followed #6 and #7 in http://kafka.apache.org/faq.html?

Thanks,

Jun


On Tue, Jun 4, 2013 at 10:04 PM, shangan chen <chenshangan521@gmail.com> wrote:

> I do have more consumers than partitions, but I don't think that is the
> reason: all consumers seem to die, and no messages go to any consumer.
> I stopped the other consumers, and the remaining one, which owns the
> partitions, still can't fetch data.
>
> 13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: Consumer
> login-group_zjm_111_31-1370256295616-2d287a98 selected partitions :
> 13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: end rebalancing
> consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
> 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: begin
> rebalancing consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
> 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: Committing all
> offsets
> 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: Releasing
> partition ownership
> 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: Consumer
> login-group_zjm_111_31-1370256295616-2d287a98 rebalancing the following
> partitions: List(1-0, 2-0) for topic login with consumers:
> List(login-group_zjm_111_31-1370256295616-2d287a98-0,
> login-group_zjm_111_31-1370256295616-2d287a98-1)
> 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector:
> login-group_zjm_111_31-1370256295616-2d287a98-1 attempting to claim
> partition 2-0
> 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector:
> login-group_zjm_111_31-1370256295616-2d287a98-0 attempting to claim
> partition 1-0
> 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: Consumer
> login-group_zjm_111_31-1370256295616-2d287a98 selected partitions :
> login:1-0: fetched offset = 482098298: consumed offset =
> 482098298,login:2-0: fetched offset = 385686015: consumed offset =
> 385686015
> 13/06/03 18:49:18 INFO consumer.FetcherRunnable: FetchRunnable-0 start
> fetching topic: login part: 0 offset: 482098298 from 192.168.111.40:9092
> 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: end rebalancing
> consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
> 13/06/03 18:49:18 INFO consumer.FetcherRunnable: FetchRunnable-1 start
> fetching topic: login part: 0 offset: 385686015 from 192.168.111.51:9092
>
>
> On Mon, Jun 3, 2013 at 11:24 PM, Jun Rao <junrao@gmail.com> wrote:
>
> > From the following log, it seems that you have more consumers than
> > partitions. So, some consumers will never get any data. Please see #4 in
> > http://kafka.apache.org/faq.html for details.
> >
> > 13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: Consumer
> > login-group_zjm_111_31-1370256295616-2d287a98 rebalancing the following
> > partitions: List(1-0, 2-0) for topic login with consumers:
> > List(login-group_zjm_109_152-1370253495249-eb0a173f-0,
> > login-group_zjm_109_152-1370253495249-eb0a173f-1,
> > login-group_zjm_110_159-1370253496049-119021ef-0,
> > login-group_zjm_110_159-1370253496049-119021ef-1,
> > login-group_zjm_111_31-1370256295616-2d287a98-0,
> > login-group_zjm_111_31-1370256295616-2d287a98-1,
> > login-group_zjm_111_49-1370253495793-26c03d19-0,
> > login-group_zjm_111_49-1370253495793-26c03d19-1,
> > login-group_zjm_111_49-1370253496335-c9be525d-0,
> > login-group_zjm_111_49-1370253496335-c9be525d-1)
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Jun 3, 2013 at 4:04 AM, shangan chen <chenshangan521@gmail.com> wrote:
> >
> > > This is a test log from a non-Storm environment. The consumer can't fetch
> > > any messages and seems to die there.
> > >
> > >
> > > 13/06/03 18:44:55 INFO consumer.ZookeeperConsumerConnector: Connecting to
> > > zookeeper instance at 192.168.111.31:2181,192.168.111.40:2181,
> > > 192.168.111.41:2181,192.168.111.51:2181,192.168.111.62:2181
> > > 13/06/03 18:44:55 INFO zkclient.ZkEventThread: Starting ZkClient event
> > > thread.
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:host.name=zjm_111_31
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:java.version=1.6.0_25
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:java.vendor=Sun Microsystems Inc.
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:java.home=/opt/apps_install/jdk-1.6.0_25/jre
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:java.class.path=log_storm_0.1-jar-with-dependencies.jar
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:java.library.path=/opt/apps_install/jdk-1.6.0_25/jre/lib/amd64/server:/opt/apps_install/jdk-1.6.0_25/jre/lib/amd64:/opt/apps_install/jdk-1.6.0_25/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:java.io.tmpdir=/tmp
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:java.compiler=<NA>
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:os.name=Linux
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:os.arch=amd64
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:os.version=2.6.18-308.el5xen
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:user.name=root
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:user.home=/root
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
> > > environment:user.dir=/opt/services/storm
> > > 13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Initiating client connection,
> > > connectString=192.168.111.31:2181,192.168.111.40:2181,192.168.111.41:2181,192.168.111.51:2181,192.168.111.62:2181
> > > sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@575fadcf
> > > 13/06/03 18:44:55 INFO zookeeper.ClientCnxn: Opening socket connection to
> > > server zjm_111_62/192.168.111.62:2181. Will not attempt to authenticate
> > > using SASL (Unable to locate a login configuration)
> > > 13/06/03 18:44:55 INFO zookeeper.ClientCnxn: Socket connection established
> > > to zjm_111_62/192.168.111.62:2181, initiating session
> > > 13/06/03 18:44:55 INFO zookeeper.ClientCnxn: Session establishment complete
> > > on server zjm_111_62/192.168.111.62:2181, sessionid = 0x53e635c520c0140,
> > > negotiated timeout = 6000
> > > 13/06/03 18:44:55 INFO zkclient.ZkClient: zookeeper state changed
> > > (SyncConnected)
> > > 13/06/03 18:44:55 INFO consumer.ZookeeperConsumerConnector: starting auto
> > > committer every 1000 ms
> > > 13/06/03 18:44:55 INFO consumer.ZookeeperConsumerConnector: begin
> > > registering consumer login-group_zjm_111_31-1370256295616-2d287a98 in ZK
> > > 13/06/03 18:44:55 INFO consumer.ZookeeperConsumerConnector: end registering
> > > consumer login-group_zjm_111_31-1370256295616-2d287a98 in ZK
> > > 13/06/03 18:44:55 INFO consumer.ZookeeperConsumerConnector: begin
> > > rebalancing consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
> > > 13/06/03 18:44:56 INFO consumer.ZookeeperConsumerConnector: Committing all
> > > offsets
> > > 13/06/03 18:44:56 INFO consumer.ZookeeperConsumerConnector: Releasing
> > > partition ownership
> > > 13/06/03 18:44:56 INFO consumer.ZookeeperConsumerConnector: Consumer
> > > login-group_zjm_111_31-1370256295616-2d287a98 rebalancing the following
> > > partitions: List(1-0, 2-0) for topic login with consumers:
> > > List(login-group_zjm_109_152-1370253495249-eb0a173f-0,
> > > login-group_zjm_109_152-1370253495249-eb0a173f-1,
> > > login-group_zjm_110_159-1370253496049-119021ef-0,
> > > login-group_zjm_110_159-1370253496049-119021ef-1,
> > > login-group_zjm_111_31-1370256295616-2d287a98-0,
> > > login-group_zjm_111_31-1370256295616-2d287a98-1,
> > > login-group_zjm_111_49-1370253495793-26c03d19-0,
> > > login-group_zjm_111_49-1370253495793-26c03d19-1,
> > > login-group_zjm_111_49-1370253496335-c9be525d-0,
> > > login-group_zjm_111_49-1370253496335-c9be525d-1,
> > > login-group_zjm_111_62-1370253495280-2b03d497-0,
> > > login-group_zjm_111_62-1370253495280-2b03d497-1,
> > > login-group_zjm_111_62-1370253495412-4f605be7-0,
> > > login-group_zjm_111_62-1370253495412-4f605be7-1)
> > > 13/06/03 18:44:56 WARN consumer.ZookeeperConsumerConnector: No broker
> > > partitions consumed by consumer thread
> > > login-group_zjm_111_31-1370256295616-2d287a98-1 for topic login
> > > 13/06/03 18:44:56 WARN consumer.ZookeeperConsumerConnector: No broker
> > > partitions consumed by consumer thread
> > > login-group_zjm_111_31-1370256295616-2d287a98-0 for topic login
> > > 13/06/03 18:44:56 INFO consumer.ZookeeperConsumerConnector: Consumer
> > > login-group_zjm_111_31-1370256295616-2d287a98 selected partitions :
> > > 13/06/03 18:44:56 INFO consumer.ZookeeperConsumerConnector: end rebalancing
> > > consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
> > > 13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: begin
> > > rebalancing consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
> > > 13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: Committing all
> > > offsets
> > > 13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: Releasing
> > > partition ownership
> > > 13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: Consumer
> > > login-group_zjm_111_31-1370256295616-2d287a98 rebalancing the following
> > > partitions: List(1-0, 2-0) for topic login with consumers:
> > > List(login-group_zjm_109_152-1370253495249-eb0a173f-0,
> > > login-group_zjm_109_152-1370253495249-eb0a173f-1,
> > > login-group_zjm_110_159-1370253496049-119021ef-0,
> > > login-group_zjm_110_159-1370253496049-119021ef-1,
> > > login-group_zjm_111_31-1370256295616-2d287a98-0,
> > > login-group_zjm_111_31-1370256295616-2d287a98-1,
> > > login-group_zjm_111_49-1370253495793-26c03d19-0,
> > > login-group_zjm_111_49-1370253495793-26c03d19-1,
> > > login-group_zjm_111_49-1370253496335-c9be525d-0,
> > > login-group_zjm_111_49-1370253496335-c9be525d-1)
> > > 13/06/03 18:49:16 WARN consumer.ZookeeperConsumerConnector: No broker
> > > partitions consumed by consumer thread
> > > login-group_zjm_111_31-1370256295616-2d287a98-1 for topic login
> > > 13/06/03 18:49:16 WARN consumer.ZookeeperConsumerConnector: No broker
> > > partitions consumed by consumer thread
> > > login-group_zjm_111_31-1370256295616-2d287a98-0 for topic login
> > > 13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: Consumer
> > > login-group_zjm_111_31-1370256295616-2d287a98 selected partitions :
> > > 13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: end rebalancing
> > > consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
> > > 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: begin
> > > rebalancing consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
> > > 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: Committing all
> > > offsets
> > > 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: Releasing
> > > partition ownership
> > > 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: Consumer
> > > login-group_zjm_111_31-1370256295616-2d287a98 rebalancing the following
> > > partitions: List(1-0, 2-0) for topic login with consumers:
> > > List(login-group_zjm_111_31-1370256295616-2d287a98-0,
> > > login-group_zjm_111_31-1370256295616-2d287a98-1)
> > > 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector:
> > > login-group_zjm_111_31-1370256295616-2d287a98-1 attempting to claim
> > > partition 2-0
> > > 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector:
> > > login-group_zjm_111_31-1370256295616-2d287a98-0 attempting to claim
> > > partition 1-0
> > > 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: Consumer
> > > login-group_zjm_111_31-1370256295616-2d287a98 selected partitions :
> > > login:1-0: fetched offset = 482098298: consumed offset =
> > > 482098298,login:2-0: fetched offset = 385686015: consumed offset =
> > > 385686015
> > > 13/06/03 18:49:18 INFO consumer.FetcherRunnable: FetchRunnable-0 start
> > > fetching topic: login part: 0 offset: 482098298 from 192.168.111.40:9092
> > > 13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: end rebalancing
> > > consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
> > > 13/06/03 18:49:18 INFO consumer.FetcherRunnable: FetchRunnable-1 start
> > > fetching topic: login part: 0 offset: 385686015 from 192.168.111.51:9092
> > >
> > >
> > > On Mon, Jun 3, 2013 at 3:06 PM, shangan chen <chenshangan521@gmail.com> wrote:
> > >
> > > > Today I changed the number of partitions from 2 to 6 to improve
> > > > consumer throughput, but after I restarted the consumer it sometimes
> > > > consumes only some messages, or nothing at all, even though producers
> > > > are producing data all the time.
> > > > I changed the partition count back, which did not help. I then removed
> > > > the consumer info in ZooKeeper (rmr), which did not help either. The
> > > > consumer can only consume some data and then stops. I don't know why.
> > > > Can anyone help? The following is my Storm code. Thanks a lot.
> > > >
> > > >
> > > > public void open(Map conf, TopologyContext context,
> > > >         SpoutOutputCollector collector) {
> > > >     // TODO Auto-generated method stub
> > > >     _collector = collector;
> > > >     consumerConnector = Consumer.createJavaConsumerConnector(
> > > >             KafkaProperties.createConsumerConfig(groupId));
> > > >     Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > > >             consumerConnector.createMessageStreams(ImmutableMap.of(topic, numStreams));
> > > >     streams = topicMessageStreams.get(topic);
> > > >     executor = Executors.newFixedThreadPool(4);
> > > > }
> > > >
> > > > @Override
> > > > public void nextTuple() {
> > > >     // TODO Auto-generated method stub
> > > >     boolean hasMessage = true;
> > > >     if (streams != null && streams.size() > 0) {
> > > >         int i = 0;
> > > >         for (final KafkaMessageStream<Message> stream : streams) {
> > > >             ConsumerIterator<Message> it = stream.iterator();
> > > >             int count = 0;
> > > >             i++;
> > > >             // Is the stream a blocking queue? while(it.hasNext()) loops forever
> > > >             // and nothing ever shows up in the log.
> > > >             try {
> > > >                 while (it != null && it.hasNext()) {
> > > >                     String msg = getMessage(it.next());
> > > >                     msg = Debug.appendTimestamp(msg);
> > > >                     _collector.emit(new Values(msg), new KafkaMessageId(msg, 1));
> > > >                     Debug.incr(topic + "_" + MultiKafkaSpout.class.getSimpleName(), 1);
> > > >                     Debug.log(this.getClass().getSimpleName(), msg);
> > > >                     count++;
> > > >                     if (count >= LOOP_COUNT) {
> > > >                         break;
> > > >                     }
> > > >                 }
> > > >             } catch (ConsumerTimeoutException ex) {
> > > >                 LOG.warn(topic + " : no message to consume");
> > > >                 hasMessage = false;
> > > >             } catch (Exception ex) {
> > > >                 // A consumer timeout is configured to avoid blocking on the queue.
> > > >                 LOG.info(topic + " : " + ex.getMessage());
> > > >             }
> > > >             LOG.info(String.format("stream#%d emits : %d messages", i, count));
> > > >         }
> > > >     } else {
> > > >         LOG.warn("no streams available, retry to get streams");
> > > >         Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
> > > >                 consumerConnector.createMessageStreams(ImmutableMap.of(topic, numStreams));
> > > >         streams = topicMessageStreams.get(topic);
> > > >     }
> > > >     if (!hasMessage) {
> > > >         try {
> > > >             Thread.sleep(1000);
> > > >         } catch (InterruptedException e) {
> > > >             // TODO Auto-generated catch block
> > > >             LOG.warn("Thread sleep Exception", e);
> > > >         }
> > > >     }
> > > > }
> > > >
> > > > --
> > > > have a good day!
> > > > chenshang'an
> > > >
> > > >
> > >
> > >
> > > --
> > > have a good day!
> > > chenshang'an
> > >
> >
>
>
>
> --
> have a good day!
> chenshang'an
>
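
As a side note on the point raised earlier in the thread (with more consumer
threads than partitions, some threads never get any data), here is a minimal
sketch of range-style partition assignment. It is an illustration under
assumptions, not the Kafka source: the class name RangeAssignmentSketch, the
method assign, and the consumerN-M thread ids are made up for the example, and
it assumes the rebalance sorts the partitions and the thread ids and then hands
out contiguous ranges.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; names are hypothetical and this is not Kafka code.
final class RangeAssignmentSketch {

    // Sorts partitions and thread ids, then gives each thread a contiguous range.
    static Map<String, List<String>> assign(List<String> partitions, List<String> threads) {
        if (threads.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer thread is required");
        }
        List<String> parts = new ArrayList<>(partitions);
        List<String> owners = new ArrayList<>(threads);
        Collections.sort(parts);
        Collections.sort(owners);

        int perThread = parts.size() / owners.size(); // base share of every thread
        int withExtra = parts.size() % owners.size(); // the first withExtra threads get one more

        Map<String, List<String>> result = new LinkedHashMap<>();
        for (int pos = 0; pos < owners.size(); pos++) {
            int start = perThread * pos + Math.min(pos, withExtra);
            int count = perThread + (pos < withExtra ? 1 : 0);
            result.put(owners.get(pos), new ArrayList<>(parts.subList(start, start + count)));
        }
        return result;
    }

    public static void main(String[] args) {
        // Same shape as the log: 2 partitions, 7 consumers with 2 threads each.
        List<String> partitions = Arrays.asList("1-0", "2-0");
        List<String> threads = new ArrayList<>();
        for (int c = 0; c < 7; c++) {
            threads.add("consumer" + c + "-0");
            threads.add("consumer" + c + "-1");
        }
        for (Map.Entry<String, List<String>> e : assign(partitions, threads).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Under these assumptions, with 2 partitions and 14 threads only the first two
thread ids in sorted order receive a partition and the other twelve map to an
empty list, which is consistent with the "No broker partitions consumed by
consumer thread" warnings in the log.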
