storm-commits mailing list archives

From bo...@apache.org
Subject [1/2] storm git commit: STORM-3201: Cleanup Kafka Spout Lag
Date Thu, 23 Aug 2018 14:08:52 GMT
Repository: storm
Updated Branches:
  refs/heads/master 2f399579d -> 47575ec6f


STORM-3201: Cleanup Kafka Spout Lag


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dd90dcad
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dd90dcad
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dd90dcad

Branch: refs/heads/master
Commit: dd90dcadd63c579838e32090c491c9192a9105c5
Parents: 518f947
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Mon Aug 20 14:27:50 2018 -0500
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Wed Aug 22 11:09:09 2018 -0500

----------------------------------------------------------------------
 bin/storm.py                                    |  6 ++
 .../src/jvm/org/apache/storm/utils/Utils.java   |  4 ++
 .../storm/PaceMakerStateStorageFactoryTest.java |  6 +-
 .../apache/storm/utils/TopologySpoutLag.java    | 75 +++++++++++---------
 .../java/org/apache/storm/PacemakerTest.java    | 21 +++---
 5 files changed, 64 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dd90dcad/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index a764d3d..7909fa2 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -720,6 +720,7 @@ def nimbus(klass="org.apache.storm.daemon.nimbus.Nimbus"):
     """
     cppaths = [CLUSTER_CONF_DIR]
     jvmopts = parse_args(confvalue("nimbus.childopts", cppaths)) + [
+        "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=nimbus.log",
         "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(), "cluster.xml"),
     ]
@@ -741,6 +742,7 @@ def pacemaker(klass="org.apache.storm.pacemaker.Pacemaker"):
     """
     cppaths = [CLUSTER_CONF_DIR]
     jvmopts = parse_args(confvalue("pacemaker.childopts", cppaths)) + [
+        "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=pacemaker.log",
         "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(), "cluster.xml"),
     ]
@@ -762,6 +764,7 @@ def supervisor(klass="org.apache.storm.daemon.supervisor.Supervisor"):
     """
     cppaths = [CLUSTER_CONF_DIR]
     jvmopts = parse_args(confvalue("supervisor.childopts", cppaths)) + [
+        "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=" + STORM_SUPERVISOR_LOG_FILE,
         "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(), "cluster.xml"),
     ]
@@ -784,6 +787,7 @@ def ui():
     """
     cppaths = [CLUSTER_CONF_DIR]
     jvmopts = parse_args(confvalue("ui.childopts", cppaths)) + [
+        "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=ui.log",
         "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(), "cluster.xml")
     ]
@@ -809,6 +813,7 @@ def logviewer():
     """
     cppaths = [CLUSTER_CONF_DIR]
     jvmopts = parse_args(confvalue("logviewer.childopts", cppaths)) + [
+        "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=logviewer.log",
         "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(), "cluster.xml")
     ]
@@ -853,6 +858,7 @@ def drpc():
     """
     cppaths = [CLUSTER_CONF_DIR]
     jvmopts = parse_args(confvalue("drpc.childopts", cppaths)) + [
+        "-Djava.deserialization.disabled=true",
         "-Dlogfile.name=drpc.log",
         "-Dlog4j.configurationFile=" + os.path.join(get_log4j2_conf_dir(), "cluster.xml")
     ]

http://git-wip-us.apache.org/repos/asf/storm/blob/dd90dcad/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 5594039..81298d6 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -491,6 +491,10 @@ public class Utils {
     }
 
     public static <T> T javaDeserialize(byte[] serialized, Class<T> clazz) {
+        if ("true".equalsIgnoreCase(System.getProperty("java.deserialization.disabled"))) {
+            throw new AssertionError("java deserialization has been disabled and is only safe from within a worker process");
+        }
+
+
         try {
             ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
             ObjectInputStream ois = null;
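For context, the guard above means any call that reaches Utils.javaDeserialize inside a daemon launched with -Djava.deserialization.disabled=true (as bin/storm.py now does) fails fast, while worker processes, which never set the property, are unaffected. A minimal sketch of the intended behaviour, using only the Utils methods visible in this diff (the class name below is illustrative):

    // Illustrative only; assumes org.apache.storm.utils.Utils from this diff is on the classpath.
    import org.apache.storm.utils.Utils;

    public class DeserializationGuardSketch {
        public static void main(String[] args) {
            byte[] payload = Utils.javaSerialize("hello");

            // Workers never set the property, so round-tripping still works there.
            System.out.println(Utils.javaDeserialize(payload, String.class));

            // Daemons started by bin/storm.py now pass -Djava.deserialization.disabled=true;
            // simulating that here makes the same call throw.
            System.setProperty("java.deserialization.disabled", "true");
            try {
                Utils.javaDeserialize(payload, String.class);
            } catch (AssertionError expected) {
                System.out.println("rejected: " + expected.getMessage());
            }
        }
    }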

http://git-wip-us.apache.org/repos/asf/storm/blob/dd90dcad/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java b/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
index 39db4ef..63d8cdc 100644
--- a/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
+++ b/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
@@ -65,19 +65,19 @@ public class PaceMakerStateStorageFactoryTest {
     @Test
     public void testSetWorkerHb() throws Exception {
         createPaceMakerStateStorage(HBServerMessageType.SEND_PULSE_RESPONSE, null);
-        stateStorage.set_worker_hb("/foo", Utils.javaSerialize("data"), null);
+        stateStorage.set_worker_hb("/foo", "data".getBytes("UTF-8"), null);
         verify(clientMock).send(hbMessageCaptor.capture());
         HBMessage sent = hbMessageCaptor.getValue();
         HBPulse pulse = sent.get_data().get_pulse();
         Assert.assertEquals(HBServerMessageType.SEND_PULSE, sent.get_type());
         Assert.assertEquals("/foo", pulse.get_id());
-        Assert.assertEquals("data", Utils.javaDeserialize(pulse.get_details(), String.class));
+        Assert.assertEquals("data", new String(pulse.get_details(), "UTF-8"));
     }
 
     @Test(expected = RuntimeException.class)
     public void testSetWorkerHbResponseType() throws Exception {
         createPaceMakerStateStorage(HBServerMessageType.SEND_PULSE, null);
-        stateStorage.set_worker_hb("/foo", Utils.javaSerialize("data"), null);
+        stateStorage.set_worker_hb("/foo", "data".getBytes("UTF-8"), null);
     }
 
     @Test
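These test changes store heartbeat details as raw UTF-8 bytes instead of Java-serialized strings, so the receiving side never needs Java deserialization. A minimal sketch of the round trip the updated assertions rely on (StandardCharsets.UTF_8 is an equivalent, unchecked alternative to the "UTF-8" literal used in the diff):

    import java.nio.charset.StandardCharsets;

    public class Utf8DetailsRoundTrip {
        public static void main(String[] args) {
            // What set_worker_hb now receives as the pulse details.
            byte[] details = "data".getBytes(StandardCharsets.UTF_8);
            // What the assertions decode on the way back out.
            String decoded = new String(details, StandardCharsets.UTF_8);
            System.out.println("data".equals(decoded)); // true
        }
    }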

http://git-wip-us.apache.org/repos/asf/storm/blob/dd90dcad/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
index 25a5312..ce3b22b 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java
@@ -1,7 +1,11 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
  * http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -19,7 +23,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.storm.generated.ComponentCommon;
-import org.apache.storm.generated.ComponentObject;
 import org.apache.storm.generated.SpoutSpec;
 import org.apache.storm.generated.StormTopology;
 import org.json.simple.JSONValue;
@@ -34,63 +37,65 @@ public class TopologySpoutLag {
     private static final String SPOUT_TYPE = "spoutType";
     private static final String SPOUT_LAG_RESULT = "spoutLagResult";
     private static final String ERROR_INFO = "errorInfo";
+    private static final String CONFIG_KEY_PREFIX = "config.";
+    private static final String TOPICS_CONFIG = CONFIG_KEY_PREFIX + "topics";
+    private static final String GROUPID_CONFIG = CONFIG_KEY_PREFIX + "groupid";
+    private static final String BOOTSTRAP_CONFIG = CONFIG_KEY_PREFIX + "bootstrap.servers";
     private final static Logger logger = LoggerFactory.getLogger(TopologySpoutLag.class);
 
     public static Map<String, Map<String, Object>> lag(StormTopology stormTopology, Map<String, Object> topologyConf) {
         Map<String, Map<String, Object>> result = new HashMap<>();
         Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
-        String className = null;
-        for (Map.Entry<String, SpoutSpec> spout : spouts.entrySet()) {
+        for (Map.Entry<String, SpoutSpec> spout: spouts.entrySet()) {
             try {
                 SpoutSpec spoutSpec = spout.getValue();
-                ComponentObject componentObject = spoutSpec.get_spout_object();
-                // FIXME: yes it's a trick so we might be better to find alternative way...
-                className = getClassNameFromComponentObject(componentObject);
-                logger.debug("spout classname: {}", className);
-                if (className.endsWith("storm.kafka.spout.KafkaSpout")) {
-                    result.put(spout.getKey(), getLagResultForNewKafkaSpout(spout.getKey(), spoutSpec));
-                }
+                addLagResultForKafkaSpout(result, spout.getKey(), spoutSpec);
             } catch (Exception e) {
-                logger.warn("Exception thrown while getting lag for spout id: " + spout.getKey() + " and spout class: " + className);
+                logger.warn("Exception thrown while getting lag for spout id: " + spout.getKey());
                 logger.warn("Exception message:" + e.getMessage(), e);
             }
         }
         return result;
     }
 
-    private static String getClassNameFromComponentObject(ComponentObject componentObject) {
-        try {
-            Object object = Utils.getSetComponentObject(componentObject);
-            return object.getClass().getCanonicalName();
-        } catch (RuntimeException e) {
-
-            if (e.getCause() instanceof ClassNotFoundException) {
-                return e.getCause().getMessage().trim();
-            }
-
-            throw e;
-        }
-    }
-
     private static List<String> getCommandLineOptionsForNewKafkaSpout(Map<String, Object> jsonConf) {
         logger.debug("json configuration: {}", jsonConf);
 
         List<String> commands = new ArrayList<>();
-        String configKeyPrefix = "config.";
         commands.add("-t");
-        commands.add((String) jsonConf.get(configKeyPrefix + "topics"));
+        commands.add((String) jsonConf.get(TOPICS_CONFIG));
         commands.add("-g");
-        commands.add((String) jsonConf.get(configKeyPrefix + "groupid"));
+        commands.add((String) jsonConf.get(GROUPID_CONFIG));
         commands.add("-b");
-        commands.add((String) jsonConf.get(configKeyPrefix + "bootstrap.servers"));
-        String securityProtocol = (String) jsonConf.get(configKeyPrefix + "security.protocol");
+        commands.add((String) jsonConf.get(BOOTSTRAP_CONFIG));
+        String securityProtocol = (String) jsonConf.get(CONFIG_KEY_PREFIX + "security.protocol");
         if (securityProtocol != null && !securityProtocol.isEmpty()) {
             commands.add("-s");
             commands.add(securityProtocol);
         }
         return commands;
     }
-    
+
+    private static void addLagResultForKafkaSpout(Map<String, Map<String, Object>> finalResult, String spoutId, SpoutSpec spoutSpec)
+        throws IOException {
+        ComponentCommon componentCommon = spoutSpec.get_common();
+        String json = componentCommon.get_json_conf();
+        if (json != null && !json.isEmpty()) {
+            Map<String, Object> jsonMap = null;
+            try {
+                jsonMap = (Map<String, Object>) JSONValue.parseWithException(json);
+            } catch (ParseException e) {
+                throw new IOException(e);
+            }
+
+            if (jsonMap.containsKey(TOPICS_CONFIG)
+                && jsonMap.containsKey(GROUPID_CONFIG)
+                && jsonMap.containsKey(BOOTSTRAP_CONFIG)) {
+                finalResult.put(spoutId, getLagResultForNewKafkaSpout(spoutId, spoutSpec));
+            }
+        }
+    }
+
     private static Map<String, Object> getLagResultForKafka(String spoutId, SpoutSpec spoutSpec) throws IOException {
         ComponentCommon componentCommon = spoutSpec.get_common();
         String json = componentCommon.get_json_conf();
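With this change, Kafka spout lag is detected from the spout's JSON configuration rather than by deserializing the spout object and sniffing its class name: addLagResultForKafkaSpout only proceeds when all three Kafka keys are present. A hedged sketch of the check (key names come from the constants above; the configuration values are made up):

    import java.util.HashMap;
    import java.util.Map;

    public class KafkaSpoutLagDetectionSketch {
        public static void main(String[] args) {
            // Hypothetical spout JSON configuration; only the key names are taken from this diff.
            Map<String, Object> jsonConf = new HashMap<>();
            jsonConf.put("config.topics", "events");
            jsonConf.put("config.groupid", "storm-lag-checker");
            jsonConf.put("config.bootstrap.servers", "kafka-1:9092");

            // Mirrors the containsKey checks in addLagResultForKafkaSpout above.
            boolean looksLikeKafkaSpout = jsonConf.containsKey("config.topics")
                && jsonConf.containsKey("config.groupid")
                && jsonConf.containsKey("config.bootstrap.servers");
            System.out.println(looksLikeKafkaSpout); // true; a conf missing any key is skipped
        }
    }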

http://git-wip-us.apache.org/repos/asf/storm/blob/dd90dcad/storm-server/src/test/java/org/apache/storm/PacemakerTest.java
----------------------------------------------------------------------
diff --git a/storm-server/src/test/java/org/apache/storm/PacemakerTest.java b/storm-server/src/test/java/org/apache/storm/PacemakerTest.java
index 5a9c130..5f7f8e9 100644
--- a/storm-server/src/test/java/org/apache/storm/PacemakerTest.java
+++ b/storm-server/src/test/java/org/apache/storm/PacemakerTest.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm;
 
+import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
@@ -83,13 +84,13 @@ public class PacemakerTest {
     }
 
     @Test
-    public void testServerSendPulseGetPulse() {
+    public void testServerSendPulseGetPulse() throws UnsupportedEncodingException {
         String path = "/pulsepath";
         String dataString = "pulse data";
         Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         HBPulse hbPulse = new HBPulse();
         hbPulse.set_id(path);
-        hbPulse.set_details(Utils.javaSerialize(dataString));
+        hbPulse.set_details(dataString.getBytes("UTF-8"));
         messageWithRandId(HBServerMessageType.SEND_PULSE, HBMessageData.pulse(hbPulse));
         HBMessage sendResponse = handler.handleMessage(hbMessage, true);
         Assert.assertEquals(mid, sendResponse.get_message_id());
@@ -100,7 +101,7 @@ public class PacemakerTest {
         HBMessage response = handler.handleMessage(hbMessage, true);
         Assert.assertEquals(mid, response.get_message_id());
         Assert.assertEquals(HBServerMessageType.GET_PULSE_RESPONSE, response.get_type());
-        Assert.assertEquals(dataString, Utils.javaDeserialize(response.get_data().get_pulse().get_details(), String.class));
+        Assert.assertEquals(dataString, new String(response.get_data().get_pulse().get_details(), "UTF-8"));
     }
 
     @Test
@@ -118,7 +119,7 @@ public class PacemakerTest {
     }
 
     @Test
-    public void testServerGetAllNodesForPath() {
+    public void testServerGetAllNodesForPath() throws UnsupportedEncodingException {
         Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         makeNode(handler, "/some-root-path/foo");
         makeNode(handler, "/some-root-path/bar");
@@ -160,7 +161,7 @@ public class PacemakerTest {
     }
 
     @Test
-    public void testServerGetPulse() {
+    public void testServerGetPulse() throws UnsupportedEncodingException {
         Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         makeNode(handler, "/some-root/GET_PULSE");
         messageWithRandId(HBServerMessageType.GET_PULSE, HBMessageData.path("/some-root/GET_PULSE"));
@@ -174,11 +175,11 @@ public class PacemakerTest {
         Assert.assertEquals(mid, goodResponse.get_message_id());
         Assert.assertEquals(HBServerMessageType.GET_PULSE_RESPONSE, goodResponse.get_type());
         Assert.assertEquals("/some-root/GET_PULSE", goodPulse.get_id());
-        Assert.assertEquals("nothing", Utils.javaDeserialize(goodPulse.get_details(), String.class));
+        Assert.assertEquals("nothing", new String(goodPulse.get_details(), "UTF-8"));
     }
 
     @Test
-    public void testServerDeletePath() {
+    public void testServerDeletePath() throws UnsupportedEncodingException {
         Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         makeNode(handler, "/some-root/DELETE_PATH/foo");
         makeNode(handler, "/some-root/DELETE_PATH/bar");
@@ -200,7 +201,7 @@ public class PacemakerTest {
     }
 
     @Test
-    public void testServerDeletePulseId() {
+    public void testServerDeletePulseId() throws UnsupportedEncodingException {
         Pacemaker handler = new Pacemaker(new ConcurrentHashMap());
         makeNode(handler, "/some-root/DELETE_PULSE_ID/foo");
         makeNode(handler, "/some-root/DELETE_PULSE_ID/bar");
@@ -227,10 +228,10 @@ public class PacemakerTest {
         hbMessage.set_message_id(mid);
     }
 
-    private HBMessage makeNode(Pacemaker handler, String path) {
+    private HBMessage makeNode(Pacemaker handler, String path) throws UnsupportedEncodingException {
         HBPulse hbPulse = new HBPulse();
         hbPulse.set_id(path);
-        hbPulse.set_details(Utils.javaSerialize("nothing"));
+        hbPulse.set_details("nothing".getBytes("UTF-8"));
         HBMessage message = new HBMessage(HBServerMessageType.SEND_PULSE, HBMessageData.pulse(hbPulse));
         return handler.handleMessage(message, true);
     }

