storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/7] storm git commit: STORM-2953: Remove storm-kafka
Date Thu, 19 Jul 2018 03:34:57 GMT
Repository: storm
Updated Branches:
  refs/heads/master 3883bf74c -> a7e817bcd


http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 503924c..ea5b616 100644
--- a/pom.xml
+++ b/pom.xml
@@ -315,14 +315,9 @@
         <rocketmq.version>4.2.0</rocketmq.version>
 
         <jackson.version>2.9.4</jackson.version>
-        <!-- Kafka version used by old storm-kafka spout code -->
-        <storm.kafka.version>0.8.2.2</storm.kafka.version>
-        <storm.kafka.artifact.id>kafka_2.10</storm.kafka.artifact.id>
-
-        <!-- kafka version used by new storm-kafka-client spout code -->
+        
         <storm.kafka.client.version>0.10.1.0</storm.kafka.client.version>
 
-
         <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
         <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest, org.apache.storm.testing.PerformanceTest</java.unit.test.exclude>
         <java.unit.test.include>**/Test*.java, **/*Test.java, **/*TestCase.java</java.unit.test.include>    <!--maven surefire plugin default test list-->
@@ -365,7 +360,6 @@
 
         <!-- externals -->
         <module>external/storm-autocreds</module>
-        <module>external/storm-kafka</module>
         <module>external/storm-hdfs</module>
         <module>external/storm-hdfs-blobstore</module>
         <module>external/storm-hbase</module>
@@ -397,7 +391,6 @@
         <module>examples/storm-redis-examples</module>
         <module>examples/storm-opentsdb-examples</module>
         <module>examples/storm-solr-examples</module>
-        <module>examples/storm-kafka-examples</module>
         <module>examples/storm-kafka-client-examples</module>
         <module>examples/storm-jdbc-examples</module>
         <module>examples/storm-hdfs-examples</module>
@@ -1085,28 +1078,6 @@
                 <artifactId>calcite-core</artifactId>
                 <version>${calcite.version}</version>
             </dependency>
-
-            <!-- kafka artifact dependency needed for storm-kafka -->
-            <dependency>
-                <groupId>org.apache.kafka</groupId>
-                <artifactId>${storm.kafka.artifact.id}</artifactId>
-                <version>${storm.kafka.version}</version>
-                <scope>provided</scope>
-                <exclusions>
-                    <exclusion>
-                        <groupId>org.apache.zookeeper</groupId>
-                        <artifactId>zookeeper</artifactId>
-                    </exclusion>
-                    <exclusion>
-                        <groupId>log4j</groupId>
-                        <artifactId>log4j</artifactId>
-                    </exclusion>
-                    <exclusion>
-                        <groupId>org.slf4j</groupId>
-                        <artifactId>slf4j-log4j12</artifactId>
-                    </exclusion>
-                </exclusions>
-            </dependency>
             <dependency>
                 <groupId>org.apache.kafka</groupId>
                 <artifactId>kafka-clients</artifactId>

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index db4c1d7..25a5312 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -18,7 +18,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.storm.Config;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.ComponentObject;
 import org.apache.storm.generated.SpoutSpec;
@@ -49,9 +48,7 @@ public class TopologySpoutLag {
                 className = getClassNameFromComponentObject(componentObject);
                 logger.debug("spout classname: {}", className);
                 if (className.endsWith("storm.kafka.spout.KafkaSpout")) {
-                    result.put(spout.getKey(), getLagResultForNewKafkaSpout(spout.getKey(), spoutSpec, topologyConf));
-                } else if (className.endsWith("storm.kafka.KafkaSpout")) {
-                    result.put(spout.getKey(), getLagResultForOldKafkaSpout(spout.getKey(), spoutSpec, topologyConf));
+                    result.put(spout.getKey(), getLagResultForNewKafkaSpout(spout.getKey(), spoutSpec));
                 }
             } catch (Exception e) {
                 logger.warn("Exception thrown while getting lag for spout id: " + spout.getKey() + " and spout class: " + className);
@@ -93,46 +90,8 @@ public class TopologySpoutLag {
         }
         return commands;
     }
-
-    private static List<String> getCommandLineOptionsForOldKafkaSpout(Map<String, Object> jsonConf, Map<String, Object> topologyConf) {
-        logger.debug("json configuration: {}", jsonConf);
-
-        List<String> commands = new ArrayList<>();
-        String configKeyPrefix = "config.";
-        commands.add("-o");
-        commands.add("-t");
-        commands.add((String) jsonConf.get(configKeyPrefix + "topics"));
-        commands.add("-n");
-        commands.add((String) jsonConf.get(configKeyPrefix + "zkRoot"));
-        String zkServers = (String) jsonConf.get(configKeyPrefix + "zkServers");
-        if (zkServers == null || zkServers.isEmpty()) {
-            StringBuilder zkServersBuilder = new StringBuilder();
-            Integer zkPort = ((Number) topologyConf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
-            for (String zkServer : (List<String>) topologyConf.get(Config.STORM_ZOOKEEPER_SERVERS)) {
-                zkServersBuilder.append(zkServer + ":" + zkPort + ",");
-            }
-            zkServers = zkServersBuilder.toString();
-        }
-        commands.add("-z");
-        commands.add(zkServers);
-        if (jsonConf.get(configKeyPrefix + "leaders") != null) {
-            commands.add("-p");
-            commands.add((String) jsonConf.get(configKeyPrefix + "partitions"));
-            commands.add("-l");
-            commands.add((String) jsonConf.get(configKeyPrefix + "leaders"));
-        } else {
-            commands.add("-r");
-            commands.add((String) jsonConf.get(configKeyPrefix + "zkNodeBrokers"));
-            Boolean isWildCard = (Boolean) topologyConf.get("kafka.topic.wildcard.match");
-            if (isWildCard != null && isWildCard.booleanValue()) {
-                commands.add("-w");
-            }
-        }
-        return commands;
-    }
-
-    private static Map<String, Object> getLagResultForKafka(String spoutId, SpoutSpec spoutSpec, Map<String, Object> topologyConf,
-                                                            boolean old) throws IOException {
+    
+    private static Map<String, Object> getLagResultForKafka(String spoutId, SpoutSpec spoutSpec) throws IOException {
         ComponentCommon componentCommon = spoutSpec.get_common();
         String json = componentCommon.get_json_conf();
         Map<String, Object> result = null;
@@ -150,12 +109,11 @@ public class TopologySpoutLag {
             } catch (ParseException e) {
                 throw new IOException(e);
             }
-            commands.addAll(
-                old ? getCommandLineOptionsForOldKafkaSpout(jsonMap, topologyConf) : getCommandLineOptionsForNewKafkaSpout(jsonMap));
+            commands.addAll(getCommandLineOptionsForNewKafkaSpout(jsonMap));
 
             logger.debug("Command to run: {}", commands);
 
-            // if commands contains one or more null value, spout is compiled with lower version of storm-kafka / storm-kafka-client
+            // if commands contains one or more null value, spout is compiled with lower version of storm-kafka-client
             if (!commands.contains(null)) {
                 String resultFromMonitor = new ShellCommandRunnerImpl().execCommand(commands.toArray(new String[0]));
 
@@ -182,13 +140,7 @@ public class TopologySpoutLag {
         return kafkaSpoutLagInfo;
     }
 
-    private static Map<String, Object> getLagResultForNewKafkaSpout(String spoutId, SpoutSpec spoutSpec,
-                                                                    Map<String, Object> topologyConf) throws IOException {
-        return getLagResultForKafka(spoutId, spoutSpec, topologyConf, false);
-    }
-
-    private static Map<String, Object> getLagResultForOldKafkaSpout(String spoutId, SpoutSpec spoutSpec,
-                                                                    Map<String, Object> topologyConf) throws IOException {
-        return getLagResultForKafka(spoutId, spoutSpec, topologyConf, true);
+    private static Map<String, Object> getLagResultForNewKafkaSpout(String spoutId, SpoutSpec spoutSpec) throws IOException {
+        return getLagResultForKafka(spoutId, spoutSpec);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/storm-dist/binary/final-package/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/final-package/src/main/assembly/binary.xml b/storm-dist/binary/final-package/src/main/assembly/binary.xml
index 316ac1b..6e0d968 100644
--- a/storm-dist/binary/final-package/src/main/assembly/binary.xml
+++ b/storm-dist/binary/final-package/src/main/assembly/binary.xml
@@ -111,13 +111,6 @@
         <!-- EXTERNAL -->
         <!-- only include the README file -->
         <fileSet>
-            <directory>${project.basedir}/../../../external/storm-kafka</directory>
-            <outputDirectory>external/storm-kafka</outputDirectory>
-            <includes>
-                <include>README.*</include>
-            </includes>
-        </fileSet>
-        <fileSet>
             <directory>${project.basedir}/../../../external/storm-kinesis</directory>
             <outputDirectory>external/storm-kinesis</outputDirectory>
             <includes>


Mime
View raw message