storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s...@apache.org
Subject [1/2] storm git commit: STORM-3175: Allow usage of custom Callback.
Date Mon, 06 Aug 2018 13:54:51 GMT
Repository: storm
Updated Branches:
  refs/heads/master 1828a17e5 -> 4bc28e7e8


STORM-3175: Allow usage of custom Callback.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d86ef7d2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d86ef7d2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d86ef7d2

Branch: refs/heads/master
Commit: d86ef7d2d5f21d5e4f29fce97a681f33c4ffa0ad
Parents: 146beff
Author: David DeMar <dfdemar@gmail.com>
Authored: Sun Aug 5 22:47:50 2018 -0400
Committer: David DeMar <dfdemar@gmail.com>
Committed: Sun Aug 5 22:47:50 2018 -0400

----------------------------------------------------------------------
 .../org/apache/storm/kafka/bolt/KafkaBolt.java  |  62 +++++---
 .../storm/kafka/bolt/PreparableCallback.java    |  32 ++++
 .../apache/storm/kafka/bolt/KafkaBoltTest.java  | 146 ++++++++++++++-----
 3 files changed, 187 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d86ef7d2/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
index 2ff1a54..c257c32 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
@@ -58,10 +59,11 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
 
     public static final String TOPIC = "topic";
 
-    private KafkaProducer<K, V> producer;
+    private Producer<K, V> producer;
     private OutputCollector collector;
     private TupleToKafkaMapper<K,V> mapper;
     private KafkaTopicSelector topicSelector;
+    private PreparableCallback providedCallback;
     private Properties boltSpecifiedProperties = new Properties();
     /**
      * {@see KafkaBolt#setFireAndForget(boolean)} for more details on this. 
@@ -98,6 +100,16 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
         return this;
     }
 
+    /**
+     * Sets a user defined callback for use with the KafkaProducer.
+     * @param producerCallback user defined callback
+     * @return this
+     */
+    public KafkaBolt<K, V> withProducerCallback(PreparableCallback producerCallback) {
+        this.providedCallback = producerCallback;
+        return this;
+    }
+
     @Override
     public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
         LOG.info("Preparing bolt with configuration {}", this);
@@ -118,6 +130,10 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
             }
         }
 
+        if (providedCallback != null) {
+            providedCallback.prepare(topoConf, context);
+        }
+
         producer = mkProducer(boltSpecifiedProperties);
         this.collector = collector;
     }
@@ -125,10 +141,32 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
     /**
      * Intended to be overridden for tests.  Make the producer with the given props
      */
-    protected KafkaProducer<K, V> mkProducer(Properties props) {
+    protected Producer<K, V> mkProducer(Properties props) {
         return new KafkaProducer<>(props);
     }
 
+    /**
+     * Creates the Callback to send to the Producer. Using this Callback will also execute
+     * the user defined Callback, if provided.
+     */
+    private Callback createProducerCallback(final Tuple input) {
+        return (ignored, e) -> {
+            synchronized (collector) {
+                if (e != null) {
+                    collector.reportError(e);
+                    collector.fail(input);
+                } else {
+                    collector.ack(input);
+                }
+
+                // User defined Callback
+                if (providedCallback != null) {
+                    providedCallback.onCompletion(ignored, e);
+                }
+            }
+        };
+    }
+
     @Override
     protected void process(final Tuple input) {
         K key = null;
@@ -142,21 +180,11 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
                 Callback callback = null;
 
                 if (!fireAndForget && async) {
-                    callback = new Callback() {
-                        @Override
-                        public void onCompletion(RecordMetadata ignored, Exception e) {
-                            synchronized (collector) {
-                                if (e != null) {
-                                    collector.reportError(e);
-                                    collector.fail(input);
-                                } else {
-                                    collector.ack(input);
-                                }
-                            }
-                        }
-                    };
+                    callback = createProducerCallback(input);
+                } else if (providedCallback != null) {
+                    callback = providedCallback;
                 }
-                Future<RecordMetadata> result = producer.send(new ProducerRecord<K, V>(topic, key, message), callback);
+                Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, key, message), callback);
                 if (!async) {
                     try {
                         result.get();
@@ -207,7 +235,7 @@ public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
     public void setAsync(boolean async) {
         this.async = async;
     }
-    
+
     @Override
     public String toString() {
         return "KafkaBolt: {mapper: " + mapper 

http://git-wip-us.apache.org/repos/asf/storm/blob/d86ef7d2/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/PreparableCallback.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/PreparableCallback.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/PreparableCallback.java
new file mode 100644
index 0000000..3381435
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/PreparableCallback.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.bolt;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Serializable callback for use with the KafkaProducer on KafkaBolt.
+ */
+public interface PreparableCallback extends Callback, Serializable {
+    void prepare(Map<String, Object> topoConf, TopologyContext context);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d86ef7d2/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
index d84d803..b6e3f06 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -17,73 +17,147 @@
  */
 package org.apache.storm.kafka.bolt;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
+
+import org.apache.kafka.clients.producer.MockProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.storm.Testing;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.MkTupleParam;
 import org.apache.storm.tuple.Tuple;
 import org.junit.Test;
-import org.mockito.ArgumentMatcher;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaBoltTest {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class);
-    
-    @SuppressWarnings({ "unchecked", "serial" })
-    @Test
-    public void testSimple() {
-        final KafkaProducer<String, String> producer = mock(KafkaProducer.class);
-        when(producer.send(any(), any())).thenAnswer(new Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable {
-                Callback c = (Callback)invocation.getArguments()[1];
-                c.onCompletion(null, null);
-                return null;
-            }
-        });
-        KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() {
+
+    private <K, V> KafkaBolt<K, V> makeBolt(Producer<K, V> producer) {
+        KafkaBolt<K, V> bolt = new KafkaBolt<K, V>() {
             @Override
-            protected KafkaProducer<String, String> mkProducer(Properties props) {
+            protected Producer<K, V> mkProducer(Properties props) {
                 return producer;
             }
         };
         bolt.withTopicSelector("MY_TOPIC");
-        
+
+        return bolt;
+    }
+
+    private Tuple createTestTuple(String... values) {
+        MkTupleParam param = new MkTupleParam();
+        param.setFields("key", "message");
+        return Testing.testTuple(Arrays.asList(values), param);
+    }
+
+    @Test
+    public void testSimple() {
+        MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
+        KafkaBolt<String, String> bolt = makeBolt(producer);
+
         OutputCollector collector = mock(OutputCollector.class);
         TopologyContext context = mock(TopologyContext.class);
         Map<String, Object> conf = new HashMap<>();
         bolt.prepare(conf, context, collector);
-        MkTupleParam param = new MkTupleParam();
-        param.setFields("key", "message");
-        Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), param);
+
+        String key = "KEY";
+        String value = "VALUE";
+        Tuple testTuple = createTestTuple(key, value);
         bolt.execute(testTuple);
-        verify(producer).send(argThat(new ArgumentMatcher<ProducerRecord<String, String>>() {
-            @Override
-            public boolean matches(ProducerRecord<String, String> arg) {
-                LOG.info("GOT {} ->", arg);
-                LOG.info("  {} {} {}", arg.topic(), arg.key(), arg.value());
-                return "MY_TOPIC".equals(arg.topic()) &&
-                        "KEY".equals(arg.key()) &&
-                        "VALUE".equals(arg.value());
-            }
-        }), any(Callback.class));
+
+        assertThat(producer.history().size(), is(1));
+        ProducerRecord<String, String> arg = producer.history().get(0);
+
+        LOG.info("GOT {} ->", arg);
+        LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
+        assertThat(arg.topic(), is("MY_TOPIC"));
+        assertThat(arg.key(), is(key));
+        assertThat(arg.value(), is(value));
+
+        // Complete the send
+        producer.completeNext();
         verify(collector).ack(testTuple);
     }
 
+    @Test
+    public void testSimpleWithError() {
+        MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
+        KafkaBolt<String, String> bolt = makeBolt(producer);
+
+        OutputCollector collector = mock(OutputCollector.class);
+        TopologyContext context = mock(TopologyContext.class);
+        Map<String, Object> conf = new HashMap<>();
+        bolt.prepare(conf, context, collector);
+
+        String key = "KEY";
+        String value = "VALUE";
+        Tuple testTuple = createTestTuple(key, value);
+        bolt.execute(testTuple);
+
+        assertThat(producer.history().size(), is(1));
+        ProducerRecord<String, String> arg = producer.history().get(0);
+
+        LOG.info("GOT {} ->", arg);
+        LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
+        assertThat(arg.topic(), is("MY_TOPIC"));
+        assertThat(arg.key(), is(key));
+        assertThat(arg.value(), is(value));
+
+        // Force a send error
+        KafkaException ex = new KafkaException();
+        producer.errorNext(ex);
+        verify(collector).reportError(ex);
+        verify(collector).fail(testTuple);
+    }
+
+    @Test
+    public void testCustomCallbackIsWrappedByDefaultCallbackBehavior() {
+        MockProducer<String, String> producer = new MockProducer<>(Cluster.empty(), false, null, null, null);
+        KafkaBolt<String, String> bolt = makeBolt(producer);
+
+        PreparableCallback customCallback = mock(PreparableCallback.class);
+        bolt.withProducerCallback(customCallback);
+
+        OutputCollector collector = mock(OutputCollector.class);
+        TopologyContext context = mock(TopologyContext.class);
+        Map<String, Object> topoConfig = new HashMap<>();
+        bolt.prepare(topoConfig, context, collector);
+
+        verify(customCallback).prepare(topoConfig, context);
+
+        String key = "KEY";
+        String value = "VALUE";
+        Tuple testTuple = createTestTuple(key, value);
+        bolt.execute(testTuple);
+
+        assertThat(producer.history().size(), is(1));
+        ProducerRecord<String, String> arg = producer.history().get(0);
+
+        LOG.info("GOT {} ->", arg);
+        LOG.info("{}, {}, {}", arg.topic(), arg.key(), arg.value());
+        assertThat(arg.topic(), is("MY_TOPIC"));
+        assertThat(arg.key(), is(key));
+        assertThat(arg.value(), is(value));
+
+        // Force a send error
+        KafkaException ex = new KafkaException();
+        producer.errorNext(ex);
+        verify(customCallback).onCompletion(any(), eq(ex));
+        verify(collector).reportError(ex);
+        verify(collector).fail(testTuple);
+    }
 }


Mime
View raw message