toree-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lrese...@apache.org
Subject incubator-toree git commit: [TOREE-391] Treat zmq ids as bytes
Date Thu, 16 Mar 2017 22:58:00 GMT
Repository: incubator-toree
Updated Branches:
  refs/heads/master 3b6eb2926 -> ca4b86be7


[TOREE-391] Treat zmq ids as bytes

Properly handle zMQ ids as byte arrays
to avoid timeouts during Kernel restarts.

Closes #111


Project: http://git-wip-us.apache.org/repos/asf/incubator-toree/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-toree/commit/ca4b86be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-toree/tree/ca4b86be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-toree/diff/ca4b86be

Branch: refs/heads/master
Commit: ca4b86be705ce1fd0901e7d4a3dfb9b53067eb0f
Parents: 3b6eb29
Author: Jim Rhyness <jrhyness@ca.ibm.com>
Authored: Thu Mar 9 20:43:20 2017 -0500
Committer: Luciano Resende <lresende@apache.org>
Committed: Thu Mar 16 15:50:41 2017 -0700

----------------------------------------------------------------------
 .../kernel/protocol/v5/client/Utilities.scala   |  8 +++----
 .../v5/client/socket/IOPubClientSpec.scala      |  4 ++--
 .../v5/client/socket/ShellClientSpec.scala      |  4 ++--
 .../toree/communication/SocketManager.scala     | 10 ++++----
 .../actors/DealerSocketActor.scala              |  5 ++--
 .../communication/actors/PubSocketActor.scala   |  3 +--
 .../communication/actors/RepSocketActor.scala   |  5 ++--
 .../communication/actors/ReqSocketActor.scala   |  5 ++--
 .../actors/RouterSocketActor.scala              |  9 ++++---
 .../communication/actors/SubSocketActor.scala   |  2 +-
 .../communication/socket/JeroMQSocket.scala     |  7 ++++--
 .../socket/ReqSocketRunnable.scala              |  2 +-
 .../toree/communication/socket/SocketLike.scala |  2 +-
 .../communication/socket/SocketRunnable.scala   |  2 +-
 .../socket/ZeroMQSocketRunnable.scala           |  6 ++---
 .../JeroMQSocketIntegrationSpec.scala           | 24 +++++++++----------
 .../communication/socket/JeroMQSocketSpec.scala |  4 ++--
 .../socket/ZeroMQSocketRunnableSpec.scala       |  2 +-
 .../toree/kernel/api/DisplayMethods.scala       |  4 ++--
 .../apache/toree/kernel/api/StreamMethods.scala |  2 +-
 .../protocol/v5/dispatch/StatusDispatch.scala   |  2 +-
 .../v5/handler/ExecuteRequestHandler.scala      |  4 ++--
 .../kernel/protocol/v5/kernel/Utilities.scala   |  6 ++---
 .../protocol/v5/stream/KernelOutputStream.scala |  2 +-
 .../v5/handler/CommInfoRequestHandlerSpec.scala |  6 ++---
 .../handler/KernelInfoRequestHandlerSpec.scala  |  2 +-
 .../protocol/v5/kernel/UtilitiesSpec.scala      | 10 ++++----
 .../v5/relay/KernelMessageRelaySpec.scala       |  8 +++----
 .../v5/stream/KernelOuputStreamSpec.scala       |  3 ++-
 .../toree/kernel/protocol/v5/KMBuilder.scala    |  2 +-
 .../kernel/protocol/v5/KernelMessage.scala      | 25 ++++++++++++++++++--
 .../kernel/protocol/v5/KMBuilderSpec.scala      |  2 +-
 .../toree/kernel/protocol/v5/package.scala      |  4 ++--
 33 files changed, 103 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/Utilities.scala
----------------------------------------------------------------------
diff --git a/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/Utilities.scala b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/Utilities.scala
index 274057b..6e653c4 100644
--- a/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/Utilities.scala
+++ b/client/src/main/scala/org/apache/toree/kernel/protocol/v5/client/Utilities.scala
@@ -58,9 +58,9 @@ object Utilities extends LogLike {
     val delimiterIndex: Int =
       message.frames.indexOf(ByteString("<IDS|MSG>".getBytes))
     //  TODO Handle the case where there is no delimiter
-    val ids: Seq[String] =
+    val ids: Seq[Array[Byte]] =
       message.frames.take(delimiterIndex).map(
-        (byteString : ByteString) =>  { new String(byteString.toArray) }
+        (byteString : ByteString) =>  { byteString.toArray }
       )
     val header = Json.parse(message.frames(delimiterIndex + 2)).as[Header]
     val parentHeader = Json.parse(message.frames(delimiterIndex + 3)).validate[ParentHeader].fold[ParentHeader](
@@ -80,7 +80,7 @@ object Utilities extends LogLike {
 
   implicit def KernelMessageToZMQMessage(kernelMessage : KernelMessage) : ZMQMessage = {
     val frames: scala.collection.mutable.ListBuffer[ByteString] = scala.collection.mutable.ListBuffer()
-    kernelMessage.ids.map((id : String) => frames += id )
+    kernelMessage.ids.map((id : Array[Byte]) => frames += ByteString.apply(id) )
     frames += "<IDS|MSG>"
     frames += kernelMessage.signature
     frames += Json.toJson(kernelMessage.header).toString()
@@ -106,7 +106,7 @@ object Utilities extends LogLike {
     val header = Header(
       id, "spark", sessionId, MessageType.Incoming.ExecuteRequest.toString, "5.0")
 
-    KMBuilder().withIds(Seq[String]()).withSignature("").withHeader(header)
+    KMBuilder().withIds(Seq[Array[Byte]]()).withSignature("").withHeader(header)
       .withParentHeader(HeaderBuilder.empty).withContentString(message).build
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
index 142a3a8..98db662 100644
--- a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/IOPubClientSpec.scala
@@ -238,7 +238,7 @@ class IOPubClientSpec extends TestKit(ActorSystem(
           MessageType.Incoming.ExecuteRequest.toString, "5.0")
 
         val kernelMessage = new KernelMessage(
-          Seq[String](),
+          Seq[Array[Byte]](),
           "",
           header,
           parentHeader,
@@ -272,7 +272,7 @@ class IOPubClientSpec extends TestKit(ActorSystem(
           MessageType.Outgoing.Stream.toString, "5.0")
 
         val kernelMessage = new KernelMessage(
-          Seq[String](),
+          Seq[Array[Byte]](),
           "",
           header,
           null,

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
----------------------------------------------------------------------
diff --git a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
index 382304e..72cc9ee 100644
--- a/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
+++ b/client/src/test/scala/org/apache/toree/kernel/protocol/v5/client/socket/ShellClientSpec.scala
@@ -63,7 +63,7 @@ class ShellClientSpec extends TestKit(ActorSystem("ShellActorSpec"))
           "5.0"
         )
         val kernelMessage = KernelMessage(
-          Seq[String](), "",
+          Seq[Array[Byte]](), "",
           header, HeaderBuilder.empty,
           Metadata(), Json.toJson(request).toString
         )
@@ -77,4 +77,4 @@ class ShellClientSpec extends TestKit(ActorSystem("ShellActorSpec"))
       }
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala b/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
index c660848..0422334 100644
--- a/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/SocketManager.scala
@@ -75,7 +75,7 @@ class SocketManager {
    */
   def newReqSocket(
     address: String,
-    inboundMessageCallback: (Seq[String]) => Unit
+    inboundMessageCallback: (Seq[Array[Byte]]) => Unit
   ): SocketLike = withNewContext{ ctx =>
      new JeroMQSocket(new ReqSocketRunnable(
       ctx,
@@ -95,7 +95,7 @@ class SocketManager {
    */
   def newRepSocket(
     address: String,
-    inboundMessageCallback: (Seq[String]) => Unit
+    inboundMessageCallback: (Seq[Array[Byte]]) => Unit
   ): SocketLike = withNewContext{ ctx =>
     new JeroMQSocket(new ZeroMQSocketRunnable(
       ctx,
@@ -133,7 +133,7 @@ class SocketManager {
    */
   def newSubSocket(
     address: String,
-    inboundMessageCallback: (Seq[String]) => Unit
+    inboundMessageCallback: (Seq[Array[Byte]]) => Unit
   ): SocketLike = withNewContext { ctx =>
     new JeroMQSocket(new ZeroMQSocketRunnable(
       ctx,
@@ -155,7 +155,7 @@ class SocketManager {
    */
   def newRouterSocket(
     address: String,
-    inboundMessageCallback: (Seq[String]) => Unit
+    inboundMessageCallback: (Seq[Array[Byte]]) => Unit
   ): SocketLike = withNewContext { ctx =>
     new JeroMQSocket(new ZeroMQSocketRunnable(
       ctx,
@@ -176,7 +176,7 @@ class SocketManager {
    */
   def newDealerSocket(
     address: String,
-    inboundMessageCallback: (Seq[String]) => Unit,
+    inboundMessageCallback: (Seq[Array[Byte]]) => Unit,
     identity: String = UUID.randomUUID().toString
   ): SocketLike = withNewContext{ ctx =>
     new JeroMQSocket(new ZeroMQSocketRunnable(

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/actors/DealerSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/actors/DealerSocketActor.scala b/communication/src/main/scala/org/apache/toree/communication/actors/DealerSocketActor.scala
index c5b4575..ad0f7a9 100644
--- a/communication/src/main/scala/org/apache/toree/communication/actors/DealerSocketActor.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/actors/DealerSocketActor.scala
@@ -33,7 +33,7 @@ class DealerSocketActor(connection: String, listener: ActorRef)
 {
   logger.debug(s"Initializing dealer socket actor for $connection")
   private val manager: SocketManager = new SocketManager
-  private val socket = manager.newDealerSocket(connection, (message: Seq[String]) => {
+  private val socket = manager.newDealerSocket(connection, (message: Seq[Array[Byte]]) => {
     listener ! ZMQMessage(message.map(ByteString.apply): _*)
   })
 
@@ -43,8 +43,7 @@ class DealerSocketActor(connection: String, listener: ActorRef)
 
   override def receive: Actor.Receive = {
     case zmqMessage: ZMQMessage =>
-      val frames = zmqMessage.frames.map(byteString =>
-        new String(byteString.toArray, ZMQ.CHARSET))
+      val frames = zmqMessage.frames.map(byteString => byteString.toArray )
       socket.send(frames: _*)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/actors/PubSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/actors/PubSocketActor.scala b/communication/src/main/scala/org/apache/toree/communication/actors/PubSocketActor.scala
index e0ca2f8..3a7c770 100644
--- a/communication/src/main/scala/org/apache/toree/communication/actors/PubSocketActor.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/actors/PubSocketActor.scala
@@ -45,8 +45,7 @@ class PubSocketActor(connection: String)
 
   override def receive: Actor.Receive = {
     case zmqMessage: ZMQMessage => withProcessing {
-      val frames = zmqMessage.frames.map(byteString =>
-        new String(byteString.toArray, ZMQ.CHARSET))
+      val frames = zmqMessage.frames.map(byteString => byteString.toArray )
 
       socket.send(frames: _*)
     }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/actors/RepSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/actors/RepSocketActor.scala b/communication/src/main/scala/org/apache/toree/communication/actors/RepSocketActor.scala
index 6a3d1ef..c406f15 100644
--- a/communication/src/main/scala/org/apache/toree/communication/actors/RepSocketActor.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/actors/RepSocketActor.scala
@@ -33,7 +33,7 @@ class RepSocketActor(connection: String, listener: ActorRef)
 {
   logger.debug(s"Initializing reply socket actor for $connection")
   private val manager: SocketManager = new SocketManager
-  private val socket = manager.newRepSocket(connection, (message: Seq[String]) => {
+  private val socket = manager.newRepSocket(connection, (message: Seq[Array[Byte]]) => {
     listener ! ZMQMessage(message.map(ByteString.apply): _*)
   })
 
@@ -43,8 +43,7 @@ class RepSocketActor(connection: String, listener: ActorRef)
 
   override def receive: Actor.Receive = {
     case zmqMessage: ZMQMessage =>
-      val frames = zmqMessage.frames.map(byteString =>
-        new String(byteString.toArray, ZMQ.CHARSET))
+      val frames = zmqMessage.frames.map(byteString => byteString.toArray )
       socket.send(frames: _*)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/actors/ReqSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/actors/ReqSocketActor.scala b/communication/src/main/scala/org/apache/toree/communication/actors/ReqSocketActor.scala
index 71a99a5..ec22a92 100644
--- a/communication/src/main/scala/org/apache/toree/communication/actors/ReqSocketActor.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/actors/ReqSocketActor.scala
@@ -33,7 +33,7 @@ class ReqSocketActor(connection: String, listener: ActorRef)
 {
   logger.debug(s"Initializing request socket actor for $connection")
   private val manager: SocketManager = new SocketManager
-  private val socket = manager.newReqSocket(connection, (message: Seq[String]) => {
+  private val socket = manager.newReqSocket(connection, (message: Seq[Array[Byte]]) => {
     listener ! ZMQMessage(message.map(ByteString.apply): _*)
   })
 
@@ -43,8 +43,7 @@ class ReqSocketActor(connection: String, listener: ActorRef)
 
   override def receive: Actor.Receive = {
     case zmqMessage: ZMQMessage =>
-      val frames = zmqMessage.frames.map(byteString =>
-        new String(byteString.toArray, ZMQ.CHARSET))
+      val frames = zmqMessage.frames.map(byteString => byteString.toArray )
       socket.send(frames: _*)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/actors/RouterSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/actors/RouterSocketActor.scala b/communication/src/main/scala/org/apache/toree/communication/actors/RouterSocketActor.scala
index 67cbd80..2501909 100644
--- a/communication/src/main/scala/org/apache/toree/communication/actors/RouterSocketActor.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/actors/RouterSocketActor.scala
@@ -33,7 +33,7 @@ class RouterSocketActor(connection: String, listener: ActorRef)
 {
   logger.debug(s"Initializing router socket actor for $connection")
   private val manager: SocketManager = new SocketManager
-  private val socket = manager.newRouterSocket(connection, (message: Seq[String]) => {
+  private val socket = manager.newRouterSocket(connection, (message: Seq[Array[Byte]]) => {
     listener ! ZMQMessage(message.map(ByteString.apply): _*)
   })
 
@@ -43,8 +43,7 @@ class RouterSocketActor(connection: String, listener: ActorRef)
 
   override def receive: Actor.Receive = {
     case zmqMessage: ZMQMessage =>
-      val frames = zmqMessage.frames.map(byteString =>
-        new String(byteString.toArray, ZMQ.CHARSET))
-      socket.send(frames: _*)
+      val frames = zmqMessage.frames.map(byteString => byteString.toArray )
+    socket.send(frames: _*)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/actors/SubSocketActor.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/actors/SubSocketActor.scala b/communication/src/main/scala/org/apache/toree/communication/actors/SubSocketActor.scala
index 7c39fb5..0c82823 100644
--- a/communication/src/main/scala/org/apache/toree/communication/actors/SubSocketActor.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/actors/SubSocketActor.scala
@@ -32,7 +32,7 @@ class SubSocketActor(connection: String, listener: ActorRef)
 {
   logger.debug(s"Initializing subscribe socket actor for $connection")
   private val manager: SocketManager = new SocketManager
-  private val socket = manager.newSubSocket(connection, (message: Seq[String]) => {
+  private val socket = manager.newSubSocket(connection, (message: Seq[Array[Byte]]) => {
     listener ! ZMQMessage(message.map(ByteString.apply): _*)
   })
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/socket/JeroMQSocket.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/JeroMQSocket.scala b/communication/src/main/scala/org/apache/toree/communication/socket/JeroMQSocket.scala
index e8942de..236945b 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/JeroMQSocket.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/JeroMQSocket.scala
@@ -35,10 +35,13 @@ class JeroMQSocket(private val runnable: ZeroMQSocketRunnable)
    *
    * @param message The message to send
    */
-  override def send(message: String*): Unit = {
+  override def send(message: Array[Byte]*): Unit = {
     assert(isAlive, "Socket is not alive to be able to send messages!")
 
-    runnable.offer(ZMsg.newStringMsg(message: _*))
+    val msg = new ZMsg()
+    for( frame <- message ) msg.add( frame )
+    
+    runnable.offer( msg )
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala b/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
index 81355fa..cd3d85e 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/ReqSocketRunnable.scala
@@ -31,7 +31,7 @@ import org.zeromq.ZMQ.{Socket, Context}
  */
 class ReqSocketRunnable(
   private val context: Context,
-  private val inboundMessageCallback: Option[(Seq[String]) => Unit],
+  private val inboundMessageCallback: Option[(Seq[Array[Byte]]) => Unit],
   private val socketOptions: SocketOption*
 ) extends ZeroMQSocketRunnable(
   context,

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/socket/SocketLike.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/SocketLike.scala b/communication/src/main/scala/org/apache/toree/communication/socket/SocketLike.scala
index 504ad32..d5b6d31 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/SocketLike.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/SocketLike.scala
@@ -28,7 +28,7 @@ trait SocketLike {
    *
    * @param message The message to send
    */
-  def send(message: String*): Unit
+  def send(message: Array[Byte]*): Unit
 
   /**
    * Closes the socket, marking it no longer able to process or send messages.

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/socket/SocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/SocketRunnable.scala b/communication/src/main/scala/org/apache/toree/communication/socket/SocketRunnable.scala
index ee43bc7..8701e04 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/SocketRunnable.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/SocketRunnable.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentLinkedQueue
  *                               through this runnable
  */
 abstract class SocketRunnable[T](
-   private val inboundMessageCallback: Option[(Seq[String]) => Unit]
+   private val inboundMessageCallback: Option[(Seq[Array[Byte]]) => Unit]
 ) extends Runnable {
 
   /** The collection of messages to be sent out through the socket. */

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
----------------------------------------------------------------------
diff --git a/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala b/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
index 377e918..0464d57 100644
--- a/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
+++ b/communication/src/main/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnable.scala
@@ -36,7 +36,7 @@ import scala.util.Try
 class ZeroMQSocketRunnable(
   private val context: Context,
   private val socketType: SocketType,
-  private val inboundMessageCallback: Option[(Seq[String]) => Unit],
+  private val inboundMessageCallback: Option[(Seq[Array[Byte]]) => Unit],
   private val socketOptions: SocketOption*
 ) extends SocketRunnable[ZMsg](inboundMessageCallback)
   with LogLike {
@@ -120,8 +120,8 @@ class ZeroMQSocketRunnable(
     flags: Int = ZMQ.DONTWAIT
   ): Unit = {
     Option(ZMsg.recvMsg(socket, flags)).foreach(zMsg => {
-      inboundMessageCallback.foreach(_(zMsg.asScala.toSeq
-        .map(zFrame => new String(zFrame.getData, ZMQ.CHARSET))
+      inboundMessageCallback.foreach(_(zMsg.asScala.toSeq    
+        .map(zFrame => zFrame.getData)  
       ))
     })
   }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/test/scala/integration/JeroMQSocketIntegrationSpec.scala
----------------------------------------------------------------------
diff --git a/communication/src/test/scala/integration/JeroMQSocketIntegrationSpec.scala b/communication/src/test/scala/integration/JeroMQSocketIntegrationSpec.scala
index 6c2e311..2cd5b0b 100644
--- a/communication/src/test/scala/integration/JeroMQSocketIntegrationSpec.scala
+++ b/communication/src/test/scala/integration/JeroMQSocketIntegrationSpec.scala
@@ -47,8 +47,8 @@ class JeroMQSocketIntegrationSpec extends FunSpec
       it("should be able to communicate") {
         val address =s"inproc://${this.hashCode()}"
 
-        val replyMessages = new ConcurrentLinkedDeque[Seq[String]]()
-        val replyCallback: (Seq[String]) => Unit = { msg: Seq[String] =>
+        val replyMessages = new ConcurrentLinkedDeque[Seq[Array[Byte]]]()
+        val replyCallback: (Seq[Array[Byte]]) => Unit = { msg: Seq[Array[Byte]] =>
           replyMessages.offer(msg)
         }
         val reply =  socketManager.newRouterSocket(
@@ -60,8 +60,8 @@ class JeroMQSocketIntegrationSpec extends FunSpec
           reply.isReady should be (true)
         }
 
-        val requestMessages = new ConcurrentLinkedDeque[Seq[String]]()
-        val requestCallback: (Seq[String]) => Unit = { msg: Seq[String] =>
+        val requestMessages = new ConcurrentLinkedDeque[Seq[Array[Byte]]]()
+        val requestCallback: (Seq[Array[Byte]]) => Unit = { msg: Seq[Array[Byte]] =>
           requestMessages.offer(msg)
         }
         val request = socketManager.newDealerSocket(
@@ -73,7 +73,7 @@ class JeroMQSocketIntegrationSpec extends FunSpec
           request.isReady should be (true)
         }
 
-        request.send("Message from the request to the reply")
+        request.send("Message from the request to the reply".getBytes)
 
         eventually {
           replyMessages.size() should be(1)
@@ -88,8 +88,8 @@ class JeroMQSocketIntegrationSpec extends FunSpec
       it("should be able to communicate"){
         val address =s"inproc://${this.hashCode()}"
 
-        val routerMessages = new ConcurrentLinkedDeque[Seq[String]]()
-        val routerCallback: (Seq[String]) => Unit = { msg: Seq[String] =>
+        val routerMessages = new ConcurrentLinkedDeque[Seq[Array[Byte]]]()
+        val routerCallback: (Seq[Array[Byte]]) => Unit = { msg: Seq[Array[Byte]] =>
           routerMessages.offer(msg)
         }
         val router =  socketManager.newRouterSocket(
@@ -104,14 +104,14 @@ class JeroMQSocketIntegrationSpec extends FunSpec
 
         val dealer = socketManager.newDealerSocket(
           address,
-          (_: Seq[String]) => {}
+          (_: Seq[Array[Byte]]) => {}
         )
 
         eventually {
           dealer.isReady should be (true)
         }
 
-        dealer.send("Message from the dealer to the router")
+        dealer.send("Message from the dealer to the router".getBytes)
 
         eventually {
           routerMessages.size() should be(1)
@@ -135,8 +135,8 @@ class JeroMQSocketIntegrationSpec extends FunSpec
           publisher.isReady should be (true)
         }
 
-        val subscriberMessages = new ConcurrentLinkedDeque[Seq[String]]()
-        val subscriberCallback: (Seq[String]) => Unit = { msg: Seq[String] =>
+        val subscriberMessages = new ConcurrentLinkedDeque[Seq[Array[Byte]]]()
+        val subscriberCallback: (Seq[Array[Byte]]) => Unit = { msg: Seq[Array[Byte]] =>
           subscriberMessages.offer(msg)
         }
         val subscriber =  socketManager.newSubSocket(
@@ -148,7 +148,7 @@ class JeroMQSocketIntegrationSpec extends FunSpec
           subscriber.isReady should be (true)
         }
 
-        publisher.send("Message form the publisher to the subscriber")
+        publisher.send("Message form the publisher to the subscriber".getBytes)
 
         eventually {
           subscriberMessages.size() should be(1)

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/test/scala/org/apache/toree/communication/socket/JeroMQSocketSpec.scala
----------------------------------------------------------------------
diff --git a/communication/src/test/scala/org/apache/toree/communication/socket/JeroMQSocketSpec.scala b/communication/src/test/scala/org/apache/toree/communication/socket/JeroMQSocketSpec.scala
index d742a75..078a282 100644
--- a/communication/src/test/scala/org/apache/toree/communication/socket/JeroMQSocketSpec.scala
+++ b/communication/src/test/scala/org/apache/toree/communication/socket/JeroMQSocketSpec.scala
@@ -53,7 +53,7 @@ class JeroMQSocketSpec extends FunSpec with MockitoSugar
         val message: String = "Some Message"
         val expected = ZMsg.newStringMsg(message)
 
-        socket.send(message)
+        socket.send(message.getBytes)
         verify(runnable).offer(expected)
       }
 
@@ -61,7 +61,7 @@ class JeroMQSocketSpec extends FunSpec with MockitoSugar
         socket.close()
 
         intercept[AssertionError] {
-          socket.send("")
+          socket.send("".getBytes)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
----------------------------------------------------------------------
diff --git a/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala b/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
index d16f7a7..aa0e993 100644
--- a/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
+++ b/communication/src/test/scala/org/apache/toree/communication/socket/ZeroMQSocketRunnableSpec.scala
@@ -42,7 +42,7 @@ class ZeroMQSocketRunnableSpec extends FunSpec with Matchers
     private val socket: ZMQ.Socket,
     private val context: Context,
     private val socketType: SocketType,
-    private val inboundMessageCallback: Option[(Seq[String]) => Unit],
+    private val inboundMessageCallback: Option[(Seq[Array[Byte]]) => Unit],
     private val socketOptions: SocketOption*
   ) extends ZeroMQSocketRunnable(
     context,

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/main/scala/org/apache/toree/kernel/api/DisplayMethods.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/DisplayMethods.scala b/kernel/src/main/scala/org/apache/toree/kernel/api/DisplayMethods.scala
index 9cf5e29..d6b17e1 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/api/DisplayMethods.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/api/DisplayMethods.scala
@@ -38,7 +38,7 @@ class DisplayMethods(
     val displayData = v5.content.DisplayData("user", Map(mimeType -> data), Map())
 
     val kernelMessage = kmBuilder
-      .withIds(Seq(v5.content.DisplayData.toTypeString))
+      .withIds(Seq(v5.content.DisplayData.toTypeString.getBytes))
       .withHeader(v5.content.DisplayData.toTypeString)
       .withContentString(displayData).build
 
@@ -49,7 +49,7 @@ class DisplayMethods(
     val clearOutput = v5.content.ClearOutput(wait)
 
     val kernelMessage = kmBuilder
-      .withIds(Seq(v5.content.ClearOutput.toTypeString))
+      .withIds(Seq(v5.content.ClearOutput.toTypeString.getBytes))
       .withHeader(v5.content.ClearOutput.toTypeString)
       .withContentString(clearOutput).build
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/main/scala/org/apache/toree/kernel/api/StreamMethods.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/api/StreamMethods.scala b/kernel/src/main/scala/org/apache/toree/kernel/api/StreamMethods.scala
index 99ac5e4..56e2051 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/api/StreamMethods.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/api/StreamMethods.scala
@@ -30,7 +30,7 @@ class StreamMethods(actorLoader: ActorLoader, parentMessage: KernelMessage)
 {
   private[api] val kmBuilder = v5.KMBuilder()
     .withParent(parentMessage)
-    .withIds(Seq(v5.content.StreamContent.toTypeString))
+    .withIds(Seq(v5.content.StreamContent.toTypeString.getBytes))
     .withHeader(v5.content.StreamContent.toTypeString)
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/dispatch/StatusDispatch.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/dispatch/StatusDispatch.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/dispatch/StatusDispatch.scala
index 76e326c..5a148d6 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/dispatch/StatusDispatch.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/dispatch/StatusDispatch.scala
@@ -29,7 +29,7 @@ class StatusDispatch(actorLoader: ActorLoader) extends Actor with LogLike {
   private def sendStatusMessage(kernelStatus: KernelStatusType, parentHeader: Header) {
     //  Create the status message and send it to the relay
     val km : KernelMessage = KMBuilder()
-      .withIds(Seq(MessageType.Outgoing.Status.toString))
+      .withIds(Seq(MessageType.Outgoing.Status.toString.getBytes))
       .withSignature("")
       .withHeader(MessageType.Outgoing.Status)
       .withParentHeader(parentHeader)

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/handler/ExecuteRequestHandler.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/handler/ExecuteRequestHandler.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/handler/ExecuteRequestHandler.scala
index a59608d..96c306e 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/handler/ExecuteRequestHandler.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/handler/ExecuteRequestHandler.scala
@@ -98,7 +98,7 @@ class ExecuteRequestHandler(
           //  Send an ExecuteResult with the result of the code execution
           if (executeResult.hasContent) {
             val executeResultMsg = skeletonBuilder
-              .withIds(Seq(MessageType.Outgoing.ExecuteResult.toString))
+              .withIds(Seq(MessageType.Outgoing.ExecuteResult.toString.getBytes))
               .withHeader(MessageType.Outgoing.ExecuteResult)
               .withContentString(executeResult).build
             relayMsg(executeResultMsg, relayActor)
@@ -166,4 +166,4 @@ class ExecuteRequestHandler(
     logKernelMessageAction("Sending to KernelMessageRelay.", km)
     relayActor ! km
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
index 6ef4c06..6f5d060 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/kernel/Utilities.scala
@@ -55,9 +55,9 @@ object Utilities extends LogLike {
     val delimiterIndex: Int =
       message.frames.indexOf(ByteString("<IDS|MSG>".getBytes))
     //  TODO Handle the case where there is no delimiter
-    val ids: Seq[String] =
+    val ids: Seq[Array[Byte]] =
       message.frames.take(delimiterIndex).map(
-        (byteString : ByteString) =>  { new String(byteString.toArray) }
+        (byteString : ByteString) =>  { byteString.toArray }
       )
     val header = Json.parse(message.frames(delimiterIndex + 2)).as[Header]
     // TODO: Investigate better solution than setting parentHeader to null for {}
@@ -78,7 +78,7 @@ object Utilities extends LogLike {
 
   implicit def KernelMessageToZMQMessage(kernelMessage : KernelMessage) : ZMQMessage = {
     val frames: scala.collection.mutable.ListBuffer[ByteString] = scala.collection.mutable.ListBuffer()
-    kernelMessage.ids.map((id : String) => frames += id )
+    kernelMessage.ids.map((id : Array[Byte]) => frames += ByteString.apply(id) )
     frames += "<IDS|MSG>"
     frames += kernelMessage.signature
     frames += Json.toJson(kernelMessage.header).toString()

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
----------------------------------------------------------------------
diff --git a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
index 65c5874..5f798b2 100644
--- a/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
+++ b/kernel/src/main/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOutputStream.scala
@@ -106,7 +106,7 @@ class KernelOutputStream(
     )
 
     val kernelMessage = kmBuilder
-      .withIds(Seq(MessageType.Outgoing.Stream.toString))
+      .withIds(Seq(MessageType.Outgoing.Stream.toString.getBytes))
       .withHeader(MessageType.Outgoing.Stream)
       .withContentString(streamContent).build
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/CommInfoRequestHandlerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/CommInfoRequestHandlerSpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/CommInfoRequestHandlerSpec.scala
index 0e30747..e60fdec 100644
--- a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/CommInfoRequestHandlerSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/CommInfoRequestHandlerSpec.scala
@@ -65,7 +65,7 @@ class CommInfoRequestHandlerSpec extends TestKit(
   describe("Comm Info Request Handler") {
     it("should return a KernelMessage containing a comm info response for a specific target name") {
       val kernelMessage = new KernelMessage(
-        Seq[String](), "test message", header, header, Map[String, String](), "{\"target_name\":\"test.name\"}"
+        Seq[Array[Byte]](), "test message", header, header, Map[String, String](), "{\"target_name\":\"test.name\"}"
       )
 
       when(mockCommStorage.getTargets()).thenReturn(Set("test.name"))
@@ -83,7 +83,7 @@ class CommInfoRequestHandlerSpec extends TestKit(
 
   it("should return a KernelMessage containing a comm info response for all comms when target_name is missing from the message") {
     val kernelMessage = new KernelMessage(
-      Seq[String](), "test message", header, header, Map[String, String](), "{}"
+      Seq[Array[Byte]](), "test message", header, header, Map[String, String](), "{}"
     )
 
     when(mockCommStorage.getTargets()).thenReturn(Set("test.name1", "test.name2"))
@@ -103,7 +103,7 @@ class CommInfoRequestHandlerSpec extends TestKit(
 
   it("should return a KernelMessage containing an empty comm info response when the target name value is not found") {
     val kernelMessage = new KernelMessage(
-      Seq[String](), "test message", header, header, Map[String, String](), "{\"target_name\":\"can't_find_me\"}"
+      Seq[Array[Byte]](), "test message", header, header, Map[String, String](), "{\"target_name\":\"can't_find_me\"}"
     )
 
     when(mockCommStorage.getTargets()).thenReturn(Set("test.name"))

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala
index a4af1e5..f477582 100644
--- a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/handler/KernelInfoRequestHandlerSpec.scala
@@ -56,7 +56,7 @@ class KernelInfoRequestHandlerSpec extends TestKit(
 
   val header = Header("","","","","")
   val kernelMessage = new KernelMessage(
-    Seq[String](), "test message", header, header, Map[String, String](), "{}"
+    Seq[Array[Byte]](), "test message", header, header, Map[String, String](), "{}"
   )
 
   describe("Kernel Info Request Handler") {

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/kernel/UtilitiesSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/kernel/UtilitiesSpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/kernel/UtilitiesSpec.scala
index f2f21b7..cfae79b 100644
--- a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/kernel/UtilitiesSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/kernel/UtilitiesSpec.scala
@@ -33,7 +33,7 @@ class UtilitiesSpec extends FunSpec with Matchers {
     "<PARENT-UUID>", "<PARENT-STRING>", "<PARENT-UUID>", "<PARENT-STRING>", "<PARENT-FLOAT>"
   )
   val kernelMessage = KernelMessage(
-    Seq("<STRING-1>","<STRING-2>"),
+    Seq("<STRING-1>","<STRING-2>").map(x => x.getBytes),
     "<SIGNATURE>", header, parentHeader, Map(), "<STRING>"
   )
 
@@ -69,13 +69,13 @@ class UtilitiesSpec extends FunSpec with Matchers {
   describe("Utilities") {
     describe("implicit #KernelMessageToZMQMessage") {
       it("should correctly convert a kernel message to a ZMQMessage") {
-        Utilities.KernelMessageToZMQMessage(kernelMessage) should be (zmqMessage)
+        Utilities.KernelMessageToZMQMessage(kernelMessage) should equal (zmqMessage)
       }
     }
 
     describe("implicit #ZMQMessageToKernelMessage") {
       it("should correctly convert a ZMQMessage to a kernel message") {
-        Utilities.ZMQMessageToKernelMessage(zmqMessage) should be (kernelMessage)
+        Utilities.ZMQMessageToKernelMessage(zmqMessage) should equal (kernelMessage)
       }
     }
 
@@ -83,12 +83,12 @@ class UtilitiesSpec extends FunSpec with Matchers {
       it("should convert back to the original message, ZMQ -> Kernel -> ZMQ") {
         Utilities.KernelMessageToZMQMessage(
           Utilities.ZMQMessageToKernelMessage(zmqMessage)
-        ) should be (zmqMessage)
+        ) should equal (zmqMessage)
       }
       it("should convert back to the original message, Kernel -> ZMQ -> Kernel") {
         Utilities.ZMQMessageToKernelMessage(
           Utilities.KernelMessageToZMQMessage(kernelMessage)
-        ) should be (kernelMessage)
+        ) should equal (kernelMessage)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelaySpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelaySpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelaySpec.scala
index 0dbcc09..2935f4b 100644
--- a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelaySpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/relay/KernelMessageRelaySpec.scala
@@ -54,10 +54,10 @@ class KernelMessageRelaySpec extends TestKit(
     "<TYPE>", "<VERSION>")
   private val parentHeader: Header = Header("<PARENT-UUID>", "<PARENT-USER>",
     "<PARENT-SESSION>", "<PARENT-TYPE>", "<PARENT-VERSION>")
-  private val incomingKernelMessage: KernelMessage = KernelMessage(Seq("<ID>"),
+  private val incomingKernelMessage: KernelMessage = KernelMessage(Seq("<ID>".getBytes),
     "<SIGNATURE>", header.copy(msg_type = IncomingMessageType),
     parentHeader, Metadata(), "<CONTENT>")
-  private val outgoingKernelMessage: KernelMessage = KernelMessage(Seq("<ID>"),
+  private val outgoingKernelMessage: KernelMessage = KernelMessage(Seq("<ID>".getBytes),
     "<SIGNATURE>", header.copy(msg_type = OutgoingMessageType),
     incomingKernelMessage.header, Metadata(), "<CONTENT>")
   private val incomingZmqStrings = "1" :: "2" :: "3" :: "4" :: Nil
@@ -221,7 +221,7 @@ class KernelMessageRelaySpec extends TestKit(
   def sendKernelMessages(n: Int, kernelMessageRelay: ActorRef): Unit ={
     // Sends n messages to the relay
     (0 until n).foreach (i => {
-      val km = KernelMessage(Seq("<ID>"), s"${i}",
+      val km = KernelMessage(Seq("<ID>".getBytes), s"${i}",
         header.copy(msg_type = IncomingMessageType), parentHeader,
         Metadata(), s"${i}")
       kernelMessageRelay ! Tuple2(Seq("SomeString"), km)
@@ -245,4 +245,4 @@ case class ChaoticActor[U](receiveFunc : Any => U) extends Actor {
         }
       }).start()
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOuputStreamSpec.scala
----------------------------------------------------------------------
diff --git a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOuputStreamSpec.scala b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOuputStreamSpec.scala
index a74946d..c9ee2aa 100644
--- a/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOuputStreamSpec.scala
+++ b/kernel/src/test/scala/org/apache/toree/kernel/protocol/v5/stream/KernelOuputStreamSpec.scala
@@ -237,7 +237,8 @@ class KernelOuputStreamSpec
         Then("the ids should be set to execute_result")
         val message = kernelOutputRelayProbe
           .receiveOne(MaxAkkaTestTimeout).asInstanceOf[KernelMessage]
-        message.ids should be (Seq(MessageType.Outgoing.Stream.toString))
+        
+        message.ids(0).deep should equal (MessageType.Outgoing.Stream.toString.getBytes.deep)
       }
 
       it("should set the message type in the header of the kernel message to an execute_result") {

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KMBuilder.scala
----------------------------------------------------------------------
diff --git a/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KMBuilder.scala b/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KMBuilder.scala
index 70eeb4b..ccc69a3 100644
--- a/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KMBuilder.scala
+++ b/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KMBuilder.scala
@@ -48,7 +48,7 @@ case class KMBuilder(km: KernelMessage = KernelMessage(
                                             )) {
   require(km != null)
 
-  def withIds(newVal: Seq[String]) : KMBuilder =
+  def withIds(newVal: Seq[Array[Byte]]) : KMBuilder =
     KMBuilder(this.km.copy(ids = newVal))
 
   def withSignature(newVal: String) : KMBuilder =

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KernelMessage.scala
----------------------------------------------------------------------
diff --git a/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KernelMessage.scala b/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KernelMessage.scala
index 6ef6663..8eeb536 100644
--- a/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KernelMessage.scala
+++ b/protocol/src/main/scala/org/apache/toree/kernel/protocol/v5/KernelMessage.scala
@@ -18,10 +18,31 @@
 package org.apache.toree.kernel.protocol.v5
 
 case class KernelMessage(
-  ids: Seq[String],
+  ids: Seq[Array[Byte]],
   signature: String,
   header: Header,
   parentHeader: ParentHeader, // TODO: This can be an empty json object of {}
   metadata: Metadata,
   contentString: String
-)
+) 
+{
+  override def equals ( o: Any ) = o match {
+    case km: KernelMessage => {
+      var equal = ( ids.length == km.ids.length && signature == km.signature && header == km.header && parentHeader == km.parentHeader && metadata == km.metadata && contentString == km.contentString )
+      var i = ids.length
+      while ( equal && ( 0 < i ) ) {
+        i = i - 1
+        equal = (ids(i).deep == km.ids(i).deep )
+      }
+      equal = true
+      equal
+    }
+    case _ => false
+  }
+
+  override def hashCode: Int = { 
+    var z = signature.## + header.## + parentHeader.## + metadata.## + contentString.##
+    for( id <- ids ) for ( b <- id ) { z += b }
+    z
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/KMBuilderSpec.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/KMBuilderSpec.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/KMBuilderSpec.scala
index afdf5b1..01132a8 100644
--- a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/KMBuilderSpec.scala
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/KMBuilderSpec.scala
@@ -73,7 +73,7 @@ class KMBuilderSpec extends FunSpec with Matchers {
     describe("withXYZ"){
       describe("#withIds"){
         it("should produce a KMBuilder with a KernelMessage with ids set") {
-          val ids = Seq("baos", "win")
+          val ids = Seq("baos", "win").map(x => x.getBytes)
           val builder = KMBuilder().withIds(ids)
           builder.km.ids should be (ids)
         }

http://git-wip-us.apache.org/repos/asf/incubator-toree/blob/ca4b86be/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
----------------------------------------------------------------------
diff --git a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
index e30941a..33d2f6b 100644
--- a/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
+++ b/protocol/src/test/scala/org/apache/toree/kernel/protocol/v5/package.scala
@@ -30,7 +30,7 @@ package object v5Test {
   val MockParenHeader: Header = Header("<PARENT-UUID>","<PARENT-USER>","<PARENT-SESSION>",
     MessageType.Outgoing.ClearOutput.toString, "<PARENT-VERSION>")
   //  The actual kernel message
-  val MockKernelMessage : KernelMessage = KernelMessage(Seq("<ID>"), "<SIGNATURE>", MockHeader,
+  val MockKernelMessage : KernelMessage = KernelMessage(Seq("<ID>".getBytes), "<SIGNATURE>", MockHeader,
     MockParenHeader, Metadata(), "<CONTENT>")
   //  Use the implicit to convert the KernelMessage to ZMQMessage
   //val MockZMQMessage : ZMQMessage = MockKernelMessage
@@ -41,7 +41,7 @@ package object v5Test {
     contentString =  Json.toJson(MockExecuteRequest).toString
   )
   val MockKernelMessageWithBadExecuteRequest = new KernelMessage(
-    Seq[String](), "test message", MockHeader, MockParenHeader, Map[String, String](),
+    Seq[Array[Byte]](), "test message", MockHeader, MockParenHeader, Map[String, String](),
     """
         {"code" : 124 }
     """


Mime
View raw message