blob: 93827a0293ad97ab676fe1893ecd6d49ef0ec384 [file] [log] [blame]
#!/usr/bin/env python
##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function
import io
import logging
import os
import subprocess
import sys
import time
import unittest
import avro.io
import avro.test.mock_tether_parent
import avro.test.word_count_task
import avro.tether.tether_task
import avro.tether.tether_task_runner
import avro.tether.util
# Python 2/3 compatibility shim: the tests below wrap text values in
# ``unicode(...)``.  Evaluating the bare name raises NameError when the
# interpreter has no such builtin, in which case ``str`` is used instead.
try:
    unicode
except NameError:
    unicode = str
class TestTetherTaskRunner(unittest.TestCase):
    """Unit tests for a tethered task runner.

    Both tests start ``avro.test.mock_tether_parent`` in a child process
    (invoked with ``start_server <port>``) and always kill and reap that
    child on exit, whether the test body succeeded or raised.
    """

    @staticmethod
    def _start_mock_parent(port):
        """Launch the mock tether parent script on *port* in a child process.

        Returns the ``subprocess.Popen`` handle of the child.
        """
        pyfile = avro.test.mock_tether_parent.__file__
        return subprocess.Popen([sys.executable, pyfile, "start_server", "{0}".format(port)])

    @staticmethod
    def _stop_process(proc):
        """Kill *proc* and wait for it to exit; do nothing when *proc* is None."""
        if proc is not None:
            proc.kill()
            # Reap the child so it does not linger as a zombie process.
            proc.wait()

    @staticmethod
    def _encode(schema, datum):
        """Return *datum* written with ``avro.io.DatumWriter(schema)`` as bytes."""
        buf = io.BytesIO()
        avro.io.DatumWriter(schema).write(datum, avro.io.BinaryEncoder(buf))
        return buf.getvalue()

    def test1(self):
        """Run a WordCountTask in-process and drive it through the input protocol.

        Sends ``configure``/``input`` requests for the MAP phase, then for the
        REDUCE phase, followed by ``complete``; finally shuts the runner's
        server down and asserts that its server thread has terminated.
        """
        # set the logging level to debug so that debug messages are printed
        logging.basicConfig(level=logging.DEBUG)

        proc = None
        try:
            # launch the server in a separate process
            parent_port = avro.tether.util.find_port()
            proc = self._start_mock_parent(parent_port)

            # NOTE(review): this port is never used below; the call is kept
            # only in case find_port() has side effects -- confirm and remove.
            input_port = avro.tether.util.find_port()

            print("Mock server started process pid={0}".format(proc.pid))
            # Possible race condition? open tries to connect to the subprocess
            # before the subprocess is fully started, so we give the
            # subprocess time to start up.
            time.sleep(1)

            runner = avro.tether.tether_task_runner.TaskRunner(avro.test.word_count_task.WordCountTask())
            runner.start(outputport=parent_port, join=False)

            # Test sending various messages to the server and ensuring they
            # are processed correctly.
            requestor = avro.tether.tether_task.HTTPRequestor(
                "localhost", runner.server.server_address[1], avro.tether.tether_task.inputProtocol)

            # TODO: We should validate that open worked by grabbing the STDOUT
            # of the subprocess and ensuring that it outputted the correct
            # message.

            # Test the mapper
            requestor.request("configure", {
                "taskType": avro.tether.tether_task.TaskType.MAP,
                "inSchema": unicode(str(runner.task.inschema)),
                "outSchema": unicode(str(runner.task.midschema)),
            })

            # Serialize some data and call input to simulate calling map
            data = self._encode(runner.task.inschema, unicode("This is a line of text"))
            requestor.request("input", {"data": data, "count": 1})

            # Test the reducer
            requestor.request("configure", {
                "taskType": avro.tether.tether_task.TaskType.REDUCE,
                "inSchema": unicode(str(runner.task.midschema)),
                "outSchema": unicode(str(runner.task.outschema)),
            })

            # Serialize some data and call input to simulate calling reduce
            data = self._encode(runner.task.midschema, {"key": unicode("word"), "value": 2})
            requestor.request("input", {"data": data, "count": 1})

            requestor.request("complete", {})

            runner.task.ready_for_shutdown.wait()
            runner.server.shutdown()

            # Give the server thread up to one second to finish; join()
            # returns as soon as the thread exits instead of always sleeping.
            sthread = runner.sthread
            sthread.join(1)

            # make sure the other thread terminated
            # (Thread.isAlive() was removed in Python 3.9; is_alive() works
            # on both Python 2 and Python 3.)
            self.assertFalse(sthread.is_alive())

            # shutdown the logging
            logging.shutdown()
        finally:
            # close the process
            self._stop_process(proc)

    def test2(self):
        """
        In this test we want to make sure that when we run "tether_task_runner.py"
        as our main script everything works as expected. We do this by using
        subprocess to run it in a separate process.

        No assertions are made: the test passes as long as launching both
        child processes does not raise.
        """
        proc = None
        runnerproc = None
        try:
            # launch the server in a separate process
            parent_port = avro.tether.util.find_port()
            proc = self._start_mock_parent(parent_port)

            # Possible race condition? When we start tether_task_runner it
            # tries to connect to the subprocess before the subprocess is
            # fully started, so we give the subprocess time to start up.
            time.sleep(1)

            # start the tether_task_runner in a separate process
            env = {
                "AVRO_TETHER_OUTPUT_PORT": "{0}".format(parent_port),
                # os.pathsep keeps the search path valid on every platform.
                "PYTHONPATH": os.pathsep.join(sys.path),
            }
            runnerproc = subprocess.Popen(
                [sys.executable, avro.tether.tether_task_runner.__file__,
                 "avro.test.word_count_task.WordCountTask"],
                env=env)

            # possible race condition: wait for the process to start
            time.sleep(1)

            print("Mock server started process pid={0}".format(proc.pid))
            # Give the runner process additional time to talk to the server
            # before both processes are torn down.
            time.sleep(1)
        finally:
            # close the processes; the nested try/finally guarantees the mock
            # server is stopped even if stopping the runner raises
            try:
                self._stop_process(runnerproc)
            finally:
                self._stop_process(proc)
# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()