storm-commits mailing list archives

Subject [2/3] storm git commit: Removing TODO as well, to track issues and proposals see
Date Mon, 17 Aug 2015 13:54:33 GMT
Removing TODO as well, to track issues and proposals see


Branch: refs/heads/master
Commit: 4686a47cabb283037d165438a1f4c886a9e1f47a
Parents: 36366c8
Author: Kostya Golikov <>
Authored: Mon Aug 17 16:01:37 2015 +0300
Committer: Kostya Golikov <>
Committed: Mon Aug 17 16:01:37 2015 +0300

 TODO | 178 --------------------------------------------------------------
 1 file changed, 178 deletions(-)
diff --git a/TODO b/TODO
deleted file mode 100644
index 23c8837..0000000
--- a/TODO
+++ /dev/null
@@ -1,178 +0,0 @@
-Use cases:
-1. number of steps between 2 people in a graph (topology with cycles?)
-* Repackage jzmq and zmq as a leiningen "native dep"
-       - this might be good, since the native dep can package builds for all different systems/os's?
-* Deploy design:
-- storm swap {name} {jar} {class}
-- it's allowed to use resources equal to current running topology plus number of free resources
-- starts in deactivated mode
-- add TOPOLOGY_STARTUP_TIME config for the delay until nimbus activates a topology after launching it
-- for swap, after the startup time, deactivate the other topology, wait the TOPOLOGY_MESSAGE_TIMEOUT_SECS, and then activate the other topology
-- should be able to decrease the message timeout for killing or swapping (add optional thrift parameter) -- or just make it part of the config?
-- add killWithOptions, swap, swapWithOptions
-* Storm UI, stats, debugging, diagnosis tools
--- need to be able to hide system streams/components from the calculations (another query param and should be default)
--- need to optimize (slowness is probably on nimbus end of querying zk, consider adding heartbeat caching into nimbus)
--- add margins
--- add titles so it's easier to distinguish the various pages
--- right align all table columns except for the leftmost
-* Unit test the core pieces that have stabilized their APIs
-- process simulator
-- virtual ports
-- supervisor
-- utils
-- test worker/tasks
-* implement pseudo-distributed mode -- this is for testing the distributed parts of the code
-  - perhaps i can use pallet/vmfest for this
-* Need integration tests that run on an actual storm cluster (scp code/process code/zookeeper code not tested in unit tests)
-* bolts with none grouping can be pushed into a bolt. e.g. A -> B -> C
-     A -> D -> E
-If A -> B and A -> D are shuffle grouping = none, and B -> C and D -> E are not, then both can be run in A, b's branch goes to C and D's branch goes to E
-* Failure design
-Add fail method to outputcollector
-Fail sends fail message to Acker for those anchors, which sends fail message back to spout.
-Whenever spout fails a tuple, it emits it in its failure stream...
-Add fail method to drpc... Causes blocked thread to throw exception
-* Have worker heartbeat with its task ids, nimbus verifies - if wrong, reassign tasks?
-- detect and ignore stray tasks
-Each worker can choose a unique id for itself when heart beating
-- nimbus deletes those that aren't in topology
-* Subscriptions design
--- new kind of spout: "subscription spout"
-   --> goal is to sync its data across the tasks that subscribe to its streams
-   --> after doing a grouping, remembers what task it sent the tuple to (regardless of grouping). if a task dies, it knows its subscriptions and asks to be resynced
-   --> normal operation is to push to tasks, but pull done when a task starts up (b/c previous task died or something)
-   --> need to be able to add tuples to subscription or take tuples away (this is protocol with who you're subscribing to - e.g. rocket)
-   --> subscriptions can only happen in a spout because it requires persistent state
-   --> when subscription spout task dies, it polls the source (e.g. rocket) for all the subscription info
-   --> ideally you'd set things up to have one subscription spout per rocket server
-   --> TODO: Need some way to delete subscriptions -> part of tuple or extra metadata on tuple (extra metadata seems cleaner)
-        --> add isSubscription() method to Tuple as well as a getSubscriptionType() [which returns ADD or REMOVE]
-   --> when a spout starts up, it also needs to push all of its subscription info
-   --> acks are irrelevant for subscription tuples -- how should acks be managed as an
-        -- maybe the synchronized state is done for you -- you just access the state directly and receive a callback whenever it changes?
-        -- so don't use tuples...
-   --> subscriptions break all the abstractions, perhaps I should generalize spouts and factor acking as a library on top of storm. subscriptions would just be another kind of library? -> no, it seems to break abstractions anyway (like keeping task -> tuples in memory)
-   --> maybe call it "syncspout"
-   --> if just do syncing (don't expose tuples directly?)
-   --> have a "SubscribedState" class that takes care of indexing/etc. --> expose it through topologycontext?
-      -- need a way to distinguish between states of different streams
-      -- has "add" and "remove" methods
-      -- bolt can give a statemanager object that implements add and remove in the prepare
-      -- add(Tuple tuple)
-      -- remove(Tuple tuple)
-   --> synchronize protocol (when spout or source of data dies):
-      --> send how many tuples are going to be sent
-      --> send the tuples
-      --> OR: pack everything together into a single message (could be hard b/c where tuples are supposed to go is abstracted away)
-      --> tie everything together with a unique ID
-      --> once task receives everything, has info needed to remove tuples
-   --> statespout should do long-polling with timeout
-   --> to do subscriptions, the state should contain something like [url, subscriber]. some bolt appends subscriber to tuples, group by subscriber, and send info back
-        --> how to do fields grouping with an even distribution?
-   -->  ********* tasks need to block on startup until they're synchronized *********
-          --> send sync messages in a loop until it's synchronized
-          --> add a task.synchronize.poll.freq.secs config (default to 10 seconds)
-          --> need to buffer other messages as topology is waiting for synchronization messages (use disk?)
-   --> could use acking system to know if a piece of state gets fully synchronized and communicate this with user
-      --> perhaps expose this through a special stream? (the state status stream -> similar to failure streams)
-   --> should be able to do updates of existing state
-      --> use case: have a knob that you can set externally
-      --> this isn't really any better than just using zookeeper directly
-_myState = context.setSubscribedState(_myState)
-StateSpout {
-  //does a timeout long poll and emits new add or remove state tuples (add and remove on the output collector)
-  nextTuple(StateSpoutOutputCollector) //collector has add and remove methods add(id, tuple).
-  //emits all the tuples into the output collector (in the background, will also send ids and counts to tasks so they know how to synchronize)
-  //called on startup
-  //collector can have a synchronize method in case the source of data (e.g., rocket) craps
-  synchronize(SynchronizationOutputCollector) //collector only has add(id, tuple) method
-//task startup (in prepare method) [this is automatic]
-for(int taskId: statespoutids) {
-  emitDirect(SYNC_STREAM, tuple())
-statespout synchronization():
-  id = uuid()
-  //getAllStateTuplesFromSource calls synchronize on the spout to get the tuples
-  for(Tuple t: getAllStateTuplesFromSource()) {
-    List tasks = emit(cons(id, t));
-    .. keep track of id -> tasks -> count
-    for(task: all output tasks) {
-      emitDirect(task, id, count)
-    } 
-  }
-for synchronization to work, task needs to keep track of which tasks sent it tuples, and compare against only that set on synchronization
-Need a way to propagate information back up the topology - "subscriptions"
-e.g. browser -> rocket -> bolt -> bolt -> bolt. 
-example: #retweets for a subscribed set of tweet ids
-storm topology
- -> tweet spout (A) -> group on original id -> count (B) -> rocket
-subscriptions: rocket -> count (B) tweet id (need to group) -> spout (need to go to
--- how does it work when stuff dies downstream or upstream? do people ask what the subscriptions are? or do you push your subscriptions up? a combination?
--- maybe subscriptions are a "constant" spout? e.g., continuously emits and refreshes to make sure every task has the tuple. this seems amorphous and hard to implement... nimbus would need to refire all constant spouts whenever there's a reassignment that affects the flow of data. subscriptions seem more natural
--- subscriptions are a special kind of stream that are driven by being asked to send it. e.g., rocket is a spout that emits subscription/unsubscription tuples. they only send it when they get something new, or are asked as to what all the subscriptions are
--- maybe you just need a system stream to know when tasks are created. when you see that a downstream task is created, you know to fire subscriptions to it if it's subscribed to your subscriptions stream? - how does this interplay with all the grouping types... you almost want to do a grouping and only send what to tasks that would have received. spouts would need to be able to subscribe to streams as well
-(use 'backtype.storm.testing)
-(def cluster (mk-local-storm-cluster))
-(use 'backtype.storm.bootstrap) (bootstrap)
-(import '[backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount TestAggregatesCounter])
-(def spout (feeder-spout ["word"]))
-(def topology (thrift/mk-topology
-                    {1 (thrift/mk-spout-spec spout :parallelism-hint 3)}
-                    {2 (thrift/mk-bolt-spec {1 ["word"]} (TestWordCounter.) :parallelism-hint
-                     3 (thrift/mk-bolt-spec {1 :global} (TestGlobalCount.))
-                     4 (thrift/mk-bolt-spec {2 :global} (TestAggregatesCounter.))
-                     }))
-(submit-local-topology (:nimbus cluster) "test" {TOPOLOGY-WORKERS 4 TOPOLOGY-DEBUG true}
-* clean up project
-  - remove log4j dir and instead generate it in the deploy (it's only used in bin/storm -> create a console one and put into bin/)
-  - include system component / stream information in the topologycontext and clean up system specific code all over the place
-* Very rare errors
-weird nullptr exceptions:
-(tasks i) on send-fn
-no virtual port socket for outbound task (in worker)
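
For readers following the removed "synchronize protocol" notes: the receiving side of one synchronization round could be sketched roughly as below. This is only an illustration under assumptions and is not code from the Storm repository. The names SyncSketch, TaskState, onSyncTuple and onSyncCount are hypothetical, tuples are modelled as distinct strings, and the state is assumed to come from a single state spout task. The idea taken from the notes is that every tuple in a round carries a unique id, the spout separately announces how many tuples it sent to this task, and once the task has received that many it has the information needed to remove stale tuples.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical sketch, not Storm API: receiving side of one synchronization round.
public class SyncSketch {

    /** State a downstream task keeps for one (assumed single) state spout task. */
    static class TaskState {
        // Current synchronized state; tuples are assumed to be distinct strings.
        private final Set<String> live = new HashSet<>();
        // sync id -> tuples received so far in that round
        private final Map<String, Set<String>> pending = new HashMap<>();
        // sync id -> number of tuples the spout announced for that round
        private final Map<String, Integer> expected = new HashMap<>();

        /** Normal operation: the spout pushes a newly added tuple. */
        void add(String tuple) {
            live.add(tuple);
        }

        /** A tuple tagged with a sync id arrived. Returns true if the round completed. */
        boolean onSyncTuple(String syncId, String tuple) {
            pending.computeIfAbsent(syncId, k -> new HashSet<>()).add(tuple);
            return tryFinish(syncId);
        }

        /** The spout announced how many tuples it sent under this sync id. */
        boolean onSyncCount(String syncId, int count) {
            expected.put(syncId, count);
            return tryFinish(syncId);
        }

        /** Once count and tuples are both in, replace the state, dropping stale tuples. */
        private boolean tryFinish(String syncId) {
            Integer want = expected.get(syncId);
            Set<String> got = pending.getOrDefault(syncId, new HashSet<>());
            if (want == null || got.size() < want) {
                return false; // still waiting for the count or for more tuples
            }
            live.clear();
            live.addAll(got);
            pending.remove(syncId);
            expected.remove(syncId);
            return true;
        }

        /** Copy of the current state, for inspection. */
        Set<String> snapshot() {
            return new HashSet<>(live);
        }
    }

    public static void main(String[] args) {
        TaskState task = new TaskState();
        task.add("a");
        task.add("stale");

        String id = UUID.randomUUID().toString(); // ties the round together
        task.onSyncTuple(id, "a");
        task.onSyncTuple(id, "b");
        boolean done = task.onSyncCount(id, 2);

        System.out.println(done + " " + new TreeSet<>(task.snapshot())); // prints "true [a, b]"
    }
}
```

The count and the tuples may arrive in either order, which is why both handlers call the same completion check. Tracking several upstream spout tasks, as the notes require, would need one such state per sender.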
