| #!/usr/bin/env python |
| |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| |
| import time, string, traceback |
| from brokertest import * |
| from qpid.messaging import * |
| |
| |
| try: |
| import java.lang.System |
| _cp = java.lang.System.getProperty("java.class.path"); |
| except ImportError: |
| _cp = checkenv("QP_CP") |
| |
| class Formatter: |
| |
| def __init__(self, message): |
| self.message = message |
| self.environ = {"M": self.message, |
| "P": self.message.properties, |
| "C": self.message.content} |
| |
| def __getitem__(self, st): |
| return eval(st, self.environ) |
| |
| # The base test case has support for launching the generic |
| # receiver and sender through the TestLauncher with all the options. |
| # |
| class JavaClientTest(BrokerTest): |
| """Base Case for Java Test cases""" |
| |
| client_class = "org.apache.qpid.testkit.TestLauncher" |
| |
| # currently there is no transparent reconnection. |
| # temp hack: just creating the queue here and closing it. |
| def start_error_watcher(self,broker=None): |
| ssn = broker.connect().session() |
| err_watcher = ssn.receiver("control; {create:always}", capacity=1) |
| ssn.close() |
| |
| def store_module_args(self): |
| if BrokerTest.store_lib: |
| return ["--load-module", BrokerTest.store_lib] |
| else: |
| print "Store module not present." |
| return [""] |
| |
| def client(self,**options): |
| cmd = ["java","-cp",_cp] |
| |
| cmd += ["-Dtest_name=" + options.get("test_name", "UNKNOWN")] |
| cmd += ["-Dhost=" + options.get("host","127.0.0.1")] |
| cmd += ["-Dport=" + str(options.get("port",5672))] |
| cmd += ["-Dcon_count=" + str(options.get("con_count",1))] |
| cmd += ["-Dssn_per_con=" + str(options.get("ssn_per_con",1))] |
| cmd += ["-Duse_unique_dests=" + str(options.get("use_unique_dests",False))] |
| cmd += ["-Dcheck_for_dups=" + str(options.get("check_for_dups",False))] |
| cmd += ["-Ddurable=" + str(options.get("durable",False))] |
| cmd += ["-Dtransacted=" + str(options.get("transacted",False))] |
| cmd += ["-Dreceiver=" + str(options.get("receiver",False))] |
| cmd += ["-Dsync_rcv=" + str(options.get("sync_rcv",False))] |
| cmd += ["-Dsender=" + str(options.get("sender",False))] |
| cmd += ["-Dmsg_size=" + str(options.get("msg_size",256))] |
| cmd += ["-Dtx_size=" + str(options.get("tx_size",10))] |
| cmd += ["-Dmsg_count=" + str(options.get("msg_count",1000))] |
| cmd += ["-Dmax_prefetch=" + str(options.get("max_prefetch",500))] |
| cmd += ["-Dsync_ack=" + str(options.get("sync_ack",False))] |
| cmd += ["-Dsync_persistence=" + str(options.get("sync_pub",False))] |
| cmd += ["-Dsleep_time=" + str(options.get("sleep_time",1000))] |
| cmd += ["-Dfailover=" + options.get("failover", "failover_exchange")] |
| cmd += ["-Djms_durable_sub=" + str(options.get("jms_durable_sub", False))] |
| cmd += ["-Dlog.level=" + options.get("log.level", "warn")] |
| cmd += [self.client_class] |
| cmd += [options.get("address", "my_queue; {create: always}")] |
| |
| #print str(options.get("port",5672)) |
| return cmd |
| |
| # currently there is no transparent reconnection. |
| # temp hack: just creating a receiver and closing session soon after. |
| def monitor_clients(self,broker=None,run_time=600,error_ck_freq=60): |
| ssn = broker.connect().session() |
| err_watcher = ssn.receiver("control; {create:always}", capacity=1) |
| i = run_time/error_ck_freq |
| is_error = False |
| for j in range(i): |
| not_empty = True |
| while not_empty: |
| try: |
| m = err_watcher.fetch(timeout=error_ck_freq) |
| ssn.acknowledge() |
| print "Java process notified of an error" |
| self.print_error(m) |
| is_error = True |
| except messaging.Empty, e: |
| not_empty = False |
| |
| ssn.close() |
| return is_error |
| |
| def print_error(self,msg): |
| print msg.properties.get("exception-trace") |
| |
| def verify(self, receiver,sender): |
| sender_running = receiver.is_running() |
| receiver_running = sender.is_running() |
| |
| self.assertTrue(receiver_running,"Receiver has exited prematually") |
| self.assertTrue(sender_running,"Sender has exited prematually") |
| |
| def start_sender_and_receiver(self,**options): |
| |
| receiver_opts = options |
| receiver_opts["receiver"]=True |
| receiver = self.popen(self.client(**receiver_opts), |
| expect=EXPECT_RUNNING) |
| |
| sender_opts = options |
| sender_opts["sender"]=True |
| sender = self.popen(self.client(**sender_opts), |
| expect=EXPECT_RUNNING) |
| |
| return receiver, sender |
| |
| def start_cluster(self,count=2,expect=EXPECT_RUNNING,**options): |
| if options.get("durable",False)==True: |
| cluster = Cluster(self, count=count, expect=expect, args=self.store_module_args()) |
| else: |
| cluster = Cluster(self, count=count) |
| return cluster |
| |
| class ConcurrencyTest(JavaClientTest): |
| """A concurrency test suite for the JMS client""" |
| skip = False |
| |
| def base_case(self,**options): |
| if self.skip : |
| print "Skipping test" |
| return |
| |
| cluster = self.start_cluster(count=2,**options) |
| self.start_error_watcher(broker=cluster[0]) |
| options["port"] = port=cluster[0].port() |
| |
| options["use_unique_dests"]=True |
| options["address"]="amq.topic" |
| receiver, sender = self.start_sender_and_receiver(**options) |
| self.monitor_clients(broker=cluster[0],run_time=180) |
| self.verify(receiver,sender) |
| |
| def test_multiplexing_con(self): |
| """Tests multiple sessions on a single connection""" |
| |
| self.base_case(ssn_per_con=25,test_name=self.id()) |
| |
| def test_multiplexing_con_with_tx(self): |
| """Tests multiple transacted sessions on a single connection""" |
| |
| self.base_case(ssn_per_con=25,transacted=True,test_name=self.id()) |
| |
| def test_multiplexing_con_with_sync_rcv(self): |
| """Tests multiple sessions with sync receive""" |
| |
| self.base_case(ssn_per_con=25,sync_rcv=True,test_name=self.id()) |
| |
| def test_multiplexing_con_with_durable_sub(self): |
| """Tests multiple sessions with durable subs""" |
| |
| self.base_case(ssn_per_con=25,durable=True,jms_durable_sub=True,test_name=self.id()) |
| |
| def test_multiplexing_con_with_sync_ack(self): |
| """Tests multiple sessions with sync ack""" |
| |
| self.base_case(ssn_per_con=25,sync_ack=True,test_name=self.id()) |
| |
| def test_multiplexing_con_with_sync_pub(self): |
| """Tests multiple sessions with sync pub""" |
| |
| self.base_case(ssn_per_con=25,sync_pub=True,durable=True,test_name=self.id()) |
| |
| def test_multiple_cons_and_ssns(self): |
| """Tests multiple connections and sessions""" |
| |
| self.base_case(con_count=10,ssn_per_con=25,test_name=self.id()) |
| |
| |
| class SoakTest(JavaClientTest): |
| """A soak test suite for the JMS client""" |
| |
| def base_case(self,**options): |
| cluster = self.start_cluster(count=4, expect=EXPECT_EXIT_FAIL,**options) |
| options["port"] = port=cluster[0].port() |
| self.start_error_watcher(broker=cluster[0]) |
| options["use_unique_dests"]=True |
| options["address"]="amq.topic" |
| receiver,sender = self.start_sender_and_receiver(**options) |
| is_error = self.monitor_clients(broker=cluster[0],run_time=30,error_ck_freq=30) |
| |
| if (is_error): |
| print "The sender or receiver didn't start properly. Exiting test." |
| return |
| else: |
| "Print no error !" |
| |
| # grace period for clients to get the failover properly setup. |
| time.sleep(30) |
| error_msg= None |
| # Kill original brokers, start new ones. |
| try: |
| for i in range(8): |
| cluster[i].kill() |
| b=cluster.start() |
| self.monitor_clients(broker=b,run_time=30,error_ck_freq=30) |
| print "iteration : " + str(i) |
| except ConnectError, e1: |
| error_msg = "Unable to connect to new cluster node : " + traceback.format_exc(e1) |
| |
| except SessionError, e2: |
| error_msg = "Session error while connected to new cluster node : " + traceback.format_exc(e2) |
| |
| self.verify(receiver,sender) |
| if error_msg: |
| raise Exception(error_msg) |
| |
| |
| def test_failover(self) : |
| """Test basic failover""" |
| |
| self.base_case(test_name=self.id()) |
| |
| |
| def test_failover_with_durablesub(self): |
| """Test failover with durable subscriber""" |
| |
| self.base_case(durable=True,jms_durable_sub=True,test_name=self.id()) |
| |
| |
| def test_failover_with_sync_rcv(self): |
| """Test failover with sync receive""" |
| |
| self.base_case(sync_rcv=True,test_name=self.id()) |
| |
| |
| def test_failover_with_sync_ack(self): |
| """Test failover with sync ack""" |
| |
| self.base_case(sync_ack=True,test_name=self.id()) |
| |
| |
| def test_failover_with_noprefetch(self): |
| """Test failover with no prefetch""" |
| |
| self.base_case(max_prefetch=1,test_name=self.id()) |
| |
| |
| def test_failover_with_multiple_cons_and_ssns(self): |
| """Test failover with multiple connections and sessions""" |
| |
| self.base_case(use_unique_dests=True,address="amq.topic", |
| con_count=10,ssn_per_con=25,test_name=self.id()) |