| # |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| """Test for the mergecontacts example.""" |
| |
| from __future__ import absolute_import |
| |
| import logging |
| import tempfile |
| import unittest |
| |
| from apache_beam.examples.cookbook import mergecontacts |
| from apache_beam.testing.util import open_shards |
| |
| |
| class MergeContactsTest(unittest.TestCase): |
| |
| CONTACTS_EMAIL = '\n'.join(['Nathan Nomad\tnathan@example.com', |
| 'Nicky Nomad\tnicky@example.com', |
| 'Noreen Nomad\tnoreen@example.com', |
| 'Noreen Nomad\tnomad@example.com', |
| 'Robert B\trobert@example.com', |
| 'Silviu C\tsilviu@example.com', |
| 'Tom S\ttom@example.com', |
| 'Wally Writer\twally@example.com', |
| '']) |
| |
| CONTACTS_PHONE = '\n'.join(['Larry Luddite\t724-228-3529', |
| 'Lisa Luddite\t304-277-3504', |
| 'Nathan Nomad\t412-466-8968', |
| 'Nicky Nomad\t724-379-5815', |
| 'Noreen Nomad\t412-472-0145', |
| 'Robert B\t814-865-8799', |
| 'Silviu C\t724-537-0671', |
| 'Tom S\t570-368-3420', |
| 'Tom S\t814-793-9655', |
| '']) |
| |
| CONTACTS_SNAILMAIL = '\n'.join( |
| ['Larry Luddite\t1949 Westcott St, Detroit, MI 48222', |
| 'Lisa Luddite\t1949 Westcott St, Detroit, MI 48222', |
| 'Robert B\t601 N 34th St, Seattle, WA 98103', |
| 'Silviu C\t1600 Amphitheatre Pkwy, Mountain View, CA 94043', |
| 'Tom S\t6425 Penn Ave Ste 700, Pittsburgh, PA 15206', |
| 'Wally Writer\t101 Ridge Rd, Chalkyitsik, AK 99788', |
| '']) |
| |
| EXPECTED_TSV = '\n'.join( |
| ['\t'.join(['"Larry Luddite"', '""', '"724-228-3529"', |
| '"1949 Westcott St, Detroit, MI 48222"']), |
| '\t'.join(['"Lisa Luddite"', '""', '"304-277-3504"', |
| '"1949 Westcott St, Detroit, MI 48222"']), |
| '\t'.join(['"Nathan Nomad"', '"nathan@example.com"', '"412-466-8968"', |
| '""']), |
| '\t'.join(['"Nicky Nomad"', '"nicky@example.com"', '"724-379-5815"', |
| '""']), |
| '\t'.join(['"Noreen Nomad"', '"nomad@example.com,noreen@example.com"', |
| '"412-472-0145"', '""']), |
| '\t'.join(['"Robert B"', '"robert@example.com"', '"814-865-8799"', |
| '"601 N 34th St, Seattle, WA 98103"']), |
| '\t'.join(['"Silviu C"', '"silviu@example.com"', '"724-537-0671"', |
| '"1600 Amphitheatre Pkwy, Mountain View, CA 94043"']), |
| '\t'.join(['"Tom S"', '"tom@example.com"', '"570-368-3420,814-793-9655"', |
| '"6425 Penn Ave Ste 700, Pittsburgh, PA 15206"']), |
| '\t'.join(['"Wally Writer"', '"wally@example.com"', '""', |
| '"101 Ridge Rd, Chalkyitsik, AK 99788"']), |
| '']) |
| |
| EXPECTED_STATS = '\n'.join(['2 luddites', |
| '1 writers', |
| '3 nomads', |
| '']) |
| |
| def create_temp_file(self, contents): |
| with tempfile.NamedTemporaryFile(delete=False) as f: |
| f.write(contents.encode('utf-8')) |
| return f.name |
| |
| def normalize_tsv_results(self, tsv_data): |
| """Sort .tsv file data so we can compare it with expected output.""" |
| lines_in = tsv_data.strip().split('\n') |
| lines_out = [] |
| for line in lines_in: |
| name, email, phone, snailmail = line.split('\t') |
| lines_out.append('\t'.join( |
| [name, |
| '"%s"' % ','.join(sorted(email.strip('"').split(','))), |
| '"%s"' % ','.join(sorted(phone.strip('"').split(','))), |
| snailmail])) |
| return '\n'.join(sorted(lines_out)) + '\n' |
| |
| def test_mergecontacts(self): |
| path_email = self.create_temp_file(self.CONTACTS_EMAIL) |
| path_phone = self.create_temp_file(self.CONTACTS_PHONE) |
| path_snailmail = self.create_temp_file(self.CONTACTS_SNAILMAIL) |
| |
| result_prefix = self.create_temp_file('') |
| |
| mergecontacts.run([ |
| '--input_email=%s' % path_email, |
| '--input_phone=%s' % path_phone, |
| '--input_snailmail=%s' % path_snailmail, |
| '--output_tsv=%s.tsv' % result_prefix, |
| '--output_stats=%s.stats' % result_prefix], assert_results=(2, 1, 3)) |
| |
| with open_shards('%s.tsv-*-of-*' % result_prefix) as f: |
| contents = f.read() |
| self.assertEqual(self.EXPECTED_TSV, self.normalize_tsv_results(contents)) |
| |
| |
| if __name__ == '__main__': |
| logging.getLogger().setLevel(logging.INFO) |
| unittest.main() |