storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/2] storm git commit: STORM-2947: Remove some deprecated methods in storm-client and storm-server
Date Thu, 12 Jul 2018 17:22:33 GMT
Repository: storm
Updated Branches:
  refs/heads/master 96b702dfc -> efb2e9a33


STORM-2947: Remove some deprecated methods in storm-client and storm-server


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c4ed52e7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c4ed52e7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c4ed52e7

Branch: refs/heads/master
Commit: c4ed52e79732d0026f615f00ae6be4683dbf9c74
Parents: 2d7c7d3
Author: Stig Rohde Døssing <srdo@apache.org>
Authored: Tue Jul 10 15:56:34 2018 +0200
Committer: Stig Rohde Døssing <srdo@apache.org>
Committed: Thu Jul 12 16:14:23 2018 +0200

----------------------------------------------------------------------
 conf/defaults.yaml                              |  1 -
 .../java/org/apache/storm/loadgen/LoadBolt.java |  7 +-
 .../src/jvm/org/apache/storm/Config.java        | 15 ----
 .../org/apache/storm/daemon/StormCommon.java    |  6 --
 .../apache/storm/messaging/netty/Client.java    |  3 +-
 .../security/auth/ICredentialsRenewer.java      | 16 +---
 .../DefaultSerializationDelegate.java           | 57 ---------------
 .../GzipBridgeSerializationDelegate.java        | 59 ---------------
 .../apache/storm/testing/FixedTupleSpout.java   |  8 --
 .../src/jvm/org/apache/storm/tuple/Tuple.java   |  8 --
 .../jvm/org/apache/storm/tuple/TupleImpl.java   |  5 --
 .../org/apache/storm/utils/NimbusClient.java    |  9 +--
 .../StormBoundedExponentialBackoffRetry.java    | 13 ++--
 .../src/jvm/org/apache/storm/utils/Utils.java   |  5 +-
 .../GzipBridgeSerializationDelegateTest.java    | 77 --------------------
 .../storm/messaging/netty_integration_test.clj  |  1 -
 .../test/clj/org/apache/storm/nimbus_test.clj   | 48 ++++++------
 .../apache/storm/messaging/netty/NettyTest.java |  1 -
 18 files changed, 36 insertions(+), 303 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index f544903..da4753a 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -223,7 +223,6 @@ storm.messaging.netty.buffer.high.watermark: 16777216 # 16 MB
 # dropped down below this value, any blocked clients will unblock and start processing further
messages.
 storm.messaging.netty.buffer.low.watermark: 8388608 # 8 MB
 # Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers
should also wait at least that long before giving up on connecting to the other worker. The
reconnection period need also be bigger than storm.zookeeper.session.timeout(default is 20s),
so that we can abort the reconnection when the target worker is dead.
-storm.messaging.netty.max_retries: 300
 storm.messaging.netty.max_wait_ms: 1000
 storm.messaging.netty.min_wait_ms: 100
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
index 28b611f..7eb2b73 100644
--- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
+++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java
@@ -23,11 +23,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.task.OutputCollector;
@@ -83,7 +78,7 @@ public class LoadBolt extends BaseRichBolt {
     @Override
     public void execute(final Tuple input) {
         long startTimeNs = System.nanoTime();
-        InputStream in = inputStreams.get(input.getSourceGlobalStreamid());
+        InputStream in = inputStreams.get(input.getSourceGlobalStreamId());
         sleep.simulateProcessAndExecTime(executorIndex, startTimeNs, in, () -> {
             emitTuples(input);
             collector.ack(input);

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 8dfcee9..318a130 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1115,13 +1115,6 @@ public class Config extends HashMap<String, Object> {
     @isString
     public static final String STORM_LOCAL_HOSTNAME = "storm.local.hostname";
     /**
-     * The host that the master server is running on, added only for backward compatibility,
the usage deprecated in favor of nimbus.seeds
-     * config.
-     */
-    @Deprecated
-    @isString
-    public static final String NIMBUS_HOST = "nimbus.host";
-    /**
      * List of seed nimbus hosts to use for leader nimbus discovery.
      */
     @isStringList
@@ -1309,14 +1302,6 @@ public class Config extends HashMap<String, Object> {
     @isInteger
     public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "storm.messaging.netty.transfer.batch.size";
     /**
-     * Netty based messaging: The max # of retries that a peer will perform when a remote
is not accessible
-     *
-     * @deprecated "Since netty clients should never stop reconnecting - this does not make
sense anymore.
-     */
-    @Deprecated
-    @isInteger
-    public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
-    /**
      * Netty based messaging: The min # of milliseconds that a peer will wait.
      */
     @isInteger

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
index cd7ef6d..2bb1871 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
@@ -24,7 +24,6 @@ import java.util.TreeMap;
 import org.apache.storm.Config;
 import org.apache.storm.Constants;
 import org.apache.storm.Thrift;
-import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.generated.Bolt;
 import org.apache.storm.generated.ComponentCommon;
 import org.apache.storm.generated.GlobalStreamId;
@@ -85,11 +84,6 @@ public class StormCommon {
         return oldInstance;
     }
 
-    @Deprecated
-    public static String getStormId(final IStormClusterState stormClusterState, final String
topologyName) {
-        return stormClusterState.getTopoId(topologyName).get();
-    }
-
     public static void validateDistributedMode(Map<String, Object> conf) {
         if (ConfigUtils.isLocalMode(conf)) {
             throw new IllegalArgumentException("Cannot start server in local mode!");

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
index 1ecdd26..d46d785 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java
@@ -140,10 +140,9 @@ public class Client extends ConnectionWithStatus implements IStatefulObject,
ISa
         LOG.info("Creating Netty Client, connecting to {}:{}, bufferSize: {}, lowWatermark:
{}, highWatermark: {}",
                  host, port, bufferSize, lowWatermark, highWatermark);
 
-        int maxReconnectionAttempts = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
         int minWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
         int maxWaitMs = ObjectReader.getInt(topoConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
-        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts);
+        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, -1);
 
         // Initiate connection to remote destination
         this.eventLoopGroup = eventLoopGroup;

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
b/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
index abe11ba..09173b7 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/ICredentialsRenewer.java
@@ -33,19 +33,5 @@ public interface ICredentialsRenewer {
      * @param topologyConf           topology configuration.
      * @param topologyOwnerPrincipal the full principal name of the owner of the topology
      */
-    @SuppressWarnings("deprecation")
-    default void renew(Map<String, String> credentials, Map<String, Object> topologyConf,
String topologyOwnerPrincipal) {
-        renew(credentials, topologyConf);
-    }
-
-    /**
-     * Renew any credentials that need to be renewed. (Update the credentials if needed)
-     *
-     * @param credentials  the credentials that may have something to renew.
-     * @param topologyConf topology configuration.
-     */
-    @Deprecated
-    default void renew(Map<String, String> credentials, Map<String, Object> topologyConf)
{
-        throw new IllegalStateException("At least one of the renew methods must be overridden");
-    }
+    void renew(Map<String, String> credentials, Map<String, Object> topologyConf,
String topologyOwnerPrincipal);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/serialization/DefaultSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/DefaultSerializationDelegate.java
b/storm-client/src/jvm/org/apache/storm/serialization/DefaultSerializationDelegate.java
deleted file mode 100644
index c2cacc2..0000000
--- a/storm-client/src/jvm/org/apache/storm/serialization/DefaultSerializationDelegate.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses
this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.serialization;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-
-@Deprecated
-public class DefaultSerializationDelegate implements SerializationDelegate {
-
-    @Override
-    public void prepare(Map<String, Object> topoConf) {
-        // No-op
-    }
-
-    @Override
-    public byte[] serialize(Object object) {
-        try {
-            ByteArrayOutputStream bos = new ByteArrayOutputStream();
-            ObjectOutputStream oos = new ObjectOutputStream(bos);
-            oos.writeObject(object);
-            oos.close();
-            return bos.toByteArray();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
-        try {
-            ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
-            ObjectInputStream ois = new ObjectInputStream(bis);
-            Object ret = ois.readObject();
-            ois.close();
-            return (T) ret;
-        } catch (IOException ioe) {
-            throw new RuntimeException(ioe);
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegate.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegate.java
b/storm-client/src/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegate.java
deleted file mode 100644
index 152afd8..0000000
--- a/storm-client/src/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegate.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses
this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.serialization;
-
-import java.util.Map;
-import java.util.zip.GZIPInputStream;
-
-/**
- * Always writes gzip out, but tests incoming to see if it's gzipped. If it is, deserializes
with gzip. If not, uses {@link
- * org.apache.storm.serialization.DefaultSerializationDelegate} to deserialize. Any logic
needing to be enabled via {@link
- * #prepare(java.util.Map)} is passed through to both delegates.
- */
-@Deprecated
-public class GzipBridgeSerializationDelegate implements SerializationDelegate {
-
-    // Split up GZIP_MAGIC into readable bytes
-    private static final byte GZIP_MAGIC_FIRST_BYTE = (byte) GZIPInputStream.GZIP_MAGIC;
-    private static final byte GZIP_MAGIC_SECOND_BYTE = (byte) (GZIPInputStream.GZIP_MAGIC
>> 8);
-    private DefaultSerializationDelegate defaultDelegate = new DefaultSerializationDelegate();
-    private GzipSerializationDelegate gzipDelegate = new GzipSerializationDelegate();
-
-    @Override
-    public void prepare(Map<String, Object> topoConf) {
-        defaultDelegate.prepare(topoConf);
-        gzipDelegate.prepare(topoConf);
-    }
-
-    @Override
-    public byte[] serialize(Object object) {
-        return gzipDelegate.serialize(object);
-    }
-
-    @Override
-    public <T> T deserialize(byte[] bytes, Class<T> clazz) {
-        if (isGzipped(bytes)) {
-            return gzipDelegate.deserialize(bytes, clazz);
-        } else {
-            return defaultDelegate.deserialize(bytes, clazz);
-        }
-    }
-
-    /**
-     * Looks ahead to see if the GZIP magic constant is heading {@code bytes}
-     */
-    private boolean isGzipped(byte[] bytes) {
-        return (bytes.length > 1) && (bytes[0] == GZIP_MAGIC_FIRST_BYTE)
-               && (bytes[1] == GZIP_MAGIC_SECOND_BYTE);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java b/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
index 5de142d..44ff71c 100644
--- a/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/testing/FixedTupleSpout.java
@@ -41,14 +41,6 @@ public class FixedTupleSpout implements IRichSpout, CompletableSpout {
         this(tuples, (Fields) null);
     }
 
-    /**
-     * @deprecated please use {@link #FixedTupleSpout(List, Fields)}
-     */
-    @Deprecated
-    public FixedTupleSpout(List tuples, String fieldName) {
-        this(tuples, new Fields(fieldName));
-    }
-
     public FixedTupleSpout(List tuples, Fields fields) {
         _id = UUID.randomUUID().toString();
         synchronized (acked) {

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java b/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
index 23f531e..ec91d77 100644
--- a/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
+++ b/storm-client/src/jvm/org/apache/storm/tuple/Tuple.java
@@ -29,14 +29,6 @@ public interface Tuple extends ITuple {
 
     /**
      * Returns the global stream id (component + stream) of this tuple.
-     *
-     * @deprecated replaced by {@link #getSourceGlobalStreamId()} due to broken naming convention
-     */
-    @Deprecated
-    public GlobalStreamId getSourceGlobalStreamid();
-
-    /**
-     * Returns the global stream id (component + stream) of this tuple.
      */
     public GlobalStreamId getSourceGlobalStreamId();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java b/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
index 0c1ced2..06bfd48 100644
--- a/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/tuple/TupleImpl.java
@@ -225,11 +225,6 @@ public class TupleImpl implements Tuple {
     }
 
     @Override
-    public GlobalStreamId getSourceGlobalStreamid() {
-        return getSourceGlobalStreamId();
-    }
-
-    @Override
     public GlobalStreamId getSourceGlobalStreamId() {
         return new GlobalStreamId(getSourceComponent(), streamId);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
index da8bcf5..47ccbe1 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/NimbusClient.java
@@ -136,14 +136,7 @@ public class NimbusClient extends ThriftClient {
             asUser = (String) conf.get(Config.STORM_DO_AS_USER);
         }
 
-        List<String> seeds;
-        if (conf.containsKey(Config.NIMBUS_HOST) && StringUtils.isNotBlank(conf.get(Config.NIMBUS_HOST).toString()))
{
-            LOG.warn("Using deprecated config {} for backward compatibility. Please update
your storm.yaml so it only has config {}",
-                     Config.NIMBUS_HOST, Config.NIMBUS_SEEDS);
-            seeds = Lists.newArrayList(conf.get(Config.NIMBUS_HOST).toString());
-        } else {
-            seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
-        }
+        List<String> seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
 
         for (String host : seeds) {
             int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
b/storm-client/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
index 38f06d4..768c83c 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
@@ -21,7 +21,7 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
     private static final Logger LOG = LoggerFactory.getLogger(StormBoundedExponentialBackoffRetry.class);
     private final Random random = new Random();
     private final int linearBaseSleepMs;
-    private int stepSize;
+    private final int stepSize;
     private int expRetriesThreshold;
 
     /**
@@ -31,7 +31,6 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
      * Also adds jitter for exponential/linear retry. It guarantees `currSleepTimeMs >=
prevSleepTimeMs` and `baseSleepTimeMs <=
      * currSleepTimeMs <= maxSleepTimeMs`
      */
-
     public StormBoundedExponentialBackoffRetry(int baseSleepTimeMs, int maxSleepTimeMs, int
maxRetries) {
         super(baseSleepTimeMs, maxSleepTimeMs, maxRetries);
         expRetriesThreshold = 1;
@@ -39,10 +38,10 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
             expRetriesThreshold++;
         }
         LOG.debug("The baseSleepTimeMs [{}] the maxSleepTimeMs [{}] the maxRetries [{}]",
-                  baseSleepTimeMs, maxSleepTimeMs, maxRetries);
+            baseSleepTimeMs, maxSleepTimeMs, maxRetries);
         if (baseSleepTimeMs > maxSleepTimeMs) {
-            LOG.warn("Misconfiguration: the baseSleepTimeMs [" + baseSleepTimeMs + "] can't
be greater than " +
-                     "the maxSleepTimeMs [" + maxSleepTimeMs + "].");
+            LOG.warn("Misconfiguration: the baseSleepTimeMs [" + baseSleepTimeMs + "] can't
be greater than "
+                + "the maxSleepTimeMs [" + maxSleepTimeMs + "].");
         }
         if (maxRetries > 0 && maxRetries > expRetriesThreshold) {
             this.stepSize = Math.max(1, (maxSleepTimeMs - (1 << expRetriesThreshold))
/ (maxRetries - expRetriesThreshold));
@@ -62,8 +61,8 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
             return sleepTimeMs;
         } else {
             int stepJitter = random.nextInt(stepSize);
-            long sleepTimeMs = Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs +
-                                                                    (stepSize * (retryCount
- expRetriesThreshold)) + stepJitter));
+            long sleepTimeMs = Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs
+                + (stepSize * (retryCount - expRetriesThreshold)) + stepJitter));
             LOG.warn("WILL SLEEP FOR {}ms (MAX)", sleepTimeMs);
             return sleepTimeMs;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
index 34ab137..ab8307d 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java
@@ -81,7 +81,7 @@ import org.apache.storm.generated.StormTopology;
 import org.apache.storm.generated.TopologyInfo;
 import org.apache.storm.generated.TopologySummary;
 import org.apache.storm.security.auth.ReqContext;
-import org.apache.storm.serialization.DefaultSerializationDelegate;
+import org.apache.storm.serialization.GzipThriftSerializationDelegate;
 import org.apache.storm.serialization.SerializationDelegate;
 import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.storm.shade.com.google.common.collect.Lists;
@@ -780,8 +780,7 @@ public class Utils {
             Class delegateClass = Class.forName(delegateClassName);
             delegate = (SerializationDelegate) delegateClass.newInstance();
         } catch (ClassNotFoundException | InstantiationException | IllegalAccessException
e) {
-            LOG.error("Failed to construct serialization delegate, falling back to default",
e);
-            delegate = new DefaultSerializationDelegate();
+            throw new RuntimeException("Failed to construct serialization delegate class
" + delegateClassName, e);
         }
         delegate.prepare(topoConf);
         return delegate;

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-client/test/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegateTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegateTest.java
b/storm-client/test/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegateTest.java
deleted file mode 100644
index 5f2dd46..0000000
--- a/storm-client/test/jvm/org/apache/storm/serialization/GzipBridgeSerializationDelegateTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses
this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under
the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.serialization;
-
-import java.io.Serializable;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-
-public class GzipBridgeSerializationDelegateTest {
-
-    SerializationDelegate testDelegate;
-
-    @Before
-    public void setUp() throws Exception {
-        testDelegate = new GzipBridgeSerializationDelegate();
-    }
-
-    @Test
-    public void testDeserialize_readingFromGzip() throws Exception {
-        TestPojo pojo = new TestPojo();
-        pojo.name = "foo";
-        pojo.age = 100;
-
-        byte[] serialized = new GzipSerializationDelegate().serialize(pojo);
-
-        TestPojo pojo2 = (TestPojo) testDelegate.deserialize(serialized, TestPojo.class);
-
-        assertEquals(pojo2.name, pojo.name);
-        assertEquals(pojo2.age, pojo.age);
-    }
-
-    @Test
-    public void testDeserialize_readingFromGzipBridge() throws Exception {
-        TestPojo pojo = new TestPojo();
-        pojo.name = "bar";
-        pojo.age = 200;
-
-        byte[] serialized = new GzipBridgeSerializationDelegate().serialize(pojo);
-
-        TestPojo pojo2 = (TestPojo) testDelegate.deserialize(serialized, TestPojo.class);
-
-        assertEquals(pojo2.name, pojo.name);
-        assertEquals(pojo2.age, pojo.age);
-    }
-
-    @Test
-    public void testDeserialize_readingFromDefault() throws Exception {
-        TestPojo pojo = new TestPojo();
-        pojo.name = "baz";
-        pojo.age = 300;
-
-        byte[] serialized = new DefaultSerializationDelegate().serialize(pojo);
-
-        TestPojo pojo2 = (TestPojo) testDelegate.deserialize(serialized, TestPojo.class);
-
-        assertEquals(pojo2.name, pojo.name);
-        assertEquals(pojo2.age, pojo.age);
-    }
-
-    static class TestPojo implements Serializable {
-        String name;
-        int age;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
index 0e29d8c..8e813af 100644
--- a/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
+++ b/storm-core/test/clj/org/apache/storm/messaging/netty_integration_test.clj
@@ -31,7 +31,6 @@
                                                     STORM-MESSAGING-TRANSPORT  "org.apache.storm.messaging.netty.Context"
                                                     STORM-MESSAGING-NETTY-AUTHENTICATION
false
                                                     STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
-                                                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
                                                     STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000

                                                     STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
                                                     STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS
1

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/org/apache/storm/nimbus_test.clj b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 7d389b3..c8bc42a 100644
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@ -71,7 +71,7 @@
          nil))
 
 (defn storm-component->task-info [cluster storm-name]
-  (let [storm-id (StormCommon/getStormId (.getClusterState cluster) storm-name)
+  (let [storm-id (.get (.getTopoId (.getClusterState cluster) storm-name))
         nimbus (.getNimbus cluster)]
     (-> (.getUserTopology nimbus storm-id)
         (#(StormCommon/stormTaskInfo % (from-json (.getTopologyConf nimbus storm-id))))
@@ -79,12 +79,12 @@
         clojurify-structure)))
 
 (defn getCredentials [cluster storm-name]
-  (let [storm-id (StormCommon/getStormId (.getClusterState cluster) storm-name)
+  (let [storm-id (.get (.getTopoId (.getClusterState cluster) storm-name))
         creds (.credentials (.getClusterState cluster) storm-id nil)]
     (if creds (into {} (.get_creds creds)))))
 
 (defn storm-component->executor-info [cluster storm-name]
-  (let [storm-id (StormCommon/getStormId (.getClusterState cluster) storm-name)
+  (let [storm-id (.get (.getTopoId (.getClusterState cluster) storm-name))
         nimbus (.getNimbus cluster)
         storm-conf (from-json (.getTopologyConf nimbus storm-id))
         topology (.getUserTopology nimbus storm-id)
@@ -101,12 +101,12 @@
          clojurify-structure)))
 
 (defn storm-num-workers [state storm-name]
-  (let [storm-id (StormCommon/getStormId state storm-name)
+  (let [storm-id (.get (.getTopoId state storm-name))
         assignment (.assignmentInfo state storm-id nil)]
     (.size (Utils/reverseMap (.get_executor_node_port assignment)))))
 
 (defn topology-nodes [state storm-name]
-  (let [storm-id (StormCommon/getStormId state storm-name)
+  (let [storm-id (.get (.getTopoId state storm-name))
         assignment (.assignmentInfo state storm-id nil)]
     (->> assignment
          .get_executor_node_port
@@ -116,7 +116,7 @@
          )))
 
 (defn topology-slots [state storm-name]
-  (let [storm-id (StormCommon/getStormId state storm-name)
+  (let [storm-id (.get (.getTopoId state storm-name))
         assignment (.assignmentInfo state storm-id nil)]
     (->> assignment
          .get_executor_node_port
@@ -127,7 +127,7 @@
 ;TODO: when translating this function, don't call map-val, but instead use an inline for
loop.
 ; map-val is a temporary kluge for clojure.
 (defn topology-node-distribution [state storm-name]
-  (let [storm-id (StormCommon/getStormId state storm-name)
+  (let [storm-id (.get (.getTopoId state storm-name))
         assignment (.assignmentInfo state storm-id nil)]
     (->> assignment
          .get_executor_node_port
@@ -206,7 +206,7 @@
 
 (defnk check-consistency [cluster storm-name :assigned? true]
   (let [state (.getClusterState cluster)
-        storm-id (StormCommon/getStormId state storm-name)
+        storm-id (.get (.getTopoId state storm-name))
         task-ids (task-ids cluster storm-id)
         assignment (.assignmentInfo state storm-id nil)
         executor->node+port (.get_executor_node_port assignment)
@@ -548,7 +548,7 @@
              (log-message "Checking user " (System/getProperty "user.name") " " hist-topo-ids)
              (is (= 0 (count hist-topo-ids))))
         (.submitTopology cluster "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-USERS ["alice",
(System/getProperty "user.name")]} topology)
-        (bind storm-id (StormCommon/getStormId state "test"))
+        (bind storm-id (.get (.getTopoId state "test")))
         (.advanceClusterTime cluster 5)
         (is (not-nil? (.stormBase state storm-id nil)))
         (is (not-nil? (.assignmentInfo state storm-id nil)))
@@ -559,7 +559,7 @@
         (.advanceClusterTime cluster 35)
         ;; kill topology read on group
         (.submitTopology cluster "killgrouptest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20, LOGS-GROUPS
["alice-group"]} topology)
-        (bind storm-id-killgroup (StormCommon/getStormId state "killgrouptest"))
+        (bind storm-id-killgroup (.get (.getTopoId state "killgrouptest")))
         (.advanceClusterTime cluster 5)
         (is (not-nil? (.stormBase state storm-id-killgroup nil)))
         (is (not-nil? (.assignmentInfo state storm-id-killgroup nil)))
@@ -570,7 +570,7 @@
         (.advanceClusterTime cluster 35)
         ;; kill topology can't read
         (.submitTopology cluster "killnoreadtest" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
-        (bind storm-id-killnoread (StormCommon/getStormId state "killnoreadtest"))
+        (bind storm-id-killnoread (.get (.getTopoId state "killnoreadtest")))
         (.advanceClusterTime cluster 5)
         (is (not-nil? (.stormBase state storm-id-killnoread nil)))
         (is (not-nil? (.assignmentInfo state storm-id-killnoread nil)))
@@ -583,19 +583,19 @@
         ;; active topology can read
         (.submitTopology cluster "2test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS ["alice",
(System/getProperty "user.name")]} topology)
         (.advanceClusterTime cluster 11)
-        (bind storm-id2 (StormCommon/getStormId state "2test"))
+        (bind storm-id2 (.get (.getTopoId state "2test")))
         (is (not-nil? (.stormBase state storm-id2 nil)))
         (is (not-nil? (.assignmentInfo state storm-id2 nil)))
         ;; active topology can not read
         (.submitTopology cluster "testnoread" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-USERS
["alice"]} topology)
         (.advanceClusterTime cluster 11)
-        (bind storm-id3 (StormCommon/getStormId state "testnoread"))
+        (bind storm-id3 (.get (.getTopoId state "testnoread")))
         (is (not-nil? (.stormBase state storm-id3 nil)))
         (is (not-nil? (.assignmentInfo state storm-id3 nil)))
         ;; active topology can read based on group
         (.submitTopology cluster "testreadgroup" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10, LOGS-GROUPS
["alice-group"]} topology)
         (.advanceClusterTime cluster 11)
-        (bind storm-id4 (StormCommon/getStormId state "testreadgroup"))
+        (bind storm-id4 (.get (.getTopoId state "testreadgroup")))
         (is (not-nil? (.stormBase state storm-id4 nil)))
         (is (not-nil? (.assignmentInfo state storm-id4 nil)))
         ;; at this point have 1 running, 1 killed topo
@@ -648,7 +648,7 @@
                        {}))
       (bind state (.getClusterState cluster))
       (.submitTopology cluster "test" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} topology)
-      (bind storm-id (StormCommon/getStormId state "test"))
+      (bind storm-id (.get (.getTopoId state "test")))
       (.advanceClusterTime cluster 15)
       (is (not-nil? (.stormBase state storm-id nil)))
       (is (not-nil? (.assignmentInfo state storm-id nil)))
@@ -673,7 +673,7 @@
       (.advanceClusterTime cluster 11)
       (is (thrown? AlreadyAliveException (.submitTopology cluster "2test" {} topology)))
       (.advanceClusterTime cluster 11)
-      (bind storm-id (StormCommon/getStormId state "2test"))
+      (bind storm-id (.get (.getTopoId state "2test")))
       (is (not-nil? (.stormBase state storm-id nil)))
       (.killTopology (.getNimbus cluster) "2test")
       (is (thrown? AlreadyAliveException (.submitTopology cluster "2test" {} topology)))
@@ -687,7 +687,7 @@
       (is (= 0 (count (.heartbeatStorms state))))
 
       (.submitTopology cluster "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
-      (bind storm-id3 (StormCommon/getStormId state "test3"))
+      (bind storm-id3 (.get (.getTopoId state "test3")))
       (.advanceClusterTime cluster 11)
       ;; this guarantees an immediate kill notification
       (.killTopology (.getNimbus cluster) "test3")
@@ -702,7 +702,7 @@
       (.waitForIdle cluster)
 
       (.submitTopology cluster "test3" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 5} topology)
-      (bind storm-id3 (StormCommon/getStormId state "test3"))
+      (bind storm-id3 (.get (.getTopoId state "test3")))
 
       (.advanceClusterTime cluster 11)
       (bind executor-id (first (topology-executors cluster storm-id3)))
@@ -719,7 +719,7 @@
       (.submitTopology cluster "test4" {TOPOLOGY-MESSAGE-TIMEOUT-SECS 100} topology)
       (.advanceClusterTime cluster 11)
       (.killTopologyWithOpts (.getNimbus cluster) "test4" (doto (KillOptions.) (.set_wait_secs
10)))
-      (bind storm-id4 (StormCommon/getStormId state "test4"))
+      (bind storm-id4 (.get (.getTopoId state "test4")))
       (.advanceClusterTime cluster 9)
       (is (not-nil? (.assignmentInfo state storm-id4 nil)))
       (.advanceClusterTime cluster 2)
@@ -748,7 +748,7 @@
       (.submitTopology cluster "test" {TOPOLOGY-WORKERS 2} topology)
       (.advanceClusterTime cluster 11)
       (check-consistency cluster "test")
-      (bind storm-id (StormCommon/getStormId state "test"))
+      (bind storm-id (.get (.getTopoId state "test")))
       (bind [executor-id1 executor-id2]  (topology-executors cluster storm-id))
       (bind ass1 (executor-assignment cluster storm-id executor-id1))
       (bind ass2 (executor-assignment cluster storm-id executor-id2))
@@ -872,7 +872,7 @@
       (.submitTopology cluster "test" {TOPOLOGY-WORKERS 2} topology)
       (.advanceClusterTime cluster 11)
       (check-consistency cluster "test")
-      (bind storm-id (StormCommon/getStormId state "test"))
+      (bind storm-id (.get (.getTopoId state "test")))
       (bind [executor-id1 executor-id2]  (topology-executors cluster storm-id))
       (bind ass1 (executor-assignment cluster storm-id executor-id1))
       (bind ass2 (executor-assignment cluster storm-id executor-id2))
@@ -931,7 +931,7 @@
       (bind state (.getClusterState cluster))
       (.submitTopology cluster "test" {TOPOLOGY-WORKERS 4} topology)  ; distribution should
be 2, 2, 2, 3 ideally
       (.advanceClusterTime cluster 11)
-      (bind storm-id (StormCommon/getStormId state "test"))
+      (bind storm-id (.get (.getTopoId state "test")))
       (bind slot-executors (slot-assignments cluster storm-id))
       (check-executor-distribution slot-executors [9])
       (check-consistency cluster "test")
@@ -1065,7 +1065,7 @@
                              {TOPOLOGY-WORKERS 3
                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 60} topology)
       (.advanceClusterTime cluster 11)
-      (bind storm-id (StormCommon/getStormId state "test"))
+      (bind storm-id (.get (.getTopoId state "test")))
       (.addSupervisor cluster 3)
       (.addSupervisor cluster 3)
 
@@ -1115,7 +1115,7 @@
                              {TOPOLOGY-WORKERS 3
                               TOPOLOGY-MESSAGE-TIMEOUT-SECS 30} topology)
       (.advanceClusterTime cluster 11)
-      (bind storm-id (StormCommon/getStormId state "test"))
+      (bind storm-id (.get (.getTopoId state "test")))
       (bind checker (fn [distribution]
                       (check-executor-distribution
                         (slot-assignments cluster storm-id)

http://git-wip-us.apache.org/repos/asf/storm/blob/c4ed52e7/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
index 4b84e55..477bf05 100644
--- a/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
+++ b/storm-core/test/jvm/org/apache/storm/messaging/netty/NettyTest.java
@@ -137,7 +137,6 @@ public class NettyTest {
         stormConf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
         stormConf.put(Config.STORM_MESSAGING_NETTY_AUTHENTICATION, false);
         stormConf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 1024);
-        stormConf.put(Config.STORM_MESSAGING_NETTY_MAX_RETRIES, 10);
         stormConf.put(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS, 1000);
         stormConf.put(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS, 5000);
         stormConf.put(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS, 1);


Mime
View raw message