avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r790377 - in /hadoop/avro/trunk: CHANGES.txt src/java/org/apache/avro/ipc/Requestor.java src/java/org/apache/avro/ipc/Responder.java src/py/avro/ipc.py
Date Wed, 01 Jul 2009 21:02:13 GMT
Author: cutting
Date: Wed Jul  1 21:02:13 2009
New Revision: 790377

URL: http://svn.apache.org/viewvc?rev=790377&view=rev
Log:
AVRO-66.  Add per-call metadata to Java and Python.  Contributed by George Porter & cutting.

Modified:
    hadoop/avro/trunk/CHANGES.txt
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
    hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
    hadoop/avro/trunk/src/py/avro/ipc.py

Modified: hadoop/avro/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/CHANGES.txt?rev=790377&r1=790376&r2=790377&view=diff
==============================================================================
--- hadoop/avro/trunk/CHANGES.txt (original)
+++ hadoop/avro/trunk/CHANGES.txt Wed Jul  1 21:02:13 2009
@@ -17,6 +17,9 @@
 
     AVRO-46. Optimized RPC handshake protocol for Python.  (sharad)
 
+    AVRO-66. Add per-call RPC metadata to Java and Python. (George
+    Porter & cutting)
+
   NEW FEATURES
 
     AVRO-6. Permit easier implementation of alternate generic data

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java?rev=790377&r1=790376&r2=790377&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Requestor.java Wed Jul  1 21:02:13 2009
@@ -29,6 +29,8 @@
 import org.apache.avro.Protocol;
 import org.apache.avro.Schema;
 import org.apache.avro.Protocol.Message;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.BinaryEncoder;
@@ -43,6 +45,13 @@
 public abstract class Requestor {
   private static final Logger LOG = LoggerFactory.getLogger(Requestor.class);
 
+  private static final Schema META =
+    Schema.createMap(Schema.create(Schema.Type.BYTES));
+  private static final GenericDatumReader<Map<Utf8,ByteBuffer>> META_READER =
+    new GenericDatumReader<Map<Utf8,ByteBuffer>>(META);
+  private static final GenericDatumWriter<Map<Utf8,ByteBuffer>> META_WRITER =
+    new GenericDatumWriter<Map<Utf8,ByteBuffer>>(META);
+
   private Protocol local;
   private Protocol remote;
   private boolean established, sendLocalText;
@@ -63,6 +72,7 @@
     throws IOException {
     Decoder in;
     Message m;
+    Map<Utf8,ByteBuffer> requestMeta = new HashMap<Utf8,ByteBuffer>();
     do {
       ByteBufferOutputStream bbo = new ByteBufferOutputStream();
       Encoder out = new BinaryEncoder(bbo);
@@ -75,6 +85,7 @@
       if (m == null)
         throw new AvroRuntimeException("Not a local message: "+messageName);
       
+      META_WRITER.write(requestMeta, out);
       out.writeString(m.getName());       // write message name
       writeRequest(m.getRequest(), request, out); // write request payload
       
@@ -91,6 +102,7 @@
     m = getRemote().getMessages().get(messageName);
     if (m == null)
       throw new AvroRuntimeException("Not a remote message: "+messageName);
+    Map<Utf8,ByteBuffer> responseMeta = META_READER.read(null, in);
     if (!in.readBoolean()) {                      // no error
       return readResponse(m.getResponse(), in);
     } else {

Modified: hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java?rev=790377&r1=790376&r2=790377&view=diff
==============================================================================
--- hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java (original)
+++ hadoop/avro/trunk/src/java/org/apache/avro/ipc/Responder.java Wed Jul  1 21:02:13 2009
@@ -28,6 +28,8 @@
 import org.apache.avro.*;
 import org.apache.avro.Protocol.Message;
 import org.apache.avro.util.*;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.io.*;
 import org.apache.avro.specific.*;
 
@@ -35,6 +37,13 @@
 public abstract class Responder {
   private static final Logger LOG = LoggerFactory.getLogger(Responder.class);
 
+  private static final Schema META =
+    Schema.createMap(Schema.create(Schema.Type.BYTES));
+  private static final GenericDatumReader<Map<Utf8,ByteBuffer>> META_READER =
+    new GenericDatumReader<Map<Utf8,ByteBuffer>>(META);
+  private static final GenericDatumWriter<Map<Utf8,ByteBuffer>> META_WRITER =
+    new GenericDatumWriter<Map<Utf8,ByteBuffer>>(META);
+
   private Map<Transceiver,Protocol> remotes
     = Collections.synchronizedMap(new WeakHashMap<Transceiver,Protocol>());
   private Map<MD5,Protocol> protocols
@@ -69,6 +78,7 @@
         return bbo.getBufferList();
 
       // read request using remote protocol specification
+      Map<Utf8,ByteBuffer> requestMeta = META_READER.read(null, in);
       String messageName = in.readString(null).toString();
       Message m = remote.getMessages().get(messageName);
       if (m == null)
@@ -90,6 +100,8 @@
         error = new AvroRemoteException(new Utf8(e.toString()));
       }
 
+      Map<Utf8,ByteBuffer> responseMeta = new HashMap<Utf8,ByteBuffer>();
+      META_WRITER.write(responseMeta, out);
       out.writeBoolean(error != null);
       if (error == null)
         writeResponse(m.getResponse(), response, out);

Modified: hadoop/avro/trunk/src/py/avro/ipc.py
URL: http://svn.apache.org/viewvc/hadoop/avro/trunk/src/py/avro/ipc.py?rev=790377&r1=790376&r2=790377&view=diff
==============================================================================
--- hadoop/avro/trunk/src/py/avro/ipc.py (original)
+++ hadoop/avro/trunk/src/py/avro/ipc.py Wed Jul  1 21:02:13 2009
@@ -20,6 +20,7 @@
 import avro.schema as schema
 import avro.protocol as protocol
 import avro.io as io
+import avro.genericio as genericio
 import avro.reflectio as reflectio
 
 class TransceiverBase(object):
@@ -79,6 +80,10 @@
 _REMOTE_HASHES = dict()
 _REMOTE_PROTOCOLS = dict()
 
+_META_SCHEMA = schema.parse("{\"type\": \"map\", \"values\": \"bytes\"}")
+_META_WRITER = genericio.DatumWriter(_META_SCHEMA)
+_META_READER = genericio.DatumReader(_META_SCHEMA)
+
 class RequestorBase(object):
   """Base class for the client side of a protocol interaction."""
 
@@ -107,6 +112,8 @@
       encoder = io.Encoder(buf)
       if not self.__established:
         self.__writehandshake(encoder)
+      requestmeta = dict()
+      _META_WRITER.write(requestmeta, encoder)
       m = self.__localproto.getmessages().get(msgname)
       if m is None:
         raise schema.AvroException("Not a local message: "+msgname.__str__())
@@ -116,6 +123,7 @@
       decoder = io.Decoder(cStringIO.StringIO(response))
       if not self.__established:
         self.__readhandshake(decoder)
+    responsemeta = _META_READER.read(decoder)
     m = self.getremote().getmessages().get(msgname)
     if m is None:
       raise schema.AvroException("Not a remote message: "+msgname.__str__())
@@ -197,6 +205,7 @@
     buf = cStringIO.StringIO()
     encoder = io.Encoder(buf)
     error = None
+    responsemeta = dict()
     
     try:
       remoteproto = self.__handshake(transceiver, decoder, encoder)
@@ -204,6 +213,7 @@
         return buf.getvalue()
       
       #read request using remote protocol specification
+      requestmeta = _META_READER.read(decoder)
       msgname = decoder.readutf8()
       m = remoteproto.getmessages().get(msgname)
       if m is None:
@@ -220,6 +230,7 @@
         error = e
       except Exception, e:
         error = AvroRemoteException(unicode(e.__str__()))
+      _META_WRITER.write(responsemeta, encoder)
       encoder.writeboolean(error is not None)
       if error is None:
         self.writeresponse(m.getresponse(), response, encoder)
@@ -229,6 +240,7 @@
       error = AvroRemoteException(unicode(e.__str__()))
       buf = cStringIO.StringIO()
       encoder = io.Encoder(buf)
+      _META_WRITER.write(responsemeta, encoder)
       encoder.writeboolean(True)
       self.writeerror(protocol._SYSTEM_ERRORS, error, encoder)
       



Mime
View raw message