storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [08/23] git commit: Support very long json objects. Explicit utf8 encoding
Date Mon, 22 Sep 2014 20:28:56 GMT
Support very long json objects. Explicit utf8 encoding


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7d8b2014
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7d8b2014
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7d8b2014

Branch: refs/heads/master
Commit: 7d8b20140c3db51991166849a29793231052fcdd
Parents: 6574592
Author: Itai Frenkel <itai@ryzyco.com>
Authored: Tue Aug 5 14:52:18 2014 +0300
Committer: Itai Frenkel <itai@ryzyco.com>
Committed: Tue Aug 5 14:52:18 2014 +0300

----------------------------------------------------------------------
 .../storm-starter/multilang/resources/storm.js  | 69 +++++++++++---------
 1 file changed, 39 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7d8b2014/examples/storm-starter/multilang/resources/storm.js
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js
index 0fa6825..176c8ce 100755
--- a/examples/storm-starter/multilang/resources/storm.js
+++ b/examples/storm-starter/multilang/resources/storm.js
@@ -3,17 +3,19 @@
  * Implements the storm multilang protocol for nodejs.
  */
 
+
 var fs = require('fs');
 
 function Storm() {
-    this.lines = [];
+    this.messageParts = [];
     this.taskIdsCallbacks = [];
     this.isFirstMessage = true;
+    this.separator = '\nend\n';
 }
 
 Storm.prototype.sendMsgToParent = function(msg) {
-    var str = JSON.stringify(msg) + '\nend\n';
-    process.stdout.write(str);
+    var str = JSON.stringify(msg);
+    process.stdout.write(str + this.separator);
 }
 
 Storm.prototype.sync = function() {
@@ -40,40 +42,45 @@ Storm.prototype.initSetupInfo = function(setupInfo) {
 
 Storm.prototype.startReadingInput = function() {
     var self = this;
-
     process.stdin.on('readable', function() {
         var chunk = process.stdin.read();
+        var messages = self.handleNewChunk(chunk);
+        messages.forEach(function(message) {
+            self.handleNewMessage(message);
+        })
 
-        if (chunk && chunk.length !== 0) {
-          var lines = chunk.toString().split('\n');
-          lines.forEach(function(line) {
-              self.handleNewLine(line);
-          })
-        }
     });
 }
 
-Storm.prototype.handleNewLine = function(line) {
-    if (line === 'end') {
-        var msg = this.collectMessageLines();
-        this.cleanLines();
-        this.handleNewMessage(msg);
-    } else {
-        this.storeLine(line);
+/**
+ * receives a new string chunk and returns a list of new messages with the separator removed
+ * stores state in this.messageParts
+ * @param chunk
+ */
+Storm.prototype.handleNewChunk = function(chunk) {
+    var messages = [];
+    if (chunk && chunk.length !== 0) {
+        //"{}".split("\nend\n")           ==> ['{}']
+        //"\nend\n".split("\nend\n")      ==> [''  , '']
+        //"{}\nend\n".split("\nend\n")    ==> ['{}', '']
+        //"\nend\n{}".split("\nend\n")    ==> [''  , '{}']
+        // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
+        var newMessageParts = chunk.split(this.separator);
+        while (newMessageParts.length > 0) {
+            var messagePart = newMessageParts.shift();
+            this.messageParts.push(messagePart);
+            var anotherMessageAhead = newMessageParts.length > 0;
+            if  (anotherMessageAhead) {
+                var message = this.messageParts.join('');
+                this.messageParts = [];
+                if (message.length > 0) {
+                    messages.push(message);
+                }
+            }
+        }
     }
-}
-
-Storm.prototype.collectMessageLines = function() {
-    return this.lines.join('\n');
-}
-
-Storm.prototype.cleanLines = function() {
-    this.lines = [];
-}
-
-Storm.prototype.storeLine = function(line) {
-    this.lines.push(line);
-}
+    return messages;
+ }
 
 Storm.prototype.isTaskIds = function(msg) {
     return (msg instanceof Array);
@@ -183,6 +190,8 @@ Storm.prototype.initialize = function(conf, context, done) {
 }
 
 Storm.prototype.run = function() {
+    process.stdout.setEncoding('utf8');
+    process.stdin.setEncoding('utf8');
     this.startReadingInput();
 }
 


Mime
View raw message