blob: 7b1bea0f92478c1d3cb661695eddfcf4adc6d697 [file] [log] [blame]
#!/usr/bin/env python
##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function
import io
import os
import subprocess
import sys
import time
import unittest
import avro.io
import avro.tether.tether_task
import avro.tether.util
import mock_tether_parent
import set_avro_test_path
from avro import schema, tether
from word_count_task import WordCountTask
class TestTetherTask(unittest.TestCase):
    """
    Drive a WordCountTask against a mock tether parent running in a subprocess.

    TODO: We should validate the server response by looking at stdout
    """

    @staticmethod
    def _encode(writers_schema, datum):
        """Return ``datum`` serialized to bytes with the Avro binary encoder.

        ``writers_schema`` is the schema handed to ``avro.io.DatumWriter``.
        """
        buf = io.BytesIO()
        avro.io.DatumWriter(writers_schema).write(datum, avro.io.BinaryEncoder(buf))
        return buf.getvalue()

    def test1(self):
        """
        Test that the tether_task is working. We run the mock_tether_parent in a
        separate subprocess.

        The test makes no explicit assertions: it passes when configuring the
        task, feeding it map and reduce input, completing it and sending a
        status message all finish without raising.
        """
        task = WordCountTask()
        proc = None
        try:
            # Launch the mock server in a separate process. Use the interpreter
            # that is running this test rather than whatever "python" happens
            # to resolve to on PATH (it may be missing or a different version).
            server_port = avro.tether.util.find_port()
            pyfile = mock_tether_parent.__file__
            proc = subprocess.Popen(
                [sys.executable, pyfile, "start_server", "{0}".format(server_port)]
            )
            input_port = avro.tether.util.find_port()

            print("Mock server started process pid={0}".format(proc.pid))
            # Possible race condition? open tries to connect to the subprocess
            # before the subprocess is fully started, so we give the subprocess
            # time to start up.
            time.sleep(1)
            task.open(input_port, clientPort=server_port)

            # TODO: We should validate that open worked by grabbing the STDOUT
            # of the subprocess and ensuring that it outputted the correct message.

            # ***************************************************************
            # Test the mapper
            task.configure(
                avro.tether.tether_task.TaskType.MAP,
                str(task.inschema),
                str(task.midschema),
            )
            # Serialize some data and call input to simulate calling map
            task.input(self._encode(task.inschema, "This is a line of text"), 1)

            # Test the reducer
            task.configure(
                avro.tether.tether_task.TaskType.REDUCE,
                str(task.midschema),
                str(task.outschema),
            )
            # Serialize some data and call input to simulate calling reduce
            task.input(self._encode(task.midschema, {"key": "word", "value": 2}), 1)

            task.complete()

            # try a status
            task.status("Status message")
        finally:
            # Close the mock server and reap it so no zombie process is left
            # behind once the test finishes.
            if proc is not None:
                proc.kill()
                proc.wait()
# Allow running this test module directly as a script.
if "__main__" == __name__:
    unittest.main()