#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from __future__ import absolute_import

import errno
import logging
import socket
import time
import weakref

from ._condition import Condition
from ._delivery import Delivery
from ._endpoints import Endpoint
from ._events import Event, Handler, _dispatch
from ._exceptions import ProtonException
from ._io import IO
from ._message import Message
from ._selectable import Selectable
from ._transport import Transport
from ._url import Url

log = logging.getLogger("proton")


class OutgoingMessageHandler(Handler):
    """
    Turns the raw link-flow and delivery events of sender links into
    simpler, message-oriented callbacks (``on_sendable()``,
    ``on_accepted()``, ``on_rejected()``, ``on_released()`` and
    ``on_settled()``).

    :param auto_settle: If ``True`` (default), a delivery is settled locally
        once the remote peer has settled it. Otherwise messages must be
        explicitly settled.
    :type auto_settle: ``bool``
    :param delegate: A client handler to which the callbacks are forwarded.
        When ``None``, the callbacks do nothing.
    """

    def __init__(self, auto_settle=True, delegate=None):
        self.delegate = delegate
        self.auto_settle = auto_settle

    def on_link_flow(self, event):
        link = event.link
        # Only sender links that currently hold credit are of interest.
        if not (link.is_sender and link.credit):
            return
        state = link.state
        # Both ends of the link must be active before anything can be sent.
        if state & Endpoint.LOCAL_ACTIVE and state & Endpoint.REMOTE_ACTIVE:
            self.on_sendable(event)

    def on_delivery(self, event):
        delivery = event.delivery
        # Ignore receiver-side deliveries and deliveries without an update.
        if not (delivery.link.is_sender and delivery.updated):
            return

        outcome = delivery.remote_state
        if outcome == Delivery.ACCEPTED:
            self.on_accepted(event)
        elif outcome == Delivery.REJECTED:
            self.on_rejected(event)
        elif outcome == Delivery.RELEASED or outcome == Delivery.MODIFIED:
            # RELEASED and MODIFIED are both reported as "released".
            self.on_released(event)

        if not delivery.settled:
            return
        self.on_settled(event)
        if self.auto_settle:
            delivery.settle()

    def on_sendable(self, event):
        """
        Called when the sender link has credit and messages can
        therefore be transferred. Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_sendable', event)

    def on_accepted(self, event):
        """
        Called when the remote peer accepts an outgoing message.
        Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_accepted', event)

    def on_rejected(self, event):
        """
        Called when the remote peer rejects an outgoing message.
        Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_rejected', event)

    def on_released(self, event):
        """
        Called when the remote peer releases an outgoing message. Note
        that this is invoked for both the ``RELEASED`` and the ``MODIFIED``
        remote delivery states. Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_released', event)

    def on_settled(self, event):
        """
        Called when the remote peer has settled the outgoing
        message. This is the point at which it should never be
        retransmitted. Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_settled', event)


def recv_msg(delivery):
    """
    Read the pending bytes of a delivery from its link, decode them into
    a new message and advance the link past the delivery.

    :param delivery: The delivery whose pending data is to be read.
    :type delivery: :class:`proton.Delivery`
    :return: The decoded message.
    :rtype: :class:`proton.Message`
    """
    message = Message()
    link = delivery.link
    encoded = link.recv(delivery.pending)
    message.decode(encoded)
    link.advance()
    return message


class Reject(ProtonException):
    """
    Exception signalling that a received message should be rejected.

    :class:`IncomingMessageHandler` catches it around ``on_message()`` and
    responds by updating the delivery to :const:`proton.Delivery.REJECTED`
    and settling it.
    """


class Release(ProtonException):
    """
    Exception signalling that a received message should be released.

    :class:`IncomingMessageHandler` catches it around ``on_message()`` and
    responds by updating the delivery to :const:`proton.Delivery.MODIFIED`
    and settling it.
    """


class Acking(object):
    """
    Mixin providing the methods used to acknowledge (accept, reject,
    release or simply settle) received messages.
    """

    def accept(self, delivery):
        """
        Accepts a received message: the delivery is updated to
        :const:`proton.Delivery.ACCEPTED` and settled.

        .. note:: This method cannot currently be used in combination
            with transactions. See :class:`proton.reactor.Transaction`
            for transactional methods.

        :param delivery: The message delivery tracking object
        :type delivery: :class:`proton.Delivery`
        """
        self.settle(delivery, Delivery.ACCEPTED)

    def reject(self, delivery):
        """
        Rejects a received message that is considered invalid or
        unprocessable: the delivery is updated to
        :const:`proton.Delivery.REJECTED` and settled.

        .. note:: This method cannot currently be used in combination
            with transactions. See :class:`proton.reactor.Transaction`
            for transactional methods.

        :param delivery: The message delivery tracking object
        :type delivery: :class:`proton.Delivery`
        """
        self.settle(delivery, Delivery.REJECTED)

    def release(self, delivery, delivered=True):
        """
        Releases a received message, making it available at the source
        for any (other) interested receiver. The ``delivered``
        parameter indicates whether this should be considered a
        delivery attempt (and the delivery count updated) or not.

        .. note:: This method cannot currently be used in combination
            with transactions. See :class:`proton.reactor.Transaction`
            for transactional methods.

        :param delivery: The message delivery tracking object
        :type delivery: :class:`proton.Delivery`
        :param delivered: If ``True`` (default), the delivery is settled with
            state :const:`proton.Delivery.MODIFIED`, i.e. it counts as a
            delivery attempt. Otherwise it is settled with state
            :const:`proton.Delivery.RELEASED`.
        :type delivered: ``bool``
        """
        outcome = Delivery.MODIFIED if delivered else Delivery.RELEASED
        self.settle(delivery, outcome)

    def settle(self, delivery, state=None):
        """
        Settles the message delivery, optionally updating the
        delivery state first.

        :param delivery: The message delivery tracking object
        :type delivery: :class:`proton.Delivery`
        :param state: The delivery state to set before settling, or ``None``
            if no update is to be performed.
        :type  state: ``int`` or ``None``
        """
        # Any falsy state (``None`` or ``0``) skips the update step.
        if state:
            delivery.update(state)
        delivery.settle()


class IncomingMessageHandler(Handler, Acking):
    """
    Turns the raw delivery events of receiver links into simpler,
    message-oriented callbacks (``on_message()``, ``on_settled()`` and
    ``on_aborted()``).

    :param auto_accept: If ``True`` (default), a message is accepted and
        settled once ``on_message()`` returns normally, and is released and
        settled if it arrives on a link that is already closed locally.
        Otherwise messages must be explicitly accepted, rejected or released.
    :type auto_accept: ``bool``
    :param delegate: A client handler to which the callbacks are forwarded.
        When ``None``, the callbacks do nothing.
    """

    def __init__(self, auto_accept=True, delegate=None):
        self.auto_accept = auto_accept
        self.delegate = delegate

    def on_delivery(self, event):
        delivery = event.delivery
        if not delivery.link.is_receiver:
            return

        if delivery.aborted:
            # The sender gave up on this delivery: notify, then settle it.
            self.on_aborted(event)
            delivery.settle()
            return

        if delivery.readable and not delivery.partial:
            # The complete message has arrived; decode and attach it to the event.
            event.message = recv_msg(delivery)
            if event.link.state & Endpoint.LOCAL_CLOSED:
                # The link is already closed locally, so the message is not
                # passed on; hand it back to the peer when auto-accepting.
                if self.auto_accept:
                    delivery.update(Delivery.RELEASED)
                    delivery.settle()
                return
            try:
                self.on_message(event)
                if self.auto_accept:
                    delivery.update(Delivery.ACCEPTED)
                    delivery.settle()
            except Reject:
                delivery.update(Delivery.REJECTED)
                delivery.settle()
            except Release:
                delivery.update(Delivery.MODIFIED)
                delivery.settle()
            return

        if delivery.updated and delivery.settled:
            self.on_settled(event)

    def on_message(self, event):
        """
        Called when a message is received. The message itself can be
        obtained as a property on the event (``event.message``). For the
        purpose of referring to this message in further actions (e.g. if
        explicitly accepting it), the ``delivery`` should be used, which is
        also obtainable via a property on the event. Forwards to the
        delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_message', event)

    def on_settled(self, event):
        """
        Callback for when a message delivery is settled by the remote peer.
        Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_settled', event)

    def on_aborted(self, event):
        """
        Callback for when a message delivery is aborted by the remote peer.
        Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_aborted', event)


class EndpointStateHandler(Handler):
    """
    A utility that exposes 'endpoint' events - ie the open/close for
    links, sessions and connections - in a more intuitive manner. A
    ``XXX_opened()`` method will be called when both local and remote peers
    have opened the link, session or connection. This can be used to
    confirm a locally initiated action for example. A ``XXX_opening()``
    method will be called when the remote peer has requested an open
    that was not initiated locally; the endpoint is then opened locally
    as well. The same pattern applies to close: after the relevant
    callback the endpoint is closed locally.

    :param peer_close_is_error: If ``True``, a peer endpoint closing will be
        treated as an error with an error callback when no delegate is set.
        Otherwise (default), the normal callbacks for the closing will occur.
    :type peer_close_is_error: ``bool``
    :param delegate: A client handler to which the callbacks are forwarded.
    """

    def __init__(self, peer_close_is_error=False, delegate=None):
        self.peer_close_is_error = peer_close_is_error
        self.delegate = delegate

    @classmethod
    def is_local_open(cls, endpoint):
        """
        Test if the local side of ``endpoint`` is open (ie its state has
        :const:`proton.Endpoint.LOCAL_ACTIVE` set).

        :param endpoint: The endpoint to be tested.
        :type endpoint: Any child of :class:`proton.Endpoint`
        :return: The endpoint state masked with
            :const:`proton.Endpoint.LOCAL_ACTIVE`: truthy if the flag is
            set, falsy otherwise.
        """
        return endpoint.state & Endpoint.LOCAL_ACTIVE

    @classmethod
    def is_local_uninitialised(cls, endpoint):
        """
        Test if the local side of ``endpoint`` is uninitialised (ie its
        state has :const:`proton.Endpoint.LOCAL_UNINIT` set).

        :param endpoint: The endpoint to be tested.
        :type endpoint: Any child of :class:`proton.Endpoint`
        :return: The endpoint state masked with
            :const:`proton.Endpoint.LOCAL_UNINIT`: truthy if the flag is
            set, falsy otherwise.
        """
        return endpoint.state & Endpoint.LOCAL_UNINIT

    @classmethod
    def is_local_closed(cls, endpoint):
        """
        Test if the local side of ``endpoint`` is closed (ie its state has
        :const:`proton.Endpoint.LOCAL_CLOSED` set).

        :param endpoint: The endpoint to be tested.
        :type endpoint: Any child of :class:`proton.Endpoint`
        :return: The endpoint state masked with
            :const:`proton.Endpoint.LOCAL_CLOSED`: truthy if the flag is
            set, falsy otherwise.
        """
        return endpoint.state & Endpoint.LOCAL_CLOSED

    @classmethod
    def is_remote_open(cls, endpoint):
        """
        Test if the remote side of ``endpoint`` is open (ie its state has
        :const:`proton.Endpoint.REMOTE_ACTIVE` set).

        :param endpoint: The endpoint to be tested.
        :type endpoint: Any child of :class:`proton.Endpoint`
        :return: The endpoint state masked with
            :const:`proton.Endpoint.REMOTE_ACTIVE`: truthy if the flag is
            set, falsy otherwise.
        """
        return endpoint.state & Endpoint.REMOTE_ACTIVE

    @classmethod
    def is_remote_closed(cls, endpoint):
        """
        Test if the remote side of ``endpoint`` is closed (ie its state has
        :const:`proton.Endpoint.REMOTE_CLOSED` set).

        :param endpoint: The endpoint to be tested.
        :type endpoint: Any child of :class:`proton.Endpoint`
        :return: The endpoint state masked with
            :const:`proton.Endpoint.REMOTE_CLOSED`: truthy if the flag is
            set, falsy otherwise.
        """
        return endpoint.state & Endpoint.REMOTE_CLOSED

    @classmethod
    def print_error(cls, endpoint, endpoint_type):
        """
        Logs an error message related to an error condition at an endpoint.
        If the endpoint has a remote condition, its description (or, failing
        that, its name) is logged. Otherwise a "closed by peer" message is
        logged when the endpoint is locally open but remotely closed.

        :param endpoint: The endpoint to be tested
        :type endpoint: :class:`proton.Endpoint`
        :param endpoint_type: The endpoint type as a string to be printed
            in the log message.
        :type endpoint_type: ``str``
        """
        condition = endpoint.remote_condition
        if condition:
            log.error(condition.description or condition.name)
            return
        if cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint):
            log.error("%s closed by peer" % endpoint_type)

    def on_link_remote_close(self, event):
        link = event.link
        if link.remote_condition:
            self.on_link_error(event)
        elif self.is_local_closed(link):
            self.on_link_closed(event)
        else:
            self.on_link_closing(event)
        # Whatever the outcome, make sure the link is closed locally too.
        link.close()

    def on_session_remote_close(self, event):
        session = event.session
        if session.remote_condition:
            self.on_session_error(event)
        elif self.is_local_closed(session):
            self.on_session_closed(event)
        else:
            self.on_session_closing(event)
        # Whatever the outcome, make sure the session is closed locally too.
        session.close()

    def on_connection_remote_close(self, event):
        connection = event.connection
        condition = connection.remote_condition
        if condition:
            if condition.name == "amqp:connection:forced":
                # Treat this the same as just having the transport closed by the peer without
                # sending any events. Allow reconnection to happen transparently.
                return
            self.on_connection_error(event)
        elif self.is_local_closed(connection):
            self.on_connection_closed(event)
        else:
            self.on_connection_closing(event)
        # Whatever the outcome, make sure the connection is closed locally too.
        connection.close()

    def on_connection_local_open(self, event):
        if not self.is_remote_open(event.connection):
            return
        self.on_connection_opened(event)

    def on_connection_remote_open(self, event):
        connection = event.connection
        if self.is_local_open(connection):
            self.on_connection_opened(event)
            return
        if self.is_local_uninitialised(connection):
            # The peer initiated the open: notify, then open our side too.
            self.on_connection_opening(event)
            connection.open()

    def on_session_local_open(self, event):
        if not self.is_remote_open(event.session):
            return
        self.on_session_opened(event)

    def on_session_remote_open(self, event):
        session = event.session
        if self.is_local_open(session):
            self.on_session_opened(event)
            return
        if self.is_local_uninitialised(session):
            # The peer initiated the open: notify, then open our side too.
            self.on_session_opening(event)
            session.open()

    def on_link_local_open(self, event):
        if not self.is_remote_open(event.link):
            return
        self.on_link_opened(event)

    def on_link_remote_open(self, event):
        link = event.link
        if self.is_local_open(link):
            self.on_link_opened(event)
            return
        if self.is_local_uninitialised(link):
            # The peer initiated the open: notify, then open our side too.
            self.on_link_opening(event)
            link.open()

    def on_connection_opened(self, event):
        """
        Callback for when both the local and remote endpoints of a
        connection have opened. Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_connection_opened', event)

    def on_session_opened(self, event):
        """
        Callback for when both the local and remote endpoints of a
        session have opened. Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_session_opened', event)

    def on_link_opened(self, event):
        """
        Callback for when both the local and remote endpoints of a
        link have opened. Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_link_opened', event)

    def on_connection_opening(self, event):
        """
        Callback for when a remote peer initiates the opening of
        a connection. Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_connection_opening', event)

    def on_session_opening(self, event):
        """
        Callback for when a remote peer initiates the opening of
        a session. Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_session_opening', event)

    def on_link_opening(self, event):
        """
        Callback for when a remote peer initiates the opening of
        a link. Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_link_opening', event)

    def on_connection_error(self, event):
        """
        Callback for when the remote peer closes the connection with an
        error condition. Forwards to the delegate if there is one;
        otherwise the error is logged.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            self.print_error(event.connection, "connection")
        else:
            _dispatch(self.delegate, 'on_connection_error', event)

    def on_session_error(self, event):
        """
        Callback for when the remote peer closes the session with an
        error condition. Forwards to the delegate if there is one;
        otherwise the error is logged and the connection is closed.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            self.print_error(event.session, "session")
            event.connection.close()
        else:
            _dispatch(self.delegate, 'on_session_error', event)

    def on_link_error(self, event):
        """
        Callback for when the remote peer closes the link with an
        error condition. Forwards to the delegate if there is one;
        otherwise the error is logged and the connection is closed.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            self.print_error(event.link, "link")
            event.connection.close()
        else:
            _dispatch(self.delegate, 'on_link_error', event)

    def on_connection_closed(self, event):
        """
        Callback for when both the local and remote endpoints of a
        connection have closed. Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_connection_closed', event)

    def on_session_closed(self, event):
        """
        Callback for when both the local and remote endpoints of a
        session have closed. Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_session_closed', event)

    def on_link_closed(self, event):
        """
        Callback for when both the local and remote endpoints of a
        link have closed. Forwards to the delegate, if any.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        _dispatch(self.delegate, 'on_link_closed', event)

    def on_connection_closing(self, event):
        """
        Callback for when a remote peer initiates the closing of
        a connection. Forwards to the delegate if there is one; otherwise,
        if ``peer_close_is_error`` is set, the close is handled as a
        connection error.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            if self.peer_close_is_error:
                self.on_connection_error(event)
            return
        _dispatch(self.delegate, 'on_connection_closing', event)

    def on_session_closing(self, event):
        """
        Callback for when a remote peer initiates the closing of
        a session. Forwards to the delegate if there is one; otherwise,
        if ``peer_close_is_error`` is set, the close is handled as a
        session error.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            if self.peer_close_is_error:
                self.on_session_error(event)
            return
        _dispatch(self.delegate, 'on_session_closing', event)

    def on_link_closing(self, event):
        """
        Callback for when a remote peer initiates the closing of
        a link. Forwards to the delegate if there is one; otherwise,
        if ``peer_close_is_error`` is set, the close is handled as a
        link error.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            if self.peer_close_is_error:
                self.on_link_error(event)
            return
        _dispatch(self.delegate, 'on_link_closing', event)

    def on_transport_tail_closed(self, event):
        """
        Callback for when the transport tail has closed (ie no further input will
        be accepted by the transport). Handled in the same way as
        :meth:`on_transport_closed`.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        self.on_transport_closed(event)

    def on_transport_closed(self, event):
        """
        Callback for when the transport has closed - ie both the head (input) and
        tail (output) of the transport pipeline are closed. If the event
        has a connection that is still open locally, ``on_disconnected`` is
        dispatched to the delegate (if any).

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        if self.delegate is None:
            return
        connection = event.connection
        if connection and self.is_local_open(connection):
            _dispatch(self.delegate, 'on_disconnected', event)


class MessagingHandler(Handler, Acking):
    """
    A general purpose handler that makes the proton-c events somewhat
    simpler to deal with and/or avoids repetitive tasks for common use
    cases.

    :param prefetch: Initial flow credit for receiving messages, defaults to 10.
        A falsy value (e.g. ``0``) means no flow controller is installed.
    :type prefetch: ``int``
    :param auto_accept: If ``True``, accept all messages (default). Otherwise messages
        must be individually accepted or rejected.
    :type auto_accept: ``bool``
    :param auto_settle: If ``True``, settle all messages (default). Otherwise
        messages must be explicitly settled.
    :type auto_settle: ``bool``
    :param peer_close_is_error: If ``True``, a peer endpoint closing will be
        treated as an error with an error callback. Otherwise (default), the
        normal callbacks for the closing will occur.
    :type peer_close_is_error: ``bool``
    """

    def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False):
        self.handlers = []
        if prefetch:
            self.handlers.append(FlowController(prefetch))
        # The child handlers call back into this object; a weak proxy avoids
        # a strong reference cycle between this handler and its children.
        delegate = weakref.proxy(self)
        self.handlers.append(EndpointStateHandler(peer_close_is_error, delegate))
        self.handlers.append(IncomingMessageHandler(auto_accept, delegate))
        self.handlers.append(OutgoingMessageHandler(auto_settle, delegate))
        # Transport conditions for which the connection is closed outright.
        self.fatal_conditions = ["amqp:unauthorized-access"]

    def on_transport_error(self, event):
        """
        Called when some error is encountered with the transport over
        which the AMQP connection is to be established. This includes
        authentication errors as well as socket errors. The condition is
        logged, and the connection is closed if the condition name is one
        of ``fatal_conditions``.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        condition = event.transport.condition
        if not condition:
            # Use the module logger (not the root logger) like every other
            # message emitted from this module.
            log.error("Unspecified transport error")
            return
        if condition.info:
            log.error("%s: %s: %s", condition.name, condition.description, condition.info)
        else:
            log.error("%s: %s", condition.name, condition.description)
        if condition.name in self.fatal_conditions:
            event.connection.close()

    def on_connection_error(self, event):
        """
        Called when the peer closes the connection with an error condition.
        The default implementation logs the error.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        EndpointStateHandler.print_error(event.connection, "connection")

    def on_session_error(self, event):
        """
        Called when the peer closes the session with an error condition.
        The default implementation logs the error and closes the connection.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        EndpointStateHandler.print_error(event.session, "session")
        event.connection.close()

    def on_link_error(self, event):
        """
        Called when the peer closes the link with an error condition.
        The default implementation logs the error and closes the connection.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        EndpointStateHandler.print_error(event.link, "link")
        event.connection.close()

    def on_reactor_init(self, event):
        """
        Called when the event loop - the reactor - starts. If the reactor
        has a ``subclass`` attribute, the reactor is also made available on
        the event under the lower-cased name of that subclass, after which
        :meth:`on_start` is invoked.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """
        reactor = event.reactor
        if hasattr(reactor, 'subclass'):
            setattr(event, reactor.subclass.__name__.lower(), reactor)
        self.on_start(event)

    def on_start(self, event):
        """
        Called when the event loop starts. (Just an alias for on_reactor_init)

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_connection_closed(self, event):
        """
        Called when the connection is closed.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_session_closed(self, event):
        """
        Called when the session is closed.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_link_closed(self, event):
        """
        Called when the link is closed.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_connection_closing(self, event):
        """
        Called when the peer initiates the closing of the connection.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_session_closing(self, event):
        """
        Called when the peer initiates the closing of the session.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_link_closing(self, event):
        """
        Called when the peer initiates the closing of the link.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_disconnected(self, event):
        """
        Called when the socket is disconnected.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_sendable(self, event):
        """
        Called when the sender link has credit and messages can
        therefore be transferred.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_accepted(self, event):
        """
        Called when the remote peer accepts an outgoing message.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_rejected(self, event):
        """
        Called when the remote peer rejects an outgoing message.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_released(self, event):
        """
        Called when the remote peer releases an outgoing message. Note
        that this may be in response to either the RELEASE or MODIFIED
        state as defined by the AMQP specification.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_settled(self, event):
        """
        Called when the remote peer has settled the outgoing
        message. This is the point at which it should never be
        retransmitted.

        :param event: The underlying event object. Use this to obtain further
            information on the event.
        :type event: :class:`proton.Event`
        """

    def on_message(self, event):
        """
        Called when a message is received. The message itself can be
        obtained as a property on the event. For the purpose of
        referring to this message in further actions (e.g. if
        explicitly accepting it), the ``delivery`` should be used, also
        obtainable via a property on the event.

        :param event: The underlying event object. Use this to obtain further
            information on the event. In particular, the message itself may
            be obtained by accessing ``event.message``.
        :type event: :class:`proton.Event`
        """


class TransactionHandler(object):
    """
    Interface for transaction handlers, i.e. objects that wish to be
    told about state changes of a transaction. Every callback defined
    here is a no-op; override the ones of interest.
    """

    def on_transaction_declared(self, event):
        """
        Callback invoked when a local transaction has been declared.

        :param event: The underlying event object, from which further
            information about the event can be obtained. In particular,
            the :class:`proton.reactor.Transaction` object is available
            as ``event.transaction``.
        :type event: :class:`proton.Event`
        """

    def on_transaction_committed(self, event):
        """
        Callback invoked when a local transaction has been discharged
        successfully (committed).

        :param event: The underlying event object, from which further
            information about the event can be obtained.
        :type event: :class:`proton.Event`
        """

    def on_transaction_aborted(self, event):
        """
        Callback invoked when a local transaction has been discharged
        unsuccessfully (aborted).

        :param event: The underlying event object, from which further
            information about the event can be obtained.
        :type event: :class:`proton.Event`
        """

    def on_transaction_declare_failed(self, event):
        """
        Callback invoked when declaring a local transaction fails.

        :param event: The underlying event object, from which further
            information about the event can be obtained.
        :type event: :class:`proton.Event`
        """

    def on_transaction_commit_failed(self, event):
        """
        Callback invoked when committing a local transaction fails.

        :param event: The underlying event object, from which further
            information about the event can be obtained.
        :type event: :class:`proton.Event`
        """


class TransactionalClientHandler(MessagingHandler, TransactionHandler):
    """
    A :class:`MessagingHandler` extension for applications that use
    transactions. It offers every callback of :class:`MessagingHandler`
    and of :class:`TransactionHandler`, plus the convenience method
    :meth:`accept` for accepting received messages transactionally.

    :param prefetch: Initial flow credit for receiving messages, defaults to 10.
    :type prefetch: ``int``
    :param auto_accept: If ``True``, accept all messages. Otherwise messages
        must be individually accepted or rejected. Defaults to ``False``
        for this handler.
    :type auto_accept: ``bool``
    :param auto_settle: If ``True``, settle all messages (default). Otherwise
        messages must be explicitly settled.
    :type auto_settle: ``bool``
    :param peer_close_is_error: If ``True``, a peer endpoint closing will be
        treated as an error with an error callback. Otherwise (default), the
        normal callbacks for the closing will occur.
    :type peer_close_is_error: ``bool``
    """

    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False):
        super(TransactionalClientHandler, self).__init__(
            prefetch, auto_accept, auto_settle, peer_close_is_error
        )

    def accept(self, delivery, transaction=None):
        """
        Accept a received message, optionally as part of a transaction.
        Without a transaction object an ordinary non-transactional
        acceptance is performed.

        :param delivery: Delivery tracking object for received message.
        :type delivery: :class:`proton.Delivery`
        :param transaction: Transaction tracking object under which the
            message is to be accepted. If ``None`` (default), a normal
            non-transactional accept occurs.
        :type transaction: :class:`proton.reactor.Transaction`
        """
        if not transaction:
            # No transaction supplied: fall back to the inherited accept.
            super(TransactionalClientHandler, self).accept(delivery)
            return
        transaction.accept(delivery)


class FlowController(Handler):
    """
    Handler that tops up the credit of receiving links to a fixed window
    whenever a link is opened (locally or remotely), a flow event
    arrives, or a delivery is processed.

    :param window: Credit level that receiving links are replenished to,
        defaults to 1024.
    :type window: ``int``
    """

    def __init__(self, window=1024):
        self._window = window
        # Running total of the values returned by link.drained().
        self._drained = 0

    def on_link_local_open(self, event):
        """Replenish credit on the link that was opened locally."""
        self._flow(event.link)

    def on_link_remote_open(self, event):
        """Replenish credit on the link that was opened by the peer."""
        self._flow(event.link)

    def on_link_flow(self, event):
        """Replenish credit on the link the flow event refers to."""
        self._flow(event.link)

    def on_delivery(self, event):
        """Replenish credit on the link the delivery arrived on."""
        self._flow(event.link)

    def _flow(self, link):
        """
        Issue enough credit to bring a receiving link back to the window.

        Nothing is done for non-receiver links. Credit is only issued
        while the accumulated drained count is zero.
        """
        if not link.is_receiver:
            return
        self._drained += link.drained()
        if self._drained != 0:
            return
        link.flow(self._window - link.credit)


class Handshaker(Handler):
    """
    Handler that mirrors the peer's open and close actions: a remotely
    opened connection, session or link is opened locally if the local
    end is still uninitialised, and a remotely closed one is closed
    locally unless it is already closed.
    """

    @staticmethod
    def on_connection_remote_open(event):
        """Open the connection locally if the local end is uninitialised."""
        connection = event.connection
        if not (connection.state & Endpoint.LOCAL_UNINIT):
            return
        connection.open()

    @staticmethod
    def on_session_remote_open(event):
        """Open the session locally if the local end is uninitialised."""
        session = event.session
        if not (session.state & Endpoint.LOCAL_UNINIT):
            return
        session.open()

    @staticmethod
    def on_link_remote_open(event):
        """
        Open the link locally if the local end is uninitialised, first
        copying the remote source and target into the local ones.
        """
        lnk = event.link
        if not (lnk.state & Endpoint.LOCAL_UNINIT):
            return
        lnk.source.copy(lnk.remote_source)
        lnk.target.copy(lnk.remote_target)
        lnk.open()

    @staticmethod
    def on_connection_remote_close(event):
        """Close the connection locally unless it is already closed."""
        connection = event.connection
        if connection.state & Endpoint.LOCAL_CLOSED:
            return
        connection.close()

    @staticmethod
    def on_session_remote_close(event):
        """Close the session locally unless it is already closed."""
        session = event.session
        if session.state & Endpoint.LOCAL_CLOSED:
            return
        session.close()

    @staticmethod
    def on_link_remote_close(event):
        """Close the link locally unless it is already closed."""
        lnk = event.link
        if lnk.state & Endpoint.LOCAL_CLOSED:
            return
        lnk.close()


# Backward-compatibility aliases: keep the "C"-prefixed names available
# for code that still refers to these handlers by them.
CHandshaker = Handshaker
CFlowController = FlowController


class PythonIO:
    """
    Reactor handler that keeps its own list of selectables and polls
    them with ``IO.select`` each time the reactor is quiesced. Events
    handed to :meth:`on_unhandled` are dispatched to an
    :class:`IOHandler` delegate.
    """

    def __init__(self):
        self.selectables = []
        self.delegate = IOHandler()

    def on_unhandled(self, method, event):
        """Dispatch ``event`` to the delegate handler."""
        event.dispatch(self.delegate)

    def on_selectable_init(self, event):
        """Start tracking the selectable carried as the event context."""
        self.selectables.append(event.context)

    def on_selectable_updated(self, event):
        """No action is required when a selectable is updated."""
        pass

    def on_selectable_final(self, event):
        """Stop tracking and close the selectable, if it is terminal."""
        selectable = event.context
        if not selectable.is_terminal:
            return
        self.selectables.remove(selectable)
        selectable.close()

    def on_reactor_quiesced(self, event):
        """
        Wait for I/O on the tracked selectables, then invoke their
        ``readable``/``writable``/``expired`` callbacks and yield the
        reactor.

        The wait is bounded by the earliest selectable deadline and by
        the reactor timeout.
        """
        reactor = event.reactor
        # Other on_reactor_quiesced handlers could have produced events
        # to process; only wait if the reactor is still quiesced.
        if not reactor.quiesced:
            return

        readers = [sel for sel in self.selectables if sel.reading]
        writers = [sel for sel in self.selectables if sel.writing]
        deadlines = [sel.deadline for sel in self.selectables if sel.deadline]

        if deadlines:
            wait = min(deadlines) - time.time()
        else:
            wait = reactor.timeout
        if wait < 0:
            # The earliest deadline has already passed: poll without waiting.
            wait = 0
        wait = min(wait, reactor.timeout)
        ready_to_read, ready_to_write, _ = IO.select(readers, writers, [], wait)

        now = reactor.mark()

        for sel in ready_to_read:
            sel.readable()
        for sel in ready_to_write:
            sel.writable()
        for sel in self.selectables:
            if sel.deadline and now > sel.deadline:
                sel.expired()

        reactor.yield_()


# For C style IO handler need to implement Selector
class IOHandler(Handler):
    """
    Reactor handler that performs the socket I/O for transports.

    Selectables are registered with an ``IO.Selector``. When the reactor
    is quiesced the selector is polled and the ready selectables are
    notified. Readable/writable events move bytes between the socket and
    the transport bound to the selectable.
    """

    def __init__(self):
        self._selector = IO.Selector()

    def on_selectable_init(self, event):
        """Register a new selectable with the selector and count it on its reactor."""
        s = event.selectable
        self._selector.add(s)
        s._reactor._selectables += 1

    def on_selectable_updated(self, event):
        """Propagate a change of the selectable's state to the selector."""
        s = event.selectable
        self._selector.update(s)

    def on_selectable_final(self, event):
        """Deregister the selectable, decrement the reactor's count and close it."""
        s = event.selectable
        self._selector.remove(s)
        s._reactor._selectables -= 1
        s.close()

    def on_reactor_quiesced(self, event):
        """
        Poll the selector (bounded by the reactor timeout), notify the
        selectables that are readable, writable or expired, then yield
        the reactor.
        """
        r = event.reactor

        # Other handlers may have generated events; only wait if still quiesced.
        if not r.quiesced:
            return

        # NOTE(review): the value is unused; the read is kept in case the
        # property access has side effects - confirm and remove if not.
        d = r.timer_deadline
        readable, writable, expired = self._selector.select(r.timeout)

        # Called for its effect on the reactor; the return value is not needed.
        r.mark()

        for s in readable:
            s.readable()
        for s in writable:
            s.writable()
        for s in expired:
            s.expired()

        r.yield_()

    def on_selectable_readable(self, event):
        """
        Read as many bytes as the transport has capacity for from the
        socket and push them into the transport.

        An empty read is treated as EOF and handled through
        :meth:`on_selectable_error`; a socket error closes the transport
        tail.
        """
        s = event.selectable
        t = s._transport

        # If we're an acceptor we can't have a transport
        # and we don't want to do anything here in any case
        if not t:
            return

        capacity = t.capacity()
        if capacity > 0:
            try:
                b = s.recv(capacity)
                if len(b) > 0:
                    t.push(b)
                else:
                    # EOF handling
                    self.on_selectable_error(event)
            except socket.error as e:
                # TODO: What's the error handling to be here?
                log.error("Couldn't recv: %r", e)
                t.close_tail()

        # Always update as we may have gone to not reading or from
        # not writing to writing when processing the incoming bytes
        r = s._reactor
        self.update(t, s, r.now)

    def on_selectable_writable(self, event):
        """
        Send the transport's pending output on the socket and pop what
        was actually sent.

        A socket error closes the transport head. The selectable is only
        updated if the amount of pending output changed.
        """
        s = event.selectable
        t = s._transport

        # If we're an acceptor we can't have a transport
        # and we don't want to do anything here in any case
        if not t:
            return

        pending = t.pending()
        if pending > 0:

            try:
                n = s.send(t.peek(pending))
                t.pop(n)
            except socket.error as e:
                log.error("Couldn't send: %r", e)
                # TODO: Error? or actually an exception
                t.close_head()

        newpending = t.pending()
        if newpending != pending:
            r = s._reactor
            self.update(t, s, r.now)

    def on_selectable_error(self, event):
        """
        Close both directions of the transport, terminate the selectable
        and break the link between the two.
        """
        s = event.selectable
        t = s._transport

        t.close_head()
        t.close_tail()
        s.terminate()
        s._transport = None
        t._selectable = None
        s.update()

    def on_selectable_expired(self, event):
        """Refresh the selectable's state from its transport after its deadline passed."""
        s = event.selectable
        t = s._transport
        r = s._reactor

        self.update(t, s, r.now)

    def on_connection_local_open(self, event):
        """
        Create a transport and bind it to a locally opened connection
        whose remote end is still uninitialised.
        """
        c = event.connection
        if not c.state & Endpoint.REMOTE_UNINIT:
            return

        t = Transport()
        # It seems perverse, but the C code ignores bind errors too!
        # and this is required or you get errors because Connector() has already
        # bound the transport and connection!
        t.bind_nothrow(c)

    def on_connection_bound(self, event):
        """
        Start an outgoing socket connection for a newly bound connection.

        The transport is linked to the connection's reactor. Connections
        created by an acceptor already have a socket, so nothing more is
        done for them. Otherwise the target is taken from the
        connection's url (or its hostname), missing credentials are
        filled in from the url, and a non-blocking connect to the first
        resolved address is handed to a :class:`ConnectSelectable`.
        """
        c = event.connection
        t = event.transport

        reactor = c._reactor

        # link the new transport to its reactor:
        t._reactor = reactor

        if c._acceptor:
            # this connection was created by the acceptor.  There is already a
            # socket assigned to this connection.  Nothing needs to be done.
            return

        url = c.url or Url(c.hostname)
        url.defaults()

        host = url.host
        port = int(url.port)

        # Only take credentials from the url if no user was set explicitly.
        if not c.user:
            user = url.username
            if user:
                c.user = user
            password = url.password
            if password:
                c.password = password

        addrs = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)

        # Try first possible address
        log.debug("Connect trying first transport address: %s", addrs[0])
        sock = IO.connect(addrs[0])

        # At this point we need to arrange to be called back when the socket is writable
        ConnectSelectable(sock, reactor, addrs[1:], t, self)

        # TODO: Don't understand why we need this now - how can we get PN_TRANSPORT until the connection succeeds?
        t._selectable = None

    @staticmethod
    def update(transport, selectable, now):
        """
        Synchronise a selectable's reading/writing/deadline state with
        its transport and push the change to the reactor.

        If querying the transport's capacity or pending output raises
        and the transport is closed, the selectable is terminated and
        detached from the transport. Errors raised while the transport
        is not closed are ignored (best effort).

        :param transport: The transport whose state is read.
        :param selectable: The selectable to update.
        :param now: The time passed to ``transport.tick``.
        """
        try:
            capacity = transport.capacity()
            selectable.reading = capacity > 0
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
            if transport.closed:
                selectable.terminate()
                selectable._transport = None
                transport._selectable = None
        try:
            pending = transport.pending()
            selectable.writing = pending > 0
        except Exception:
            if transport.closed:
                selectable.terminate()
                selectable._transport = None
                transport._selectable = None
        selectable.deadline = transport.tick(now)
        selectable.update()

    def on_transport(self, event):
        """Refresh the transport's selectable, if it has a live one."""
        t = event.transport
        r = t._reactor
        s = t._selectable
        if s and not s.is_terminal:
            self.update(t, s, r.now)

    def on_transport_closed(self, event):
        """
        Terminate and detach the transport's selectable (if still live),
        then unbind the transport.
        """
        t = event.transport
        r = t._reactor
        s = t._selectable
        if s and not s.is_terminal:
            s.terminate()
            s._transport = None
            t._selectable = None
            r.update(s)
        t.unbind()


class ConnectSelectable(Selectable):
    """
    Selectable that waits for an in-progress socket connect to finish.

    Once the socket becomes writable the connect result is examined: on
    success the socket is handed over to a regular reactor selectable
    bound to the transport; on a refused connection the next address (if
    any) is tried; otherwise the transport is closed with an error
    condition.
    """

    def __init__(self, sock, reactor, addrs, transport, iohandler):
        super(ConnectSelectable, self).__init__(sock, reactor)
        # Completion of the connect is reported as the socket becoming writable.
        self.writing = True
        self._addrs = addrs
        self._transport = transport
        self._iohandler = iohandler
        transport._connect_selectable = self

    def readable(self):
        """Nothing to do: only writability matters while connecting."""
        pass

    def writable(self):
        """Evaluate the outcome of the connect attempt and act on it."""
        err = self._delegate.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        transport = self._transport
        transport._connect_selectable = None

        # Always cleanup this ConnectSelectable: either we failed or created a new one
        # Do it first to ensure the socket gets deregistered before being registered again
        # in the case of connecting
        self.terminate()
        self._transport = None
        self.update()

        if err == 0:
            log.debug("Connection succeeded")

            # Disassociate from the socket (which will be passed on)
            self.release()

            connected = self._reactor.selectable(delegate=self._delegate)
            connected._transport = transport
            transport._selectable = connected
            self._iohandler.update(transport, connected, transport._reactor.now)
            return

        if err != errno.ECONNREFUSED:
            log.error("Couldn't connect: %s", err)
            transport.condition = Condition("proton.pythonio", "Connection error: %s" % err)
        elif self._addrs:
            log.debug("Connection refused: trying next transport address: %s", self._addrs[0])

            next_sock = IO.connect(self._addrs[0])
            # New ConnectSelectable for the new socket with rest of addresses
            ConnectSelectable(next_sock, self._reactor, self._addrs[1:], transport, self._iohandler)
            return
        else:
            log.debug("Connection refused, but tried all transport addresses")
            transport.condition = Condition("proton.pythonio", "Connection refused to all addresses")

        # Every failure path that did not retry ends by closing the transport.
        transport.close_tail()
        transport.close_head()
