avro-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rskr...@apache.org
Subject [avro] 19/23: AVRO-1788: Remove Obsolete Python < 2.7 Syntax (#683)
Date Wed, 29 Jan 2020 08:54:38 GMT
This is an automated email from the ASF dual-hosted git repository.

rskraba pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/avro.git

commit e9eee386a14b592a7cca55a1214c14c10d03a3b2
Author: Michael A. Smith <michael@smith-li.com>
AuthorDate: Tue Oct 22 08:49:44 2019 -0400

    AVRO-1788: Remove Obsolete Python < 2.7 Syntax (#683)
---
 lang/py/scripts/avro                          | 24 +++++----
 lang/py/setup.py                              |  7 ++-
 lang/py/src/avro/__init__.py                  | 10 ++--
 lang/py/src/avro/constants.py                 | 13 +++--
 lang/py/src/avro/datafile.py                  |  2 +-
 lang/py/src/avro/io.py                        | 63 ++++++++++++----------
 lang/py/src/avro/ipc.py                       | 27 ++++++----
 lang/py/src/avro/protocol.py                  | 32 +++++------
 lang/py/src/avro/schema.py                    | 31 +++++++----
 lang/py/src/avro/tether/__init__.py           | 15 +++---
 lang/py/src/avro/tether/tether_task.py        | 41 +++++++-------
 lang/py/src/avro/tether/tether_task_runner.py | 61 ++++++++++-----------
 lang/py/src/avro/tether/util.py               | 37 ++++++-------
 lang/py/src/avro/timezones.py                 |  9 +++-
 lang/py/src/avro/tool.py                      | 45 +++++++++-------
 lang/py/src/avro/txipc.py                     | 13 +++--
 lang/py/{src/avro => test}/__init__.py        | 10 ++--
 lang/py/test/av_bench.py                      |  7 ++-
 lang/py/test/gen_interop_data.py              | 40 +++++++++++---
 lang/py/test/mock_tether_parent.py            | 36 ++++++-------
 lang/py/test/sample_http_client.py            |  7 ++-
 lang/py/test/sample_http_server.py            |  3 ++
 lang/py/test/set_avro_test_path.py            |  7 +++
 lang/py/test/test_datafile.py                 | 77 +++++++++++++++------------
 lang/py/test/test_datafile_interop.py         | 43 +++++++++------
 lang/py/test/test_io.py                       | 59 ++++++++++----------
 lang/py/test/test_ipc.py                      |  7 +++
 lang/py/test/test_script.py                   | 11 ++--
 lang/py/test/test_tether_task.py              | 39 ++++++++------
 lang/py/test/test_tether_task_runner.py       | 67 ++++++++++++-----------
 lang/py/test/test_tether_word_count.py        | 15 +++---
 lang/py/test/txsample_http_client.py          |  5 ++
 lang/py/test/txsample_http_server.py          |  3 ++
 lang/py/test/word_count_task.py               | 49 +++++++++--------
 34 files changed, 533 insertions(+), 382 deletions(-)

diff --git a/lang/py/scripts/avro b/lang/py/scripts/avro
index b320a39..1aac028 100644
--- a/lang/py/scripts/avro
+++ b/lang/py/scripts/avro
@@ -18,16 +18,19 @@
 
 """Command line utility for reading and writing Avro files."""
 
-from avro.io import DatumReader, DatumWriter
-from avro.datafile import DataFileReader, DataFileWriter
-import avro.schema
+from __future__ import absolute_import, division, print_function
 
-import json
 import csv
-from sys import stdout, stdin
-from itertools import ifilter, imap
+import json
 from functools import partial
+from itertools import ifilter, imap
 from os.path import splitext
+from sys import stdin, stdout
+
+import avro.schema
+from avro.datafile import DataFileReader, DataFileWriter
+from avro.io import DatumReader, DatumWriter
+
 
 class AvroError(Exception):
     pass
@@ -52,9 +55,9 @@ def print_csv(row):
 
 def select_printer(format):
     return {
-        "json" : print_json,
-        "json-pretty" : print_json_pretty,
-        "csv" : print_csv
+        "json": print_json,
+        "json-pretty": print_json_pretty,
+        "csv": print_csv
     }[format]
 
 def record_match(expr, record):
@@ -102,7 +105,7 @@ def print_avro(avro, opts):
 def print_schema(avro):
     schema = avro.meta["avro.schema"]
     # Pretty print
-    print json.dumps(json.loads(schema), indent=4)
+    print(json.dumps(json.loads(schema), indent=4))
 
 def cat(opts, args):
     if not args:
@@ -257,4 +260,3 @@ def main(argv=None):
 
 if __name__ == "__main__":
     main()
-
diff --git a/lang/py/setup.py b/lang/py/setup.py
index c2e07c6..b978092 100755
--- a/lang/py/setup.py
+++ b/lang/py/setup.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -7,9 +8,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +18,8 @@
 # limitations under the License.
 
 
+from __future__ import absolute_import, division, print_function
+
 import distutils.errors
 import glob
 import os
diff --git a/lang/py/src/avro/__init__.py b/lang/py/src/avro/__init__.py
index 47d1295..9a859e9 100644
--- a/lang/py/src/avro/__init__.py
+++ b/lang/py/src/avro/__init__.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,14 +8,15 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones']
+from __future__ import absolute_import, division, print_function
 
+__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones']
diff --git a/lang/py/src/avro/constants.py b/lang/py/src/avro/constants.py
index 66e31df..2197201 100644
--- a/lang/py/src/avro/constants.py
+++ b/lang/py/src/avro/constants.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,18 +8,18 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""
-Contains Constants for Python Avro
-"""
+"""Contains Constants for Python Avro"""
+
+from __future__ import absolute_import, division, print_function
 
 DATE = "date"
 DECIMAL = "decimal"
diff --git a/lang/py/src/avro/datafile.py b/lang/py/src/avro/datafile.py
index 75a8e4a..a9c9c22 100644
--- a/lang/py/src/avro/datafile.py
+++ b/lang/py/src/avro/datafile.py
@@ -401,4 +401,4 @@ def generate_sixteen_random_bytes():
   try:
     return os.urandom(16)
   except NotImplementedError:
-    return [chr(random.randrange(256)) for i in range(16)]
+    return bytes(random.randrange(256) for i in range(16))
diff --git a/lang/py/src/avro/io.py b/lang/py/src/avro/io.py
index c36c5b3..b18b148 100644
--- a/lang/py/src/avro/io.py
+++ b/lang/py/src/avro/io.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -38,6 +41,8 @@ uses the following mapping:
   * Schema booleans are implemented as bool.
 """
 
+from __future__ import absolute_import, division, print_function
+
 import datetime
 import json
 import struct
@@ -183,7 +188,7 @@ class BinaryDecoder(object):
 
   def read_boolean(self):
     """
-    a boolean is written as a single byte 
+    a boolean is written as a single byte
     whose value is either 0 (false) or 1 (true).
     """
     return ord(self.read(1)) == 1
@@ -261,7 +266,7 @@ class BinaryDecoder(object):
 
   def read_bytes(self):
     """
-    Bytes are encoded as a long followed by that many bytes of data. 
+    Bytes are encoded as a long followed by that many bytes of data.
     """
     return self.read(self.read_long())
 
@@ -297,7 +302,7 @@ class BinaryDecoder(object):
 
   def read_time_millis_from_int(self):
     """
-    int is decoded as python time object which represents 
+    int is decoded as python time object which represents
     the number of milliseconds after midnight, 00:00:00.000.
     """
     milliseconds = self.read_int()
@@ -305,7 +310,7 @@ class BinaryDecoder(object):
 
   def read_time_micros_from_long(self):
     """
-    long is decoded as python time object which represents 
+    long is decoded as python time object which represents
     the number of microseconds after midnight, 00:00:00.000000.
     """
     microseconds = self.read_long()
@@ -313,17 +318,17 @@ class BinaryDecoder(object):
 
   def read_timestamp_millis_from_long(self):
     """
-    long is decoded as python datetime object which represents 
+    long is decoded as python datetime object which represents
     the number of milliseconds from the unix epoch, 1 January 1970.
     """
     timestamp_millis = self.read_long()
     timedelta = datetime.timedelta(microseconds=timestamp_millis * 1000)
-    unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=timezones.utc) 
+    unix_epoch_datetime = datetime.datetime(1970, 1, 1, 0, 0, 0, 0, tzinfo=timezones.utc)
     return unix_epoch_datetime + timedelta
 
   def read_timestamp_micros_from_long(self):
     """
-    long is decoded as python datetime object which represents 
+    long is decoded as python datetime object which represents
     the number of microseconds from the unix epoch, 1 January 1970.
     """
     timestamp_micros = self.read_long()
@@ -386,10 +391,10 @@ class BinaryEncoder(object):
     null is written as zero bytes
     """
     pass
-  
+
   def write_boolean(self, datum):
     """
-    a boolean is written as a single byte 
+    a boolean is written as a single byte
     whose value is either 0 (false) or 1 (true).
     """
     if datum:
@@ -399,7 +404,7 @@ class BinaryEncoder(object):
 
   def write_int(self, datum):
     """
-    int and long values are written using variable-length, zig-zag coding.    
+    int and long values are written using variable-length, zig-zag coding.
     """
     self.write_long(datum);
 
@@ -491,7 +496,7 @@ class BinaryEncoder(object):
         bits_to_write = unscaled_datum >> (8 * index)
         self.write(chr(bits_to_write & 0xff))
     else:
-      for i in range(offset_bits/8):
+      for i in range(offset_bits // 8):
         self.write(chr(0))
       for index in range(bytes_req-1, -1, -1):
         bits_to_write = unscaled_datum >> (8 * index)
@@ -499,7 +504,7 @@ class BinaryEncoder(object):
 
   def write_bytes(self, datum):
     """
-    Bytes are encoded as a long followed by that many bytes of data. 
+    Bytes are encoded as a long followed by that many bytes of data.
     """
     self.write_long(len(datum))
     self.write(struct.pack('%ds' % len(datum), datum))
@@ -591,32 +596,32 @@ class DatumReader(object):
           and w_type == r_type):
       return True
     elif (w_type == r_type == 'record' and
-          DatumReader.check_props(writers_schema, readers_schema, 
+          DatumReader.check_props(writers_schema, readers_schema,
                                   ['fullname'])):
       return True
     elif (w_type == r_type == 'error' and
-          DatumReader.check_props(writers_schema, readers_schema, 
+          DatumReader.check_props(writers_schema, readers_schema,
                                   ['fullname'])):
       return True
     elif (w_type == r_type == 'request'):
       return True
-    elif (w_type == r_type == 'fixed' and 
-          DatumReader.check_props(writers_schema, readers_schema, 
+    elif (w_type == r_type == 'fixed' and
+          DatumReader.check_props(writers_schema, readers_schema,
                                   ['fullname', 'size'])):
       return True
-    elif (w_type == r_type == 'enum' and 
-          DatumReader.check_props(writers_schema, readers_schema, 
+    elif (w_type == r_type == 'enum' and
+          DatumReader.check_props(writers_schema, readers_schema,
                                   ['fullname'])):
       return True
-    elif (w_type == r_type == 'map' and 
+    elif (w_type == r_type == 'map' and
           DatumReader.check_props(writers_schema.values,
                                   readers_schema.values, ['type'])):
       return True
-    elif (w_type == r_type == 'array' and 
+    elif (w_type == r_type == 'array' and
           DatumReader.check_props(writers_schema.items,
                                   readers_schema.items, ['type'])):
       return True
-    
+
     # Handle schema promotion
     if w_type == 'int' and r_type in ['long', 'float', 'double']:
       return True
@@ -633,7 +638,7 @@ class DatumReader(object):
     reader the "reader's schema".
     """
     self._writers_schema = writers_schema
-    self._readers_schema = readers_schema 
+    self._readers_schema = readers_schema
 
   # read/write properties
   def set_writers_schema(self, writers_schema):
@@ -644,7 +649,7 @@ class DatumReader(object):
     self._readers_schema = readers_schema
   readers_schema = property(lambda self: self._readers_schema,
                             set_readers_schema)
-  
+
   def read(self, decoder):
     if self.readers_schema is None:
       self.readers_schema = self.writers_schema
@@ -682,13 +687,13 @@ class DatumReader(object):
       else:
         return decoder.read_int()
     elif writers_schema.type == 'long':
-      if (hasattr(writers_schema, 'logical_type') and 
+      if (hasattr(writers_schema, 'logical_type') and
           writers_schema.logical_type == constants.TIME_MICROS):
         return decoder.read_time_micros_from_long()
-      elif (hasattr(writers_schema, 'logical_type') and 
+      elif (hasattr(writers_schema, 'logical_type') and
             writers_schema.logical_type == constants.TIMESTAMP_MILLIS):
         return decoder.read_timestamp_millis_from_long()
-      elif (hasattr(writers_schema, 'logical_type') and 
+      elif (hasattr(writers_schema, 'logical_type') and
             writers_schema.logical_type == constants.TIMESTAMP_MICROS):
         return decoder.read_timestamp_micros_from_long()
       else:
@@ -886,7 +891,7 @@ class DatumReader(object):
                  % (index_of_schema, len(writers_schema.schemas))
       raise SchemaResolutionException(fail_msg, writers_schema, readers_schema)
     selected_writers_schema = writers_schema.schemas[index_of_schema]
-    
+
     # read data
     return self.read_data(selected_writers_schema, readers_schema, decoder)
 
@@ -914,7 +919,7 @@ class DatumReader(object):
      * if the reader's record schema has a field that contains a default value,
        and writer's schema does not have a field with the same name, then the
        reader should use the default value from its field.
-     * if the reader's record schema has a field with no default value, and 
+     * if the reader's record schema has a field with no default value, and
        writer's schema does not have a field with the same name, then the
        field's value is unset.
     """
@@ -933,7 +938,7 @@ class DatumReader(object):
     if len(readers_fields_dict) > len(read_record):
       writers_fields_dict = writers_schema.fields_dict
       for field_name, field in readers_fields_dict.items():
-        if not writers_fields_dict.has_key(field_name):
+        if field_name not in writers_fields_dict:
           if field.has_default:
             field_val = self._read_default_value(field.type, field.default)
             read_record[field.name] = field_val
diff --git a/lang/py/src/avro/ipc.py b/lang/py/src/avro/ipc.py
index 8cbf07b..1bba4f7 100644
--- a/lang/py/src/avro/ipc.py
+++ b/lang/py/src/avro/ipc.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,17 +8,19 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-"""
-Support for inter-process calls.
-"""
+
+"""Support for inter-process calls."""
+
+from __future__ import absolute_import, division, print_function
+
 import httplib
 
 from avro import io, protocol, schema
@@ -189,7 +194,7 @@ class BaseRequestor(object):
       * a one-byte error flag boolean, followed by either:
         o if the error flag is false,
           the message response, serialized per the message's response schema.
-        o if the error flag is true, 
+        o if the error flag is true,
           the error, serialized per the message's error union schema.
     """
     # response metadata
@@ -267,11 +272,11 @@ class Responder(object):
     buffer_encoder = io.BinaryEncoder(buffer_writer)
     error = None
     response_metadata = {}
-    
+
     try:
       remote_protocol = self.process_handshake(buffer_decoder, buffer_encoder)
       # handshake failure
-      if remote_protocol is None:  
+      if remote_protocol is None:
         return buffer_writer.getvalue()
 
       # read request using remote protocol
@@ -296,9 +301,9 @@ class Responder(object):
       # perform server logic
       try:
         response = self.invoke(local_message, request)
-      except AvroRemoteException, e:
+      except AvroRemoteException as e:
         error = e
-      except Exception, e:
+      except Exception as e:
         error = AvroRemoteException(str(e))
 
       # write response using local protocol
@@ -310,7 +315,7 @@ class Responder(object):
       else:
         writers_schema = local_message.errors
         self.write_error(writers_schema, error, buffer_encoder)
-    except schema.AvroException, e:
+    except schema.AvroException as e:
       error = AvroRemoteException(str(e))
       buffer_encoder = io.BinaryEncoder(StringIO())
       META_WRITER.write(response_metadata, buffer_encoder)
diff --git a/lang/py/src/avro/protocol.py b/lang/py/src/avro/protocol.py
index 117401c..9b27a45 100644
--- a/lang/py/src/avro/protocol.py
+++ b/lang/py/src/avro/protocol.py
@@ -1,6 +1,3 @@
-#!/usr/bin/env python
-
-##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -21,10 +18,15 @@
 
 from __future__ import absolute_import, division, print_function
 
-import hashlib
 import json
 
-import avro.schema
+from avro import schema
+
+try:
+  from hashlib import md5
+except ImportError:
+  from md5 import md5
+
 
 #
 # Constants
@@ -37,7 +39,7 @@ VALID_TYPE_SCHEMA_TYPES = ('enum', 'record', 'error', 'fixed')
 # Exceptions
 #
 
-class ProtocolParseException(avro.schema.AvroException):
+class ProtocolParseException(schema.AvroException):
   pass
 
 #
@@ -49,7 +51,7 @@ class Protocol(object):
   def _parse_types(self, types, type_names):
     type_objects = []
     for type in types:
-      type_object = avro.schema.make_avsc_object(type, type_names)
+      type_object = schema.make_avsc_object(type, type_names)
       if type_object.type not in VALID_TYPE_SCHEMA_TYPES:
         fail_msg = 'Type %s not an enum, fixed, record, or error.' % type
         raise ProtocolParseException(fail_msg)
@@ -92,7 +94,7 @@ class Protocol(object):
 
     self._props = {}
     self.set_prop('name', name)
-    type_names = avro.schema.Names()
+    type_names = schema.Names()
     if namespace is not None:
       self.set_prop('namespace', namespace)
       type_names.default_namespace = namespace
@@ -100,13 +102,13 @@ class Protocol(object):
       self.set_prop('types', self._parse_types(types, type_names))
     if messages is not None:
       self.set_prop('messages', self._parse_messages(messages, type_names))
-    self._md5 = hashlib.md5(str(self)).digest()
+    self._md5 = md5(str(self)).digest()
 
   # read-only properties
   name = property(lambda self: self.get_prop('name'))
   namespace = property(lambda self: self.get_prop('namespace'))
   fullname = property(lambda self:
-                      avro.schema.Name(self.name, self.namespace).fullname)
+                      schema.Name(self.name, self.namespace).fullname)
   types = property(lambda self: self.get_prop('types'))
   types_dict = property(lambda self: dict([(type.name, type)
                                            for type in self.types]))
@@ -123,7 +125,7 @@ class Protocol(object):
   def to_json(self):
     to_dump = {}
     to_dump['protocol'] = self.name
-    names = avro.schema.Names(default_namespace=self.namespace)
+    names = schema.Names(default_namespace=self.namespace)
     if self.namespace:
       to_dump['namespace'] = self.namespace
     if self.types:
@@ -148,20 +150,20 @@ class Message(object):
     if not isinstance(request, list):
       fail_msg = 'Request property not a list: %s' % request
       raise ProtocolParseException(fail_msg)
-    return avro.schema.RecordSchema(None, None, request, names, 'request')
+    return schema.RecordSchema(None, None, request, names, 'request')
 
   def _parse_response(self, response, names):
     if isinstance(response, basestring) and names.has_name(response, None):
       return names.get_name(response, None)
     else:
-      return avro.schema.make_avsc_object(response, names)
+      return schema.make_avsc_object(response, names)
 
   def _parse_errors(self, errors, names):
     if not isinstance(errors, list):
       fail_msg = 'Errors property not a list: %s' % errors
       raise ProtocolParseException(fail_msg)
     errors_for_parsing = {'type': 'error_union', 'declared_errors': errors}
-    return avro.schema.make_avsc_object(errors_for_parsing, names)
+    return schema.make_avsc_object(errors_for_parsing, names)
 
   def __init__(self,  name, request, response, errors=None, names=None):
     self._name = name
@@ -190,7 +192,7 @@ class Message(object):
 
   def to_json(self, names=None):
     if names is None:
-      names = avro.schema.Names()
+      names = schema.Names()
     to_dump = {}
     to_dump['request'] = self.request.to_json(names)
     to_dump['response'] = self.response.to_json(names)
diff --git a/lang/py/src/avro/schema.py b/lang/py/src/avro/schema.py
index 822d76c..06c1aeb 100644
--- a/lang/py/src/avro/schema.py
+++ b/lang/py/src/avro/schema.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -35,7 +38,10 @@ A schema may be one of:
   Null.
 """
 
+from __future__ import absolute_import, division, print_function
+
 import json
+import sys
 from math import floor, log10
 
 from avro import constants
@@ -231,11 +237,11 @@ class Names(object):
 
   def has_name(self, name_attr, space_attr):
       test = Name(name_attr, space_attr, self.default_namespace).fullname
-      return self.names.has_key(test)
+      return test in self.names
 
   def get_name(self, name_attr, space_attr):
       test = Name(name_attr, space_attr, self.default_namespace).fullname
-      if not self.names.has_key(test):
+      if test not in self.names:
           return None
       return self.names[test]
 
@@ -270,7 +276,7 @@ class Names(object):
     if to_add.fullname in VALID_TYPES:
       fail_msg = '%s is a reserved type name.' % to_add.fullname
       raise SchemaParseException(fail_msg)
-    elif self.names.has_key(to_add.fullname):
+    elif to_add.fullname in self.names:
       fail_msg = 'The name "%s" is already in use.' % to_add.fullname
       raise SchemaParseException(fail_msg)
 
@@ -377,7 +383,7 @@ class Field(object):
     else:
       try:
         type_schema = make_avsc_object(type, names)
-      except Exception, e:
+      except Exception as e:
         fail_msg = 'Type property "%s" not a valid Avro schema: %s' % (type, e)
         raise SchemaParseException(fail_msg)
     self.set_prop('type', type_schema)
@@ -578,7 +584,7 @@ class ArraySchema(Schema):
     else:
       try:
         items_schema = make_avsc_object(items, names)
-      except SchemaParseException, e:
+      except SchemaParseException as e:
         fail_msg = 'Items schema (%s) not a valid Avro schema: %s (known names: %s)' % (items, e, names.names.keys())
         raise SchemaParseException(fail_msg)
 
@@ -652,7 +658,7 @@ class UnionSchema(Schema):
       else:
         try:
           new_schema = make_avsc_object(schema, names)
-        except Exception, e:
+        except Exception as e:
           raise SchemaParseException('Union item must be a valid Avro schema: %s' % str(e))
       # check the new schema
       if (new_schema.type in VALID_TYPES and new_schema.type not in NAMED_TYPES
@@ -708,7 +714,7 @@ class RecordSchema(NamedSchema):
         # null values can have a default value of None
         has_default = False
         default = None
-        if field.has_key('default'):
+        if 'default' in field:
           has_default = True
           default = field.get('default')
 
@@ -978,10 +984,13 @@ def parse(json_string):
   # parse the JSON
   try:
     json_data = json.loads(json_string)
-  except Exception, e:
-    import sys
-    raise SchemaParseException('Error parsing JSON: %s, error = %s'
-                               % (json_string, e)), None, sys.exc_info()[2]
+  except Exception as e:
+    msg = 'Error parsing JSON: {}, error = {}'.format(json_string, e)
+    new_exception = SchemaParseException(msg)
+    traceback = sys.exc_info()[2]
+    if not hasattr(new_exception, 'with_traceback'):
+      raise (new_exception, None, traceback)  # Python 2 syntax
+    raise new_exception.with_traceback(traceback)
 
   # Initialize the names object
   names = Names()
diff --git a/lang/py/src/avro/tether/__init__.py b/lang/py/src/avro/tether/__init__.py
index 0dbd3d8..c60edf9 100644
--- a/lang/py/src/avro/tether/__init__.py
+++ b/lang/py/src/avro/tether/__init__.py
@@ -1,4 +1,6 @@
-#
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,12 +17,9 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-#
 
-from .util import *
-from .tether_task import *
-from .tether_task_runner import *
+from __future__ import absolute_import, division, print_function
 
-__all__=util.__all__
-__all__+=tether_task.__all__
-__all__+=tether_task_runner.__all__
+from avro.tether.tether_task import HTTPRequestor, TaskType, TetherTask, inputProtocol, outputProtocol
+from avro.tether.tether_task_runner import TaskRunner
+from avro.tether.util import find_port
diff --git a/lang/py/src/avro/tether/tether_task.py b/lang/py/src/avro/tether/tether_task.py
index 23112a7..4e2004d 100644
--- a/lang/py/src/avro/tether/tether_task.py
+++ b/lang/py/src/avro/tether/tether_task.py
@@ -1,22 +1,23 @@
-"""
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-"""
-
-__all__=["TetherTask","TaskType","inputProtocol","outputProtocol","HTTPRequestor"]
+#!/usr/bin/env python
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
 
 import collections
 import io as pyio
@@ -30,6 +31,8 @@ from StringIO import StringIO
 from avro import io as avio
 from avro import ipc, protocol, schema
 
+__all__ = ["TetherTask", "TaskType", "inputProtocol", "outputProtocol", "HTTPRequestor"]
+
 # create protocol objects for the input and output protocols
 # The build process should copy InputProtocol.avpr and OutputProtocol.avpr
 # into the same directory as this module
diff --git a/lang/py/src/avro/tether/tether_task_runner.py b/lang/py/src/avro/tether/tether_task_runner.py
index b248ffd..64bee7b 100644
--- a/lang/py/src/avro/tether/tether_task_runner.py
+++ b/lang/py/src/avro/tether/tether_task_runner.py
@@ -1,30 +1,23 @@
-"""
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-"""
-
-__all__=["TaskRunner"]
-
-if __name__ == "__main__":
-  # Relative imports don't work when being run directly
-  from avro import tether
-  from avro.tether import TetherTask, find_port, inputProtocol
-
-else:
-  from . import TetherTask, find_port, inputProtocol
+#!/usr/bin/env python
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
 
 import logging
 import sys
@@ -33,12 +26,16 @@ import traceback
 import weakref
 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 
+import avro.tether.tether_task
+import avro.tether.util
 from avro import ipc
 
+__all__ = ["TaskRunner"]
+
 
 class TaskRunnerResponder(ipc.Responder):
   """
-  The responder for the thethered process
+  The responder for the tethered process
   """
   def __init__(self,runner):
     """
@@ -46,7 +43,7 @@ class TaskRunnerResponder(ipc.Responder):
     ----------------------------------------------------------
     runner - Instance of TaskRunner
     """
-    ipc.Responder.__init__(self, inputProtocol)
+    ipc.Responder.__init__(self, avro.tether.tether_task.inputProtocol)
 
     self.log=logging.getLogger("TaskRunnerResponder")
 
@@ -148,7 +145,7 @@ class TaskRunner(object):
 
     self.log=logging.getLogger("TaskRunner:")
 
-    if not(isinstance(task,TetherTask)):
+    if not(isinstance(task, avro.tether.tether_task.TetherTask)):
       raise ValueError("task must be an instance of tether task")
     self.task=task
 
@@ -172,7 +169,7 @@ class TaskRunner(object):
                 testing
     """
 
-    port=find_port()
+    port = avro.tether.util.find_port()
     address=("localhost",port)
 
 
@@ -212,7 +209,7 @@ if __name__ == '__main__':
   logging.basicConfig(level=logging.INFO)
 
   if (len(sys.argv)<=1):
-    print "Error: tether_task_runner.__main__: Usage: tether_task_runner task_package.task_module.TaskClass"
+    print("Error: tether_task_runner.__main__: Usage: tether_task_runner task_package.task_module.TaskClass")
     raise ValueError("Usage: tether_task_runner task_package.task_module.TaskClass")
 
   fullcls=sys.argv[1]
diff --git a/lang/py/src/avro/tether/util.py b/lang/py/src/avro/tether/util.py
index cbeeef0..3d8ad3a 100644
--- a/lang/py/src/avro/tether/util.py
+++ b/lang/py/src/avro/tether/util.py
@@ -1,22 +1,23 @@
-"""
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-"""
+#!/usr/bin/env python
 
-__all__=["find_port"]
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
 
 import socket
 
diff --git a/lang/py/src/avro/timezones.py b/lang/py/src/avro/timezones.py
index a4985b4..a306f6d 100644
--- a/lang/py/src/avro/timezones.py
+++ b/lang/py/src/avro/timezones.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,15 +8,17 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 from datetime import datetime, timedelta, tzinfo
 
 
diff --git a/lang/py/src/avro/tool.py b/lang/py/src/avro/tool.py
index 6a92fee..3c0c228 100644
--- a/lang/py/src/avro/tool.py
+++ b/lang/py/src/avro/tool.py
@@ -1,4 +1,6 @@
-#! /usr/bin/env python
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -6,20 +8,23 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 """
 Command-line tool
 
 NOTE: The API for the command-line tool is experimental.
 """
 
+from __future__ import absolute_import, division, print_function
+
 import sys
 import threading
 import urlparse
@@ -37,7 +42,7 @@ class GenericResponder(ipc.Responder):
 
   def invoke(self, message, request):
     if message.name == self.msg:
-      print >> sys.stderr, "Message: %s Datum: %s" % (message.name, self.datum)
+      print("Message: %s Datum: %s" % (message.name, self.datum), file=sys.stderr)
       # server will shut down after processing a single Avro request
       global server_should_shutdown
       server_should_shutdown = True
@@ -55,7 +60,7 @@ class GenericHandler(BaseHTTPRequestHandler):
     resp_writer = ipc.FramedWriter(self.wfile)
     resp_writer.write_framed_message(resp_body)
     if server_should_shutdown:
-      print >> sys.stderr, "Shutting down server."
+      print("Shutting down server.", file=sys.stderr)
       quitter = threading.Thread(target=self.server.shutdown)
       quitter.daemon = True
       quitter.start()
@@ -68,10 +73,10 @@ def run_server(uri, proto, msg, datum):
   server_should_shutdown = False
   responder = GenericResponder(proto, msg, datum)
   server = HTTPServer(server_addr, GenericHandler)
-  print "Port: %s" % server.server_port
+  print("Port: %s" % server.server_port)
   sys.stdout.flush()
   server.allow_reuse_address = True
-  print >> sys.stderr, "Starting server."
+  print("Starting server.", file=sys.stderr)
   server.serve_forever()
 
 def send_message(uri, proto, msg, datum):
@@ -79,7 +84,7 @@ def send_message(uri, proto, msg, datum):
   client = ipc.HTTPTransceiver(url_obj.hostname, url_obj.port)
   proto_json = file(proto, 'r').read()
   requestor = ipc.Requestor(protocol.parse(proto_json), client)
-  print requestor.request(msg, datum)
+  print(requestor.request(msg, datum))
 
 def file_or_stdin(f):
   if f == "-":
@@ -89,20 +94,20 @@ def file_or_stdin(f):
 
 def main(args=sys.argv):
   if len(args) == 1:
-    print "Usage: %s [dump|rpcreceive|rpcsend]" % args[0]
+    print("Usage: %s [dump|rpcreceive|rpcsend]" % args[0])
     return 1
 
   if args[1] == "dump":
     if len(args) != 3:
-      print "Usage: %s dump input_file" % args[0]
+      print("Usage: %s dump input_file" % args[0])
       return 1
     for d in datafile.DataFileReader(file_or_stdin(args[2]), io.DatumReader()):
-      print repr(d)
+      print(repr(d))
   elif args[1] == "rpcreceive":
     usage_str = "Usage: %s rpcreceive uri protocol_file " % args[0]
     usage_str += "message_name (-data d | -file f)"
     if len(args) not in [5, 7]:
-      print usage_str
+      print(usage_str)
       return 1
     uri, proto, msg = args[2:5]
     datum = None
@@ -111,19 +116,19 @@ def main(args=sys.argv):
         reader = open(args[6], 'rb')
         datum_reader = io.DatumReader()
         dfr = datafile.DataFileReader(reader, datum_reader)
-        datum = dfr.next()
+        datum = next(dfr)
       elif args[5] == "-data":
-        print "JSON Decoder not yet implemented."
+        print("JSON Decoder not yet implemented.")
         return 1
       else:
-        print usage_str
+        print(usage_str)
         return 1
     run_server(uri, proto, msg, datum)
   elif args[1] == "rpcsend":
     usage_str = "Usage: %s rpcsend uri protocol_file " % args[0]
     usage_str += "message_name (-data d | -file f)"
     if len(args) not in [5, 7]:
-      print usage_str
+      print(usage_str)
       return 1
     uri, proto, msg = args[2:5]
     datum = None
@@ -132,15 +137,15 @@ def main(args=sys.argv):
         reader = open(args[6], 'rb')
         datum_reader = io.DatumReader()
         dfr = datafile.DataFileReader(reader, datum_reader)
-        datum = dfr.next()
+        datum = next(dfr)
       elif args[5] == "-data":
-        print "JSON Decoder not yet implemented."
+        print("JSON Decoder not yet implemented.")
         return 1
       else:
-        print usage_str
+        print(usage_str)
         return 1
     send_message(uri, proto, msg, datum)
   return 0
-  
+
 if __name__ == "__main__":
   sys.exit(main(sys.argv))
diff --git a/lang/py/src/avro/txipc.py b/lang/py/src/avro/txipc.py
index 72d63a6..66ca726 100644
--- a/lang/py/src/avro/txipc.py
+++ b/lang/py/src/avro/txipc.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -15,10 +16,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-try:
-  from cStringIO import StringIO
-except ImportError:
-  from StringIO import StringIO
+
+from __future__ import absolute_import, division, print_function
+
 from zope.interface import implements
 
 from avro import io, ipc
@@ -29,6 +29,11 @@ from twisted.web.client import Agent
 from twisted.web.http_headers import Headers
 from twisted.web.iweb import IBodyProducer
 
+try:
+  from cStringIO import StringIO
+except ImportError:
+  from StringIO import StringIO
+
 
 class TwistedRequestor(ipc.BaseRequestor):
   """A Twisted-compatible requestor. Returns a Deferred that will fire with the
diff --git a/lang/py/src/avro/__init__.py b/lang/py/test/__init__.py
similarity index 89%
copy from lang/py/src/avro/__init__.py
copy to lang/py/test/__init__.py
index 47d1295..a2a5bef 100644
--- a/lang/py/src/avro/__init__.py
+++ b/lang/py/test/__init__.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,14 +8,11 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-__all__ = ['schema', 'io', 'datafile', 'protocol', 'ipc', 'constants', 'timezones']
-
diff --git a/lang/py/test/av_bench.py b/lang/py/test/av_bench.py
index e90c987..1e6a05d 100644
--- a/lang/py/test/av_bench.py
+++ b/lang/py/test/av_bench.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,6 +17,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 import sys
 import time
 from random import choice, randint, sample
@@ -72,5 +75,5 @@ def t(f, *args):
 
 if __name__ == "__main__":
     n = int(sys.argv[1])
-    print "Write %0.4f" % t(write, n)
-    print "Read %0.4f" % t(read)
+    print("Write %0.4f" % t(write, n))
+    print("Read %0.4f" % t(read))
diff --git a/lang/py/test/gen_interop_data.py b/lang/py/test/gen_interop_data.py
index 336434e..13bf86c 100644
--- a/lang/py/test/gen_interop_data.py
+++ b/lang/py/test/gen_interop_data.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -13,15 +14,33 @@
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
+import os
 import sys
 
 from avro import datafile, io, schema
 
+CODECS_TO_VALIDATE = ('null', 'deflate')
+
+try:
+  import snappy
+  CODECS_TO_VALIDATE += ('snappy',)
+except ImportError:
+  print('Snappy not present, will skip generating it.')
+try:
+  import zstandard
+  CODECS_TO_VALIDATE += ('zstandard',)
+except ImportError:
+  print('Zstandard not present, will skip generating it.')
+
 DATUM = {
   'intField': 12,
-  'longField': 15234324L,
+  'longField': 15234324,
   'stringField': unicode('hey'),
   'boolField': True,
   'floatField': 1234.0,
@@ -37,10 +56,15 @@ DATUM = {
 }
 
 if __name__ == "__main__":
-  interop_schema = schema.parse(open(sys.argv[1], 'r').read())
-  writer = open(sys.argv[2], 'wb')
-  datum_writer = io.DatumWriter()
-  # NB: not using compression
-  dfw = datafile.DataFileWriter(writer, datum_writer, interop_schema)
-  dfw.append(DATUM)
-  dfw.close()
+  for codec in CODECS_TO_VALIDATE:
+    interop_schema = schema.parse(open(sys.argv[1], 'r').read())
+    filename = sys.argv[2]
+    if codec != 'null':
+      base, ext = os.path.splitext(filename)
+      filename = base + "_" + codec + ext
+    writer = open(filename, 'wb')
+    datum_writer = io.DatumWriter()
+    # NB: not using compression
+    dfw = datafile.DataFileWriter(writer, datum_writer, interop_schema, codec=codec)
+    dfw.append(DATUM)
+    dfw.close()
diff --git a/lang/py/test/mock_tether_parent.py b/lang/py/test/mock_tether_parent.py
index c82e249..88d84dd 100644
--- a/lang/py/test/mock_tether_parent.py
+++ b/lang/py/test/mock_tether_parent.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,45 +17,36 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 import socket
 import sys
 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 
+import avro.tether.tether_task
+import avro.tether.util
 import set_avro_test_path
-from avro import ipc, protocol, tether
-
-
-def find_port():
-  """
-  Return an unbound port
-  """
-  s=socket.socket()
-  s.bind(("127.0.0.1",0))
-
-  port=s.getsockname()[1]
-  s.close()
-
-  return port
+from avro import ipc, protocol
 
-SERVER_ADDRESS = ('localhost', find_port())
+SERVER_ADDRESS = ('localhost', avro.tether.util.find_port())
 
 class MockParentResponder(ipc.Responder):
   """
   The responder for the mocked parent
   """
   def __init__(self):
-    ipc.Responder.__init__(self, tether.outputProtocol)
+    ipc.Responder.__init__(self, avro.tether.tether_task.outputProtocol)
 
   def invoke(self, message, request):
     if message.name=='configure':
-      print "MockParentResponder: Recieved 'configure': inputPort={0}".format(request["port"])
+      print("MockParentResponder: Recieved 'configure': inputPort={0}".format(request["port"]))
 
     elif message.name=='status':
-      print "MockParentResponder: Recieved 'status': message={0}".format(request["message"])
+      print("MockParentResponder: Recieved 'status': message={0}".format(request["message"]))
     elif message.name=='fail':
-      print "MockParentResponder: Recieved 'fail': message={0}".format(request["message"])
+      print("MockParentResponder: Recieved 'fail': message={0}".format(request["message"]))
     else:
-      print "MockParentResponder: Recieved {0}".format(message.name)
+      print("MockParentResponder: Recieved {0}".format(message.name))
 
     # flush the output so it shows up in the parent process
     sys.stdout.flush()
@@ -85,7 +79,7 @@ if __name__ == '__main__':
       raise ValueError("Usage: mock_tether_parent start_server port")
 
     SERVER_ADDRESS=(SERVER_ADDRESS[0],port)
-    print "mock_tether_parent: Launching Server on Port: {0}".format(SERVER_ADDRESS[1])
+    print("mock_tether_parent: Launching Server on Port: {0}".format(SERVER_ADDRESS[1]))
 
     # flush the output so it shows up in the parent process
     sys.stdout.flush()
diff --git a/lang/py/test/sample_http_client.py b/lang/py/test/sample_http_client.py
index 62c91fd..02b8421 100644
--- a/lang/py/test/sample_http_client.py
+++ b/lang/py/test/sample_http_client.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -13,8 +14,12 @@
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
 import sys
 
 from avro import ipc, protocol
@@ -78,7 +83,7 @@ if __name__ == '__main__':
   # build the parameters for the request
   params = {}
   params['message'] = message
-   
+
   # send the requests and print the result
   for msg_count in range(num_messages):
     requestor = make_requestor(SERVER_HOST, SERVER_PORT, MAIL_PROTOCOL)
diff --git a/lang/py/test/sample_http_server.py b/lang/py/test/sample_http_server.py
index e412ab5..c680afb 100644
--- a/lang/py/test/sample_http_server.py
+++ b/lang/py/test/sample_http_server.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,6 +17,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
 
 from avro import ipc, protocol
diff --git a/lang/py/test/set_avro_test_path.py b/lang/py/test/set_avro_test_path.py
index 8e47faf..fd395da 100644
--- a/lang/py/test/set_avro_test_path.py
+++ b/lang/py/test/set_avro_test_path.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -13,6 +16,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 """
 Module adjusts the path PYTHONPATH so the unittests
 will work even if an egg for AVRO is already installed.
@@ -28,6 +32,9 @@ being built. To work around this the unittests import this module before
 importing AVRO. This module in turn adjusts the python path so that the test
 build of AVRO is higher on the path then any installed eggs.
 """
+
+from __future__ import absolute_import, division, print_function
+
 import os
 import sys
 
diff --git a/lang/py/test/test_datafile.py b/lang/py/test/test_datafile.py
index 2b7061c..bceb071 100644
--- a/lang/py/test/test_datafile.py
+++ b/lang/py/test/test_datafile.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,14 +8,17 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
 import os
 import unittest
 
@@ -56,25 +62,30 @@ try:
   import snappy
   CODECS_TO_VALIDATE += ('snappy',)
 except ImportError:
-  print 'Snappy not present, will skip testing it.'
+  print('Snappy not present, will skip testing it.')
+try:
+  import zstandard
+  CODECS_TO_VALIDATE += ('zstandard',)
+except ImportError:
+  print('Zstandard not present, will skip testing it.')
 
 # TODO(hammer): clean up written files with ant, not os.remove
 class TestDataFile(unittest.TestCase):
   def test_round_trip(self):
-    print ''
-    print 'TEST ROUND TRIP'
-    print '==============='
-    print ''
+    print('')
+    print('TEST ROUND TRIP')
+    print('===============')
+    print('')
     correct = 0
     for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
       for codec in CODECS_TO_VALIDATE:
-        print ''
-        print 'SCHEMA NUMBER %d' % (i + 1)
-        print '================'
-        print ''
-        print 'Schema: %s' % example_schema
-        print 'Datum: %s' % datum
-        print 'Codec: %s' % codec
+        print('')
+        print('SCHEMA NUMBER %d' % (i + 1))
+        print('================')
+        print('')
+        print('Schema: %s' % example_schema)
+        print('Datum: %s' % datum)
+        print('Codec: %s' % codec)
 
         # write data in binary to file 10 times
         writer = open(FILENAME, 'wb')
@@ -93,30 +104,30 @@ class TestDataFile(unittest.TestCase):
         for datum in dfr:
           round_trip_data.append(datum)
 
-        print 'Round Trip Data: %s' % round_trip_data
-        print 'Round Trip Data Length: %d' % len(round_trip_data)
+        print('Round Trip Data: %s' % round_trip_data)
+        print('Round Trip Data Length: %d' % len(round_trip_data))
         is_correct = [datum] * 10 == round_trip_data
         if is_correct: correct += 1
-        print 'Correct Round Trip: %s' % is_correct
-        print ''
+        print('Correct Round Trip: %s' % is_correct)
+        print('')
     os.remove(FILENAME)
     self.assertEquals(correct, len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE))
 
   def test_append(self):
-    print ''
-    print 'TEST APPEND'
-    print '==========='
-    print ''
+    print('')
+    print('TEST APPEND')
+    print('===========')
+    print('')
     correct = 0
     for i, (example_schema, datum) in enumerate(SCHEMAS_TO_VALIDATE):
       for codec in CODECS_TO_VALIDATE:
-        print ''
-        print 'SCHEMA NUMBER %d' % (i + 1)
-        print '================'
-        print ''
-        print 'Schema: %s' % example_schema
-        print 'Datum: %s' % datum
-        print 'Codec: %s' % codec
+        print('')
+        print('SCHEMA NUMBER %d' % (i + 1))
+        print('================')
+        print('')
+        print('Schema: %s' % example_schema)
+        print('Datum: %s' % datum)
+        print('Codec: %s' % codec)
 
         # write data in binary to file once
         writer = open(FILENAME, 'wb')
@@ -141,12 +152,12 @@ class TestDataFile(unittest.TestCase):
         for datum in dfr:
           appended_data.append(datum)
 
-        print 'Appended Data: %s' % appended_data
-        print 'Appended Data Length: %d' % len(appended_data)
+        print('Appended Data: %s' % appended_data)
+        print('Appended Data Length: %d' % len(appended_data))
         is_correct = [datum] * 10 == appended_data
         if is_correct: correct += 1
-        print 'Correct Appended: %s' % is_correct
-        print ''
+        print('Correct Appended: %s' % is_correct)
+        print('')
     os.remove(FILENAME)
     self.assertEquals(correct, len(CODECS_TO_VALIDATE)*len(SCHEMAS_TO_VALIDATE))
 
diff --git a/lang/py/test/test_datafile_interop.py b/lang/py/test/test_datafile_interop.py
index ee02f99..329b9a1 100644
--- a/lang/py/test/test_datafile_interop.py
+++ b/lang/py/test/test_datafile_interop.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,14 +8,17 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
 import os
 import unittest
 
@@ -22,20 +28,27 @@ from avro import datafile, io
 
 class TestDataFileInterop(unittest.TestCase):
   def test_interop(self):
-    print ''
-    print 'TEST INTEROP'
-    print '============'
-    print ''
+    print()
+    print('TEST INTEROP')
+    print('============')
+    print()
     for f in os.listdir('@INTEROP_DATA_DIR@'):
-      print 'READING %s' % f
-      print ''
-
-      # read data in binary from file
-      reader = open(os.path.join('@INTEROP_DATA_DIR@', f), 'rb')
-      datum_reader = io.DatumReader()
-      dfr = datafile.DataFileReader(reader, datum_reader)
-      for datum in dfr:
-        assert datum is not None
+      base_ext = os.path.splitext(os.path.basename(f))[0].split('_', 1)
+      if len(base_ext) < 2 or base_ext[1] in datafile.VALID_CODECS:
+        print('READING %s' % f)
+        print('')
+
+        # read data in binary from file
+        reader = open(os.path.join('@INTEROP_DATA_DIR@', f), 'rb')
+        datum_reader = io.DatumReader()
+        dfr = datafile.DataFileReader(reader, datum_reader)
+        i = 0
+        for i, datum in enumerate(dfr, 1):
+          assert datum is not None
+        assert i > 0
+      else:
+        print('SKIPPING %s due to an unsupported codec' % f)
+        print('')
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/lang/py/test/test_io.py b/lang/py/test/test_io.py
index 533aa40..2d734a5 100644
--- a/lang/py/test/test_io.py
+++ b/lang/py/test/test_io.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,14 +8,17 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
 import datetime
 import unittest
 from binascii import hexlify
@@ -27,7 +33,6 @@ except ImportError:
   from StringIO import StringIO
 
 
-
 SCHEMAS_TO_VALIDATE = (
   ('"null"', None),
   ('"boolean"', True),
@@ -112,7 +117,7 @@ DEFAULT_VALUE_EXAMPLES = (
   ('"string"', '"foo"', u'foo'),
   ('"bytes"', '"\u00FF\u00FF"', u'\xff\xff'),
   ('"int"', '5', 5),
-  ('"long"', '5', 5L),
+  ('"long"', '5', 5),
   ('"float"', '1.1', 1.1),
   ('"double"', '1.1', 1.1),
   ('{"type": "fixed", "name": "F", "size": 2}', '"\u00FF\u00FF"', u'\xff\xff'),
@@ -148,10 +153,10 @@ def avro_hexlify(reader):
   return ' '.join(bytes)
 
 def print_test_name(test_name):
-  print ''
-  print test_name
-  print '=' * len(test_name)
-  print ''
+  print('')
+  print(test_name)
+  print('=' * len(test_name))
+  print('')
 
 def write_datum(datum, writers_schema):
   writer = StringIO()
@@ -170,17 +175,17 @@ def check_binary_encoding(number_type):
   print_test_name('TEST BINARY %s ENCODING' % number_type.upper())
   correct = 0
   for datum, hex_encoding in BINARY_ENCODINGS:
-    print 'Datum: %d' % datum
-    print 'Correct Encoding: %s' % hex_encoding
+    print('Datum: %d' % datum)
+    print('Correct Encoding: %s' % hex_encoding)
 
     writers_schema = schema.parse('"%s"' % number_type.lower())
     writer, encoder, datum_writer = write_datum(datum, writers_schema)
     writer.seek(0)
     hex_val = avro_hexlify(writer)
 
-    print 'Read Encoding: %s' % hex_val
+    print('Read Encoding: %s' % hex_val)
     if hex_encoding == hex_val: correct += 1
-    print ''
+    print('')
   return correct
 
 def check_skip_number(number_type):
@@ -188,7 +193,7 @@ def check_skip_number(number_type):
   correct = 0
   for value_to_skip, hex_encoding in BINARY_ENCODINGS:
     VALUE_TO_READ = 6253
-    print 'Value to Skip: %d' % value_to_skip
+    print('Value to Skip: %d' % value_to_skip)
 
     # write the value to skip and a known value
     writers_schema = schema.parse('"%s"' % number_type.lower())
@@ -204,11 +209,11 @@ def check_skip_number(number_type):
     datum_reader = io.DatumReader(writers_schema)
     read_value = datum_reader.read(decoder)
 
-    print 'Read Value: %d' % read_value
+    print('Read Value: %d' % read_value)
     if read_value == VALUE_TO_READ: correct += 1
-    print ''
+    print('')
   return correct
-    
+
 class TestIO(unittest.TestCase):
   #
   # BASIC FUNCTIONALITY
@@ -218,10 +223,10 @@ class TestIO(unittest.TestCase):
     print_test_name('TEST VALIDATE')
     passed = 0
     for example_schema, datum in SCHEMAS_TO_VALIDATE:
-      print 'Schema: %s' % example_schema
-      print 'Datum: %s' % datum
+      print('Schema: %s' % example_schema)
+      print('Datum: %s' % datum)
       validated = io.validate(schema.parse(example_schema), datum)
-      print 'Valid: %s' % validated
+      print('Valid: %s' % validated)
       if validated: passed += 1
     self.assertEquals(passed, len(SCHEMAS_TO_VALIDATE))
 
@@ -229,14 +234,14 @@ class TestIO(unittest.TestCase):
     print_test_name('TEST ROUND TRIP')
     correct = 0
     for example_schema, datum in SCHEMAS_TO_VALIDATE:
-      print 'Schema: %s' % example_schema
-      print 'Datum: %s' % datum
+      print('Schema: %s' % example_schema)
+      print('Datum: %s' % datum)
 
       writers_schema = schema.parse(example_schema)
       writer, encoder, datum_writer = write_datum(datum, writers_schema)
       round_trip_datum = read_datum(writer, writers_schema)
 
-      print 'Round Trip Datum: %s' % round_trip_datum
+      print('Round Trip Datum: %s' % round_trip_datum)
       if isinstance(round_trip_datum, Decimal):
         round_trip_datum = round_trip_datum.to_eng_string()
         datum = str(datum)
@@ -283,8 +288,8 @@ class TestIO(unittest.TestCase):
         readers_schema = schema.parse(rs)
         writer, enc, dw = write_datum(datum_to_write, writers_schema)
         datum_read = read_datum(writer, writers_schema, readers_schema)
-        print 'Writer: %s Reader: %s' % (writers_schema, readers_schema)
-        print 'Datum Read: %s' % datum_read
+        print('Writer: %s Reader: %s' % (writers_schema, readers_schema))
+        print('Datum Read: %s' % datum_read)
         if datum_read != datum_to_write: incorrect += 1
     self.assertEquals(incorrect, 0)
 
@@ -320,7 +325,7 @@ class TestIO(unittest.TestCase):
 
       writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
       datum_read = read_datum(writer, writers_schema, readers_schema)
-      print 'Datum Read: %s' % datum_read
+      print('Datum Read: %s' % datum_read)
       if datum_to_read == datum_read: correct += 1
     self.assertEquals(correct, len(DEFAULT_VALUE_EXAMPLES))
 
@@ -352,7 +357,7 @@ class TestIO(unittest.TestCase):
 
     writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
     datum_read = read_datum(writer, writers_schema, readers_schema)
-    print 'Datum Read: %s' % datum_read
+    print('Datum Read: %s' % datum_read)
     self.assertEquals(datum_to_read, datum_read)
 
   def test_field_order(self):
@@ -368,7 +373,7 @@ class TestIO(unittest.TestCase):
 
     writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
     datum_read = read_datum(writer, writers_schema, readers_schema)
-    print 'Datum Read: %s' % datum_read
+    print('Datum Read: %s' % datum_read)
     self.assertEquals(datum_to_read, datum_read)
 
   def test_type_exception(self):
diff --git a/lang/py/test/test_ipc.py b/lang/py/test/test_ipc.py
index 575a0c9..bc9bd21 100644
--- a/lang/py/test/test_ipc.py
+++ b/lang/py/test/test_ipc.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -13,10 +16,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 """
 There are currently no IPC tests within python, in part because there are no
 servers yet available.
 """
+
+from __future__ import absolute_import, division, print_function
+
 import unittest
 
 import set_avro_test_path
diff --git a/lang/py/test/test_script.py b/lang/py/test/test_script.py
index 214fc15..bd0cb4d 100644
--- a/lang/py/test/test_script.py
+++ b/lang/py/test/test_script.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -5,15 +8,17 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 # https://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 import csv
 import json
 import unittest
@@ -125,7 +130,7 @@ class TestCat(unittest.TestCase):
 
     def test_json_pretty(self):
         out = self._run("--format", "json-pretty", "-n", "1", raw=1)
-        assert out.strip() == _JSON_PRETTY.strip()
+        self.assertEqual(out.strip(), _JSON_PRETTY.strip())
 
     def test_version(self):
         check_output([SCRIPT, "cat", "--version"])
diff --git a/lang/py/test/test_tether_task.py b/lang/py/test/test_tether_task.py
index 9933070..85ed9cb 100644
--- a/lang/py/test/test_tether_task.py
+++ b/lang/py/test/test_tether_task.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,15 +17,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
+from __future__ import absolute_import, division, print_function
 
 import os
+import StringIO
 import subprocess
 import sys
 import time
 import unittest
 
+import avro.tether.tether_task
+import avro.tether.util
+import mock_tether_parent
 import set_avro_test_path
+from avro import io as avio
+from avro import schema, tether
+from word_count_task import WordCountTask
 
 
 class TestTetherTask(unittest.TestCase):
@@ -34,15 +44,6 @@ class TestTetherTask(unittest.TestCase):
     Test that the thether_task is working. We run the mock_tether_parent in a separate
     subprocess
     """
-    from avro import tether
-    from avro import io as avio
-    from avro import schema
-    from avro.tether import HTTPRequestor,inputProtocol, find_port
-
-    import StringIO
-    import mock_tether_parent
-    from word_count_task import WordCountTask
-
     task=WordCountTask()
 
     proc=None
@@ -51,13 +52,13 @@ class TestTetherTask(unittest.TestCase):
       # env["AVRO_TETHER_OUTPUT_PORT"]=output_port
       env=dict()
       env["PYTHONPATH"]=':'.join(sys.path)
-      server_port=find_port()
+      server_port = avro.tether.util.find_port()
 
       pyfile=mock_tether_parent.__file__
       proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(server_port)])
-      input_port=find_port()
+      input_port = avro.tether.util.find_port()
 
-      print "Mock server started process pid={0}".format(proc.pid)
+      print("Mock server started process pid={0}".format(proc.pid))
       # Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
       # so we give the subprocess time to start up
       time.sleep(1)
@@ -68,7 +69,11 @@ class TestTetherTask(unittest.TestCase):
 
       #***************************************************************
       # Test the mapper
-      task.configure(tether.TaskType.MAP,str(task.inschema),str(task.midschema))
+      task.configure(
+        avro.tether.tether_task.TaskType.MAP,
+        str(task.inschema),
+        str(task.midschema)
+      )
 
       # Serialize some data so we can send it to the input function
       datum="This is a line of text"
@@ -84,7 +89,11 @@ class TestTetherTask(unittest.TestCase):
       task.input(data,1)
 
       # Test the reducer
-      task.configure(tether.TaskType.REDUCE,str(task.midschema),str(task.outschema))
+      task.configure(
+        avro.tether.tether_task.TaskType.REDUCE,
+        str(task.midschema),
+        str(task.outschema)
+      )
 
       # Serialize some data so we can send it to the input function
       datum={"key":"word","value":2}
diff --git a/lang/py/test/test_tether_task_runner.py b/lang/py/test/test_tether_task_runner.py
index 3832dbe..985eb3c 100644
--- a/lang/py/test/test_tether_task_runner.py
+++ b/lang/py/test/test_tether_task_runner.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,28 +17,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
+import logging
 import os
+import StringIO
 import subprocess
 import sys
 import time
 import unittest
 
+import avro.tether.tether_task
+import avro.tether.tether_task_runner
+import avro.tether.util
+import mock_tether_parent
 import set_avro_test_path
+from avro import io as avio
+from word_count_task import WordCountTask
 
 
 class TestTetherTaskRunner(unittest.TestCase):
-  """ unit test for a tethered task runner.
-  """
+  """unit test for a tethered task runner."""
 
   def test1(self):
-    from word_count_task import WordCountTask
-    from avro.tether import TaskRunner, find_port,HTTPRequestor,inputProtocol, TaskType
-    from avro import io as avio
-    import mock_tether_parent
-    import subprocess
-    import StringIO
-    import logging
-
     # set the logging level to debug so that debug messages are printed
     logging.basicConfig(level=logging.DEBUG)
 
@@ -44,30 +48,34 @@ class TestTetherTaskRunner(unittest.TestCase):
       # launch the server in a separate process
       env=dict()
       env["PYTHONPATH"]=':'.join(sys.path)
-      parent_port=find_port()
+      parent_port = avro.tether.util.find_port()
 
       pyfile=mock_tether_parent.__file__
       proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)])
-      input_port=find_port()
+      input_port = avro.tether.util.find_port()
 
-      print "Mock server started process pid={0}".format(proc.pid)
+      print("Mock server started process pid={0}".format(proc.pid))
       # Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
       # so we give the subprocess time to start up
       time.sleep(1)
 
-      runner=TaskRunner(WordCountTask())
+      runner = avro.tether.tether_task_runner.TaskRunner(WordCountTask())
 
       runner.start(outputport=parent_port,join=False)
 
-      # Test sending various messages to the server and ensuring they are
-      # processed correctly
-      requestor=HTTPRequestor("localhost",runner.server.server_address[1],inputProtocol)
+      # Test sending various messages to the server and ensuring they are processed correctly
+      requestor = avro.tether.tether_task.HTTPRequestor(
+          "localhost", runner.server.server_address[1], avro.tether.tether_task.inputProtocol)
 
       # TODO: We should validate that open worked by grabbing the STDOUT of the subproces
       # and ensuring that it outputted the correct message.
 
       # Test the mapper
-      requestor.request("configure",{"taskType":TaskType.MAP,"inSchema":str(runner.task.inschema),"outSchema":str(runner.task.midschema)})
+      requestor.request("configure", {
+        "taskType": avro.tether.tether_task.TaskType.MAP,
+        "inSchema": str(runner.task.inschema),
+        "outSchema": str(runner.task.midschema)
+      })
 
       # Serialize some data so we can send it to the input function
       datum="This is a line of text"
@@ -83,8 +91,12 @@ class TestTetherTaskRunner(unittest.TestCase):
       # Call input to simulate calling map
       requestor.request("input",{"data":data,"count":1})
 
-      #Test the reducer
-      requestor.request("configure",{"taskType":TaskType.REDUCE,"inSchema":str(runner.task.midschema),"outSchema":str(runner.task.outschema)})
+      # Test the reducer
+      requestor.request("configure", {
+        "taskType": avro.tether.tether_task.TaskType.REDUCE,
+        "inSchema": str(runner.task.midschema),
+        "outSchema": str(runner.task.outschema)}
+      )
 
       #Serialize some data so we can send it to the input function
       datum={"key":"word","value":2}
@@ -133,15 +145,6 @@ class TestTetherTaskRunner(unittest.TestCase):
     as our main script everything works as expected. We do this by using subprocess to run it
     in a separate thread.
     """
-    from word_count_task import WordCountTask
-    from avro.tether import TaskRunner, find_port,HTTPRequestor,inputProtocol, TaskType
-    from avro.tether import tether_task_runner
-    from avro import io as avio
-    import mock_tether_parent
-    import subprocess
-    import StringIO
-
-
     proc=None
 
     runnerproc=None
@@ -149,7 +152,7 @@ class TestTetherTaskRunner(unittest.TestCase):
       #launch the server in a separate process
       env=dict()
       env["PYTHONPATH"]=':'.join(sys.path)
-      parent_port=find_port()
+      parent_port = avro.tether.util.find_port()
 
       pyfile=mock_tether_parent.__file__
       proc=subprocess.Popen(["python", pyfile,"start_server","{0}".format(parent_port)])
@@ -164,14 +167,14 @@ class TestTetherTaskRunner(unittest.TestCase):
       env={"AVRO_TETHER_OUTPUT_PORT":"{0}".format(parent_port)}
       env["PYTHONPATH"]=':'.join(sys.path)
 
-      runnerproc=subprocess.Popen(["python",tether_task_runner.__file__,"word_count_task.WordCountTask"],env=env)
+      runnerproc = subprocess.Popen(["python", avro.tether.tether_task_runner.__file__, "word_count_task.WordCountTask"],env=env)
 
       #possible race condition wait for the process to start
       time.sleep(1)
 
 
 
-      print "Mock server started process pid={0}".format(proc.pid)
+      print("Mock server started process pid={0}".format(proc.pid))
       #Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
       #so we give the subprocess time to start up
       time.sleep(1)
diff --git a/lang/py/test/test_tether_word_count.py b/lang/py/test/test_tether_word_count.py
index d2f1858..8c3fb08 100644
--- a/lang/py/test/test_tether_word_count.py
+++ b/lang/py/test/test_tether_word_count.py
@@ -1,3 +1,6 @@
+#!/usr/bin/env python
+
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -14,6 +17,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 import inspect
 import os
 import subprocess
@@ -25,8 +30,7 @@ import set_avro_test_path
 
 
 class TestTetherWordCount(unittest.TestCase):
-  """ unittest for a python tethered map-reduce job.
-  """
+  """unittest for a python tethered map-reduce job."""
 
   def _write_lines(self,lines,fname):
     """
@@ -72,7 +76,7 @@ class TestTetherWordCount(unittest.TestCase):
       words=line.split()
 
       for w in words:
-        if not(counts.has_key(w.strip())):
+        if not(w.strip() in counts):
           counts[w.strip()]=0
 
         counts[w.strip()]=counts[w.strip()]+1
@@ -92,7 +96,6 @@ class TestTetherWordCount(unittest.TestCase):
     import avro
 
     import subprocess
-    import StringIO
     import shutil
     import tempfile
     import inspect
@@ -182,11 +185,11 @@ python -m avro.tether.tether_task_runner word_count_task.WordCountTask
       exhf.close()
 
       # make it world executable
-      os.chmod(exfile,0755)
+      os.chmod(exfile,0o755)
 
       args.extend(["--program",exfile])
 
-      print "Command:\n\t{0}".format(" ".join(args))
+      print("Command:\n\t{0}".format(" ".join(args)))
       proc=subprocess.Popen(args)
 
 
diff --git a/lang/py/test/txsample_http_client.py b/lang/py/test/txsample_http_client.py
index dba4ade..28c2c28 100644
--- a/lang/py/test/txsample_http_client.py
+++ b/lang/py/test/txsample_http_client.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -13,8 +14,12 @@
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import absolute_import, division, print_function
+
 import sys
 
 from avro import protocol, txipc
diff --git a/lang/py/test/txsample_http_server.py b/lang/py/test/txsample_http_server.py
index 604ef54..fafaecd 100644
--- a/lang/py/test/txsample_http_server.py
+++ b/lang/py/test/txsample_http_server.py
@@ -1,5 +1,6 @@
 #!/usr/bin/env python
 
+##
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -16,6 +17,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import absolute_import, division, print_function
+
 from avro import ipc, protocol, txipc
 from twisted.internet import reactor
 from twisted.web import server
diff --git a/lang/py/test/word_count_task.py b/lang/py/test/word_count_task.py
index 24f2da2..8181340 100644
--- a/lang/py/test/word_count_task.py
+++ b/lang/py/test/word_count_task.py
@@ -1,33 +1,36 @@
-"""
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     https://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-"""
-
-__all__=["WordCountTask"]
+#!/usr/bin/env python
+
+##
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import, division, print_function
 
 import logging
 
-from avro.tether import TetherTask
+import avro.tether.tether_task
+
+__all__ = ["WordCountTask"]
 
 
 #TODO::Make the logging level a parameter we can set
 #logging.basicConfig(level=logging.INFO)
-class WordCountTask(TetherTask):
+class WordCountTask(avro.tether.tether_task.TetherTask):
   """
-  Implements the mappper and reducer for the word count example
+  Implements the mapper and reducer for the word count example
   """
 
   def __init__(self):
@@ -40,7 +43,7 @@ class WordCountTask(TetherTask):
               {"name":"value","type":"long","order":"ignore"}]
               }"""
     outschema=midschema
-    TetherTask.__init__(self,inschema,midschema,outschema)
+    avro.tether.tether_task.TetherTask.__init__(self, inschema, midschema, outschema)
 
 
     #keep track of the partial sums of the counts


Mime
View raw message