#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import absolute_import

from unittest import TestCase
from unittest import SkipTest

import sys, os, subprocess
from random import randint
from threading import Thread
from socket import socket, AF_INET, SOCK_STREAM
from subprocess import Popen,PIPE,STDOUT
from string import Template

from proton import SASL, SSL
from proton.reactor import Container
from proton.handlers import Handshaker, FlowController


def free_tcp_ports(count=1):
  """ return a list of 'count' TCP ports that are free to used (ie. unbound)
  """
  retry = 0
  ports = []
  sockets = []
  while len(ports) != count:
    port = randint(49152, 65535)
    sockets.append( socket( AF_INET, SOCK_STREAM ) )
    try:
      sockets[-1].bind( ("0.0.0.0", port ) )
      ports.append( port )
      retry = 0
    except:
      retry += 1
      assert retry != 100, "No free sockets available for test!"
  for s in sockets:
    s.close()
  return ports

def free_tcp_port():
  return free_tcp_ports(1)[0]

def pump_uni(src, dst, buffer_size=1024):
  p = src.pending()
  c = dst.capacity()

  if c < 0:
    if p < 0:
      return False
    else:
      src.close_head()
      return True

  if p < 0:
    dst.close_tail()
  elif p == 0 or c == 0:
    return False
  else:
    binary = src.peek(min(c, buffer_size))
    dst.push(binary)
    src.pop(len(binary))

  return True

def pump(transport1, transport2, buffer_size=1024):
  """ Transfer all pending bytes between two Proton engines
      by repeatedly calling peek/pop and push.
      Asserts that each engine accepts some bytes every time
      (unless it's already closed).
  """
  while (pump_uni(transport1, transport2, buffer_size) or
         pump_uni(transport2, transport1, buffer_size)):
    pass

def findfileinpath(filename, searchpath):
    """Find filename in the searchpath
        return absolute path to the file or None
    """
    paths = searchpath.split(os.pathsep)
    for path in paths:
        if os.path.exists(os.path.join(path, filename)):
            return os.path.abspath(os.path.join(path, filename))
    return None

def isSSLPresent():
    return SSL.present()

createdSASLDb = False

def _cyrusSetup(conf_dir):
  """Write out simple SASL config.
  """
  saslpasswd = ""
  if 'SASLPASSWD' in os.environ:
    saslpasswd = os.environ['SASLPASSWD']
  else:
    saslpasswd = findfileinpath('saslpasswd2', os.getenv('PATH')) or ""
  if os.path.exists(saslpasswd):
    t = Template("""sasldb_path: ${db}
mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS
""")
    abs_conf_dir = os.path.abspath(conf_dir)
    subprocess.call(args=['rm','-rf',abs_conf_dir])
    os.mkdir(abs_conf_dir)
    db = os.path.join(abs_conf_dir,'proton.sasldb')
    conf = os.path.join(abs_conf_dir,'proton-server.conf')
    f = open(conf, 'w')
    f.write(t.substitute(db=db))
    f.close()

    cmd_template = Template("echo password | ${saslpasswd} -c -p -f ${db} -u proton user")
    cmd = cmd_template.substitute(db=db, saslpasswd=saslpasswd)
    subprocess.call(args=cmd, shell=True)

    os.environ['PN_SASL_CONFIG_PATH'] = abs_conf_dir
    global createdSASLDb
    createdSASLDb = True

# Globally initialize Cyrus SASL configuration
if SASL.extended():
  _cyrusSetup('sasl-conf')

def ensureCanTestExtendedSASL():
  if not SASL.extended():
    raise Skipped('Extended SASL not supported')
  if not createdSASLDb:
    raise Skipped("Can't Test Extended SASL: Couldn't create auth db")

class DefaultConfig:
    defines = {}

class Test(TestCase):
  config = DefaultConfig()

  def __init__(self, name):
    super(Test, self).__init__(name)
    self.name = name

  def configure(self, config):
    self.config = config

  def default(self, name, value, **profiles):
    default = value
    profile = self.config.defines.get("profile")
    if profile:
      default = profiles.get(profile, default)
    return self.config.defines.get(name, default)

  @property
  def delay(self):
    return float(self.default("delay", "1", fast="0.1"))

  @property
  def timeout(self):
    return float(self.default("timeout", "60", fast="10"))

  @property
  def verbose(self):
    return int(self.default("verbose", 0))


class Skipped(SkipTest):
  skipped = True


class TestServer(object):
  """ Base class for creating test-specific message servers.
  """
  def __init__(self, **kwargs):
    self.args = kwargs
    self.reactor = Container(self)
    self.host = "127.0.0.1"
    self.port = 0
    if "host" in kwargs:
      self.host = kwargs["host"]
    if "port" in kwargs:
      self.port = kwargs["port"]
    self.handlers = [FlowController(10), Handshaker()]
    self.thread = Thread(name="server-thread", target=self.run)
    self.thread.daemon = True
    self.running = True
    self.conditions = []

  def start(self):
    self.reactor.start()
    retry = 0
    if self.port == 0:
      self.port = str(randint(49152, 65535))
      retry = 10
    while retry > 0:
      try:
        self.acceptor = self.reactor.acceptor(self.host, self.port)
        break
      except IOError:
        self.port = str(randint(49152, 65535))
        retry -= 1
    assert retry > 0, "No free port for server to listen on!"
    self.thread.start()

  def stop(self):
    self.running = False
    self.reactor.wakeup()
    self.thread.join()

  # Note: all following methods all run under the thread:

  def run(self):
    self.reactor.timeout = 3.14159265359
    while self.reactor.process():
      if not self.running:
        self.acceptor.close()
        self.reactor.stop()
        break

  def on_connection_bound(self, event):
    if "idle_timeout" in self.args:
      event.transport.idle_timeout = self.args["idle_timeout"]

  def on_connection_local_close(self, event):
    self.conditions.append(event.connection.condition)

  def on_delivery(self, event):
    event.delivery.settle()

#
# Classes that wrap the messenger applications msgr-send and msgr-recv.
# These applications reside in the c/tools directory
#

class MessengerApp(object):
    """ Interface to control a MessengerApp """
    def __init__(self):
        self._cmdline = None
        # options common to Receivers and Senders:
        self.ca_db = None
        self.certificate = None
        self.privatekey = None
        self.password = None
        self._output = None

    def start(self, verbose=False):
        """ Begin executing the test """
        cmd = self.cmdline()
        self._verbose = verbose
        if self._verbose:
            print("COMMAND='%s'" % str(cmd))
        #print("ENV='%s'" % str(os.environ.copy()))
        try:
            # Handle python launch by replacing script 'filename' with
            # 'python abspath-to-filename' in cmdline arg list.
            if cmd[0].endswith('.py'):
                foundfile = findfileinpath(cmd[0], os.getenv('PATH'))
                if foundfile is None:
                    msg = "Unable to locate file '%s' in PATH" % cmd[0]
                    raise Skipped("Skipping test - %s" % msg)

                del cmd[0:1]
                cmd.insert(0, foundfile)
                cmd.insert(0, sys.executable)
            self._process = Popen(cmd, stdout=PIPE,
                                  bufsize=4096, universal_newlines=True)
        except OSError:
            e = sys.exc_info()[1]
            print("ERROR: '%s'" % e)
            msg = "Unable to execute command '%s', is it in your PATH?" % cmd[0]

            # NOTE(flaper87): Skip the test if the command is not found.
            if e.errno == 2:
              raise Skipped("Skipping test - %s" % msg)
            assert False, msg

        self._ready()  # wait for it to initialize

    def stop(self):
        """ Signal the client to start clean shutdown """
        pass

    def wait(self):
        """ Wait for client to complete """
        self._output = self._process.communicate()
        if self._verbose:
            print("OUTPUT='%s'" % self.stdout())

    def status(self):
        """ Return status from client process """
        return self._process.returncode

    def stdout(self):
        #self._process.communicate()[0]
        if not self._output or not self._output[0]:
            return "*** NO STDOUT ***"
        return self._output[0]

    def stderr(self):
        if not self._output or not self._output[1]:
            return "*** NO STDERR ***"
        return self._output[1]

    def cmdline(self):
        if not self._cmdline:
            self._build_command()
        return self._cmdline

    def _build_command(self):
        assert False, "_build_command() needs override"

    def _ready(self):
        assert False, "_ready() needs override"

    def _do_common_options(self):
        """ Common option handling """
        if self.ca_db is not None:
            self._cmdline.append("-T")
            self._cmdline.append(str(self.ca_db))
        if self.certificate is not None:
            self._cmdline.append("-C")
            self._cmdline.append(str(self.certificate))
        if self.privatekey is not None:
            self._cmdline.append("-K")
            self._cmdline.append(str(self.privatekey))
        if self.password is not None:
            self._cmdline.append("-P")
            self._cmdline.append("pass:" + str(self.password))


class MessengerSender(MessengerApp):
    """ Interface to configure a sending MessengerApp """
    def __init__(self):
        MessengerApp.__init__(self)
        self._command = None
        # @todo make these properties
        self.targets = []
        self.send_count = None
        self.msg_size = None
        self.send_batch = None
        self.outgoing_window = None
        self.report_interval = None
        self.get_reply = False
        self.timeout = None
        self.incoming_window = None
        self.recv_count = None
        self.name = None

    # command string?
    def _build_command(self):
        self._cmdline = self._command
        self._do_common_options()
        assert self.targets, "Missing targets, required for sender!"
        self._cmdline.append("-a")
        self._cmdline.append(",".join(self.targets))
        if self.send_count is not None:
            self._cmdline.append("-c")
            self._cmdline.append(str(self.send_count))
        if self.msg_size is not None:
            self._cmdline.append("-b")
            self._cmdline.append(str(self.msg_size))
        if self.send_batch is not None:
            self._cmdline.append("-p")
            self._cmdline.append(str(self.send_batch))
        if self.outgoing_window is not None:
            self._cmdline.append("-w")
            self._cmdline.append(str(self.outgoing_window))
        if self.report_interval is not None:
            self._cmdline.append("-e")
            self._cmdline.append(str(self.report_interval))
        if self.get_reply:
            self._cmdline.append("-R")
        if self.timeout is not None:
            self._cmdline.append("-t")
            self._cmdline.append(str(self.timeout))
        if self.incoming_window is not None:
            self._cmdline.append("-W")
            self._cmdline.append(str(self.incoming_window))
        if self.recv_count is not None:
            self._cmdline.append("-B")
            self._cmdline.append(str(self.recv_count))
        if self.name is not None:
            self._cmdline.append("-N")
            self._cmdline.append(str(self.name))

    def _ready(self):
        pass


class MessengerReceiver(MessengerApp):
    """ Interface to configure a receiving MessengerApp """
    def __init__(self):
        MessengerApp.__init__(self)
        self._command = None
        # @todo make these properties
        self.subscriptions = []
        self.receive_count = None
        self.recv_count = None
        self.incoming_window = None
        self.timeout = None
        self.report_interval = None
        self.send_reply = False
        self.outgoing_window = None
        self.forwards = []
        self.name = None

    # command string?
    def _build_command(self):
        self._cmdline = os.environ.get("TEST_EXE_PREFIX", "").split()
        self._cmdline += self._command
        self._do_common_options()
        self._cmdline += ["-X", "READY"]
        assert self.subscriptions, "Missing subscriptions, required for receiver!"
        self._cmdline.append("-a")
        self._cmdline.append(",".join(self.subscriptions))
        if self.receive_count is not None:
            self._cmdline.append("-c")
            self._cmdline.append(str(self.receive_count))
        if self.recv_count is not None:
            self._cmdline.append("-b")
            self._cmdline.append(str(self.recv_count))
        if self.incoming_window is not None:
            self._cmdline.append("-w")
            self._cmdline.append(str(self.incoming_window))
        if self.timeout is not None:
            self._cmdline.append("-t")
            self._cmdline.append(str(self.timeout))
        if self.report_interval is not None:
            self._cmdline.append("-e")
            self._cmdline.append(str(self.report_interval))
        if self.send_reply:
            self._cmdline.append("-R")
        if self.outgoing_window is not None:
            self._cmdline.append("-W")
            self._cmdline.append(str(self.outgoing_window))
        if self.forwards:
            self._cmdline.append("-F")
            self._cmdline.append(",".join(self.forwards))
        if self.name is not None:
            self._cmdline.append("-N")
            self._cmdline.append(str(self.name))

    def _ready(self):
        """ wait for subscriptions to complete setup. """
        r = self._process.stdout.readline()
        assert r.strip() == "READY", "Unexpected input while waiting for receiver to initialize: %s" % r

class MessengerSenderC(MessengerSender):
    def __init__(self):
        MessengerSender.__init__(self)
        self._command = ["msgr-send"]

class MessengerReceiverC(MessengerReceiver):
    def __init__(self):
        MessengerReceiver.__init__(self)
        self._command = ["msgr-recv"]

class ReactorSenderC(MessengerSender):
    def __init__(self):
        MessengerSender.__init__(self)
        self._command = ["reactor-send"]

class ReactorReceiverC(MessengerReceiver):
    def __init__(self):
        MessengerReceiver.__init__(self)
        self._command = ["reactor-recv"]
