blob: bff6468a0252144162ce0452c86352ddcc78559f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import subprocess
import sys
import time
import unittest
import os
import set_avro_test_path
class TestTetherWordCount(unittest.TestCase):
""" unittest for a python tethered map-reduce job.
"""
def _write_lines(self,lines,fname):
"""
Write the lines to an avro file named fname
Parameters
--------------------------------------------------------
lines - list of strings to write
fname - the name of the file to write to.
"""
import avro.io as avio
from avro.datafile import DataFileReader,DataFileWriter
from avro import schema
#recursively make all directories
dparts=fname.split(os.sep)[:-1]
for i in range(len(dparts)):
pdir=os.sep+os.sep.join(dparts[:i+1])
if not(os.path.exists(pdir)):
os.mkdir(pdir)
with file(fname,'w') as hf:
inschema="""{"type":"string"}"""
writer=DataFileWriter(hf,avio.DatumWriter(inschema),writers_schema=schema.parse(inschema))
#encoder = avio.BinaryEncoder(writer)
#datum_writer = avio.DatumWriter()
for datum in lines:
writer.append(datum)
writer.close()
def _count_words(self,lines):
"""Return a dictionary counting the words in lines
"""
counts={}
for line in lines:
words=line.split()
for w in words:
if not(counts.has_key(w.strip())):
counts[w.strip()]=0
counts[w.strip()]=counts[w.strip()]+1
return counts
def test1(self):
"""
Run a tethered map-reduce job.
Assumptions: 1) bash is available in /bin/bash
"""
from word_count_task import WordCountTask
from avro.tether import tether_task_runner
from avro.datafile import DataFileReader
from avro.io import DatumReader
import avro
import subprocess
import StringIO
import shutil
import tempfile
import inspect
proc=None
try:
# TODO we use the tempfile module to generate random names
# for the files
base_dir = "/tmp/test_tether_word_count"
if os.path.exists(base_dir):
shutil.rmtree(base_dir)
inpath = os.path.join(base_dir, "in")
infile=os.path.join(inpath, "lines.avro")
lines=["the quick brown fox jumps over the lazy dog",
"the cow jumps over the moon",
"the rain in spain falls mainly on the plains"]
self._write_lines(lines,infile)
true_counts=self._count_words(lines)
if not(os.path.exists(infile)):
self.fail("Missing the input file {0}".format(infile))
# The schema for the output of the mapper and reducer
oschema="""
{"type":"record",
"name":"Pair","namespace":"org.apache.avro.mapred","fields":[
{"name":"key","type":"string"},
{"name":"value","type":"long","order":"ignore"}
]
}
"""
# write the schema to a temporary file
osfile=tempfile.NamedTemporaryFile(mode='w',suffix=".avsc",prefix="wordcount",delete=False)
outschema=osfile.name
osfile.write(oschema)
osfile.close()
if not(os.path.exists(outschema)):
self.fail("Missing the schema file")
outpath = os.path.join(base_dir, "out")
args=[]
args.append("java")
args.append("-jar")
args.append(os.path.abspath("@TOPDIR@/../java/tools/target/avro-tools-@AVRO_VERSION@.jar"))
args.append("tether")
args.extend(["--in",inpath])
args.extend(["--out",outpath])
args.extend(["--outschema",outschema])
args.extend(["--protocol","http"])
# form the arguments for the subprocess
subargs=[]
srcfile=inspect.getsourcefile(tether_task_runner)
# Create a shell script to act as the program we want to execute
# We do this so we can set the python path appropriately
script="""#!/bin/bash
export PYTHONPATH={0}
python -m avro.tether.tether_task_runner word_count_task.WordCountTask
"""
# We need to make sure avro is on the path
# getsourcefile(avro) returns .../avro/__init__.py
asrc=inspect.getsourcefile(avro)
apath=asrc.rsplit(os.sep,2)[0]
# path to where the tests lie
tpath=os.path.split(__file__)[0]
exhf=tempfile.NamedTemporaryFile(mode='w',prefix="exec_word_count_",delete=False)
exfile=exhf.name
exhf.write(script.format((os.pathsep).join([apath,tpath]),srcfile))
exhf.close()
# make it world executable
os.chmod(exfile,0755)
args.extend(["--program",exfile])
print "Command:\n\t{0}".format(" ".join(args))
proc=subprocess.Popen(args)
proc.wait()
time.sleep(1) # wait a bit longer to clean up
# read the output
with file(os.path.join(outpath,"part-00000.avro")) as hf:
reader=DataFileReader(hf, DatumReader())
for record in reader:
self.assertEqual(record["value"],true_counts[record["key"]])
reader.close()
except Exception as e:
raise
finally:
# close the process
if proc is not None and proc.returncode is None:
proc.kill()
if os.path.exists(base_dir):
shutil.rmtree(base_dir)
if os.path.exists(exfile):
os.remove(exfile)
if __name__ == "__main__":
    # Running this module directly executes the tests defined in it through
    # unittest's command-line runner.
    unittest.main()