| #!/usr/bin/env python |
| |
| ## |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from __future__ import absolute_import, division, print_function |
| |
| import collections |
| import distutils.spawn |
| import os |
| import platform |
| import shutil |
| import subprocess |
| import sys |
| import tempfile |
| import unittest |
| |
| import avro |
| import avro.datafile |
| import avro.io |
| import avro.schema |
| import avro.tether.tether_task_runner |
| |
| try: |
| unicode |
| except NameError: |
| unicode = str |
| |
| _AVRO_DIR = os.path.abspath(os.path.dirname(avro.__file__)) |
| |
| |
| def _version(): |
| with open(os.path.join(_AVRO_DIR, 'VERSION.txt')) as v: |
| # Convert it back to the java version |
| return v.read().strip().replace('+', '-') |
| |
| |
| _AVRO_VERSION = _version() |
| _JAR_PATH = os.path.join(os.path.dirname(os.path.dirname(_AVRO_DIR)), |
| "java", "tools", "target", "avro-tools-{}.jar".format(_AVRO_VERSION)) |
| |
| _LINES = (unicode("the quick brown fox jumps over the lazy dog"), |
| unicode("the cow jumps over the moon"), |
| unicode("the rain in spain falls mainly on the plains")) |
| _IN_SCHEMA = '"string"' |
| |
| # The schema for the output of the mapper and reducer |
| _OUT_SCHEMA = """{ |
| "type": "record", |
| "name": "Pair", |
| "namespace": "org.apache.avro.mapred", |
| "fields": [{"name": "key", "type": "string"}, |
| {"name": "value", "type": "long", "order": "ignore"}] |
| }""" |
| |
| _PYTHON_PATH = os.pathsep.join([os.path.dirname(os.path.dirname(avro.__file__)), |
| os.path.dirname(__file__)]) |
| |
| |
| def _has_java(): |
| """Detect if this system has a usable java installed. |
| |
| On most systems, this is just checking if `java` is in the PATH. |
| |
| But macos always has a /usr/bin/java, which does not mean java is installed. |
| If you invoke java on macos and java is not installed, macos will spawn a popup |
| telling you how to install java. This code does additional work around that |
| to be completely automatic. |
| """ |
| if platform.system() == "Darwin": |
| try: |
| output = subprocess.check_output("/usr/libexec/java_home", stderr=subprocess.STDOUT) |
| except subprocess.CalledProcessError as e: |
| output = e.output |
| return (b"No Java runtime present" not in output) |
| return bool(distutils.spawn.find_executable("java")) |
| |
| |
| @unittest.skipUnless(_has_java(), "No Java runtime present") |
| @unittest.skipUnless(os.path.exists(_JAR_PATH), "{} not found".format(_JAR_PATH)) |
| class TestTetherWordCount(unittest.TestCase): |
| """unittest for a python tethered map-reduce job.""" |
| |
| _base_dir = None |
| _script_path = None |
| _input_path = None |
| _output_path = None |
| _output_schema_path = None |
| |
| def setUp(self): |
| """Create temporary files for testing.""" |
| prefix, _ = os.path.splitext(os.path.basename(__file__)) |
| self._base_dir = tempfile.mkdtemp(prefix=prefix) |
| |
| # We create the input path... |
| self._input_path = os.path.join(self._base_dir, "in") |
| if not os.path.exists(self._input_path): |
| os.makedirs(self._input_path) |
| infile = os.path.join(self._input_path, "lines.avro") |
| self._write_lines(_LINES, infile) |
| self.assertTrue(os.path.exists(infile), "Missing the input file {}".format(infile)) |
| |
| # ...and the output schema... |
| self._output_schema_path = os.path.join(self._base_dir, "output.avsc") |
| with open(self._output_schema_path, 'w') as output_schema_handle: |
| output_schema_handle.write(_OUT_SCHEMA) |
| self.assertTrue(os.path.exists(self._output_schema_path), "Missing the schema file") |
| |
| # ...but we just name the output path. The tether tool creates it. |
| self._output_path = os.path.join(self._base_dir, "out") |
| |
| def tearDown(self): |
| """Remove temporary files used in testing.""" |
| if os.path.exists(self._base_dir): |
| shutil.rmtree(self._base_dir) |
| if self._script_path is not None and os.path.exists(self._script_path): |
| os.remove(self._script_path) |
| |
| def _write_lines(self, lines, fname): |
| """ |
| Write the lines to an avro file named fname |
| |
| Parameters |
| -------------------------------------------------------- |
| lines - list of strings to write |
| fname - the name of the file to write to. |
| """ |
| datum_writer = avro.io.DatumWriter(_IN_SCHEMA) |
| writers_schema = avro.schema.parse(_IN_SCHEMA) |
| with avro.datafile.DataFileWriter(open(fname, 'wb'), datum_writer, writers_schema) as writer: |
| for datum in lines: |
| writer.append(datum) |
| |
| def test_tether_word_count(self): |
| """Check that a tethered map-reduce job produces the output expected locally.""" |
| # Run the job... |
| args = ("java", "-jar", _JAR_PATH, "tether", |
| "--protocol", "http", |
| "--in", self._input_path, |
| "--out", self._output_path, |
| "--outschema", self._output_schema_path, |
| "--program", sys.executable, |
| "--exec_args", "-m avro.tether.tether_task_runner word_count_task.WordCountTask") |
| print("Command:\n\t{0}".format(" ".join(args))) |
| subprocess.check_call(args, env={"PYTHONPATH": _PYTHON_PATH, "PATH": os.environ["PATH"]}) |
| |
| # ...and test the results. |
| datum_reader = avro.io.DatumReader() |
| outfile = os.path.join(self._output_path, "part-00000.avro") |
| expected_counts = collections.Counter(' '.join(_LINES).split()) |
| with avro.datafile.DataFileReader(open(outfile, 'rb'), datum_reader) as reader: |
| actual_counts = {r["key"]: r["value"] for r in reader} |
| self.assertDictEqual(actual_counts, expected_counts) |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |