storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [01/16] storm git commit: STORM-634: Converting SupervisorInfo, Assignment, StormBase, TopologyStatus, ZKWorkerHeartbeat, ErrorInfo, Credentials to thrift and defaulting the serialization delegate to thrift serialization. Added class as a param to serialization...
Date Wed, 18 Mar 2015 19:39:08 GMT
Repository: storm
Updated Branches:
  refs/heads/master 1ea378d78 -> bb8d48da2


http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index 1bbaf37..e15cf1d 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -44,6 +44,26 @@ class TopologyInitialStatus:
     "INACTIVE": 2,
   }
 
+class TopologyStatus:
+  ACTIVE = 1
+  INACTIVE = 2
+  REBALANCING = 3
+  KILLED = 4
+
+  _VALUES_TO_NAMES = {
+    1: "ACTIVE",
+    2: "INACTIVE",
+    3: "REBALANCING",
+    4: "KILLED",
+  }
+
+  _NAMES_TO_VALUES = {
+    "ACTIVE": 1,
+    "INACTIVE": 2,
+    "REBALANCING": 3,
+    "KILLED": 4,
+  }
+
 
 class JavaObjectArg:
   """
@@ -3034,6 +3054,7 @@ class ExecutorStats:
    - emitted
    - transferred
    - specific
+   - rate
   """
 
   thrift_spec = (
@@ -3041,15 +3062,17 @@ class ExecutorStats:
     (1, TType.MAP, 'emitted', (TType.STRING,None,TType.MAP,(TType.STRING,None,TType.I64,None)), None, ), # 1
     (2, TType.MAP, 'transferred', (TType.STRING,None,TType.MAP,(TType.STRING,None,TType.I64,None)), None, ), # 2
     (3, TType.STRUCT, 'specific', (ExecutorSpecificStats, ExecutorSpecificStats.thrift_spec), None, ), # 3
+    (4, TType.DOUBLE, 'rate', None, None, ), # 4
   )
 
   def __hash__(self):
-    return 0 + hash(self.emitted) + hash(self.transferred) + hash(self.specific)
+    return 0 + hash(self.emitted) + hash(self.transferred) + hash(self.specific) + hash(self.rate)
 
-  def __init__(self, emitted=None, transferred=None, specific=None,):
+  def __init__(self, emitted=None, transferred=None, specific=None, rate=None,):
     self.emitted = emitted
     self.transferred = transferred
     self.specific = specific
+    self.rate = rate
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -3100,6 +3123,11 @@ class ExecutorStats:
           self.specific.read(iprot)
         else:
           iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.DOUBLE:
+          self.rate = iprot.readDouble();
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -3138,6 +3166,10 @@ class ExecutorStats:
       oprot.writeFieldBegin('specific', TType.STRUCT, 3)
       self.specific.write(oprot)
       oprot.writeFieldEnd()
+    if self.rate is not None:
+      oprot.writeFieldBegin('rate', TType.DOUBLE, 4)
+      oprot.writeDouble(self.rate)
+      oprot.writeFieldEnd()
     oprot.writeFieldStop()
     oprot.writeStructEnd()
 
@@ -3148,6 +3180,8 @@ class ExecutorStats:
       raise TProtocol.TProtocolException(message='Required field transferred is unset!')
     if self.specific is None:
       raise TProtocol.TProtocolException(message='Required field specific is unset!')
+    if self.rate is None:
+      raise TProtocol.TProtocolException(message='Required field rate is unset!')
     return
 
 
@@ -4383,6 +4417,764 @@ class SubmitOptions:
   def __ne__(self, other):
     return not (self == other)
 
+class SupervisorInfo:
+  """
+  Attributes:
+   - time_secs
+   - hostname
+   - assignment_id
+   - used_ports
+   - meta
+   - scheduler_meta
+   - uptime_secs
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.I64, 'time_secs', None, None, ), # 1
+    (2, TType.STRING, 'hostname', None, None, ), # 2
+    (3, TType.STRING, 'assignment_id', None, None, ), # 3
+    (4, TType.LIST, 'used_ports', (TType.I64,None), None, ), # 4
+    (5, TType.LIST, 'meta', (TType.I64,None), None, ), # 5
+    (6, TType.MAP, 'scheduler_meta', (TType.STRING,None,TType.STRING,None), None, ), # 6
+    (7, TType.I64, 'uptime_secs', None, None, ), # 7
+  )
+
+  def __hash__(self):
+    return 0 + hash(self.time_secs) + hash(self.hostname) + hash(self.assignment_id) + hash(self.used_ports) + hash(self.meta) + hash(self.scheduler_meta) + hash(self.uptime_secs)
+
+  def __init__(self, time_secs=None, hostname=None, assignment_id=None, used_ports=None, meta=None, scheduler_meta=None, uptime_secs=None,):
+    self.time_secs = time_secs
+    self.hostname = hostname
+    self.assignment_id = assignment_id
+    self.used_ports = used_ports
+    self.meta = meta
+    self.scheduler_meta = scheduler_meta
+    self.uptime_secs = uptime_secs
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.I64:
+          self.time_secs = iprot.readI64();
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.hostname = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.STRING:
+          self.assignment_id = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.LIST:
+          self.used_ports = []
+          (_etype304, _size301) = iprot.readListBegin()
+          for _i305 in xrange(_size301):
+            _elem306 = iprot.readI64();
+            self.used_ports.append(_elem306)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 5:
+        if ftype == TType.LIST:
+          self.meta = []
+          (_etype310, _size307) = iprot.readListBegin()
+          for _i311 in xrange(_size307):
+            _elem312 = iprot.readI64();
+            self.meta.append(_elem312)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 6:
+        if ftype == TType.MAP:
+          self.scheduler_meta = {}
+          (_ktype314, _vtype315, _size313 ) = iprot.readMapBegin() 
+          for _i317 in xrange(_size313):
+            _key318 = iprot.readString().decode('utf-8')
+            _val319 = iprot.readString().decode('utf-8')
+            self.scheduler_meta[_key318] = _val319
+          iprot.readMapEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 7:
+        if ftype == TType.I64:
+          self.uptime_secs = iprot.readI64();
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('SupervisorInfo')
+    if self.time_secs is not None:
+      oprot.writeFieldBegin('time_secs', TType.I64, 1)
+      oprot.writeI64(self.time_secs)
+      oprot.writeFieldEnd()
+    if self.hostname is not None:
+      oprot.writeFieldBegin('hostname', TType.STRING, 2)
+      oprot.writeString(self.hostname.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.assignment_id is not None:
+      oprot.writeFieldBegin('assignment_id', TType.STRING, 3)
+      oprot.writeString(self.assignment_id.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.used_ports is not None:
+      oprot.writeFieldBegin('used_ports', TType.LIST, 4)
+      oprot.writeListBegin(TType.I64, len(self.used_ports))
+      for iter320 in self.used_ports:
+        oprot.writeI64(iter320)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    if self.meta is not None:
+      oprot.writeFieldBegin('meta', TType.LIST, 5)
+      oprot.writeListBegin(TType.I64, len(self.meta))
+      for iter321 in self.meta:
+        oprot.writeI64(iter321)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    if self.scheduler_meta is not None:
+      oprot.writeFieldBegin('scheduler_meta', TType.MAP, 6)
+      oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.scheduler_meta))
+      for kiter322,viter323 in self.scheduler_meta.items():
+        oprot.writeString(kiter322.encode('utf-8'))
+        oprot.writeString(viter323.encode('utf-8'))
+      oprot.writeMapEnd()
+      oprot.writeFieldEnd()
+    if self.uptime_secs is not None:
+      oprot.writeFieldBegin('uptime_secs', TType.I64, 7)
+      oprot.writeI64(self.uptime_secs)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.time_secs is None:
+      raise TProtocol.TProtocolException(message='Required field time_secs is unset!')
+    if self.hostname is None:
+      raise TProtocol.TProtocolException(message='Required field hostname is unset!')
+    return
+
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class NodeInfo:
+  """
+  Attributes:
+   - node
+   - port
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'node', None, None, ), # 1
+    (2, TType.SET, 'port', (TType.I64,None), None, ), # 2
+  )
+
+  def __hash__(self):
+    return 0 + hash(self.node) + hash(self.port)
+
+  def __init__(self, node=None, port=None,):
+    self.node = node
+    self.port = port
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.node = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.SET:
+          self.port = set()
+          (_etype327, _size324) = iprot.readSetBegin()
+          for _i328 in xrange(_size324):
+            _elem329 = iprot.readI64();
+            self.port.add(_elem329)
+          iprot.readSetEnd()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('NodeInfo')
+    if self.node is not None:
+      oprot.writeFieldBegin('node', TType.STRING, 1)
+      oprot.writeString(self.node.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.port is not None:
+      oprot.writeFieldBegin('port', TType.SET, 2)
+      oprot.writeSetBegin(TType.I64, len(self.port))
+      for iter330 in self.port:
+        oprot.writeI64(iter330)
+      oprot.writeSetEnd()
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.node is None:
+      raise TProtocol.TProtocolException(message='Required field node is unset!')
+    if self.port is None:
+      raise TProtocol.TProtocolException(message='Required field port is unset!')
+    return
+
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class Assignment:
+  """
+  Attributes:
+   - master_code_dir
+   - node_host
+   - executor_node_port
+   - executor_start_time_secs
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'master_code_dir', None, None, ), # 1
+    (2, TType.MAP, 'node_host', (TType.STRING,None,TType.STRING,None), {
+    }, ), # 2
+    (3, TType.MAP, 'executor_node_port', (TType.LIST,(TType.I64,None),TType.STRUCT,(NodeInfo, NodeInfo.thrift_spec)), {
+    }, ), # 3
+    (4, TType.MAP, 'executor_start_time_secs', (TType.LIST,(TType.I64,None),TType.I64,None), {
+    }, ), # 4
+  )
+
+  def __hash__(self):
+    return 0 + hash(self.master_code_dir) + hash(self.node_host) + hash(self.executor_node_port) + hash(self.executor_start_time_secs)
+
+  def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], executor_node_port=thrift_spec[3][4], executor_start_time_secs=thrift_spec[4][4],):
+    self.master_code_dir = master_code_dir
+    if node_host is self.thrift_spec[2][4]:
+      node_host = {
+    }
+    self.node_host = node_host
+    if executor_node_port is self.thrift_spec[3][4]:
+      executor_node_port = {
+    }
+    self.executor_node_port = executor_node_port
+    if executor_start_time_secs is self.thrift_spec[4][4]:
+      executor_start_time_secs = {
+    }
+    self.executor_start_time_secs = executor_start_time_secs
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.master_code_dir = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.MAP:
+          self.node_host = {}
+          (_ktype332, _vtype333, _size331 ) = iprot.readMapBegin() 
+          for _i335 in xrange(_size331):
+            _key336 = iprot.readString().decode('utf-8')
+            _val337 = iprot.readString().decode('utf-8')
+            self.node_host[_key336] = _val337
+          iprot.readMapEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.MAP:
+          self.executor_node_port = {}
+          (_ktype339, _vtype340, _size338 ) = iprot.readMapBegin() 
+          for _i342 in xrange(_size338):
+            _key343 = []
+            (_etype348, _size345) = iprot.readListBegin()
+            for _i349 in xrange(_size345):
+              _elem350 = iprot.readI64();
+              _key343.append(_elem350)
+            iprot.readListEnd()
+            _val344 = NodeInfo()
+            _val344.read(iprot)
+            self.executor_node_port[_key343] = _val344
+          iprot.readMapEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.MAP:
+          self.executor_start_time_secs = {}
+          (_ktype352, _vtype353, _size351 ) = iprot.readMapBegin() 
+          for _i355 in xrange(_size351):
+            _key356 = []
+            (_etype361, _size358) = iprot.readListBegin()
+            for _i362 in xrange(_size358):
+              _elem363 = iprot.readI64();
+              _key356.append(_elem363)
+            iprot.readListEnd()
+            _val357 = iprot.readI64();
+            self.executor_start_time_secs[_key356] = _val357
+          iprot.readMapEnd()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('Assignment')
+    if self.master_code_dir is not None:
+      oprot.writeFieldBegin('master_code_dir', TType.STRING, 1)
+      oprot.writeString(self.master_code_dir.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.node_host is not None:
+      oprot.writeFieldBegin('node_host', TType.MAP, 2)
+      oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.node_host))
+      for kiter364,viter365 in self.node_host.items():
+        oprot.writeString(kiter364.encode('utf-8'))
+        oprot.writeString(viter365.encode('utf-8'))
+      oprot.writeMapEnd()
+      oprot.writeFieldEnd()
+    if self.executor_node_port is not None:
+      oprot.writeFieldBegin('executor_node_port', TType.MAP, 3)
+      oprot.writeMapBegin(TType.LIST, TType.STRUCT, len(self.executor_node_port))
+      for kiter366,viter367 in self.executor_node_port.items():
+        oprot.writeListBegin(TType.I64, len(kiter366))
+        for iter368 in kiter366:
+          oprot.writeI64(iter368)
+        oprot.writeListEnd()
+        viter367.write(oprot)
+      oprot.writeMapEnd()
+      oprot.writeFieldEnd()
+    if self.executor_start_time_secs is not None:
+      oprot.writeFieldBegin('executor_start_time_secs', TType.MAP, 4)
+      oprot.writeMapBegin(TType.LIST, TType.I64, len(self.executor_start_time_secs))
+      for kiter369,viter370 in self.executor_start_time_secs.items():
+        oprot.writeListBegin(TType.I64, len(kiter369))
+        for iter371 in kiter369:
+          oprot.writeI64(iter371)
+        oprot.writeListEnd()
+        oprot.writeI64(viter370)
+      oprot.writeMapEnd()
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.master_code_dir is None:
+      raise TProtocol.TProtocolException(message='Required field master_code_dir is unset!')
+    return
+
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class TopologyActionOptions:
+  """
+  Attributes:
+   - kill_options
+   - rebalance_options
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRUCT, 'kill_options', (KillOptions, KillOptions.thrift_spec), None, ), # 1
+    (2, TType.STRUCT, 'rebalance_options', (RebalanceOptions, RebalanceOptions.thrift_spec), None, ), # 2
+  )
+
+  def __hash__(self):
+    return 0 + hash(self.kill_options) + hash(self.rebalance_options)
+
+  def __init__(self, kill_options=None, rebalance_options=None,):
+    self.kill_options = kill_options
+    self.rebalance_options = rebalance_options
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRUCT:
+          self.kill_options = KillOptions()
+          self.kill_options.read(iprot)
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRUCT:
+          self.rebalance_options = RebalanceOptions()
+          self.rebalance_options.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('TopologyActionOptions')
+    if self.kill_options is not None:
+      oprot.writeFieldBegin('kill_options', TType.STRUCT, 1)
+      self.kill_options.write(oprot)
+      oprot.writeFieldEnd()
+    if self.rebalance_options is not None:
+      oprot.writeFieldBegin('rebalance_options', TType.STRUCT, 2)
+      self.rebalance_options.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class StormBase:
+  """
+  Attributes:
+   - name
+   - status
+   - num_workers
+   - component_executors
+   - launch_time_secs
+   - owner
+   - topology_action_options
+   - prev_status
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'name', None, None, ), # 1
+    (2, TType.I32, 'status', None, None, ), # 2
+    (3, TType.I32, 'num_workers', None, None, ), # 3
+    (4, TType.MAP, 'component_executors', (TType.STRING,None,TType.I32,None), None, ), # 4
+    (5, TType.I32, 'launch_time_secs', None, None, ), # 5
+    (6, TType.STRING, 'owner', None, None, ), # 6
+    (7, TType.STRUCT, 'topology_action_options', (TopologyActionOptions, TopologyActionOptions.thrift_spec), None, ), # 7
+    (8, TType.I32, 'prev_status', None, None, ), # 8
+  )
+
+  def __hash__(self):
+    return 0 + hash(self.name) + hash(self.status) + hash(self.num_workers) + hash(self.component_executors) + hash(self.launch_time_secs) + hash(self.owner) + hash(self.topology_action_options) + hash(self.prev_status)
+
+  def __init__(self, name=None, status=None, num_workers=None, component_executors=None, launch_time_secs=None, owner=None, topology_action_options=None, prev_status=None,):
+    self.name = name
+    self.status = status
+    self.num_workers = num_workers
+    self.component_executors = component_executors
+    self.launch_time_secs = launch_time_secs
+    self.owner = owner
+    self.topology_action_options = topology_action_options
+    self.prev_status = prev_status
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.name = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.I32:
+          self.status = iprot.readI32();
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I32:
+          self.num_workers = iprot.readI32();
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.MAP:
+          self.component_executors = {}
+          (_ktype373, _vtype374, _size372 ) = iprot.readMapBegin() 
+          for _i376 in xrange(_size372):
+            _key377 = iprot.readString().decode('utf-8')
+            _val378 = iprot.readI32();
+            self.component_executors[_key377] = _val378
+          iprot.readMapEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 5:
+        if ftype == TType.I32:
+          self.launch_time_secs = iprot.readI32();
+        else:
+          iprot.skip(ftype)
+      elif fid == 6:
+        if ftype == TType.STRING:
+          self.owner = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 7:
+        if ftype == TType.STRUCT:
+          self.topology_action_options = TopologyActionOptions()
+          self.topology_action_options.read(iprot)
+        else:
+          iprot.skip(ftype)
+      elif fid == 8:
+        if ftype == TType.I32:
+          self.prev_status = iprot.readI32();
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('StormBase')
+    if self.name is not None:
+      oprot.writeFieldBegin('name', TType.STRING, 1)
+      oprot.writeString(self.name.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.status is not None:
+      oprot.writeFieldBegin('status', TType.I32, 2)
+      oprot.writeI32(self.status)
+      oprot.writeFieldEnd()
+    if self.num_workers is not None:
+      oprot.writeFieldBegin('num_workers', TType.I32, 3)
+      oprot.writeI32(self.num_workers)
+      oprot.writeFieldEnd()
+    if self.component_executors is not None:
+      oprot.writeFieldBegin('component_executors', TType.MAP, 4)
+      oprot.writeMapBegin(TType.STRING, TType.I32, len(self.component_executors))
+      for kiter379,viter380 in self.component_executors.items():
+        oprot.writeString(kiter379.encode('utf-8'))
+        oprot.writeI32(viter380)
+      oprot.writeMapEnd()
+      oprot.writeFieldEnd()
+    if self.launch_time_secs is not None:
+      oprot.writeFieldBegin('launch_time_secs', TType.I32, 5)
+      oprot.writeI32(self.launch_time_secs)
+      oprot.writeFieldEnd()
+    if self.owner is not None:
+      oprot.writeFieldBegin('owner', TType.STRING, 6)
+      oprot.writeString(self.owner.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.topology_action_options is not None:
+      oprot.writeFieldBegin('topology_action_options', TType.STRUCT, 7)
+      self.topology_action_options.write(oprot)
+      oprot.writeFieldEnd()
+    if self.prev_status is not None:
+      oprot.writeFieldBegin('prev_status', TType.I32, 8)
+      oprot.writeI32(self.prev_status)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.name is None:
+      raise TProtocol.TProtocolException(message='Required field name is unset!')
+    if self.status is None:
+      raise TProtocol.TProtocolException(message='Required field status is unset!')
+    if self.num_workers is None:
+      raise TProtocol.TProtocolException(message='Required field num_workers is unset!')
+    return
+
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class ZKWorkerHeartbeat:
+  """
+  Attributes:
+   - storm_id
+   - executor_stats
+   - time_secs
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'storm_id', None, None, ), # 1
+    (2, TType.MAP, 'executor_stats', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec),TType.STRUCT,(ExecutorStats, ExecutorStats.thrift_spec)), None, ), # 2
+    (3, TType.I32, 'time_secs', None, None, ), # 3
+  )
+
+  def __hash__(self):
+    return 0 + hash(self.storm_id) + hash(self.executor_stats) + hash(self.time_secs)
+
+  def __init__(self, storm_id=None, executor_stats=None, time_secs=None,):
+    self.storm_id = storm_id
+    self.executor_stats = executor_stats
+    self.time_secs = time_secs
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.storm_id = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.MAP:
+          self.executor_stats = {}
+          (_ktype382, _vtype383, _size381 ) = iprot.readMapBegin() 
+          for _i385 in xrange(_size381):
+            _key386 = ExecutorInfo()
+            _key386.read(iprot)
+            _val387 = ExecutorStats()
+            _val387.read(iprot)
+            self.executor_stats[_key386] = _val387
+          iprot.readMapEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I32:
+          self.time_secs = iprot.readI32();
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('ZKWorkerHeartbeat')
+    if self.storm_id is not None:
+      oprot.writeFieldBegin('storm_id', TType.STRING, 1)
+      oprot.writeString(self.storm_id.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.executor_stats is not None:
+      oprot.writeFieldBegin('executor_stats', TType.MAP, 2)
+      oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.executor_stats))
+      for kiter388,viter389 in self.executor_stats.items():
+        kiter388.write(oprot)
+        viter389.write(oprot)
+      oprot.writeMapEnd()
+      oprot.writeFieldEnd()
+    if self.time_secs is not None:
+      oprot.writeFieldBegin('time_secs', TType.I32, 3)
+      oprot.writeI32(self.time_secs)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.storm_id is None:
+      raise TProtocol.TProtocolException(message='Required field storm_id is unset!')
+    if self.executor_stats is None:
+      raise TProtocol.TProtocolException(message='Required field executor_stats is unset!')
+    if self.time_secs is None:
+      raise TProtocol.TProtocolException(message='Required field time_secs is unset!')
+    return
+
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class DRPCRequest:
   """
   Attributes:

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index f807b74..3cc0eb9 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -193,6 +193,7 @@ struct ExecutorStats {
   1: required map<string, map<string, i64>> emitted;
   2: required map<string, map<string, i64>> transferred;
   3: required ExecutorSpecificStats specific;
+  4: required double rate;
 }
 
 struct ExecutorInfo {
@@ -243,6 +244,56 @@ struct SubmitOptions {
   2: optional Credentials creds;
 }
 
+struct SupervisorInfo {
+    1: required i64 time_secs;
+    2: required string hostname;
+    3: optional string assignment_id;
+    4: optional list<i64> used_ports;
+    5: optional list<i64> meta;
+    6: optional map<string, string> scheduler_meta;
+    7: optional i64 uptime_secs;
+}
+struct NodeInfo {
+    1: required string node;
+    2: required set<i64> port;
+}
+
+struct Assignment {
+    1: required string master_code_dir;
+    2: optional map<string, string> node_host = {};
+    3: optional map<list<i64>, NodeInfo> executor_node_port = {};
+    4: optional map<list<i64>, i64> executor_start_time_secs = {};
+}
+
+enum TopologyStatus {
+    ACTIVE = 1,
+    INACTIVE = 2,
+    REBALANCING = 3,
+    KILLED = 4
+}
+
+union TopologyActionOptions {
+    1: optional KillOptions kill_options;
+    2: optional RebalanceOptions rebalance_options;
+}
+
+struct StormBase {
+    1: required string name;
+    2: required TopologyStatus status;
+    3: required i32 num_workers;
+    4: optional map<string, i32> component_executors;
+    5: optional i32 launch_time_secs;
+    6: optional string owner;
+    7: optional TopologyActionOptions topology_action_options;
+    8: optional TopologyStatus prev_status;//currently only used during rebalance action.
+}
+
+struct ZKWorkerHeartbeat {
+    1: required string storm_id;
+    2: required map<ExecutorInfo,ExecutorStats> executor_stats;
+    3: required i32 time_secs;
+}
+
 service Nimbus {
   void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze);
   void submitTopologyWithOpts(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology, 5: SubmitOptions options) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze);

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/test/clj/backtype/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/cluster_test.clj b/storm-core/test/clj/backtype/storm/cluster_test.clj
index 7ed1028..98eae68 100644
--- a/storm-core/test/clj/backtype/storm/cluster_test.clj
+++ b/storm-core/test/clj/backtype/storm/cluster_test.clj
@@ -25,7 +25,7 @@
   (:require [conjure.core])
   (:use [conjure core])
   (:use [clojure test])
-  (:use [backtype.storm cluster config util testing]))
+  (:use [backtype.storm cluster config util testing thrift log]))
 
 (defn mk-config [zk-port]
   (merge (read-storm-config)
@@ -168,10 +168,10 @@
 (deftest test-storm-cluster-state-basics
   (with-inprocess-zookeeper zk-port
     (let [state (mk-storm-state zk-port)
-          assignment1 (Assignment. "/aaa" {} {1 [2 2002 1]} {})
-          assignment2 (Assignment. "/aaa" {} {1 [2 2002]} {})
-          base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "")
-          base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "")]
+          assignment1 (Assignment. "/aaa" {} {[1] ["1" 1001 1]} {})
+          assignment2 (Assignment. "/aaa" {} {[2] ["2" 2002]} {})
+          base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {} "" nil nil)
+          base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {} "" nil nil)]
       (is (= [] (.assignments state nil)))
       (.set-assignment! state "storm1" assignment1)
       (is (= assignment1 (.assignment-info state "storm1" nil)))
@@ -242,12 +242,15 @@
 (deftest test-supervisor-state
   (with-inprocess-zookeeper zk-port
     (let [state1 (mk-storm-state zk-port)
-          state2 (mk-storm-state zk-port)]
+          state2 (mk-storm-state zk-port)
+          supervisor-info1 (SupervisorInfo. 10 "hostname-1" "id1" [1 2] [] {} 1000 )
+          supervisor-info2 (SupervisorInfo. 10 "hostname-2" "id2" [1 2] [] {} 1000 )
+          ]
       (is (= [] (.supervisors state1 nil)))
-      (.supervisor-heartbeat! state2 "2" {:a 1})
-      (.supervisor-heartbeat! state1 "1" {})
-      (is (= {:a 1} (.supervisor-info state1 "2")))
-      (is (= {} (.supervisor-info state1 "1")))
+      (.supervisor-heartbeat! state2 "2" supervisor-info2)
+      (.supervisor-heartbeat! state1 "1" supervisor-info1)
+      (is (= supervisor-info2 (.supervisor-info state1 "2")))
+      (is (= supervisor-info1 (.supervisor-info state1 "1")))
       (is (= #{"1" "2"} (set (.supervisors state1 nil))))
       (is (= #{"1" "2"} (set (.supervisors state2 nil))))
       (.disconnect state2)
@@ -255,8 +258,6 @@
       (.disconnect state1)
       )))
 
-
-
 (deftest test-cluster-authentication
   (with-inprocess-zookeeper zk-port
     (let [builder (Mockito/mock CuratorFrameworkFactory$Builder)

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj
index efdad25..7671f58 100644
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns backtype.storm.nimbus-test
   (:use [clojure test])
-  (:require [backtype.storm [util :as util]])
+  (:require [backtype.storm [util :as util] [stats :as stats]])
   (:require [backtype.storm.daemon [nimbus :as nimbus]])
   (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
   (:import [backtype.storm.scheduler INimbus])
@@ -113,7 +113,7 @@
         curr-beat (.get-worker-heartbeat state storm-id node port)
         stats (:executor-stats curr-beat)]
     (.worker-heartbeat! state storm-id node port
-      {:storm-id storm-id :time-secs (current-time-secs) :uptime 10 :executor-stats (merge stats {executor nil})}
+      {:storm-id storm-id :time-secs (current-time-secs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})}
       )))
 
 (defn slot-assignments [cluster storm-id]
@@ -486,7 +486,7 @@
       (bind [executor-id1 executor-id2]  (topology-executors cluster storm-id))
       (bind ass1 (executor-assignment cluster storm-id executor-id1))
       (bind ass2 (executor-assignment cluster storm-id executor-id2))
-      
+
       (advance-cluster-time cluster 59)
       (do-executor-heartbeat cluster storm-id executor-id1)
       (do-executor-heartbeat cluster storm-id executor-id2)

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegateTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegateTest.java b/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegateTest.java
index ce49a8a..9026ec3 100644
--- a/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegateTest.java
+++ b/storm-core/test/jvm/backtype/storm/serialization/GzipBridgeSerializationDelegateTest.java
@@ -41,7 +41,7 @@ public class GzipBridgeSerializationDelegateTest {
 
         byte[] serialized = new GzipSerializationDelegate().serialize(pojo);
 
-        TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized);
+        TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized, TestPojo.class);
 
         assertEquals(pojo2.name, pojo.name);
         assertEquals(pojo2.age, pojo.age);
@@ -55,7 +55,7 @@ public class GzipBridgeSerializationDelegateTest {
 
         byte[] serialized = new GzipBridgeSerializationDelegate().serialize(pojo);
 
-        TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized);
+        TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized, TestPojo.class);
 
         assertEquals(pojo2.name, pojo.name);
         assertEquals(pojo2.age, pojo.age);
@@ -69,7 +69,7 @@ public class GzipBridgeSerializationDelegateTest {
 
         byte[] serialized = new DefaultSerializationDelegate().serialize(pojo);
 
-        TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized);
+        TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized, TestPojo.class);
 
         assertEquals(pojo2.name, pojo.name);
         assertEquals(pojo2.age, pojo.age);

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java b/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java
new file mode 100644
index 0000000..ef17017
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/serialization/ThriftBridgeSerializationDelegateTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.serialization;
+
+import backtype.storm.generated.ErrorInfo;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class ThriftBridgeSerializationDelegateTest {
+
+    SerializationDelegate testDelegate;
+
+    @Before
+    public void setUp() throws Exception {
+        testDelegate = new ThriftSerializationDelegateBridge();
+        testDelegate.prepare(null);
+    }
+
+    @Test
+    public void testNonThriftInstance() throws Exception {
+        TestPojo pojo = new TestPojo();
+        pojo.name = "foo";
+        pojo.age = 100;
+
+        byte[] serialized = new DefaultSerializationDelegate().serialize(pojo);
+
+        TestPojo pojo2 = (TestPojo)testDelegate.deserialize(serialized, TestPojo.class);
+
+        assertEquals(pojo2.name, pojo.name);
+        assertEquals(pojo2.age, pojo.age);
+
+        serialized = testDelegate.serialize(pojo);
+        pojo2 = (TestPojo) new DefaultSerializationDelegate().deserialize(serialized, Serializable.class);
+        assertEquals(pojo2.name, pojo.name);
+        assertEquals(pojo2.age, pojo.age);
+    }
+
+    @Test
+    public void testThriftInstance() throws Exception {
+        ErrorInfo errorInfo = new ErrorInfo();
+        errorInfo.set_error("error");
+        errorInfo.set_error_time_secs(1);
+        errorInfo.set_host("host");
+        errorInfo.set_port(1);
+
+        byte[] serialized = new ThriftSerializationDelegate().serialize(errorInfo);
+        ErrorInfo errorInfo2 = testDelegate.deserialize(serialized, ErrorInfo.class);
+        assertEquals(errorInfo, errorInfo2);
+
+        serialized = testDelegate.serialize(errorInfo);
+        errorInfo2 = new ThriftSerializationDelegate().deserialize(serialized, ErrorInfo.class);
+        assertEquals(errorInfo, errorInfo2);
+    }
+
+    static class TestPojo implements Serializable {
+        String name;
+        int age;
+    }
+}


Mime
View raw message