storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [13/23] git commit: Add nodejs multilang clojure test
Date Mon, 22 Sep 2014 20:29:01 GMT
Add nodejs multilang clojure test


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c0957d04
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c0957d04
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c0957d04

Branch: refs/heads/master
Commit: c0957d040802003d6b6fcc8ad77e3600ece2e307
Parents: 346335c
Author: Anya Tchernishov <anya@forter.com>
Authored: Wed Aug 6 15:53:14 2014 +0300
Committer: Itai Frenkel <itai@ryzyco.com>
Committed: Thu Aug 7 15:35:54 2014 +0300

----------------------------------------------------------------------
 storm-core/src/dev/resources/storm.js           | 348 +++++++++++++++++++
 storm-core/src/dev/resources/tester_bolt.js     |  31 ++
 storm-core/src/dev/resources/tester_spout.js    |  65 ++++
 .../test/clj/backtype/storm/multilang_test.clj  |   4 +
 4 files changed, 448 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c0957d04/storm-core/src/dev/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.js b/storm-core/src/dev/resources/storm.js
new file mode 100755
index 0000000..176c8ce
--- /dev/null
+++ b/storm-core/src/dev/resources/storm.js
@@ -0,0 +1,348 @@
+/**
+ * Base classes in node-js for storm Bolt and Spout.
+ * Implements the storm multilang protocol for nodejs.
+ */
+
+
+var fs = require('fs');
+
+function Storm() {
+    this.messageParts = [];
+    this.taskIdsCallbacks = [];
+    this.isFirstMessage = true;
+    this.separator = '\nend\n';
+}
+
+Storm.prototype.sendMsgToParent = function(msg) {
+    var str = JSON.stringify(msg);
+    process.stdout.write(str + this.separator);
+}
+
+Storm.prototype.sync = function() {
+    this.sendMsgToParent({"command":"sync"});
+}
+
+Storm.prototype.sendPid = function(heartbeatdir) {
+    var pid = process.pid;
+    fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
+    this.sendMsgToParent({"pid": pid})
+}
+
+Storm.prototype.log = function(msg) {
+    this.sendMsgToParent({"command": "log", "msg": msg});
+}
+
+Storm.prototype.initSetupInfo = function(setupInfo) {
+    var self = this;
+    var callback = function() {
+        self.sendPid(setupInfo['pidDir']);
+    }
+    this.initialize(setupInfo['conf'], setupInfo['context'], callback);
+}
+
+Storm.prototype.startReadingInput = function() {
+    var self = this;
+    process.stdin.on('readable', function() {
+        var chunk = process.stdin.read();
+        var messages = self.handleNewChunk(chunk);
+        messages.forEach(function(message) {
+            self.handleNewMessage(message);
+        })
+
+    });
+}
+
+/**
+ * receives a new string chunk and returns a list of new messages with the separator removed
+ * stores state in this.messageParts
+ * @param chunk
+ */
+Storm.prototype.handleNewChunk = function(chunk) {
+    var messages = [];
+    if (chunk && chunk.length !== 0) {
+        //"{}".split("\nend\n")           ==> ['{}']
+        //"\nend\n".split("\nend\n")      ==> [''  , '']
+        //"{}\nend\n".split("\nend\n")    ==> ['{}', '']
+        //"\nend\n{}".split("\nend\n")    ==> [''  , '{}']
+        // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
+        var newMessageParts = chunk.split(this.separator);
+        while (newMessageParts.length > 0) {
+            var messagePart = newMessageParts.shift();
+            this.messageParts.push(messagePart);
+            var anotherMessageAhead = newMessageParts.length > 0;
+            if  (anotherMessageAhead) {
+                var message = this.messageParts.join('');
+                this.messageParts = [];
+                if (message.length > 0) {
+                    messages.push(message);
+                }
+            }
+        }
+    }
+    return messages;
+ }
+
+Storm.prototype.isTaskIds = function(msg) {
+    return (msg instanceof Array);
+}
+
+Storm.prototype.handleNewMessage = function(msg) {
+    var parsedMsg = JSON.parse(msg);
+
+    if (this.isFirstMessage) {
+        this.initSetupInfo(parsedMsg);
+        this.isFirstMessage = false;
+    } else if (this.isTaskIds(parsedMsg)) {
+        this.handleNewTaskId(parsedMsg);
+    } else {
+        this.handleNewCommand(parsedMsg);
+    }
+}
+
+Storm.prototype.handleNewTaskId = function(taskIds) {
+    //When a new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
+    //Storm assures that the task ids will be sent in the same order as their corresponding emits, so we can simply
+    //take the first callback in the list and be sure it is the right one.
+
+    var callback = this.taskIdsCallbacks.shift();
+    if (callback) {
+        callback(taskIds);
+    } else {
+        throw new Error('Something went wrong, we off the split of task id callbacks');
+    }
+}
+
+
+
+/**
+ *
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
+ * tuple and return ack when all components have successfully finished processing it.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ *
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function that will be called with the list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emit = function(messageDetails, onTaskIds) {
+    //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
+    //through the callback (will be called when the response arrives). The callback is stored in a list until the
+    //corresponding task id list arrives.
+    if (messageDetails.task) {
+        throw new Error('Illegal input - task. To emit to specific task use emit direct!');
+    }
+
+    if (!onTaskIds) {
+        throw new Error('You must pass a onTaskIds callback when using emit!')
+    }
+
+    this.taskIdsCallbacks.push(onTaskIds);
+    this.__emit(messageDetails);;
+}
+
+
+/**
+ * Emit message to specific task.
+ * @param commandDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
+ * tuple and return ack when all components have successfully finished processing it.
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * Note: unlike emit, this method takes no onTaskIds callback and registers none.
+ */
+Storm.prototype.emitDirect = function(commandDetails) {
+    if (!commandDetails.task) {
+        throw new Error("Emit direct must receive task id!")
+    }
+    this.__emit(commandDetails);
+}
+
+/**
+ * Initialize storm component according to the configuration received.
+ * @param conf configuration object according to storm protocol.
+ * @param context context object according to storm protocol.
+ * @param done callback. Call this method when finished initializing.
+ */
+Storm.prototype.initialize = function(conf, context, done) {
+    done();
+}
+
+Storm.prototype.run = function() {
+    process.stdout.setEncoding('utf8');
+    process.stdin.setEncoding('utf8');
+    this.startReadingInput();
+}
+
+function Tuple(id, component, stream, task, values) {
+    this.id = id;
+    this.component = component;
+    this.stream = stream;
+    this.task = task;
+    this.values = values;
+}
+
+/**
+ * Base class for storm bolt.
+ * To create a bolt implement 'process' method.
+ * You may also implement the 'initialize' method.
+ */
+function BasicBolt() {
+    Storm.call(this);
+    this.anchorTuple = null;
+};
+
+BasicBolt.prototype = Object.create(Storm.prototype);
+BasicBolt.prototype.constructor = BasicBolt;
+
+/**
+ * Emit message.
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
+ * tuple and return ack when all components have successfully finished processing it.
+ * and the optional fields:
+ * - stream (if empty - emit to default stream)
+ * - task (pass only to emit to specific task)
+ */
+BasicBolt.prototype.__emit = function(commandDetails) {
+    var self = this;
+
+    var message = {
+        command: "emit",
+        tuple: commandDetails.tuple,
+        stream: commandDetails.stream,
+        task: commandDetails.task,
+        anchors: [commandDetails.anchorTupleId]
+    };
+
+    this.sendMsgToParent(message);
+}
+
+BasicBolt.prototype.handleNewCommand = function(command) {
+    var self = this;
+    var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"],
command["tuple"]);
+    var callback = function(err) {
+          if (err) {
+              self.fail(tup, err);
+          }
+          self.ack(tup);
+      }
+    this.process(tup, callback);
+}
+
+/**
+ * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
+ * should it do?).
+ * @param tuple the input of the bolt - what to process.
+ * @param done call this method when done processing.
+ */
+BasicBolt.prototype.process = function(tuple, done) {};
+
+BasicBolt.prototype.ack = function(tup) {
+    this.sendMsgToParent({"command": "ack", "id": tup.id});
+}
+
+BasicBolt.prototype.fail = function(tup, err) {
+    this.sendMsgToParent({"command": "fail", "id": tup.id});
+}
+
+
+/**
+ * Base class for storm spout.
+ * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
+ * can stay empty).
+ * You may also implement initialize method.
+ *
+ */
+function Spout() {
+    Storm.call(this);
+};
+
+Spout.prototype = Object.create(Storm.prototype);
+
+Spout.prototype.constructor = Spout;
+
+/**
+ * This method will be called when an ack is received for a previously sent tuple. One may implement it.
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.ack = function(id, done) {};
+
+/**
+ * This method will be called when a fail is received for a previously sent tuple. One may implement it (for example -
+ * log the failure or send the tuple again).
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.fail = function(id, done) {};
+
+/**
+ * Method that indicates it's time to emit the next tuple.
+ * @param done call this method when done sending the output.
+ */
+Spout.prototype.nextTuple = function(done) {};
+
+Spout.prototype.handleNewCommand = function(command) {
+    var self = this;
+    var callback = function() {
+        self.sync();
+    }
+
+    if (command["command"] === "next") {
+        this.nextTuple(callback);
+    }
+
+    if (command["command"] === "ack") {
+        this.ack(command["id"], callback);
+    }
+
+    if (command["command"] === "fail") {
+        this.fail(command["id"], callback);
+    }
+}
+
+/**
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit.
+ * and the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ * - task - pass only to emit to specific task.
+ */
+Spout.prototype.__emit = function(commandDetails) {
+    var message = {
+        command: "emit",
+        tuple: commandDetails.tuple,
+        id: commandDetails.id,
+        stream: commandDetails.stream,
+        task: commandDetails.task
+    };
+
+    this.sendMsgToParent(message);
+}
+
+module.exports.BasicBolt = BasicBolt;
+module.exports.Spout = Spout;

http://git-wip-us.apache.org/repos/asf/storm/blob/c0957d04/storm-core/src/dev/resources/tester_bolt.js
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/tester_bolt.js b/storm-core/src/dev/resources/tester_bolt.js
new file mode 100644
index 0000000..544c704
--- /dev/null
+++ b/storm-core/src/dev/resources/tester_bolt.js
@@ -0,0 +1,31 @@
+/**
+ * Created by anya on 8/6/14.
+ */
+
+var storm = require('./storm');
+var BasicBolt = storm.BasicBolt;
+
+function TesterBolt() {
+    BasicBolt.call(this);
+};
+
+TesterBolt.prototype = Object.create(BasicBolt.prototype);
+TesterBolt.prototype.constructor = TesterBolt;
+
+TesterBolt.prototype.initialize = function(conf, context, done) {
+//    this.emit({tuple: ['bolt initializing']}, function() {});
+    done();
+}
+
+TesterBolt.prototype.process = function(tup, done) {
+    var word = tup.values[0];
+
+    if (Math.random() < 0.75) {
+        this.emit({tuple: [word + 'lalala'], anchorTupleId: tup.id}, function() {});
+        done();
+    } else {
+        this.log(word + ' randomly skipped!');
+    }
+}
+
+new TesterBolt().run();
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/c0957d04/storm-core/src/dev/resources/tester_spout.js
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/tester_spout.js b/storm-core/src/dev/resources/tester_spout.js
new file mode 100644
index 0000000..ff36bcb
--- /dev/null
+++ b/storm-core/src/dev/resources/tester_spout.js
@@ -0,0 +1,65 @@
+/**
+ * Created by anya on 8/6/14.
+ */
+
+
+
+
+var storm = require('./storm');
+var Spout = storm.Spout;
+
+var words = ["nathan", "mike", "jackson", "golda", "bertels人", "עברית"]
+
+function TesterSpout() {
+    Spout.call(this);
+    this.runningTupleId = 0;
+    this.pending = {};
+};
+
+TesterSpout.prototype = Object.create(Spout.prototype);
+TesterSpout.prototype.constructor = TesterSpout;
+
+TesterSpout.prototype.initialize = function(conf, context, done) {
+//    this.emit({tuple: ['spout initializing']}, function() {});
+    done();
+}
+
+TesterSpout.prototype.nextTuple = function(done) {
+    var self = this;
+    var word = this.getRandomWord();
+    var tup = [word];
+    var id = this.createNextTupleId();
+    this.pending[id] = tup;
+    this.emit({tuple: tup, id: id}, function(taskIds) {});
+    done();
+}
+
+TesterSpout.prototype.createNextTupleId = function() {
+    var id = this.runningTupleId;
+    this.runningTupleId++;
+    return id;
+}
+
+TesterSpout.prototype.ack = function(id, done) {
+    delete this.pending[id];
+    done();
+}
+
+TesterSpout.prototype.fail = function(id, done) {
+    this.log("emitting " + this.pending[id] + " on fail");
+    this.emit({tuple: this.pending[id], id: id}, function() {});
+    done();
+}
+
+TesterSpout.prototype.getRandomWord = function() {
+    return words[getRandomInt(0, words.length - 1)];
+}
+
+/**
+ * Returns a random integer between min (inclusive) and max (inclusive)
+ */
+function getRandomInt(min, max) {
+    return Math.floor(Math.random() * (max - min + 1)) + min;
+}
+
+new TesterSpout().run();

http://git-wip-us.apache.org/repos/asf/storm/blob/c0957d04/storm-core/test/clj/backtype/storm/multilang_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/multilang_test.clj b/storm-core/test/clj/backtype/storm/multilang_test.clj
index 5e7b9f3..2d3a866 100644
--- a/storm-core/test/clj/backtype/storm/multilang_test.clj
+++ b/storm-core/test/clj/backtype/storm/multilang_test.clj
@@ -62,3 +62,7 @@
 (deftest test-ruby
 (test-multilang "ruby" "rb")
 )
+
+(deftest test-node
+(test-multilang "node" "js")
+)


Mime
View raw message