avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r1557225 [3/3] - in /avro/trunk: ./ lang/py3/ lang/py3/avro/ lang/py3/avro/tests/ lang/py3/scripts/ share/test/schemas/
Date Fri, 10 Jan 2014 19:11:43 GMT
Added: avro/trunk/lang/py3/avro/tests/test_ipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/test_ipc.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/test_ipc.py (added)
+++ avro/trunk/lang/py3/avro/tests/test_ipc.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,158 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Tests the Python IPC client (Requestor) against an in-process Echo RPC
+server over HTTP.
+"""
+
+import logging
+import threading
+import time
+import unittest
+
+from avro import ipc
+from avro import protocol
+from avro import schema
+
+
+def NowMS():
+  return int(time.time() * 1000)
+
+
+# JSON definition of the "Echo" protocol: a single "ping" message that takes
+# a Ping record and returns a Pong record (which itself embeds a Ping).
+ECHO_PROTOCOL_JSON = """
+{
+  "protocol" : "Echo",
+  "namespace" : "org.apache.avro.ipc.echo",
+  "types" : [ {
+    "type" : "record",
+    "name" : "Ping",
+    "fields" : [ {
+      "name" : "timestamp",
+      "type" : "long",
+      "default" : -1
+    }, {
+      "name" : "text",
+      "type" : "string",
+      "default" : ""
+    } ]
+  }, {
+    "type" : "record",
+    "name" : "Pong",
+    "fields" : [ {
+      "name" : "timestamp",
+      "type" : "long",
+      "default" : -1
+    }, {
+      "name" : "ping",
+      "type" : "Ping"
+    } ]
+  } ],
+  "messages" : {
+    "ping" : {
+      "request" : [ {
+        "name" : "ping",
+        "type" : "Ping"
+      } ],
+      "response" : "Pong"
+    }
+  }
+}
+"""
+
+
+# Parsed protocol, shared by the server-side responder and the client-side
+# requestor.
+ECHO_PROTOCOL = protocol.Parse(ECHO_PROTOCOL_JSON)
+
+
+class EchoResponder(ipc.Responder):
+  def __init__(self):
+    super(EchoResponder, self).__init__(
+        local_protocol=ECHO_PROTOCOL,
+    )
+
+  def Invoke(self, message, request):
+    logging.info('Message: %s', message)
+    logging.info('Request: %s', request)
+    ping = request['ping']
+    return {'timestamp': NowMS(), 'ping': ping}
+
+
+class TestIPC(unittest.TestCase):
+
+  def __init__(self, *args, **kwargs):
+    super(TestIPC, self).__init__(*args, **kwargs)
+    # Reference to an Echo RPC over HTTP server:
+    self._server = None
+
+  def StartEchoServer(self):
+    self._server = ipc.AvroIpcHttpServer(
+        interface='localhost',
+        port=0,
+        responder=EchoResponder(),
+    )
+
+    def ServerThread():
+      self._server.serve_forever()
+
+    self._server_thread = threading.Thread(target=ServerThread)
+    self._server_thread.start()
+
+    logging.info(
+        'Echo RPC Server listening on %s:%s',
+        *self._server.server_address)
+    logging.info('RPC socket: %s', self._server.socket)
+
+  def StopEchoServer(self):
+    assert (self._server is not None)
+    self._server.shutdown()
+    self._server_thread.join()
+    self._server.server_close()
+    self._server = None
+
+  def testEchoService(self):
+    """Tests client-side of the Echo service."""
+    self.StartEchoServer()
+    try:
+      (server_host, server_port) = self._server.server_address
+
+      transceiver = ipc.HTTPTransceiver(host=server_host, port=server_port)
+      requestor = ipc.Requestor(
+          local_protocol=ECHO_PROTOCOL,
+          transceiver=transceiver,
+      )
+      response = requestor.Request(
+          message_name='ping',
+          request_datum={'ping': {'timestamp': 31415, 'text': 'hello ping'}},
+      )
+      logging.info('Received echo response: %s', response)
+
+      response = requestor.Request(
+          message_name='ping',
+          request_datum={'ping': {'timestamp': 123456, 'text': 'hello again'}},
+      )
+      logging.info('Received echo response: %s', response)
+
+      transceiver.Close()
+
+    finally:
+      self.StopEchoServer()
+
+
+if __name__ == '__main__':
+  # Direct execution is refused; the error message points at run_tests.py
+  # as the entry point for this test module.
+  raise Exception('Use run_tests.py')

Propchange: avro/trunk/lang/py3/avro/tests/test_ipc.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/test_protocol.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/test_protocol.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/test_protocol.py (added)
+++ avro/trunk/lang/py3/avro/tests/test_protocol.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,504 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Test the protocol parsing logic.
+"""
+
+import logging
+import traceback
+import unittest
+
+from avro import protocol
+
+
+# ------------------------------------------------------------------------------
+
+
+class ExampleProtocol(object):
+  def __init__(
+      self,
+      protocol_string,
+      valid=True,
+  ):
+    self._protocol_string = protocol_string
+    self._valid = valid
+
+  @property
+  def protocol_string(self):
+    return self._protocol_string
+
+  @property
+  def valid(self):
+    return self._valid
+
+
+# ------------------------------------------------------------------------------
+# Protocol test cases:
+
+# Protocol whose named types carry no namespace of their own, so they resolve
+# against the protocol-level "com.acme" namespace. The inner-namespace tests
+# parse this example directly.
+HELLO_WORLD = ExampleProtocol("""
+{
+  "namespace": "com.acme",
+  "protocol": "HelloWorld",
+  "types": [
+    {
+      "name": "Greeting",
+      "type": "record",
+      "fields": [{"name": "message", "type": "string"}]
+    },
+    {
+      "name": "Curse",
+      "type": "error",
+      "fields": [{"name": "message", "type": "string"}]
+    }
+  ],
+  "messages": {
+    "hello": {
+      "request": [{"name": "greeting", "type": "Greeting" }],
+      "response": "Greeting",
+      "errors": ["Curse"]
+    }
+  }
+}
+""")
+
+# Protocols exercised by testParse. Entries constructed with valid=False are
+# expected to be rejected by protocol.Parse; all others must parse.
+EXAMPLES = [
+  HELLO_WORLD,
+
+  # Enum, fixed, record and error types, plus messages with and without
+  # declared errors.
+  ExampleProtocol(
+  """
+  {
+    "namespace": "org.apache.avro.test",
+    "protocol": "Simple",
+    "types": [
+      {"name": "Kind", "type": "enum", "symbols": ["FOO","BAR","BAZ"]},
+      {"name": "MD5", "type": "fixed", "size": 16},
+      {"name": "TestRecord", "type": "record",
+       "fields": [
+         {"name": "name", "type": "string", "order": "ignore"},
+         {"name": "kind", "type": "Kind", "order": "descending"},
+         {"name": "hash", "type": "MD5"}
+       ]
+      },
+      {"name": "TestError", "type": "error",
+       "fields": [
+         {"name": "message", "type": "string"}
+       ]
+      }
+    ],
+    "messages": {
+      "hello": {
+        "request": [{"name": "greeting", "type": "string"}],
+        "response": "string"
+      },
+      "echo": {
+        "request": [{"name": "record", "type": "TestRecord"}],
+        "response": "TestRecord"
+      },
+      "add": {
+        "request": [{"name": "arg1", "type": "int"},
+                    {"name": "arg2", "type": "int"}],
+        "response": "int"
+      },
+      "echoBytes": {
+        "request": [{"name": "data", "type": "bytes"}],
+        "response": "bytes"
+      },
+      "error": {
+        "request": [],
+        "response": "null",
+        "errors": ["TestError"]
+      }
+    }
+  }
+  """),
+
+  # Named types declared with fully-qualified names or with their own
+  # "namespace" attribute.
+  ExampleProtocol(
+  """
+  {
+    "namespace": "org.apache.avro.test.namespace",
+    "protocol": "TestNamespace",
+    "types": [
+      {"name": "org.apache.avro.test.util.MD5", "type": "fixed", "size": 16},
+      {
+        "name": "TestRecord",
+        "type": "record",
+        "fields": [{"name": "hash", "type": "org.apache.avro.test.util.MD5"}]
+      },
+      {
+        "name": "TestError",
+        "namespace": "org.apache.avro.test.errors",
+        "type": "error",
+        "fields": [{"name": "message", "type": "string"}]
+      }
+    ],
+    "messages": {
+      "echo": {
+        "request": [{"name": "record", "type": "TestRecord"}],
+        "response": "TestRecord"
+      },
+      "error": {
+        "request": [],
+        "response": "null",
+        "errors": ["org.apache.avro.test.errors.TestError"]
+      }
+    }
+  }
+  """),
+
+  # Unqualified type references resolved against the protocol namespace.
+  ExampleProtocol(
+  """
+  {
+    "namespace": "org.apache.avro.test.namespace",
+    "protocol": "TestImplicitNamespace",
+    "types": [
+      {"name": "org.apache.avro.test.util.MD5", "type": "fixed", "size": 16},
+      {
+        "name": "ReferencedRecord",
+        "type": "record",
+        "fields": [{"name": "foo", "type": "string"}]
+      },
+      {
+        "name": "TestRecord",
+        "type": "record",
+        "fields": [
+          {"name": "hash", "type": "org.apache.avro.test.util.MD5"},
+          {"name": "unqalified", "type": "ReferencedRecord"}
+        ]
+      },
+      {
+        "name": "TestError",
+        "type": "error",
+        "fields": [{"name": "message", "type": "string"}]
+      }
+    ],
+    "messages": {
+      "echo": {
+        "request": [
+          {"name": "qualified", "type": "org.apache.avro.test.namespace.TestRecord"}
+        ],
+        "response": "TestRecord"
+      },
+      "error": {
+        "request": [],
+        "response": "null",
+        "errors": ["org.apache.avro.test.namespace.TestError"]
+      }
+    }
+  }
+  """),
+
+  # A record declared in a different namespace, referenced by its
+  # fully-qualified name.
+  ExampleProtocol(
+  """
+  {
+    "namespace": "org.apache.avro.test.namespace",
+    "protocol": "TestNamespaceTwo",
+    "types": [
+      {"name": "org.apache.avro.test.util.MD5", "type": "fixed", "size": 16},
+      {
+        "name": "ReferencedRecord",
+        "namespace": "org.apache.avro.other.namespace",
+        "type": "record",
+        "fields": [{"name": "foo", "type": "string"}]
+      },
+      {
+        "name": "TestRecord",
+        "type": "record",
+        "fields": [
+          {"name": "hash", "type": "org.apache.avro.test.util.MD5"},
+          {
+            "name": "qualified",
+            "type": "org.apache.avro.other.namespace.ReferencedRecord"
+          }
+        ]
+      },
+      {
+        "name": "TestError",
+        "type": "error",
+        "fields": [{"name": "message", "type": "string"}]
+      }
+    ],
+    "messages": {
+      "echo": {
+        "request": [
+          {
+            "name": "qualified",
+            "type": "org.apache.avro.test.namespace.TestRecord"
+          }
+        ],
+        "response": "TestRecord"
+      },
+      "error": {
+        "request": [],
+        "response": "null",
+        "errors": ["org.apache.avro.test.namespace.TestError"]
+      }
+    }
+  }
+  """),
+
+  # The short name "ReferencedRecord" is reused, but in two different
+  # namespaces, so the full names stay distinct.
+  ExampleProtocol(
+  """
+  {
+    "namespace": "org.apache.avro.test.namespace",
+    "protocol": "TestValidRepeatedName",
+    "types": [
+      {"name": "org.apache.avro.test.util.MD5", "type": "fixed", "size": 16},
+      {
+        "name": "ReferencedRecord",
+        "namespace": "org.apache.avro.other.namespace",
+        "type": "record",
+        "fields": [{"name": "foo", "type": "string"}]
+      },
+      {
+        "name": "ReferencedRecord",
+        "type": "record",
+        "fields": [{"name": "bar", "type": "double"}]
+      },
+      {
+        "name": "TestError",
+        "type": "error",
+        "fields": [{"name": "message", "type": "string"}]
+      }
+    ],
+    "messages": {
+      "echo": {
+        "request": [{"name": "qualified", "type": "ReferencedRecord"}],
+        "response": "org.apache.avro.other.namespace.ReferencedRecord"
+      },
+      "error": {
+        "request": [],
+        "response": "null",
+        "errors": ["org.apache.avro.test.namespace.TestError"]
+      }
+    }
+  }
+  """),
+
+  # Expected to be rejected: "ReferencedRecord" is declared twice in the same
+  # namespace (and the "echo" response names a type this protocol never
+  # declares).
+  ExampleProtocol(
+  """
+  {
+    "namespace": "org.apache.avro.test.namespace",
+    "protocol": "TestInvalidRepeatedName",
+    "types": [
+      {"name": "org.apache.avro.test.util.MD5", "type": "fixed", "size": 16},
+      {
+        "name": "ReferencedRecord",
+        "type": "record",
+        "fields": [{"name": "foo", "type": "string"}]
+      },
+      {
+        "name": "ReferencedRecord",
+        "type": "record",
+        "fields": [{"name": "bar", "type": "double"}]
+      },
+      {
+        "name": "TestError",
+        "type": "error",
+        "fields": [{"name": "message", "type": "string"}]
+      }
+    ],
+    "messages": {
+      "echo": {
+        "request": [{"name": "qualified", "type": "ReferencedRecord"}],
+        "response": "org.apache.avro.other.namespace.ReferencedRecord"
+      },
+      "error": {
+        "request": [],
+        "response": "null",
+        "errors": ["org.apache.avro.test.namespace.TestError"]
+      }
+    }
+  }
+  """,
+  valid=False),
+
+  # No named types at all; messages use only primitive types.
+  ExampleProtocol(
+  """
+  {
+    "namespace": "org.apache.avro.test",
+    "protocol": "BulkData",
+    "types": [],
+    "messages": {
+      "read": {
+        "request": [],
+        "response": "bytes"
+      },
+      "write": {
+        "request": [ {"name": "data", "type": "bytes"} ],
+        "response": "null"
+      }
+    }
+  }
+  """),
+
+  # Types in a nested namespace that reference earlier types by full name,
+  # including through a map; the protocol declares no messages.
+  ExampleProtocol(
+  """
+  {
+    "protocol": "API",
+    "namespace": "xyz.api",
+    "types": [
+      {
+        "type": "enum",
+        "name": "Symbology",
+        "namespace": "xyz.api.product",
+        "symbols": [ "OPRA", "CUSIP", "ISIN", "SEDOL" ]
+      },
+      {
+        "type": "record",
+        "name": "Symbol",
+        "namespace": "xyz.api.product",
+        "fields": [ {
+          "name": "symbology",
+          "type": "xyz.api.product.Symbology"
+        }, {
+          "name": "symbol",
+          "type": "string"
+        } ]
+      },
+      {
+        "type": "record",
+        "name": "MultiSymbol",
+        "namespace": "xyz.api.product",
+        "fields": [{
+          "name": "symbols",
+          "type": {
+            "type": "map",
+            "values": "xyz.api.product.Symbol"
+          }
+        }]
+      }
+    ],
+    "messages": {}
+  }
+  """),
+]
+# End of EXAMPLES
+
+
+# Examples expected to parse; used by the string round-trip tests.
+VALID_EXAMPLES = [e for e in EXAMPLES if e.valid]
+
+
+# ------------------------------------------------------------------------------
+
+
+class TestProtocol(unittest.TestCase):
+
+  def testParse(self):
+    correct = 0
+    for iexample, example in enumerate(EXAMPLES):
+      logging.debug(
+          'Parsing protocol #%d:\n%s',
+          iexample, example.protocol_string)
+      try:
+        parsed = protocol.Parse(example.protocol_string)
+        if example.valid:
+          correct += 1
+        else:
+          self.fail(
+              'Invalid protocol was parsed:\n%s' % example.protocol_string)
+      except Exception as exn:
+        if example.valid:
+          self.fail(
+              'Valid protocol failed to parse: %s\n%s'
+              % (example.protocol_string, traceback.format_exc()))
+        else:
+          if logging.getLogger().getEffectiveLevel() <= 5:
+            logging.debug('Expected error:\n%s', traceback.format_exc())
+          else:
+            logging.debug('Expected error: %r', exn)
+          correct += 1
+
+    self.assertEqual(
+      correct,
+      len(EXAMPLES),
+      'Parse behavior correct on %d out of %d protocols.'
+      % (correct, len(EXAMPLES)))
+
+  def testInnerNamespaceSet(self):
+    proto = protocol.Parse(HELLO_WORLD.protocol_string)
+    self.assertEqual(proto.namespace, 'com.acme')
+    greeting_type = proto.type_map['com.acme.Greeting']
+    self.assertEqual(greeting_type.namespace, 'com.acme')
+
+  def testInnerNamespaceNotRendered(self):
+    proto = protocol.Parse(HELLO_WORLD.protocol_string)
+    self.assertEqual('com.acme.Greeting', proto.types[0].fullname)
+    self.assertEqual('Greeting', proto.types[0].name)
+    # but there shouldn't be 'namespace' rendered to json on the inner type
+    self.assertFalse('namespace' in proto.to_json()['types'][0])
+
+  def testValidCastToStringAfterParse(self):
+    """
+    Test that the string generated by an Avro Protocol object is,
+    in fact, a valid Avro protocol.
+    """
+    num_correct = 0
+    for example in VALID_EXAMPLES:
+      proto = protocol.Parse(example.protocol_string)
+      try:
+        protocol.Parse(str(proto))
+        logging.debug(
+            'Successfully reparsed protocol:\n%s',
+            example.protocol_string)
+        num_correct += 1
+      except:
+        logging.debug(
+            'Failed to reparse protocol:\n%s',
+            example.protocol_string)
+
+    fail_msg = (
+      'Cast to string success on %d out of %d protocols'
+      % (num_correct, len(VALID_EXAMPLES)))
+    self.assertEqual(num_correct, len(VALID_EXAMPLES), fail_msg)
+
+  def testEquivalenceAfterRoundTrip(self):
+    """
+    1. Given a string, parse it to get Avro protocol "original".
+    2. Serialize "original" to a string and parse that string
+         to generate Avro protocol "round trip".
+    3. Ensure "original" and "round trip" protocols are equivalent.
+    """
+    num_correct = 0
+    for example in VALID_EXAMPLES:
+      original_protocol = protocol.Parse(example.protocol_string)
+      round_trip_protocol = protocol.Parse(str(original_protocol))
+
+      if original_protocol == round_trip_protocol:
+        num_correct += 1
+        logging.debug(
+            'Successful round-trip for protocol:\n%s',
+            example.protocol_string)
+      else:
+        self.fail(
+            'Round-trip failure for protocol:\n%s\nOriginal protocol:\n%s'
+            % (example.protocol_string, str(original_protocol)))
+
+    self.assertEqual(
+        num_correct,
+        len(VALID_EXAMPLES),
+        'Round trip success on %d out of %d protocols.'
+        % (num_correct, len(VALID_EXAMPLES)))
+
+
+# ------------------------------------------------------------------------------
+
+
+if __name__ == '__main__':
+  # Direct execution is refused; the error message points at run_tests.py
+  # as the entry point for this test module.
+  raise Exception('Use run_tests.py')

Propchange: avro/trunk/lang/py3/avro/tests/test_protocol.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/test_schema.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/test_schema.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/test_schema.py (added)
+++ avro/trunk/lang/py3/avro/tests/test_schema.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,625 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Test the schema parsing logic.
+"""
+
+import logging
+import traceback
+import unittest
+
+from avro import schema
+
+
+# ------------------------------------------------------------------------------
+
+
+class ExampleSchema(object):
+  def __init__(self, schema_string, valid, name='', comment=''):
+    self._schema_string = schema_string
+    self._valid = valid
+    self._name = name or schema_string # default to schema_string for name
+    self.comment = comment
+
+  @property
+  def schema_string(self):
+    return self._schema_string
+
+  @property
+  def valid(self):
+    return self._valid
+
+  @property
+  def name(self):
+    return self._name
+
+
+# ------------------------------------------------------------------------------
+# Example Schemas
+
+
+def MakePrimitiveExamples():
+  examples = []
+  for type in schema.PRIMITIVE_TYPES:
+    examples.append(ExampleSchema('"%s"' % type, valid=True))
+    examples.append(ExampleSchema('{"type": "%s"}' % type, valid=True))
+  return examples
+
+PRIMITIVE_EXAMPLES = MakePrimitiveExamples() + [
+  ExampleSchema('"True"', valid=False),
+  ExampleSchema('True', valid=False),
+  ExampleSchema('{"no_type": "test"}', valid=False),
+  ExampleSchema('{"type": "panther"}', valid=False),
+]
+
+# Fixed-size schemas: simple and namespaced valid cases, then schemas missing
+# a required attribute.
+FIXED_EXAMPLES = [
+  ExampleSchema('{"type": "fixed", "name": "Test", "size": 1}', valid=True),
+  ExampleSchema("""
+    {
+      "type": "fixed",
+      "name": "MyFixed",
+      "namespace": "org.apache.hadoop.avro",
+      "size": 1
+    }
+    """,
+    valid=True),
+  # Invalid: no "size".
+  ExampleSchema("""
+    {
+      "type": "fixed",
+      "name": "Missing size"
+    }
+    """,
+    valid=False),
+  # Invalid: no "name".
+  ExampleSchema("""
+    {
+      "type": "fixed",
+      "size": 314
+    }
+    """,
+    valid=False),
+]
+
+ENUM_EXAMPLES = [
+  ExampleSchema(
+    '{"type": "enum", "name": "Test", "symbols": ["A", "B"]}',
+    valid=True),
+  ExampleSchema("""
+    {
+      "type": "enum",
+      "name": "Status",
+      "symbols": "Normal Caution Critical"
+    }
+    """,
+    valid=False),
+  ExampleSchema("""
+    {
+      "type": "enum",
+      "name": [0, 1, 1, 2, 3, 5, 8],
+      "symbols": ["Golden", "Mean"]
+    }
+    """,
+    valid=False),
+  ExampleSchema("""
+    {
+      "type": "enum",
+      "symbols": ["I", "will", "fail", "no", "name"]
+    }
+    """,
+    valid=False),
+  ExampleSchema("""
+    {
+      "type": "enum",
+      "name": "Test"
+      "symbols": ["AA", "AA"]
+    }
+    """,
+    valid=False),
+]
+
+# Arrays of a primitive item type and of an inline-defined enum.
+ARRAY_EXAMPLES = [
+  ExampleSchema('{"type": "array", "items": "long"}', valid=True),
+  ExampleSchema("""
+    {
+      "type": "array",
+      "items": {"type": "enum", "name": "Test", "symbols": ["A", "B"]}
+    }
+    """,
+    valid=True),
+]
+
+MAP_EXAMPLES = [
+  ExampleSchema('{"type": "map", "values": "long"}', True),
+  ExampleSchema("""
+    {
+      "type": "map",
+      "values": {"type": "enum", "name": "Test", "symbols": ["A", "B"]}
+    }
+    """,
+    valid=True,
+  ),
+]
+
+# Unions: one valid case, then unions repeating the same kind of branch.
+UNION_EXAMPLES = [
+  ExampleSchema('["string", "null", "long"]', valid=True),
+  # Invalid: the same primitive type appears twice.
+  ExampleSchema('["null", "null"]', valid=False),
+  ExampleSchema('["long", "long"]', valid=False),
+  # Invalid: two array branches, even though their item types differ.
+  ExampleSchema("""
+    [
+      {"type": "array", "items": "long"},
+      {"type": "array", "items": "string"}
+    ]
+    """,
+    valid=False,
+  ),
+]
+
+RECORD_EXAMPLES = [
+  ExampleSchema("""
+    {
+      "type": "record",
+      "name": "Test",
+      "fields": [{"name": "f", "type": "long"}]
+    }
+    """,
+    valid=True,
+  ),
+  ExampleSchema("""
+    {
+      "type": "error",
+      "name": "Test",
+      "fields": [{"name": "f", "type": "long"}]
+    }
+    """,
+    valid=True,
+  ),
+  ExampleSchema("""
+    {
+      "type": "record",
+      "name": "Node",
+      "fields": [
+        {"name": "label", "type": "string"},
+        {"name": "children", "type": {"type": "array", "items": "Node"}}
+      ]
+    }
+    """,
+    valid=True,
+  ),
+  ExampleSchema("""
+    {
+      "type": "record",
+      "name": "Lisp",
+      "fields": [{
+        "name": "value",
+        "type": [
+          "null",
+          "string",
+          {
+            "type": "record",
+            "name": "Cons",
+            "fields": [{"name": "car", "type": "Lisp"},
+                       {"name": "cdr", "type": "Lisp"}]
+          }
+        ]
+      }]
+    }
+    """,
+    valid=True,
+  ),
+  ExampleSchema("""
+    {
+      "type": "record",
+      "name": "HandshakeRequest",
+      "namespace": "org.apache.avro.ipc",
+      "fields": [
+        {
+          "name": "clientHash",
+          "type": {"type": "fixed", "name": "MD5", "size": 16}
+        },
+        {"name": "clientProtocol", "type": ["null", "string"]},
+        {"name": "serverHash", "type": "MD5"},
+        {
+          "name": "meta",
+          "type": ["null", {"type": "map", "values": "bytes"}]
+        }
+      ]
+    }
+    """,
+    valid=True,
+  ),
+  ExampleSchema("""
+    {
+      "type": "record",
+      "name": "HandshakeResponse",
+      "namespace": "org.apache.avro.ipc",
+      "fields": [
+        {
+          "name": "match",
+          "type": {
+            "type": "enum",
+            "name": "HandshakeMatch",
+            "symbols": ["BOTH", "CLIENT", "NONE"]
+          }
+        },
+        {"name": "serverProtocol", "type": ["null", "string"]},
+        {
+          "name": "serverHash",
+          "type": ["null", {"name": "MD5", "size": 16, "type": "fixed"}]
+        },
+        {
+          "name": "meta",
+          "type": ["null", {"type": "map", "values": "bytes"}]}]
+        }
+    """,
+    valid=True,
+  ),
+  ExampleSchema("""
+    {
+      "type": "record",
+      "name": "Interop",
+      "namespace": "org.apache.avro",
+      "fields": [
+        {"name": "intField", "type": "int"},
+        {"name": "longField", "type": "long"},
+        {"name": "stringField", "type": "string"},
+        {"name": "boolField", "type": "boolean"},
+        {"name": "floatField", "type": "float"},
+        {"name": "doubleField", "type": "double"},
+        {"name": "bytesField", "type": "bytes"},
+        {"name": "nullField", "type": "null"},
+        {"name": "arrayField", "type": {"type": "array", "items": "double"}},
+        {
+          "name": "mapField",
+          "type": {
+            "type": "map",
+            "values": {"name": "Foo",
+                       "type": "record",
+                       "fields": [{"name": "label", "type": "string"}]}
+          }
+        },
+        {
+          "name": "unionField",
+          "type": ["boolean", "double", {"type": "array", "items": "bytes"}]
+        },
+        {
+          "name": "enumField",
+          "type": {"type": "enum", "name": "Kind", "symbols": ["A", "B", "C"]}
+        },
+        {
+          "name": "fixedField",
+          "type": {"type": "fixed", "name": "MD5", "size": 16}
+        },
+        {
+          "name": "recordField",
+          "type": {"type": "record",
+                   "name": "Node",
+                   "fields": [{"name": "label", "type": "string"},
+                              {"name": "children",
+                               "type": {"type": "array",
+                                        "items": "Node"}}]}
+        }
+      ]
+    }
+    """,
+    valid=True,
+  ),
+  ExampleSchema("""
+    {
+      "type": "record",
+      "name": "ipAddr",
+      "fields": [{
+        "name": "addr",
+        "type": [
+          {"name": "IPv6", "type": "fixed", "size": 16},
+          {"name": "IPv4", "type": "fixed", "size": 4}
+        ]
+      }]
+    }
+    """,
+    valid=True,
+  ),
+  ExampleSchema("""
+    {
+      "type": "record",
+      "name": "Address",
+      "fields": [
+        {"type": "string"},
+        {"type": "string", "name": "City"}
+      ]
+    }
+    """,
+    valid=False,
+  ),
+  ExampleSchema("""
+    {
+      "type": "record",
+      "name": "Event",
+      "fields": [
+        {"name": "Sponsor"},
+        {"name": "City", "type": "string"}
+      ]
+    }
+    """,
+    valid=False,
+  ),
+  ExampleSchema("""
+    {
+      "type": "record",
+      "fields": "His vision, from the constantly passing bars,"
+      "name", "Rainer"
+    }
+    """,
+    valid=False,
+  ),
+  ExampleSchema("""
+    {
+      "name": ["Tom", "Jerry"],
+      "type": "record",
+      "fields": [{"name": "name", "type": "string"}]
+    }
+    """,
+    valid=False,
+  ),
+]
+
+# Schemas carrying "doc" attributes on a record, a field and an enum.
+DOC_EXAMPLES = [
+  ExampleSchema("""
+    {
+      "type": "record",
+      "name": "TestDoc",
+      "doc":  "Doc string",
+      "fields": [{"name": "name", "type": "string", "doc": "Doc String"}]
+    }
+    """,
+    valid=True,
+  ),
+  ExampleSchema("""
+    {"type": "enum", "name": "Test", "symbols": ["A", "B"], "doc": "Doc String"}
+    """,
+    valid=True,
+  ),
+]
+
+# Schemas carrying extra, non-standard "cp_*" attributes of assorted JSON
+# value types, on a record, its fields, a map and an enum.
+# NOTE(review): this list is not concatenated into EXAMPLES; presumably it is
+# exercised by a dedicated test further down the file - confirm.
+OTHER_PROP_EXAMPLES = [
+  ExampleSchema("""
+    {
+      "type": "record",
+      "name": "TestRecord",
+      "cp_string": "string",
+      "cp_int": 1,
+      "cp_array": [ 1, 2, 3, 4],
+      "fields": [
+        {"name": "f1", "type": "string", "cp_object": {"a":1,"b":2}},
+        {"name": "f2", "type": "long", "cp_null": null}
+      ]
+    }
+    """,
+    valid=True,
+  ),
+  ExampleSchema(
+    '{"type": "map", "values": "long", "cp_boolean": true}',
+    valid=True,
+  ),
+  ExampleSchema("""
+    {
+      "type": "enum",
+      "name": "TestEnum",
+      "symbols": ["one", "two", "three"],
+      "cp_float": 1.0
+    }
+    """,
+    valid=True,
+  ),
+]
+
+EXAMPLES = PRIMITIVE_EXAMPLES
+EXAMPLES += FIXED_EXAMPLES
+EXAMPLES += ENUM_EXAMPLES
+EXAMPLES += ARRAY_EXAMPLES
+EXAMPLES += MAP_EXAMPLES
+EXAMPLES += UNION_EXAMPLES
+EXAMPLES += RECORD_EXAMPLES
+EXAMPLES += DOC_EXAMPLES
+
+VALID_EXAMPLES = [e for e in EXAMPLES if e.valid]
+
+
+# ------------------------------------------------------------------------------
+
+
+class TestSchema(unittest.TestCase):
+
+  def testCorrectRecursiveExtraction(self):
+    parsed = schema.Parse("""
+      {
+        "type": "record",
+        "name": "X",
+        "fields": [{
+          "name": "y",
+          "type": {
+            "type": "record",
+            "name": "Y",
+            "fields": [{"name": "Z", "type": "X"}, {"name": "W", "type": "X"}]
+          }
+        }]
+      }
+    """)
+    logging.debug('Parsed schema:\n%s', parsed)
+    logging.debug('Fields: %s', parsed.fields)
+    t = schema.Parse(str(parsed.fields[0].type))
+    # If we've made it this far, the subschema was reasonably stringified;
+    # it could be reparsed.
+    self.assertEqual("X", t.fields[0].type.name)
+
+  def testParse(self):
+    correct = 0
+    for iexample, example in enumerate(EXAMPLES):
+      logging.debug('Testing example #%d\n%s', iexample, example.schema_string)
+      try:
+        schema.Parse(example.schema_string)
+        if example.valid:
+          correct += 1
+        else:
+          self.fail('Invalid schema was parsed:\n%s' % example.schema_string)
+      except Exception as exn:
+        if example.valid:
+          self.fail(
+              'Valid schema failed to parse: %r\n%s'
+              % (example.schema_string, traceback.format_exc()))
+        else:
+          if logging.getLogger().getEffectiveLevel() <= 5:
+            logging.debug('Expected error:\n%s', traceback.format_exc())
+          else:
+            logging.debug('Expected error: %r', exn)
+          correct += 1
+
+    self.assertEqual(
+        correct,
+        len(EXAMPLES),
+        'Parse behavior correct on %d out of %d schemas.'
+        % (correct, len(EXAMPLES)),
+    )
+
+  def testValidCastToStringAfterParse(self):
+    """
+    Test that the string generated by an Avro Schema object
+    is, in fact, a valid Avro schema.
+    """
+    correct = 0
+    for example in VALID_EXAMPLES:
+      schema_data = schema.Parse(example.schema_string)
+      schema.Parse(str(schema_data))
+      correct += 1
+
+    fail_msg = "Cast to string success on %d out of %d schemas" % \
+      (correct, len(VALID_EXAMPLES))
+    self.assertEqual(correct, len(VALID_EXAMPLES), fail_msg)
+
+  def testEquivalenceAfterRoundTrip(self):
+    """
+    1. Given a string, parse it to get Avro schema "original".
+    2. Serialize "original" to a string and parse that string
+         to generate Avro schema "round trip".
+    3. Ensure "original" and "round trip" schemas are equivalent.
+    """
+    correct = 0
+    for example in VALID_EXAMPLES:
+      original_schema = schema.Parse(example.schema_string)
+      round_trip_schema = schema.Parse(str(original_schema))
+      if original_schema == round_trip_schema:
+        correct += 1
+        debug_msg = "%s: ROUND TRIP SUCCESS" % example.name
+      else:
+        debug_msg = "%s: ROUND TRIP FAILURE" % example.name
+        self.fail(
+            "Round trip failure: %s, %s, %s"
+            % (example.name, original_schema, str(original_schema)))
+
+    fail_msg = "Round trip success on %d out of %d schemas" % \
+      (correct, len(VALID_EXAMPLES))
+    self.assertEqual(correct, len(VALID_EXAMPLES), fail_msg)
+
+  def testFullname(self):
+    """The fullname is determined in one of the following ways:
+     * A name and namespace are both specified.  For example,
+       one might use "name": "X", "namespace": "org.foo"
+       to indicate the fullname "org.foo.X".
+     * A fullname is specified.  If the name specified contains
+       a dot, then it is assumed to be a fullname, and any
+       namespace also specified is ignored.  For example,
+       use "name": "org.foo.X" to indicate the
+       fullname "org.foo.X".
+     * A name only is specified, i.e., a name that contains no
+       dots.  In this case the namespace is taken from the most
+       tightly encosing schema or protocol.  For example,
+       if "name": "X" is specified, and this occurs
+       within a field of the record definition
+       of "org.foo.Y", then the fullname is "org.foo.X".
+
+    References to previously defined names are as in the latter
+    two cases above: if they contain a dot they are a fullname, if
+    they do not contain a dot, the namespace is the namespace of
+    the enclosing definition.
+
+    Primitive type names have no namespace and their names may
+    not be defined in any namespace.  A schema may only contain
+    multiple definitions of a fullname if the definitions are
+    equivalent.
+    """
+    # relative name and namespace specified
+    self.assertEqual(schema.Name('a', 'o.a.h').fullname, 'o.a.h.a')
+
+    # absolute name and namespace specified
+    self.assertEqual(schema.Name('.a', 'o.a.h').fullname, '.a')
+
+    # absolute name and namespace specified
+    fullname = schema.Name('a.b.c.d', 'o.a.h').fullname
+    self.assertEqual(fullname, 'a.b.c.d')
+
+  def testDocAttributes(self):
+    correct = 0
+    for example in DOC_EXAMPLES:
+      original_schema = schema.Parse(example.schema_string)
+      if original_schema.doc is not None:
+        correct += 1
+      if original_schema.type == 'record':
+        for f in original_schema.fields:
+          if f.doc is None:
+            self.fail(
+                "Failed to preserve 'doc' in fields: "
+                + example.schema_string)
+    self.assertEqual(correct,len(DOC_EXAMPLES))
+
+  def testOtherAttributes(self):
+    correct = 0
+    props = {}
+    for example in OTHER_PROP_EXAMPLES:
+      original_schema = schema.Parse(example.schema_string)
+      round_trip_schema = schema.Parse(str(original_schema))
+      self.assertEqual(original_schema.other_props,round_trip_schema.other_props)
+      if original_schema.type == "record":
+        field_props = 0
+        for f in original_schema.fields:
+          if f.other_props:
+            props.update(f.other_props)
+            field_props += 1
+        self.assertEqual(field_props,len(original_schema.fields))
+      if original_schema.other_props:
+        props.update(original_schema.other_props)
+        correct += 1
+    for k in props:
+      v = props[k]
+      if k == "cp_boolean":
+        self.assertEqual(type(v), bool)
+      elif k == "cp_int":
+        self.assertEqual(type(v), int)
+      elif k == "cp_object":
+        self.assertEqual(type(v), dict)
+      elif k == "cp_float":
+        self.assertEqual(type(v), float)
+      elif k == "cp_array":
+        self.assertEqual(type(v), list)
+    self.assertEqual(correct,len(OTHER_PROP_EXAMPLES))
+
+
+# ------------------------------------------------------------------------------
+
+
+if __name__ == '__main__':
+  raise Exception('Use run_tests.py')

Propchange: avro/trunk/lang/py3/avro/tests/test_schema.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/test_script.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/test_script.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/test_script.py (added)
+++ avro/trunk/lang/py3/avro/tests/test_script.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,324 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import csv
+import io
+import json
+import logging
+import operator
+import os
+import subprocess
+import tempfile
+import unittest
+
+import avro.datafile
+import avro.io
+import avro.schema
+
+
+# ------------------------------------------------------------------------------
+
+
+NUM_RECORDS = 7
+
+SCHEMA = """
+{
+  "namespace": "test.avro",
+  "name": "LooneyTunes",
+  "type": "record",
+  "fields": [
+    {"name": "first", "type": "string"},
+    {"name": "last", "type": "string"},
+    {"name": "type", "type": "string"}
+  ]
+}
+"""
+
+LOONIES = (
+    ('daffy', 'duck', 'duck'),
+    ('bugs', 'bunny', 'bunny'),
+    ('tweety', '', 'bird'),
+    ('road', 'runner', 'bird'),
+    ('wile', 'e', 'coyote'),
+    ('pepe', 'le pew', 'skunk'),
+    ('foghorn', 'leghorn', 'rooster'),
+)
+
+
+def looney_records():
+  for f, l, t in LOONIES:
+    yield {'first': f, 'last' : l, 'type' : t}
+
+
+def GetRootDir():
+  test_dir = os.path.dirname(os.path.abspath(__file__))
+  root_dir = os.path.dirname(os.path.dirname(test_dir))
+  return root_dir
+
+
+def GetScriptPath():
+  root_dir = GetRootDir()
+  avro_script_path = os.path.join(root_dir, 'scripts', 'avro')
+  assert os.path.exists(avro_script_path), \
+      ('Avro script not found: %r' % avro_script_path)
+  return avro_script_path
+
+
+# Absolute path of the 'avro' script:
+SCRIPT_PATH = GetScriptPath()
+
+
+def RunScript(*args, stdin=None):
+  command = [SCRIPT_PATH]
+  command.extend(args)
+  env = dict(os.environ)
+  env['PYTHONPATH'] = '%s:%s' % (GetRootDir(), env.get('PYTHONPATH', ''))
+  logging.debug('Running command:\n%s', ' \\\n\t'.join(command))
+  process = subprocess.Popen(
+      args=command,
+      env=env,
+      stdin=stdin,
+      stdout=subprocess.PIPE,
+      stderr=subprocess.PIPE,
+  )
+  (out, err) = process.communicate()
+  assert (process.returncode == os.EX_OK), \
+      ('Command %r failed with exit code %r, output %r and error %r'
+       % (command, process.returncode, out, err))
+  return out
+
+
+# The trailing spaces are expected when pretty-printing JSON with json.dumps():
+_JSON_PRETTY = '\n'.join([
+    '{',
+    '    "first": "daffy", ',
+    '    "last": "duck", ',
+    '    "type": "duck"',
+    '}',
+])
+
+
+# ------------------------------------------------------------------------------
+
+
+class TestCat(unittest.TestCase):
+
+  @staticmethod
+  def WriteAvroFile(file_path):
+    schema = avro.schema.Parse(SCHEMA)
+    with open(file_path, 'wb') as writer:
+      with avro.datafile.DataFileWriter(
+          writer=writer,
+          datum_writer=avro.io.DatumWriter(),
+          writer_schema=schema,
+      ) as writer:
+        for record in looney_records():
+          writer.append(record)
+
+  def setUp(self):
+    # TODO: flag to not delete the files
+    delete = True
+    self._avro_file = (
+        tempfile.NamedTemporaryFile(prefix='test-', suffix='.avro', delete=delete))
+    TestCat.WriteAvroFile(self._avro_file.name)
+
+  def tearDown(self):
+    self._avro_file.close()
+
+  def _RunCat(self, *args, raw=False):
+    """Runs the specified 'avro cat test-file ...' command.
+
+    Args:
+      *args: extra parameters to the 'avro cat' command.
+      raw: Whether to decode stdout as UTF-8.
+    Returns:
+      The command stdout (as bytes if raw is set, or else as string).
+    """
+    out = RunScript('cat', self._avro_file.name, *args)
+    if raw:
+      return out
+    else:
+      return out.decode('utf-8')
+
+  def testPrint(self):
+    lines = self._RunCat().splitlines()
+    return len(lines) == NUM_RECORDS
+
+  def testFilter(self):
+    lines = self._RunCat('--filter', "r['type']=='bird'").splitlines()
+    return len(lines) == 2
+
+  def testSkip(self):
+    skip = 3
+    lines = self._RunCat('--skip', str(skip)).splitlines()
+    return len(lines) == NUM_RECORDS - skip
+
+  def testCsv(self):
+    reader = csv.reader(io.StringIO(self._RunCat('-f', 'csv')))
+    self.assertEqual(len(list(reader)), NUM_RECORDS)
+
+  def testCsvHeader(self):
+    reader = csv.DictReader(io.StringIO(self._RunCat('-f', 'csv', '--header')))
+    expected = {'type': 'duck', 'last': 'duck', 'first': 'daffy'}
+
+    data = next(reader)
+    self.assertEqual(expected, data)
+
+  def testPrintSchema(self):
+    out = self._RunCat('--print-schema')
+    self.assertEqual(json.loads(out)['namespace'], 'test.avro')
+
+  def testHelp(self):
+    # Just see we have these
+    self._RunCat('-h')
+    self._RunCat('--help')
+
+  def testJsonPretty(self):
+    out = self._RunCat('--format', 'json-pretty', '-n', '1')
+    self.assertEqual(
+        out.strip(),
+        _JSON_PRETTY.strip(),
+        'Output mismatch\n'
+        'Expect: %r\n'
+        'Actual: %r'
+        % (_JSON_PRETTY.strip(), out.strip()))
+
+  def testVersion(self):
+    out = RunScript('cat', '--version').decode('utf-8')
+
+  def testFiles(self):
+    lines = self._RunCat(self._avro_file.name).splitlines()
+    self.assertEqual(len(lines), 2 * NUM_RECORDS)
+
+  def testFields(self):
+    # One field selection (no comma)
+    lines = self._RunCat('--fields', 'last').splitlines()
+    self.assertEqual(json.loads(lines[0]), {'last': 'duck'})
+
+    # Field selection (with comma and space)
+    lines = self._RunCat('--fields', 'first, last').splitlines()
+    self.assertEqual(json.loads(lines[0]), {'first': 'daffy', 'last': 'duck'})
+
+    # Empty fields should get all
+    lines = self._RunCat('--fields', '').splitlines()
+    self.assertEqual(
+        json.loads(lines[0]),
+        {'first': 'daffy', 'last': 'duck', 'type': 'duck'})
+
+    # Non existing fields are ignored
+    lines = self._RunCat('--fields', 'first,last,age').splitlines()
+    self.assertEqual(
+        json.loads(lines[0]),
+        {'first': 'daffy', 'last': 'duck'})
+
+
+# ------------------------------------------------------------------------------
+
+
+class TestWrite(unittest.TestCase):
+
+  def setUp(self):
+    delete = False
+
+    self._json_file = tempfile.NamedTemporaryFile(
+        prefix='test-', suffix='.json', delete=delete)
+    with open(self._json_file.name, 'w') as f:
+      for record in looney_records():
+        json.dump(record, f)
+        f.write('\n')
+
+    self._csv_file = tempfile.NamedTemporaryFile(
+        prefix='test-', suffix='.csv', delete=delete)
+    with open(self._csv_file.name, 'w') as f:
+      writer = csv.writer(f)
+      get = operator.itemgetter('first', 'last', 'type')
+      for record in looney_records():
+        writer.writerow(get(record))
+
+    self._schema_file = tempfile.NamedTemporaryFile(
+        prefix='test-', suffix='.avsc', delete=delete)
+    with open(self._schema_file.name, 'w') as f:
+      f.write(SCHEMA)
+
+  def tearDown(self):
+    self._csv_file.close()
+    self._json_file.close()
+    self._schema_file.close()
+
+  def _RunWrite(self, *args, stdin=None):
+    """Runs the specified 'avro write ...' command.
+
+    Args:
+      *args: extra parameters to the 'avro write' command.
+      stdin: Optional string to feed the 'avro write' command stdin with.
+    Returns:
+      The command stdout as bytes.
+    """
+    return RunScript(
+        'write', '--schema', self._schema_file.name,
+        stdin=stdin, *args
+    )
+
+  def LoadAvro(self, filename):
+    out = RunScript('cat', filename).decode('utf-8')
+    return tuple(map(json.loads, out.splitlines()))
+
+  def testVersion(self):
+    out = RunScript('write', '--version').decode('utf-8')
+
+  def FormatCheck(self, format, filename):
+    with tempfile.NamedTemporaryFile(prefix='test-', suffix='.dat') as temp:
+      with open(temp.name, 'wb') as out:
+        out.write(self._RunWrite(filename, '-f', format))
+
+      records = self.LoadAvro(temp.name)
+      self.assertEqual(len(records), NUM_RECORDS)
+      self.assertEqual(records[0]['first'], 'daffy')
+
+  def testWriteJson(self):
+    self.FormatCheck('json', self._json_file.name)
+
+  def testWriteCsv(self):
+    self.FormatCheck('csv', self._csv_file.name)
+
+  def testOutfile(self):
+    with tempfile.NamedTemporaryFile(prefix='test-', suffix='.dat') as temp:
+      os.remove(temp.name)
+      self._RunWrite(self._json_file.name, '-o', temp.name)
+      self.assertEqual(len(self.LoadAvro(temp.name)), NUM_RECORDS)
+
+  def testMultiFile(self):
+    with tempfile.NamedTemporaryFile(prefix='test-', suffix='.dat') as temp:
+      with open(temp.name, 'wb') as out:
+        out.write(self._RunWrite(self._json_file.name, self._json_file.name))
+
+      self.assertEqual(len(self.LoadAvro(temp.name)), 2 * NUM_RECORDS)
+
+  def testStdin(self):
+    with tempfile.NamedTemporaryFile(prefix='test-', suffix='.dat') as temp:
+      with open(self._json_file.name, 'rb') as input_content:
+        with open(temp.name, 'wb') as out:
+          out.write(self._RunWrite('--input-type', 'json', stdin=input_content))
+
+      self.assertEqual(len(self.LoadAvro(temp.name)), NUM_RECORDS)
+
+
+if __name__ == '__main__':
+  raise Exception('Use run_tests.py')

Propchange: avro/trunk/lang/py3/avro/tests/test_script.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/txsample_http_client.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/txsample_http_client.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/txsample_http_client.py (added)
+++ avro/trunk/lang/py3/avro/tests/txsample_http_client.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,108 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import sys
+
+from twisted.internet import reactor, defer
+from twisted.python.util import println
+
+from avro import protocol
+from avro import txipc
+
+# Avro protocol "example.proto.Mail": a 'send' message taking a Message
+# record and a no-argument 'replay' message, both answering with a string.
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     },
+     "replay": {
+         "request": [],
+         "response": "string"
+     }
+ }
+}
+"""
+# Parsed form of MAIL_PROTOCOL_JSON, used to build requestors.
+MAIL_PROTOCOL = protocol.Parse(MAIL_PROTOCOL_JSON)
+# Address of the sample HTTP server the client sends its requests to.
+SERVER_HOST = 'localhost'
+SERVER_PORT = 9090
+
+class UsageError(Exception):
+  def __init__(self, value):
+    self.value = value
+  def __str__(self):
+    return repr(self.value)
+
+def make_requestor(server_host, server_port, protocol):
+  client = txipc.TwistedHTTPTransceiver(SERVER_HOST, SERVER_PORT)
+  return txipc.TwistedRequestor(protocol, client)
+
+if __name__ == '__main__':
+  if len(sys.argv) not in [4, 5]:
+    raise UsageError("Usage: <to> <from> <body> [<count>]")
+
+  # client code - attach to the server and send a message
+  # fill in the Message record
+  message = dict()
+  message['to'] = sys.argv[1]
+  message['from'] = sys.argv[2]
+  message['body'] = sys.argv[3]
+
+  try:
+    num_messages = int(sys.argv[4])
+  except:
+    num_messages = 1
+
+  # build the parameters for the request
+  params = {}
+  params['message'] = message
+
+  requests = []
+  # send the requests and print the result
+  for msg_count in range(num_messages):
+    requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
+    d = requestor.request('send', params)
+    d.addCallback(lambda result: println("Result: " + result))
+    requests.append(d)
+  results = defer.gatherResults(requests)
+
+  def replay_cb(result):
+    print("Replay Result: " + result)
+    reactor.stop()
+
+  def replay(_):
+    # try out a replay message
+    requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
+    d = requestor.request('replay', dict())
+    d.addCallback(replay_cb)
+
+  results.addCallback(replay)
+  reactor.run()

Propchange: avro/trunk/lang/py3/avro/tests/txsample_http_client.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tests/txsample_http_server.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tests/txsample_http_server.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tests/txsample_http_server.py (added)
+++ avro/trunk/lang/py3/avro/tests/txsample_http_server.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,72 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.web import server
+from twisted.internet import reactor
+
+from avro import ipc
+from avro import protocol
+from avro import txipc
+
+# Avro protocol "example.proto.Mail": a 'send' message taking a Message
+# record and a no-argument 'replay' message, both answering with a string.
+MAIL_PROTOCOL_JSON = """\
+{"namespace": "example.proto",
+ "protocol": "Mail",
+
+ "types": [
+     {"name": "Message", "type": "record",
+      "fields": [
+          {"name": "to",   "type": "string"},
+          {"name": "from", "type": "string"},
+          {"name": "body", "type": "string"}
+      ]
+     }
+ ],
+
+ "messages": {
+     "send": {
+         "request": [{"name": "message", "type": "Message"}],
+         "response": "string"
+     },
+     "replay": {
+         "request": [],
+         "response": "string"
+     }
+ }
+}
+"""
+# Parsed form of MAIL_PROTOCOL_JSON, served by MailResponder.
+MAIL_PROTOCOL = protocol.Parse(MAIL_PROTOCOL_JSON)
+# (host, port) of the sample server.
+SERVER_ADDRESS = ('localhost', 9090)
+
+class MailResponder(ipc.Responder):
+  def __init__(self):
+    ipc.Responder.__init__(self, MAIL_PROTOCOL)
+
+  def invoke(self, message, request):
+    if message.name == 'send':
+      request_content = request['message']
+      response = "Sent message to %(to)s from %(from)s with body %(body)s" % \
+                 request_content
+      return response
+    elif message.name == 'replay':
+      return 'replay'
+
+if __name__ == '__main__':
+  root = server.Site(txipc.AvroResponderResource(MailResponder()))
+  reactor.listenTCP(9090, root)
+  reactor.run()

Propchange: avro/trunk/lang/py3/avro/tests/txsample_http_server.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/tool.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/tool.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/tool.py (added)
+++ avro/trunk/lang/py3/avro/tool.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,166 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Command-line tool
+
+NOTE: The API for the command-line tool is experimental.
+"""
+
+import sys
+import urllib
+import urllib.parse
+
+try:
+  from http.server import HTTPServer, BaseHTTPRequestHandler
+except ImportError:
+  # Python 2 module name; this package targets Python 3.
+  from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+
+from avro import io
+from avro import datafile
+from avro import protocol
+from avro import ipc
+
+class GenericResponder(ipc.Responder):
+  def __init__(self, proto, msg, datum):
+    proto_json = file(proto, 'r').read()
+    ipc.Responder.__init__(self, protocol.Parse(proto_json))
+    self.msg = msg
+    self.datum = datum
+
+  def invoke(self, message, request):
+    if message.name == self.msg:
+      print >> sys.stderr, "Message: %s Datum: %s" % (message.name, self.datum)
+      # server will shut down after processing a single Avro request
+      global server_should_shutdown
+      server_should_shutdown = True
+      return self.datum
+
+class GenericHandler(BaseHTTPRequestHandler):
+  def do_POST(self):
+    self.responder = responder
+    call_request_reader = ipc.FramedReader(self.rfile)
+    call_request = call_request_reader.read_framed_message()
+    resp_body = self.responder.respond(call_request)
+    self.send_response(200)
+    self.send_header('Content-Type', 'avro/binary')
+    self.end_headers()
+    resp_writer = ipc.FramedWriter(self.wfile)
+    resp_writer.write_framed_message(resp_body)
+    if server_should_shutdown:
+      print >> sys.stderr, "Shutting down server."
+      self.server.force_stop()
+
+class StoppableHTTPServer(HTTPServer):
+  """HTTPServer.shutdown added in Python 2.6. FML."""
+  stopped = False
+  allow_reuse_address = True
+  def __init__(self, *args, **kw):
+    HTTPServer.__init__(self, *args, **kw)
+    self.allow_reuse_address = True
+
+  def serve_forever(self):
+    while not self.stopped:
+      self.handle_request()
+
+  def force_stop(self):
+    self.server_close()
+    self.stopped = True
+    self.serve_forever()
+
+def run_server(uri, proto, msg, datum):
+  url_obj = urllib.parse.urlparse(uri)
+  server_addr = (url_obj.hostname, url_obj.port)
+  global responder
+  global server_should_shutdown
+  server_should_shutdown = False
+  responder = GenericResponder(proto, msg, datum)
+  server = StoppableHTTPServer(server_addr, GenericHandler)
+  print("Port: %s" % server.server_port)
+  sys.stdout.flush()
+  server.allow_reuse_address = True
+  print >> sys.stderr, "Starting server."
+  server.serve_forever()
+
+def send_message(uri, proto, msg, datum):
+  url_obj = urllib.parse.urlparse(uri)
+  client = ipc.HTTPTransceiver(url_obj.hostname, url_obj.port)
+  proto_json = file(proto, 'r').read()
+  requestor = ipc.Requestor(protocol.Parse(proto_json), client)
+  print(requestor.request(msg, datum))
+
+def file_or_stdin(f):
+  if f == "-":
+    return sys.stdin
+  else:
+    return file(f)
+
+def main(args=sys.argv):
+  if len(args) == 1:
+    print("Usage: %s [dump|rpcreceive|rpcsend]" % args[0])
+    return 1
+
+  if args[1] == "dump":
+    if len(args) != 3:
+      print("Usage: %s dump input_file" % args[0])
+      return 1
+    for d in datafile.DataFileReader(file_or_stdin(args[2]), io.DatumReader()):
+      print(repr(d))
+  elif args[1] == "rpcreceive":
+    usage_str = "Usage: %s rpcreceive uri protocol_file " % args[0]
+    usage_str += "message_name (-data d | -file f)"
+    if len(args) not in [5, 7]:
+      print(usage_str)
+      return 1
+    uri, proto, msg = args[2:5]
+    datum = None
+    if len(args) > 5:
+      if args[5] == "-file":
+        reader = open(args[6], 'rb')
+        datum_reader = io.DatumReader()
+        dfr = datafile.DataFileReader(reader, datum_reader)
+        datum = dfr.next()
+      elif args[5] == "-data":
+        print("JSON Decoder not yet implemented.")
+        return 1
+      else:
+        print(usage_str)
+        return 1
+    run_server(uri, proto, msg, datum)
+  elif args[1] == "rpcsend":
+    usage_str = "Usage: %s rpcsend uri protocol_file " % args[0]
+    usage_str += "message_name (-data d | -file f)"
+    if len(args) not in [5, 7]:
+      print(usage_str)
+      return 1
+    uri, proto, msg = args[2:5]
+    datum = None
+    if len(args) > 5:
+      if args[5] == "-file":
+        reader = open(args[6], 'rb')
+        datum_reader = io.DatumReader()
+        dfr = datafile.DataFileReader(reader, datum_reader)
+        datum = dfr.next()
+      elif args[5] == "-data":
+        print("JSON Decoder not yet implemented.")
+        return 1
+      else:
+        print(usage_str)
+        return 1
+    send_message(uri, proto, msg, datum)
+  return 0
+
+if __name__ == "__main__":
+  sys.exit(main(sys.argv))

Propchange: avro/trunk/lang/py3/avro/tool.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/avro/txipc.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/avro/txipc.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/avro/txipc.py (added)
+++ avro/trunk/lang/py3/avro/txipc.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,224 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import io
+
+from twisted.internet.defer import maybeDeferred, Deferred
+from twisted.internet.protocol import Protocol
+from twisted.web import resource, server
+from twisted.web.client import Agent
+from twisted.web.http_headers import Headers
+from twisted.web.iweb import IBodyProducer
+from zope.interface import implementer
+from zope.interface import implements
+
+from avro import io as avro_io
+from avro import ipc
+
+
+class TwistedRequestor(ipc.BaseRequestor):
+  """A Twisted-compatible requestor. Returns a Deferred that will fire with the
+     returning value, instead of blocking until the request completes."""
+  def _process_handshake(self, call_response, message_name, request_datum):
+    # process the handshake and call response
+    buffer_decoder = avro_io.BinaryDecoder(io.StringIO(call_response))
+    call_response_exists = self.read_handshake_response(buffer_decoder)
+    if call_response_exists:
+      return self.read_call_response(message_name, buffer_decoder)
+    else:
+      return self.request(message_name, request_datum)
+
+  def issue_request(self, call_request, message_name, request_datum):
+    d = self.transceiver.transceive(call_request)
+    d.addCallback(self._process_handshake, message_name, request_datum)
+    return d
+
+class RequestStreamingProducer(object):
+  """A streaming producer for issuing requests with the Twisted.web Agent."""
+  implements(IBodyProducer)
+
+  paused = False
+  stopped = False
+  started = False
+
+  def __init__(self, message):
+    self._message = message
+    self._length = len(message)
+    # We need a buffer length header for every buffer and an additional
+    # zero-length buffer as the message terminator
+    self._length += (self._length / ipc.BUFFER_SIZE + 2) \
+      * ipc.BUFFER_HEADER_LENGTH
+    self._total_bytes_sent = 0
+    self._deferred = Deferred()
+
+  # read-only properties
+  message = property(lambda self: self._message)
+  length = property(lambda self: self._length)
+  consumer = property(lambda self: self._consumer)
+  deferred = property(lambda self: self._deferred)
+
+  def _get_total_bytes_sent(self):
+    return self._total_bytes_sent
+
+  def _set_total_bytes_sent(self, bytes_sent):
+    self._total_bytes_sent = bytes_sent
+
+  total_bytes_sent = property(_get_total_bytes_sent, _set_total_bytes_sent)
+
+  def startProducing(self, consumer):
+    if self.started:
+      return
+
+    self.started = True
+    self._consumer = consumer
+    # Keep writing data to the consumer until we're finished,
+    # paused (pauseProducing()) or stopped (stopProducing())
+    while self.length - self.total_bytes_sent > 0 and \
+      not self.paused and not self.stopped:
+      self.write()
+    # self.write will fire this deferred once it has written
+    # the entire message to the consumer
+    return self.deferred
+
+  def resumeProducing(self):
+    self.paused = False
+    self.write(self)
+
+  def pauseProducing(self):
+    self.paused = True
+
+  def stopProducing(self):
+    self.stopped = True
+
+  def write(self):
+    if self.length - self.total_bytes_sent > ipc.BUFFER_SIZE:
+      buffer_length = ipc.BUFFER_SIZE
+    else:
+      buffer_length = self.length - self.total_bytes_sent
+    self.write_buffer(self.message[self.total_bytes_sent:
+                              (self.total_bytes_sent + buffer_length)])
+    self.total_bytes_sent += buffer_length
+    # Make sure we wrote the entire message
+    if self.total_bytes_sent == self.length and not self.stopped:
+      self.stopProducing()
+      # A message is always terminated by a zero-length buffer.
+      self.write_buffer_length(0)
+      self.deferred.callback(None)
+
+  def write_buffer(self, chunk):
+    buffer_length = len(chunk)
+    self.write_buffer_length(buffer_length)
+    self.consumer.write(chunk)
+
+  def write_buffer_length(self, n):
+    self.consumer.write(ipc.BIG_ENDIAN_INT_STRUCT.pack(n))
+
+class AvroProtocol(Protocol):
+
+  recvd = ''
+  done = False
+
+  def __init__(self, finished):
+    self.finished = finished
+    self.message = []
+
+  def dataReceived(self, data):
+    self.recvd = self.recvd + data
+    while len(self.recvd) >= ipc.BUFFER_HEADER_LENGTH:
+      buffer_length ,= ipc.BIG_ENDIAN_INT_STRUCT.unpack(
+        self.recvd[:ipc.BUFFER_HEADER_LENGTH])
+      if buffer_length == 0:
+        response = ''.join(self.message)
+        self.done = True
+        self.finished.callback(response)
+        break
+      if len(self.recvd) < buffer_length + ipc.BUFFER_HEADER_LENGTH:
+        break
+      buffer = self.recvd[ipc.BUFFER_HEADER_LENGTH:buffer_length + ipc.BUFFER_HEADER_LENGTH]
+      self.recvd = self.recvd[buffer_length + ipc.BUFFER_HEADER_LENGTH:]
+      self.message.append(buffer)
+
+  def connectionLost(self, reason):
+    if not self.done:
+      self.finished.errback(ipc.ConnectionClosedException("Reader read 0 bytes."))
+
+class TwistedHTTPTransceiver(object):
+  """This transceiver uses the Agent class present in Twisted.web >= 9.0
+     for issuing requests to the remote endpoint."""
+  def __init__(self, host, port, remote_name=None, reactor=None):
+    self.url = "http://%s:%d/" % (host, port)
+
+    if remote_name is None:
+      # There's no easy way to get this peer's remote address
+      # in Twisted so I use a random UUID to identify ourselves
+      import uuid
+      self.remote_name = uuid.uuid4()
+
+    if reactor is None:
+      from twisted.internet import reactor
+    self.agent = Agent(reactor)
+
+  def read_framed_message(self, response):
+    finished = Deferred()
+    response.deliverBody(AvroProtocol(finished))
+    return finished
+
+  def transceive(self, request):
+    req_method = 'POST'
+    req_headers = {
+      'Content-Type': ['avro/binary'],
+      'Accept-Encoding': ['identity'],
+    }
+
+    body_producer = RequestStreamingProducer(request)
+    d = self.agent.request(
+      req_method,
+      self.url,
+      headers=Headers(req_headers),
+      bodyProducer=body_producer)
+    return d.addCallback(self.read_framed_message)
+
+class AvroResponderResource(resource.Resource):
+  """This Twisted.web resource can be placed anywhere in a URL hierarchy
+     to provide an Avro endpoint. Different Avro protocols can be served
+     by the same web server as long as they are in different resources in
+     a URL hierarchy."""
+  isLeaf = True
+
+  def __init__(self, responder):
+    resource.Resource.__init__(self)
+    self.responder = responder
+
+  def cb_render_POST(self, resp_body, request):
+    request.setResponseCode(200)
+    request.setHeader('Content-Type', 'avro/binary')
+    resp_writer = ipc.FramedWriter(request)
+    resp_writer.write_framed_message(resp_body)
+    request.finish()
+
+  def render_POST(self, request):
+    # Unfortunately, Twisted.web doesn't support incoming
+    # streamed input yet, the whole payload must be kept in-memory
+    request.content.seek(0, 0)
+    call_request_reader = ipc.FramedReader(request.content)
+    call_request = call_request_reader.read_framed_message()
+    d = maybeDeferred(self.responder.respond, call_request)
+    d.addCallback(self.cb_render_POST, request)
+    return server.NOT_DONE_YET

Propchange: avro/trunk/lang/py3/avro/txipc.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/lang/py3/scripts/avro
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/scripts/avro?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/scripts/avro (added)
+++ avro/trunk/lang/py3/scripts/avro Fri Jan 10 19:11:42 2014
@@ -0,0 +1,336 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Command line utlity for reading and writing Avro files."""
+
+import argparse
+import csv
+import functools
+import itertools
+import json
+import os
+import sys
+import traceback
+
+import avro
+from avro import datafile
+from avro import io as avro_io
+from avro import schema
+
+
+# ------------------------------------------------------------------------------
+
+
+class AvroError(Exception):
+  """Errors in this module."""
+  pass
+
+
+def print_json(row):
+  print(json.dumps(row))
+
+
+def print_json_pretty(row):
+  print(json.dumps(row, indent=4, sort_keys=True))
+
+_csv_writer = csv.writer(sys.stdout)
+
+def _write_row(row):
+  _csv_writer.writerow(row)
+
+
+def print_csv(row):
+  # Sort record fields to ensure consistent ordering:
+  _write_row([row[key] for key in sorted(row)])
+
+
+def select_printer(format):
+  return {
+      'json' : print_json,
+      'json-pretty' : print_json_pretty,
+      'csv' : print_csv
+  }[format]
+
+
+def record_match(expr, record):
+  return eval(expr, None, {'r' : record})
+
+
+def parse_fields(fields):
+  fields = fields or ''
+  if not fields.strip():
+    return None
+
+  return [field.strip() for field in fields.split(',') if field.strip()]
+
+
+def field_selector(fields):
+  fields = set(fields)
+  def keys_filter(obj):
+    return dict((k, obj[k]) for k in (set(obj) & fields))
+  return keys_filter
+
+
+def print_avro(avro, opts):
+  if opts.header and (opts.format != 'csv'):
+    raise AvroError('--header applies only to CSV format')
+
+  # Apply filter first
+  if opts.filter:
+    avro = filter(functools.partial(record_match, opts.filter), avro)
+
+  for i in range(opts.skip):
+    try:
+      next(avro)
+    except StopIteration:
+      return
+
+  fields = parse_fields(opts.fields)
+  if fields:
+    avro = map(field_selector(fields), avro)
+
+  printer = select_printer(opts.format)
+  for i, record in enumerate(avro):
+    if i == 0 and opts.header:
+      _write_row(sorted(record.keys()))
+    if i >= opts.count:
+      break
+    printer(record)
+
+
+def print_schema(avro):
+  schema = avro.meta['avro.schema'].decode('utf-8')
+  print(json.dumps(json.loads(schema), indent=4))
+
+
+def cat(opts, args):
+  if not args:
+    raise AvroError('No files to show')
+
+  for filename in args:
+    try:
+      fo = open(filename, 'rb')
+    except (OSError, IOError) as e:
+      raise AvroError('Cannot open %s - %s' % (filename, e))
+
+    avro = datafile.DataFileReader(fo, avro_io.DatumReader())
+
+    if opts.print_schema:
+      print_schema(avro)
+      continue
+
+    print_avro(avro, opts)
+
+
+# ------------------------------------------------------------------------------
+
+
+def _open(filename, mode):
+  if filename == '-':
+    return {
+        'rt' : sys.stdin,
+        'wb' : sys.stdout.buffer,
+    }[mode]
+
+  return open(filename, mode)
+
+
+def iter_json(info, schema):
+  return map(json.loads, info)
+
+
+def convert(value, field):
+  type = field.type.type
+  if type == 'union':
+    return convert_union(value, field)
+
+  return  {
+      'int' : int,
+      'long' : int,
+      'float' : float,
+      'double' : float,
+      'string' : str,
+      'bytes' : bytes,
+      'boolean' : bool,
+      'null' : lambda _: None,
+      'union' : lambda v: convert_union(v, field),
+  }[type](value)
+
+
+def convert_union(value, field):
+  for name in [s.name for s in field.type.schemas]:
+    try:
+      return convert(name)(value)
+    except ValueError:
+      continue
+
+
+def iter_csv(info, schema):
+  header = [field.name for field in schema.fields]
+  for row in csv.reader(info):
+    values = [convert(v, f) for v, f in zip(row, schema.fields)]
+    yield dict(zip(header, values))
+
+
+def guess_input_type(files):
+  if not files:
+      return None
+
+  ext = os.path.splitext(files[0])[1].lower()
+  if ext in ('.json', '.js'):
+      return 'json'
+  elif ext in ('.csv',):
+      return 'csv'
+
+  return None
+
+
+def write(opts, files):
+  if not opts.schema:
+      raise AvroError('No schema specified')
+
+  input_type = opts.input_type or guess_input_type(files)
+  if not input_type:
+      raise AvroError('Cannot guess input file type (not .json or .csv)')
+
+  try:
+    with open(opts.schema, 'rt') as f:
+      json_schema = f.read()
+    writer_schema = schema.Parse(json_schema)
+    out = _open(opts.output, 'wb')
+  except (IOError, OSError) as e:
+    raise AvroError('Cannot open file - %s' % e)
+
+  record_parser_map = {
+      'json': iter_json,
+      'csv': iter_csv,
+  }
+
+  with datafile.DataFileWriter(
+      writer=out,
+      datum_writer=avro_io.DatumWriter(),
+      writer_schema=writer_schema,
+  ) as writer:
+    iter_records = record_parser_map[input_type]
+    for filename in (files or ['-']):
+      reader = _open(filename, 'rt')
+      for record in iter_records(reader, writer_schema):
+        writer.append(record)
+
+
+# ------------------------------------------------------------------------------
+
+
+def main(argv=None):
+  argv = argv or sys.argv
+
+  parser = argparse.ArgumentParser(
+      description='Display/write for Avro files',
+      usage='%(prog)s cat|write [options] FILE [FILE...]',
+  )
+
+  parser.add_argument(
+      '--version',
+      action='version',
+      version='%(prog)s ' + avro.VERSION,
+  )
+
+  # cat options:
+  cat_options = parser.add_argument_group(title='cat options')
+  cat_options.add_argument(
+      '-n', '--count',
+      type=int,
+      default=float('Infinity'),
+      help='number of records to print',
+  )
+  cat_options.add_argument(
+      '-s', '--skip',
+      type=int,
+      default=0,
+      help='number of records to skip',
+  )
+  cat_options.add_argument(
+      '-f', '--format',
+      default='json',
+      choices=['json', 'csv', 'json-pretty'],
+      help='record format',
+  )
+  cat_options.add_argument(
+      '--header',
+      default=False,
+      action='store_true',
+      help='print CSV header',
+  )
+  cat_options.add_argument(
+      '--filter',
+      default=None,
+      help='filter records (e.g. r["age"]>1)',
+  )
+  cat_options.add_argument(
+      '--print-schema',
+      default=False,
+      action='store_true',
+      help='print schema',
+  )
+  cat_options.add_argument(
+      '--fields',
+      default=None,
+      help='fields to show, comma separated (show all by default)',
+  )
+
+  # write options
+  write_options = parser.add_argument_group(title='write options')
+  write_options.add_argument(
+      '--schema',
+      help='schema file (required)',
+  )
+  write_options.add_argument(
+      '--input-type',
+      choices=['json', 'csv'],
+      default=None,
+      help='input file(s) type (json or csv)',
+  )
+  write_options.add_argument(
+      '-o', '--output',
+      default='-',
+      help='output file',
+  )
+
+  opts, args = parser.parse_known_args(argv[1:])
+  if len(args) < 1:
+    parser.error('You much specify `cat` or `write`.')
+
+  command = args.pop(0)
+  try:
+    if command == 'cat':
+      cat(opts, args)
+    elif command == 'write':
+      write(opts, args)
+    else:
+      raise AvroError('Unknown command - %s' % command)
+  except AvroError as e:
+    parser.error('%s' % e) # Will exit
+  except Exception as e:
+    traceback.print_exc()
+    raise SystemExit('panic: %s' % e)
+
+
+if __name__ == '__main__':
+  main()

Added: avro/trunk/lang/py3/setup.py
URL: http://svn.apache.org/viewvc/avro/trunk/lang/py3/setup.py?rev=1557225&view=auto
==============================================================================
--- avro/trunk/lang/py3/setup.py (added)
+++ avro/trunk/lang/py3/setup.py Fri Jan 10 19:11:42 2014
@@ -0,0 +1,104 @@
+#!/usr/bin/env python3
+# -*- mode: python -*-
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import shutil
+import sys
+
+from setuptools import setup
+
+
+def Main():
+  assert (sys.version_info[0] >= 3), \
+      ('Python version >= 3 required, got %r' % sys.version_info)
+
+  py3_dir = os.path.dirname(os.path.abspath(__file__))
+  root_dir = os.path.dirname(os.path.dirname(py3_dir))
+
+  # Read and copy Avro version:
+  version_file_path = os.path.join(root_dir, 'share', 'VERSION.txt')
+  with open(version_file_path, 'r') as f:
+    avro_version = f.read().strip()
+  shutil.copy(
+      src=version_file_path,
+      dst=os.path.join(py3_dir, 'avro', 'VERSION.txt'),
+  )
+
+  # Copy necessary avsc files:
+  avsc_file_path = os.path.join(
+      root_dir, 'share', 'schemas',
+      'org', 'apache', 'avro', 'ipc', 'HandshakeRequest.avsc')
+  shutil.copy(
+      src=avsc_file_path,
+      dst=os.path.join(py3_dir, 'avro', 'HandshakeRequest.avsc'),
+  )
+
+  avsc_file_path = os.path.join(
+      root_dir, 'share', 'schemas',
+      'org', 'apache', 'avro', 'ipc', 'HandshakeResponse.avsc')
+  shutil.copy(
+      src=avsc_file_path,
+      dst=os.path.join(py3_dir, 'avro', 'HandshakeResponse.avsc'),
+  )
+
+  avsc_file_path = os.path.join(
+      root_dir, 'share', 'test', 'schemas', 'interop.avsc')
+  shutil.copy(
+      src=avsc_file_path,
+      dst=os.path.join(py3_dir, 'avro', 'tests', 'interop.avsc'),
+  )
+
+  # Make sure the avro shell script is executable:
+  os.chmod(
+      path=os.path.join(py3_dir, 'scripts', 'avro'),
+      mode=0o777,
+  )
+
+  setup(
+    name = 'avro',
+    version = avro_version,
+    packages = ['avro'],
+    package_dir = {'avro': 'avro'},
+    scripts = ['scripts/avro'],
+
+    include_package_data=True,
+    package_data = {
+        'avro': [
+            'HandshakeRequest.avsc',
+            'HandshakeResponse.avsc',
+            'VERSION.txt',
+        ],
+    },
+
+    test_suite='avro.tests.run_tests',
+    tests_require=[],
+
+    # metadata for upload to PyPI
+    author = 'Apache Avro',
+    author_email = 'avro-dev@hadoop.apache.org',
+    description = 'Avro is a serialization and RPC framework.',
+    license = 'Apache License 2.0',
+    keywords = 'avro serialization rpc',
+    url = 'http://hadoop.apache.org/avro',
+  )
+
+
+if __name__ == '__main__':
+  Main()

Propchange: avro/trunk/lang/py3/setup.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: avro/trunk/share/test/schemas/echo.avdl
URL: http://svn.apache.org/viewvc/avro/trunk/share/test/schemas/echo.avdl?rev=1557225&view=auto
==============================================================================
--- avro/trunk/share/test/schemas/echo.avdl (added)
+++ avro/trunk/share/test/schemas/echo.avdl Fri Jan 10 19:11:42 2014
@@ -0,0 +1,14 @@
+@namespace("org.apache.avro.echo")
+// Test protocol with a single message: ping() takes a Ping and returns a Pong.
+protocol Echo {
+  // Request record; both fields declare defaults (-1 and "").
+  record Ping {
+    long timestamp = -1;
+    string text = "";
+  }
+
+  // Response record; carries a Ping (presumably the one being answered).
+  record Pong {
+    long timestamp = -1;
+    Ping ping;
+  }
+
+  Pong ping(Ping ping);
+}
\ No newline at end of file



Mime
View raw message