storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [01/10] storm git commit: Adding dynamic profiling for worker
Date Wed, 04 Nov 2015 17:18:44 GMT
Repository: storm
Updated Branches:
  refs/heads/master f3568d73c -> f3ed08b9f


http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/jvm/backtype/storm/generated/ProfileRequest.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/ProfileRequest.java b/storm-core/src/jvm/backtype/storm/generated/ProfileRequest.java
new file mode 100644
index 0000000..a3a98ae
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/ProfileRequest.java
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package backtype.storm.generated;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-8-24")
+public class ProfileRequest implements org.apache.thrift.TBase<ProfileRequest, ProfileRequest._Fields>, java.io.Serializable, Cloneable, Comparable<ProfileRequest> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ProfileRequest");
+
+  private static final org.apache.thrift.protocol.TField NODE_INFO_FIELD_DESC = new org.apache.thrift.protocol.TField("nodeInfo", org.apache.thrift.protocol.TType.STRUCT, (short)1);
+  private static final org.apache.thrift.protocol.TField ACTION_FIELD_DESC = new org.apache.thrift.protocol.TField("action", org.apache.thrift.protocol.TType.I32, (short)2);
+  private static final org.apache.thrift.protocol.TField TIME_STAMP_FIELD_DESC = new org.apache.thrift.protocol.TField("time_stamp", org.apache.thrift.protocol.TType.I64, (short)3);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new ProfileRequestStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new ProfileRequestTupleSchemeFactory());
+  }
+
+  private NodeInfo nodeInfo; // required
+  private ProfileAction action; // required
+  private long time_stamp; // optional
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    NODE_INFO((short)1, "nodeInfo"),
+    /**
+     * 
+     * @see ProfileAction
+     */
+    ACTION((short)2, "action"),
+    TIME_STAMP((short)3, "time_stamp");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // NODE_INFO
+          return NODE_INFO;
+        case 2: // ACTION
+          return ACTION;
+        case 3: // TIME_STAMP
+          return TIME_STAMP;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __TIME_STAMP_ISSET_ID = 0;
+  private byte __isset_bitfield = 0;
+  private static final _Fields optionals[] = {_Fields.TIME_STAMP};
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.NODE_INFO, new org.apache.thrift.meta_data.FieldMetaData("nodeInfo", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, NodeInfo.class)));
+    tmpMap.put(_Fields.ACTION, new org.apache.thrift.meta_data.FieldMetaData("action", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM, ProfileAction.class)));
+    tmpMap.put(_Fields.TIME_STAMP, new org.apache.thrift.meta_data.FieldMetaData("time_stamp", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ProfileRequest.class, metaDataMap);
+  }
+
+  public ProfileRequest() {
+  }
+
+  public ProfileRequest(
+    NodeInfo nodeInfo,
+    ProfileAction action)
+  {
+    this();
+    this.nodeInfo = nodeInfo;
+    this.action = action;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public ProfileRequest(ProfileRequest other) {
+    __isset_bitfield = other.__isset_bitfield;
+    if (other.is_set_nodeInfo()) {
+      this.nodeInfo = new NodeInfo(other.nodeInfo);
+    }
+    if (other.is_set_action()) {
+      this.action = other.action;
+    }
+    this.time_stamp = other.time_stamp;
+  }
+
+  public ProfileRequest deepCopy() {
+    return new ProfileRequest(this);
+  }
+
+  @Override
+  public void clear() {
+    this.nodeInfo = null;
+    this.action = null;
+    set_time_stamp_isSet(false);
+    this.time_stamp = 0;
+  }
+
+  public NodeInfo get_nodeInfo() {
+    return this.nodeInfo;
+  }
+
+  public void set_nodeInfo(NodeInfo nodeInfo) {
+    this.nodeInfo = nodeInfo;
+  }
+
+  public void unset_nodeInfo() {
+    this.nodeInfo = null;
+  }
+
+  /** Returns true if field nodeInfo is set (has been assigned a value) and false otherwise */
+  public boolean is_set_nodeInfo() {
+    return this.nodeInfo != null;
+  }
+
+  public void set_nodeInfo_isSet(boolean value) {
+    if (!value) {
+      this.nodeInfo = null;
+    }
+  }
+
+  /**
+   * 
+   * @see ProfileAction
+   */
+  public ProfileAction get_action() {
+    return this.action;
+  }
+
+  /**
+   * 
+   * @see ProfileAction
+   */
+  public void set_action(ProfileAction action) {
+    this.action = action;
+  }
+
+  public void unset_action() {
+    this.action = null;
+  }
+
+  /** Returns true if field action is set (has been assigned a value) and false otherwise */
+  public boolean is_set_action() {
+    return this.action != null;
+  }
+
+  public void set_action_isSet(boolean value) {
+    if (!value) {
+      this.action = null;
+    }
+  }
+
+  public long get_time_stamp() {
+    return this.time_stamp;
+  }
+
+  public void set_time_stamp(long time_stamp) {
+    this.time_stamp = time_stamp;
+    set_time_stamp_isSet(true);
+  }
+
+  public void unset_time_stamp() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __TIME_STAMP_ISSET_ID);
+  }
+
+  /** Returns true if field time_stamp is set (has been assigned a value) and false otherwise */
+  public boolean is_set_time_stamp() {
+    return EncodingUtils.testBit(__isset_bitfield, __TIME_STAMP_ISSET_ID);
+  }
+
+  public void set_time_stamp_isSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __TIME_STAMP_ISSET_ID, value);
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case NODE_INFO:
+      if (value == null) {
+        unset_nodeInfo();
+      } else {
+        set_nodeInfo((NodeInfo)value);
+      }
+      break;
+
+    case ACTION:
+      if (value == null) {
+        unset_action();
+      } else {
+        set_action((ProfileAction)value);
+      }
+      break;
+
+    case TIME_STAMP:
+      if (value == null) {
+        unset_time_stamp();
+      } else {
+        set_time_stamp((Long)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case NODE_INFO:
+      return get_nodeInfo();
+
+    case ACTION:
+      return get_action();
+
+    case TIME_STAMP:
+      return Long.valueOf(get_time_stamp());
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case NODE_INFO:
+      return is_set_nodeInfo();
+    case ACTION:
+      return is_set_action();
+    case TIME_STAMP:
+      return is_set_time_stamp();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof ProfileRequest)
+      return this.equals((ProfileRequest)that);
+    return false;
+  }
+
+  public boolean equals(ProfileRequest that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_nodeInfo = true && this.is_set_nodeInfo();
+    boolean that_present_nodeInfo = true && that.is_set_nodeInfo();
+    if (this_present_nodeInfo || that_present_nodeInfo) {
+      if (!(this_present_nodeInfo && that_present_nodeInfo))
+        return false;
+      if (!this.nodeInfo.equals(that.nodeInfo))
+        return false;
+    }
+
+    boolean this_present_action = true && this.is_set_action();
+    boolean that_present_action = true && that.is_set_action();
+    if (this_present_action || that_present_action) {
+      if (!(this_present_action && that_present_action))
+        return false;
+      if (!this.action.equals(that.action))
+        return false;
+    }
+
+    boolean this_present_time_stamp = true && this.is_set_time_stamp();
+    boolean that_present_time_stamp = true && that.is_set_time_stamp();
+    if (this_present_time_stamp || that_present_time_stamp) {
+      if (!(this_present_time_stamp && that_present_time_stamp))
+        return false;
+      if (this.time_stamp != that.time_stamp)
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    List<Object> list = new ArrayList<Object>();
+
+    boolean present_nodeInfo = true && (is_set_nodeInfo());
+    list.add(present_nodeInfo);
+    if (present_nodeInfo)
+      list.add(nodeInfo);
+
+    boolean present_action = true && (is_set_action());
+    list.add(present_action);
+    if (present_action)
+      list.add(action.getValue());
+
+    boolean present_time_stamp = true && (is_set_time_stamp());
+    list.add(present_time_stamp);
+    if (present_time_stamp)
+      list.add(time_stamp);
+
+    return list.hashCode();
+  }
+
+  @Override
+  public int compareTo(ProfileRequest other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(is_set_nodeInfo()).compareTo(other.is_set_nodeInfo());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_nodeInfo()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.nodeInfo, other.nodeInfo);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_action()).compareTo(other.is_set_action());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_action()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.action, other.action);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_time_stamp()).compareTo(other.is_set_time_stamp());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_time_stamp()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.time_stamp, other.time_stamp);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("ProfileRequest(");
+    boolean first = true;
+
+    sb.append("nodeInfo:");
+    if (this.nodeInfo == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.nodeInfo);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("action:");
+    if (this.action == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.action);
+    }
+    first = false;
+    if (is_set_time_stamp()) {
+      if (!first) sb.append(", ");
+      sb.append("time_stamp:");
+      sb.append(this.time_stamp);
+      first = false;
+    }
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (!is_set_nodeInfo()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'nodeInfo' is unset! Struct:" + toString());
+    }
+
+    if (!is_set_action()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'action' is unset! Struct:" + toString());
+    }
+
+    // check for sub-struct validity
+    if (nodeInfo != null) {
+      nodeInfo.validate();
+    }
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class ProfileRequestStandardSchemeFactory implements SchemeFactory {
+    public ProfileRequestStandardScheme getScheme() {
+      return new ProfileRequestStandardScheme();
+    }
+  }
+
+  private static class ProfileRequestStandardScheme extends StandardScheme<ProfileRequest> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, ProfileRequest struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // NODE_INFO
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) {
+              struct.nodeInfo = new NodeInfo();
+              struct.nodeInfo.read(iprot);
+              struct.set_nodeInfo_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 2: // ACTION
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.action = backtype.storm.generated.ProfileAction.findByValue(iprot.readI32());
+              struct.set_action_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          case 3: // TIME_STAMP
+            if (schemeField.type == org.apache.thrift.protocol.TType.I64) {
+              struct.time_stamp = iprot.readI64();
+              struct.set_time_stamp_isSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, ProfileRequest struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.nodeInfo != null) {
+        oprot.writeFieldBegin(NODE_INFO_FIELD_DESC);
+        struct.nodeInfo.write(oprot);
+        oprot.writeFieldEnd();
+      }
+      if (struct.action != null) {
+        oprot.writeFieldBegin(ACTION_FIELD_DESC);
+        oprot.writeI32(struct.action.getValue());
+        oprot.writeFieldEnd();
+      }
+      if (struct.is_set_time_stamp()) {
+        oprot.writeFieldBegin(TIME_STAMP_FIELD_DESC);
+        oprot.writeI64(struct.time_stamp);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class ProfileRequestTupleSchemeFactory implements SchemeFactory {
+    public ProfileRequestTupleScheme getScheme() {
+      return new ProfileRequestTupleScheme();
+    }
+  }
+
+  private static class ProfileRequestTupleScheme extends TupleScheme<ProfileRequest> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, ProfileRequest struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      struct.nodeInfo.write(oprot);
+      oprot.writeI32(struct.action.getValue());
+      BitSet optionals = new BitSet();
+      if (struct.is_set_time_stamp()) {
+        optionals.set(0);
+      }
+      oprot.writeBitSet(optionals, 1);
+      if (struct.is_set_time_stamp()) {
+        oprot.writeI64(struct.time_stamp);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, ProfileRequest struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      struct.nodeInfo = new NodeInfo();
+      struct.nodeInfo.read(iprot);
+      struct.set_nodeInfo_isSet(true);
+      struct.action = backtype.storm.generated.ProfileAction.findByValue(iprot.readI32());
+      struct.set_action_isSet(true);
+      BitSet incoming = iprot.readBitSet(1);
+      if (incoming.get(0)) {
+        struct.time_stamp = iprot.readI64();
+        struct.set_time_stamp_isSet(true);
+      }
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
index e856578..a2549a5 100644
--- a/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
+++ b/storm-core/src/jvm/backtype/storm/security/auth/authorizer/SimpleACLAuthorizer.java
@@ -57,6 +57,14 @@ public class SimpleACLAuthorizer implements IAuthorizer {
             "getComponentPageInfo",
             "uploadNewCredentials",
             "setLogConfig",
+            "setWorkerProfiler",
+            "getWorkerProfileActionExpiry",
+            "getComponentPendingProfileActions",
+            "startProfiling",
+            "stopProfiling",
+            "dumpProfile",
+            "dumpJstack",
+            "dumpHeap",
             "getLogConfig"));
 
     protected Set<String> _admins;

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/native/worker-launcher/impl/main.c
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/main.c b/storm-core/src/native/worker-launcher/impl/main.c
index 7067cf9..a51f9f9 100644
--- a/storm-core/src/native/worker-launcher/impl/main.c
+++ b/storm-core/src/native/worker-launcher/impl/main.c
@@ -47,6 +47,7 @@ void display_usage(FILE *stream) {
   fprintf(stream, "   initialize stormdist dir: code-dir <code-directory>\n");
   fprintf(stream, "   remove a file/directory: rmr <directory>\n");
   fprintf(stream, "   launch a worker: worker <working-directory> <script-to-run>\n");
+  fprintf(stream, "   launch a profiler: profiler <working-directory> <script-to-run>\n");
   fprintf(stream, "   signal a worker: signal <pid> <signal>\n");
 }
 
@@ -176,6 +177,15 @@ int main(int argc, char **argv) {
     if (exit_code == 0) {
       exit_code = exec_as_user(working_dir, argv[optind]);
     }
+   } else if (strcasecmp("profiler", command) == 0) {
+    if (argc != 5) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 5) for profiler\n",
+	      argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    working_dir = argv[optind++];
+    exit_code = exec_as_user(working_dir, argv[optind]);
   } else if (strcasecmp("signal", command) == 0) {
     if (argc != 5) {
       fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 5) for signal\n",

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/native/worker-launcher/impl/worker-launcher.c
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/worker-launcher.c b/storm-core/src/native/worker-launcher/impl/worker-launcher.c
index 3a6c4b8..1c4b36a 100644
--- a/storm-core/src/native/worker-launcher/impl/worker-launcher.c
+++ b/storm-core/src/native/worker-launcher/impl/worker-launcher.c
@@ -743,6 +743,53 @@ int exec_as_user(const char * working_dir, const char * script_file) {
   return -1;
 }
 
+int fork_as_user(const char * working_dir, const char * script_file) {
+  char *script_file_dest = NULL;
+  script_file_dest = get_container_launcher_file(working_dir);
+  if (script_file_dest == NULL) {
+    return OUT_OF_MEMORY;
+  }
+
+  // open launch script
+  int script_file_source = open_file_as_nm(script_file);
+  if (script_file_source == -1) {
+    return -1;
+  }
+
+  setsid();
+
+  // give up root privs
+  if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
+    return SETUID_OPER_FAILED;
+  }
+
+  if (copy_file(script_file_source, script_file, script_file_dest, S_IRWXU) != 0) {
+    return -1;
+  }
+
+  fcloseall();
+  umask(0027);
+  if (chdir(working_dir) != 0) {
+    fprintf(LOGFILE, "Can't change directory to %s -%s\n", working_dir,
+	    strerror(errno));
+    return -1;
+  }
+
+  int pid = fork();
+  if (pid == 0 && execlp(script_file_dest, script_file_dest, NULL) != 0) {
+    fprintf(LOGFILE, "Couldn't execute the container launch file %s - %s",
+            script_file_dest, strerror(errno));
+    return UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
+  } else {
+    fprintf(LOGFILE, "Launched the process from the container launch file %s - with pid %d",
+            script_file_dest, pid);
+    return 0;
+  }
+
+  //Unreachable
+  return -1;
+}
+
 /**
  * Delete the given directory as the user from each of the directories
  * user: the user doing the delete

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/native/worker-launcher/impl/worker-launcher.h
----------------------------------------------------------------------
diff --git a/storm-core/src/native/worker-launcher/impl/worker-launcher.h b/storm-core/src/native/worker-launcher/impl/worker-launcher.h
index 59ab998..3b1ec24 100644
--- a/storm-core/src/native/worker-launcher/impl/worker-launcher.h
+++ b/storm-core/src/native/worker-launcher/impl/worker-launcher.h
@@ -70,6 +70,8 @@ int setup_stormdist_dir(const char* local_dir);
 
 int exec_as_user(const char * working_dir, const char * args);
 
+int fork_as_user(const char * working_dir, const char * args);
+
 // delete a directory (or file) recursively as the user. The directory
 // could optionally be relative to the baseDir set of directories (if the same
 // directory appears on multiple disk volumes, the disk volumes should be passed

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/py/storm/Nimbus-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote
index d05a4b2..14acdc9 100644
--- a/storm-core/src/py/storm/Nimbus-remote
+++ b/storm-core/src/py/storm/Nimbus-remote
@@ -52,6 +52,8 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
   print('  void setLogConfig(string name, LogConfig config)')
   print('  LogConfig getLogConfig(string name)')
   print('  void debug(string name, string component, bool enable, double samplingPercentage)')
+  print('  void setWorkerProfiler(string id, ProfileRequest profileRequest)')
+  print('   getComponentPendingProfileActions(string id, string component_id, ProfileAction action)')
   print('  void uploadNewCredentials(string name, Credentials creds)')
   print('  string beginFileUpload()')
   print('  void uploadChunk(string location, string chunk)')
@@ -183,6 +185,18 @@ elif cmd == 'debug':
     sys.exit(1)
   pp.pprint(client.debug(args[0],args[1],eval(args[2]),eval(args[3]),))
 
+elif cmd == 'setWorkerProfiler':
+  if len(args) != 2:
+    print('setWorkerProfiler requires 2 args')
+    sys.exit(1)
+  pp.pprint(client.setWorkerProfiler(args[0],eval(args[1]),))
+
+elif cmd == 'getComponentPendingProfileActions':
+  if len(args) != 3:
+    print('getComponentPendingProfileActions requires 3 args')
+    sys.exit(1)
+  pp.pprint(client.getComponentPendingProfileActions(args[0],args[1],eval(args[2]),))
+
 elif cmd == 'uploadNewCredentials':
   if len(args) != 2:
     print('uploadNewCredentials requires 2 args')

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus.py b/storm-core/src/py/storm/Nimbus.py
index 3c26a16..a446654 100644
--- a/storm-core/src/py/storm/Nimbus.py
+++ b/storm-core/src/py/storm/Nimbus.py
@@ -125,6 +125,23 @@ class Iface:
     """
     pass
 
+  def setWorkerProfiler(self, id, profileRequest):
+    """
+    Parameters:
+     - id
+     - profileRequest
+    """
+    pass
+
+  def getComponentPendingProfileActions(self, id, component_id, action):
+    """
+    Parameters:
+     - id
+     - component_id
+     - action
+    """
+    pass
+
   def uploadNewCredentials(self, name, creds):
     """
     Parameters:
@@ -600,6 +617,72 @@ class Client(Iface):
       raise result.aze
     return
 
+  def setWorkerProfiler(self, id, profileRequest):
+    """
+    Parameters:
+     - id
+     - profileRequest
+    """
+    self.send_setWorkerProfiler(id, profileRequest)
+    self.recv_setWorkerProfiler()
+
+  def send_setWorkerProfiler(self, id, profileRequest):
+    self._oprot.writeMessageBegin('setWorkerProfiler', TMessageType.CALL, self._seqid)
+    args = setWorkerProfiler_args()
+    args.id = id
+    args.profileRequest = profileRequest
+    args.write(self._oprot)
+    self._oprot.writeMessageEnd()
+    self._oprot.trans.flush()
+
+  def recv_setWorkerProfiler(self):
+    iprot = self._iprot
+    (fname, mtype, rseqid) = iprot.readMessageBegin()
+    if mtype == TMessageType.EXCEPTION:
+      x = TApplicationException()
+      x.read(iprot)
+      iprot.readMessageEnd()
+      raise x
+    result = setWorkerProfiler_result()
+    result.read(iprot)
+    iprot.readMessageEnd()
+    return
+
+  def getComponentPendingProfileActions(self, id, component_id, action):
+    """
+    Parameters:
+     - id
+     - component_id
+     - action
+    """
+    self.send_getComponentPendingProfileActions(id, component_id, action)
+    return self.recv_getComponentPendingProfileActions()
+
+  def send_getComponentPendingProfileActions(self, id, component_id, action):
+    self._oprot.writeMessageBegin('getComponentPendingProfileActions', TMessageType.CALL, self._seqid)
+    args = getComponentPendingProfileActions_args()
+    args.id = id
+    args.component_id = component_id
+    args.action = action
+    args.write(self._oprot)
+    self._oprot.writeMessageEnd()
+    self._oprot.trans.flush()
+
+  def recv_getComponentPendingProfileActions(self):
+    iprot = self._iprot
+    (fname, mtype, rseqid) = iprot.readMessageBegin()
+    if mtype == TMessageType.EXCEPTION:
+      x = TApplicationException()
+      x.read(iprot)
+      iprot.readMessageEnd()
+      raise x
+    result = getComponentPendingProfileActions_result()
+    result.read(iprot)
+    iprot.readMessageEnd()
+    if result.success is not None:
+      return result.success
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "getComponentPendingProfileActions failed: unknown result");
+
   def uploadNewCredentials(self, name, creds):
     """
     Parameters:
@@ -1127,6 +1210,8 @@ class Processor(Iface, TProcessor):
     self._processMap["setLogConfig"] = Processor.process_setLogConfig
     self._processMap["getLogConfig"] = Processor.process_getLogConfig
     self._processMap["debug"] = Processor.process_debug
+    self._processMap["setWorkerProfiler"] = Processor.process_setWorkerProfiler
+    self._processMap["getComponentPendingProfileActions"] = Processor.process_getComponentPendingProfileActions
     self._processMap["uploadNewCredentials"] = Processor.process_uploadNewCredentials
     self._processMap["beginFileUpload"] = Processor.process_beginFileUpload
     self._processMap["uploadChunk"] = Processor.process_uploadChunk
@@ -1314,6 +1399,28 @@ class Processor(Iface, TProcessor):
     oprot.writeMessageEnd()
     oprot.trans.flush()
 
+  def process_setWorkerProfiler(self, seqid, iprot, oprot):
+    args = setWorkerProfiler_args()
+    args.read(iprot)
+    iprot.readMessageEnd()
+    result = setWorkerProfiler_result()
+    self._handler.setWorkerProfiler(args.id, args.profileRequest)
+    oprot.writeMessageBegin("setWorkerProfiler", TMessageType.REPLY, seqid)
+    result.write(oprot)
+    oprot.writeMessageEnd()
+    oprot.trans.flush()
+
+  def process_getComponentPendingProfileActions(self, seqid, iprot, oprot):
+    args = getComponentPendingProfileActions_args()
+    args.read(iprot)
+    iprot.readMessageEnd()
+    result = getComponentPendingProfileActions_result()
+    result.success = self._handler.getComponentPendingProfileActions(args.id, args.component_id, args.action)
+    oprot.writeMessageBegin("getComponentPendingProfileActions", TMessageType.REPLY, seqid)
+    result.write(oprot)
+    oprot.writeMessageEnd()
+    oprot.trans.flush()
+
   def process_uploadNewCredentials(self, seqid, iprot, oprot):
     args = uploadNewCredentials_args()
     args.read(iprot)
@@ -3163,6 +3270,295 @@ class debug_result:
   def __ne__(self, other):
     return not (self == other)
 
+class setWorkerProfiler_args:
+  """
+  Attributes:
+   - id
+   - profileRequest
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'id', None, None, ), # 1
+    (2, TType.STRUCT, 'profileRequest', (ProfileRequest, ProfileRequest.thrift_spec), None, ), # 2
+  )
+
+  def __init__(self, id=None, profileRequest=None,):
+    self.id = id
+    self.profileRequest = profileRequest
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.id = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRUCT:
+          self.profileRequest = ProfileRequest()
+          self.profileRequest.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('setWorkerProfiler_args')
+    if self.id is not None:
+      oprot.writeFieldBegin('id', TType.STRING, 1)
+      oprot.writeString(self.id.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.profileRequest is not None:
+      oprot.writeFieldBegin('profileRequest', TType.STRUCT, 2)
+      self.profileRequest.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.id)
+    value = (value * 31) ^ hash(self.profileRequest)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class setWorkerProfiler_result:
+
+  thrift_spec = (
+  )
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('setWorkerProfiler_result')
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class getComponentPendingProfileActions_args:
+  """
+  Attributes:
+   - id
+   - component_id
+   - action
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'id', None, None, ), # 1
+    (2, TType.STRING, 'component_id', None, None, ), # 2
+    (3, TType.I32, 'action', None, None, ), # 3
+  )
+
+  def __init__(self, id=None, component_id=None, action=None,):
+    self.id = id
+    self.component_id = component_id
+    self.action = action
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.id = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.component_id = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I32:
+          self.action = iprot.readI32();
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('getComponentPendingProfileActions_args')
+    if self.id is not None:
+      oprot.writeFieldBegin('id', TType.STRING, 1)
+      oprot.writeString(self.id.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.component_id is not None:
+      oprot.writeFieldBegin('component_id', TType.STRING, 2)
+      oprot.writeString(self.component_id.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.action is not None:
+      oprot.writeFieldBegin('action', TType.I32, 3)
+      oprot.writeI32(self.action)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.id)
+    value = (value * 31) ^ hash(self.component_id)
+    value = (value * 31) ^ hash(self.action)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class getComponentPendingProfileActions_result:
+  """
+  Attributes:
+   - success
+  """
+
+  thrift_spec = (
+    (0, TType.LIST, 'success', (TType.STRUCT,(ProfileRequest, ProfileRequest.thrift_spec)), None, ), # 0
+  )
+
+  def __init__(self, success=None,):
+    self.success = success
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 0:
+        if ftype == TType.LIST:
+          self.success = []
+          (_etype599, _size596) = iprot.readListBegin()
+          for _i600 in xrange(_size596):
+            _elem601 = ProfileRequest()
+            _elem601.read(iprot)
+            self.success.append(_elem601)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('getComponentPendingProfileActions_result')
+    if self.success is not None:
+      oprot.writeFieldBegin('success', TType.LIST, 0)
+      oprot.writeListBegin(TType.STRUCT, len(self.success))
+      for iter602 in self.success:
+        iter602.write(oprot)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.success)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class uploadNewCredentials_args:
   """
   Attributes:

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index 7f8adea..a14d62d 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -99,6 +99,32 @@ class NumErrorsChoice:
     "ONE": 2,
   }
 
+class ProfileAction:
+  JPROFILE_STOP = 0
+  JPROFILE_START = 1
+  JPROFILE_DUMP = 2
+  JMAP_DUMP = 3
+  JSTACK_DUMP = 4
+  JVM_RESTART = 5
+
+  _VALUES_TO_NAMES = {
+    0: "JPROFILE_STOP",
+    1: "JPROFILE_START",
+    2: "JPROFILE_DUMP",
+    3: "JMAP_DUMP",
+    4: "JSTACK_DUMP",
+    5: "JVM_RESTART",
+  }
+
+  _NAMES_TO_VALUES = {
+    "JPROFILE_STOP": 0,
+    "JPROFILE_START": 1,
+    "JPROFILE_DUMP": 2,
+    "JMAP_DUMP": 3,
+    "JSTACK_DUMP": 4,
+    "JVM_RESTART": 5,
+  }
+
 class LogLevelAction:
   UNCHANGED = 1
   UPDATE = 2
@@ -8553,6 +8579,102 @@ class LSWorkerHeartbeat:
   def __ne__(self, other):
     return not (self == other)
 
+class ProfileRequest:
+  """
+  Attributes:
+   - nodeInfo
+   - action
+   - time_stamp
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRUCT, 'nodeInfo', (NodeInfo, NodeInfo.thrift_spec), None, ), # 1
+    (2, TType.I32, 'action', None, None, ), # 2
+    (3, TType.I64, 'time_stamp', None, None, ), # 3
+  )
+
+  def __init__(self, nodeInfo=None, action=None, time_stamp=None,):
+    self.nodeInfo = nodeInfo
+    self.action = action
+    self.time_stamp = time_stamp
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRUCT:
+          self.nodeInfo = NodeInfo()
+          self.nodeInfo.read(iprot)
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.I32:
+          self.action = iprot.readI32();
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I64:
+          self.time_stamp = iprot.readI64();
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('ProfileRequest')
+    if self.nodeInfo is not None:
+      oprot.writeFieldBegin('nodeInfo', TType.STRUCT, 1)
+      self.nodeInfo.write(oprot)
+      oprot.writeFieldEnd()
+    if self.action is not None:
+      oprot.writeFieldBegin('action', TType.I32, 2)
+      oprot.writeI32(self.action)
+      oprot.writeFieldEnd()
+    if self.time_stamp is not None:
+      oprot.writeFieldBegin('time_stamp', TType.I64, 3)
+      oprot.writeI64(self.time_stamp)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.nodeInfo is None:
+      raise TProtocol.TProtocolException(message='Required field nodeInfo is unset!')
+    if self.action is None:
+      raise TProtocol.TProtocolException(message='Required field action is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.nodeInfo)
+    value = (value * 31) ^ hash(self.action)
+    value = (value * 31) ^ hash(self.time_stamp)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class GetInfoOptions:
   """
   Attributes:

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index 3c07cac..51e1236 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -467,6 +467,21 @@ enum NumErrorsChoice {
   ONE
 }
 
+enum ProfileAction {
+  JPROFILE_STOP,
+  JPROFILE_START,
+  JPROFILE_DUMP,
+  JMAP_DUMP,
+  JSTACK_DUMP,
+  JVM_RESTART
+}
+
+struct ProfileRequest {
+  1: required NodeInfo nodeInfo,
+  2: required ProfileAction action,
+  3: optional i64 time_stamp; 
+}
+
 struct GetInfoOptions {
   1: optional NumErrorsChoice num_err_choice;
 }
@@ -523,6 +538,11 @@ service Nimbus {
   * The 'samplingPercentage' will limit loggging to a percentage of generated tuples.
   **/
   void debug(1: string name, 2: string component, 3: bool enable, 4: double samplingPercentage) throws (1: NotAliveException e, 2: AuthorizationException aze);
+
+  // dynamic profile actions
+  void setWorkerProfiler(1: string id, 2: ProfileRequest  profileRequest);
+  list<ProfileRequest> getComponentPendingProfileActions(1: string id, 2: string component_id, 3: ProfileAction action);
+
   void uploadNewCredentials(1: string name, 2: Credentials creds) throws (1: NotAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze);
 
   // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/ui/public/component.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/component.html b/storm-core/src/ui/public/component.html
index 4be5860..60a85cf 100644
--- a/storm-core/src/ui/public/component.html
+++ b/storm-core/src/ui/public/component.html
@@ -58,6 +58,9 @@
     <div id="component-output-stats" class="col-md-12"></div>
   </div>
   <div class="row">
+    <div id="profiler-control" class="col-md-12"></div>
+  </div>
+  <div class="row">
     <div id="component-executor-stats" class="col-md-12"></div>
   </div>
   <div class="row">
@@ -76,6 +79,15 @@ $(document).ajaxStop($.unblockUI);
 $(document).ajaxStart(function(){
     $.blockUI({ message: '<img src="images/spinner.gif" /> <h3>Loading component summary...</h3>'});
 });
+function jsError(other) {
+    try {
+      other();
+    } catch (err) {
+      $.get("/templates/json-error-template.html", function(template) {
+        $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),{error: "JS Error", errorMessage: err}));
+      });
+    }
+}
 $(document).ready(function() {
     var componentId = $.url("?id");
     var topologyId = $.url("?topology_id");
@@ -121,11 +133,45 @@ $(document).ready(function() {
         var componentStatsDetail = $("#component-stats-detail")
         var inputStats = $("#component-input-stats");
         var outputStats = $("#component-output-stats");
+        var profilerControl = $("#profiler-control");
         var executorStats = $("#component-executor-stats");
         var componentErrors = $("#component-errors");
         $.get("/templates/component-page-template.html", function(template) {
-            componentSummary.append(Mustache.render($(template).filter("#component-summary-template").html(),response));
-            componentActions.append(Mustache.render($(template).filter("#component-actions-template").html(),buttonJsonData));
+            response["hosts"] = [];
+            for(var comp_index in response["executorStats"]) {
+                var comp = response["executorStats"][comp_index];
+                var host_port = comp["host"] + ":" + comp["port"];
+                if($.inArray(host_port, response["hosts"]) == -1) {
+                    response["hosts"].push(host_port);
+                }
+            }
+
+            response["hosts"] = $.map(response["hosts"], function(host_port) {
+                return {"name": host_port};
+            });
+
+            response["profilerActive"] = $.map(response["profilerActive"], function(active_map) {
+                var date = new Date();
+                var millis = date.getTime() + parseInt(active_map["timestamp"]);
+                date = new Date(millis);
+                active_map["timestamp"] = date.toTimeString();
+                return active_map;
+            });
+
+            jsError(function() {
+              componentSummary.append(Mustache.render($(template).filter("#component-summary-template").html(),response));
+            });
+
+            jsError(function() {
+              componentActions.append(Mustache.render($(template).filter("#component-actions-template").html(),buttonJsonData));
+            });
+
+            jsError(function() {
+                var part = $(template).filter('#profiler-active-partial').html();
+                var partials = {"profilerActive": part};
+                profilerControl.append(Mustache.render($(template).filter("#profiling-template").html(), response, partials));
+            });
+
             if(response["componentType"] == "spout") {
                 componentStatsDetail.append(Mustache.render($(template).filter("#spout-stats-detail-template").html(),response));
                 //window, emitted, transferred, complete latency, acked, failed
@@ -213,6 +259,121 @@ $(document).ready(function() {
         });
     });
 });
+
+function profiler_selected_worker() {
+    return $("#selected_worker").val();
+}
+
+function start_profiling(id) {
+    var topologyId = $.url("?topology_id");
+    var timeout = $("#timeout").val();
+
+    if(timeout == "") { timeout = 10; }
+    if(isNaN(parseFloat(timeout)) || !isFinite(timeout)) {
+        alert("Must specify a numeric timeout");
+        return;
+    }
+
+    var url = "/api/v1/topology/"+topologyId+"/profiling/start/" + id + "/" + timeout;
+    $.get(url, function(response,status,jqXHR) {
+        jsError(function() {
+            $.get("/templates/component-page-template.html", function(template) {
+                var host_port_split = id.split(":");
+                var host = host_port_split[0];
+                var port = host_port_split[1];
+                var millis = new Date().getTime() + (timeout * 60000);
+                var timestamp = new Date(millis).toTimeString();
+                
+                var mustache = Mustache.render($(template).filter("#profiler-active-partial").html(), {"profilerActive": [{
+                    "host": host,
+                    "port": port,
+                    "timestamp": timestamp,
+                    "dumplink": response["dumplink"]}]});
+                $("#profiler-table-body").append(mustache);
+            });
+        });
+    })
+    .fail(function(response) {
+        alert( "Starting profiler for " + id + " failed: \n" + JSON.stringify(response));
+    });
+}
+
+function stop_profiling(id) {
+    var topologyId = $.url("?topology_id");
+    var url = "/api/v1/topology/"+topologyId+"/profiling/stop/" + id;
+
+    $("#stop_" + id).prop('disabled', true);
+    setTimeout(function(){ $("#stop_" + id).prop('disabled', false); }, 5000);
+    
+    $.get(url, function(response,status,jqXHR) {
+        alert("Submitted request to stop profiling...");
+    })
+    .fail(function(response) {
+        alert( "Stopping profiler for " + id + " failed: \n" + JSON.stringify(response));
+    });
+    
+}
+
+function dump_profile(id) {
+    var topologyId = $.url("?topology_id");
+    var url = "/api/v1/topology/"+topologyId+"/profiling/dumpprofile/" + id;
+
+    $("#dump_profile_" + id).prop('disabled', true);
+    setTimeout(function(){ $("#dump_profile_" + id).prop('disabled', false); }, 5000);
+    
+    $.get(url, function(response,status,jqXHR) {
+        alert("Submitted request to dump profile snapshot...");
+    })
+    .fail(function(response) {
+        alert( "Dumping profile data for " + id + " failed: \n" + JSON.stringify(response));
+    });
+}
+
+function dump_jstack(id) {
+    var topologyId = $.url("?topology_id");
+    var url = "/api/v1/topology/"+topologyId+"/profiling/dumpjstack/" + id;
+
+    $("#dump_jstack_" + id).prop('disabled', true);
+    setTimeout(function(){ $("#dump_jstack_" + id).prop('disabled', false); }, 5000);
+    
+    $.get(url, function(response,status,jqXHR) {
+        alert("Submitted request for jstack dump...");
+    })
+    .fail(function(response) {
+        alert( "Dumping JStack for " + id + " failed: \n" + JSON.stringify(response));
+    });
+}
+
+function restart_worker_jvm(id) {
+    var topologyId = $.url("?topology_id");
+    var url = "/api/v1/topology/"+topologyId+"/profiling/restartworker/" + id;
+
+    $("#restart_worker_jvm_" + id).prop('disabled', true);
+    setTimeout(function(){ $("#restart_worker_jvm_" + id).prop('disabled', false); }, 5000);
+
+    $.get(url, function(response,status,jqXHR) {
+        alert("Submitted request for restarting worker...");
+    })
+    .fail(function(response) {
+        alert( "Failed to restart worker for " + id + " failed: \n" + JSON.stringify(response));
+    });
+}
+
+function dump_heap(id) {
+    var topologyId = $.url("?topology_id");
+    var url = "/api/v1/topology/"+topologyId+"/profiling/dumpheap/" + id;
+    var heap = $("#dump_heap_" + id);
+    $("#dump_heap_" + id).prop('disabled', true);
+    setTimeout(function(){ $("#dump_heap_" + id).prop('disabled', false); }, 5000);
+    
+    $.get(url, function(response,status,jqXHR) {
+        alert("Submitted request for jmap dump...");
+    })
+    .fail(function(response) {
+        alert( "Dumping Heap for " + id + " failed: \n" + JSON.stringify(response));
+    });
+}
+
 </script>
 </div>
 </body>

http://git-wip-us.apache.org/repos/asf/storm/blob/0c2021e6/storm-core/src/ui/public/templates/component-page-template.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/templates/component-page-template.html b/storm-core/src/ui/public/templates/component-page-template.html
index 7a4c894..f21b30d 100644
--- a/storm-core/src/ui/public/templates/component-page-template.html
+++ b/storm-core/src/ui/public/templates/component-page-template.html
@@ -227,6 +227,59 @@
     </tbody>
   </table>
 </script>
+<script id="profiling-template" type="text/html">
+  <h2>Profiling and Debugging</h2>
+  Use the following controls to profile and debug the components on this page.
+  <table class="table table-striped compact">
+    <thead>
+      <tr>
+        <th class="header">Component</th>
+        <th class="header">
+          <span data-original-title="The status of a running profiler or the timeout for one you're starting (in minutes)" data-toggle="tooltip">
+            Status / Timeout (Minutes)
+          </span>
+        </th>
+        <th class="header">Actions</th>
+      </tr>
+    </thead>
+    <tbody id="profiler-table-body">
+      <tr>
+        <td>
+          <select id="selected_worker">
+            {{#hosts}}
+            <option value="{{name}}">{{name}}</option>
+            {{/hosts}}
+          </select>
+        </td>
+        <td>
+          <input id="timeout" class="timeout_input" type="text" value="" placeholder="10"/>
+        </td>
+        <td>
+          <input type="button" value="Start" name="start" onClick="start_profiling(profiler_selected_worker())" class="btn btn-secondary"/>
+          <input type="button" value="JStack" name="jstack" onClick="dump_jstack(profiler_selected_worker())" class="btn btn-secondary"/>
+          <input type="button" value="Restart Worker" name="jvmrestart" onClick="restart_worker_jvm(profiler_selected_worker())" class="btn btn-secondary"/>
+          <input type="button" value="Heap" name="heap" onClick="dump_heap(profiler_selected_worker())" class="btn btn-secondary"/>
+        </td>
+      </tr>
+      {{> profilerActive}}
+    </tbody>
+  </table>
+</script>
+<script id="profiler-active-partial" type="text/html">
+  {{#profilerActive}}
+  <tr>
+    <td>{{host}}:{{port}}</td>
+    <td>Active until {{timestamp}}</td>
+    <td>
+      <input id="stop_{{host}}:{{port}}" type="button" value="Stop" name="stop" onClick="stop_profiling('{{host}}:{{port}}')" class="btn btn-secondary"/>
+      <input id="dump_profile_{{host}}:{{port}}" type="button" value="Dump Profile" name="dumpjprofile" onClick="dump_profile('{{host}}:{{port}}')" class="btn btn-secondary"/>
+      <input id="dump_jstack_{{host}}:{{port}}" type="button" value="JStack" name="jstack" onClick="dump_jstack('{{host}}:{{port}}')" class="btn btn-secondary"/>
+      <input id="restart_worker_jvm_{{host}}:{{port}}" type="button" value="Restart Worker" name="jvmrestart" onClick="restart_worker_jvm('{{host}}:{{port}}')" class="btn btn-secondary"/>
+      <input id="dump_heap_{{host}}:{{port}}" type="button" value="Heap" name="heap" onClick="dump_heap('{{host}}:{{port}}')" class="btn btn-secondary"/> <a href="{{dumplink}}">My Dump Files</a>
+    </td>
+  </tr>
+  {{/profilerActive}}
+</script>
 <script id="bolt-stats-template" type="text/html">
   <h2>Bolt stats</h2>
   <table class="table table-striped compact" id="bolt-stats-table">


Mime
View raw message