blob: 801a4170aad2a68c182fdc4eb560aea2872300f2 [file] [log] [blame]
#!/usr/bin/python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from __future__ import absolute_import
from .common import Test, free_tcp_port, Skipped
from proton import Message
from proton.handlers import CHandshaker, CFlowController
from proton.reactor import Reactor
import os
import subprocess
from threading import Thread
import time
class JavaThread(Thread):
def __init__(self, operation, port, count):
Thread.__init__(self)
self.operation = operation
self.port = str(port)
self.count = str(count)
self.result = 1
def run(self):
self.result = subprocess.call(['java',
'org.apache.qpid.proton.ProtonJInterop',
self.operation, self.port, self.count])
class ReceiveHandler:
def __init__(self, count):
self.count = count
self.handlers = [CHandshaker(), CFlowController()]
self.messages = []
def on_reactor_init(self, event):
port = free_tcp_port()
self.acceptor = event.reactor.acceptor("127.0.0.1", port)
self.java_thread = JavaThread("send", port, self.count)
self.java_thread.start()
def on_delivery(self, event):
rcv = event.receiver
msg = Message()
if rcv and msg.recv(rcv):
event.delivery.settle()
self.messages += [msg.body]
self.count -= 1
if (self.count == 0):
self.acceptor.close()
class SendHandler:
def __init__(self, host, num_msgs):
self.host = host
self.num_msgs = num_msgs
self.count = 0
self.handlers = [CHandshaker()]
def on_connection_init(self, event):
conn = event.connection
conn.hostname = self.host
ssn = conn.session()
snd = ssn.sender("sender")
conn.open()
ssn.open()
snd.open()
def on_link_flow(self, event):
snd = event.sender
if snd.credit > 0 and self.count < self.num_msgs:
self.count += 1
msg = Message("message-" + str(self.count))
dlv = snd.send(msg)
dlv.settle()
if (self.count == self.num_msgs):
snd.close()
snd.session.close()
snd.connection.close()
def on_reactor_init(self, event):
event.reactor.connection(self)
class ReactorInteropTest(Test):
def setUp(self):
classpath = ""
if ('CLASSPATH' in os.environ):
classpath = os.environ['CLASSPATH']
entries = classpath.split(os.pathsep)
self.proton_j_available = False
for entry in entries:
self.proton_j_available |= entry != "" and os.path.exists(entry)
def protonc_to_protonj(self, count):
if (not self.proton_j_available):
raise Skipped("ProtonJ not found")
port = free_tcp_port()
java_thread = JavaThread("recv", port, count)
java_thread.start()
# Give the Java thread time to spin up a JVM and start listening
# XXX: would be better to parse the stdout output for a message
time.sleep(1)
sh = SendHandler('127.0.0.1:' + str(port), count)
r = Reactor(sh)
r.run()
java_thread.join()
assert(java_thread.result == 0)
def protonj_to_protonc(self, count):
if (not self.proton_j_available):
raise Skipped("ProtonJ not found")
rh = ReceiveHandler(count)
r = Reactor(rh)
r.run()
rh.java_thread.join()
assert(rh.java_thread.result == 0)
for i in range(1, count):
assert(rh.messages[i-1] == ("message-" + str(i)))
def test_protonc_to_protonj_1(self):
self.protonc_to_protonj(1)
def test_protonc_to_protonj_5(self):
self.protonc_to_protonj(5)
def test_protonc_to_protonj_500(self):
self.protonc_to_protonj(500)
def test_protonc_to_protonj_5000(self):
self.protonc_to_protonj(5000)
def test_protonj_to_protonc_1(self):
self.protonj_to_protonc(1)
def test_protonj_to_protonc_5(self):
self.protonj_to_protonc(5)
def test_protonj_to_protonc_500(self):
self.protonj_to_protonc(500)
def test_protonj_to_protonc_5000(self):
self.protonj_to_protonc(5000)