kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Khalef Bessaih <bessaihkha...@hotmail.com>
Subject Just started with Kafka. This test is throwing "failed to send message after 3 tries". Could you help, please? What is wrong with my test?
Date Thu, 10 Apr 2014 13:58:54 GMT
I am using the latest version of Kafka.
package kafka.test;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** * Melodi Kafka Broker. * TODO KB: This is not final. */public class MelodiKafkaBroker
implements InitializingBean, DisposableBean{	public static final String KAFKA_LOCALHOST_BROKER
= "127.0.0.1:9092";	public static final String NBV_BATCH_TOPIC = "nbv-batch-topic";	public
static final String KAFKA_LOG_DIR = "c:/temp/kafka";	public static final int KAFKA_PORT =
9092;	public static final int KAFKA_BROKER_ID = 0;
	public static final String ZK_LOG_DIR = "c:/temp/zookeeper";	public static final String ZK_PORT
= "9093";
	/**	 * The actual kafka server.	 */	private KafkaServer kafkaServer;
	/**	 * ZooKeeper config.	 */	private ServerConfig zkConfig;
	/**	 * ZooKeeper server.	 */	private ZooKeeperServerMain zkServer;
	//	// BEAN INIT.	//
	@Override	public void afterPropertiesSet() throws Exception	{		// start a zookeeper.		zkConfig
= new ServerConfig();		zkConfig.parse(new String[] { ZK_PORT, ZK_LOG_DIR });		zkServer = new
ZooKeeperServerMain();
		new Thread()		{			public void run()			{				try				{					System.out.println("starting zookeeper");
					zkServer.runFromConfig(zkConfig);				}				catch (IOException e)				{					System.out.println("ZooKeeper
Failed");					e.printStackTrace(System.err);				}			}		}.start();
		// start a kafka		new Thread()		{			public void run()			{				try				{					System.out.println("starting
kafka");
					Properties properties = new Properties();					properties.put("port", KAFKA_PORT + "");
				properties.put("broker.id", KAFKA_BROKER_ID + "");					properties.put("log.dir", KAFKA_LOG_DIR);
				properties.put("zookeeper.connect", "127.0.0.1:9093");
					kafkaServer = new KafkaServer(new KafkaConfig(properties), KafkaServer.$lessinit$greater$default$2());
				kafkaServer.startup();				}				catch (Exception e)				{					System.out.println("Kafka
Failed");					e.printStackTrace(System.err);				}			}		}.start();
		// start consumers		System.out.println("starting consumers....");
		Properties props = new Properties();		props.put("zookeeper.connect", "127.0.0.1:9093");
	props.put("group.id", "nbv-group");		props.put("zookeeper.session.timeout.ms", "500");		props.put("zookeeper.sync.time.ms",
"250");		props.put("auto.commit.interval.ms", "1000");
		ConsumerConnector consumer = Consumer.createJavaConsumerConnector(				new ConsumerConfig(props));
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();		topicCountMap.put(NBV_BATCH_TOPIC,
new Integer(1));		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap
= consumer.createMessageStreams(topicCountMap);		List<KafkaStream<byte[], byte[]>>
streams = consumerMap.get(NBV_BATCH_TOPIC);
		// now launch all the threads		//
		ExecutorService executor = Executors.newFixedThreadPool(1);
		// now create an object to consume the messages		//		int threadNumber = 0;		for (final KafkaStream
stream : streams)		{			executor.submit(new ConsumerTest(stream, threadNumber));			threadNumber++;
	}	}
	@Override	public void destroy() throws Exception	{		kafkaServer.shutdown();		System.out.println("kafka
stop");	}
	//	// PUBLIC INTERFACE	//
	public static void main(String[] args) throws Exception	{		MelodiKafkaBroker broker = new
MelodiKafkaBroker();		broker.afterPropertiesSet();
		System.out.println("starting a producer....");
		Properties props = new Properties();		props.put("metadata.broker.list", KAFKA_LOCALHOST_BROKER);
	props.put("zookeeper.connect", "127.0.0.1:9093");		props.put("serializer.class", "kafka.serializer.StringEncoder");
	ProducerConfig config = new ProducerConfig(props);		Producer<String, String> kafkaProducer
= new Producer<String, String>(config);
		System.out.println("Producing messages....");		for (int i = 0; i < 10; i++)		{			System.out.println("message
to send......");			kafkaProducer.send(new KeyedMessage<String, String>(NBV_BATCH_TOPIC,
"hello kafka " + i + "."));			System.out.println("message sent......");		}	}
	public static class ConsumerTest implements Runnable	{		private KafkaStream stream;		private
int id;
		public ConsumerTest(KafkaStream stream, int id)		{			this.id = id;			this.stream = stream;
	}
		public void run()		{			ConsumerIterator<byte[], byte[]> it = stream.iterator();		
while (it.hasNext())			{				System.out.println("Thread " + id + ": " + new String(it.next().message()));
		}
			System.out.println("Shutting down Thread: " + id);		}	}} 		 	   		  
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message