storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptgo...@apache.org
Subject [28/50] [abbrv] git commit: added Kafka bolt
Date Mon, 21 Apr 2014 19:44:27 GMT
added Kafka bolt


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/118cec43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/118cec43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/118cec43

Branch: refs/heads/master
Commit: 118cec43b19eec2ed1d66f6dd0aac7209edfde96
Parents: f573001
Author: wurstmeister <wurstmeister@users.noreply.github.com>
Authored: Wed Feb 26 22:38:21 2014 +0000
Committer: wurstmeister <wurstmeister@users.noreply.github.com>
Committed: Thu Feb 27 22:19:24 2014 +0000

----------------------------------------------------------------------
 pom.xml                                      |   8 +-
 src/jvm/storm/kafka/bolt/KafkaBolt.java      |  72 +++++++++
 src/test/storm/kafka/bolt/KafkaBoltTest.java | 170 ++++++++++++++++++++++
 3 files changed, 249 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/118cec43/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c542158..ff9b9c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
     <groupId>net.wurstmeister.storm</groupId>
     <artifactId>storm-kafka-0.8-plus</artifactId>
     <packaging>jar</packaging>
-    <version>0.4.0</version>
+    <version>0.5.0-SNAPSHOT</version>
     <name>storm-kafka-0.8-plus</name>
     <description>Storm module for kafka &gt; 0.8</description>
     <licenses>
@@ -99,6 +99,12 @@
     </repositories>
     <dependencies>
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.scala-lang</groupId>
             <artifactId>scala-library</artifactId>
             <version>${scalaVersion}</version>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/118cec43/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/bolt/KafkaBolt.java b/src/jvm/storm/kafka/bolt/KafkaBolt.java
new file mode 100644
index 0000000..89969d9
--- /dev/null
+++ b/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -0,0 +1,72 @@
+package storm.kafka.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka
+ * <p/>
+ * It expects the producer configuration and topic in storm config under
+ * <p/>
+ * 'kafka.broker.properties' and 'topic'
+ * <p/>
+ * respectively.
+ */
+public class KafkaBolt<K, V> extends BaseRichBolt {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+
+    public static final String TOPIC = "topic";
+    public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";
+
+    public static final String BOLT_KEY = "key";
+    public static final String BOLT_MESSAGE = "message";
+
+    private Producer<K, V> producer;
+    private OutputCollector collector;
+    private String topic;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
{
+        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
+        Properties properties = new Properties();
+        properties.putAll(configMap);
+        ProducerConfig config = new ProducerConfig(properties);
+        producer = new Producer<K, V>(config);
+        this.topic = (String) stormConf.get(TOPIC);
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        K key = null;
+        if (input.contains(BOLT_KEY)) {
+            key = (K) input.getValueByField(BOLT_KEY);
+        }
+        V message = (V) input.getValueByField(BOLT_MESSAGE);
+        try {
+            producer.send(new KeyedMessage<K, V>(topic, key, message));
+        } catch (Exception ex) {
+            LOG.error("Could not send message with key '" + key + "' and value '" + message
+ "'", ex);
+        } finally {
+            collector.ack(input);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/118cec43/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/src/test/storm/kafka/bolt/KafkaBoltTest.java b/src/test/storm/kafka/bolt/KafkaBoltTest.java
new file mode 100644
index 0000000..129b0f6
--- /dev/null
+++ b/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -0,0 +1,170 @@
+package storm.kafka.bolt;
+
+import backtype.storm.Config;
+import backtype.storm.task.GeneralTopologyContext;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import kafka.api.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import storm.kafka.*;
+import storm.kafka.trident.GlobalPartitionInformation;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.verify;
+
/**
 * Integration-style tests for {@link KafkaBolt}: each test spins up an
 * embedded Kafka broker ({@code KafkaTestBroker}), runs the bolt against a
 * hand-built tuple, and then reads the message back from the broker with a
 * {@code SimpleConsumer} to verify what was actually published.
 */
public class KafkaBoltTest {

    private static final String TEST_TOPIC = "test-topic";
    // Embedded broker started fresh for every test in initMocks().
    private KafkaTestBroker broker;
    private KafkaBolt bolt;
    private Config config = new Config();
    private KafkaConfig kafkaConfig;
    // Consumer used only by verifyMessage() to read back what the bolt wrote.
    private SimpleConsumer simpleConsumer;

    // Mocked collector so tests can verify(collector).ack(tuple).
    @Mock
    private IOutputCollector collector;

    /** Starts the broker, wires the consumer, and prepares a String-serializing bolt. */
    @Before
    public void initMocks() {
        MockitoAnnotations.initMocks(this);
        broker = new KafkaTestBroker();
        setupKafkaConsumer();
        config.put(KafkaBolt.TOPIC, TEST_TOPIC);
        bolt = generateStringSerializerBolt();
    }

    @After
    public void shutdown() {
        broker.shutdown();
    }


    /** Points a SimpleConsumer at partition 0 of the embedded broker. */
    private void setupKafkaConsumer() {
        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
        globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
        BrokerHosts brokerHosts = new StaticHosts(globalPartitionInformation);
        kafkaConfig = new KafkaConfig(brokerHosts, TEST_TOPIC);
        simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
    }

    /** A tuple with both "key" and "message" fields is published with its key. */
    @Test
    public void executeWithKey() throws Exception {
        String message = "value-123";
        String key = "key-123";
        Tuple tuple = generateTestTuple(key, message);
        bolt.execute(tuple);
        verify(collector).ack(tuple);
        verifyMessage(key, message);
    }

    /** byte[] key/message round-trip through the default (byte) serializer. */
    @Test
    public void executeWithByteArrayKeyAndMessage() {
        bolt = generateDefaultSerializerBolt();
        String keyString = "test-key";
        String messageString = "test-message";
        byte[] key = keyString.getBytes();
        byte[] message = messageString.getBytes();
        Tuple tuple = generateTestTuple(key, message);
        bolt.execute(tuple);
        verify(collector).ack(tuple);
        verifyMessage(keyString, messageString);
    }

    /** Bolt configured with Kafka's StringEncoder serializer. */
    private KafkaBolt generateStringSerializerBolt() {
        KafkaBolt bolt = new KafkaBolt();
        Properties props = new Properties();
        props.put("metadata.broker.list", broker.getBrokerConnectionString());
        props.put("request.required.acks", "1");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
        bolt.prepare(config, null, new OutputCollector(collector));
        return bolt;
    }

    /** Bolt with no serializer.class set, i.e. Kafka's default encoder. */
    private KafkaBolt generateDefaultSerializerBolt() {
        KafkaBolt bolt = new KafkaBolt();
        Properties props = new Properties();
        props.put("metadata.broker.list", broker.getBrokerConnectionString());
        props.put("request.required.acks", "1");
        config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
        bolt.prepare(config, null, new OutputCollector(collector));
        return bolt;
    }

    /** A tuple with only a "message" field is published with a null key. */
    @Test
    public void executeWithoutKey() throws Exception {
        String message = "value-234";
        Tuple tuple = generateTestTuple(message);
        bolt.execute(tuple);
        verify(collector).ack(tuple);
        verifyMessage(null, message);
    }


    /** Even when the broker is down, the bolt must still ack the tuple. */
    @Test
    public void executeWithBrokerDown() throws Exception {
        broker.shutdown();
        String message = "value-234";
        Tuple tuple = generateTestTuple(message);
        bolt.execute(tuple);
        verify(collector).ack(tuple);
    }


    /**
     * Fetches the latest message from the broker and asserts its key and
     * payload match the expected values.
     * NOTE(review): the boolean return is always true and is never checked by
     * callers; the assertions inside do the real verification.
     */
    private boolean verifyMessage(String key, String message) {
        long lastMessageOffset = KafkaUtils.getOffset(simpleConsumer, kafkaConfig.topic, 0, OffsetRequest.LatestTime()) - 1;
        ByteBufferMessageSet messageAndOffsets = KafkaUtils.fetchMessages(kafkaConfig, simpleConsumer,
                new Partition(Broker.fromString(broker.getBrokerConnectionString()), 0), lastMessageOffset);
        MessageAndOffset messageAndOffset = messageAndOffsets.iterator().next();
        Message kafkaMessage = messageAndOffset.message();
        ByteBuffer messageKeyBuffer = kafkaMessage.key();
        String keyString = null;
        String messageString = new String(Utils.toByteArray(kafkaMessage.payload()));
        if (messageKeyBuffer != null) {
            keyString = new String(Utils.toByteArray(messageKeyBuffer));
        }
        assertEquals(key, keyString);
        assertEquals(message, messageString);
        return true;
    }

    /**
     * Builds a Tuple whose component declares ("key", "message") fields, using
     * an anonymous GeneralTopologyContext so no real topology is needed.
     */
    private Tuple generateTestTuple(Object key, Object message) {
        TopologyBuilder builder = new TopologyBuilder();
        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
                new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
            @Override
            public Fields getComponentOutputFields(String componentId, String streamId) {
                return new Fields("key", "message");
            }
        };
        return new TupleImpl(topologyContext, new Values(key, message), 1, "");
    }

    /** Builds a Tuple whose component declares only a "message" field. */
    private Tuple generateTestTuple(Object message) {
        TopologyBuilder builder = new TopologyBuilder();
        GeneralTopologyContext topologyContext = new GeneralTopologyContext(builder.createTopology(),
                new Config(), new HashMap(), new HashMap(), new HashMap(), "") {
            @Override
            public Fields getComponentOutputFields(String componentId, String streamId) {
                return new Fields("message");
            }
        };
        return new TupleImpl(topologyContext, new Values(message), 1, "");
    }
}


Mime
View raw message