storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [2/4] storm git commit: STORM-3123: Changes to return extra properties from KafkaSpout and use it in TopologySpoutLag
Date Mon, 19 Nov 2018 02:51:05 GMT
STORM-3123: Changes to return extra properties from KafkaSpout and use it in TopologySpoutLag


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bbe78279
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bbe78279
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bbe78279

Branch: refs/heads/1.x-branch
Commit: bbe7827987dc03393652740522975b7bf0169c64
Parents: 371cc26
Author: Arun Mahadevan <arunm@apache.org>
Authored: Mon Nov 12 11:19:31 2018 -0800
Committer: Arun Mahadevan <arunm@apache.org>
Committed: Thu Nov 15 17:20:00 2018 -0800

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 20 ++++++-
 .../storm/kafka/monitor/KafkaOffsetLagUtil.java | 12 ++--
 .../apache/storm/utils/TopologySpoutLag.java    | 60 +++++++++++++++++---
 3 files changed, 76 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bbe78279/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 9bb77b8..94cb893 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -708,11 +708,27 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         configuration.put(configKeyPrefix + "topics", getTopicsString());
 
         configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId());
-        configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
-        configuration.put(configKeyPrefix + "security.protocol", kafkaSpoutConfig.getKafkaProps().get("security.protocol"));
+        for (Entry<String, Object> conf: kafkaSpoutConfig.getKafkaProps().entrySet()) {
+            if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
+                configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
+            }
+        }
         return configuration;
     }
 
+    private boolean isPrimitiveOrWrapper(Class<?> type) {
+        if (type == null) {
+            return false;
+        }
+        return type.isPrimitive() || isWrapper(type);
+    }
+
+    private boolean isWrapper(Class<?> type) {
+        return type == Double.class || type == Float.class || type == Long.class ||
+                type == Integer.class || type == Short.class || type == Character.class ||
+                type == Byte.class || type == Boolean.class || type == String.class;
+    }
+
     private String getTopicsString() {
         return kafkaSpoutConfig.getSubscription().getTopicsString();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/bbe78279/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
index 325d608..6f29043 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -198,8 +198,8 @@ public class KafkaOffsetLagUtil {
         options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " +
                 "/brokers (applicable only for old kafka spout) ");
         options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
-        options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Security configuration file useful "
-                          + "when connecting to secure kafka");
+        options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true, "Properties file with additional " +
+                "Kafka consumer properties");
         return options;
     }
 
@@ -221,10 +221,10 @@ public class KafkaOffsetLagUtil {
             props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
                 props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol());
-                // Read Kafka property file for extra security options
-                if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
-                    props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
-                }
+            }
+            // Read property file for extra consumer properties
+            if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
+                props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
             }
             List<TopicPartition> topicPartitionList = new ArrayList<>();
             consumer = new KafkaConsumer<>(props);

http://git-wip-us.apache.org/repos/asf/storm/blob/bbe78279/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index 1217577..0a65923 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -19,12 +19,18 @@
 package org.apache.storm.utils;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import org.apache.storm.Config;
+import java.util.Properties;
+import java.util.Set;
+
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
@@ -44,6 +50,9 @@ public class TopologySpoutLag {
     private static final String BOOTSTRAP_CONFIG = CONFIG_KEY_PREFIX + "bootstrap.servers";
     private static final String LEADERS_CONFIG = CONFIG_KEY_PREFIX + "leaders";
     private static final String ZKROOT_CONFIG = CONFIG_KEY_PREFIX + "zkRoot";
+    private static final String SECURITY_PROTOCOL_CONFIG = CONFIG_KEY_PREFIX + "security.protocol";
+    private static final Set<String> ALL_CONFIGS = new HashSet<>(Arrays.asList(TOPICS_CONFIG, GROUPID_CONFIG,
+            BOOTSTRAP_CONFIG, SECURITY_PROTOCOL_CONFIG));
     private final static Logger logger = LoggerFactory.getLogger(TopologySpoutLag.class);
 
     public static Map<String, Map<String, Object>> lag(StormTopology stormTopology, Map topologyConf) {
@@ -71,7 +80,7 @@ public class TopologySpoutLag {
         commands.add((String) jsonConf.get(GROUPID_CONFIG));
         commands.add("-b");
         commands.add((String) jsonConf.get(BOOTSTRAP_CONFIG));
-        String securityProtocol = (String) jsonConf.get(CONFIG_KEY_PREFIX + "security.protocol");
+        String securityProtocol = (String) jsonConf.get(SECURITY_PROTOCOL_CONFIG);
         if (securityProtocol != null && !securityProtocol.isEmpty()) {
             commands.add("-s");
             commands.add(securityProtocol);
@@ -115,6 +124,30 @@ public class TopologySpoutLag {
         return commands;
     }
 
+    private static File getExtraPropertiesFile(Map<String, Object> jsonConf) {
+        File file = null;
+        Map<String, String> extraProperties = new HashMap<>();
+        for (Map.Entry<String, Object> conf: jsonConf.entrySet()) {
+            if (conf.getKey().startsWith(CONFIG_KEY_PREFIX) && !ALL_CONFIGS.contains(conf.getKey())) {
+                extraProperties.put(conf.getKey().substring(CONFIG_KEY_PREFIX.length()), conf.getValue().toString());
+            }
+        }
+        if (!extraProperties.isEmpty()) {
+            try {
+                file = File.createTempFile("kafka-consumer-extra", "props");
+                file.deleteOnExit();
+                Properties properties = new Properties();
+                properties.putAll(extraProperties);
+                try(FileOutputStream fos = new FileOutputStream(file)) {
+                    properties.store(fos, "Kafka consumer extra properties");
+                }
+            } catch (IOException ex) {
+                // ignore
+            }
+        }
+        return file;
+    }
+
     private static void addLagResultForKafkaSpout(Map<String, Map<String, Object>> finalResult, String spoutId, SpoutSpec spoutSpec,
                                                   Map topologyConf) throws IOException {
         ComponentCommon componentCommon = spoutSpec.get_common();
@@ -159,18 +192,29 @@ public class TopologySpoutLag {
             }
             commands.addAll(old ? getCommandLineOptionsForOldKafkaSpout(jsonMap, topologyConf) : getCommandLineOptionsForNewKafkaSpout(jsonMap));
 
+            File extraPropertiesFile = getExtraPropertiesFile(jsonMap);
+            if (extraPropertiesFile != null) {
+                commands.add("-c");
+                commands.add(extraPropertiesFile.getAbsolutePath());
+            }
             logger.debug("Command to run: {}", commands);
 
             // if commands contains one or more null value, spout is compiled with lower version of storm-kafka / storm-kafka-client
             if (!commands.contains(null)) {
-                String resultFromMonitor = ShellUtils.execCommand(commands.toArray(new String[0]));
-
                 try {
-                    result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
-                } catch (ParseException e) {
-                    logger.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
-                    // json parsing fail -> error received
-                    errorMsg = resultFromMonitor;
+                    String resultFromMonitor = ShellUtils.execCommand(commands.toArray(new String[0]));
+
+                    try {
+                        result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
+                    } catch (ParseException e) {
+                        logger.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
+                        // json parsing fail -> error received
+                        errorMsg = resultFromMonitor;
+                    }
+                } finally {
+                    if (extraPropertiesFile != null) {
+                        extraPropertiesFile.delete();
+                    }
                 }
             }
         }


Mime
View raw message