storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2484 Add support for configuring memory load and cpu load in Flux
Date Wed, 21 Jun 2017 23:38:58 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch dd206b663 -> 34ae82b1a


STORM-2484 Add support for configuring memory load and cpu load in Flux


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1891210f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1891210f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1891210f

Branch: refs/heads/1.x-branch
Commit: 1891210f39d2de5a8ab4be37ed6f78cf7b720359
Parents: 1352714
Author: Angus Helm <angus.helm@slalom.com>
Authored: Mon May 15 11:13:31 2017 -0500
Committer: Angus Helm <angus.helm@slalom.com>
Committed: Mon Jun 12 09:17:28 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/storm/flux/FluxBuilder.java | 34 ++++++++++++++++--
 .../org/apache/storm/flux/model/VertexDef.java  | 38 +++++++++++++++++++-
 .../src/test/resources/configs/tck.yaml         |  8 +++++
 3 files changed, 77 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1891210f/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
b/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
index e79dfb7..1758a37 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java
@@ -18,12 +18,12 @@
 package org.apache.storm.flux;
 
 import org.apache.storm.Config;
+import org.apache.storm.flux.model.*;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.grouping.CustomStreamGrouping;
 import org.apache.storm.topology.*;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
-import org.apache.storm.flux.model.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -187,6 +187,21 @@ public class FluxBuilder {
                         boltObj.getClass().getName());
             }
 
+            BoltDef boltDef = topologyDef.getBoltDef(stream.getTo());
+            if (boltDef.getOnHeapMemoryLoad() > -1) {
+                if (boltDef.getOffHeapMemoryLoad() > -1) {
+                    declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad(), boltDef.getOffHeapMemoryLoad());
+                } else {
+                    declarer.setMemoryLoad(boltDef.getOnHeapMemoryLoad());
+                }
+            }
+            if (boltDef.getCpuLoad() > -1) {
+                declarer.setCPULoad(boltDef.getCpuLoad());
+            }
+            if (boltDef.getNumTasks() > -1) {
+                declarer.setNumTasks(boltDef.getNumTasks());
+            }
+
             GroupingDef grouping = stream.getGrouping();
             // if the streamId is defined, use it for the grouping, otherwise assume storm's
default stream
             String streamId = (grouping.getStreamId() == null ? Utils.DEFAULT_STREAM_ID :
grouping.getStreamId());
@@ -366,7 +381,22 @@ public class FluxBuilder {
             NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException,
NoSuchFieldException {
         for (SpoutDef sd : context.getTopologyDef().getSpouts()) {
             IRichSpout spout = buildSpout(sd, context);
-            builder.setSpout(sd.getId(), spout, sd.getParallelism());
+            SpoutDeclarer declarer = builder.setSpout(sd.getId(), spout, sd.getParallelism());
+
+            if (sd.getOnHeapMemoryLoad() > -1) {
+                if (sd.getOffHeapMemoryLoad() > -1) {
+                    declarer.setMemoryLoad(sd.getOnHeapMemoryLoad(), sd.getOffHeapMemoryLoad());
+                } else {
+                    declarer.setMemoryLoad(sd.getOnHeapMemoryLoad());
+                }
+            }
+            if (sd.getCpuLoad() > -1) {
+                declarer.setCPULoad(sd.getCpuLoad());
+            }
+            if (sd.getNumTasks() > -1) {
+                declarer.setNumTasks(sd.getNumTasks());
+            }
+
             context.addSpout(sd.getId(), spout);
         }
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/1891210f/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
index e71bcc2..8651819 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/model/VertexDef.java
@@ -23,8 +23,12 @@ package org.apache.storm.flux.model;
  */
 public abstract class VertexDef extends BeanDef {
 
-    // default parallelism to 1 so if it's ommitted, the topology will still function.
+    // default parallelism to 1 so if it's omitted, the topology will still function.
     private int parallelism = 1;
+    private int numTasks = -1;
+    private int onHeapMemoryLoad = -1;
+    private int offHeapMemoryLoad = -1;
+    private int cpuLoad = -1;
 
     public int getParallelism() {
         return parallelism;
@@ -33,4 +37,36 @@ public abstract class VertexDef extends BeanDef {
     public void setParallelism(int parallelism) {
         this.parallelism = parallelism;
     }
+
+    public int getNumTasks() {
+        return numTasks;
+    }
+
+    public void setNumTasks(int numTasks) {
+        this.numTasks = numTasks;
+    }
+
+    public int getOnHeapMemoryLoad() {
+        return onHeapMemoryLoad;
+    }
+
+    public void setOnHeapMemoryLoad(int onHeapMemoryLoad) {
+        this.onHeapMemoryLoad = onHeapMemoryLoad;
+    }
+
+    public int getOffHeapMemoryLoad() {
+        return offHeapMemoryLoad;
+    }
+
+    public void setOffHeapMemoryLoad(int offHeapMemoryLoad) {
+        this.offHeapMemoryLoad = offHeapMemoryLoad;
+    }
+
+    public int getCpuLoad() {
+        return cpuLoad;
+    }
+
+    public void setCpuLoad(int cpuLoad) {
+        this.cpuLoad = cpuLoad;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1891210f/external/flux/flux-core/src/test/resources/configs/tck.yaml
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/test/resources/configs/tck.yaml b/external/flux/flux-core/src/test/resources/configs/tck.yaml
index 5d40445..15d18ad 100644
--- a/external/flux/flux-core/src/test/resources/configs/tck.yaml
+++ b/external/flux/flux-core/src/test/resources/configs/tck.yaml
@@ -51,6 +51,10 @@ spouts:
   - id: "spout-1"
     className: "org.apache.storm.testing.TestWordSpout"
     parallelism: 1
+    numTasks: 5
+    onHeapMemoryLoad: 100
+    offHeapMemoryLoad: 100
+    cpuLoad: 100
     # ...
 
 # bolt definitions
@@ -58,6 +62,10 @@ bolts:
   - id: "bolt-1"
     className: "org.apache.storm.testing.TestWordCounter"
     parallelism: 1
+    numTasks: 100
+    onHeapMemoryLoad: 100
+    offHeapMemoryLoad: 100
+    cpuLoad: 100
     # ...
 
   - id: "bolt-2"


Mime
View raw message