AVRO-1880: Futurize Py2 via BytesIO (#720)

* Replace the buffer rather than truncating it, because io.BytesIO.truncate() does not seek back to the start of the stream.
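
A minimal sketch of the pitfall behind that bullet (plain stdlib, nothing Avro-specific): io.BytesIO.truncate(0) empties the buffer but leaves the write position where it was, so the next write zero-pads the gap. Starting from a fresh BytesIO per record, as this patch does, sidesteps that.

```python
import io

buf = io.BytesIO()
buf.write(b"first record")
buf.truncate(0)            # buffer is now empty, but the position is still 12
buf.write(b"next")
print(buf.getvalue())      # b'\x00' * 12 + b'next' -- NUL padding fills the gap

# The approach taken in this patch: use a fresh buffer for each record.
with io.BytesIO() as fresh:
    fresh.write(b"next")
    print(fresh.getvalue())  # b'next'
```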
diff --git a/lang/py/src/avro/datafile.py b/lang/py/src/avro/datafile.py
index 0d29a6a..964a049 100644
--- a/lang/py/src/avro/datafile.py
+++ b/lang/py/src/avro/datafile.py
@@ -21,17 +21,15 @@
 
 from __future__ import absolute_import, division, print_function
 
+import io
 import os
 import random
 import zlib
 
-from avro import io, schema
+import avro.io
+import avro.schema
 
 try:
-  from cStringIO import StringIO
-except ImportError:
-  from StringIO import StringIO
-try:
   import snappy
   has_snappy = True
 except ImportError:
@@ -50,7 +48,7 @@
 MAGIC_SIZE = len(MAGIC)
 SYNC_SIZE = 16
 SYNC_INTERVAL = 4000 * SYNC_SIZE # TODO(hammer): make configurable
-META_SCHEMA = schema.parse("""\
+META_SCHEMA = avro.schema.parse("""\
 {"type": "record", "name": "org.apache.avro.file.Header",
  "fields" : [
    {"name": "magic", "type": {"type": "fixed", "name": "magic", "size": %d}},
@@ -71,12 +69,12 @@
 # Exceptions
 #
 
-class DataFileException(schema.AvroException):
+class DataFileException(avro.schema.AvroException):
   """
   Raised when there's a problem reading or writing file object containers.
   """
   def __init__(self, fail_msg):
-    schema.AvroException.__init__(self, fail_msg)
+    avro.schema.AvroException.__init__(self, fail_msg)
 
 #
 # Write Path
@@ -95,10 +93,10 @@
     @param writer: File-like object to write into.
     """
     self._writer = writer
-    self._encoder = io.BinaryEncoder(writer)
+    self._encoder = avro.io.BinaryEncoder(writer)
     self._datum_writer = datum_writer
-    self._buffer_writer = StringIO()
-    self._buffer_encoder = io.BinaryEncoder(self._buffer_writer)
+    self._buffer_writer = io.BytesIO()
+    self._buffer_encoder = avro.io.BinaryEncoder(self._buffer_writer)
     self._block_count = 0
     self._meta = {}
     self._header_written = False
@@ -112,7 +110,7 @@
       self.datum_writer.writers_schema = writers_schema
     else:
       # open writer for reading to collect metadata
-      dfr = DataFileReader(writer, io.DatumReader())
+      dfr = DataFileReader(writer, avro.io.DatumReader())
 
       # TODO(hammer): collect arbitrary metadata
       # collect metadata
@@ -122,7 +120,7 @@
       # get schema used to write existing file
       schema_from_file = dfr.get_meta('avro.schema')
       self.set_meta('avro.schema', schema_from_file)
-      self.datum_writer.writers_schema = schema.parse(schema_from_file)
+      self.datum_writer.writers_schema = avro.schema.parse(schema_from_file)
 
       # seek to the end of the file and prepare for writing
       writer.seek(0, 2)
@@ -243,7 +241,7 @@
   # TODO(hammer): allow user to specify the encoder
   def __init__(self, reader, datum_reader):
     self._reader = reader
-    self._raw_decoder = io.BinaryDecoder(reader)
+    self._raw_decoder = avro.io.BinaryDecoder(reader)
     self._datum_decoder = None # Maybe reset at every block.
     self._datum_reader = datum_reader
 
@@ -262,7 +260,7 @@
 
     # get ready to read
     self._block_count = 0
-    self.datum_reader.writers_schema = schema.parse(self.get_meta(SCHEMA_KEY))
+    self.datum_reader.writers_schema = avro.schema.parse(self.get_meta(SCHEMA_KEY))
 
   def __enter__(self):
     return self
@@ -320,7 +318,7 @@
     if header.get('magic') != MAGIC:
       fail_msg = "Not an Avro data file: %s doesn't match %s."\
                  % (header.get('magic'), MAGIC)
-      raise schema.AvroException(fail_msg)
+      raise avro.schema.AvroException(fail_msg)
 
     # set metadata
     self._meta = header['meta']
@@ -341,26 +339,26 @@
       # -15 is the log of the window size; negative indicates
       # "raw" (no zlib headers) decompression.  See zlib.h.
       uncompressed = zlib.decompress(data, -15)
-      self._datum_decoder = io.BinaryDecoder(StringIO(uncompressed))
+      self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
     elif self.codec == 'snappy':
       # Compressed data includes a 4-byte CRC32 checksum
       length = self.raw_decoder.read_long()
       data = self.raw_decoder.read(length - 4)
       uncompressed = snappy.decompress(data)
-      self._datum_decoder = io.BinaryDecoder(StringIO(uncompressed))
+      self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
       self.raw_decoder.check_crc32(uncompressed);
     elif self.codec == 'zstandard':
       length = self.raw_decoder.read_long()
       data = self.raw_decoder.read(length)
       uncompressed = bytearray()
       dctx = zstd.ZstdDecompressor()
-      with dctx.stream_reader(StringIO(data)) as reader:
+      with dctx.stream_reader(io.BytesIO(data)) as reader:
         while True:
           chunk = reader.read(16384)
           if not chunk:
             break
           uncompressed.extend(chunk)
-      self._datum_decoder = io.BinaryDecoder(StringIO(uncompressed))
+      self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
     else:
       raise DataFileException("Unknown codec: %r" % self.codec)
 
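Presumably the reason the imports change from `from avro import io` to `import avro.io`: once the stdlib `io` module is needed for BytesIO, the old alias would shadow it. A sketch of the encode/decode round trip these modules now perform over a bytes buffer, mirroring the pattern in the test_io.py hunk below; the schema here is made up purely for illustration.

```python
import io

import avro.io
import avro.schema

# Hypothetical record schema, just to exercise the round trip.
SCHEMA = avro.schema.parse('{"type": "record", "name": "Pair", "fields": ['
                           '  {"name": "key", "type": "string"},'
                           '  {"name": "value", "type": "long"}]}')

# Encode a datum into a bytes buffer...
writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(writer)
avro.io.DatumWriter(SCHEMA).write({"key": "word", "value": 2}, encoder)

# ...and decode it back from a second buffer.
reader = io.BytesIO(writer.getvalue())
decoder = avro.io.BinaryDecoder(reader)
print(avro.io.DatumReader(SCHEMA).read(decoder))  # prints the decoded datum
```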
diff --git a/lang/py/src/avro/ipc.py b/lang/py/src/avro/ipc.py
index 48307ac..e21245e 100644
--- a/lang/py/src/avro/ipc.py
+++ b/lang/py/src/avro/ipc.py
@@ -22,13 +22,10 @@
 from __future__ import absolute_import, division, print_function
 
 import httplib
+import io
 
-from avro import io, protocol, schema
-
-try:
-  from cStringIO import StringIO
-except ImportError:
-  from StringIO import StringIO
+import avro.io
+from avro import protocol, schema
 
 #
 # Constants
@@ -43,14 +40,14 @@
 @HANDSHAKE_RESPONSE_SCHEMA@
 """)
 
-HANDSHAKE_REQUESTOR_WRITER = io.DatumWriter(HANDSHAKE_REQUEST_SCHEMA)
-HANDSHAKE_REQUESTOR_READER = io.DatumReader(HANDSHAKE_RESPONSE_SCHEMA)
-HANDSHAKE_RESPONDER_WRITER = io.DatumWriter(HANDSHAKE_RESPONSE_SCHEMA)
-HANDSHAKE_RESPONDER_READER = io.DatumReader(HANDSHAKE_REQUEST_SCHEMA)
+HANDSHAKE_REQUESTOR_WRITER = avro.io.DatumWriter(HANDSHAKE_REQUEST_SCHEMA)
+HANDSHAKE_REQUESTOR_READER = avro.io.DatumReader(HANDSHAKE_RESPONSE_SCHEMA)
+HANDSHAKE_RESPONDER_WRITER = avro.io.DatumWriter(HANDSHAKE_RESPONSE_SCHEMA)
+HANDSHAKE_RESPONDER_READER = avro.io.DatumReader(HANDSHAKE_REQUEST_SCHEMA)
 
 META_SCHEMA = schema.parse('{"type": "map", "values": "bytes"}')
-META_WRITER = io.DatumWriter(META_SCHEMA)
-META_READER = io.DatumReader(META_SCHEMA)
+META_WRITER = avro.io.DatumWriter(META_SCHEMA)
+META_READER = avro.io.DatumReader(META_SCHEMA)
 
 SYSTEM_ERROR_SCHEMA = schema.parse('["string"]')
 
@@ -58,7 +55,7 @@
 REMOTE_HASHES = {}
 REMOTE_PROTOCOLS = {}
 
-BIG_ENDIAN_INT_STRUCT = io.struct_class('!I')
+BIG_ENDIAN_INT_STRUCT = avro.io.struct_class('!I')
 BUFFER_HEADER_LENGTH = 4
 BUFFER_SIZE = 8192
 
@@ -114,8 +111,8 @@
     Writes a request message and reads a response or error message.
     """
     # build handshake and call request
-    buffer_writer = StringIO()
-    buffer_encoder = io.BinaryEncoder(buffer_writer)
+    buffer_writer = io.BytesIO()
+    buffer_encoder = avro.io.BinaryEncoder(buffer_writer)
     self.write_handshake_request(buffer_encoder)
     self.write_call_request(message_name, request_datum, buffer_encoder)
 
@@ -159,7 +156,7 @@
     self.write_request(message.request, request_datum, encoder)
 
   def write_request(self, request_schema, request_datum, encoder):
-    datum_writer = io.DatumWriter(request_schema)
+    datum_writer = avro.io.DatumWriter(request_schema)
     datum_writer.write(request_datum, encoder)
 
   def read_handshake_response(self, decoder):
@@ -221,12 +218,12 @@
       raise self.read_error(writers_schema, readers_schema, decoder)
 
   def read_response(self, writers_schema, readers_schema, decoder):
-    datum_reader = io.DatumReader(writers_schema, readers_schema)
+    datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
     result = datum_reader.read(decoder)
     return result
 
   def read_error(self, writers_schema, readers_schema, decoder):
-    datum_reader = io.DatumReader(writers_schema, readers_schema)
+    datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
     return AvroRemoteException(datum_reader.read(decoder))
 
 class Requestor(BaseRequestor):
@@ -235,7 +232,7 @@
     call_response = self.transceiver.transceive(call_request)
 
     # process the handshake and call response
-    buffer_decoder = io.BinaryDecoder(StringIO(call_response))
+    buffer_decoder = avro.io.BinaryDecoder(io.BytesIO(call_response))
     call_response_exists = self.read_handshake_response(buffer_decoder)
     if call_response_exists:
       return self.read_call_response(message_name, buffer_decoder)
@@ -266,10 +263,10 @@
     Called by a server to deserialize a request, compute and serialize
     a response or error. Compare to 'handle()' in Thrift.
     """
-    buffer_reader = StringIO(call_request)
-    buffer_decoder = io.BinaryDecoder(buffer_reader)
-    buffer_writer = StringIO()
-    buffer_encoder = io.BinaryEncoder(buffer_writer)
+    buffer_reader = io.BytesIO(call_request)
+    buffer_decoder = avro.io.BinaryDecoder(buffer_reader)
+    buffer_writer = io.BytesIO()
+    buffer_encoder = avro.io.BinaryEncoder(buffer_writer)
     error = None
     response_metadata = {}
 
@@ -317,7 +314,7 @@
         self.write_error(writers_schema, error, buffer_encoder)
     except schema.AvroException as e:
       error = AvroRemoteException(str(e))
-      buffer_encoder = io.BinaryEncoder(StringIO())
+      buffer_encoder = avro.io.BinaryEncoder(io.BytesIO())
       META_WRITER.write(response_metadata, buffer_encoder)
       buffer_encoder.write_boolean(True)
       self.write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
@@ -362,15 +359,15 @@
     pass
 
   def read_request(self, writers_schema, readers_schema, decoder):
-    datum_reader = io.DatumReader(writers_schema, readers_schema)
+    datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
     return datum_reader.read(decoder)
 
   def write_response(self, writers_schema, response_datum, encoder):
-    datum_writer = io.DatumWriter(writers_schema)
+    datum_writer = avro.io.DatumWriter(writers_schema)
     datum_writer.write(response_datum, encoder)
 
   def write_error(self, writers_schema, error_exception, encoder):
-    datum_writer = io.DatumWriter(writers_schema)
+    datum_writer = avro.io.DatumWriter(writers_schema)
     datum_writer.write(str(error_exception), encoder)
 
 #
@@ -388,7 +385,7 @@
   def read_framed_message(self):
     message = []
     while True:
-      buffer = StringIO()
+      buffer = io.BytesIO()
       buffer_length = self._read_buffer_length()
       if buffer_length == 0:
         return ''.join(message)
@@ -466,7 +463,7 @@
     req_method = 'POST'
     req_headers = {'Content-Type': 'avro/binary'}
 
-    req_body_buffer = FramedWriter(StringIO())
+    req_body_buffer = FramedWriter(io.BytesIO())
     req_body_buffer.write_framed_message(message)
     req_body = req_body_buffer.writer.getvalue()
 
diff --git a/lang/py/src/avro/tether/tether_task.py b/lang/py/src/avro/tether/tether_task.py
index 4e2004d..262bf67 100644
--- a/lang/py/src/avro/tether/tether_task.py
+++ b/lang/py/src/avro/tether/tether_task.py
@@ -20,15 +20,14 @@
 from __future__ import absolute_import, division, print_function
 
 import collections
-import io as pyio
+import io
 import logging
 import os
 import sys
 import threading
 import traceback
-from StringIO import StringIO
 
-from avro import io as avio
+import avro.io
 from avro import ipc, protocol, schema
 
 __all__ = ["TetherTask", "TaskType", "inputProtocol", "outputProtocol", "HTTPRequestor"]
@@ -88,10 +87,8 @@
       raise ValueError("output client can't be none.")
 
     self.scheme=scheme
-    self.buff=StringIO()
-    self.encoder=avio.BinaryEncoder(self.buff)
 
-    self.datum_writer = avio.DatumWriter(writers_schema=self.scheme)
+    self.datum_writer = avro.io.DatumWriter(writers_schema=self.scheme)
     self.outputClient=outputClient
 
   def collect(self,record,partition=None):
@@ -103,25 +100,16 @@
     partition - Indicates the partition for a pre-partitioned map output
               - currently not supported
     """
+    # Replace the encoder and buffer every time we collect.
+    with io.BytesIO() as buff:
+      self.encoder = avro.io.BinaryEncoder(buff)
+      self.datum_writer.write(record, self.encoder)
+      value = buff.getvalue()
 
-    self.buff.truncate(0)
-    self.datum_writer.write(record, self.encoder);
-    self.buff.flush();
-    self.buff.seek(0)
-
-    # delete all the data in the buffer
-    if (partition is None):
-
-      # TODO: Is there a more efficient way to read the data in self.buff?
-      # we could use self.buff.read() but that returns the byte array as a string
-      # will that work?  We can also use self.buff.readinto to read it into
-      # a bytearray but the byte array must be pre-allocated
-      # self.outputClient.output(self.buff.buffer.read())
-
-      #its not a StringIO
-      self.outputClient.request("output",{"datum":self.buff.read()})
-    else:
-      self.outputClient.request("outputPartitioned",{"datum":self.buff.read(),"partition":partition})
+    datum = {"datum": value}
+    if partition is not None:
+      datum["partition"] = partition
+    self.outputClient.request("output", datum)
 
 
 
@@ -335,11 +323,11 @@
       outSchema = schema.parse(outSchemaText)
 
       if (taskType==TaskType.MAP):
-        self.inReader=avio.DatumReader(writers_schema=inSchema,readers_schema=self.inschema)
+        self.inReader=avro.io.DatumReader(writers_schema=inSchema,readers_schema=self.inschema)
         self.midCollector=Collector(outSchemaText,self.outputClient)
 
       elif(taskType==TaskType.REDUCE):
-        self.midReader=avio.DatumReader(writers_schema=inSchema,readers_schema=self.midschema)
+        self.midReader=avro.io.DatumReader(writers_schema=inSchema,readers_schema=self.midschema)
         # this.outCollector = new Collector<OUT>(outSchema);
         self.outCollector=Collector(outSchemaText,self.outputClient)
 
@@ -373,9 +361,9 @@
     count - how many input records are provided in the binary stream
     """
     try:
-      # to avio.BinaryDecoder
-      bdata=StringIO(data)
-      decoder = avio.BinaryDecoder(bdata)
+      # to avro.io.BinaryDecoder
+      bdata=io.BytesIO(data)
+      decoder = avro.io.BinaryDecoder(bdata)
 
       for i in range(count):
         if (self.taskType==TaskType.MAP):
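One detail of the rewritten Collector.collect above: getvalue() must be called before the `with io.BytesIO() as buff:` block exits, because a closed BytesIO discards its buffer and getvalue() then raises ValueError. A tiny sketch of that constraint, with a placeholder byte standing in for an encoded datum:

```python
import io

with io.BytesIO() as buff:
    buff.write(b"\x2c")          # stand-in for an encoded datum
    value = buff.getvalue()      # must happen inside the block

print(value)                     # b',' -- the bytes were copied out in time
# Calling buff.getvalue() here would raise:
#   ValueError: I/O operation on closed file.
```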
diff --git a/lang/py/src/avro/txipc.py b/lang/py/src/avro/txipc.py
index 66ca726..b96f293 100644
--- a/lang/py/src/avro/txipc.py
+++ b/lang/py/src/avro/txipc.py
@@ -19,9 +19,12 @@
 
 from __future__ import absolute_import, division, print_function
 
+import io
+
 from zope.interface import implements
 
-from avro import io, ipc
+import avro.io
+from avro import ipc
 from twisted.internet.defer import Deferred, maybeDeferred
 from twisted.internet.protocol import Protocol
 from twisted.web import resource, server
@@ -29,18 +32,13 @@
 from twisted.web.http_headers import Headers
 from twisted.web.iweb import IBodyProducer
 
-try:
-  from cStringIO import StringIO
-except ImportError:
-  from StringIO import StringIO
-
 
 class TwistedRequestor(ipc.BaseRequestor):
   """A Twisted-compatible requestor. Returns a Deferred that will fire with the
      returning value, instead of blocking until the request completes."""
   def _process_handshake(self, call_response, message_name, request_datum):
     # process the handshake and call response
-    buffer_decoder = io.BinaryDecoder(StringIO(call_response))
+    buffer_decoder = avro.io.BinaryDecoder(io.BytesIO(call_response))
     call_response_exists = self.read_handshake_response(buffer_decoder)
     if call_response_exists:
       return self.read_call_response(message_name, buffer_decoder)
diff --git a/lang/py/test/test_io.py b/lang/py/test/test_io.py
index 0fbf958..93fa2b1 100644
--- a/lang/py/test/test_io.py
+++ b/lang/py/test/test_io.py
@@ -20,18 +20,14 @@
 from __future__ import absolute_import, division, print_function
 
 import datetime
+import io
 import unittest
 from binascii import hexlify
 from decimal import Decimal
 
+import avro.io
 import set_avro_test_path
-from avro import io, schema, timezones
-
-try:
-  from cStringIO import StringIO
-except ImportError:
-  from StringIO import StringIO
-
+from avro import schema, timezones
 
 SCHEMAS_TO_VALIDATE = (
   ('"null"', None),
@@ -162,16 +158,16 @@
   print('')
 
 def write_datum(datum, writers_schema):
-  writer = StringIO()
-  encoder = io.BinaryEncoder(writer)
-  datum_writer = io.DatumWriter(writers_schema)
+  writer = io.BytesIO()
+  encoder = avro.io.BinaryEncoder(writer)
+  datum_writer = avro.io.DatumWriter(writers_schema)
   datum_writer.write(datum, encoder)
   return writer, encoder, datum_writer
 
 def read_datum(buffer, writers_schema, readers_schema=None):
-  reader = StringIO(buffer.getvalue())
-  decoder = io.BinaryDecoder(reader)
-  datum_reader = io.DatumReader(writers_schema, readers_schema)
+  reader = io.BytesIO(buffer.getvalue())
+  decoder = avro.io.BinaryDecoder(reader)
+  datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
   return datum_reader.read(decoder)
 
 def check_binary_encoding(number_type):
@@ -204,12 +200,12 @@
     datum_writer.write(VALUE_TO_READ, encoder)
 
     # skip the value
-    reader = StringIO(writer.getvalue())
-    decoder = io.BinaryDecoder(reader)
+    reader = io.BytesIO(writer.getvalue())
+    decoder = avro.io.BinaryDecoder(reader)
     decoder.skip_long()
 
     # read data from string buffer
-    datum_reader = io.DatumReader(writers_schema)
+    datum_reader = avro.io.DatumReader(writers_schema)
     read_value = datum_reader.read(decoder)
 
     print('Read Value: %d' % read_value)
@@ -228,7 +224,7 @@
     for example_schema, datum in SCHEMAS_TO_VALIDATE:
       print('Schema: %s' % example_schema)
       print('Datum: %s' % datum)
-      validated = io.validate(schema.parse(example_schema), datum)
+      validated = avro.io.validate(schema.parse(example_schema), datum)
       print('Valid: %s' % validated)
       if validated: passed += 1
     self.assertEquals(passed, len(SCHEMAS_TO_VALIDATE))
@@ -308,10 +304,10 @@
        "symbols": ["BAR", "BAZ"]}""")
 
     writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
-    reader = StringIO(writer.getvalue())
-    decoder = io.BinaryDecoder(reader)
-    datum_reader = io.DatumReader(writers_schema, readers_schema)
-    self.assertRaises(io.SchemaResolutionException, datum_reader.read, decoder)
+    reader = io.BytesIO(writer.getvalue())
+    decoder = avro.io.BinaryDecoder(reader)
+    datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
+    self.assertRaises(avro.io.SchemaResolutionException, datum_reader.read, decoder)
 
   def test_default_value(self):
     print_test_name('TEST DEFAULT VALUE')
@@ -342,10 +338,10 @@
        "fields": [{"name": "H", "type": "int"}]}""")
 
     writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
-    reader = StringIO(writer.getvalue())
-    decoder = io.BinaryDecoder(reader)
-    datum_reader = io.DatumReader(writers_schema, readers_schema)
-    self.assertRaises(io.SchemaResolutionException, datum_reader.read, decoder)
+    reader = io.BytesIO(writer.getvalue())
+    decoder = avro.io.BinaryDecoder(reader)
+    datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
+    self.assertRaises(avro.io.SchemaResolutionException, datum_reader.read, decoder)
 
   def test_projection(self):
     print_test_name('TEST PROJECTION')
@@ -386,7 +382,7 @@
        "fields": [{"name": "F", "type": "int"},
                   {"name": "E", "type": "int"}]}""")
     datum_to_write = {'E': 5, 'F': 'Bad'}
-    self.assertRaises(io.AvroTypeException, write_datum, datum_to_write, writers_schema)
+    self.assertRaises(avro.io.AvroTypeException, write_datum, datum_to_write, writers_schema)
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/lang/py/test/test_script.py b/lang/py/test/test_script.py
index 39e856e..ed66e2d 100644
--- a/lang/py/test/test_script.py
+++ b/lang/py/test/test_script.py
@@ -20,9 +20,9 @@
 from __future__ import absolute_import, division, print_function
 
 import csv
+import io
 import json
 import unittest
-from cStringIO import StringIO
 from operator import itemgetter
 from os import remove
 from os.path import dirname, isfile, join
@@ -110,12 +110,12 @@
         return len(self._run("--skip", str(skip))) == NUM_RECORDS - skip
 
     def test_csv(self):
-        reader = csv.reader(StringIO(self._run("-f", "csv", raw=True)))
+        reader = csv.reader(io.BytesIO(self._run("-f", "csv", raw=True)))
         assert len(list(reader)) == NUM_RECORDS
 
     def test_csv_header(self):
-        io = StringIO(self._run("-f", "csv", "--header", raw=True))
-        reader = csv.DictReader(io)
+        io_ = io.BytesIO(self._run("-f", "csv", "--header", raw=True))
+        reader = csv.DictReader(io_)
         r = {"type": "duck", "last": "duck", "first": "daffy"}
         assert next(reader) == r
 
diff --git a/lang/py/test/test_tether_task.py b/lang/py/test/test_tether_task.py
index 85ed9cb..7b1bea0 100644
--- a/lang/py/test/test_tether_task.py
+++ b/lang/py/test/test_tether_task.py
@@ -19,18 +19,18 @@
 
 from __future__ import absolute_import, division, print_function
 
+import io
 import os
-import StringIO
 import subprocess
 import sys
 import time
 import unittest
 
+import avro.io
 import avro.tether.tether_task
 import avro.tether.util
 import mock_tether_parent
 import set_avro_test_path
-from avro import io as avio
 from avro import schema, tether
 from word_count_task import WordCountTask
 
@@ -77,9 +77,9 @@
 
       # Serialize some data so we can send it to the input function
       datum="This is a line of text"
-      writer = StringIO.StringIO()
-      encoder = avio.BinaryEncoder(writer)
-      datum_writer = avio.DatumWriter(task.inschema)
+      writer = io.BytesIO()
+      encoder = avro.io.BinaryEncoder(writer)
+      datum_writer = avro.io.DatumWriter(task.inschema)
       datum_writer.write(datum, encoder)
 
       writer.seek(0)
@@ -97,9 +97,9 @@
 
       # Serialize some data so we can send it to the input function
       datum={"key":"word","value":2}
-      writer = StringIO.StringIO()
-      encoder = avio.BinaryEncoder(writer)
-      datum_writer = avio.DatumWriter(task.midschema)
+      writer = io.BytesIO()
+      encoder = avro.io.BinaryEncoder(writer)
+      datum_writer = avro.io.DatumWriter(task.midschema)
       datum_writer.write(datum, encoder)
 
       writer.seek(0)
diff --git a/lang/py/test/test_tether_task_runner.py b/lang/py/test/test_tether_task_runner.py
index 985eb3c..741f626 100644
--- a/lang/py/test/test_tether_task_runner.py
+++ b/lang/py/test/test_tether_task_runner.py
@@ -19,20 +19,20 @@
 
 from __future__ import absolute_import, division, print_function
 
+import io
 import logging
 import os
-import StringIO
 import subprocess
 import sys
 import time
 import unittest
 
+import avro.io
 import avro.tether.tether_task
 import avro.tether.tether_task_runner
 import avro.tether.util
 import mock_tether_parent
 import set_avro_test_path
-from avro import io as avio
 from word_count_task import WordCountTask
 
 
@@ -79,9 +79,9 @@
 
       # Serialize some data so we can send it to the input function
       datum="This is a line of text"
-      writer = StringIO.StringIO()
-      encoder = avio.BinaryEncoder(writer)
-      datum_writer = avio.DatumWriter(runner.task.inschema)
+      writer = io.BytesIO()
+      encoder = avro.io.BinaryEncoder(writer)
+      datum_writer = avro.io.DatumWriter(runner.task.inschema)
       datum_writer.write(datum, encoder)
 
       writer.seek(0)
@@ -100,9 +100,9 @@
 
       #Serialize some data so we can send it to the input function
       datum={"key":"word","value":2}
-      writer = StringIO.StringIO()
-      encoder = avio.BinaryEncoder(writer)
-      datum_writer = avio.DatumWriter(runner.task.midschema)
+      writer = io.BytesIO()
+      encoder = avro.io.BinaryEncoder(writer)
+      datum_writer = avro.io.DatumWriter(runner.task.midschema)
       datum_writer.write(datum, encoder)
 
       writer.seek(0)