kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Michael G. Noll" <mich...@michael-noll.com>
Subject Re: Kicking the tires on 0.8.1...tires kicking back
Date Sun, 30 Mar 2014 20:05:13 GMT
You might be running into the following known bug in 0.8.1:
https://issues.apache.org/jira/browse/KAFKA-1310

The fix is to downgrade to 0.8.0, or to migrate to 0.8.1.1 once it gets
released (IIRC the tentative 0.8.1.1 release date is mid-April).

Best,
Michael




On 30.03.2014 18:30, Edward Capriolo wrote:
> I am trying to convert a few projects to the latest kafka...
> Is this the latest artifact?
>   <dependency>
>                 <groupId>org.apache.kafka</groupId>
>                 <artifactId>kafka-perf_2.8.0</artifactId>
>                 <version>0.8.1</version>
>         </dependency>
> 
> I have a piece of code
> 
>   @Test
>   public void test() throws InterruptedException {
>     //super.createTopic("mytopic", 1, 1);
>     KafkaWriter kw = new
> KafkaWriter(super.createProducerConfig().props().props()
>             , "mytopic"
>             , new DefaultMessagePartitioner());
>     kw.init();
>     for (int i =0 ;i< 1000 ; i++){
>       System.out.println("sending");
>       kw.send(("bla "+i+" yyy zzz").getBytes());
>     }
>     System.out.println("done");
>     ConsumerConnector consumerConnector =
> Consumer.createJavaConsumerConnector(super.createConsumerConfig());
> 
>     Map<String, Integer> consumers = new HashMap<String, Integer>();
>     consumers.put("mytopic", 1);
>     Map<String, List<KafkaStream<byte[], byte []>>> topicMessageStreams
=
> consumerConnector
>             .createMessageStreams(consumers);
> 
>     final List<KafkaStream<byte[], byte[]>> streams =
> topicMessageStreams.get("mytopic");
>     final AtomicInteger in = new AtomicInteger();
>     final KafkaStream<byte[], byte[]> stream = streams.get(0);
> 
>     Thread t = new Thread() {
>       public void run() {
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         while (it.hasNext()){
>           System.out.println("waiting for messages");
>           MessageAndMetadata<byte[], byte[]> m = it.next();
>           System.out.println("key " + new String(m.key()));
>           System.out.println("message " + new String(m.message()));
>           in.incrementAndGet();
>           System.out.println("count " + in.get());
>           if (in.get() == 999) {
>             break;
>           }
>         }
>       }
>     };
> 
>     t.start();
>     t.join();
> 
>   }
> 
>  protected static kafka.producer.ProducerConfig createProducerConfig(){
>     Properties producerProps = new Properties();
>     //producerProps.put("serializer.class",
> "kafka.serializer.StringEncoder");
>     putZkConnect(producerProps, "localhost:"+zookeeperTestServer.getPort());
>     producerProps.setProperty("batch.size", "10");
>     producerProps.setProperty("producer.type", "async");
>     producerProps.put("metadata.broker.list", "localhost:9092");
>     return new kafka.producer.ProducerConfig(producerProps);
>   }
> 
>   protected ConsumerConfig createConsumerConfig(){
>     Properties consumerProps = new Properties();
>     putZkConnect(consumerProps, "localhost:"+zookeeperTestServer.getPort());
>     putGroupId(consumerProps, "group1");
>     consumerProps.put("auto.offset.reset", "smallest");
>     ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
>     return consumerConfig;
>   }
> 
> The first 200 messages I send seem to get lost in the ether.
> 
> I have also tried creating the topic myself.
> 
>   public static void createTopic(String name, int replica , int partitions
> ){
>     ZkClient z = new ZkClient("localhost:"+zookeeperTestServer.getPort());
>     Properties p  = new Properties();
>     AdminUtils.createTopic(z, name, replica, partitions, p);
>   }
> 
> Which complains when i try to write to the topic.
> 
> 014-03-30 12:28:35 WARN  BrokerPartitionInfo:83 - Error while fetching
> metadata [{TopicMetadata for topic mytopic ->
> No partition metadata for topic mytopic due to
> kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
> kafka.common.LeaderNotAvailableException
> 2014-03-30 12:28:35 WARN  BrokerPartitionInfo:83 - Error while fetching
> metadata [{TopicMetadata for topic mytopic ->
> No partition metadata for topic mytopic due to
> kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
> kafka.common.LeaderNotAvailableException
> 2014-03-30 12:28:35 ERROR DefaultEventHandler:97 - Failed to collate
> messages by topic, partition due to: Failed to fetch topic metadata for
> topic: mytopic
> 
> 
> Does anyone know what is going on. I went through these pains converting to
> 0.8.0-betas and I was hoping to be done dealing with this :)
> 

Mime
View raw message