storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/4] git commit: STORM-328 More restrictive to Config, strict range check within Utils.getInt
Date Tue, 15 Jul 2014 14:21:00 GMT
Repository: incubator-storm
Updated Branches:
  refs/heads/master 9e8c769a6 -> e5226dbc5


STORM-328 More restrictive to Config, strict range check within Utils.getInt


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/214ee745
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/214ee745
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/214ee745

Branch: refs/heads/master
Commit: 214ee7454548b884c591991b1faea770d1478cec
Parents: 18a0721
Author: Jungtaek Lim <kabhwan@gmail.com>
Authored: Sat Jul 12 00:59:01 2014 +0900
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Sat Jul 12 00:59:01 2014 +0900

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/Config.java   | 121 +++++++++----------
 .../jvm/backtype/storm/ConfigValidation.java    |  70 +++++++++++
 .../src/jvm/backtype/storm/utils/Utils.java     |  23 ++--
 .../test/clj/backtype/storm/config_test.clj     |  36 +++++-
 4 files changed, 175 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/214ee745/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index eefaee7..ac8b6b6 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -17,7 +17,6 @@
  */
 package backtype.storm;
 
-import backtype.storm.ConfigValidation;
 import backtype.storm.serialization.IKryoDecorator;
 import backtype.storm.serialization.IKryoFactory;
 import com.esotericsoftware.kryo.Serializer;
@@ -53,49 +52,49 @@ public class Config extends HashMap<String, Object> {
      * Netty based messaging: The buffer size for send/recv buffer
      */
     public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";

-    public static final Object STORM_MESSAGING_NETTY_BUFFER_SIZE_SCHEMA = Number.class;
+    public static final Object STORM_MESSAGING_NETTY_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The max # of retries that a peer will perform when a remote is not accessible
      */
     public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";

-    public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = Number.class;
+    public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The min # of milliseconds that a peer will wait. 
      */
     public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";

-    public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = Number.class;
+    public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The max # of milliseconds that a peer will wait. 
      */
     public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";

-    public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = Number.class;
+    public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The # of worker threads for the server.
      */
     public static final String STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS = "storm.messaging.netty.server_worker_threads";

-    public static final Object STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS_SCHEMA = Number.class;
+    public static final Object STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Netty based messaging: The # of worker threads for the client.
      */
     public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads";

-    public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
+    public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
     
     /**
      * If the Netty messaging layer is busy, the Netty client will try to batch message as more as possible up to the size of STORM_NETTY_MESSAGE_BATCH_SIZE bytes
      */
     public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "storm.messaging.netty.transfer.batch.size";
-    public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = Number.class;
+    public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * We check with this interval that whether the Netty channel is writable and try to write pending messages
      */
     public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "storm.messaging.netty.flush.check.interval.ms";
-    public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
+    public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = ConfigValidation.IntegerValidator;
     
     
     /**
@@ -108,7 +107,7 @@ public class Config extends HashMap<String, Object> {
      * The port Storm will use to connect to each of the ZooKeeper servers.
      */
     public static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";
-    public static final Object STORM_ZOOKEEPER_PORT_SCHEMA = Number.class;
+    public static final Object STORM_ZOOKEEPER_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A directory on the local filesystem used by Storm for any local
@@ -177,31 +176,31 @@ public class Config extends HashMap<String, Object> {
      * The session timeout for clients to ZooKeeper.
      */
     public static final String STORM_ZOOKEEPER_SESSION_TIMEOUT = "storm.zookeeper.session.timeout";
-    public static final Object STORM_ZOOKEEPER_SESSION_TIMEOUT_SCHEMA = Number.class;
+    public static final Object STORM_ZOOKEEPER_SESSION_TIMEOUT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The connection timeout for clients to ZooKeeper.
      */
     public static final String STORM_ZOOKEEPER_CONNECTION_TIMEOUT = "storm.zookeeper.connection.timeout";
-    public static final Object STORM_ZOOKEEPER_CONNECTION_TIMEOUT_SCHEMA = Number.class;
+    public static final Object STORM_ZOOKEEPER_CONNECTION_TIMEOUT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The number of times to retry a Zookeeper operation.
      */
     public static final String STORM_ZOOKEEPER_RETRY_TIMES="storm.zookeeper.retry.times";
-    public static final Object STORM_ZOOKEEPER_RETRY_TIMES_SCHEMA = Number.class;
+    public static final Object STORM_ZOOKEEPER_RETRY_TIMES_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The interval between retries of a Zookeeper operation.
      */
     public static final String STORM_ZOOKEEPER_RETRY_INTERVAL="storm.zookeeper.retry.interval";
-    public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_SCHEMA = Number.class;
+    public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The ceiling of the interval between retries of a Zookeeper operation.
      */
     public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling.millis";
-    public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = Number.class;
+    public static final Object STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The Zookeeper authentication scheme to use, e.g. "digest". Defaults to no authentication.
@@ -232,13 +231,13 @@ public class Config extends HashMap<String, Object> {
      * connect to this port to upload jars and submit topologies.
      */
     public static final String NIMBUS_THRIFT_PORT = "nimbus.thrift.port";
-    public static final Object NIMBUS_THRIFT_PORT_SCHEMA = Number.class;
+    public static final Object NIMBUS_THRIFT_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The maximum buffer size thrift should use when reading messages.
      */
     public static final String NIMBUS_THRIFT_MAX_BUFFER_SIZE = "nimbus.thrift.max_buffer_size";
-    public static final Object NIMBUS_THRIFT_MAX_BUFFER_SIZE_SCHEMA = Number.class;
+    public static final Object NIMBUS_THRIFT_MAX_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -254,7 +253,7 @@ public class Config extends HashMap<String, Object> {
      * task dead and reassign it to another location.
      */
     public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs";
-    public static final Object NIMBUS_TASK_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_TASK_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -264,14 +263,14 @@ public class Config extends HashMap<String, Object> {
      * occuring.
      */
     public static final String NIMBUS_MONITOR_FREQ_SECS = "nimbus.monitor.freq.secs";
-    public static final Object NIMBUS_MONITOR_FREQ_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_MONITOR_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How often nimbus should wake the cleanup thread to clean the inbox.
      * @see NIMBUS_INBOX_JAR_EXPIRATION_SECS
      */
     public static final String NIMBUS_CLEANUP_INBOX_FREQ_SECS = "nimbus.cleanup.inbox.freq.secs";
-    public static final Object NIMBUS_CLEANUP_INBOX_FREQ_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_CLEANUP_INBOX_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The length of time a jar file lives in the inbox before being deleted by the cleanup thread.
@@ -283,14 +282,14 @@ public class Config extends HashMap<String, Object> {
      * @see NIMBUS_CLEANUP_FREQ_SECS
      */
     public static final String NIMBUS_INBOX_JAR_EXPIRATION_SECS = "nimbus.inbox.jar.expiration.secs";
-    public static final Object NIMBUS_INBOX_JAR_EXPIRATION_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_INBOX_JAR_EXPIRATION_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How long before a supervisor can go without heartbeating before nimbus considers it dead
      * and stops assigning new work to it.
      */
     public static final String NIMBUS_SUPERVISOR_TIMEOUT_SECS = "nimbus.supervisor.timeout.secs";
-    public static final Object NIMBUS_SUPERVISOR_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_SUPERVISOR_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A special timeout used when a task is initially launched. During launch, this is the timeout
@@ -300,7 +299,7 @@ public class Config extends HashMap<String, Object> {
      * to launching new JVM's and configuring them.</p>
      */
     public static final String NIMBUS_TASK_LAUNCH_SECS = "nimbus.task.launch.secs";
-    public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Whether or not nimbus should reassign tasks if it detects that a task goes down. 
@@ -314,7 +313,7 @@ public class Config extends HashMap<String, Object> {
      * before nimbus considers it dead and drops the connection.
      */
     public static final String NIMBUS_FILE_COPY_EXPIRATION_SECS = "nimbus.file.copy.expiration.secs";
-    public static final Object NIMBUS_FILE_COPY_EXPIRATION_SECS_SCHEMA = Number.class;
+    public static final Object NIMBUS_FILE_COPY_EXPIRATION_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A custom class that implements ITopologyValidator that is run whenever a
@@ -334,13 +333,13 @@ public class Config extends HashMap<String, Object> {
      * Storm UI binds to this port.
      */
     public static final String UI_PORT = "ui.port";
-    public static final Object UI_PORT_SCHEMA = Number.class;
+    public static final Object UI_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * HTTP UI port for log viewer
      */
     public static final String LOGVIEWER_PORT = "logviewer.port";
-    public static final Object LOGVIEWER_PORT_SCHEMA = Number.class;
+    public static final Object LOGVIEWER_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Childopts for log viewer java process.
@@ -370,25 +369,25 @@ public class Config extends HashMap<String, Object> {
      * This port is used by Storm DRPC for receiving DPRC requests from clients.
      */
     public static final String DRPC_PORT = "drpc.port";
-    public static final Object DRPC_PORT_SCHEMA = Number.class;
+    public static final Object DRPC_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * DRPC thrift server worker threads 
      */
     public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
-    public static final Object DRPC_WORKER_THREADS_SCHEMA = Number.class;
+    public static final Object DRPC_WORKER_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * DRPC thrift server queue size 
      */
     public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
-    public static final Object DRPC_QUEUE_SIZE_SCHEMA = Number.class;
+    public static final Object DRPC_QUEUE_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
      */
     public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
-    public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = Number.class;
+    public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The timeout on DRPC requests within the DRPC server. Defaults to 10 minutes. Note that requests can also
@@ -396,7 +395,7 @@ public class Config extends HashMap<String, Object> {
      * timeout for the topology implementing the DRPC function.
      */
     public static final String DRPC_REQUEST_TIMEOUT_SECS  = "drpc.request.timeout.secs";
-    public static final Object DRPC_REQUEST_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object DRPC_REQUEST_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Childopts for Storm DRPC Java process.
@@ -415,7 +414,7 @@ public class Config extends HashMap<String, Object> {
      * how many workers run on each machine.
      */
     public static final String SUPERVISOR_SLOTS_PORTS = "supervisor.slots.ports";
-    public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.NumbersValidator;
+    public static final Object SUPERVISOR_SLOTS_PORTS_SCHEMA = ConfigValidation.IntegersValidator;
 
 
     /**
@@ -431,7 +430,7 @@ public class Config extends HashMap<String, Object> {
      * restart the worker process.
      */
     public static final String SUPERVISOR_WORKER_TIMEOUT_SECS = "supervisor.worker.timeout.secs";
-    public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object SUPERVISOR_WORKER_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -441,7 +440,7 @@ public class Config extends HashMap<String, Object> {
      * overhead to starting and configuring the JVM on launch.
      */
     public static final String SUPERVISOR_WORKER_START_TIMEOUT_SECS = "supervisor.worker.start.timeout.secs";
-    public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object SUPERVISOR_WORKER_START_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -457,7 +456,7 @@ public class Config extends HashMap<String, Object> {
      * how often the supervisor sends a heartbeat to the master.
      */
     public static final String SUPERVISOR_HEARTBEAT_FREQUENCY_SECS = "supervisor.heartbeat.frequency.secs";
-    public static final Object SUPERVISOR_HEARTBEAT_FREQUENCY_SECS_SCHEMA = Number.class;
+    public static final Object SUPERVISOR_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -465,7 +464,7 @@ public class Config extends HashMap<String, Object> {
      * need to be restarted.
      */
     public static final String SUPERVISOR_MONITOR_FREQUENCY_SECS = "supervisor.monitor.frequency.secs";
-    public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = Number.class;
+    public static final Object SUPERVISOR_MONITOR_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The jvm opts provided to workers launched by this supervisor. All "%ID%" substrings are replaced
@@ -478,19 +477,19 @@ public class Config extends HashMap<String, Object> {
      * control how many worker receiver threads we need per worker
      */
     public static final String WORKER_RECEIVER_THREAD_COUNT = "topology.worker.receiver.thread.count";
-    public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = Number.class;
+    public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = ConfigValidation.IntegerValidator;
     
     /**
      * How often this worker should heartbeat to the supervisor.
      */
     public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";
-    public static final Object WORKER_HEARTBEAT_FREQUENCY_SECS_SCHEMA = Number.class;
+    public static final Object WORKER_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How often a task should heartbeat its status to the master.
      */
     public static final String TASK_HEARTBEAT_FREQUENCY_SECS = "task.heartbeat.frequency.secs";
-    public static final Object TASK_HEARTBEAT_FREQUENCY_SECS_SCHEMA = Number.class;
+    public static final Object TASK_HEARTBEAT_FREQUENCY_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -501,7 +500,7 @@ public class Config extends HashMap<String, Object> {
      * come through.
      */
     public static final String TASK_REFRESH_POLL_SECS = "task.refresh.poll.secs";
-    public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = Number.class;
+    public static final Object TASK_REFRESH_POLL_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
 
@@ -532,7 +531,7 @@ public class Config extends HashMap<String, Object> {
      * on each component in the topology to tune the performance of a topology.
      */
     public static final String TOPOLOGY_WORKERS = "topology.workers";
-    public static final Object TOPOLOGY_WORKERS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_WORKERS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How many instances to create for a spout/bolt. A task runs on a thread with zero or more
@@ -543,7 +542,7 @@ public class Config extends HashMap<String, Object> {
      * guaranteeing that the same value goes to the same task).
      */
     public static final String TOPOLOGY_TASKS = "topology.tasks";
-    public static final Object TOPOLOGY_TASKS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_TASKS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How many executors to spawn for ackers.
@@ -552,7 +551,7 @@ public class Config extends HashMap<String, Object> {
      * as they come off the spout, effectively disabling reliability.</p>
      */
     public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";
-    public static final Object TOPOLOGY_ACKER_EXECUTORS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_ACKER_EXECUTORS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -562,7 +561,7 @@ public class Config extends HashMap<String, Object> {
      * the message at a later time.
      */
     public static final String TOPOLOGY_MESSAGE_TIMEOUT_SECS = "topology.message.timeout.secs";
-    public static final Object TOPOLOGY_MESSAGE_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_MESSAGE_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A list of serialization registrations for Kryo ( http://code.google.com/p/kryo/ ),
@@ -620,7 +619,7 @@ public class Config extends HashMap<String, Object> {
      * typically used in testing to limit the number of threads spawned in local mode.
      */
     public static final String TOPOLOGY_MAX_TASK_PARALLELISM="topology.max.task.parallelism";
-    public static final Object TOPOLOGY_MAX_TASK_PARALLELISM_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_MAX_TASK_PARALLELISM_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
@@ -632,7 +631,7 @@ public class Config extends HashMap<String, Object> {
      * their tuples with a message id.
      */
     public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; 
-    public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * A class that implements a strategy for what to do when a spout needs to wait. Waiting is
@@ -648,26 +647,26 @@ public class Config extends HashMap<String, Object> {
      * The amount of milliseconds the SleepEmptyEmitStrategy should sleep for.
      */
     public static final String TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS="topology.sleep.spout.wait.strategy.time.ms";
-    public static final Object TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_SLEEP_SPOUT_WAIT_STRATEGY_TIME_MS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The maximum amount of time a component gives a source of state to synchronize before it requests
      * synchronization again.
      */
     public static final String TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS="topology.state.synchronization.timeout.secs";
-    public static final Object TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_STATE_SYNCHRONIZATION_TIMEOUT_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The percentage of tuples to sample to produce stats for a task.
      */
     public static final String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
-    public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = ConfigValidation.DoubleValidator;
 
     /**
      * The time period that builtin metrics data in bucketed into. 
      */
     public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
-    public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Whether or not to use Java serialization in a topology.
@@ -734,14 +733,14 @@ public class Config extends HashMap<String, Object> {
      * The size of the Disruptor transfer queue for each worker.
      */
     public static final String TOPOLOGY_TRANSFER_BUFFER_SIZE="topology.transfer.buffer.size";
-    public static final Object TOPOLOGY_TRANSFER_BUFFER_SIZE_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_TRANSFER_BUFFER_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
    /**
     * How often a tick tuple from the "__system" component and "__tick" stream should be sent
     * to tasks. Meant to be used as a component-specific configuration.
     */
     public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS="topology.tick.tuple.freq.secs";
-    public static final Object TOPOLOGY_TICK_TUPLE_FREQ_SECS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_TICK_TUPLE_FREQ_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
 
    /**
@@ -756,7 +755,7 @@ public class Config extends HashMap<String, Object> {
     * via the TopologyContext.
     */
     public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
-    public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
@@ -764,20 +763,20 @@ public class Config extends HashMap<String, Object> {
      * reported to Zookeeper per task for every 10 second interval of time.
      */
     public static final String TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS="topology.error.throttle.interval.secs";
-    public static final Object TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * See doc for TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
      */
     public static final String TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL="topology.max.error.report.per.interval";
-    public static final Object TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL_SCHEMA = ConfigValidation.IntegerValidator;
 
 
     /**
      * How often a batch can be emitted in a Trident topology.
      */
     public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";
-    public static final Object TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * Name of the topology. This config is automatically set by Storm when the topology is submitted.
@@ -789,7 +788,7 @@ public class Config extends HashMap<String, Object> {
      * Max pending tuples in one ShellBolt
      */
     public static final String TOPOLOGY_SHELLBOLT_MAX_PENDING="topology.shellbolt.max.pending";
-    public static final Object TOPOLOGY_SHELLBOLT_MAX_PENDING_SCHEMA = Number.class;
+    public static final Object TOPOLOGY_SHELLBOLT_MAX_PENDING_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The root directory in ZooKeeper for metadata about TransactionalSpouts.
@@ -809,13 +808,13 @@ public class Config extends HashMap<String, Object> {
      * will use storm.zookeeper.port
      */
     public static final String TRANSACTIONAL_ZOOKEEPER_PORT="transactional.zookeeper.port";
-    public static final Object TRANSACTIONAL_ZOOKEEPER_PORT_SCHEMA = Number.class;
+    public static final Object TRANSACTIONAL_ZOOKEEPER_PORT_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The number of threads that should be used by the zeromq context in each worker process.
      */
     public static final String ZMQ_THREADS = "zmq.threads";
-    public static final Object ZMQ_THREADS_SCHEMA = Number.class;
+    public static final Object ZMQ_THREADS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * How long a connection should retry sending messages to a target host when
@@ -823,14 +822,14 @@ public class Config extends HashMap<String, Object> {
      * certainly be ignored.
      */
     public static final String ZMQ_LINGER_MILLIS = "zmq.linger.millis";
-    public static final Object ZMQ_LINGER_MILLIS_SCHEMA = Number.class;
+    public static final Object ZMQ_LINGER_MILLIS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * The high water for the ZeroMQ push sockets used for networking. Use this config to prevent buffer explosion
      * on the networking layer.
      */
     public static final String ZMQ_HWM = "zmq.hwm";
-    public static final Object ZMQ_HWM_SCHEMA = Number.class;
+    public static final Object ZMQ_HWM_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
      * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/214ee745/storm-core/src/jvm/backtype/storm/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
index 3accb82..156ccf8 100644
--- a/storm-core/src/jvm/backtype/storm/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
@@ -82,6 +82,76 @@ public class ConfigValidation {
     public static Object MapsValidator = FieldListValidatorFactory(Map.class);
 
     /**
+     * Validates a Integer.
+     */
+    public static Object IntegerValidator = new FieldValidator() {
+        @Override
+        public void validateField(String name, Object o) throws IllegalArgumentException {
+            if (o == null) {
+                // A null value is acceptable.
+                return;
+            }
+            final long i;
+            if (o instanceof Number &&
+                    (i = ((Number)o).longValue()) == ((Number)o).doubleValue()) {
+                if (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE) {
+                    return;
+                }
+            }
+
+            throw new IllegalArgumentException("Field " + name + " must be an Integer within type range.");
+        }
+    };
+
+    /**
+     * Validates is a list of Integers.
+     */
+    public static Object IntegersValidator = new FieldValidator() {
+        @Override
+        public void validateField(String name, Object field)
+                throws IllegalArgumentException {
+            if (field == null) {
+                // A null value is acceptable.
+                return;
+            }
+            if (field instanceof Iterable) {
+                for (Object o : (Iterable)field) {
+                    final long i;
+                    if (o instanceof Number &&
+                            ((i = ((Number)o).longValue()) == ((Number)o).doubleValue()) &&
+                            (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE)) {
+                        // pass the test
+                    } else {
+                        throw new IllegalArgumentException(
+                                "Each element of the list " + name + " must be an Integer within type range.");
+                    }
+                }
+                return;
+            }
+        }
+    };
+
+    /**
+     * Validates a Double.
+     */
+    public static Object DoubleValidator = new FieldValidator() {
+        @Override
+        public void validateField(String name, Object o) throws IllegalArgumentException {
+            if (o == null) {
+                // A null value is acceptable.
+                return;
+            }
+
+            // we can provide a lenient way to convert int/long to double with losing some precision
+            if (o instanceof Number) {
+                return;
+            }
+
+            throw new IllegalArgumentException("Field " + name + " must be an Double.");
+        }
+    };
+
+    /**
      * Validates a power of 2.
      */
     public static Object PowerOf2Validator = new FieldValidator() {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/214ee745/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index 6a0a447..364b53f 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -305,7 +305,7 @@ public class Utils {
     public static Integer getInt(Object o) {
       Integer result = getInt(o, null);
       if (null == result) {
-        throw new IllegalArgumentException("Don't know how to convert null + to int");
+        throw new IllegalArgumentException("Don't know how to convert null to int");
       }
       return result;
     }
@@ -314,16 +314,19 @@ public class Utils {
       if (null == o) {
         return defaultValue;
       }
-      
-      if(o instanceof Long) {
-          return ((Long) o ).intValue();
-      } else if (o instanceof Integer) {
-          return (Integer) o;
-      } else if (o instanceof Short) {
-          return ((Short) o).intValue();
-      } else {
-          throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
+
+      if (o instanceof Integer ||
+          o instanceof Short ||
+          o instanceof Byte) {
+          return ((Number) o).intValue();
+      } else if (o instanceof Long) {
+          final long l = (Long) o;
+          if (l <= Integer.MAX_VALUE && l >= Integer.MIN_VALUE) {
+              return (int) l;
+          }
       }
+
+      throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
     }
 
     public static boolean getBoolean(Object o, boolean defaultValue) {

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/214ee745/storm-core/test/clj/backtype/storm/config_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/config_test.clj b/storm-core/test/clj/backtype/storm/config_test.clj
index 01f788b..50970b6 100644
--- a/storm-core/test/clj/backtype/storm/config_test.clj
+++ b/storm-core/test/clj/backtype/storm/config_test.clj
@@ -65,13 +65,41 @@
                 (.validateField validator "test" x)
                 (catch Exception e e)))))))
 
-(deftest test-topology-workers-is-number
+(deftest test-integer-validator
+   (let [validator ConfigValidation/IntegerValidator]
+        (.validateField validator "test" 1000)
+        (is (thrown-cause? java.lang.IllegalArgumentException
+           (.validateField validator "test" 1.34)))
+        (is (thrown-cause? java.lang.IllegalArgumentException
+           (.validateField validator "test" 9223372036854775807)))))
+
+(deftest test-integers-validator
+  (let [validator ConfigValidation/IntegersValidator]
+    (.validateField validator "test" [1000 0 -1000])
+    (is (thrown-cause? java.lang.IllegalArgumentException
+          (.validateField validator "test" [0 10 1.34])))
+    (is (thrown-cause? java.lang.IllegalArgumentException
+          (.validateField validator "test" [-100 9223372036854775807])))))
+
+(deftest test-double-validator
+  (let [validator ConfigValidation/DoubleValidator]
+    (.validateField validator "test" 10)
+    ;; we can provide lenient way to convert int/long to double with losing precision
+    (.validateField validator "test" 2147483647)
+    (.validateField validator "test" 9223372036854775807)
+    (.validateField validator "test" 1.7976931348623157E308)))
+
+(deftest test-topology-workers-is-integer
   (let [validator (CONFIG-SCHEMA-MAP TOPOLOGY-WORKERS)]
     (.validateField validator "test" 42)
-    ;; The float can be rounded down to an int.
-    (.validateField validator "test" 3.14159)
     (is (thrown-cause? java.lang.IllegalArgumentException
-      (.validateField validator "test" "42")))))
+      (.validateField validator "test" 3.14159)))))
+
+(deftest test-topology-stats-sample-rate-is-float
+  (let [validator (CONFIG-SCHEMA-MAP TOPOLOGY-STATS-SAMPLE-RATE)]
+    (.validateField validator "test" 0.5)
+    (.validateField validator "test" 10)
+    (.validateField validator "test" 1.7976931348623157E308)))
 
 (deftest test-isolation-scheduler-machines-is-map
   (let [validator (CONFIG-SCHEMA-MAP ISOLATION-SCHEDULER-MACHINES)]


Mime
View raw message