blob: a9567f32f3ebe02ae118db57e01cfa632497c9bd [file] [log] [blame]
#!/usr/bin/env python
##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function
import io
import os
import subprocess
import sys
import time
import unittest
import avro.io
import avro.test.mock_tether_parent
import avro.test.word_count_task
import avro.tether.tether_task
import avro.tether.util
from avro import schema, tether
try:
unicode
except NameError:
unicode = str
class TestTetherTask(unittest.TestCase):
"""
TODO: We should validate the the server response by looking at stdout
"""
def test_tether_task(self):
"""
Test that the tether_task is working. We run the mock_tether_parent in a separate
subprocess
"""
task = avro.test.word_count_task.WordCountTask()
proc = None
pyfile = avro.test.mock_tether_parent.__file__
server_port = avro.tether.util.find_port()
input_port = avro.tether.util.find_port()
try:
# launch the server in a separate process
proc = subprocess.Popen([sys.executable, pyfile, "start_server", str(server_port)])
print("Mock server started process pid={}".format(proc.pid))
# Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
# so we give the subprocess time to start up
time.sleep(1)
task.open(input_port, clientPort=server_port)
# TODO: We should validate that open worked by grabbing the STDOUT of the subproces
# and ensuring that it outputted the correct message.
# ***************************************************************
# Test the mapper
task.configure(
avro.tether.tether_task.TaskType.MAP,
str(task.inschema),
str(task.midschema)
)
# Serialize some data so we can send it to the input function
datum = unicode("This is a line of text")
writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(writer)
datum_writer = avro.io.DatumWriter(task.inschema)
datum_writer.write(datum, encoder)
writer.seek(0)
data = writer.read()
# Call input to simulate calling map
task.input(data, 1)
# Test the reducer
task.configure(
avro.tether.tether_task.TaskType.REDUCE,
str(task.midschema),
str(task.outschema)
)
# Serialize some data so we can send it to the input function
datum = {"key": unicode("word"), "value": 2}
writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(writer)
datum_writer = avro.io.DatumWriter(task.midschema)
datum_writer.write(datum, encoder)
writer.seek(0)
data = writer.read()
# Call input to simulate calling reduce
task.input(data, 1)
task.complete()
# try a status
task.status(unicode("Status message"))
finally:
# close the process
if not(proc is None):
proc.kill()
if __name__ == '__main__':
unittest.main()