storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/4] storm git commit: STORM-3123 - add support for Kafka security config in storm-kafka-monitor
Date Thu, 15 Nov 2018 23:16:26 GMT
Repository: storm
Updated Branches:
  refs/heads/master f17b3dad8 -> 29eb449ee


STORM-3123 - add support for Kafka security config in storm-kafka-monitor


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/40e24ce4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/40e24ce4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/40e24ce4

Branch: refs/heads/master
Commit: 40e24ce45a7744e2d24d4e8d3f6f146372c57824
Parents: 98ed0a8
Author: Vipin Rathor <v.rathor@gmail.com>
Authored: Wed Jul 11 17:01:36 2018 -0700
Committer: Arun Mahadevan <arunm@apache.org>
Committed: Mon Nov 12 18:12:18 2018 -0800

----------------------------------------------------------------------
 .../storm/kafka/monitor/KafkaOffsetLagUtil.java       | 14 ++++++++++++--
 .../storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java |  9 ++++++++-
 2 files changed, 20 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/40e24ce4/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
index 78b6993..e31fad4 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
 import org.json.simple.JSONValue;
 
 /**
@@ -47,6 +48,8 @@ public class KafkaOffsetLagUtil {
     private static final String OPTION_GROUP_ID_LONG = "groupid";
     private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s";
     private static final String OPTION_SECURITY_PROTOCOL_LONG = "security-protocol";
+    private static final String OPTION_CONSUMER_CONFIG_SHORT = "c";
+    private static final String OPTION_CONSUMER_CONFIG_LONG = "consumer-config";
 
     public static void main(String args[]) {
         try {
@@ -63,7 +66,8 @@ public class KafkaOffsetLagUtil {
             NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery =
                 new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
                     commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG),
-                    commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol);
+                    commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol,
+                    commandLine.getOptionValue(OPTION_CONSUMER_CONFIG_LONG));
             List<KafkaOffsetLagResult> results = getOffsetLags(newKafkaSpoutOffsetQuery);
 
             Map<String, Map<Integer, KafkaPartitionOffsetLag>> keyedResult =
keyByTopicAndPartition(results);
@@ -110,6 +114,8 @@ public class KafkaOffsetLagUtil {
                           "consumer/spout e.g. hostname1:9092,hostname2:9092");
         options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of
consumer");
         options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG,
true, "Security protocol to connect to kafka");
+        options.addOption(OPTION_CONSUMER_CONFIG_SHORT, OPTION_CONSUMER_CONFIG_LONG, true,
"Security configuration file useful "
+                          + "when connecting to secure kafka");
         return options;
     }
 
@@ -117,7 +123,7 @@ public class KafkaOffsetLagUtil {
      * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for
log head and spout offsets
      * @return log head offset, spout offset and lag for each partition
      */
-    public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery
newKafkaSpoutOffsetQuery) {
+    public static List<KafkaOffsetLagResult> getOffsetLags(NewKafkaSpoutOffsetQuery
newKafkaSpoutOffsetQuery) throws Exception {
         KafkaConsumer<String, String> consumer = null;
         List<KafkaOffsetLagResult> result = new ArrayList<>();
         try {
@@ -130,6 +136,10 @@ public class KafkaOffsetLagUtil {
             props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
                 props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol());
+                // Read Kafka property file for extra security options
+                if (newKafkaSpoutOffsetQuery.getConsumerConfig() != null) {
+                    props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerConfig()));
+                }
             }
             List<TopicPartition> topicPartitionList = new ArrayList<>();
             consumer = new KafkaConsumer<>(props);

http://git-wip-us.apache.org/repos/asf/storm/blob/40e24ce4/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
index e6c4524..53df461 100644
--- a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
+++ b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
@@ -27,12 +27,15 @@ public class NewKafkaSpoutOffsetQuery {
     private final String consumerGroupId; // consumer group id for which the offset needs
to be calculated
     private final String bootStrapBrokers; // bootstrap brokers
     private final String securityProtocol; // security protocol to connect to kafka
+    private final String consumerConfig; // security configuration file to connect to secure
kafka
 
-    public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId,
String securityProtocol) {
+    public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, String consumerGroupId,
String securityProtocol,
+        String consumerConfig) {
         this.topics = topics;
         this.bootStrapBrokers = bootstrapBrokers;
         this.consumerGroupId = consumerGroupId;
         this.securityProtocol = securityProtocol;
+        this.consumerConfig = consumerConfig;
     }
 
     public String getTopics() {
@@ -51,6 +54,10 @@ public class NewKafkaSpoutOffsetQuery {
         return this.securityProtocol;
     }
 
+    public String getConsumerConfig() {
+        return this.consumerConfig;
+    }
+
     @Override
     public String toString() {
         return "NewKafkaSpoutOffsetQuery{" +


Mime
View raw message