kafka-users mailing list archives

From shangan chen <chenshangan...@gmail.com>
Subject Re: consumer can't consume data any more when change the number of kafka partitions
Date Mon, 03 Jun 2013 11:04:06 GMT
This is a test log from a non-Storm environment. The consumer can't fetch any
messages and seems to hang there.


13/06/03 18:44:55 INFO consumer.ZookeeperConsumerConnector: Connecting to
zookeeper instance at 192.168.111.31:2181,192.168.111.40:2181,
192.168.111.41:2181,192.168.111.51:2181,192.168.111.62:2181
13/06/03 18:44:55 INFO zkclient.ZkEventThread: Starting ZkClient event
thread.
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client environment:host.name
=zjm_111_31
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
environment:java.version=1.6.0_25
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
environment:java.vendor=Sun Microsystems Inc.
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
environment:java.home=/opt/apps_install/jdk-1.6.0_25/jre
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
environment:java.class.path=log_storm_0.1-jar-with-dependencies.jar
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
environment:java.library.path=/opt/apps_install/jdk-1.6.0_25/jre/lib/amd64/server:/opt/apps_install/jdk-1.6.0_25/jre/lib/amd64:/opt/apps_install/jdk-1.6.0_25/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
environment:java.io.tmpdir=/tmp
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
environment:java.compiler=<NA>
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client environment:os.name=Linux
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client environment:os.arch=amd64
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
environment:os.version=2.6.18-308.el5xen
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client environment:user.name
=root
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
environment:user.home=/root
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Client
environment:user.dir=/opt/services/storm
13/06/03 18:44:55 INFO zookeeper.ZooKeeper: Initiating client connection,
connectString=192.168.111.31:2181,192.168.111.40:2181,192.168.111.41:2181,
192.168.111.51:2181,192.168.111.62:2181 sessionTimeout=6000
watcher=org.I0Itec.zkclient.ZkClient@575fadcf
13/06/03 18:44:55 INFO zookeeper.ClientCnxn: Opening socket connection to
server zjm_111_62/192.168.111.62:2181. Will not attempt to authenticate
using SASL (Unable to locate a login configuration)
13/06/03 18:44:55 INFO zookeeper.ClientCnxn: Socket connection established
to zjm_111_62/192.168.111.62:2181, initiating session
13/06/03 18:44:55 INFO zookeeper.ClientCnxn: Session establishment complete
on server zjm_111_62/192.168.111.62:2181, sessionid = 0x53e635c520c0140,
negotiated timeout = 6000
13/06/03 18:44:55 INFO zkclient.ZkClient: zookeeper state changed
(SyncConnected)
13/06/03 18:44:55 INFO consumer.ZookeeperConsumerConnector: starting auto
committer every 1000 ms
13/06/03 18:44:55 INFO consumer.ZookeeperConsumerConnector: begin
registering consumer login-group_zjm_111_31-1370256295616-2d287a98 in ZK
13/06/03 18:44:55 INFO consumer.ZookeeperConsumerConnector: end registering
consumer login-group_zjm_111_31-1370256295616-2d287a98 in ZK
13/06/03 18:44:55 INFO consumer.ZookeeperConsumerConnector: begin
rebalancing consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
13/06/03 18:44:56 INFO consumer.ZookeeperConsumerConnector: Committing all
offsets
13/06/03 18:44:56 INFO consumer.ZookeeperConsumerConnector: Releasing
partition ownership
13/06/03 18:44:56 INFO consumer.ZookeeperConsumerConnector: Consumer
login-group_zjm_111_31-1370256295616-2d287a98 rebalancing the following
partitions: List(1-0, 2-0) for topic login with consumers:
List(login-group_zjm_109_152-1370253495249-eb0a173f-0,
login-group_zjm_109_152-1370253495249-eb0a173f-1,
login-group_zjm_110_159-1370253496049-119021ef-0,
login-group_zjm_110_159-1370253496049-119021ef-1,
login-group_zjm_111_31-1370256295616-2d287a98-0,
login-group_zjm_111_31-1370256295616-2d287a98-1,
login-group_zjm_111_49-1370253495793-26c03d19-0,
login-group_zjm_111_49-1370253495793-26c03d19-1,
login-group_zjm_111_49-1370253496335-c9be525d-0,
login-group_zjm_111_49-1370253496335-c9be525d-1,
login-group_zjm_111_62-1370253495280-2b03d497-0,
login-group_zjm_111_62-1370253495280-2b03d497-1,
login-group_zjm_111_62-1370253495412-4f605be7-0,
login-group_zjm_111_62-1370253495412-4f605be7-1)
13/06/03 18:44:56 WARN consumer.ZookeeperConsumerConnector: No broker
partitions consumed by consumer thread
login-group_zjm_111_31-1370256295616-2d287a98-1 for topic login
13/06/03 18:44:56 WARN consumer.ZookeeperConsumerConnector: No broker
partitions consumed by consumer thread
login-group_zjm_111_31-1370256295616-2d287a98-0 for topic login
13/06/03 18:44:56 INFO consumer.ZookeeperConsumerConnector: Consumer
login-group_zjm_111_31-1370256295616-2d287a98 selected partitions :
13/06/03 18:44:56 INFO consumer.ZookeeperConsumerConnector: end rebalancing
consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: begin
rebalancing consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: Committing all
offsets
13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: Releasing
partition ownership
13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: Consumer
login-group_zjm_111_31-1370256295616-2d287a98 rebalancing the following
partitions: List(1-0, 2-0) for topic login with consumers:
List(login-group_zjm_109_152-1370253495249-eb0a173f-0,
login-group_zjm_109_152-1370253495249-eb0a173f-1,
login-group_zjm_110_159-1370253496049-119021ef-0,
login-group_zjm_110_159-1370253496049-119021ef-1,
login-group_zjm_111_31-1370256295616-2d287a98-0,
login-group_zjm_111_31-1370256295616-2d287a98-1,
login-group_zjm_111_49-1370253495793-26c03d19-0,
login-group_zjm_111_49-1370253495793-26c03d19-1,
login-group_zjm_111_49-1370253496335-c9be525d-0,
login-group_zjm_111_49-1370253496335-c9be525d-1)
13/06/03 18:49:16 WARN consumer.ZookeeperConsumerConnector: No broker
partitions consumed by consumer thread
login-group_zjm_111_31-1370256295616-2d287a98-1 for topic login
13/06/03 18:49:16 WARN consumer.ZookeeperConsumerConnector: No broker
partitions consumed by consumer thread
login-group_zjm_111_31-1370256295616-2d287a98-0 for topic login
13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: Consumer
login-group_zjm_111_31-1370256295616-2d287a98 selected partitions :
13/06/03 18:49:16 INFO consumer.ZookeeperConsumerConnector: end rebalancing
consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: begin
rebalancing consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: Committing all
offsets
13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: Releasing
partition ownership
13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: Consumer
login-group_zjm_111_31-1370256295616-2d287a98 rebalancing the following
partitions: List(1-0, 2-0) for topic login with consumers:
List(login-group_zjm_111_31-1370256295616-2d287a98-0,
login-group_zjm_111_31-1370256295616-2d287a98-1)
13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector:
login-group_zjm_111_31-1370256295616-2d287a98-1 attempting to claim
partition 2-0
13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector:
login-group_zjm_111_31-1370256295616-2d287a98-0 attempting to claim
partition 1-0
13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: Consumer
login-group_zjm_111_31-1370256295616-2d287a98 selected partitions :
login:1-0: fetched offset = 482098298: consumed offset =
482098298,login:2-0: fetched offset = 385686015: consumed offset = 385686015
13/06/03 18:49:18 INFO consumer.FetcherRunnable: FetchRunnable-0 start
fetching topic: login part: 0 offset: 482098298 from 192.168.111.40:9092
13/06/03 18:49:18 INFO consumer.ZookeeperConsumerConnector: end rebalancing
consumer login-group_zjm_111_31-1370256295616-2d287a98 try #0
13/06/03 18:49:18 INFO consumer.FetcherRunnable: FetchRunnable-1 start
fetching topic: login part: 0 offset: 385686015 from 192.168.111.51:9092
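
For reference, the "No broker partitions consumed" warnings in the log above look consistent with range-style assignment: the partitions and the consumer threads are each sorted, and every thread gets a contiguous slice. With 2 partitions and 14 threads only the first two threads in sorted order would own anything, which matches the two threads on zjm_111_31 getting nothing until they are the only ones left. The following is a minimal sketch of that arithmetic, not the actual Kafka code; the class name, the assign helper and the thread-NN names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RangeAssignmentSketch {

    // Hypothetical helper: both lists are assumed to be sorted already.
    // Each thread gets a contiguous range; the first (size % threads)
    // threads get one extra partition.
    static Map<String, List<String>> assign(List<String> partitions, List<String> threads) {
        Map<String, List<String>> owned = new LinkedHashMap<>();
        int perThread = partitions.size() / threads.size();
        int extra = partitions.size() % threads.size();
        for (int i = 0; i < threads.size(); i++) {
            int start = perThread * i + Math.min(i, extra);
            int count = perThread + (i < extra ? 1 : 0);
            owned.put(threads.get(i),
                      new ArrayList<>(partitions.subList(start, start + count)));
        }
        return owned;
    }

    public static void main(String[] args) {
        // Partition names taken from the log; thread names are placeholders.
        List<String> partitions = Arrays.asList("1-0", "2-0");
        List<String> threads = new ArrayList<>();
        for (int i = 0; i < 14; i++) {
            threads.add(String.format("thread-%02d", i));
        }
        // Threads with an empty list correspond to the WARN lines in the log.
        for (Map.Entry<String, List<String>> e : assign(partitions, threads).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Under this scheme, running more consumer threads than partitions leaves the surplus threads idle, so with 2 partitions at most 2 threads in the whole group can fetch at any time.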


On Mon, Jun 3, 2013 at 3:06 PM, shangan chen <chenshangan521@gmail.com> wrote:

> Today I changed the number of partitions from 2 to 6 to improve consumer
> throughput, but after restarting the consumer it sometimes consumes a few
> messages and sometimes nothing, even though the producers are producing
> data the whole time.
> I changed the partition count back, but that did not help. Removing the
> consumer info from ZooKeeper (rmr) did not help either. The consumer only
> reads some data and then stops. I don't know why. Can anyone help? My Storm
> code follows. Thanks a lot.
>
>
> public void open(Map conf, TopologyContext context,
>         SpoutOutputCollector collector) {
>     _collector = collector;
>     consumerConnector = Consumer.createJavaConsumerConnector(
>             KafkaProperties.createConsumerConfig(groupId));
>     Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
>             consumerConnector.createMessageStreams(
>                     ImmutableMap.of(topic, numStreams));
>     streams = topicMessageStreams.get(topic);
>     executor = Executors.newFixedThreadPool(4);
> }
>
> @Override
> public void nextTuple() {
>     boolean hasMessage = true;
>     if (streams != null && streams.size() > 0) {
>         int i = 0;
>         for (final KafkaMessageStream<Message> stream : streams) {
>             ConsumerIterator<Message> it = stream.iterator();
>             int count = 0;
>             i++;
>             // Is the stream a blocking queue? while (it.hasNext()) loops
>             // forever and the log output never shows up.
>             try {
>                 while (it != null && it.hasNext()) {
>                     String msg = getMessage(it.next());
>                     msg = Debug.appendTimestamp(msg);
>                     _collector.emit(new Values(msg), new KafkaMessageId(msg, 1));
>                     Debug.incr(topic + "_" + MultiKafkaSpout.class.getSimpleName(), 1);
>                     Debug.log(this.getClass().getSimpleName(), msg);
>                     count++;
>                     // Emit at most LOOP_COUNT messages per stream per call.
>                     if (count >= LOOP_COUNT) {
>                         break;
>                     }
>                 }
>             } catch (ConsumerTimeoutException ex) {
>                 LOG.warn(topic + " : no message to consume");
>                 hasMessage = false;
>             } catch (Exception ex) {
>                 // A consumer timeout is set to avoid blocking on the queue.
>                 LOG.info(topic + " : " + ex.getMessage());
>             }
>             LOG.info(String.format("stream#%d emits : %d messages", i, count));
>         }
>     } else {
>         LOG.warn("no streams available, retry to get streams");
>         Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =
>                 consumerConnector.createMessageStreams(
>                         ImmutableMap.of(topic, numStreams));
>         streams = topicMessageStreams.get(topic);
>     }
>     // Back off for a second when the last poll timed out with no data.
>     if (!hasMessage) {
>         try {
>             Thread.sleep(1000);
>         } catch (InterruptedException e) {
>             LOG.warn("Thread sleep Exception", e);
>         }
>     }
> }
>
> --
> have a good day!
> chenshang'an
>
>


-- 
have a good day!
chenshang'an
