kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Guozhang Wang <wangg...@gmail.com>
Subject Re: Just started with Kafka. This test is throwing failed to send message after 3 tries, could help please? what is wrong with my test.
Date Thu, 10 Apr 2014 15:09:37 GMT
Could you re-format your code in the email? They can hard be read from my
browser.

Guozhang


On Thu, Apr 10, 2014 at 6:58 AM, Khalef Bessaih
<bessaihkhalef@hotmail.com>wrote:

> I am using the latest version of kafka.
> package kafka.test;
> import kafka.consumer.Consumer;import kafka.consumer.ConsumerConfig;import
> kafka.consumer.ConsumerIterator;import kafka.consumer.KafkaStream;import
> kafka.javaapi.consumer.ConsumerConnector;import
> kafka.javaapi.producer.Producer;import kafka.producer.KeyedMessage;import
> kafka.producer.ProducerConfig;import kafka.server.KafkaConfig;import
> kafka.server.KafkaServer;import
> org.apache.zookeeper.server.ServerConfig;import
> org.apache.zookeeper.server.ZooKeeperServerMain;import
> org.springframework.beans.factory.DisposableBean;import
> org.springframework.beans.factory.InitializingBean;
> import java.io.IOException;import java.util.HashMap;import
> java.util.List;import java.util.Map;import java.util.Properties;import
> java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;
> /** * Melodi Kafka Broker. * TODO KB: This is not final. */public class
> MelodiKafkaBroker implements InitializingBean, DisposableBean{  public
> static final String KAFKA_LOCALHOST_BROKER = "127.0.0.1:9092";   public
> static final String NBV_BATCH_TOPIC = "nbv-batch-topic"; public static
> final String KAFKA_LOG_DIR = "c:/temp/kafka";     public static final int
> KAFKA_PORT = 9092;      public static final int KAFKA_BROKER_ID = 0;
>         public static final String ZK_LOG_DIR = "c:/temp/zookeeper";
>  public static final String ZK_PORT = "9093";
>         /**      * The actual kafka server.      */     private
> KafkaServer kafkaServer;
>         /**      * ZooKeeper config.     */     private ServerConfig
> zkConfig;
>         /**      * ZooKeeper server.     */     private
> ZooKeeperServerMain zkServer;
>         //      // BEAN INIT.   //
>         @Override       public void afterPropertiesSet() throws Exception
>       {               // start a zookeeper.           zkConfig = new
> ServerConfig();          zkConfig.parse(new String[] { ZK_PORT, ZK_LOG_DIR
> });           zkServer = new ZooKeeperServerMain();
>                 new Thread()            {                       public
> void run()                       {                               try
>                       {
> System.out.println("starting zookeeper");
>                                         zkServer.runFromConfig(zkConfig);
>                               }                               catch
> (IOException e)                           {
>       System.out.println("ZooKeeper Failed");
>   e.printStackTrace(System.err);                          }
>       }               }.start();
>                 // start a kafka                new Thread()            {
>                       public void run()                       {
>                   try                             {
>               System.out.println("starting kafka");
>                                         Properties properties = new
> Properties();                                       properties.put("port",
> KAFKA_PORT + "");                                        properties.put("
> broker.id", KAFKA_BROKER_ID + "");
>  properties.put("log.dir", KAFKA_LOG_DIR);
>       properties.put("zookeeper.connect", "127.0.0.1:9093");
>                                         kafkaServer = new KafkaServer(new
> KafkaConfig(properties), KafkaServer.$lessinit$greater$default$2());
>                            kafkaServer.startup();
>  }                               catch (Exception e)
>       {                                       System.out.println("Kafka
> Failed");
> e.printStackTrace(System.err);                          }
>     }               }.start();
>                 // start consumers
>  System.out.println("starting consumers....");
>                 Properties props = new Properties();
>  props.put("zookeeper.connect", "127.0.0.1:9093");
> props.put("group.id", "nbv-group");             props.put("
> zookeeper.session.timeout.ms", "500");               props.put("
> zookeeper.sync.time.ms", "250");             props.put("
> auto.commit.interval.ms", "1000");
>                 ConsumerConnector consumer =
> Consumer.createJavaConsumerConnector(                              new
> ConsumerConfig(props));
>                 Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();            topicCountMap.put(NBV_BATCH_TOPIC, new Integer(1));
>             Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap
=
> consumer.createMessageStreams(topicCountMap);
>  List<KafkaStream<byte[], byte[]>> streams =
> consumerMap.get(NBV_BATCH_TOPIC);
>                 // now launch all the threads           //
>                 ExecutorService executor = Executors.newFixedThreadPool(1);
>                 // now create an object to consume the messages         //
>              int threadNumber = 0;           for (final KafkaStream stream
> : streams)                {                       executor.submit(new
> ConsumerTest(stream, threadNumber));                        threadNumber++;
>         }       }
>         @Override       public void destroy() throws Exception  {
>       kafkaServer.shutdown();         System.out.println("kafka stop");
>   }
>         //      // PUBLIC INTERFACE     //
>         public static void main(String[] args) throws Exception {
>       MelodiKafkaBroker broker = new MelodiKafkaBroker();
> broker.afterPropertiesSet();
>                 System.out.println("starting a producer....");
>                 Properties props = new Properties();
>  props.put("metadata.broker.list", KAFKA_LOCALHOST_BROKER);
>  props.put("zookeeper.connect", "127.0.0.1:9093");
> props.put("serializer.class", "kafka.serializer.StringEncoder");
>      ProducerConfig config = new ProducerConfig(props);
>  Producer<String, String> kafkaProducer = new Producer<String,
> String>(config);
>                 System.out.println("Producing messages....");
> for (int i = 0; i < 10; i++)            {
> System.out.println("message to send......");
>  kafkaProducer.send(new KeyedMessage<String, String>(NBV_BATCH_TOPIC,
> "hello kafka " + i + "."));
>  System.out.println("message sent......");               }       }
>         public static class ConsumerTest implements Runnable    {
>       private KafkaStream stream;             private int id;
>                 public ConsumerTest(KafkaStream stream, int id)         {
>                       this.id = id;                   this.stream =
> stream;           }
>                 public void run()               {
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
>      while (it.hasNext())                    {
>   System.out.println("Thread " + id + ": " + new
> String(it.next().message()));                    }
>                         System.out.println("Shutting down Thread: " + id);
>              }       }}




-- 
-- Guozhang

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message