avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r820442 - in /hadoop/avro/trunk: CHANGES.txt ivy.xml src/java/org/apache/avro/ipc/HttpServer.java src/java/org/apache/avro/ipc/HttpTransceiver.java src/test/java/org/apache/avro/TestProtocolHttp.java
Date Wed, 30 Sep 2009 20:52:22 GMT
Author: cutting
Date: Wed Sep 30 20:52:22 2009
New Revision: 820442

URL: http://svn.apache.org/viewvc?rev=820442&view=rev
Log:
AVRO-129.  Add HTTP-based RPC client and server.

Added:
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpServer.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpTransceiver.java
    hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolHttp.java
Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/ivy.xml

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=820442&r1=820441&r2=820442&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Sep 30 20:52:22 2009
@@ -33,6 +33,8 @@
     written with a different version of the schema than is current.
     (cutting)
 
+    AVRO-129.  Add HTTP-based RPC client and server.  (cutting)
+
   IMPROVEMENTS
 
     AVRO-99.  Use Boost framework for C++ unit tests.

Modified: hadoop/avro/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/ivy.xml?rev=820442&r1=820441&r2=820442&view=diff
==============================================================================
--- hadoop/avro/trunk/ivy.xml (original)
+++ hadoop/avro/trunk/ivy.xml Wed Sep 30 20:52:22 2009
@@ -39,6 +39,8 @@
 		rev="1.5"/>
     <dependency org="com.thoughtworks.paranamer" name="paranamer-ant"
 		rev="1.5"/>
+    <dependency org="org.mortbay.jetty" name="jetty"
+		rev="6.1.14"/>
     <dependency org="junit" name="junit" rev="4.5" conf="test->default"/>
     <dependency org="checkstyle" name="checkstyle" rev="5.0"
 		conf="test->default"/>

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpServer.java?rev=820442&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpServer.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpServer.java Wed Sep 30 20:52:22 2009
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.avro.AvroRuntimeException;
+
+import org.mortbay.jetty.servlet.Context;
+import org.mortbay.jetty.servlet.ServletHolder;
+
+public class HttpServer extends HttpServlet implements Server {
+  private Responder responder;
+  private org.mortbay.jetty.Server server;
+
+  public HttpServer(Responder responder, int port) throws IOException {
+    this.responder = responder;
+    this.server = new org.mortbay.jetty.Server(port);
+    new Context(server,"/").addServlet(new ServletHolder(this), "/*");
+    try {
+      server.start();
+    } catch (Exception e) {
+      throw new AvroRuntimeException(e);
+    }
+  }
+
+  @Override
+  public int getPort() { return server.getConnectors()[0].getLocalPort(); }
+
+  @Override
+  public void close() {
+    try {
+      server.stop();
+    } catch (Exception e) {
+      throw new AvroRuntimeException(e);
+    }
+  }
+
+  public void doPost(HttpServletRequest request, HttpServletResponse response)
+        throws IOException, ServletException {
+    response.setContentType("avro/binary");
+    List<ByteBuffer> requestBuffers =
+      HttpTransceiver.readBuffers(request.getInputStream());
+    try {
+      List<ByteBuffer> responseBuffers =
+        responder.respond(requestBuffers);
+      response.setContentLength(HttpTransceiver.getLength(responseBuffers));
+      HttpTransceiver.writeBuffers(responseBuffers, response.getOutputStream());
+    } catch (AvroRuntimeException e) {
+      throw new ServletException(e);
+    }
+  }
+}

Added: hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpTransceiver.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpTransceiver.java?rev=820442&view=auto
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpTransceiver.java (added)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/HttpTransceiver.java Wed Sep 30 20:52:22
2009
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.ipc;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.net.URL;
+import java.net.URLConnection;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** An HTTP-based {@link Transceiver} implementation. */
+public class HttpTransceiver extends Transceiver {
+  private static final Logger LOG
+    = LoggerFactory.getLogger(HttpTransceiver.class);
+
+  private URL url;
+  private URLConnection connection;
+  
+  public HttpTransceiver(URL url) { this.url = url; }
+
+  public String getRemoteName() { return this.url.toString(); }
+
+  @Override
+  public synchronized List<ByteBuffer> transceive(List<ByteBuffer> request)
+    throws IOException {
+    this.connection = url.openConnection();
+    connection.setRequestProperty("Content-Type", "avro/binary");
+    connection.setRequestProperty("Content-Length",
+                                  Integer.toString(getLength(request)));
+    connection.setDoOutput(true);
+    LOG.info("Connecting to: "+url);
+    return super.transceive(request);
+  }
+
+  public synchronized List<ByteBuffer> readBuffers() throws IOException {
+    return readBuffers(connection.getInputStream());
+  }
+
+  public synchronized void writeBuffers(List<ByteBuffer> buffers)
+    throws IOException {
+    writeBuffers(buffers, connection.getOutputStream());
+  }
+
+  static int getLength(List<ByteBuffer> buffers) {
+    int length = 0;
+    for (ByteBuffer buffer : buffers) {
+      length += 4;
+      length += buffer.remaining();
+    }
+    length += 4;
+    return length;
+  }
+
+  static List<ByteBuffer> readBuffers(InputStream in)
+    throws IOException {
+    List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
+    while (true) {
+      int length = (in.read()<<24)+(in.read()<<16)+(in.read()<<8)+in.read();
+      if (length == 0) {                       // end of buffers
+        return buffers;
+      }
+      ByteBuffer buffer = ByteBuffer.allocate(length);
+      while (buffer.hasRemaining()) {
+        int p = buffer.position();
+        int i = in.read(buffer.array(), p, buffer.remaining());
+        if (i < 0)
+          throw new EOFException("Unexpected EOF");
+        buffer.position(p+i);
+      }
+      buffer.flip();
+      buffers.add(buffer);
+    }
+  }
+
+  static void writeBuffers(List<ByteBuffer> buffers, OutputStream out)
+    throws IOException {
+    for (ByteBuffer buffer : buffers) {
+      writeLength(buffer.limit(), out);           // length-prefix
+      out.write(buffer.array(), buffer.position(), buffer.remaining());
+      buffer.position(buffer.limit());
+    }
+    writeLength(0, out);                          // null-terminate
+  }
+
+  private static void writeLength(int length, OutputStream out)
+    throws IOException {
+    out.write(0xff & (length >>> 24));
+    out.write(0xff & (length >>> 16));
+    out.write(0xff & (length >>> 8));
+    out.write(0xff & length);
+  }
+}
+

Added: hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolHttp.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolHttp.java?rev=820442&view=auto
==============================================================================
--- hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolHttp.java (added)
+++ hadoop/avro/trunk/src/test/java/org/apache/avro/TestProtocolHttp.java Wed Sep 30 20:52:22
2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.avro;
+
+import java.util.Random;
+import org.apache.avro.ipc.HttpServer;
+import org.apache.avro.ipc.HttpTransceiver;
+import org.apache.avro.specific.SpecificRequestor;
+import org.apache.avro.specific.SpecificResponder;
+import org.apache.avro.test.Simple;
+import org.junit.Before;
+
+import java.net.URL;
+
+public class TestProtocolHttp extends TestProtocolSpecific {
+
+  @Before
+  public void testStartServer() throws Exception {
+    server =
+      new HttpServer(new SpecificResponder(Simple.class, new TestImpl()), 0);
+    client =
+      new HttpTransceiver(new URL("http://127.0.0.1:"+server.getPort()+"/"));
+    proxy = (Simple)SpecificRequestor.getClient(Simple.class, client);
+  }
+
+}



Mime
View raw message