# blob: 78157ca02b3ddf738c5c8f17324330f4dea38aaa [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os, time, sys
from threading import Thread, Event
from unittest import TestCase
from proton_tests.common import Test, free_tcp_port
from copy import copy
from proton import Message, Url, generate_uuid, Array, UNDESCRIBED, Data, symbol, ConnectionException
from proton.handlers import MessagingHandler
from proton.reactor import Container
from proton.utils import SyncRequestResponse, BlockingConnection
from .common import Skipped
# Expected connection-level values. ConnPropertiesServer compares what the
# remote peer sent on its connection against these constants.
CONNECTION_PROPERTIES = {u'connection': u'properties'}
OFFERED_CAPABILITIES = Array(
    UNDESCRIBED,
    Data.SYMBOL,
    symbol("O_one"),
    symbol("O_two"),
    symbol("O_three"),
)
DESIRED_CAPABILITIES = Array(
    UNDESCRIBED,
    Data.SYMBOL,
    symbol("D_one"),
    symbol("D_two"),
    symbol("D_three"),
)

# NOTE(review): presumably SASL mechanism names used by tests elsewhere in
# this file -- confirm against their callers.
ANONYMOUS = 'ANONYMOUS'
EXTERNAL = 'EXTERNAL'
class EchoServer(MessagingHandler, Thread):
    """
    Simple echo server that sends each message back to its reply-to address.

    Runs in its own daemon thread. Will only accept a single connection and
    shuts down when that connection closes.
    """

    def __init__(self, url, timeout):
        """
        Prepare the server; nothing listens until the thread is started.

        url     -- address handed to ``container.listen()`` in ``on_start``.
        timeout -- upper bound passed to ``Event.wait()`` by ``wait()``.
        """
        MessagingHandler.__init__(self)
        Thread.__init__(self)
        # Must be set after Thread.__init__ and before the thread starts.
        self.daemon = True
        self.url = url
        self.timeout = timeout
        # Reply links keyed by the address generated for them.
        self.senders = {}
        self.container = None
        # Set by on_start once the listener exists.
        self.event = Event()

    def on_start(self, event):
        """Start listening on ``self.url`` and signal that the server is up."""
        container = event.container
        self.acceptor = container.listen(self.url)
        self.container = container
        self.event.set()

    def on_link_opening(self, event):
        """Give a dynamic-source sender link a fresh address and remember it."""
        link = event.link
        if not link.is_sender:
            return
        remote_source = link.remote_source
        if not (remote_source and remote_source.dynamic):
            return
        source = link.source
        source.address = str(generate_uuid())
        # Key by the address as read back from the link's source.
        self.senders[source.address] = link

    def on_message(self, event):
        """Echo the message body to the sender registered for its reply-to."""
        request = event.message
        reply_link = self.senders.get(request.reply_to)
        if not reply_link:
            # No link registered for this reply-to: drop the message.
            return
        response = Message(
            address=request.reply_to,
            body=request.body,
            correlation_id=request.correlation_id,
        )
        reply_link.send(response)

    def on_connection_closing(self, event):
        """Close the listener when the connection starts closing."""
        self.acceptor.close()

    def on_transport_error(self, event):
        """Close the listener when the transport reports an error."""
        self.acceptor.close()

    def run(self):
        """Thread entry point: run a Container driven by this handler."""
        container = Container(self)
        container.run()

    def wait(self):
        """
        Block until ``on_start`` has run or ``self.timeout`` has elapsed.

        The outcome of the wait is not reported to the caller.
        """
        self.event.wait(self.timeout)
class ConnPropertiesServer(EchoServer):
    """
    Echo server that also records whether the remote peer's connection
    properties and capabilities equal the expected module-level constants.
    """

    def __init__(self, url, timeout):
        """Initialise the echo server with all three "received" flags cleared."""
        EchoServer.__init__(self, url, timeout)
        # Each flag is set to True by on_connection_opening when the matching
        # remote value equals its expected constant; it is never reset here.
        self.properties_received = False
        self.offered_capabilities_received = False
        self.desired_capabilities_received = False

    def on_connection_opening(self, event):
        """Compare the remote connection's values with the expected ones."""
        connection = event.connection
        # (attribute on the connection, expected value, flag to raise)
        checks = (
            ("remote_properties", CONNECTION_PROPERTIES,
             "properties_received"),
            ("remote_offered_capabilities", OFFERED_CAPABILITIES,
             "offered_capabilities_received"),
            ("remote_desired_capabilities", DESIRED_CAPABILITIES,
             "desired_capabilities_received"),
        )
        for remote_attr, expected, flag_name in checks:
            if getattr(connection, remote_attr) == expected:
                setattr(self, flag_name, True)