blob: 10582d3f7a44dc8747e67e0aa4e03443a221737d [file] [log] [blame]
#!/usr/bin/env python
##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function
import io
import logging
import os
import subprocess
import sys
import time
import unittest
import avro.io
import avro.test.mock_tether_parent
import avro.test.word_count_task
import avro.tether.tether_task
import avro.tether.tether_task_runner
import avro.tether.util
try:
unicode
except NameError:
unicode = str
class TestTetherTaskRunner(unittest.TestCase):
"""unit test for a tethered task runner."""
def test1(self):
# set the logging level to debug so that debug messages are printed
logging.basicConfig(level=logging.DEBUG)
proc = None
try:
# launch the server in a separate process
env = dict()
env["PYTHONPATH"] = ':'.join(sys.path)
parent_port = avro.tether.util.find_port()
pyfile = avro.test.mock_tether_parent.__file__
proc = subprocess.Popen([sys.executable, pyfile, "start_server", "{0}".format(parent_port)])
input_port = avro.tether.util.find_port()
print("Mock server started process pid={0}".format(proc.pid))
# Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
# so we give the subprocess time to start up
time.sleep(1)
runner = avro.tether.tether_task_runner.TaskRunner(avro.test.word_count_task.WordCountTask())
runner.start(outputport=parent_port, join=False)
# Test sending various messages to the server and ensuring they are processed correctly
requestor = avro.tether.tether_task.HTTPRequestor(
"localhost", runner.server.server_address[1], avro.tether.tether_task.inputProtocol)
# TODO: We should validate that open worked by grabbing the STDOUT of the subproces
# and ensuring that it outputted the correct message.
# Test the mapper
requestor.request("configure", {
"taskType": avro.tether.tether_task.TaskType.MAP,
"inSchema": unicode(str(runner.task.inschema)),
"outSchema": unicode(str(runner.task.midschema))
})
# Serialize some data so we can send it to the input function
datum = unicode("This is a line of text")
writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(writer)
datum_writer = avro.io.DatumWriter(runner.task.inschema)
datum_writer.write(datum, encoder)
writer.seek(0)
data = writer.read()
# Call input to simulate calling map
requestor.request("input", {"data": data, "count": 1})
# Test the reducer
requestor.request("configure", {
"taskType": avro.tether.tether_task.TaskType.REDUCE,
"inSchema": unicode(str(runner.task.midschema)),
"outSchema": unicode(str(runner.task.outschema))}
)
# Serialize some data so we can send it to the input function
datum = {"key": unicode("word"), "value": 2}
writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(writer)
datum_writer = avro.io.DatumWriter(runner.task.midschema)
datum_writer.write(datum, encoder)
writer.seek(0)
data = writer.read()
# Call input to simulate calling reduce
requestor.request("input", {"data": data, "count": 1})
requestor.request("complete", {})
runner.task.ready_for_shutdown.wait()
runner.server.shutdown()
# time.sleep(2)
# runner.server.shutdown()
sthread = runner.sthread
# Possible race condition?
time.sleep(1)
# make sure the other thread terminated
self.assertFalse(sthread.isAlive())
# shutdown the logging
logging.shutdown()
except Exception as e:
raise
finally:
# close the process
if not(proc is None):
proc.kill()
def test2(self):
"""
In this test we want to make sure that when we run "tether_task_runner.py"
as our main script everything works as expected. We do this by using subprocess to run it
in a separate thread.
"""
proc = None
runnerproc = None
try:
# launch the server in a separate process
env = dict()
env["PYTHONPATH"] = ':'.join(sys.path)
parent_port = avro.tether.util.find_port()
pyfile = avro.test.mock_tether_parent.__file__
proc = subprocess.Popen([sys.executable, pyfile, "start_server", "{0}".format(parent_port)])
# Possible race condition? when we start tether_task_runner it will call
# open tries to connect to the subprocess before the subprocess is fully started
# so we give the subprocess time to start up
time.sleep(1)
# start the tether_task_runner in a separate process
env = {"AVRO_TETHER_OUTPUT_PORT": "{0}".format(parent_port)}
env["PYTHONPATH"] = ':'.join(sys.path)
runnerproc = subprocess.Popen([sys.executable, avro.tether.tether_task_runner.__file__,
"avro.test.word_count_task.WordCountTask"], env=env)
# possible race condition wait for the process to start
time.sleep(1)
print("Mock server started process pid={0}".format(proc.pid))
# Possible race condition? open tries to connect to the subprocess before the subprocess is fully started
# so we give the subprocess time to start up
time.sleep(1)
except Exception as e:
raise
finally:
# close the process
if not(runnerproc is None):
runnerproc.kill()
if not(proc is None):
proc.kill()
if __name__ == ("__main__"):
unittest.main()