avro-commits mailing list archives

From kojirom...@apache.org
Subject [avro] branch master updated: AVRO-2656: Python3 Support for lang/py (#744)
Date Sat, 04 Jan 2020 18:46:03 GMT
This is an automated email from the ASF dual-hosted git repository.

kojiromike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cd778a  AVRO-2656: Python3 Support for lang/py (#744)
4cd778a is described below

commit 4cd778a7e0de048eacf2a06abe3d0e310bbcc343
Author: Michael A. Smith <michael@smith-li.com>
AuthorDate: Sat Jan 4 13:45:53 2020 -0500

    AVRO-2656: Python3 Support for lang/py (#744)
    
    * AVRO-2656: Python3 Support
    
    * AVRO-2656: Consolidate DataFile Code
    
    * AVRO-2656: Must Seek after Truncate
    
    * AVRO-2656: Fix Protocol Unicode
    
    * AVRO-2656: Add Additional Trove Classifiers
---
 lang/py/avro/datafile.py                     | 180 +++++++++++++++------------
 lang/py/avro/io.py                           | 133 +++++++++++---------
 lang/py/avro/ipc.py                          |  28 +++--
 lang/py/avro/protocol.py                     |  16 ++-
 lang/py/avro/schema.py                       |  10 ++
 lang/py/avro/test/gen_interop_data.py        |  22 ++--
 lang/py/avro/test/mock_tether_parent.py      |  13 +-
 lang/py/avro/test/test_datafile.py           |  25 ++--
 lang/py/avro/test/test_datafile_interop.py   |  27 ++--
 lang/py/avro/test/test_io.py                 |  46 ++++---
 lang/py/avro/test/test_protocol.py           |  10 ++
 lang/py/avro/test/test_schema.py             |  26 ++--
 lang/py/avro/test/test_script.py             |  67 +++++-----
 lang/py/avro/test/test_tether_task.py        |  48 ++++---
 lang/py/avro/test/test_tether_task_runner.py |  23 ++--
 lang/py/avro/test/test_tether_word_count.py  |  15 ++-
 lang/py/avro/tether/tether_task.py           |  20 +--
 lang/py/avro/tether/tether_task_runner.py    |  12 +-
 lang/py/scripts/avro                         |  81 ++++++------
 lang/py/setup.cfg                            |  10 +-
 lang/py/tox.ini                              |   1 +
 21 files changed, 468 insertions(+), 345 deletions(-)

diff --git a/lang/py/avro/datafile.py b/lang/py/avro/datafile.py
index 964a049..5c6eae6 100644
--- a/lang/py/avro/datafile.py
+++ b/lang/py/avro/datafile.py
@@ -39,12 +39,13 @@ try:
   has_zstandard = True
 except ImportError:
   has_zstandard = False
+
+
 #
 # Constants
 #
-
 VERSION = 1
-MAGIC = 'Obj' + chr(VERSION)
+MAGIC = bytes(b'Obj' + bytearray([VERSION]))
 MAGIC_SIZE = len(MAGIC)
 SYNC_SIZE = 16
 SYNC_INTERVAL = 4000 * SYNC_SIZE # TODO(hammer): make configurable
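
This hunk replaces the str-based magic because Python 3 distinguishes text from binary data: 'Obj' + chr(VERSION) yields str there, but the container-file magic must be written as raw bytes. A minimal sketch of the new construction, which behaves the same on both major versions:

    # Sketch: building the Avro container-file magic as bytes.
    VERSION = 1
    MAGIC = bytes(b'Obj' + bytearray([VERSION]))
    assert MAGIC == b'Obj\x01' and len(MAGIC) == 4
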
@@ -80,10 +81,65 @@ class DataFileException(avro.schema.AvroException):
 # Write Path
 #
 
-class DataFileWriter(object):
-  @staticmethod
-  def generate_sync_marker():
-    return generate_sixteen_random_bytes()
+class _DataFile(object):
+  """Mixin for methods common to both reading and writing."""
+
+  block_count = 0
+  _meta = None
+  _sync_marker = None
+
+  def __enter__(self):
+    return self
+
+  def __exit__(self, type, value, traceback):
+    # Perform a close if there's no exception
+    if type is None:
+      self.close()
+
+  def get_meta(self, key):
+    return self.meta.get(key)
+
+  def set_meta(self, key, val):
+    self.meta[key] = val
+
+  @property
+  def sync_marker(self):
+    return self._sync_marker
+
+  @property
+  def meta(self):
+    """Read-only dictionary of metadata for this datafile."""
+    if self._meta is None:
+      self._meta = {}
+    return self._meta
+
+  @property
+  def codec(self):
+    """Meta are stored as bytes, but codec is returned as a string."""
+    try:
+      return self.get_meta(CODEC_KEY).decode()
+    except AttributeError:
+      return "null"
+
+  @codec.setter
+  def codec(self, value):
+    """Meta are stored as bytes, but codec is set as a string."""
+    if value not in VALID_CODECS:
+      raise DataFileException("Unknown codec: {!r}".format(value))
+    self.set_meta(CODEC_KEY, value.encode())
+
+  @property
+  def schema(self):
+    """Meta are stored as bytes, but schema is returned as a string."""
+    return self.get_meta(SCHEMA_KEY).decode()
+
+  @schema.setter
+  def schema(self, value):
+    """Meta are stored as bytes, but schema is set as a string."""
+    self.set_meta(SCHEMA_KEY, value.encode())
+
+
+class DataFileWriter(_DataFile):
 
   # TODO(hammer): make 'encoder' a metadata property
   def __init__(self, writer, datum_writer, writers_schema=None, codec='null'):
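
With the new _DataFile mixin, reader and writer share the context-manager protocol and the metadata accessors, so either can drive a with-block. A short usage sketch (the schema and file name here are illustrative, not from the commit):

    # Sketch: with-block usage via the shared _DataFile mixin.
    import avro.datafile
    import avro.io
    import avro.schema

    writers_schema = avro.schema.parse(
        '{"type": "record", "name": "R", "fields": [{"name": "x", "type": "long"}]}')
    with avro.datafile.DataFileWriter(open('r.avro', 'wb'), avro.io.DatumWriter(), writers_schema) as dfw:
        dfw.append({'x': 1})
    with avro.datafile.DataFileReader(open('r.avro', 'rb'), avro.io.DatumReader()) as dfr:
        print(dfr.codec, dfr.schema)  # metadata bytes decoded to str by the properties
        print(list(dfr))              # [{'x': 1}]
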
@@ -97,16 +153,13 @@ class DataFileWriter(object):
     self._datum_writer = datum_writer
     self._buffer_writer = io.BytesIO()
     self._buffer_encoder = avro.io.BinaryEncoder(self._buffer_writer)
-    self._block_count = 0
-    self._meta = {}
+    self.block_count = 0
     self._header_written = False
 
     if writers_schema is not None:
-      if codec not in VALID_CODECS:
-        raise DataFileException("Unknown codec: %r" % codec)
-      self._sync_marker = DataFileWriter.generate_sync_marker()
-      self.set_meta('avro.codec', codec)
-      self.set_meta('avro.schema', str(writers_schema))
+      self._sync_marker = generate_sixteen_random_bytes()
+      self.codec = codec
+      self.schema = str(writers_schema)
       self.datum_writer.writers_schema = writers_schema
     else:
       # open writer for reading to collect metadata
@@ -115,11 +168,10 @@ class DataFileWriter(object):
       # TODO(hammer): collect arbitrary metadata
       # collect metadata
       self._sync_marker = dfr.sync_marker
-      self.set_meta('avro.codec', dfr.get_meta('avro.codec'))
+      self.codec = dfr.codec
 
       # get schema used to write existing file
-      schema_from_file = dfr.get_meta('avro.schema')
-      self.set_meta('avro.schema', schema_from_file)
+      self.schema = schema_from_file = dfr.schema
       self.datum_writer.writers_schema = avro.schema.parse(schema_from_file)
 
       # seek to the end of the file and prepare for writing
@@ -132,27 +184,6 @@ class DataFileWriter(object):
   datum_writer = property(lambda self: self._datum_writer)
   buffer_writer = property(lambda self: self._buffer_writer)
   buffer_encoder = property(lambda self: self._buffer_encoder)
-  sync_marker = property(lambda self: self._sync_marker)
-  meta = property(lambda self: self._meta)
-
-  def __enter__(self):
-    return self
-
-  def __exit__(self, type, value, traceback):
-    # Perform a close if there's no exception
-    if type is None:
-      self.close()
-
-  # read/write properties
-  def set_block_count(self, new_val):
-    self._block_count = new_val
-  block_count = property(lambda self: self._block_count, set_block_count)
-
-  # utility functions to read/write metadata entries
-  def get_meta(self, key):
-    return self._meta.get(key)
-  def set_meta(self, key, val):
-    self._meta[key] = val
 
   def _write_header(self):
     header = {'magic': MAGIC,
@@ -161,6 +192,18 @@ class DataFileWriter(object):
     self.datum_writer.write_data(META_SCHEMA, header, self.encoder)
     self._header_written = True
 
+  @property
+  def codec(self):
+    """Meta are stored as bytes, but codec is returned as a string."""
+    return self.get_meta(CODEC_KEY).decode()
+
+  @codec.setter
+  def codec(self, value):
+    """Meta are stored as bytes, but codec is set as a string."""
+    if value not in VALID_CODECS:
+      raise DataFileException("Unknown codec: {!r}".format(value))
+    self.set_meta(CODEC_KEY, value.encode())
+
   # TODO(hammer): make a schema for blocks and use datum_writer
   def _write_block(self):
     if not self._header_written:
@@ -172,22 +215,23 @@ class DataFileWriter(object):
 
       # write block contents
       uncompressed_data = self.buffer_writer.getvalue()
-      if self.get_meta(CODEC_KEY) == 'null':
+      codec = self.codec
+      if codec == 'null':
         compressed_data = uncompressed_data
         compressed_data_length = len(compressed_data)
-      elif self.get_meta(CODEC_KEY) == 'deflate':
+      elif codec == 'deflate':
         # The first two characters and last character are zlib
         # wrappers around deflate data.
         compressed_data = zlib.compress(uncompressed_data)[2:-1]
         compressed_data_length = len(compressed_data)
-      elif self.get_meta(CODEC_KEY) == 'snappy':
+      elif codec == 'snappy':
         compressed_data = snappy.compress(uncompressed_data)
         compressed_data_length = len(compressed_data) + 4 # crc32
-      elif self.get_meta(CODEC_KEY) == 'zstandard':
+      elif codec == 'zstandard':
         compressed_data = zstd.ZstdCompressor().compress(uncompressed_data)
         compressed_data_length = len(compressed_data)
       else:
-        fail_msg = '"%s" codec is not supported.' % self.get_meta(CODEC_KEY)
+        fail_msg = '"%s" codec is not supported.' % self.codec
         raise DataFileException(fail_msg)
 
       # Write length of block
@@ -197,7 +241,7 @@ class DataFileWriter(object):
       self.writer.write(compressed_data)
 
       # Write CRC32 checksum for Snappy
-      if self.get_meta(CODEC_KEY) == 'snappy':
+      if codec == 'snappy':
         self.encoder.write_crc32(uncompressed_data)
 
       # write sync marker
@@ -205,6 +249,7 @@ class DataFileWriter(object):
 
       # reset buffer
       self.buffer_writer.truncate(0)
+      self.buffer_writer.seek(0)
       self.block_count = 0
 
   def append(self, datum):
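
The added seek is the "Must Seek after Truncate" fix from the commit message: io.BytesIO.truncate() does not move the stream position, so without rewinding, the next block written into the reused buffer would be preceded by NUL padding. A standalone demonstration:

    # Sketch: why truncate(0) alone is not enough on io.BytesIO.
    import io

    buf = io.BytesIO()
    buf.write(b'block one')
    buf.truncate(0)           # size is now 0, but position is still 9
    buf.write(b'x')
    print(buf.getvalue())     # b'\x00\x00\x00\x00\x00\x00\x00\x00\x00x'

    buf.truncate(0)
    buf.seek(0)               # rewind, as the fixed code does
    buf.write(b'x')
    print(buf.getvalue())     # b'x'
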
@@ -235,7 +280,7 @@ class DataFileWriter(object):
     self.flush()
     self.writer.close()
 
-class DataFileReader(object):
+class DataFileReader(_DataFile):
   """Read files written by DataFileWriter."""
   # TODO(hammer): allow user to specify expected schema?
   # TODO(hammer): allow user to specify the encoder
@@ -248,27 +293,12 @@ class DataFileReader(object):
     # read the header: magic, meta, sync
     self._read_header()
 
-    # ensure codec is valid
-    self.codec = self.get_meta('avro.codec')
-    if self.codec is None:
-      self.codec = "null"
-    if self.codec not in VALID_CODECS:
-      raise DataFileException('Unknown codec: %s.' % self.codec)
-
     # get file length
     self._file_length = self.determine_file_length()
 
     # get ready to read
-    self._block_count = 0
-    self.datum_reader.writers_schema = avro.schema.parse(self.get_meta(SCHEMA_KEY))
-
-  def __enter__(self):
-    return self
-
-  def __exit__(self, type, value, traceback):
-    # Perform a close if there's no exception
-    if type is None:
-      self.close()
+    self.block_count = 0
+    self.datum_reader.writers_schema = avro.schema.parse(self.schema)
 
   def __iter__(self):
     return self
@@ -278,21 +308,8 @@ class DataFileReader(object):
   raw_decoder = property(lambda self: self._raw_decoder)
   datum_decoder = property(lambda self: self._datum_decoder)
   datum_reader = property(lambda self: self._datum_reader)
-  sync_marker = property(lambda self: self._sync_marker)
-  meta = property(lambda self: self._meta)
   file_length = property(lambda self: self._file_length)
 
-  # read/write properties
-  def set_block_count(self, new_val):
-    self._block_count = new_val
-  block_count = property(lambda self: self._block_count, set_block_count)
-
-  # utility functions to read/write metadata entries
-  def get_meta(self, key):
-    return self._meta.get(key)
-  def set_meta(self, key, val):
-    self._meta[key] = val
-
   def determine_file_length(self):
     """
     Get file length and leave file cursor where we found it.
@@ -328,11 +345,12 @@ class DataFileReader(object):
 
   def _read_block_header(self):
     self.block_count = self.raw_decoder.read_long()
-    if self.codec == "null":
+    codec = self.codec
+    if codec == "null":
       # Skip a long; we don't need to use the length.
       self.raw_decoder.skip_long()
       self._datum_decoder = self._raw_decoder
-    elif self.codec == 'deflate':
+    elif codec == 'deflate':
       # Compressed data is stored as (length, data), which
       # corresponds to how the "bytes" type is encoded.
       data = self.raw_decoder.read_bytes()
@@ -340,14 +358,14 @@ class DataFileReader(object):
       # "raw" (no zlib headers) decompression.  See zlib.h.
       uncompressed = zlib.decompress(data, -15)
       self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
-    elif self.codec == 'snappy':
+    elif codec == 'snappy':
       # Compressed data includes a 4-byte CRC32 checksum
       length = self.raw_decoder.read_long()
       data = self.raw_decoder.read(length - 4)
       uncompressed = snappy.decompress(data)
       self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
       self.raw_decoder.check_crc32(uncompressed);
-    elif self.codec == 'zstandard':
+    elif codec == 'zstandard':
       length = self.raw_decoder.read_long()
       data = self.raw_decoder.read(length)
       uncompressed = bytearray()
@@ -373,7 +391,7 @@ class DataFileReader(object):
       return False
     return True
 
-  def next(self):
+  def __next__(self):
     """Return the next datum in the file."""
     while self.block_count == 0:
       if self.is_EOF() or (self._skip_sync() and self.is_EOF()):
@@ -383,13 +401,15 @@ class DataFileReader(object):
     datum = self.datum_reader.read(self.datum_decoder)
     self.block_count -= 1
     return datum
+  next = __next__
 
   def close(self):
     """Close this reader."""
     self.reader.close()
 
+
 def generate_sixteen_random_bytes():
   try:
     return os.urandom(16)
   except NotImplementedError:
-    return [chr(random.randrange(256)) for i in range(16)]
+    return bytes(random.randrange(256) for i in range(16))
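
Renaming next to __next__ (with the next = __next__ alias) is the standard bridge between the two iterator protocols: Python 3's for-loop calls __next__, while Python 2's calls next. A self-contained sketch of the pattern:

    # Sketch: one class satisfying both iterator protocols.
    class Countdown(object):
        def __init__(self, n):
            self.n = n
        def __iter__(self):
            return self
        def __next__(self):    # Python 3 entry point
            if self.n <= 0:
                raise StopIteration
            self.n -= 1
            return self.n
        next = __next__        # Python 2 entry point

    print(list(Countdown(3)))  # [2, 1, 0]

The same hunk also makes generate_sixteen_random_bytes return bytes rather than a list of one-character strings, matching the bytes-only sync marker.
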
diff --git a/lang/py/avro/io.py b/lang/py/avro/io.py
index b18b148..6e9ccdd 100644
--- a/lang/py/avro/io.py
+++ b/lang/py/avro/io.py
@@ -52,10 +52,29 @@ from decimal import Decimal, getcontext
 
 from avro import constants, schema, timezones
 
+try:
+  unicode
+except NameError:
+  unicode = str
+
+try:
+  basestring  # type: ignore
+except NameError:
+  basestring = (bytes, unicode)
+
+try:
+  long
+except NameError:
+  long = int
+
+
 #
 # Constants
 #
 
+_DEBUG_VALIDATE_INDENT = 0
+_DEBUG_VALIDATE = False
+
 INT_MIN_VALUE = -(1 << 31)
 INT_MAX_VALUE = (1 << 31) - 1
 LONG_MIN_VALUE = -(1 << 63)
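
These try/except NameError shims are the commit's standard trick for code that must import under both interpreters: on Python 3 the Python 2 builtins are absent, so the except branch rebinds them to their closest equivalents; on Python 2 the try body succeeds and nothing changes. With the shims in place, the validators below draw a hard line that Python 2's str used to blur: Avro "string" data must be text and "bytes"/"fixed" data must be raw bytes. A sketch of the resulting behavior:

    # Sketch: text vs. binary validation after this change.
    import avro.io
    import avro.schema

    string_schema = avro.schema.parse('"string"')
    bytes_schema = avro.schema.parse('"bytes"')
    assert avro.io.validate(string_schema, u'hello')
    assert not avro.io.validate(string_schema, b'hello')  # bytes are no longer "string"
    assert avro.io.validate(bytes_schema, b'hello')
    assert not avro.io.validate(bytes_schema, u'hello')   # text is no longer "bytes"
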
@@ -111,8 +130,8 @@ def _is_timezone_aware_datetime(dt):
 _valid = {
   'null': lambda s, d: d is None,
   'boolean': lambda s, d: isinstance(d, bool),
-  'string': lambda s, d: isinstance(d, basestring),
-  'bytes': lambda s, d: ((isinstance(d, str)) or
+  'string': lambda s, d: isinstance(d, unicode),
+  'bytes': lambda s, d: ((isinstance(d, bytes)) or
                          (isinstance(d, Decimal) and
                           getattr(s, 'logical_type', None) == constants.DECIMAL)),
   'int': lambda s, d: ((isinstance(d, (int, long))) and (INT_MIN_VALUE <= d <= INT_MAX_VALUE) or
@@ -128,12 +147,13 @@ _valid = {
                         getattr(s, 'logical_type', None) in (constants.TIMESTAMP_MILLIS,
                                                              constants.TIMESTAMP_MICROS))),
   'float': lambda s, d: isinstance(d, (int, long, float)),
-  'fixed': lambda s, d: ((isinstance(d, str) and len(d) == s.size) or
+  'fixed': lambda s, d: ((isinstance(d, bytes) and len(d) == s.size) or
                          (isinstance(d, Decimal) and
                           getattr(s, 'logical_type', None) == constants.DECIMAL)),
   'enum': lambda s, d: d in s.symbols,
+
   'array': lambda s, d: isinstance(d, list) and all(validate(s.items, item) for item in d),
-  'map': lambda s, d: (isinstance(d, dict) and all(isinstance(key, basestring) for key in d)
+  'map': lambda s, d: (isinstance(d, dict) and all(isinstance(key, unicode) for key in d)
                        and all(validate(s.values, value) for value in d.values())),
   'union': lambda s, d: any(validate(branch, d) for branch in s.schemas),
   'record': lambda s, d: (isinstance(d, dict)
@@ -144,7 +164,6 @@ _valid['double'] = _valid['float']
 _valid['error_union'] = _valid['union']
 _valid['error'] = _valid['request'] = _valid['record']
 
-
 def validate(expected_schema, datum):
   """Determines if a python datum is an instance of a schema.
 
@@ -154,10 +173,28 @@ def validate(expected_schema, datum):
   Returns:
     True if the datum is an instance of the schema.
   """
-  try:
-    return _valid[expected_schema.type](expected_schema, datum)
-  except KeyError:
-    raise AvroTypeException('Unknown Avro schema type: %r' % schema_type)
+  global _DEBUG_VALIDATE_INDENT
+  global _DEBUG_VALIDATE
+  expected_type = expected_schema.type
+  name = getattr(expected_schema, 'name', '')
+  if name:
+    name = ' ' + name
+  if expected_type in ('array', 'map', 'union', 'record'):
+    if _DEBUG_VALIDATE:
+      print('{!s}{!s}{!s}: {!s} {{'.format(' ' * _DEBUG_VALIDATE_INDENT, expected_schema.type, name, type(datum).__name__), file=sys.stderr)
+      _DEBUG_VALIDATE_INDENT += 2
+      if datum is not None and not datum:
+        print('{!s}<Empty>'.format(' ' * _DEBUG_VALIDATE_INDENT), file=sys.stderr)
+    result = _valid[expected_type](expected_schema, datum)
+    if _DEBUG_VALIDATE:
+      _DEBUG_VALIDATE_INDENT -= 2
+      print('{!s}}} -> {!s}'.format(' ' * _DEBUG_VALIDATE_INDENT, result), file=sys.stderr)
+  else:
+    result = _valid[expected_type](expected_schema, datum)
+    if _DEBUG_VALIDATE:
+      print('{!s}{!s}{!s}: {!s} -> {!s}'.format(' ' * _DEBUG_VALIDATE_INDENT, expected_schema.type, name, type(datum).__name__, result), file=sys.stderr)
+  return result
+
 
 #
 # Decoder/Encoder
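
The rewritten validate dispatches once on the schema type and, when the module-private _DEBUG_VALIDATE flag is set, prints an indented trace of nested validation to stderr. It also removes a latent bug: the old except branch referenced an undefined name (schema_type). A sketch of flipping the internal flag (an implementation detail, not public API):

    # Sketch: tracing a failing validation via the private debug flag.
    import avro.io
    import avro.schema

    avro.io._DEBUG_VALIDATE = True
    s = avro.schema.parse('{"type": "array", "items": "long"}')
    avro.io.validate(s, [1, 2, u'not a long'])  # indented trace goes to stderr
    avro.io._DEBUG_VALIDATE = False
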
@@ -244,19 +281,19 @@ class BinaryDecoder(object):
     """
     datum = self.read(size)
     unscaled_datum = 0
-    msb = struct.unpack('!b', datum[0])[0]
+    msb = struct.unpack('!b', datum[0:1])[0]
     leftmost_bit = (msb >> 7) & 1
     if leftmost_bit == 1:
-      modified_first_byte = ord(datum[0]) ^ (1 << 7)
-      datum = chr(modified_first_byte) + datum[1:]
+      modified_first_byte = ord(datum[0:1]) ^ (1 << 7)
+      datum = bytearray([modified_first_byte]) + datum[1:]
       for offset in range(size):
         unscaled_datum <<= 8
-        unscaled_datum += ord(datum[offset])
+        unscaled_datum += ord(datum[offset:1+offset])
       unscaled_datum += pow(-2, (size*8) - 1)
     else:
       for offset in range(size):
         unscaled_datum <<= 8
-        unscaled_datum += ord(datum[offset])
+        unscaled_datum += ord(datum[offset:1+offset])
 
     original_prec = getcontext().prec
     getcontext().prec = precision
@@ -383,7 +420,7 @@ class BinaryEncoder(object):
   writer = property(lambda self: self._writer)
 
   def write(self, datum):
-    """Write an abritrary datum."""
+    """Write an arbitrary datum."""
     self.writer.write(datum)
 
   def write_null(self, datum):
@@ -397,10 +434,7 @@ class BinaryEncoder(object):
     a boolean is written as a single byte
     whose value is either 0 (false) or 1 (true).
     """
-    if datum:
-      self.write(chr(1))
-    else:
-      self.write(chr(0))
+    self.write(bytearray([bool(datum)]))
 
   def write_int(self, datum):
     """
@@ -414,9 +448,9 @@ class BinaryEncoder(object):
     """
     datum = (datum << 1) ^ (datum >> 63)
     while (datum & ~0x7F) != 0:
-      self.write(chr((datum & 0x7f) | 0x80))
+      self.write(bytearray([(datum & 0x7f) | 0x80]))
       datum >>= 7
-    self.write(chr(datum))
+    self.write(bytearray([datum]))
 
   def write_float(self, datum):
     """
@@ -459,7 +493,7 @@ class BinaryEncoder(object):
     self.write_long(bytes_req)
     for index in range(bytes_req-1, -1, -1):
       bits_to_write = packed_bits >> (8 * index)
-      self.write(chr(bits_to_write & 0xff))
+      self.write(bytearray([bits_to_write & 0xff]))
 
   def write_decimal_fixed(self, datum, scale, size):
     """
@@ -494,13 +528,13 @@ class BinaryEncoder(object):
       unscaled_datum = mask | unscaled_datum
       for index in range(size-1, -1, -1):
         bits_to_write = unscaled_datum >> (8 * index)
-        self.write(chr(bits_to_write & 0xff))
+        self.write(bytearray([bits_to_write & 0xff]))
     else:
       for i in range(offset_bits // 8):
-        self.write(chr(0))
+        self.write(b'\x00')
       for index in range(bytes_req-1, -1, -1):
         bits_to_write = unscaled_datum >> (8 * index)
-        self.write(chr(bits_to_write & 0xff))
+        self.write(bytearray([bits_to_write & 0xff]))
 
   def write_bytes(self, datum):
     """
@@ -661,6 +695,7 @@ class DatumReader(object):
       fail_msg = 'Schemas do not match.'
       raise SchemaResolutionException(fail_msg, writers_schema, readers_schema)
 
+    logical_type = getattr(writers_schema, 'logical_type', None)
     # schema resolution: reader's schema is a union, writer's schema is not
     if (writers_schema.type not in ['union', 'error_union']
         and readers_schema.type in ['union', 'error_union']):
@@ -678,23 +713,17 @@ class DatumReader(object):
     elif writers_schema.type == 'string':
       return decoder.read_utf8()
     elif writers_schema.type == 'int':
-      if (hasattr(writers_schema, 'logical_type') and
-          writers_schema.logical_type == constants.DATE):
+      if logical_type == constants.DATE:
         return decoder.read_date_from_int()
-      elif (hasattr(writers_schema, 'logical_type') and
-        writers_schema.logical_type == constants.TIME_MILLIS):
+      if logical_type == constants.TIME_MILLIS:
         return decoder.read_time_millis_from_int()
-      else:
-        return decoder.read_int()
+      return decoder.read_int()
     elif writers_schema.type == 'long':
-      if (hasattr(writers_schema, 'logical_type') and
-          writers_schema.logical_type == constants.TIME_MICROS):
+      if logical_type == constants.TIME_MICROS:
         return decoder.read_time_micros_from_long()
-      elif (hasattr(writers_schema, 'logical_type') and
-            writers_schema.logical_type == constants.TIMESTAMP_MILLIS):
+      elif logical_type == constants.TIMESTAMP_MILLIS:
         return decoder.read_timestamp_millis_from_long()
-      elif (hasattr(writers_schema, 'logical_type') and
-            writers_schema.logical_type == constants.TIMESTAMP_MICROS):
+      elif logical_type == constants.TIMESTAMP_MICROS:
         return decoder.read_timestamp_micros_from_long()
       else:
         return decoder.read_long()
@@ -703,8 +732,7 @@ class DatumReader(object):
     elif writers_schema.type == 'double':
       return decoder.read_double()
     elif writers_schema.type == 'bytes':
-      if (hasattr(writers_schema, 'logical_type') and
-                      writers_schema.logical_type == 'decimal'):
+      if logical_type == 'decimal':
         return decoder.read_decimal_from_bytes(
           writers_schema.get_prop('precision'),
           writers_schema.get_prop('scale')
@@ -712,8 +740,7 @@ class DatumReader(object):
       else:
         return decoder.read_bytes()
     elif writers_schema.type == 'fixed':
-      if (hasattr(writers_schema, 'logical_type') and
-                      writers_schema.logical_type == 'decimal'):
+      if logical_type == 'decimal':
         return decoder.read_decimal_from_fixed(
           writers_schema.get_prop('precision'),
           writers_schema.get_prop('scale'),
@@ -1006,14 +1033,13 @@ class DatumWriter(object):
                             set_writers_schema)
 
   def write(self, datum, encoder):
-    # validate datum
     if not validate(self.writers_schema, datum):
       raise AvroTypeException(self.writers_schema, datum)
-
     self.write_data(self.writers_schema, datum, encoder)
 
   def write_data(self, writers_schema, datum, encoder):
     # function dispatch to write datum
+    logical_type = getattr(writers_schema, 'logical_type', None)
     if writers_schema.type == 'null':
       encoder.write_null(datum)
     elif writers_schema.type == 'boolean':
@@ -1021,23 +1047,18 @@ class DatumWriter(object):
     elif writers_schema.type == 'string':
       encoder.write_utf8(datum)
     elif writers_schema.type == 'int':
-      if (hasattr(writers_schema, 'logical_type') and
-          writers_schema.logical_type == constants.DATE):
+      if logical_type == constants.DATE:
         encoder.write_date_int(datum)
-      elif (hasattr(writers_schema, 'logical_type') and
-            writers_schema.logical_type == constants.TIME_MILLIS):
+      elif logical_type == constants.TIME_MILLIS:
         encoder.write_time_millis_int(datum)
       else:
         encoder.write_int(datum)
     elif writers_schema.type == 'long':
-      if (hasattr(writers_schema, 'logical_type') and
-          writers_schema.logical_type == constants.TIME_MICROS):
+      if logical_type == constants.TIME_MICROS:
         encoder.write_time_micros_long(datum)
-      elif (hasattr(writers_schema, 'logical_type') and
-            writers_schema.logical_type == constants.TIMESTAMP_MILLIS):
+      elif logical_type == constants.TIMESTAMP_MILLIS:
         encoder.write_timestamp_millis_long(datum)
-      elif (hasattr(writers_schema, 'logical_type') and
-            writers_schema.logical_type == constants.TIMESTAMP_MICROS):
+      elif logical_type == constants.TIMESTAMP_MICROS:
         encoder.write_timestamp_micros_long(datum)
       else:
         encoder.write_long(datum)
@@ -1046,14 +1067,12 @@ class DatumWriter(object):
     elif writers_schema.type == 'double':
       encoder.write_double(datum)
     elif writers_schema.type == 'bytes':
-      if (hasattr(writers_schema, 'logical_type') and
-                      writers_schema.logical_type == 'decimal'):
+      if logical_type == 'decimal':
         encoder.write_decimal_bytes(datum, writers_schema.get_prop('scale'))
       else:
         encoder.write_bytes(datum)
     elif writers_schema.type == 'fixed':
-      if (hasattr(writers_schema, 'logical_type') and
-                      writers_schema.logical_type == 'decimal'):
+      if logical_type == 'decimal':
         encoder.write_decimal_fixed(
           datum,
           writers_schema.get_prop('scale'),
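
Throughout read_data and write_data, the repeated hasattr checks collapse into a single logical_type = getattr(writers_schema, 'logical_type', None) computed once per call; schemas without a logical type simply yield None and fall through to the plain codec. A sketch of the pattern (the stand-in class is illustrative, not an avro type):

    # Sketch: getattr-with-default dispatch on an optional attribute.
    class StubSchema(object):          # illustrative stand-in
        type = 'long'

    ws = StubSchema()
    logical_type = getattr(ws, 'logical_type', None)
    if logical_type == 'timestamp-millis':
        print('decode as timestamp')
    else:
        print('decode as plain long')  # taken here: the attribute is absent
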
diff --git a/lang/py/avro/ipc.py b/lang/py/avro/ipc.py
index 17ad175..1c69063 100644
--- a/lang/py/avro/ipc.py
+++ b/lang/py/avro/ipc.py
@@ -21,18 +21,27 @@
 
 from __future__ import absolute_import, division, print_function
 
-import httplib
 import io
 import os
 
 import avro.io
 from avro import protocol, schema
 
+try:
+  import httplib  # type: ignore
+except ImportError:
+  import http.client as httplib  # type: ignore
+
+try:
+  unicode
+except NameError:
+  unicode = str
+
 
 def _load(name):
   dir_path = os.path.dirname(__file__)
   rsrc_path = os.path.join(dir_path, name)
-  with open(rsrc_path, 'rb') as f:
+  with open(rsrc_path, 'r') as f:
     return f.read()
 
 HANDSHAKE_REQUEST_SCHEMA_JSON = _load('HandshakeRequest.avsc')
@@ -131,7 +140,7 @@ class BaseRequestor(object):
     request_datum['clientHash'] = local_hash
     request_datum['serverHash'] = remote_hash
     if self.send_protocol:
-      request_datum['clientProtocol'] = str(self.local_protocol)
+      request_datum['clientProtocol'] = unicode(self.local_protocol)
     HANDSHAKE_REQUESTOR_WRITER.write(request_datum, encoder)
 
   def write_call_request(self, message_name, request_datum, encoder):
@@ -236,8 +245,7 @@ class Requestor(BaseRequestor):
     call_response_exists = self.read_handshake_response(buffer_decoder)
     if call_response_exists:
       return self.read_call_response(message_name, buffer_decoder)
-    else:
-      return self.request(message_name, request_datum)
+    return self.request(message_name, request_datum)
 
 class Responder(object):
   """Base class for the server side of a protocol interaction."""
@@ -301,7 +309,7 @@ class Responder(object):
       except AvroRemoteException as e:
         error = e
       except Exception as e:
-        error = AvroRemoteException(str(e))
+        error = AvroRemoteException(unicode(e))
 
       # write response using local protocol
       META_WRITER.write(response_metadata, buffer_encoder)
@@ -313,7 +321,7 @@ class Responder(object):
         writers_schema = local_message.errors
         self.write_error(writers_schema, error, buffer_encoder)
     except schema.AvroException as e:
-      error = AvroRemoteException(str(e))
+      error = AvroRemoteException(unicode(e))
       buffer_encoder = avro.io.BinaryEncoder(io.BytesIO())
       META_WRITER.write(response_metadata, buffer_encoder)
       buffer_encoder.write_boolean(True)
@@ -346,7 +354,7 @@ class Responder(object):
         handshake_response['match'] = 'CLIENT'
 
     if handshake_response['match'] != 'BOTH':
-      handshake_response['serverProtocol'] = str(self.local_protocol)
+      handshake_response['serverProtocol'] = unicode(self.local_protocol)
       handshake_response['serverHash'] = self.local_hash
 
     HANDSHAKE_RESPONDER_WRITER.write(handshake_response, encoder)
@@ -368,7 +376,7 @@ class Responder(object):
 
   def write_error(self, writers_schema, error_exception, encoder):
     datum_writer = avro.io.DatumWriter(writers_schema)
-    datum_writer.write(str(error_exception), encoder)
+    datum_writer.write(unicode(error_exception), encoder)
 
 #
 # Utility classes
@@ -388,7 +396,7 @@ class FramedReader(object):
       buffer = io.BytesIO()
       buffer_length = self._read_buffer_length()
       if buffer_length == 0:
-        return ''.join(message)
+        return b''.join(message)
       while buffer.tell() < buffer_length:
         chunk = self.reader.read(buffer_length - buffer.tell())
         if chunk == '':
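
FramedReader must now join its accumulated chunks with b'' because they are bytes on Python 3. Per the Avro RPC framing spec, a message is a series of buffers, each a four-byte big-endian length followed by that many bytes, terminated by an empty buffer. A self-contained sketch of that framing (helper names are illustrative):

    # Sketch: Avro RPC message framing round trip.
    import io
    import struct

    def frame_message(payload, buffer_size=8):
        out = io.BytesIO()
        for i in range(0, len(payload), buffer_size):
            chunk = payload[i:i + buffer_size]
            out.write(struct.pack('!I', len(chunk)))  # 4-byte big-endian length
            out.write(chunk)
        out.write(struct.pack('!I', 0))               # empty buffer ends the message
        return out.getvalue()

    def read_framed_message(reader):
        message = []
        while True:
            length = struct.unpack('!I', reader.read(4))[0]
            if length == 0:
                return b''.join(message)              # join as bytes, as above
            message.append(reader.read(length))

    framed = frame_message(b'hello avro rpc')
    assert read_framed_message(io.BytesIO(framed)) == b'hello avro rpc'
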
diff --git a/lang/py/avro/protocol.py b/lang/py/avro/protocol.py
index fef2633..c41ad0c 100644
--- a/lang/py/avro/protocol.py
+++ b/lang/py/avro/protocol.py
@@ -26,6 +26,16 @@ import json
 
 import avro.schema
 
+try:
+  unicode
+except NameError:
+  unicode = str
+
+try:
+  basestring  # type: ignore
+except NameError:
+  basestring = (bytes, unicode)
+
 #
 # Constants
 #
@@ -58,7 +68,7 @@ class Protocol(object):
 
   def _parse_messages(self, messages, names):
     message_objects = {}
-    for name, body in messages.iteritems():
+    for name, body in messages.items():
       if name in message_objects:
         fail_msg = 'Message name "%s" repeated.' % name
         raise ProtocolParseException(fail_msg)
@@ -100,7 +110,7 @@ class Protocol(object):
       self.set_prop('types', self._parse_types(types, type_names))
     if messages is not None:
       self.set_prop('messages', self._parse_messages(messages, type_names))
-    self._md5 = hashlib.md5(str(self)).digest()
+    self._md5 = hashlib.md5(str(self).encode()).digest()
 
   # read-only properties
   name = property(lambda self: self.get_prop('name'))
@@ -130,7 +140,7 @@ class Protocol(object):
       to_dump['types'] = [ t.to_json(names) for t in self.types ]
     if self.messages:
       messages_dict = {}
-      for name, body in self.messages.iteritems():
+      for name, body in self.messages.items():
         messages_dict[name] = body.to_json(names)
       to_dump['messages'] = messages_dict
     return to_dump
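
Two Python 3 changes drive this file: dict.iteritems() is gone (items() works on both versions), and hashlib.md5 refuses text, so the protocol's JSON representation is encoded to bytes before hashing. A minimal sketch of the latter:

    # Sketch: hashing text requires an explicit encode on Python 3.
    import hashlib

    protocol_json = u'{"protocol": "Example"}'  # illustrative protocol text
    digest = hashlib.md5(protocol_json.encode()).digest()
    assert len(digest) == 16
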
diff --git a/lang/py/avro/schema.py b/lang/py/avro/schema.py
index a6116e2..6258674 100644
--- a/lang/py/avro/schema.py
+++ b/lang/py/avro/schema.py
@@ -48,6 +48,16 @@ import warnings
 
 from avro import constants
 
+try:
+  unicode
+except NameError:
+  unicode = str
+
+try:
+  basestring  # type: ignore
+except NameError:
+  basestring = (bytes, unicode)
+
 #
 # Constants
 #
diff --git a/lang/py/avro/test/gen_interop_data.py b/lang/py/avro/test/gen_interop_data.py
index 3fe372e..6f2f428 100644
--- a/lang/py/avro/test/gen_interop_data.py
+++ b/lang/py/avro/test/gen_interop_data.py
@@ -27,6 +27,11 @@ import avro.datafile
 import avro.io
 import avro.schema
 
+try:
+  unicode
+except NameError:
+  unicode = str
+
 CODECS_TO_VALIDATE = ('null', 'deflate')
 
 try:
@@ -43,31 +48,32 @@ except ImportError:
 DATUM = {
   'intField': 12,
   'longField': 15234324,
-  'stringField': 'hey',
+  'stringField': unicode('hey'),
   'boolField': True,
   'floatField': 1234.0,
   'doubleField': -1234.0,
-  'bytesField': '12312adf',
+  'bytesField': b'12312adf',
   'nullField': None,
   'arrayField': [5.0, 0.0, 12.0],
-  'mapField': {'a': {'label': 'a'}, 'bee': {'label': 'cee'}},
+  'mapField': {unicode('a'): {'label': unicode('a')},
+               unicode('bee'): {'label': unicode('cee')}},
   'unionField': 12.0,
   'enumField': 'C',
-  'fixedField': '1019181716151413',
-  'recordField': {'label': 'blah', 'children': [{'label': 'inner', 'children': []}]},
+  'fixedField': b'1019181716151413',
+  'recordField': {'label': unicode('blah'),
+                  'children': [{'label': unicode('inner'), 'children': []}]},
 }
 
 def generate(schema_path, output_path):
+  with open(schema_path, 'r') as schema_file:
+    interop_schema = avro.schema.parse(schema_file.read())
   for codec in CODECS_TO_VALIDATE:
-    with open(schema_path, 'rb') as schema_file:
-      interop_schema = avro.schema.parse(schema_file.read())
     filename = output_path
     if codec != 'null':
       base, ext = os.path.splitext(output_path)
       filename = base + "_" + codec + ext
     with avro.datafile.DataFileWriter(open(filename, 'wb'), avro.io.DatumWriter(),
                                       interop_schema, codec=codec) as dfw:
-      # NB: not using compression
       dfw.append(DATUM)
 
 if __name__ == "__main__":
diff --git a/lang/py/avro/test/mock_tether_parent.py b/lang/py/avro/test/mock_tether_parent.py
index d490313..dac958d 100644
--- a/lang/py/avro/test/mock_tether_parent.py
+++ b/lang/py/avro/test/mock_tether_parent.py
@@ -21,12 +21,17 @@ from __future__ import absolute_import, division, print_function
 
 import socket
 import sys
-from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 
 import avro.tether.tether_task
 import avro.tether.util
 from avro import ipc, protocol
 
+try:
+  import BaseHTTPServer as http_server  # type: ignore
+except ImportError:
+  import http.server as http_server  # type: ignore
+
+
 SERVER_ADDRESS = ('localhost', avro.tether.util.find_port())
 
 class MockParentResponder(ipc.Responder):
@@ -52,11 +57,11 @@ class MockParentResponder(ipc.Responder):
 
     return None
 
-class MockParentHandler(BaseHTTPRequestHandler):
+class MockParentHandler(http_server.BaseHTTPRequestHandler):
   """Create a handler for the parent.
   """
   def do_POST(self):
-    self.responder =MockParentResponder()
+    self.responder = MockParentResponder()
     call_request_reader = ipc.FramedReader(self.rfile)
     call_request = call_request_reader.read_framed_message()
     resp_body = self.responder.respond(call_request)
@@ -82,6 +87,6 @@ if __name__ == '__main__':
 
     # flush the output so it shows up in the parent process
     sys.stdout.flush()
-    parent_server = HTTPServer(SERVER_ADDRESS, MockParentHandler)
+    parent_server = http_server.HTTPServer(SERVER_ADDRESS, MockParentHandler)
     parent_server.allow_reuse_address = True
     parent_server.serve_forever()
diff --git a/lang/py/avro/test/test_datafile.py b/lang/py/avro/test/test_datafile.py
index 68491e7..b23b5b1 100644
--- a/lang/py/avro/test/test_datafile.py
+++ b/lang/py/avro/test/test_datafile.py
@@ -24,19 +24,27 @@ import unittest
 
 from avro import datafile, io, schema
 
+try:
+  unicode
+except NameError:
+  unicode = str
+
+
 SCHEMAS_TO_VALIDATE = (
   ('"null"', None),
   ('"boolean"', True),
   ('"string"', unicode('adsfasdf09809dsf-=adsf')),
-  ('"bytes"', '12345abcd'),
+  ('"bytes"', b'12345abcd'),
   ('"int"', 1234),
   ('"long"', 1234),
   ('"float"', 1234.0),
   ('"double"', 1234.0),
-  ('{"type": "fixed", "name": "Test", "size": 1}', 'B'),
+  ('{"type": "fixed", "name": "Test", "size": 1}', b'B'),
   ('{"type": "enum", "name": "Test", "symbols": ["A", "B"]}', 'B'),
   ('{"type": "array", "items": "long"}', [1, 3, 2]),
-  ('{"type": "map", "values": "long"}', {'a': 1, 'b': 3, 'c': 2}),
+  ('{"type": "map", "values": "long"}', {unicode('a'): 1,
+                                         unicode('b'): 3,
+                                         unicode('c'): 2}),
   ('["string", "null", "long"]', None),
   ("""\
    {"type": "record",
@@ -52,7 +60,7 @@ SCHEMAS_TO_VALIDATE = (
                           "name": "Cons",
                           "fields": [{"name": "car", "type": "Lisp"},
                                      {"name": "cdr", "type": "Lisp"}]}]}]}
-   """, {'value': {'car': {'value': 'head'}, 'cdr': {'value': None}}}),
+   """, {'value': {'car': {'value': unicode('head')}, 'cdr': {'value': None}}}),
 )
 
 FILENAME = 'test_datafile.out'
@@ -68,7 +76,6 @@ try:
 except ImportError:
   print('Zstandard not present, will skip testing it.')
 
-# TODO(hammer): clean up written files with ant, not os.remove
 class TestDataFile(unittest.TestCase):
   def test_round_trip(self):
     print('')
@@ -186,8 +193,8 @@ class TestDataFile(unittest.TestCase):
     sample_schema, sample_datum = SCHEMAS_TO_VALIDATE[1]
     schema_object = schema.parse(sample_schema)
     with datafile.DataFileWriter(writer, datum_writer, schema_object) as dfw:
-      dfw.set_meta('test.string', 'foo')
-      dfw.set_meta('test.number', '1')
+      dfw.set_meta('test.string', b'foo')
+      dfw.set_meta('test.number', b'1')
       dfw.append(sample_datum)
     self.assertTrue(writer.closed)
 
@@ -196,8 +203,8 @@ class TestDataFile(unittest.TestCase):
     reader = open(FILENAME, 'rb')
     datum_reader = io.DatumReader()
     with datafile.DataFileReader(reader, datum_reader) as dfr:
-      self.assertEquals('foo', dfr.get_meta('test.string'))
-      self.assertEquals('1', dfr.get_meta('test.number'))
+      self.assertEquals(b'foo', dfr.get_meta('test.string'))
+      self.assertEquals(b'1', dfr.get_meta('test.number'))
       for datum in dfr:
         datums.append(datum)
     self.assertTrue(reader.closed)
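
The metadata assertions switch to bytes because datafile metadata is stored as the Avro bytes type: set_meta and get_meta traffic in bytes, and only the codec and schema properties decode to str. A sketch of the round trip (file name illustrative):

    # Sketch: datafile metadata round-trips as bytes.
    import avro.datafile
    import avro.io
    import avro.schema

    s = avro.schema.parse('"int"')
    with avro.datafile.DataFileWriter(open('meta.avro', 'wb'), avro.io.DatumWriter(), s) as dfw:
        dfw.set_meta('user.key', b'value')           # bytes in
        dfw.append(42)
    with avro.datafile.DataFileReader(open('meta.avro', 'rb'), avro.io.DatumReader()) as dfr:
        assert dfr.get_meta('user.key') == b'value'  # bytes out
        assert dfr.codec == 'null'                   # property decodes to str
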
diff --git a/lang/py/avro/test/test_datafile_interop.py b/lang/py/avro/test/test_datafile_interop.py
index 7c79502..2f2ac2b 100644
--- a/lang/py/avro/test/test_datafile_interop.py
+++ b/lang/py/avro/test/test_datafile_interop.py
@@ -31,31 +31,26 @@ _INTEROP_DATA_DIR = os.path.join(os.path.dirname(avro.__file__), 'test', 'intero
                      "{} does not exist".format(_INTEROP_DATA_DIR))
 class TestDataFileInterop(unittest.TestCase):
   def test_interop(self):
-    ran = False
-    print()
-    print('TEST INTEROP')
-    print('============')
-    print()
+    """Test Interop"""
     for f in os.listdir(_INTEROP_DATA_DIR):
-      ran = True
-
+      filename = os.path.join(_INTEROP_DATA_DIR, f)
+      assert os.stat(filename).st_size > 0
       base_ext = os.path.splitext(os.path.basename(f))[0].split('_', 1)
       if len(base_ext) < 2 or base_ext[1] in datafile.VALID_CODECS:
         print('READING %s' % f)
-        print('')
+        print()
 
         # read data in binary from file
-        reader = open(os.path.join(_INTEROP_DATA_DIR, f), 'rb')
         datum_reader = io.DatumReader()
-        dfr = datafile.DataFileReader(reader, datum_reader)
-        i = 0
-        for i, datum in enumerate(dfr, 1):
-          assert datum is not None
-        assert i > 0
+        with open(filename, 'rb') as reader:
+          dfr = datafile.DataFileReader(reader, datum_reader)
+          i = 0
+          for i, datum in enumerate(dfr, 1):
+            assert datum is not None
+          assert i > 0
       else:
         print('SKIPPING %s due to an unsupported codec' % f)
-        print('')
-    self.assertTrue(ran, "Didn't find any interop data files to test")
+        print()
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/lang/py/avro/test/test_io.py b/lang/py/avro/test/test_io.py
index b4bd30a..7f21ae3 100644
--- a/lang/py/avro/test/test_io.py
+++ b/lang/py/avro/test/test_io.py
@@ -28,16 +28,21 @@ from decimal import Decimal
 import avro.io
 from avro import schema, timezones
 
+try:
+  unicode
+except NameError:
+  unicode = str
+
 SCHEMAS_TO_VALIDATE = (
   ('"null"', None),
   ('"boolean"', True),
-  ('"string"', u'adsfasdf09809dsf-=adsf'),
-  ('"bytes"', '12345abcd'),
+  ('"string"', unicode('adsfasdf09809dsf-=adsf')),
+  ('"bytes"', b'12345abcd'),
   ('"int"', 1234),
   ('"long"', 1234),
   ('"float"', 1234.0),
   ('"double"', 1234.0),
-  ('{"type": "fixed", "name": "Test", "size": 1}', 'B'),
+  ('{"type": "fixed", "name": "Test", "size": 1}', b'B'),
   ('{"type": "fixed", "logicalType": "decimal", "name": "Test", "size": 8, "precision": 5, "scale": 4}',
    Decimal('3.1415')),
   ('{"type": "fixed", "logicalType": "decimal", "name": "Test", "size": 8, "precision": 5, "scale": 4}',
@@ -46,7 +51,9 @@ SCHEMAS_TO_VALIDATE = (
   ('{"type": "bytes", "logicalType": "decimal", "precision": 5, "scale": 4}', Decimal('-3.1415')),
   ('{"type": "enum", "name": "Test", "symbols": ["A", "B"]}', 'B'),
   ('{"type": "array", "items": "long"}', [1, 3, 2]),
-  ('{"type": "map", "values": "long"}', {'a': 1, 'b': 3, 'c': 2}),
+  ('{"type": "map", "values": "long"}', {unicode('a'): 1,
+                                         unicode('b'): 3,
+                                         unicode('c'): 2}),
   ('["string", "null", "long"]', None),
   ('{"type": "int", "logicalType": "date"}', datetime.date(2000, 1, 1)),
   ('{"type": "int", "logicalType": "time-millis"}', datetime.time(23, 59, 59, 999000)),
@@ -94,19 +101,19 @@ SCHEMAS_TO_VALIDATE = (
                           "name": "Cons",
                           "fields": [{"name": "car", "type": "Lisp"},
                                      {"name": "cdr", "type": "Lisp"}]}]}]}
-   """, {'value': {'car': {'value': 'head'}, 'cdr': {'value': None}}}),
+   """, {'value': {'car': {'value': unicode('head')}, 'cdr': {'value': None}}}),
 )
 
 BINARY_ENCODINGS = (
-  (0, '00'),
-  (-1, '01'),
-  (1, '02'),
-  (-2, '03'),
-  (2, '04'),
-  (-64, '7f'),
-  (64, '80 01'),
-  (8192, '80 80 01'),
-  (-8193, '81 80 01'),
+  (0, b'00'),
+  (-1, b'01'),
+  (1, b'02'),
+  (-2, b'03'),
+  (2, b'04'),
+  (-64, b'7f'),
+  (64, b'80 01'),
+  (8192, b'80 80 01'),
+  (-8193, b'81 80 01'),
 )
 
 DEFAULT_VALUE_EXAMPLES = (
@@ -121,7 +128,8 @@ DEFAULT_VALUE_EXAMPLES = (
   ('{"type": "fixed", "name": "F", "size": 2}', '"\u00FF\u00FF"', u'\xff\xff'),
   ('{"type": "enum", "name": "F", "symbols": ["FOO", "BAR"]}', '"FOO"', 'FOO'),
   ('{"type": "array", "items": "int"}', '[1, 2, 3]', [1, 2, 3]),
-  ('{"type": "map", "values": "int"}', '{"a": 1, "b": 2}', {'a': 1, 'b': 2}),
+  ('{"type": "map", "values": "int"}', '{"a": 1, "b": 2}', {unicode('a'): 1,
+                                                            unicode('b'): 2}),
   ('["int", "null"]', '5', 5),
   ('{"type": "record", "name": "F", "fields": [{"name": "A", "type": "int"}]}',
    '{"A": 5}', {'A': 5}),
@@ -142,13 +150,13 @@ LONG_RECORD_DATUM = {'A': 1, 'B': 2, 'C': 3, 'D': 4, 'E': 5, 'F': 6, 'G': 7}
 
 def avro_hexlify(reader):
   """Return the hex value, as a string, of a binary-encoded int or long."""
-  bytes = []
+  b = []
   current_byte = reader.read(1)
-  bytes.append(hexlify(current_byte))
+  b.append(hexlify(current_byte))
   while (ord(current_byte) & 0x80) != 0:
     current_byte = reader.read(1)
-    bytes.append(hexlify(current_byte))
-  return ' '.join(bytes)
+    b.append(hexlify(current_byte))
+  return b' '.join(b)
 
 def print_test_name(test_name):
   print('')
diff --git a/lang/py/avro/test/test_protocol.py b/lang/py/avro/test/test_protocol.py
index 28e03b6..aa24620 100644
--- a/lang/py/avro/test/test_protocol.py
+++ b/lang/py/avro/test/test_protocol.py
@@ -27,6 +27,16 @@ import unittest
 import avro.protocol
 import avro.schema
 
+try:
+  unicode
+except NameError:
+  unicode = str
+
+try:
+  basestring  # type: ignore
+except NameError:
+  basestring = (bytes, unicode)
+
 
 class TestProtocol(object):
   """A proxy for a protocol string that provides useful test metadata."""
diff --git a/lang/py/avro/test/test_schema.py b/lang/py/avro/test/test_schema.py
index 542e511..8f08203 100644
--- a/lang/py/avro/test/test_schema.py
+++ b/lang/py/avro/test/test_schema.py
@@ -27,6 +27,16 @@ import warnings
 
 from avro import schema
 
+try:
+  unicode
+except NameError:
+  unicode = str
+
+try:
+  basestring  # type: ignore
+except NameError:
+  basestring = (bytes, unicode)
+
 
 class TestSchema(object):
   """A proxy for a schema string that provides useful test metadata."""
@@ -374,16 +384,10 @@ class TestMisc(unittest.TestCase):
 
   def test_exception_is_not_swallowed_on_parse_error(self):
     """A specific exception message should appear on a json parse error."""
-    try:
-        schema.parse('/not/a/real/file')
-        caught_exception = False
-    except schema.SchemaParseException as e:
-        expected_message = 'Error parsing JSON: /not/a/real/file, error = ' \
-                           'No JSON object could be decoded'
-        self.assertEqual(expected_message, e.args[0])
-        caught_exception = True
-
-    self.assertTrue(caught_exception, 'Exception was not caught')
+    self.assertRaisesRegexp(schema.SchemaParseException,
+                            r'Error parsing JSON: /not/a/real/file',
+                            schema.parse,
+                            '/not/a/real/file')
 
   def test_decimal_valid_type(self):
     fixed_decimal_schema = ValidTestSchema({
@@ -462,7 +466,7 @@ class SchemaParseTestCase(unittest.TestCase):
       actual_messages = [str(wmsg.message) for wmsg in actual_warnings]
       if self.test_schema.warnings:
         expected_messages = [str(w) for w in self.test_schema.warnings]
-        self.assertItemsEqual(actual_messages, expected_messages)
+        self.assertEqual(actual_messages, expected_messages)
       else:
         self.assertEqual(actual_messages, [])
 
diff --git a/lang/py/avro/test/test_script.py b/lang/py/avro/test/test_script.py
index db886d7..7ebda4e 100644
--- a/lang/py/avro/test/test_script.py
+++ b/lang/py/avro/test/test_script.py
@@ -22,6 +22,7 @@ from __future__ import absolute_import, division, print_function
 import csv
 import io
 import json
+import sys
 import unittest
 from operator import itemgetter
 from os import remove
@@ -33,6 +34,11 @@ import avro.schema
 from avro.datafile import DataFileWriter
 from avro.io import DatumWriter
 
+try:
+  unicode
+except NameError:
+  unicode = str
+
 NUM_RECORDS = 7
 
 
@@ -50,13 +56,13 @@ SCHEMA = '''
 '''
 
 LOONIES = (
-    ("daffy", "duck", "duck"),
-    ("bugs", "bunny", "bunny"),
-    ("tweety", "", "bird"),
-    ("road", "runner", "bird"),
-    ("wile", "e", "coyote"),
-    ("pepe", "le pew", "skunk"),
-    ("foghorn", "leghorn", "rooster"),
+    (unicode("daffy"), unicode("duck"), unicode("duck")),
+    (unicode("bugs"), unicode("bunny"), unicode("bunny")),
+    (unicode("tweety"), unicode(""), unicode("bird")),
+    (unicode("road"), unicode("runner"), unicode("bird")),
+    (unicode("wile"), unicode("e"), unicode("coyote")),
+    (unicode("pepe"), unicode("le pew"), unicode("skunk")),
+    (unicode("foghorn"), unicode("leghorn"), unicode("rooster")),
 )
 
 def looney_records():
@@ -93,11 +99,10 @@ class TestCat(unittest.TestCase):
             remove(self.avro_file)
 
     def _run(self, *args, **kw):
-        out = check_output([SCRIPT, "cat", self.avro_file] + list(args))
+        out = check_output([sys.executable, SCRIPT, "cat", self.avro_file] + list(args)).decode()
         if kw.get("raw"):
             return out
-        else:
-            return out.splitlines()
+        return out.splitlines()
 
     def test_print(self):
         return len(self._run()) == NUM_RECORDS
@@ -110,13 +115,14 @@ class TestCat(unittest.TestCase):
         return len(self._run("--skip", str(skip))) == NUM_RECORDS - skip
 
     def test_csv(self):
-        reader = csv.reader(io.BytesIO(self._run("-f", "csv", raw=True)))
+        reader = csv.reader(io.StringIO(self._run("-f", "csv", raw=True)))
         assert len(list(reader)) == NUM_RECORDS
 
     def test_csv_header(self):
-        io_ = io.BytesIO(self._run("-f", "csv", "--header", raw=True))
+        r = {"type": unicode("duck"), "last": unicode("duck"), "first": unicode("daffy")}
+        out = self._run("-f", "csv", "--header", raw=True)
+        io_ = io.StringIO(out)
         reader = csv.DictReader(io_)
-        r = {"type": "duck", "last": "duck", "first": "daffy"}
         assert next(reader) == r
 
     def test_print_schema(self):
@@ -133,7 +139,7 @@ class TestCat(unittest.TestCase):
         self.assertEqual(out.strip(), _JSON_PRETTY.strip())
 
     def test_version(self):
-        check_output([SCRIPT, "cat", "--version"])
+        check_output([sys.executable, SCRIPT, "cat", "--version"])
 
     def test_files(self):
         out = self._run(self.avro_file)
@@ -146,16 +152,17 @@ class TestCat(unittest.TestCase):
 
         # Field selection (with comma and space)
         out = self._run('--fields', 'first, last')
-        assert json.loads(out[0]) == {'first': 'daffy', 'last': 'duck'}
+        assert json.loads(out[0]) == {'first': unicode('daffy'), 'last': unicode('duck')}
 
         # Empty fields should get all
         out = self._run('--fields', '')
         assert json.loads(out[0]) == \
-                {'first': 'daffy', 'last': 'duck', 'type': 'duck'}
+                {'first': unicode('daffy'), 'last': unicode('duck'),
+                 'type': unicode('duck')}
 
         # Non existing fields are ignored
         out = self._run('--fields', 'first,last,age')
-        assert json.loads(out[0]) == {'first': 'daffy', 'last': 'duck'}
+        assert json.loads(out[0]) == {'first': unicode('daffy'), 'last': unicode('duck')}
 
 class TestWrite(unittest.TestCase):
     def setUp(self):
@@ -187,25 +194,24 @@ class TestWrite(unittest.TestCase):
                 continue
 
     def _run(self, *args, **kw):
-        args = [SCRIPT, "write", "--schema", self.schema_file] + list(args)
+        args = [sys.executable, SCRIPT, "write", "--schema", self.schema_file] + list(args)
         check_call(args, **kw)
 
     def load_avro(self, filename):
-        out = check_output([SCRIPT, "cat", filename])
-        return map(json.loads, out.splitlines())
+        out = check_output([sys.executable, SCRIPT, "cat", filename]).decode()
+        return [json.loads(o) for o in out.splitlines()]
 
     def test_version(self):
-        check_call([SCRIPT, "write", "--version"])
+        check_call([sys.executable, SCRIPT, "write", "--version"])
 
     def format_check(self, format, filename):
         tmp = tempfile()
-        fo = open(tmp, "wb")
-        self._run(filename, "-f", format, stdout=fo)
-        fo.close()
+        with open(tmp, "wb") as fo:
+          self._run(filename, "-f", format, stdout=fo)
 
         records = self.load_avro(tmp)
         assert len(records) == NUM_RECORDS
-        assert records[0]["first"] == "daffy"
+        assert records[0]["first"] == unicode("daffy")
 
         remove(tmp)
 
@@ -225,20 +231,15 @@ class TestWrite(unittest.TestCase):
 
     def test_multi_file(self):
         tmp = tempfile()
-        fo = open(tmp, "wb")
-        self._run(self.json_file, self.json_file, stdout=fo)
-        fo.close()
-
+        with open(tmp, 'wb') as o:
+            self._run(self.json_file, self.json_file, stdout=o)
         assert len(self.load_avro(tmp)) == 2 * NUM_RECORDS
         remove(tmp)
 
     def test_stdin(self):
         tmp = tempfile()
-
         info = open(self.json_file, "rb")
-        fo = open(tmp, "wb")
-        self._run("--input-type", "json", stdin=info, stdout=fo)
-        fo.close()
+        self._run("--input-type", "json", "-o", tmp, stdin=info)
 
         assert len(self.load_avro(tmp)) == NUM_RECORDS
         remove(tmp)
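
The script tests now launch the avro CLI via sys.executable so the child process runs under the same interpreter as the test suite, and they decode check_output, which returns bytes on Python 3. The core pattern, reduced to a runnable sketch:

    # Sketch: run a script under the current interpreter, decode its output.
    import subprocess
    import sys

    out = subprocess.check_output([sys.executable, '-c', 'print("hi")']).decode()
    assert out.splitlines() == ['hi']
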
diff --git a/lang/py/avro/test/test_tether_task.py b/lang/py/avro/test/test_tether_task.py
index 78de323..8856281 100644
--- a/lang/py/avro/test/test_tether_task.py
+++ b/lang/py/avro/test/test_tether_task.py
@@ -33,35 +33,36 @@ import avro.tether.tether_task
 import avro.tether.util
 from avro import schema, tether
 
+try:
+  unicode
+except NameError:
+  unicode = str
+
 
 class TestTetherTask(unittest.TestCase):
   """
  TODO: We should validate the server response by looking at stdout
   """
-  def test1(self):
+  def test_tether_task(self):
     """
-    Test that the thether_task is working. We run the mock_tether_parent in a separate
+    Test that the tether_task is working. We run the mock_tether_parent in a separate
     subprocess
     """
-    task=avro.test.word_count_task.WordCountTask()
-
-    proc=None
+    task = avro.test.word_count_task.WordCountTask()
+    proc = None
+    pyfile = avro.test.mock_tether_parent.__file__
+    server_port = avro.tether.util.find_port()
+    input_port = avro.tether.util.find_port()
     try:
       # launch the server in a separate process
-      # env["AVRO_TETHER_OUTPUT_PORT"]=output_port
-      env=dict()
-      env["PYTHONPATH"]=':'.join(sys.path)
-      server_port = avro.tether.util.find_port()
+      proc = subprocess.Popen([sys.executable, pyfile, "start_server", str(server_port)])
 
-      pyfile=avro.test.mock_tether_parent.__file__
-      proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(server_port)])
-      input_port = avro.tether.util.find_port()
+      print("Mock server started process pid={}".format(proc.pid))
 
-      print("Mock server started process pid={0}".format(proc.pid))
       # Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
       # so we give the subprocess time to start up
       time.sleep(1)
-      task.open(input_port,clientPort=server_port)
+      task.open(input_port, clientPort=server_port)
 
      # TODO: We should validate that open worked by grabbing the STDOUT of the subprocess
       # and ensuring that it outputted the correct message.
@@ -75,17 +76,17 @@ class TestTetherTask(unittest.TestCase):
       )
 
       # Serialize some data so we can send it to the input function
-      datum="This is a line of text"
+      datum = unicode("This is a line of text")
       writer = io.BytesIO()
       encoder = avro.io.BinaryEncoder(writer)
       datum_writer = avro.io.DatumWriter(task.inschema)
       datum_writer.write(datum, encoder)
 
       writer.seek(0)
-      data=writer.read()
+      data = writer.read()
 
       # Call input to simulate calling map
-      task.input(data,1)
+      task.input(data, 1)
 
       # Test the reducer
       task.configure(
@@ -95,31 +96,26 @@ class TestTetherTask(unittest.TestCase):
       )
 
       # Serialize some data so we can send it to the input function
-      datum={"key":"word","value":2}
+      datum = {"key": unicode("word"), "value": 2}
       writer = io.BytesIO()
       encoder = avro.io.BinaryEncoder(writer)
       datum_writer = avro.io.DatumWriter(task.midschema)
       datum_writer.write(datum, encoder)
 
       writer.seek(0)
-      data=writer.read()
+      data = writer.read()
 
       # Call input to simulate calling reduce
-      task.input(data,1)
+      task.input(data, 1)
 
       task.complete()
 
       # try a status
-      task.status("Status message")
-
-    except Exception as e:
-      raise
+      task.status(unicode("Status message"))
     finally:
       # close the process
       if not(proc is None):
         proc.kill()
 
-      pass
-
 if __name__ == '__main__':
   unittest.main()
diff --git a/lang/py/avro/test/test_tether_task_runner.py b/lang/py/avro/test/test_tether_task_runner.py
index 245f3bb..93827a0 100644
--- a/lang/py/avro/test/test_tether_task_runner.py
+++ b/lang/py/avro/test/test_tether_task_runner.py
@@ -34,6 +34,11 @@ import avro.tether.tether_task
 import avro.tether.tether_task_runner
 import avro.tether.util
 
+try:
+  unicode
+except NameError:
+  unicode = str
+
 
 class TestTetherTaskRunner(unittest.TestCase):
   """unit test for a tethered task runner."""
@@ -50,7 +55,7 @@ class TestTetherTaskRunner(unittest.TestCase):
       parent_port = avro.tether.util.find_port()
 
       pyfile=avro.test.mock_tether_parent.__file__
-      proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)])
+      proc=subprocess.Popen([sys.executable, pyfile,"start_server","{0}".format(parent_port)])
       input_port = avro.tether.util.find_port()
 
       print("Mock server started process pid={0}".format(proc.pid))
@@ -72,12 +77,12 @@ class TestTetherTaskRunner(unittest.TestCase):
       # Test the mapper
       requestor.request("configure", {
         "taskType": avro.tether.tether_task.TaskType.MAP,
-        "inSchema": str(runner.task.inschema),
-        "outSchema": str(runner.task.midschema)
+        "inSchema": unicode(str(runner.task.inschema)),
+        "outSchema": unicode(str(runner.task.midschema))
       })
 
       # Serialize some data so we can send it to the input function
-      datum="This is a line of text"
+      datum = unicode("This is a line of text")
       writer = io.BytesIO()
       encoder = avro.io.BinaryEncoder(writer)
       datum_writer = avro.io.DatumWriter(runner.task.inschema)
@@ -93,12 +98,12 @@ class TestTetherTaskRunner(unittest.TestCase):
       # Test the reducer
       requestor.request("configure", {
         "taskType": avro.tether.tether_task.TaskType.REDUCE,
-        "inSchema": str(runner.task.midschema),
-        "outSchema": str(runner.task.outschema)}
+        "inSchema": unicode(str(runner.task.midschema)),
+        "outSchema": unicode(str(runner.task.outschema))}
       )
 
       # Serialize some data so we can send it to the input function
-      datum={"key":"word","value":2}
+      datum = {"key": unicode("word"), "value": 2}
       writer = io.BytesIO()
       encoder = avro.io.BinaryEncoder(writer)
       datum_writer = avro.io.DatumWriter(runner.task.midschema)
@@ -154,7 +159,7 @@ class TestTetherTaskRunner(unittest.TestCase):
       parent_port = avro.tether.util.find_port()
 
       pyfile=avro.test.mock_tether_parent.__file__
-      proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)])
+      proc=subprocess.Popen([sys.executable, pyfile,"start_server","{0}".format(parent_port)])
 
       # Possible race condition? When we start tether_task_runner it will call
       # open, which tries to connect to the subprocess before the subprocess is fully started
@@ -166,7 +171,7 @@ class TestTetherTaskRunner(unittest.TestCase):
       env={"AVRO_TETHER_OUTPUT_PORT":"{0}".format(parent_port)}
       env["PYTHONPATH"]=':'.join(sys.path)
 
-      runnerproc = subprocess.Popen(["python", avro.tether.tether_task_runner.__file__, "avro.test.word_count_task.WordCountTask"], env=env)
+      runnerproc = subprocess.Popen([sys.executable, avro.tether.tether_task_runner.__file__, "avro.test.word_count_task.WordCountTask"], env=env)
 
       # Possible race condition: wait for the process to start
       time.sleep(1)
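
Both hunks in this file make the same fix: spawning the helper with sys.executable rather than a bare "python", so the child runs under exactly the interpreter (and virtualenv) that is executing the test, even on systems where "python" on PATH is still Python 2. A minimal sketch, with a hypothetical helper.py standing in for the mock parent script:

    import subprocess
    import sys

    # sys.executable is the absolute path of the running interpreter, so the
    # child matches the parent's Python version instead of whatever "python" is.
    proc = subprocess.Popen([sys.executable, "helper.py", "start_server", "12345"])
    proc.wait()
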
diff --git a/lang/py/avro/test/test_tether_word_count.py b/lang/py/avro/test/test_tether_word_count.py
index acd0b83..50cd978 100644
--- a/lang/py/avro/test/test_tether_word_count.py
+++ b/lang/py/avro/test/test_tether_word_count.py
@@ -35,6 +35,11 @@ import avro.io
 import avro.schema
 import avro.tether.tether_task_runner
 
+try:
+  unicode
+except NameError:
+  unicode = str
+
 _AVRO_DIR = os.path.abspath(os.path.dirname(avro.__file__))
 
 def _version():
@@ -46,9 +51,9 @@ _AVRO_VERSION = _version()
 _JAR_PATH = os.path.join(os.path.dirname(os.path.dirname(_AVRO_DIR)),
     "java", "tools", "target", "avro-tools-{}.jar".format(_AVRO_VERSION))
 
-_LINES = ("the quick brown fox jumps over the lazy dog",
-          "the cow jumps over the moon",
-          "the rain in spain falls mainly on the plains")
+_LINES = (unicode("the quick brown fox jumps over the lazy dog"),
+          unicode("the cow jumps over the moon"),
+          unicode("the rain in spain falls mainly on the plains"))
 _IN_SCHEMA = '"string"'
 
 # The schema for the output of the mapper and reducer
@@ -76,7 +81,7 @@ def _has_java():
       output = subprocess.check_output("/usr/libexec/java_home", stderr=subprocess.STDOUT)
     except subprocess.CalledProcessError as e:
       output = e.output
-    return ("No Java runtime present" not in output)
+    return (b"No Java runtime present" not in output)
   return bool(distutils.spawn.find_executable("java"))
 
 
@@ -106,7 +111,7 @@ class TestTetherWordCount(unittest.TestCase):
 
     # ...and the output schema...
     self._output_schema_path = os.path.join(self._base_dir, "output.avsc")
-    with open(self._output_schema_path, 'wb') as output_schema_handle:
+    with open(self._output_schema_path, 'w') as output_schema_handle:
       output_schema_handle.write(_OUT_SCHEMA)
     self.assertTrue(os.path.exists(self._output_schema_path), "Missing the schema file")
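
Two of these hunks are routine Python 3 text-model fixes: subprocess.check_output returns bytes, so the membership test needs a bytes literal, and writing a str schema requires a text-mode handle ('w', not 'wb'). A small sketch of both, using a hypothetical temp file rather than the test's real paths:

    import subprocess
    import tempfile

    # check_output returns bytes on Python 3; compare against a bytes literal.
    out = subprocess.check_output(["echo", "No Java runtime present"])
    assert b"No Java runtime present" in out

    # A str payload needs a text-mode handle; 'wb' would demand bytes.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".avsc", delete=False) as fh:
      fh.write('{"type": "string"}')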
 
diff --git a/lang/py/avro/tether/tether_task.py b/lang/py/avro/tether/tether_task.py
index 262bf67..ef17835 100644
--- a/lang/py/avro/tether/tether_task.py
+++ b/lang/py/avro/tether/tether_task.py
@@ -45,7 +45,7 @@ if (inputProtocol is None):
   if not(os.path.exists(pfile)):
     raise Exception("Could not locate the InputProtocol: {0} does not exist".format(pfile))
 
-  with file(pfile,'r') as hf:
+  with open(pfile,'r') as hf:
     prototxt=hf.read()
 
   inputProtocol=protocol.parse(prototxt)
@@ -61,7 +61,7 @@ if (outputProtocol is None):
   if not(os.path.exists(pfile)):
     raise Exception("Could not locate the OutputProtocol: {0} does not exist".format(pfile))
 
-  with file(pfile,'r') as hf:
+  with open(pfile,'r') as hf:
     prototxt=hf.read()
 
   outputProtocol=protocol.parse(prototxt)
@@ -294,9 +294,9 @@ class TetherTask(object):
       raise NotImplementedError("Only http protocol is currently supported")
 
     try:
-      self.outputClient.request('configure',{"port":inputport})
-    except Exception as e:
-      estr= traceback.format_exc()
+      self.outputClient.request('configure', {"port": inputport})
+    except Exception:
+      estr = traceback.format_exc()
       self.fail(estr)
 
 
@@ -466,10 +466,14 @@ class TetherTask(object):
     """
     self.log.error("TetherTask.fail: failure occurred; message follows:\n{0}".format(message))
     try:
-      self.outputClient.request("fail",{"message":message})
+      message = message.decode()
+    except AttributeError:
+      pass
+
+    try:
+      self.outputClient.request("fail", {"message": message})
     except Exception as e:
-      estr=traceback.format_exc()
-      self.log.error("TetherTask.fail: an exception occured while trying to send the fail message to the output server:\n{0}".format(estr))
+      self.log.exception("TetherTask.fail: an exception occurred while trying to send the fail message to the output server.")
 
     self.close()
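
The fail() hunk normalizes its message by duck typing: bytes has .decode(), Python 3 str does not, so catching AttributeError accepts either type without an isinstance check. (The same hunk also switches to log.exception, which appends the active traceback automatically.) A minimal sketch of the decode idiom, with a hypothetical ensure_text helper:

    def ensure_text(message):
      """Return message as text, accepting bytes or str."""
      try:
        return message.decode()  # bytes -> str (utf-8 by default)
      except AttributeError:
        return message  # already text; Python 3 str has no .decode

    assert ensure_text(b"failure") == ensure_text("failure") == "failure"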
 
diff --git a/lang/py/avro/tether/tether_task_runner.py b/lang/py/avro/tether/tether_task_runner.py
index 64bee7b..b83f351 100644
--- a/lang/py/avro/tether/tether_task_runner.py
+++ b/lang/py/avro/tether/tether_task_runner.py
@@ -24,12 +24,16 @@ import sys
 import threading
 import traceback
 import weakref
-from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 
 import avro.tether.tether_task
 import avro.tether.util
 from avro import ipc
 
+try:
+  import BaseHTTPServer as http_server  # type: ignore
+except ImportError:
+  import http.server as http_server  # type: ignore
+
 __all__ = ["TaskRunner"]
 
 
@@ -105,7 +109,7 @@ def HTTPHandlerGen(runner):
   else:
     runnerref=runner
 
-  class TaskRunnerHTTPHandler(BaseHTTPRequestHandler):
+  class TaskRunnerHTTPHandler(http_server.BaseHTTPRequestHandler):
     """Create a handler for the parent.
     """
 
@@ -113,7 +117,7 @@ def HTTPHandlerGen(runner):
     def __init__(self,*args,**param):
       """
       """
-      BaseHTTPRequestHandler.__init__(self,*args,**param)
+      http_server.BaseHTTPRequestHandler.__init__(self,*args,**param)
 
     def do_POST(self):
       self.responder =TaskRunnerResponder(self.runner)
@@ -174,7 +178,7 @@ class TaskRunner(object):
 
 
     def thread_run(task_runner=None):
-      task_runner.server = HTTPServer(address, HTTPHandlerGen(task_runner))
+      task_runner.server = http_server.HTTPServer(address, HTTPHandlerGen(task_runner))
       task_runner.server.allow_reuse_address = True
       task_runner.server.serve_forever()
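
The runner keeps a single code path for both interpreters by binding the stdlib HTTP server to one local name: BaseHTTPServer on Python 2, http.server on Python 3. A condensed sketch of the same pattern, with a hypothetical PingHandler:

    try:
      import BaseHTTPServer as http_server  # Python 2 name
    except ImportError:
      import http.server as http_server  # Python 3 name

    class PingHandler(http_server.BaseHTTPRequestHandler):
      def do_GET(self):
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"ok")

    # http_server.HTTPServer(("localhost", 0), PingHandler).serve_forever()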
 
diff --git a/lang/py/scripts/avro b/lang/py/scripts/avro
index c9ed157..f9a7769 100755
--- a/lang/py/scripts/avro
+++ b/lang/py/scripts/avro
@@ -21,20 +21,33 @@
 from __future__ import absolute_import, division, print_function
 
 import csv
+import itertools
 import json
-from functools import partial
 import os.path
-from itertools import ifilter, imap
-from sys import stdin, stdout
+import sys
+from functools import partial
+from optparse import OptionGroup, OptionParser
 
 import avro
 import avro.schema
 from avro.datafile import DataFileReader, DataFileWriter
 from avro.io import DatumReader, DatumWriter
 
-
 _AVRO_DIR = os.path.abspath(os.path.dirname(avro.__file__))
 
+ifilter = getattr(itertools, 'ifilter', filter)
+imap = getattr(itertools, 'imap', map)
+
+try:
+  unicode
+except NameError:
+  unicode = str
+
+try:
+  long
+except NameError:
+  long = int
+
 def _version():
   with open(os.path.join(_AVRO_DIR, 'VERSION.txt')) as v:
     return v.read()
@@ -55,17 +68,12 @@ def print_json_pretty(row):
     result = json.dumps(row, sort_keys=True, indent=4).replace(' \n', '\n')
     print(result)
 
-_write_row = csv.writer(stdout).writerow
-_encoding = stdout.encoding or "UTF-8"
-def _encode(v, encoding=_encoding):
-    if not isinstance(v, basestring):
-        return v
-    return v.encode(_encoding)
+_write_row = csv.writer(sys.stdout).writerow
 
 def print_csv(row):
     # We sort the keys so the fields will be in the same place
     # FIXME: Do we want to do it in schema order?
-    _write_row([_encode(row[key]) for key in sorted(row)])
+    _write_row([row[key] for key in sorted(row)])
 
 def select_printer(format):
     return {
@@ -98,7 +106,7 @@ def print_avro(avro, opts):
     if opts.filter:
         avro = ifilter(partial(record_match, opts.filter), avro)
 
-    for i in xrange(opts.skip):
+    for i in range(opts.skip):
         try:
             next(avro)
         except StopIteration:
@@ -117,39 +125,36 @@ def print_avro(avro, opts):
         printer(record)
 
 def print_schema(avro):
-    schema = avro.meta["avro.schema"]
+    schema = avro.schema
     # Pretty print
     print(json.dumps(json.loads(schema), indent=4))
 
 def cat(opts, args):
     if not args:
         raise AvroError("No files to show")
-
     for filename in args:
-        try:
-            fo = open(filename, "rb")
-        except (OSError, IOError) as e:
-            raise AvroError("Can't open %s - %s" % (filename, e))
-
-        avro = DataFileReader(fo, DatumReader())
-
-        if opts.print_schema:
-            print_schema(avro)
-            continue
-
-        print_avro(avro, opts)
+        with DataFileReader(open(filename, 'rb'), DatumReader()) as avro:
+            if opts.print_schema:
+                print_schema(avro)
+                continue
+            print_avro(avro, opts)
 
 def _open(filename, mode):
     if filename == "-":
         return {
-            "rb" : stdin,
-            "wb" : stdout
+            "rb" : sys.stdin,
+            "wb" : sys.stdout
         }[mode]
 
     return open(filename, mode)
 
 def iter_json(info, _):
-    return imap(json.loads, info)
+    for i in info:
+        try:
+            s = i.decode()
+        except AttributeError:
+            s = i
+        yield json.loads(s)
 
 def convert(value, field):
     type = field.type.type
@@ -161,8 +166,8 @@ def convert(value, field):
         "long" : long,
         "float" : float,
         "double" : float,
-        "string" : str,
-        "bytes" : str,
+        "string" : unicode,
+        "bytes" : bytes,
         "boolean" : bool,
         "null" : lambda _: None,
         "union" : lambda v: convert_union(v, field),
@@ -177,7 +182,7 @@ def convert_union(value, field):
 
 def iter_csv(info, schema):
     header = [field.name for field in schema.fields]
-    for row in csv.reader(info):
+    for row in csv.reader((getattr(i, "decode", lambda: i)() for i in info)):
         values = [convert(v, f) for v, f in zip(row, schema.fields)]
         yield dict(zip(header, values))
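
csv.reader wants text lines, but the input handle may be binary (stdin, or a file opened 'rb'), so the hunk decodes lazily: getattr returns the bound decode method for a bytes line and falls back to a no-argument lambda returning the line itself for str. A sketch of that idiom in isolation, with a hypothetical text_lines generator:

    def text_lines(lines):
        # bytes lines expose .decode(); for str the getattr default
        # (a zero-argument lambda) just hands back the original line.
        for line in lines:
            yield getattr(line, "decode", lambda: line)()

    assert list(text_lines([b"a,b", "c,d"])) == ["a,b", "c,d"]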
 
@@ -200,16 +205,17 @@ def write(opts, files):
     input_type = opts.input_type or guess_input_type(files)
     if not input_type:
         raise AvroError("Can't guess input file type (not .json or .csv)")
+    iter_records = {"json" : iter_json, "csv" : iter_csv}[input_type]
 
     try:
-        schema = avro.schema.parse(open(opts.schema, "rb").read())
+        with open(opts.schema) as schema_file:
+            schema = avro.schema.parse(schema_file.read())
         out = _open(opts.output, "wb")
     except (IOError, OSError) as e:
         raise AvroError("Can't open file - %s" % e)
 
-    writer = DataFileWriter(out, DatumWriter(), schema)
+    writer = DataFileWriter(getattr(out, 'buffer', out), DatumWriter(), schema)
 
-    iter_records = {"json" : iter_json, "csv" : iter_csv}[input_type]
     for filename in (files or ["-"]):
         info = _open(filename, "rb")
         for record in iter_records(info, schema):
@@ -218,9 +224,6 @@ def write(opts, files):
     writer.close()
 
 def main(argv=None):
-    import sys
-    from optparse import OptionParser, OptionGroup
-
     argv = argv or sys.argv
 
     parser = OptionParser(description="Display/write for Avro files",
@@ -269,8 +272,6 @@ def main(argv=None):
             raise AvroError("Unknown command - %s" % command)
     except AvroError as e:
         parser.error("%s" % e) # Will exit
-    except Exception as e:
-        raise SystemExit("panic: %s" % e)
 
 if __name__ == "__main__":
     main()
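
The subtlest change in this script is writing Avro's binary container to standard output: on Python 3 sys.stdout is a text stream, so DataFileWriter targets getattr(out, 'buffer', out), which yields the underlying byte buffer when one exists and the stream itself on Python 2. A minimal sketch, with a hypothetical binary_stdout helper:

    import sys

    def binary_stdout():
        # Python 3: sys.stdout.buffer is the raw byte stream.
        # Python 2: sys.stdout has no .buffer and accepts bytes directly.
        return getattr(sys.stdout, "buffer", sys.stdout)

    binary_stdout().write(b"Obj\x01")  # e.g. the four magic bytes of an Avro file
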
diff --git a/lang/py/setup.cfg b/lang/py/setup.cfg
index 0c2148f..5c32c9b 100644
--- a/lang/py/setup.cfg
+++ b/lang/py/setup.cfg
@@ -31,7 +31,11 @@ url = https://avro.apache.org/
 license = Apache License 2.0
 classifiers =
     License :: OSI Approved :: Apache Software License
-    Programming Language :: Python :: 2.7 :: Only
+    Programming Language :: Python :: 2.7
+    Programming Language :: Python :: 3.5
+    Programming Language :: Python :: 3.6
+    Programming Language :: Python :: 3.7
+    Programming Language :: Python :: 3.8
 
 [options]
 packages =
@@ -50,7 +54,7 @@ install_requires =
 zip_safe = true
 scripts =
     scripts/avro
-python_requires = <3.0,>=2.7
+python_requires = >=2.7
 
 [options.package_data]
 avro =
@@ -75,7 +79,7 @@ line_length = 150
 known_third_party=zope
 
 [pycodestyle]
-exclude = .eggs,build
+exclude = .eggs,.tox,build
 ignore = E101,E111,E114,E121,E122,E124,E125,E126,E127,E128,E129,E201,E202,E203,E222,E226,E225,E231,E241,E251,E261,E262,E265,E266,E301,E302,E303,E305,E306,E402,E501,E701,E703,E704,E711,W191,W291,W292,W293,W391,W503,W504,W601
 max-line-length = 150
 statistics = True
diff --git a/lang/py/tox.ini b/lang/py/tox.ini
index 0c9ddcd..b9b2c46 100644
--- a/lang/py/tox.ini
+++ b/lang/py/tox.ini
@@ -16,6 +16,7 @@
 [tox]
 envlist =
     py27
+    py35
 
 [coverage:run]
 source =

