# blob: 741f6261cf1485b3c4ab38f78a652ab0e06880b7 [file] [log] [blame]
#!/usr/bin/env python
##
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import, division, print_function
import io
import logging
import os
import subprocess
import sys
import time
import unittest
import avro.io
import avro.tether.tether_task
import avro.tether.tether_task_runner
import avro.tether.util
import mock_tether_parent
import set_avro_test_path
from word_count_task import WordCountTask
class TestTetherTaskRunner(unittest.TestCase):
    """Unit tests for a tethered task runner.

    Both tests start ``mock_tether_parent`` as a child process listening on a
    free port and then run a task runner that is pointed at that port.
    """

    # Upper bound, in seconds, on how long test1 waits for the runner's
    # server thread to finish after the server has been shut down.
    _THREAD_JOIN_TIMEOUT = 5

    @staticmethod
    def _start_mock_parent(port):
        """Launch the mock tether parent on *port* and return its Popen handle.

        The child is started with ``sys.executable`` so that it runs under the
        same interpreter as the test instead of whatever ``python`` happens to
        be first on the PATH.
        """
        return subprocess.Popen(
            [sys.executable, mock_tether_parent.__file__,
             "start_server", "{0}".format(port)])

    @staticmethod
    def _stop_process(proc):
        """Kill *proc* and reap it; do nothing when *proc* is None."""
        if proc is not None:
            proc.kill()
            # Reap the child so it does not linger as a zombie.
            proc.wait()

    @staticmethod
    def _encode(schema, datum):
        """Return *datum* serialized with an Avro binary encoder for *schema*."""
        buf = io.BytesIO()
        avro.io.DatumWriter(schema).write(datum, avro.io.BinaryEncoder(buf))
        return buf.getvalue()

    def test1(self):
        """Send configure/input/complete requests to an in-process TaskRunner.

        The runner is configured first as a mapper and then as a reducer, is
        fed one serialized datum each time, and is finally told the input is
        complete. The test passes if the runner's server thread has stopped
        after the server is shut down.
        """
        # set the logging level to debug so that debug messages are printed
        logging.basicConfig(level=logging.DEBUG)

        proc = None
        try:
            # launch the server in a separate process
            parent_port = avro.tether.util.find_port()
            proc = self._start_mock_parent(parent_port)
            print("Mock server started process pid={0}".format(proc.pid))

            # Possible race condition? open tries to connect to the subprocess
            # before the subprocess is fully started, so we give the
            # subprocess time to start up.
            time.sleep(1)

            runner = avro.tether.tether_task_runner.TaskRunner(WordCountTask())
            runner.start(outputport=parent_port, join=False)

            # Test sending various messages to the server and ensuring they
            # are processed correctly
            requestor = avro.tether.tether_task.HTTPRequestor(
                "localhost",
                runner.server.server_address[1],
                avro.tether.tether_task.inputProtocol)

            # TODO: We should validate that open worked by grabbing the STDOUT
            # of the subprocess and ensuring that it outputted the correct
            # message.

            # Test the mapper
            requestor.request("configure", {
                "taskType": avro.tether.tether_task.TaskType.MAP,
                "inSchema": str(runner.task.inschema),
                "outSchema": str(runner.task.midschema),
            })

            # Serialize some data and call input to simulate calling map
            data = self._encode(runner.task.inschema, "This is a line of text")
            requestor.request("input", {"data": data, "count": 1})

            # Test the reducer
            requestor.request("configure", {
                "taskType": avro.tether.tether_task.TaskType.REDUCE,
                "inSchema": str(runner.task.midschema),
                "outSchema": str(runner.task.outschema),
            })

            # Serialize some data and call input to simulate calling reduce
            data = self._encode(runner.task.midschema,
                                {"key": "word", "value": 2})
            requestor.request("input", {"data": data, "count": 1})

            requestor.request("complete", {})

            runner.task.ready_for_shutdown.wait()
            runner.server.shutdown()

            # Wait (bounded) for the server thread instead of sleeping for a
            # fixed time, then make sure it really terminated.
            sthread = runner.sthread
            sthread.join(self._THREAD_JOIN_TIMEOUT)
            # Thread.isAlive() was removed in Python 3.9; is_alive() works on
            # both Python 2 and Python 3.
            self.assertFalse(sthread.is_alive())

            # shutdown the logging
            logging.shutdown()
        finally:
            # close the process
            self._stop_process(proc)

    def test2(self):
        """
        In this test we want to make sure that when we run "tether_task_runner.py"
        as our main script everything works as expected. We do this by using
        subprocess to run it in a separate process.
        """
        proc = None
        runnerproc = None
        try:
            # launch the server in a separate process
            parent_port = avro.tether.util.find_port()
            proc = self._start_mock_parent(parent_port)

            # Possible race condition? when we start tether_task_runner it
            # will call open, which tries to connect to the subprocess before
            # the subprocess is fully started, so we give the subprocess time
            # to start up.
            time.sleep(1)

            # start the tether_task_runner in a separate process
            env = {
                "AVRO_TETHER_OUTPUT_PORT": "{0}".format(parent_port),
                # os.pathsep is the platform's search-path separator.
                "PYTHONPATH": os.pathsep.join(sys.path),
            }
            # sys.executable is an absolute path, so the launch does not
            # depend on PATH, which this minimal environment does not define.
            runnerproc = subprocess.Popen(
                [sys.executable, avro.tether.tether_task_runner.__file__,
                 "word_count_task.WordCountTask"],
                env=env)

            # possible race condition: wait for the process to start
            time.sleep(1)

            print("Mock server started process pid={0}".format(proc.pid))

            # Possible race condition? open tries to connect to the subprocess
            # before the subprocess is fully started, so we give the
            # subprocess time to start up.
            time.sleep(1)
        finally:
            # close the processes; the nested finally guarantees the mock
            # parent is stopped even if stopping the runner raises.
            try:
                self._stop_process(runnerproc)
            finally:
                self._stop_process(proc)
# Run the test suite when this module is executed as a script.
if __name__ == "__main__":
    unittest.main()