AVRO-1880: Futurize Py2 via BytesIO (#720)
* Replace buffer instead of truncating because bytesio.truncate does not seek.
diff --git a/lang/py/src/avro/datafile.py b/lang/py/src/avro/datafile.py
index 0d29a6a..964a049 100644
--- a/lang/py/src/avro/datafile.py
+++ b/lang/py/src/avro/datafile.py
@@ -21,17 +21,15 @@
from __future__ import absolute_import, division, print_function
+import io
import os
import random
import zlib
-from avro import io, schema
+import avro.io
+import avro.schema
try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
-try:
import snappy
has_snappy = True
except ImportError:
@@ -50,7 +48,7 @@
MAGIC_SIZE = len(MAGIC)
SYNC_SIZE = 16
SYNC_INTERVAL = 4000 * SYNC_SIZE # TODO(hammer): make configurable
-META_SCHEMA = schema.parse("""\
+META_SCHEMA = avro.schema.parse("""\
{"type": "record", "name": "org.apache.avro.file.Header",
"fields" : [
{"name": "magic", "type": {"type": "fixed", "name": "magic", "size": %d}},
@@ -71,12 +69,12 @@
# Exceptions
#
-class DataFileException(schema.AvroException):
+class DataFileException(avro.schema.AvroException):
"""
Raised when there's a problem reading or writing file object containers.
"""
def __init__(self, fail_msg):
- schema.AvroException.__init__(self, fail_msg)
+ avro.schema.AvroException.__init__(self, fail_msg)
#
# Write Path
@@ -95,10 +93,10 @@
@param writer: File-like object to write into.
"""
self._writer = writer
- self._encoder = io.BinaryEncoder(writer)
+ self._encoder = avro.io.BinaryEncoder(writer)
self._datum_writer = datum_writer
- self._buffer_writer = StringIO()
- self._buffer_encoder = io.BinaryEncoder(self._buffer_writer)
+ self._buffer_writer = io.BytesIO()
+ self._buffer_encoder = avro.io.BinaryEncoder(self._buffer_writer)
self._block_count = 0
self._meta = {}
self._header_written = False
@@ -112,7 +110,7 @@
self.datum_writer.writers_schema = writers_schema
else:
# open writer for reading to collect metadata
- dfr = DataFileReader(writer, io.DatumReader())
+ dfr = DataFileReader(writer, avro.io.DatumReader())
# TODO(hammer): collect arbitrary metadata
# collect metadata
@@ -122,7 +120,7 @@
# get schema used to write existing file
schema_from_file = dfr.get_meta('avro.schema')
self.set_meta('avro.schema', schema_from_file)
- self.datum_writer.writers_schema = schema.parse(schema_from_file)
+ self.datum_writer.writers_schema = avro.schema.parse(schema_from_file)
# seek to the end of the file and prepare for writing
writer.seek(0, 2)
@@ -243,7 +241,7 @@
# TODO(hammer): allow user to specify the encoder
def __init__(self, reader, datum_reader):
self._reader = reader
- self._raw_decoder = io.BinaryDecoder(reader)
+ self._raw_decoder = avro.io.BinaryDecoder(reader)
self._datum_decoder = None # Maybe reset at every block.
self._datum_reader = datum_reader
@@ -262,7 +260,7 @@
# get ready to read
self._block_count = 0
- self.datum_reader.writers_schema = schema.parse(self.get_meta(SCHEMA_KEY))
+ self.datum_reader.writers_schema = avro.schema.parse(self.get_meta(SCHEMA_KEY))
def __enter__(self):
return self
@@ -320,7 +318,7 @@
if header.get('magic') != MAGIC:
fail_msg = "Not an Avro data file: %s doesn't match %s."\
% (header.get('magic'), MAGIC)
- raise schema.AvroException(fail_msg)
+ raise avro.schema.AvroException(fail_msg)
# set metadata
self._meta = header['meta']
@@ -341,26 +339,26 @@
# -15 is the log of the window size; negative indicates
# "raw" (no zlib headers) decompression. See zlib.h.
uncompressed = zlib.decompress(data, -15)
- self._datum_decoder = io.BinaryDecoder(StringIO(uncompressed))
+ self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
elif self.codec == 'snappy':
# Compressed data includes a 4-byte CRC32 checksum
length = self.raw_decoder.read_long()
data = self.raw_decoder.read(length - 4)
uncompressed = snappy.decompress(data)
- self._datum_decoder = io.BinaryDecoder(StringIO(uncompressed))
+ self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
self.raw_decoder.check_crc32(uncompressed);
elif self.codec == 'zstandard':
length = self.raw_decoder.read_long()
data = self.raw_decoder.read(length)
uncompressed = bytearray()
dctx = zstd.ZstdDecompressor()
- with dctx.stream_reader(StringIO(data)) as reader:
+ with dctx.stream_reader(io.BytesIO(data)) as reader:
while True:
chunk = reader.read(16384)
if not chunk:
break
uncompressed.extend(chunk)
- self._datum_decoder = io.BinaryDecoder(StringIO(uncompressed))
+ self._datum_decoder = avro.io.BinaryDecoder(io.BytesIO(uncompressed))
else:
raise DataFileException("Unknown codec: %r" % self.codec)
diff --git a/lang/py/src/avro/ipc.py b/lang/py/src/avro/ipc.py
index 48307ac..e21245e 100644
--- a/lang/py/src/avro/ipc.py
+++ b/lang/py/src/avro/ipc.py
@@ -22,13 +22,10 @@
from __future__ import absolute_import, division, print_function
import httplib
+import io
-from avro import io, protocol, schema
-
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
+import avro.io
+from avro import protocol, schema
#
# Constants
@@ -43,14 +40,14 @@
@HANDSHAKE_RESPONSE_SCHEMA@
""")
-HANDSHAKE_REQUESTOR_WRITER = io.DatumWriter(HANDSHAKE_REQUEST_SCHEMA)
-HANDSHAKE_REQUESTOR_READER = io.DatumReader(HANDSHAKE_RESPONSE_SCHEMA)
-HANDSHAKE_RESPONDER_WRITER = io.DatumWriter(HANDSHAKE_RESPONSE_SCHEMA)
-HANDSHAKE_RESPONDER_READER = io.DatumReader(HANDSHAKE_REQUEST_SCHEMA)
+HANDSHAKE_REQUESTOR_WRITER = avro.io.DatumWriter(HANDSHAKE_REQUEST_SCHEMA)
+HANDSHAKE_REQUESTOR_READER = avro.io.DatumReader(HANDSHAKE_RESPONSE_SCHEMA)
+HANDSHAKE_RESPONDER_WRITER = avro.io.DatumWriter(HANDSHAKE_RESPONSE_SCHEMA)
+HANDSHAKE_RESPONDER_READER = avro.io.DatumReader(HANDSHAKE_REQUEST_SCHEMA)
META_SCHEMA = schema.parse('{"type": "map", "values": "bytes"}')
-META_WRITER = io.DatumWriter(META_SCHEMA)
-META_READER = io.DatumReader(META_SCHEMA)
+META_WRITER = avro.io.DatumWriter(META_SCHEMA)
+META_READER = avro.io.DatumReader(META_SCHEMA)
SYSTEM_ERROR_SCHEMA = schema.parse('["string"]')
@@ -58,7 +55,7 @@
REMOTE_HASHES = {}
REMOTE_PROTOCOLS = {}
-BIG_ENDIAN_INT_STRUCT = io.struct_class('!I')
+BIG_ENDIAN_INT_STRUCT = avro.io.struct_class('!I')
BUFFER_HEADER_LENGTH = 4
BUFFER_SIZE = 8192
@@ -114,8 +111,8 @@
Writes a request message and reads a response or error message.
"""
# build handshake and call request
- buffer_writer = StringIO()
- buffer_encoder = io.BinaryEncoder(buffer_writer)
+ buffer_writer = io.BytesIO()
+ buffer_encoder = avro.io.BinaryEncoder(buffer_writer)
self.write_handshake_request(buffer_encoder)
self.write_call_request(message_name, request_datum, buffer_encoder)
@@ -159,7 +156,7 @@
self.write_request(message.request, request_datum, encoder)
def write_request(self, request_schema, request_datum, encoder):
- datum_writer = io.DatumWriter(request_schema)
+ datum_writer = avro.io.DatumWriter(request_schema)
datum_writer.write(request_datum, encoder)
def read_handshake_response(self, decoder):
@@ -221,12 +218,12 @@
raise self.read_error(writers_schema, readers_schema, decoder)
def read_response(self, writers_schema, readers_schema, decoder):
- datum_reader = io.DatumReader(writers_schema, readers_schema)
+ datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
result = datum_reader.read(decoder)
return result
def read_error(self, writers_schema, readers_schema, decoder):
- datum_reader = io.DatumReader(writers_schema, readers_schema)
+ datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
return AvroRemoteException(datum_reader.read(decoder))
class Requestor(BaseRequestor):
@@ -235,7 +232,7 @@
call_response = self.transceiver.transceive(call_request)
# process the handshake and call response
- buffer_decoder = io.BinaryDecoder(StringIO(call_response))
+ buffer_decoder = avro.io.BinaryDecoder(io.BytesIO(call_response))
call_response_exists = self.read_handshake_response(buffer_decoder)
if call_response_exists:
return self.read_call_response(message_name, buffer_decoder)
@@ -266,10 +263,10 @@
Called by a server to deserialize a request, compute and serialize
a response or error. Compare to 'handle()' in Thrift.
"""
- buffer_reader = StringIO(call_request)
- buffer_decoder = io.BinaryDecoder(buffer_reader)
- buffer_writer = StringIO()
- buffer_encoder = io.BinaryEncoder(buffer_writer)
+ buffer_reader = io.BytesIO(call_request)
+ buffer_decoder = avro.io.BinaryDecoder(buffer_reader)
+ buffer_writer = io.BytesIO()
+ buffer_encoder = avro.io.BinaryEncoder(buffer_writer)
error = None
response_metadata = {}
@@ -317,7 +314,7 @@
self.write_error(writers_schema, error, buffer_encoder)
except schema.AvroException as e:
error = AvroRemoteException(str(e))
- buffer_encoder = io.BinaryEncoder(StringIO())
+ buffer_encoder = avro.io.BinaryEncoder(io.BytesIO())
META_WRITER.write(response_metadata, buffer_encoder)
buffer_encoder.write_boolean(True)
self.write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
@@ -362,15 +359,15 @@
pass
def read_request(self, writers_schema, readers_schema, decoder):
- datum_reader = io.DatumReader(writers_schema, readers_schema)
+ datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
return datum_reader.read(decoder)
def write_response(self, writers_schema, response_datum, encoder):
- datum_writer = io.DatumWriter(writers_schema)
+ datum_writer = avro.io.DatumWriter(writers_schema)
datum_writer.write(response_datum, encoder)
def write_error(self, writers_schema, error_exception, encoder):
- datum_writer = io.DatumWriter(writers_schema)
+ datum_writer = avro.io.DatumWriter(writers_schema)
datum_writer.write(str(error_exception), encoder)
#
@@ -388,7 +385,7 @@
def read_framed_message(self):
message = []
while True:
- buffer = StringIO()
+ buffer = io.BytesIO()
buffer_length = self._read_buffer_length()
if buffer_length == 0:
return ''.join(message)
@@ -466,7 +463,7 @@
req_method = 'POST'
req_headers = {'Content-Type': 'avro/binary'}
- req_body_buffer = FramedWriter(StringIO())
+ req_body_buffer = FramedWriter(io.BytesIO())
req_body_buffer.write_framed_message(message)
req_body = req_body_buffer.writer.getvalue()
diff --git a/lang/py/src/avro/tether/tether_task.py b/lang/py/src/avro/tether/tether_task.py
index 4e2004d..262bf67 100644
--- a/lang/py/src/avro/tether/tether_task.py
+++ b/lang/py/src/avro/tether/tether_task.py
@@ -20,15 +20,14 @@
from __future__ import absolute_import, division, print_function
import collections
-import io as pyio
+import io
import logging
import os
import sys
import threading
import traceback
-from StringIO import StringIO
-from avro import io as avio
+import avro.io
from avro import ipc, protocol, schema
__all__ = ["TetherTask", "TaskType", "inputProtocol", "outputProtocol", "HTTPRequestor"]
@@ -88,10 +87,8 @@
raise ValueError("output client can't be none.")
self.scheme=scheme
- self.buff=StringIO()
- self.encoder=avio.BinaryEncoder(self.buff)
- self.datum_writer = avio.DatumWriter(writers_schema=self.scheme)
+ self.datum_writer = avro.io.DatumWriter(writers_schema=self.scheme)
self.outputClient=outputClient
def collect(self,record,partition=None):
@@ -103,25 +100,16 @@
partition - Indicates the partition for a pre-partitioned map output
- currently not supported
"""
+ # Replace the encoder and buffer every time we collect.
+ with io.BytesIO() as buff:
+ self.encoder = avro.io.BinaryEncoder(buff)
+ self.datum_writer.write(record, self.encoder)
+ value = buff.getvalue()
- self.buff.truncate(0)
- self.datum_writer.write(record, self.encoder);
- self.buff.flush();
- self.buff.seek(0)
-
- # delete all the data in the buffer
- if (partition is None):
-
- # TODO: Is there a more efficient way to read the data in self.buff?
- # we could use self.buff.read() but that returns the byte array as a string
- # will that work? We can also use self.buff.readinto to read it into
- # a bytearray but the byte array must be pre-allocated
- # self.outputClient.output(self.buff.buffer.read())
-
- #its not a StringIO
- self.outputClient.request("output",{"datum":self.buff.read()})
- else:
- self.outputClient.request("outputPartitioned",{"datum":self.buff.read(),"partition":partition})
+ datum = {"datum": value}
+ if partition is not None:
+ datum["partition"] = partition
+ self.outputClient.request("output", datum)
@@ -335,11 +323,11 @@
outSchema = schema.parse(outSchemaText)
if (taskType==TaskType.MAP):
- self.inReader=avio.DatumReader(writers_schema=inSchema,readers_schema=self.inschema)
+ self.inReader=avro.io.DatumReader(writers_schema=inSchema,readers_schema=self.inschema)
self.midCollector=Collector(outSchemaText,self.outputClient)
elif(taskType==TaskType.REDUCE):
- self.midReader=avio.DatumReader(writers_schema=inSchema,readers_schema=self.midschema)
+ self.midReader=avro.io.DatumReader(writers_schema=inSchema,readers_schema=self.midschema)
# this.outCollector = new Collector<OUT>(outSchema);
self.outCollector=Collector(outSchemaText,self.outputClient)
@@ -373,9 +361,9 @@
count - how many input records are provided in the binary stream
"""
try:
- # to avio.BinaryDecoder
- bdata=StringIO(data)
- decoder = avio.BinaryDecoder(bdata)
+ # to avro.io.BinaryDecoder
+ bdata=io.BytesIO(data)
+ decoder = avro.io.BinaryDecoder(bdata)
for i in range(count):
if (self.taskType==TaskType.MAP):
diff --git a/lang/py/src/avro/txipc.py b/lang/py/src/avro/txipc.py
index 66ca726..b96f293 100644
--- a/lang/py/src/avro/txipc.py
+++ b/lang/py/src/avro/txipc.py
@@ -19,9 +19,12 @@
from __future__ import absolute_import, division, print_function
+import io
+
from zope.interface import implements
-from avro import io, ipc
+import avro.io
+from avro import ipc
from twisted.internet.defer import Deferred, maybeDeferred
from twisted.internet.protocol import Protocol
from twisted.web import resource, server
@@ -29,18 +32,13 @@
from twisted.web.http_headers import Headers
from twisted.web.iweb import IBodyProducer
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
-
class TwistedRequestor(ipc.BaseRequestor):
"""A Twisted-compatible requestor. Returns a Deferred that will fire with the
returning value, instead of blocking until the request completes."""
def _process_handshake(self, call_response, message_name, request_datum):
# process the handshake and call response
- buffer_decoder = io.BinaryDecoder(StringIO(call_response))
+ buffer_decoder = avro.io.BinaryDecoder(io.BytesIO(call_response))
call_response_exists = self.read_handshake_response(buffer_decoder)
if call_response_exists:
return self.read_call_response(message_name, buffer_decoder)
diff --git a/lang/py/test/test_io.py b/lang/py/test/test_io.py
index 0fbf958..93fa2b1 100644
--- a/lang/py/test/test_io.py
+++ b/lang/py/test/test_io.py
@@ -20,18 +20,14 @@
from __future__ import absolute_import, division, print_function
import datetime
+import io
import unittest
from binascii import hexlify
from decimal import Decimal
+import avro.io
import set_avro_test_path
-from avro import io, schema, timezones
-
-try:
- from cStringIO import StringIO
-except ImportError:
- from StringIO import StringIO
-
+from avro import schema, timezones
SCHEMAS_TO_VALIDATE = (
('"null"', None),
@@ -162,16 +158,16 @@
print('')
def write_datum(datum, writers_schema):
- writer = StringIO()
- encoder = io.BinaryEncoder(writer)
- datum_writer = io.DatumWriter(writers_schema)
+ writer = io.BytesIO()
+ encoder = avro.io.BinaryEncoder(writer)
+ datum_writer = avro.io.DatumWriter(writers_schema)
datum_writer.write(datum, encoder)
return writer, encoder, datum_writer
def read_datum(buffer, writers_schema, readers_schema=None):
- reader = StringIO(buffer.getvalue())
- decoder = io.BinaryDecoder(reader)
- datum_reader = io.DatumReader(writers_schema, readers_schema)
+ reader = io.BytesIO(buffer.getvalue())
+ decoder = avro.io.BinaryDecoder(reader)
+ datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
return datum_reader.read(decoder)
def check_binary_encoding(number_type):
@@ -204,12 +200,12 @@
datum_writer.write(VALUE_TO_READ, encoder)
# skip the value
- reader = StringIO(writer.getvalue())
- decoder = io.BinaryDecoder(reader)
+ reader = io.BytesIO(writer.getvalue())
+ decoder = avro.io.BinaryDecoder(reader)
decoder.skip_long()
# read data from string buffer
- datum_reader = io.DatumReader(writers_schema)
+ datum_reader = avro.io.DatumReader(writers_schema)
read_value = datum_reader.read(decoder)
print('Read Value: %d' % read_value)
@@ -228,7 +224,7 @@
for example_schema, datum in SCHEMAS_TO_VALIDATE:
print('Schema: %s' % example_schema)
print('Datum: %s' % datum)
- validated = io.validate(schema.parse(example_schema), datum)
+ validated = avro.io.validate(schema.parse(example_schema), datum)
print('Valid: %s' % validated)
if validated: passed += 1
self.assertEquals(passed, len(SCHEMAS_TO_VALIDATE))
@@ -308,10 +304,10 @@
"symbols": ["BAR", "BAZ"]}""")
writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
- reader = StringIO(writer.getvalue())
- decoder = io.BinaryDecoder(reader)
- datum_reader = io.DatumReader(writers_schema, readers_schema)
- self.assertRaises(io.SchemaResolutionException, datum_reader.read, decoder)
+ reader = io.BytesIO(writer.getvalue())
+ decoder = avro.io.BinaryDecoder(reader)
+ datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
+ self.assertRaises(avro.io.SchemaResolutionException, datum_reader.read, decoder)
def test_default_value(self):
print_test_name('TEST DEFAULT VALUE')
@@ -342,10 +338,10 @@
"fields": [{"name": "H", "type": "int"}]}""")
writer, encoder, datum_writer = write_datum(datum_to_write, writers_schema)
- reader = StringIO(writer.getvalue())
- decoder = io.BinaryDecoder(reader)
- datum_reader = io.DatumReader(writers_schema, readers_schema)
- self.assertRaises(io.SchemaResolutionException, datum_reader.read, decoder)
+ reader = io.BytesIO(writer.getvalue())
+ decoder = avro.io.BinaryDecoder(reader)
+ datum_reader = avro.io.DatumReader(writers_schema, readers_schema)
+ self.assertRaises(avro.io.SchemaResolutionException, datum_reader.read, decoder)
def test_projection(self):
print_test_name('TEST PROJECTION')
@@ -386,7 +382,7 @@
"fields": [{"name": "F", "type": "int"},
{"name": "E", "type": "int"}]}""")
datum_to_write = {'E': 5, 'F': 'Bad'}
- self.assertRaises(io.AvroTypeException, write_datum, datum_to_write, writers_schema)
+ self.assertRaises(avro.io.AvroTypeException, write_datum, datum_to_write, writers_schema)
if __name__ == '__main__':
unittest.main()
diff --git a/lang/py/test/test_script.py b/lang/py/test/test_script.py
index 39e856e..ed66e2d 100644
--- a/lang/py/test/test_script.py
+++ b/lang/py/test/test_script.py
@@ -20,9 +20,9 @@
from __future__ import absolute_import, division, print_function
import csv
+import io
import json
import unittest
-from cStringIO import StringIO
from operator import itemgetter
from os import remove
from os.path import dirname, isfile, join
@@ -110,12 +110,12 @@
return len(self._run("--skip", str(skip))) == NUM_RECORDS - skip
def test_csv(self):
- reader = csv.reader(StringIO(self._run("-f", "csv", raw=True)))
+ reader = csv.reader(io.BytesIO(self._run("-f", "csv", raw=True)))
assert len(list(reader)) == NUM_RECORDS
def test_csv_header(self):
- io = StringIO(self._run("-f", "csv", "--header", raw=True))
- reader = csv.DictReader(io)
+ io_ = io.BytesIO(self._run("-f", "csv", "--header", raw=True))
+ reader = csv.DictReader(io_)
r = {"type": "duck", "last": "duck", "first": "daffy"}
assert next(reader) == r
diff --git a/lang/py/test/test_tether_task.py b/lang/py/test/test_tether_task.py
index 85ed9cb..7b1bea0 100644
--- a/lang/py/test/test_tether_task.py
+++ b/lang/py/test/test_tether_task.py
@@ -19,18 +19,18 @@
from __future__ import absolute_import, division, print_function
+import io
import os
-import StringIO
import subprocess
import sys
import time
import unittest
+import avro.io
import avro.tether.tether_task
import avro.tether.util
import mock_tether_parent
import set_avro_test_path
-from avro import io as avio
from avro import schema, tether
from word_count_task import WordCountTask
@@ -77,9 +77,9 @@
# Serialize some data so we can send it to the input function
datum="This is a line of text"
- writer = StringIO.StringIO()
- encoder = avio.BinaryEncoder(writer)
- datum_writer = avio.DatumWriter(task.inschema)
+ writer = io.BytesIO()
+ encoder = avro.io.BinaryEncoder(writer)
+ datum_writer = avro.io.DatumWriter(task.inschema)
datum_writer.write(datum, encoder)
writer.seek(0)
@@ -97,9 +97,9 @@
# Serialize some data so we can send it to the input function
datum={"key":"word","value":2}
- writer = StringIO.StringIO()
- encoder = avio.BinaryEncoder(writer)
- datum_writer = avio.DatumWriter(task.midschema)
+ writer = io.BytesIO()
+ encoder = avro.io.BinaryEncoder(writer)
+ datum_writer = avro.io.DatumWriter(task.midschema)
datum_writer.write(datum, encoder)
writer.seek(0)
diff --git a/lang/py/test/test_tether_task_runner.py b/lang/py/test/test_tether_task_runner.py
index 985eb3c..741f626 100644
--- a/lang/py/test/test_tether_task_runner.py
+++ b/lang/py/test/test_tether_task_runner.py
@@ -19,20 +19,20 @@
from __future__ import absolute_import, division, print_function
+import io
import logging
import os
-import StringIO
import subprocess
import sys
import time
import unittest
+import avro.io
import avro.tether.tether_task
import avro.tether.tether_task_runner
import avro.tether.util
import mock_tether_parent
import set_avro_test_path
-from avro import io as avio
from word_count_task import WordCountTask
@@ -79,9 +79,9 @@
# Serialize some data so we can send it to the input function
datum="This is a line of text"
- writer = StringIO.StringIO()
- encoder = avio.BinaryEncoder(writer)
- datum_writer = avio.DatumWriter(runner.task.inschema)
+ writer = io.BytesIO()
+ encoder = avro.io.BinaryEncoder(writer)
+ datum_writer = avro.io.DatumWriter(runner.task.inschema)
datum_writer.write(datum, encoder)
writer.seek(0)
@@ -100,9 +100,9 @@
#Serialize some data so we can send it to the input function
datum={"key":"word","value":2}
- writer = StringIO.StringIO()
- encoder = avio.BinaryEncoder(writer)
- datum_writer = avio.DatumWriter(runner.task.midschema)
+ writer = io.BytesIO()
+ encoder = avro.io.BinaryEncoder(writer)
+ datum_writer = avro.io.DatumWriter(runner.task.midschema)
datum_writer.write(datum, encoder)
writer.seek(0)