incubator-s4-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mmo...@apache.org
Subject [20/50] [abbrv] git commit: Rename packages in preparation for move to Apache
Date Tue, 03 Jan 2012 11:19:15 GMT
Rename packages in preparation for move to Apache


Project: http://git-wip-us.apache.org/repos/asf/incubator-s4/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-s4/commit/01d2dd8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-s4/tree/01d2dd8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-s4/diff/01d2dd8a

Branch: refs/heads/dev
Commit: 01d2dd8a4030696bb1debdb209a26f2af61f27cb
Parents: 2f93667
Author: Bruce Robbins <robbins@everychoose-lm.corp.yahoo.com>
Authored: Sun Nov 20 22:17:32 2011 -0800
Committer: Bruce Robbins <robbins@everychoose-lm.corp.yahoo.com>
Committed: Sun Nov 20 22:17:32 2011 -0800

----------------------------------------------------------------------
 .../src/main/java/io/s4/client/example/Inject.java |   91 --
 .../src/main/java/io/s4/client/example/Read.java   |  102 ---
 .../main/java/io/s4/client/example/Request.java    |  119 ---
 .../java/org/apache/s4/client/example/Inject.java  |   91 ++
 .../java/org/apache/s4/client/example/Read.java    |  102 +++
 .../java/org/apache/s4/client/example/Request.java |  119 +++
 .../java/src/main/java/io/s4/client/Driver.java    |  693 ---------------
 .../java/src/main/java/io/s4/client/IOChannel.java |   26 -
 .../java/src/main/java/io/s4/client/Message.java   |   97 --
 .../java/src/main/java/io/s4/client/ReadMode.java  |   20 -
 .../java/src/main/java/io/s4/client/WriteMode.java |   20 -
 .../java/io/s4/client/util/ByteArrayIOChannel.java |  112 ---
 .../src/main/java/org/apache/s4/client/Driver.java |  693 +++++++++++++++
 .../main/java/org/apache/s4/client/IOChannel.java  |   26 +
 .../main/java/org/apache/s4/client/Message.java    |   97 ++
 .../main/java/org/apache/s4/client/ReadMode.java   |   20 +
 .../main/java/org/apache/s4/client/WriteMode.java  |   20 +
 .../apache/s4/client/util/ByteArrayIOChannel.java  |  112 +++
 18 files changed, 1280 insertions(+), 1280 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/examples/src/main/java/io/s4/client/example/Inject.java
----------------------------------------------------------------------
diff --git a/s4-driver/examples/src/main/java/io/s4/client/example/Inject.java b/s4-driver/examples/src/main/java/io/s4/client/example/Inject.java
deleted file mode 100644
index e258770..0000000
--- a/s4-driver/examples/src/main/java/io/s4/client/example/Inject.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client.example;
-
-import io.s4.client.Driver;
-import io.s4.client.Message;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-
-public class Inject {
-    public static void main(String[] args) {
-        if (args.length < 1) {
-            System.err.println("No host name specified");
-            System.exit(1);
-        }
-        String hostName = args[0];
-        
-        if (args.length < 2) {
-            System.err.println("No port specified");
-            System.exit(1);
-        }
-        
-        int port = -1;
-        try {
-            port = Integer.parseInt(args[1]);
-        } catch (NumberFormatException nfe) {
-            System.err.println("Bad port number specified: " + args[1]);
-            System.exit(1);
-        }
-        
-        if (args.length < 3) {
-            System.err.println("No stream name specified");
-            System.exit(1);
-        }
-        String streamName = args[2];
-        
-        if (args.length < 4) {
-            System.err.println("No class name specified");
-            System.exit(1);
-        }
-        String clazz = args[3];       
-        
-        Driver d = new Driver(hostName, port);
-        Reader inputReader = null;
-        BufferedReader br = null;
-        try {
-            if (!d.init()) {
-                System.err.println("Driver initialization failed");
-                System.exit(1);
-            }
-            
-            if (!d.connect()) {
-                System.err.println("Driver initialization failed");
-                System.exit(1);           
-            }
-            
-            inputReader = new InputStreamReader(System.in);
-            br = new BufferedReader(inputReader);
-
-            for  (String inputLine = null; (inputLine = br.readLine()) != null;) {
-                Message m = new Message(streamName, clazz, inputLine);
-                d.send(m);
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        finally {
-            try { d.disconnect(); } catch (Exception e) {}
-            try { br.close(); } catch (Exception e) {}
-            try { inputReader.close(); } catch (Exception e) {}
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/examples/src/main/java/io/s4/client/example/Read.java
----------------------------------------------------------------------
diff --git a/s4-driver/examples/src/main/java/io/s4/client/example/Read.java b/s4-driver/examples/src/main/java/io/s4/client/example/Read.java
deleted file mode 100644
index 3ab8abf..0000000
--- a/s4-driver/examples/src/main/java/io/s4/client/example/Read.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client.example;
-
-import io.s4.client.Driver;
-import io.s4.client.Message;
-import io.s4.client.ReadMode;
-import io.s4.client.WriteMode;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-import java.util.List;
-
-public class Read {
-    public static void main(String[] args) {
-        if (args.length < 1) {
-            System.err.println("No host name specified");
-            System.exit(1);
-        }
-        String hostName = args[0];
-        
-        if (args.length < 2) {
-            System.err.println("No port specified");
-            System.exit(1);
-        }
-        
-        int port = -1;
-        try {
-            port = Integer.parseInt(args[1]);
-        } catch (NumberFormatException nfe) {
-            System.err.println("Bad port number specified: " + args[1]);
-            System.exit(1);
-        } 
-        
-        String outputStreamsString = "";
-        if (args.length >= 3) {
-            outputStreamsString = args[2];
-        }
-
-        String[] outputStreams = new String[0];
-        if (outputStreamsString.length() > 0 && !outputStreamsString.equals("-")) {
-            outputStreams = outputStreamsString.split(" ");
-        }
-        
-        String displayId = "read";
-        if (args.length >= 4) {
-            displayId = args[3];
-        }    
-        
-        Driver d = new Driver(hostName, port);
-        Reader inputReader = null;
-        BufferedReader br = null;
-        try {
-            if (!d.init()) {
-                System.err.println("Driver initialization failed");
-                System.exit(1);
-            }
-            
-            d.setReadMode(ReadMode.All);
-            if (outputStreams.length > 0) {
-                d.setReadMode(ReadMode.Select);
-                for (String outputStream : outputStreams) {
-                    System.out.printf("Registering output stream name '%s'\n", outputStream);
-                    d.readInclude(outputStream);
-                }
-            }
-            
-            if (!d.connect()) {
-                System.err.println("Driver initialization failed");
-                System.exit(1);           
-            }
-
-            // read all responses
-            while (true) {
-                Message response = d.recv();
-                System.out.println(displayId + ":" + response);
-            }
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        finally {
-            try { d.disconnect(); } catch (Exception e) {}
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/examples/src/main/java/io/s4/client/example/Request.java
----------------------------------------------------------------------
diff --git a/s4-driver/examples/src/main/java/io/s4/client/example/Request.java b/s4-driver/examples/src/main/java/io/s4/client/example/Request.java
deleted file mode 100644
index 5076d83..0000000
--- a/s4-driver/examples/src/main/java/io/s4/client/example/Request.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *          http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client.example;
-
-import io.s4.client.Driver;
-import io.s4.client.Message;
-import io.s4.client.ReadMode;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.Reader;
-
-public class Request {
-    public static void main(String[] args) {
-        if (args.length < 1) {
-            System.err.println("No host name specified");
-            System.exit(1);
-        }
-        String hostName = args[0];
-        
-        if (args.length < 2) {
-            System.err.println("No port specified");
-            System.exit(1);
-        }
-        
-        int port = -1;
-        try {
-            port = Integer.parseInt(args[1]);
-        } catch (NumberFormatException nfe) {
-            System.err.println("Bad port number specified: " + args[1]);
-            System.exit(1);
-        }
-        
-        if (args.length < 3) {
-            System.err.println("No stream name specified");
-            System.exit(1);
-        }
-        String streamName = args[2];
-        
-        if (args.length < 4) {
-            System.err.println("No class name specified");
-            System.exit(1);
-        }
-        String clazz = args[3];   
-        
-        String outputStreamsString = "";
-        if (args.length == 5) {
-            outputStreamsString = args[4];
-        }
-
-        String[] outputStreams = outputStreamsString.split(" ");
-        
-        Driver d = new Driver(hostName, port);
-        Reader inputReader = null;
-        BufferedReader br = null;
-        try {
-            if (!d.init()) {
-                System.err.println("Driver initialization failed");
-                System.exit(1);
-            }
-            
-            d.setReadMode(ReadMode.All);
-            if (outputStreams.length > 0) {
-                d.setReadMode(ReadMode.Select);
-                for (String outputStream : outputStreams) {
-                    System.out.printf("Registering output stream name '%s'\n", outputStream);
-                    d.readInclude(outputStream);
-                }
-            }
-            
-            if (!d.connect()) {
-                System.err.println("Driver initialization failed");
-                System.exit(1);           
-            }
-            
-            inputReader = new InputStreamReader(System.in);
-            br = new BufferedReader(inputReader);
-
-            // send all messages
-            for  (String inputLine = null; (inputLine = br.readLine()) != null;) {
-                Message m = new Message(streamName, clazz, inputLine);
-                d.send(m);
-            }
-            
-            // read all responses
-            while (true) {
-                Message response = d.recv();
-                System.out.println(response);
-            }
-            /*List<Message> responses = d.recvAll(999999999);
-            for (Message message : responses) {
-                System.out.println(message);
-            }*/
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        finally {
-            try { d.disconnect(); } catch (Exception e) {}
-            try { br.close(); } catch (Exception e) {}
-            try { inputReader.close(); } catch (Exception e) {}
-        }
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/examples/src/main/java/org/apache/s4/client/example/Inject.java
----------------------------------------------------------------------
diff --git a/s4-driver/examples/src/main/java/org/apache/s4/client/example/Inject.java b/s4-driver/examples/src/main/java/org/apache/s4/client/example/Inject.java
new file mode 100644
index 0000000..aef14ea
--- /dev/null
+++ b/s4-driver/examples/src/main/java/org/apache/s4/client/example/Inject.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client.example;
+
+import org.apache.s4.client.Driver;
+import org.apache.s4.client.Message;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+public class Inject {
+    public static void main(String[] args) {
+        if (args.length < 1) {
+            System.err.println("No host name specified");
+            System.exit(1);
+        }
+        String hostName = args[0];
+        
+        if (args.length < 2) {
+            System.err.println("No port specified");
+            System.exit(1);
+        }
+        
+        int port = -1;
+        try {
+            port = Integer.parseInt(args[1]);
+        } catch (NumberFormatException nfe) {
+            System.err.println("Bad port number specified: " + args[1]);
+            System.exit(1);
+        }
+        
+        if (args.length < 3) {
+            System.err.println("No stream name specified");
+            System.exit(1);
+        }
+        String streamName = args[2];
+        
+        if (args.length < 4) {
+            System.err.println("No class name specified");
+            System.exit(1);
+        }
+        String clazz = args[3];       
+        
+        Driver d = new Driver(hostName, port);
+        Reader inputReader = null;
+        BufferedReader br = null;
+        try {
+            if (!d.init()) {
+                System.err.println("Driver initialization failed");
+                System.exit(1);
+            }
+            
+            if (!d.connect()) {
+                System.err.println("Driver initialization failed");
+                System.exit(1);           
+            }
+            
+            inputReader = new InputStreamReader(System.in);
+            br = new BufferedReader(inputReader);
+
+            for  (String inputLine = null; (inputLine = br.readLine()) != null;) {
+                Message m = new Message(streamName, clazz, inputLine);
+                d.send(m);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        finally {
+            try { d.disconnect(); } catch (Exception e) {}
+            try { br.close(); } catch (Exception e) {}
+            try { inputReader.close(); } catch (Exception e) {}
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/examples/src/main/java/org/apache/s4/client/example/Read.java
----------------------------------------------------------------------
diff --git a/s4-driver/examples/src/main/java/org/apache/s4/client/example/Read.java b/s4-driver/examples/src/main/java/org/apache/s4/client/example/Read.java
new file mode 100644
index 0000000..206f968
--- /dev/null
+++ b/s4-driver/examples/src/main/java/org/apache/s4/client/example/Read.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client.example;
+
+import org.apache.s4.client.Driver;
+import org.apache.s4.client.Message;
+import org.apache.s4.client.ReadMode;
+import org.apache.s4.client.WriteMode;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.List;
+
+public class Read {
+    public static void main(String[] args) {
+        if (args.length < 1) {
+            System.err.println("No host name specified");
+            System.exit(1);
+        }
+        String hostName = args[0];
+        
+        if (args.length < 2) {
+            System.err.println("No port specified");
+            System.exit(1);
+        }
+        
+        int port = -1;
+        try {
+            port = Integer.parseInt(args[1]);
+        } catch (NumberFormatException nfe) {
+            System.err.println("Bad port number specified: " + args[1]);
+            System.exit(1);
+        } 
+        
+        String outputStreamsString = "";
+        if (args.length >= 3) {
+            outputStreamsString = args[2];
+        }
+
+        String[] outputStreams = new String[0];
+        if (outputStreamsString.length() > 0 && !outputStreamsString.equals("-")) {
+            outputStreams = outputStreamsString.split(" ");
+        }
+        
+        String displayId = "read";
+        if (args.length >= 4) {
+            displayId = args[3];
+        }    
+        
+        Driver d = new Driver(hostName, port);
+        Reader inputReader = null;
+        BufferedReader br = null;
+        try {
+            if (!d.init()) {
+                System.err.println("Driver initialization failed");
+                System.exit(1);
+            }
+            
+            d.setReadMode(ReadMode.All);
+            if (outputStreams.length > 0) {
+                d.setReadMode(ReadMode.Select);
+                for (String outputStream : outputStreams) {
+                    System.out.printf("Registering output stream name '%s'\n", outputStream);
+                    d.readInclude(outputStream);
+                }
+            }
+            
+            if (!d.connect()) {
+                System.err.println("Driver initialization failed");
+                System.exit(1);           
+            }
+
+            // read all responses
+            while (true) {
+                Message response = d.recv();
+                System.out.println(displayId + ":" + response);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        finally {
+            try { d.disconnect(); } catch (Exception e) {}
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/examples/src/main/java/org/apache/s4/client/example/Request.java
----------------------------------------------------------------------
diff --git a/s4-driver/examples/src/main/java/org/apache/s4/client/example/Request.java b/s4-driver/examples/src/main/java/org/apache/s4/client/example/Request.java
new file mode 100644
index 0000000..e256250
--- /dev/null
+++ b/s4-driver/examples/src/main/java/org/apache/s4/client/example/Request.java
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *          http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client.example;
+
+import org.apache.s4.client.Driver;
+import org.apache.s4.client.Message;
+import org.apache.s4.client.ReadMode;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+
+public class Request {
+    public static void main(String[] args) {
+        if (args.length < 1) {
+            System.err.println("No host name specified");
+            System.exit(1);
+        }
+        String hostName = args[0];
+        
+        if (args.length < 2) {
+            System.err.println("No port specified");
+            System.exit(1);
+        }
+        
+        int port = -1;
+        try {
+            port = Integer.parseInt(args[1]);
+        } catch (NumberFormatException nfe) {
+            System.err.println("Bad port number specified: " + args[1]);
+            System.exit(1);
+        }
+        
+        if (args.length < 3) {
+            System.err.println("No stream name specified");
+            System.exit(1);
+        }
+        String streamName = args[2];
+        
+        if (args.length < 4) {
+            System.err.println("No class name specified");
+            System.exit(1);
+        }
+        String clazz = args[3];   
+        
+        String outputStreamsString = "";
+        if (args.length == 5) {
+            outputStreamsString = args[4];
+        }
+
+        String[] outputStreams = outputStreamsString.split(" ");
+        
+        Driver d = new Driver(hostName, port);
+        Reader inputReader = null;
+        BufferedReader br = null;
+        try {
+            if (!d.init()) {
+                System.err.println("Driver initialization failed");
+                System.exit(1);
+            }
+            
+            d.setReadMode(ReadMode.All);
+            if (outputStreams.length > 0) {
+                d.setReadMode(ReadMode.Select);
+                for (String outputStream : outputStreams) {
+                    System.out.printf("Registering output stream name '%s'\n", outputStream);
+                    d.readInclude(outputStream);
+                }
+            }
+            
+            if (!d.connect()) {
+                System.err.println("Driver initialization failed");
+                System.exit(1);           
+            }
+            
+            inputReader = new InputStreamReader(System.in);
+            br = new BufferedReader(inputReader);
+
+            // send all messages
+            for  (String inputLine = null; (inputLine = br.readLine()) != null;) {
+                Message m = new Message(streamName, clazz, inputLine);
+                d.send(m);
+            }
+            
+            // read all responses
+            while (true) {
+                Message response = d.recv();
+                System.out.println(response);
+            }
+            /*List<Message> responses = d.recvAll(999999999);
+            for (Message message : responses) {
+                System.out.println(message);
+            }*/
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        finally {
+            try { d.disconnect(); } catch (Exception e) {}
+            try { br.close(); } catch (Exception e) {}
+            try { inputReader.close(); } catch (Exception e) {}
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/java/src/main/java/io/s4/client/Driver.java
----------------------------------------------------------------------
diff --git a/s4-driver/java/src/main/java/io/s4/client/Driver.java b/s4-driver/java/src/main/java/io/s4/client/Driver.java
deleted file mode 100644
index 27e9fc0..0000000
--- a/s4-driver/java/src/main/java/io/s4/client/Driver.java
+++ /dev/null
@@ -1,693 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client;
-
-import io.s4.client.util.ByteArrayIOChannel;
-
-import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * S4 Client Driver.
- * 
- * Allows S4 Client code to send and receive events from an S4 cluster.
- */
-public class Driver {
-    private static final String protocolName = "generic-json";
-    private static final int versionMajor = 1;
-    private static final int versionMinor = 0;
-
-    protected String uuid = null;
-    protected State state = State.Null;
-
-    protected final String hostname;
-    protected final int port;
-
-    protected Socket sock = null;
-    protected ByteArrayIOChannel io = null;
-
-    protected ReadMode readMode = ReadMode.Private;
-    protected List<String> readInclude = new ArrayList<String>();
-    protected List<String> readExclude = new ArrayList<String>();
-    protected WriteMode writeMode = WriteMode.Enabled;
-
-    protected boolean debug = false;
-
-    protected int recvTimeoutMs = 0;
-
-    /**
-     * Configure driver with Adapter location.
-     * 
-     * Note: this does not create a connection to the adapter.
-     * 
-     * @see #init()
-     * @see #connect()
-     * 
-     * @param hostname
-     *            Name of S4 client adapter host.
-     * @param port
-     *            Port on which adapter listens for client connections.
-     */
-    public Driver(String hostname, int port) {
-        this.hostname = hostname;
-        this.port = port;
-    }
-
-    /**
-     * Set the read mode, if not already connected.
-     * 
-     * @param m
-     *            read mode
-     * @return Driver with read mode set to {@code m}
-     */
-    public Driver setReadMode(ReadMode m) {
-        if (state != State.Connected)
-            this.readMode = m;
-        return this;
-    }
-
-    /**
-     * Add to set of stream names included for reading, if not already
-     * connected.
-     * 
-     * @param s
-     *            list of stream names
-     * @return Updated driver
-     */
-    public Driver readInclude(List<String> s) {
-        if (state != State.Connected)
-            this.readInclude.addAll(s);
-        return this;
-    }
-
-    /**
-     * Add to set of stream names included for reading, if not already
-     * connected.
-     * 
-     * @param s
-     *            list of stream names
-     * @return Updated driver
-     */
-    public Driver readInclude(String[] s) {
-        return readInclude(Arrays.asList(s));
-    }
-
-    /**
-     * Add to set of stream names included for reading, if not already
-     * connected.
-     * 
-     * @param s
-     *            stream name
-     * @return Updated driver
-     */
-    public Driver readInclude(String s) {
-        if (state != State.Connected)
-            this.readInclude.add(s);
-        return this;
-    }
-
-    /**
-     * Add to set of stream names excluded from reading, if not already
-     * connected.
-     * 
-     * @param s
-     *            list of stream names
-     * @return Updated driver
-     */
-    public Driver readExclude(List<String> s) {
-        if (state != State.Connected)
-            this.readExclude.addAll(s);
-        return this;
-    }
-
-    /**
-     * Add to set of stream names excluded from reading, if not already
-     * connected.
-     * 
-     * @param s
-     *            list of stream names
-     * @return Updated driver
-     */
-    public Driver readExclude(String[] s) {
-        return readExclude(Arrays.asList(s));
-    }
-
-    /**
-     * Add to set of stream names excluded from reading, if not already
-     * connected.
-     * 
-     * @param s
-     *            stream name
-     * @return Updated driver
-     */
-    public Driver readExclude(String s) {
-        if (state != State.Connected)
-            this.readExclude.add(s);
-        return this;
-    }
-
-    /**
-     * Set the write mode, if not already connected.
-     * 
-     * @param m
-     *            write mode
-     * @return Driver with write mode set to {@code m}
-     */
-    public Driver setWriteMode(WriteMode m) {
-        if (state != State.Connected)
-            this.writeMode = m;
-        return this;
-    }
-
-    /**
-     * 
-     * @param debug
-     *            debug flag
-     * @return Driver with debug flag set to {@code debug}
-     */
-    public Driver setDebug(boolean debug) {
-        this.debug = debug;
-        return this;
-    }
-
-    /**
-     * Set the timeout for receiving data.
-     * 
-     * @param ms
-     *            timeout in milliseconds
-     * @return updated driver
-     */
-    public Driver setRecvTimeout(int ms) {
-        this.recvTimeoutMs = ms;
-        return this;
-    }
-
-    /**
-     * Get the state of the driver.
-     * 
-     * @return state
-     */
-    public State getState() {
-        return state;
-    }
-
-    /**
-     * Initialize the driver.
-     * 
-     * Handshake with adapter to receive a unique id, and verify that the driver
-     * is compatible with the protocol used by the adapter. This does not
-     * actually establish a connection for sending and receiving events.
-     * 
-     * @see #connect()
-     * 
-     * @return true if and only if the adapter issued a valid ID to this client,
-     *         and the protocol is found to be compatible.
-     * @throws IOException
-     *             if the underlying TCP/IP socket throws an exception.
-     */
-    public boolean init() throws IOException {
-        if (state.isInitialized())
-            return true;
-
-        try {
-            sock = new Socket(hostname, port);
-
-            ByteArrayIOChannel io = new ByteArrayIOChannel(sock);
-
-            io.send(emptyBytes);
-
-            byte[] b = io.recv();
-
-            if (b == null || b.length == 0) {
-                if (debug) {
-                    System.err.println("Empty response during initialization.");
-                }
-                return false;
-            }
-
-            JSONObject json = new JSONObject(new String(b));
-
-            this.uuid = json.getString("uuid");
-
-            JSONObject proto = json.getJSONObject("protocol");
-
-            if (!isCompatible(proto)) {
-                if (debug) {
-                    System.err
-                            .println("Driver not compatible with adapter protocol: "
-                                    + proto);
-                }
-                return false;
-            }
-
-            state = State.Initialized;
-
-            return true;
-
-        } catch (JSONException e) {
-            if (debug) {
-                System.err
-                        .println("malformed JSON in initialization response. "
-                                + e);
-            }
-            e.printStackTrace();
-            return false;
-
-        } finally {
-            sock.close();
-        }
-    }
-
-    /**
-     * Test if the adapter protocol is compatible with this driver. More
-     * specifically, the following must be true:
-     * {@code
-     *     p.name == this.protocolName()
-     * AND p.versionMajor == this.versionMajor()
-     * AND p.versionMinor >= this.versionMinor()
-     * }
-     * 
-     * @param p
-     *            protocol specifier.
-     * @return true if and only if the protocol is compatible.
-     * @throws JSONException
-     *             if some required field was not found in the protocol
-     *             specifier.
-     */
-    private boolean isCompatible(JSONObject p) throws JSONException {
-        return p.getString("name").equals(protocolName)
-                && (p.getInt("versionMajor") == versionMajor)
-                && (p.getInt("versionMinor") >= versionMinor);
-    }
-
-    /**
-     * Establish a connection to the adapter. Upon success, this enables the
-     * client to send and receive events. The client must first be initialized.
-     * Otherwise, this operation will fail.
-     * 
-     * @see #init()
-     * 
-     * @return true if and only if a connection was successfully established.
-     * @throws IOException
-     *             if the underlying TCP/IP socket throws an exception.
-     */
-    public boolean connect() throws IOException {
-        if (!state.isInitialized()) {
-            // must first be initialized
-            if (debug) {
-                System.err.println("Not initialized.");
-            }
-            return false;
-        } else if (state.isConnected()) {
-            // nothing to do if already connected.
-            return true;
-        }
-
-        String message = null;
-
-        try {
-            // constructing connect message
-            JSONObject json = new JSONObject();
-
-            json.put("uuid", uuid);
-            json.put("readMode", readMode.toString());
-            json.put("writeMode", writeMode.toString());
-
-            if (readInclude != null) {
-                // stream inclusion
-                json.put("readInclude", new JSONArray(readInclude));
-            }
-
-            if (readExclude != null) {
-                // stream exclusion
-                json.put("readExclude", new JSONArray(readExclude));
-            }
-
-            message = json.toString();
-
-        } catch (JSONException e) {
-            if (debug) {
-                System.err.println("error constructing connect message: " + e);
-            }
-            return false;
-        }
-
-        try {
-            // send the message
-            this.sock = new Socket(hostname, port);
-            this.io = new ByteArrayIOChannel(sock);
-
-            io.send(message.getBytes());
-
-            // get a response
-            byte[] b = io.recv();
-
-            if (b == null || b.length == 0) {
-                if (debug) {
-                    System.err
-                            .println("empty response from adapter during connect.");
-                }
-                return false;
-            }
-
-            String response = new String(b);
-
-            JSONObject json = new JSONObject(response);
-            String s = json.optString("status", "unknown");
-
-            // does it look OK?
-            if (s.equalsIgnoreCase("ok")) {
-                // done connecting
-                state = State.Connected;
-                return true;
-            } else if (s.equalsIgnoreCase("failed")) {
-                // server has failed the connect attempt
-                if (debug) {
-                    System.err.println("connect failed by adapter. reason: "
-                            + json.optString("reason", "unknown"));
-                }
-                return false;
-            } else {
-                // unknown response.
-                if (debug) {
-                    System.err
-                            .println("connect failed by adapter. unrecongnized response: "
-                                    + response);
-                }
-                return false;
-            }
-
-        } catch (Exception e) {
-            // clean up after error...
-            if (debug) {
-                System.err.println("error during connect: " + e);
-                e.printStackTrace();
-            }
-
-            if (this.sock.isConnected()) {
-                this.sock.close();
-
-            }
-
-            return false;
-        }
-    }
-
-    /**
-     * Close the connection to the adapter. Events can no longer be sent or
-     * received by the client.
-     * 
-     * @return true upon success. False if the connection is already closed.
-     * @throws IOException
-     *             if the underlying TCP/IP socket throws an exception.
-     */
-    public boolean disconnect() throws IOException {
-        if (state.isConnected()) {
-            io.send(emptyBytes);
-            state = State.Null;
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * Send a message to the adapter.
-     * 
-     * @param m
-     *            message to be sent
-     * @return true if and only if the message was successfully sent.
-     * @throws IOException
-     *             if the underlying TCP/IP socket throws an exception.
-     */
-
-    public boolean send(Message m) throws IOException {
-
-        if (!state.isConnected()) {
-            if (debug) {
-                System.err.println("send failed. not connected.");
-            }
-            return false;
-        }
-
-        String s = null;
-
-        try {
-            JSONObject json = new JSONObject();
-
-            m.toJson(json);
-
-            s = json.toString();
-
-            io.send(s.getBytes());
-
-            return true;
-
-        } catch (JSONException e) {
-            if (debug) {
-                System.err
-                        .println("exception while constructing message to send: "
-                                + e);
-            }
-            return false;
-        }
-    }
-
-    /**
-     * Receive a message from the adapter. This function blocks till a message
-     * becomes available to read. The set of messages sent to this client from
-     * S4 depends on the read mode of the client.
-     * 
-     * @see ReadMode
-     * 
-     * @return message received from S4 cluster.
-     * @throws IOException
-     *             if the underlying TCP/IP socket throws an exception.
-     */
-    public Message recv() throws IOException {
-        return recv(this.recvTimeoutMs);
-    }
-
-    /**
-     * Receive a message from the adapter. This function blocks till a message
-     * becomes available to read, or till a specified number of milliseconds
-     * have elapsed. The set of messages sent to this client from S4 depends on
-     * the read mode of the client.
-     * 
-     * @see ReadMode
-     * 
-     * @param timeout
-     *            timeout in milliseconds
-     * @return message received from S4 cluster.
-     * @throws IOException
-     *             if the underlying TCP/IP socket throws an exception.
-     * @throws SocketTimeoutException
-     *             if the read timed out.
-     */
-    public Message recv(int timeout) throws IOException {
-        if (!state.isConnected()) {
-            if (debug) {
-                System.err.println("recv failed. not connected.");
-            }
-            return null;
-        }
-
-        try {
-            byte[] b = io.recv(timeout);
-
-            if (b == null || b.length == 0) {
-                if (debug) {
-                    System.err
-                            .println("empty message from adapter. disconnecting");
-                }
-                this.disconnect();
-                return null;
-            }
-
-            String s = new String(b);
-
-            JSONObject json = new JSONObject(s);
-
-            return Message.fromJson(json);
-
-        } catch (SocketTimeoutException e) {
-            if (debug) {
-                System.err.println("recv timed out");
-            }
-            return null;
-
-        } catch (JSONException e) {
-            if (debug) {
-                System.err.println("exception while parsing received JSON: "
-                        + e);
-            }
-            return null;
-        }
-    }
-
-    /**
-     * Receive the set of message that from the adapter within a specified time
-     * interval.
-     * 
-     * @param t
-     *            interval in milliseconds
-     * @return messages received from S4 cluster.
-     * @throws IOException
-     *             if the underlying TCP/IP socket throws an exception.
-     */
-    public List<Message> recvAll(int t) throws IOException {
-        if (!state.isConnected()) {
-            if (debug) {
-                System.err.println("recv failed. not connected.");
-            }
-            return Collections.<Message> emptyList();
-        }
-
-        List<Message> messages = new ArrayList<Message>();
-
-        long tStart = System.currentTimeMillis();
-        long tEnd = tStart + t;
-
-        long tNow = tStart;
-
-        while (tNow < tEnd) {
-            try {
-                byte[] b = io.recv((int) (tEnd - tNow));
-
-                if (b == null || b.length == 0) {
-                    if (debug) {
-                        System.err
-                                .println("empty message from adapter. disconnecting");
-                    }
-                    this.disconnect();
-                    break;
-                }
-
-                String s = new String(b);
-
-                JSONObject json = new JSONObject(s);
-
-                messages.add(Message.fromJson(json));
-
-            } catch (SocketTimeoutException e) {
-                break;
-
-            } catch (JSONException e) {
-                if (debug) {
-                    System.err
-                            .println("exception while parsing received JSON: "
-                                    + e);
-                }
-            }
-
-            tNow = System.currentTimeMillis();
-        }
-
-        return messages;
-    }
-
-    /**
-     * State of the client.
-     */
-    public enum State {
-        /**
-         * Uninitialized.
-         */
-        Null(false, false),
-
-        /**
-         * Initialized, but not connected to S4 adapter.
-         */
-        Initialized(true, false),
-
-        /**
-         * Connected to S4 adapter (implies initialized).
-         */
-        Connected(true, true);
-
-        State(boolean initialized, boolean connected) {
-            this.initialized = initialized;
-            this.connected = connected;
-        }
-
-        private final boolean initialized;
-        private final boolean connected;
-
-        /**
-         * Is initialization completed?
-         * 
-         * @return true if and only if this state implies initialization has
-         *         been completed.
-         */
-        public boolean isInitialized() {
-            return initialized;
-        }
-
-        /**
-         * Is client connected to S4 adapter?
-         * 
-         * @return true if and only if the client is connected to S4 adapter.
-         */
-        public boolean isConnected() {
-            return connected;
-        }
-    };
-
-    private static final byte[] emptyBytes = {};
-
-    /**
-     * Reads and prints all events over an interval of 5 seconds from a set of
-     * streams specified on the command line.
-     * 
-     * @param argv
-     *            list of streams
-     * @throws IOException
-     *             if an error occurs while reading from adapter.
-     */
-    public static void main(String[] argv) throws IOException {
-        Driver d = new Driver("localhost", 2334);
-
-        System.out.println("State: " + d.getState());
-
-        d.init();
-
-        System.out.println("State: " + d.getState());
-
-        d.setReadMode(ReadMode.Select).readInclude(argv)
-                .setWriteMode(WriteMode.Enabled);
-
-        d.connect();
-
-        System.out.println("State: " + d.getState());
-
-        List<Message> mm = d.recvAll(5001);
-        System.out.println("got messages (" + mm.size() + "): " + mm);
-
-        System.out.println("Disconnecting...");
-        d.disconnect();
-
-        System.out.println("State: " + d.getState());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/java/src/main/java/io/s4/client/IOChannel.java
----------------------------------------------------------------------
diff --git a/s4-driver/java/src/main/java/io/s4/client/IOChannel.java b/s4-driver/java/src/main/java/io/s4/client/IOChannel.java
deleted file mode 100644
index b5a6671..0000000
--- a/s4-driver/java/src/main/java/io/s4/client/IOChannel.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client;
-
-import java.io.IOException;
-
-public interface IOChannel {
-    byte[] recv() throws IOException;
-
-    byte[] recv(int timeout) throws IOException;
-
-    void send(byte[] v) throws IOException;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/java/src/main/java/io/s4/client/Message.java
----------------------------------------------------------------------
diff --git a/s4-driver/java/src/main/java/io/s4/client/Message.java b/s4-driver/java/src/main/java/io/s4/client/Message.java
deleted file mode 100644
index 960584a..0000000
--- a/s4-driver/java/src/main/java/io/s4/client/Message.java
+++ /dev/null
@@ -1,97 +0,0 @@
-package io.s4.client;
-
-import org.json.JSONException;
-import org.json.JSONObject;
-
-/**
- * Messages that can be send/received by client. They typically correspond
- * to events that are sent/received.
- */
-public class Message {
-    public final String stream;
-    public final String clazz;
-    public final String[] keyNames;
-    public final String object;
-
-    /**
-     * Key-less event message.
-     * 
-     * @param stream
-     *            Name of stream associated with corresponding event.
-     * @param clazz
-     *            Class of event object.
-     * @param object
-     *            String representation of the event. This is the string
-     *            that Gson will convert into/from the event object.
-     */
-    public Message(String stream, String clazz, String object) {
-        this.stream = stream;
-        this.clazz = clazz;
-        this.keyNames = null;
-        this.object = object;
-    }
-
-    /**
-     * Keyed event message.
-     * 
-     * @param stream
-     *            Name of stream associated with corresponding event.
-     * @param keyNames
-     *            array of key names. Typically, the getter corresponding to
-     *            each string in this array will be invoked on the event
-     *            object, and the values concatenated, to produce the
-     *            routing key.
-     * @param clazz
-     *            Class of event object.
-     * @param object
-     *            String representation of the event. This is the string
-     *            that Gson will convert into/from the event object.
-     */
-    public Message(String stream, String[] keyNames, String clazz,
-            String object) {
-        this.stream = stream;
-        this.clazz = clazz;
-        this.keyNames = keyNames;
-        this.object = object;
-    }
-
-    /**
-     * Create from JSON.
-     * 
-     * @param json
-     *            JSON representation of message
-     * @return Message object.
-     * @throws JSONException
-     *             if the JSON is invalid.
-     */
-    public static Message fromJson(JSONObject json) throws JSONException {
-        String stream = json.getString("stream");
-        String clazz = json.getString("class");
-        String object = json.getString("object");
-
-        return new Message(stream, clazz, object);
-    }
-
-    /**
-     * Convert message into JSON.
-     * 
-     * @param json
-     *            JSON will be written into this object.
-     * @throws JSONException
-     *             if writing fails.
-     */
-    public void toJson(JSONObject json) throws JSONException {
-        json.put("stream", this.stream);
-        json.put("class", this.clazz);
-        if (this.keyNames != null) {
-            json.put("keyNames", this.keyNames);
-        }
-
-        json.put("object", this.object);
-    }
-
-    public String toString() {
-        return "{stream:" + stream + ", clazz:" + clazz + ", keyNames:"
-                + keyNames + ", object:" + object + "}";
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/java/src/main/java/io/s4/client/ReadMode.java
----------------------------------------------------------------------
diff --git a/s4-driver/java/src/main/java/io/s4/client/ReadMode.java b/s4-driver/java/src/main/java/io/s4/client/ReadMode.java
deleted file mode 100644
index 1d915ca..0000000
--- a/s4-driver/java/src/main/java/io/s4/client/ReadMode.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client;
-
-public enum ReadMode {
-    None, Private, Select, All;
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/java/src/main/java/io/s4/client/WriteMode.java
----------------------------------------------------------------------
diff --git a/s4-driver/java/src/main/java/io/s4/client/WriteMode.java b/s4-driver/java/src/main/java/io/s4/client/WriteMode.java
deleted file mode 100644
index 3b5dd42..0000000
--- a/s4-driver/java/src/main/java/io/s4/client/WriteMode.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client;
-
-public enum WriteMode {
-    Enabled, Disabled;
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/java/src/main/java/io/s4/client/util/ByteArrayIOChannel.java
----------------------------------------------------------------------
diff --git a/s4-driver/java/src/main/java/io/s4/client/util/ByteArrayIOChannel.java b/s4-driver/java/src/main/java/io/s4/client/util/ByteArrayIOChannel.java
deleted file mode 100644
index 731e6e7..0000000
--- a/s4-driver/java/src/main/java/io/s4/client/util/ByteArrayIOChannel.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
- * 
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 	        http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific
- * language governing permissions and limitations under the
- * License. See accompanying LICENSE file. 
- */
-package io.s4.client.util;
-
-import io.s4.client.IOChannel;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-
-public class ByteArrayIOChannel implements IOChannel {
-    private InputStream in;
-    private OutputStream out;
-    private Socket socket;
-
-    public ByteArrayIOChannel(Socket socket) throws IOException {
-        in = socket.getInputStream();
-        out = socket.getOutputStream();
-        this.socket = socket;
-    }
-
-    private int readBytes(byte[] s, int n, int timeout) throws IOException {
-        int r = 0; // bytes read so far
-
-        long tStart = System.currentTimeMillis();
-        long tEnd = tStart + timeout;
-        long tRem = timeout;
-        long tNow = tStart;
-
-        do {
-            socket.setSoTimeout((int) tRem);
-
-            // keep reading bytes till the required "n" are read
-            int p = in.read(s, r, (n - r));
-
-            if (p == -1) {
-                throw new IOException("reached end of stream after reading "
-                        + r + " bytes. expected " + n + " bytes");
-            }
-
-            r += p;
-
-            tNow = System.currentTimeMillis();
-
-            tRem = tEnd - tNow;
-
-        } while (r < n && (timeout == 0 || tRem > 0));
-
-        return (int) (tNow - tStart);
-    }
-
-    public byte[] recv() throws IOException {
-        return recv(0);
-    }
-
-    public byte[] recv(int timeout) throws IOException {
-        // first read size of byte array.
-        // unsigned int, big endian: 0A0B0C0D -> {0A, 0B, 0C, 0D}
-        byte[] s = { 0, 0, 0, 0 };
-        int tUsed = readBytes(s, 4, timeout);
-
-        if (timeout > 0 && (timeout - tUsed <= 1)) {
-            throw new SocketTimeoutException("recv timed out");
-        }
-
-        int size = (int) ( // NOTE: type cast not necessary for int
-        (0xff & s[0]) << 24 | (0xff & s[1]) << 16 | (0xff & s[2]) << 8 | (0xff & s[3]) << 0);
-
-        if (size == 0)
-            return null;
-
-        byte[] v = new byte[size];
-
-        // read the message
-        int tRem = (timeout > 0 ? timeout - tUsed : 0);
-        readBytes(v, size, tRem);
-
-        return v;
-    }
-
-    public void send(byte[] v) throws IOException {
-        byte[] s = { 0, 0, 0, 0 };
-        int size = v.length;
-
-        s[3] = (byte) (size & 0xff);
-        size >>= 8;
-        s[2] = (byte) (size & 0xff);
-        size >>= 8;
-        s[1] = (byte) (size & 0xff);
-        size >>= 8;
-        s[0] = (byte) (size & 0xff);
-
-        out.write(s);
-        out.write(v);
-        out.flush();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/java/src/main/java/org/apache/s4/client/Driver.java
----------------------------------------------------------------------
diff --git a/s4-driver/java/src/main/java/org/apache/s4/client/Driver.java b/s4-driver/java/src/main/java/org/apache/s4/client/Driver.java
new file mode 100644
index 0000000..4fadc09
--- /dev/null
+++ b/s4-driver/java/src/main/java/org/apache/s4/client/Driver.java
@@ -0,0 +1,693 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client;
+
+import org.apache.s4.client.util.ByteArrayIOChannel;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * S4 Client Driver.
+ * 
+ * Allows S4 Client code to send and receive events from an S4 cluster.
+ */
+public class Driver {
+    private static final String protocolName = "generic-json";
+    private static final int versionMajor = 1;
+    private static final int versionMinor = 0;
+
+    protected String uuid = null;
+    protected State state = State.Null;
+
+    protected final String hostname;
+    protected final int port;
+
+    protected Socket sock = null;
+    protected ByteArrayIOChannel io = null;
+
+    protected ReadMode readMode = ReadMode.Private;
+    protected List<String> readInclude = new ArrayList<String>();
+    protected List<String> readExclude = new ArrayList<String>();
+    protected WriteMode writeMode = WriteMode.Enabled;
+
+    protected boolean debug = false;
+
+    protected int recvTimeoutMs = 0;
+
+    /**
+     * Configure driver with Adapter location.
+     * 
+     * Note: this does not create a connection to the adapter.
+     * 
+     * @see #init()
+     * @see #connect()
+     * 
+     * @param hostname
+     *            Name of S4 client adapter host.
+     * @param port
+     *            Port on which adapter listens for client connections.
+     */
+    public Driver(String hostname, int port) {
+        this.hostname = hostname;
+        this.port = port;
+    }
+
+    /**
+     * Set the read mode, if not already connected.
+     * 
+     * @param m
+     *            read mode
+     * @return Driver with read mode set to {@code m}
+     */
+    public Driver setReadMode(ReadMode m) {
+        if (state != State.Connected)
+            this.readMode = m;
+        return this;
+    }
+
+    /**
+     * Add to set of stream names included for reading, if not already
+     * connected.
+     * 
+     * @param s
+     *            list of stream names
+     * @return Updated driver
+     */
+    public Driver readInclude(List<String> s) {
+        if (state != State.Connected)
+            this.readInclude.addAll(s);
+        return this;
+    }
+
+    /**
+     * Add to set of stream names included for reading, if not already
+     * connected.
+     * 
+     * @param s
+     *            list of stream names
+     * @return Updated driver
+     */
+    public Driver readInclude(String[] s) {
+        return readInclude(Arrays.asList(s));
+    }
+
+    /**
+     * Add to set of stream names included for reading, if not already
+     * connected.
+     * 
+     * @param s
+     *            stream name
+     * @return Updated driver
+     */
+    public Driver readInclude(String s) {
+        if (state != State.Connected)
+            this.readInclude.add(s);
+        return this;
+    }
+
+    /**
+     * Add to set of stream names excluded from reading, if not already
+     * connected.
+     * 
+     * @param s
+     *            list of stream names
+     * @return Updated driver
+     */
+    public Driver readExclude(List<String> s) {
+        if (state != State.Connected)
+            this.readExclude.addAll(s);
+        return this;
+    }
+
+    /**
+     * Add to set of stream names excluded from reading, if not already
+     * connected.
+     * 
+     * @param s
+     *            list of stream names
+     * @return Updated driver
+     */
+    public Driver readExclude(String[] s) {
+        return readExclude(Arrays.asList(s));
+    }
+
+    /**
+     * Add to set of stream names excluded from reading, if not already
+     * connected.
+     * 
+     * @param s
+     *            stream name
+     * @return Updated driver
+     */
+    public Driver readExclude(String s) {
+        if (state != State.Connected)
+            this.readExclude.add(s);
+        return this;
+    }
+
+    /**
+     * Set the write mode, if not already connected.
+     * 
+     * @param m
+     *            write mode
+     * @return Driver with write mode set to {@code m}
+     */
+    public Driver setWriteMode(WriteMode m) {
+        if (state != State.Connected)
+            this.writeMode = m;
+        return this;
+    }
+
+    /**
+     * 
+     * @param debug
+     *            debug flag
+     * @return Driver with debug flag set to {@code debug}
+     */
+    public Driver setDebug(boolean debug) {
+        this.debug = debug;
+        return this;
+    }
+
+    /**
+     * Set the timeout for receiving data.
+     * 
+     * @param ms
+     *            timeout in milliseconds
+     * @return updated driver
+     */
+    public Driver setRecvTimeout(int ms) {
+        this.recvTimeoutMs = ms;
+        return this;
+    }
+
+    /**
+     * Get the state of the driver.
+     * 
+     * @return state
+     */
+    public State getState() {
+        return state;
+    }
+
+    /**
+     * Initialize the driver.
+     * 
+     * Handshake with adapter to receive a unique id, and verify that the driver
+     * is compatible with the protocol used by the adapter. This does not
+     * actually establish a connection for sending and receiving events.
+     * 
+     * @see #connect()
+     * 
+     * @return true if and only if the adapter issued a valid ID to this client,
+     *         and the protocol is found to be compatible.
+     * @throws IOException
+     *             if the underlying TCP/IP socket throws an exception.
+     */
+    public boolean init() throws IOException {
+        if (state.isInitialized())
+            return true;
+
+        try {
+            sock = new Socket(hostname, port);
+
+            ByteArrayIOChannel io = new ByteArrayIOChannel(sock);
+
+            io.send(emptyBytes);
+
+            byte[] b = io.recv();
+
+            if (b == null || b.length == 0) {
+                if (debug) {
+                    System.err.println("Empty response during initialization.");
+                }
+                return false;
+            }
+
+            JSONObject json = new JSONObject(new String(b));
+
+            this.uuid = json.getString("uuid");
+
+            JSONObject proto = json.getJSONObject("protocol");
+
+            if (!isCompatible(proto)) {
+                if (debug) {
+                    System.err
+                            .println("Driver not compatible with adapter protocol: "
+                                    + proto);
+                }
+                return false;
+            }
+
+            state = State.Initialized;
+
+            return true;
+
+        } catch (JSONException e) {
+            if (debug) {
+                System.err
+                        .println("malformed JSON in initialization response. "
+                                + e);
+            }
+            e.printStackTrace();
+            return false;
+
+        } finally {
+            sock.close();
+        }
+    }
+
+    /**
+     * Test if the adapter protocol is compatible with this driver. More
+     * specifically, the following must be true:
+     * {@code
+     *     p.name == this.protocolName()
+     * AND p.versionMajor == this.versionMajor()
+     * AND p.versionMinor >= this.versionMinor()
+     * }
+     * 
+     * @param p
+     *            protocol specifier.
+     * @return true if and only if the protocol is compatible.
+     * @throws JSONException
+     *             if some required field was not found in the protocol
+     *             specifier.
+     */
+    private boolean isCompatible(JSONObject p) throws JSONException {
+        return p.getString("name").equals(protocolName)
+                && (p.getInt("versionMajor") == versionMajor)
+                && (p.getInt("versionMinor") >= versionMinor);
+    }
+
+    /**
+     * Establish a connection to the adapter. Upon success, this enables the
+     * client to send and receive events. The client must first be initialized.
+     * Otherwise, this operation will fail.
+     * 
+     * @see #init()
+     * 
+     * @return true if and only if a connection was successfully established.
+     * @throws IOException
+     *             if the underlying TCP/IP socket throws an exception.
+     */
+    public boolean connect() throws IOException {
+        if (!state.isInitialized()) {
+            // must first be initialized
+            if (debug) {
+                System.err.println("Not initialized.");
+            }
+            return false;
+        } else if (state.isConnected()) {
+            // nothing to do if already connected.
+            return true;
+        }
+
+        String message = null;
+
+        try {
+            // constructing connect message
+            JSONObject json = new JSONObject();
+
+            json.put("uuid", uuid);
+            json.put("readMode", readMode.toString());
+            json.put("writeMode", writeMode.toString());
+
+            if (readInclude != null) {
+                // stream inclusion
+                json.put("readInclude", new JSONArray(readInclude));
+            }
+
+            if (readExclude != null) {
+                // stream exclusion
+                json.put("readExclude", new JSONArray(readExclude));
+            }
+
+            message = json.toString();
+
+        } catch (JSONException e) {
+            if (debug) {
+                System.err.println("error constructing connect message: " + e);
+            }
+            return false;
+        }
+
+        try {
+            // send the message
+            this.sock = new Socket(hostname, port);
+            this.io = new ByteArrayIOChannel(sock);
+
+            io.send(message.getBytes());
+
+            // get a response
+            byte[] b = io.recv();
+
+            if (b == null || b.length == 0) {
+                if (debug) {
+                    System.err
+                            .println("empty response from adapter during connect.");
+                }
+                return false;
+            }
+
+            String response = new String(b);
+
+            JSONObject json = new JSONObject(response);
+            String s = json.optString("status", "unknown");
+
+            // does it look OK?
+            if (s.equalsIgnoreCase("ok")) {
+                // done connecting
+                state = State.Connected;
+                return true;
+            } else if (s.equalsIgnoreCase("failed")) {
+                // server has failed the connect attempt
+                if (debug) {
+                    System.err.println("connect failed by adapter. reason: "
+                            + json.optString("reason", "unknown"));
+                }
+                return false;
+            } else {
+                // unknown response.
+                if (debug) {
+                    System.err
+                            .println("connect failed by adapter. unrecongnized response: "
+                                    + response);
+                }
+                return false;
+            }
+
+        } catch (Exception e) {
+            // clean up after error...
+            if (debug) {
+                System.err.println("error during connect: " + e);
+                e.printStackTrace();
+            }
+
+            if (this.sock.isConnected()) {
+                this.sock.close();
+
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * Close the connection to the adapter. Events can no longer be sent or
+     * received by the client.
+     * 
+     * @return true upon success. False if the connection is already closed.
+     * @throws IOException
+     *             if the underlying TCP/IP socket throws an exception.
+     */
+    public boolean disconnect() throws IOException {
+        if (state.isConnected()) {
+            io.send(emptyBytes);
+            state = State.Null;
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Send a message to the adapter.
+     * 
+     * @param m
+     *            message to be sent
+     * @return true if and only if the message was successfully sent.
+     * @throws IOException
+     *             if the underlying TCP/IP socket throws an exception.
+     */
+
+    public boolean send(Message m) throws IOException {
+
+        if (!state.isConnected()) {
+            if (debug) {
+                System.err.println("send failed. not connected.");
+            }
+            return false;
+        }
+
+        String s = null;
+
+        try {
+            JSONObject json = new JSONObject();
+
+            m.toJson(json);
+
+            s = json.toString();
+
+            io.send(s.getBytes());
+
+            return true;
+
+        } catch (JSONException e) {
+            if (debug) {
+                System.err
+                        .println("exception while constructing message to send: "
+                                + e);
+            }
+            return false;
+        }
+    }
+
+    /**
+     * Receive a message from the adapter. This function blocks till a message
+     * becomes available to read. The set of messages sent to this client from
+     * S4 depends on the read mode of the client.
+     * 
+     * @see ReadMode
+     * 
+     * @return message received from S4 cluster.
+     * @throws IOException
+     *             if the underlying TCP/IP socket throws an exception.
+     */
+    public Message recv() throws IOException {
+        return recv(this.recvTimeoutMs);
+    }
+
+    /**
+     * Receive a message from the adapter. This function blocks till a message
+     * becomes available to read, or till a specified number of milliseconds
+     * have elapsed. The set of messages sent to this client from S4 depends on
+     * the read mode of the client.
+     * 
+     * @see ReadMode
+     * 
+     * @param timeout
+     *            timeout in milliseconds
+     * @return message received from S4 cluster.
+     * @throws IOException
+     *             if the underlying TCP/IP socket throws an exception.
+     * @throws SocketTimeoutException
+     *             if the read timed out.
+     */
+    public Message recv(int timeout) throws IOException {
+        if (!state.isConnected()) {
+            if (debug) {
+                System.err.println("recv failed. not connected.");
+            }
+            return null;
+        }
+
+        try {
+            byte[] b = io.recv(timeout);
+
+            if (b == null || b.length == 0) {
+                if (debug) {
+                    System.err
+                            .println("empty message from adapter. disconnecting");
+                }
+                this.disconnect();
+                return null;
+            }
+
+            String s = new String(b);
+
+            JSONObject json = new JSONObject(s);
+
+            return Message.fromJson(json);
+
+        } catch (SocketTimeoutException e) {
+            if (debug) {
+                System.err.println("recv timed out");
+            }
+            return null;
+
+        } catch (JSONException e) {
+            if (debug) {
+                System.err.println("exception while parsing received JSON: "
+                        + e);
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Receive the set of message that from the adapter within a specified time
+     * interval.
+     * 
+     * @param t
+     *            interval in milliseconds
+     * @return messages received from S4 cluster.
+     * @throws IOException
+     *             if the underlying TCP/IP socket throws an exception.
+     */
+    public List<Message> recvAll(int t) throws IOException {
+        if (!state.isConnected()) {
+            if (debug) {
+                System.err.println("recv failed. not connected.");
+            }
+            return Collections.<Message> emptyList();
+        }
+
+        List<Message> messages = new ArrayList<Message>();
+
+        long tStart = System.currentTimeMillis();
+        long tEnd = tStart + t;
+
+        long tNow = tStart;
+
+        while (tNow < tEnd) {
+            try {
+                byte[] b = io.recv((int) (tEnd - tNow));
+
+                if (b == null || b.length == 0) {
+                    if (debug) {
+                        System.err
+                                .println("empty message from adapter. disconnecting");
+                    }
+                    this.disconnect();
+                    break;
+                }
+
+                String s = new String(b);
+
+                JSONObject json = new JSONObject(s);
+
+                messages.add(Message.fromJson(json));
+
+            } catch (SocketTimeoutException e) {
+                break;
+
+            } catch (JSONException e) {
+                if (debug) {
+                    System.err
+                            .println("exception while parsing received JSON: "
+                                    + e);
+                }
+            }
+
+            tNow = System.currentTimeMillis();
+        }
+
+        return messages;
+    }
+
+    /**
+     * State of the client.
+     */
+    public enum State {
+        /**
+         * Uninitialized.
+         */
+        Null(false, false),
+
+        /**
+         * Initialized, but not connected to S4 adapter.
+         */
+        Initialized(true, false),
+
+        /**
+         * Connected to S4 adapter (implies initialized).
+         */
+        Connected(true, true);
+
+        State(boolean initialized, boolean connected) {
+            this.initialized = initialized;
+            this.connected = connected;
+        }
+
+        private final boolean initialized;
+        private final boolean connected;
+
+        /**
+         * Is initialization completed?
+         * 
+         * @return true if and only if this state implies initialization has
+         *         been completed.
+         */
+        public boolean isInitialized() {
+            return initialized;
+        }
+
+        /**
+         * Is client connected to S4 adapter?
+         * 
+         * @return true if and only if the client is connected to S4 adapter.
+         */
+        public boolean isConnected() {
+            return connected;
+        }
+    };
+
+    private static final byte[] emptyBytes = {};
+
+    /**
+     * Reads and prints all events over an interval of 5 seconds from a set of
+     * streams specified on the command line.
+     * 
+     * @param argv
+     *            list of streams
+     * @throws IOException
+     *             if an error occurs while reading from adapter.
+     */
+    public static void main(String[] argv) throws IOException {
+        Driver d = new Driver("localhost", 2334);
+
+        System.out.println("State: " + d.getState());
+
+        d.init();
+
+        System.out.println("State: " + d.getState());
+
+        d.setReadMode(ReadMode.Select).readInclude(argv)
+                .setWriteMode(WriteMode.Enabled);
+
+        d.connect();
+
+        System.out.println("State: " + d.getState());
+
+        List<Message> mm = d.recvAll(5001);
+        System.out.println("got messages (" + mm.size() + "): " + mm);
+
+        System.out.println("Disconnecting...");
+        d.disconnect();
+
+        System.out.println("State: " + d.getState());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/java/src/main/java/org/apache/s4/client/IOChannel.java
----------------------------------------------------------------------
diff --git a/s4-driver/java/src/main/java/org/apache/s4/client/IOChannel.java b/s4-driver/java/src/main/java/org/apache/s4/client/IOChannel.java
new file mode 100644
index 0000000..761853e
--- /dev/null
+++ b/s4-driver/java/src/main/java/org/apache/s4/client/IOChannel.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client;
+
+import java.io.IOException;
+
+public interface IOChannel {
+    byte[] recv() throws IOException;
+
+    byte[] recv(int timeout) throws IOException;
+
+    void send(byte[] v) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/java/src/main/java/org/apache/s4/client/Message.java
----------------------------------------------------------------------
diff --git a/s4-driver/java/src/main/java/org/apache/s4/client/Message.java b/s4-driver/java/src/main/java/org/apache/s4/client/Message.java
new file mode 100644
index 0000000..e1a3f74
--- /dev/null
+++ b/s4-driver/java/src/main/java/org/apache/s4/client/Message.java
@@ -0,0 +1,97 @@
+package org.apache.s4.client;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+/**
+ * Messages that can be send/received by client. They typically correspond
+ * to events that are sent/received.
+ */
+public class Message {
+    public final String stream;
+    public final String clazz;
+    public final String[] keyNames;
+    public final String object;
+
+    /**
+     * Key-less event message.
+     * 
+     * @param stream
+     *            Name of stream associated with corresponding event.
+     * @param clazz
+     *            Class of event object.
+     * @param object
+     *            String representation of the event. This is the string
+     *            that Gson will convert into/from the event object.
+     */
+    public Message(String stream, String clazz, String object) {
+        this.stream = stream;
+        this.clazz = clazz;
+        this.keyNames = null;
+        this.object = object;
+    }
+
+    /**
+     * Keyed event message.
+     * 
+     * @param stream
+     *            Name of stream associated with corresponding event.
+     * @param keyNames
+     *            array of key names. Typically, the getter corresponding to
+     *            each string in this array will be invoked on the event
+     *            object, and the values concatenated, to produce the
+     *            routing key.
+     * @param clazz
+     *            Class of event object.
+     * @param object
+     *            String representation of the event. This is the string
+     *            that Gson will convert into/from the event object.
+     */
+    public Message(String stream, String[] keyNames, String clazz,
+            String object) {
+        this.stream = stream;
+        this.clazz = clazz;
+        this.keyNames = keyNames;
+        this.object = object;
+    }
+
+    /**
+     * Create from JSON.
+     * 
+     * @param json
+     *            JSON representation of message
+     * @return Message object.
+     * @throws JSONException
+     *             if the JSON is invalid.
+     */
+    public static Message fromJson(JSONObject json) throws JSONException {
+        String stream = json.getString("stream");
+        String clazz = json.getString("class");
+        String object = json.getString("object");
+
+        return new Message(stream, clazz, object);
+    }
+
+    /**
+     * Convert message into JSON.
+     * 
+     * @param json
+     *            JSON will be written into this object.
+     * @throws JSONException
+     *             if writing fails.
+     */
+    public void toJson(JSONObject json) throws JSONException {
+        json.put("stream", this.stream);
+        json.put("class", this.clazz);
+        if (this.keyNames != null) {
+            json.put("keyNames", this.keyNames);
+        }
+
+        json.put("object", this.object);
+    }
+
+    public String toString() {
+        return "{stream:" + stream + ", clazz:" + clazz + ", keyNames:"
+                + keyNames + ", object:" + object + "}";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/java/src/main/java/org/apache/s4/client/ReadMode.java
----------------------------------------------------------------------
diff --git a/s4-driver/java/src/main/java/org/apache/s4/client/ReadMode.java b/s4-driver/java/src/main/java/org/apache/s4/client/ReadMode.java
new file mode 100644
index 0000000..202fd70
--- /dev/null
+++ b/s4-driver/java/src/main/java/org/apache/s4/client/ReadMode.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client;
+
+public enum ReadMode {
+    None, Private, Select, All;
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/java/src/main/java/org/apache/s4/client/WriteMode.java
----------------------------------------------------------------------
diff --git a/s4-driver/java/src/main/java/org/apache/s4/client/WriteMode.java b/s4-driver/java/src/main/java/org/apache/s4/client/WriteMode.java
new file mode 100644
index 0000000..831c26d
--- /dev/null
+++ b/s4-driver/java/src/main/java/org/apache/s4/client/WriteMode.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client;
+
+public enum WriteMode {
+    Enabled, Disabled;
+}

http://git-wip-us.apache.org/repos/asf/incubator-s4/blob/01d2dd8a/s4-driver/java/src/main/java/org/apache/s4/client/util/ByteArrayIOChannel.java
----------------------------------------------------------------------
diff --git a/s4-driver/java/src/main/java/org/apache/s4/client/util/ByteArrayIOChannel.java b/s4-driver/java/src/main/java/org/apache/s4/client/util/ByteArrayIOChannel.java
new file mode 100644
index 0000000..4da3fca
--- /dev/null
+++ b/s4-driver/java/src/main/java/org/apache/s4/client/util/ByteArrayIOChannel.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 	        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific
+ * language governing permissions and limitations under the
+ * License. See accompanying LICENSE file. 
+ */
+package org.apache.s4.client.util;
+
+import org.apache.s4.client.IOChannel;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+
+public class ByteArrayIOChannel implements IOChannel {
+    private InputStream in;
+    private OutputStream out;
+    private Socket socket;
+
+    public ByteArrayIOChannel(Socket socket) throws IOException {
+        in = socket.getInputStream();
+        out = socket.getOutputStream();
+        this.socket = socket;
+    }
+
+    private int readBytes(byte[] s, int n, int timeout) throws IOException {
+        int r = 0; // bytes read so far
+
+        long tStart = System.currentTimeMillis();
+        long tEnd = tStart + timeout;
+        long tRem = timeout;
+        long tNow = tStart;
+
+        do {
+            socket.setSoTimeout((int) tRem);
+
+            // keep reading bytes till the required "n" are read
+            int p = in.read(s, r, (n - r));
+
+            if (p == -1) {
+                throw new IOException("reached end of stream after reading "
+                        + r + " bytes. expected " + n + " bytes");
+            }
+
+            r += p;
+
+            tNow = System.currentTimeMillis();
+
+            tRem = tEnd - tNow;
+
+        } while (r < n && (timeout == 0 || tRem > 0));
+
+        return (int) (tNow - tStart);
+    }
+
+    public byte[] recv() throws IOException {
+        return recv(0);
+    }
+
+    public byte[] recv(int timeout) throws IOException {
+        // first read size of byte array.
+        // unsigned int, big endian: 0A0B0C0D -> {0A, 0B, 0C, 0D}
+        byte[] s = { 0, 0, 0, 0 };
+        int tUsed = readBytes(s, 4, timeout);
+
+        if (timeout > 0 && (timeout - tUsed <= 1)) {
+            throw new SocketTimeoutException("recv timed out");
+        }
+
+        int size = (int) ( // NOTE: type cast not necessary for int
+        (0xff & s[0]) << 24 | (0xff & s[1]) << 16 | (0xff & s[2]) << 8 | (0xff & s[3]) << 0);
+
+        if (size == 0)
+            return null;
+
+        byte[] v = new byte[size];
+
+        // read the message
+        int tRem = (timeout > 0 ? timeout - tUsed : 0);
+        readBytes(v, size, tRem);
+
+        return v;
+    }
+
+    public void send(byte[] v) throws IOException {
+        byte[] s = { 0, 0, 0, 0 };
+        int size = v.length;
+
+        s[3] = (byte) (size & 0xff);
+        size >>= 8;
+        s[2] = (byte) (size & 0xff);
+        size >>= 8;
+        s[1] = (byte) (size & 0xff);
+        size >>= 8;
+        s[0] = (byte) (size & 0xff);
+
+        out.write(s);
+        out.write(v);
+        out.flush();
+    }
+}


Mime
View raw message