#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import re
import subprocess
import time
import unittest


class Popen(subprocess.Popen):
    """``subprocess.Popen`` with the stream options these tests always use.

    Every child is started with stderr redirected into stdout, stdout
    captured through a pipe, and universal-newlines (text) mode enabled.
    """

    def __init__(self, args, **kwargs):
        """Start ``args``; any extra ``kwargs`` are forwarded to the base class.

        The three stream options are passed explicitly, so supplying
        ``stderr``, ``stdout`` or ``universal_newlines`` in ``kwargs``
        raises ``TypeError`` (duplicate keyword argument).
        """
        super(Popen, self).__init__(
            args,
            stderr=subprocess.STDOUT,
            stdout=subprocess.PIPE,
            universal_newlines=True,
            **kwargs
        )

    # Python 2's subprocess.Popen is not a context manager. Each half of the
    # protocol is supplied here only when the base class does not have it.
    if not hasattr(subprocess.Popen, '__enter__'):
        def __enter__(self):
            """Enter the ``with`` block; the process object itself is bound."""
            return self

    if not hasattr(subprocess.Popen, '__exit__'):
        def __exit__(self, exc_type, exc_val, exc_tb):
            """Close whichever standard streams exist, then wait for the child.

            The wait happens even if closing a stream raises.
            """
            try:
                for stream in (self.stdin, self.stdout, self.stderr):
                    if stream:
                        stream.close()
            finally:
                self.wait()


def remove_unicode_prefix(line):
    """Strip Python 2 style ``u`` string prefixes from ``line``.

    Turns ``u'text'`` / ``u"text"`` into ``'text'`` / ``"text"`` so that
    output containing unicode reprs can be compared against prefix-free
    expected strings.

    Only a ``u`` that starts a token is removed: the ``\\b`` anchor keeps a
    ``u`` that merely ends a word in front of a closing quote (for example
    the one in ``'menu'``) from being eaten.

    :param line: text to clean up.
    :return: ``line`` with every unicode literal prefix removed.
    """
    return re.sub(r"\bu(['\"])", r"\1", line)


class ExamplesTest(unittest.TestCase):
    """Run the example scripts as child processes and check what they print.

    Scripts are launched by bare file name without a shell, so they have to
    be executable and resolvable by the operating system.

    Each test drains a child's output *before* the child is waited on:
    calling ``wait()`` first on a process whose stdout is a pipe can deadlock
    once the pipe buffer fills up.  Leaving the ``with Popen(...)`` block
    performs the wait.
    """

    def _assert_sequence_output(self, stream):
        """Check that ``stream`` holds the receiver output for messages 1..100.

        Each line is stripped and has any ``u`` string prefix removed.  Two
        renderings of the message body are accepted: ``int32(n)`` and a
        plain ``n``.
        """
        received = [remove_unicode_prefix(line.strip()) for line in stream]
        expected_py2 = ["{'sequence': int32(%i)}" % n for n in range(1, 101)]
        expected_py3 = ["{'sequence': %i}" % n for n in range(1, 101)]
        self.assertIn(received, [expected_py2, expected_py3])

    def test_helloworld(self, example="helloworld.py"):
        """Run ``example`` and expect exactly one line, ``Hello World!``."""
        with Popen([example]) as p:
            output = [line.strip() for line in p.stdout]
        self.assertEqual(output, ['Hello World!'])

    def test_helloworld_direct(self):
        """Same check as ``test_helloworld`` for ``helloworld_direct.py``."""
        self.test_helloworld('helloworld_direct.py')

    def test_helloworld_blocking(self):
        """Same check as ``test_helloworld`` for ``helloworld_blocking.py``."""
        self.test_helloworld('helloworld_blocking.py')

    def test_simple_send_recv(self, recv='simple_recv.py', send='simple_send.py'):
        """Start ``recv``, run ``send`` to completion, then verify ``recv`` output."""
        with Popen([recv]) as r:
            with Popen([send]):
                pass
            self._assert_sequence_output(r.stdout)

    def test_client_server(self, client=None, server=None, sleep=0):
        """Run ``client`` against ``server`` and check the upper-cased replies.

        :param client: argument list for the client; defaults to ``['client.py']``.
        :param server: argument list for the server; defaults to ``['server.py']``.
        :param sleep: seconds to pause after starting the server before the
            client is launched (0 means no pause).
        """
        # None sentinels instead of mutable list defaults.
        client = ['client.py'] if client is None else client
        server = ['server.py'] if server is None else server

        with Popen(server) as s:
            try:
                if sleep:
                    time.sleep(sleep)
                with Popen(client) as c:
                    actual = [line.strip() for line in c.stdout]
            finally:
                # Always stop the server: leaving the outer ``with`` waits for
                # it, which would hang forever if the client failed and the
                # server was never told to terminate.
                s.terminate()

        inputs = ["Twas brillig, and the slithy toves",
                  "Did gire and gymble in the wabe.",
                  "All mimsy were the borogroves,",
                  "And the mome raths outgrabe."]
        expected = ["%s => %s" % (text, text.upper()) for text in inputs]
        self.assertEqual(actual, expected)

    def test_sync_client_server(self):
        """``sync_client.py`` against the default server."""
        self.test_client_server(client=['sync_client.py'])

    def test_client_server_tx(self):
        """Default client against ``server_tx.py``."""
        self.test_client_server(server=['server_tx.py'])

    def test_sync_client_server_tx(self):
        """``sync_client.py`` against ``server_tx.py``."""
        self.test_client_server(client=['sync_client.py'], server=['server_tx.py'])

    def test_client_server_direct(self):
        """``client.py`` pointed at ``server_direct.py`` on localhost:8888."""
        self.test_client_server(client=['client.py', '-a', 'localhost:8888/examples'],
                                server=['server_direct.py'], sleep=0.5)

    def test_sync_client_server_direct(self):
        """``sync_client.py`` pointed at ``server_direct.py`` on localhost:8888."""
        self.test_client_server(client=['sync_client.py', '-a', 'localhost:8888/examples'],
                                server=['server_direct.py'], sleep=0.5)

    def test_db_send_recv(self):
        """Move 100 messages from ``./src_db`` to ``./dst_db`` via send/recv."""
        self.maxDiff = None
        # setup databases
        subprocess.check_call(['db_ctrl.py', 'init', './src_db'])
        subprocess.check_call(['db_ctrl.py', 'init', './dst_db'])
        with Popen(['db_ctrl.py', 'insert', './src_db'], stdin=subprocess.PIPE) as fill:
            for i in range(100):
                fill.stdin.write("Message-%i\n" % (i+1))
            fill.stdin.close()

        # run send and recv
        with Popen(['db_recv.py', '-m', '100']) as r:
            with Popen(['db_send.py', '-m', '100']):
                pass
            # verify output of receive
            actual = [line.strip() for line in r.stdout]
        expected = ["inserted message %i" % (i+1) for i in range(100)]
        self.assertEqual(actual, expected)

        # verify state of databases
        with Popen(['db_ctrl.py', 'list', './dst_db']) as v:
            actual = [remove_unicode_prefix(line.strip()) for line in v.stdout]
        expected = ["(%i, 'Message-%i')" % (i+1, i+1) for i in range(100)]
        self.assertEqual(actual, expected)

    def test_tx_send_tx_recv(self):
        """Send/receive check using ``tx_send.py`` and ``tx_recv.py``."""
        self.test_simple_send_recv(recv='tx_recv.py', send='tx_send.py')

    def test_simple_send_direct_recv(self):
        """``simple_send.py`` sending straight to ``direct_recv.py`` on :8888."""
        self.maxDiff = None
        with Popen(['direct_recv.py', '-a', 'localhost:8888']) as r:
            # Pause after starting the receiver before launching the sender.
            time.sleep(0.5)
            with Popen(['simple_send.py', '-a', 'localhost:8888']):
                pass
            self._assert_sequence_output(r.stdout)

    def test_direct_send_simple_recv(self):
        """``simple_recv.py`` receiving straight from ``direct_send.py`` on :8888."""
        with Popen(['direct_send.py', '-a', 'localhost:8888']):
            # Pause after starting the sender before launching the receiver.
            time.sleep(0.5)
            with Popen(['simple_recv.py', '-a', 'localhost:8888']) as r:
                self._assert_sequence_output(r.stdout)

    def test_selected_recv(self):
        """After ``colour_send.py``, check both receivers' output.

        ``selected_recv.py`` is expected to print the odd-numbered messages
        as ``green N`` and ``simple_recv.py`` the even-numbered ones as
        ``red N``.
        """
        with Popen(['colour_send.py']):
            pass

        with Popen(['selected_recv.py', '-m', '50']) as r:
            actual = [line.strip() for line in r.stdout]
        expected = ["green %i" % (i+1) for i in range(100) if i % 2 == 0]
        self.assertEqual(actual, expected)

        with Popen(['simple_recv.py', '-m', '50']) as r:
            actual = [line.strip() for line in r.stdout]
        expected = ["red %i" % (i+1) for i in range(100) if i % 2 == 1]
        self.assertEqual(actual, expected)
