| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.qpid.client; |
| |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.never; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashSet; |
| import java.util.List; |
| |
| import javax.jms.JMSException; |
| import javax.jms.Message; |
| import javax.jms.MessageListener; |
| import javax.jms.MessageProducer; |
| import javax.jms.StreamMessage; |
| |
| import org.apache.qpid.client.message.AMQPEncodedListMessage; |
| import org.apache.qpid.client.message.UnprocessedMessage; |
| import org.apache.qpid.test.utils.QpidTestCase; |
| import org.apache.qpid.transport.*; |
| import org.apache.qpid.transport.Connection.SessionFactory; |
| import org.apache.qpid.transport.Connection.State; |
| import org.apache.qpid.url.AMQBindingURL; |
| |
| /** |
| * Tests AMQSession_0_10 methods. |
| * <p> |
| * The main purpose of the tests in this test suite is to check that |
| * {@link SessionException} is not thrown from methods of |
| * {@link AMQSession_0_10}. |
| */ |
| public class AMQSession_0_10Test extends QpidTestCase |
| { |
| |
| public void testExceptionOnCommit() |
| { |
| AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); |
| try |
| { |
| session.commit(); |
| fail("JMSException should be thrown"); |
| } |
| catch (Exception e) |
| { |
| assertTrue("JMSException is expected", e instanceof JMSException); |
| assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); |
| } |
| } |
| |
| public void testExceptionOnCreateMessageProducer() |
| { |
| AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); |
| try |
| { |
| session.createMessageProducer(createDestination(), true, true, 1l); |
| fail("JMSException should be thrown"); |
| } |
| catch (Exception e) |
| { |
| assertTrue("JMSException is expected but got:" + e, e instanceof JMSException); |
| assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); |
| } |
| } |
| |
| public void testExceptionOnRollback() |
| { |
| AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); |
| try |
| { |
| session.rollback(); |
| fail("JMSException should be thrown"); |
| } |
| catch (Exception e) |
| { |
| assertTrue("JMSException is expected", e instanceof JMSException); |
| } |
| } |
| |
| public void testExceptionOnRecover() |
| { |
| AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(javax.jms.Session.AUTO_ACKNOWLEDGE); |
| try |
| { |
| session.recover(); |
| fail("JMSException should be thrown"); |
| } |
| catch (Exception e) |
| { |
| assertTrue("JMSException is expected", e instanceof JMSException); |
| } |
| } |
| |
| public void testExceptionOnCreateBrowser() |
| { |
| AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); |
| AMQQueue destination = createQueue(); |
| try |
| { |
| session.createBrowser(destination); |
| fail("JMSException should be thrown"); |
| } |
| catch (Exception e) |
| { |
| assertTrue("JMSException is expected", e instanceof JMSException); |
| assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); |
| } |
| } |
| |
| public void testExceptionOnCreateConsumer() |
| { |
| AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); |
| AMQAnyDestination destination = createDestination(); |
| try |
| { |
| session.createConsumer(destination); |
| fail("JMSException should be thrown"); |
| } |
| catch (Exception e) |
| { |
| assertTrue("JMSException is expected", e instanceof JMSException); |
| assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); |
| } |
| } |
| |
| public void testExceptionOnCreateSubscriber() |
| { |
| AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); |
| AMQAnyDestination destination = createDestination(); |
| try |
| { |
| session.createSubscriber(destination); |
| fail("JMSException should be thrown"); |
| } |
| catch (Exception e) |
| { |
| assertTrue("JMSException is expected", e instanceof JMSException); |
| assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); |
| } |
| } |
| |
| public void testExceptionOnUnsubscribe() |
| { |
| AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); |
| try |
| { |
| session.unsubscribe("whatever"); |
| fail("JMSExceptiuon should be thrown"); |
| } |
| catch (Exception e) |
| { |
| assertTrue("JMSException is expected", e instanceof JMSException); |
| assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); |
| } |
| } |
| |
| public void testCommit() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| try |
| { |
| session.commit(); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, TxCommit.class, false); |
| assertNotNull("TxCommit was not sent", event); |
| } |
| |
| public void testRollback() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| try |
| { |
| session.rollback(); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, TxRollback.class, false); |
| assertNotNull("TxRollback was not sent", event); |
| } |
| |
| public void testRecover() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(javax.jms.Session.AUTO_ACKNOWLEDGE); |
| try |
| { |
| session.recover(); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, MessageRelease.class, false); |
| assertNotNull("MessageRelease was not sent", event); |
| } |
| |
| public void testCreateProducer() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| try |
| { |
| session.createProducer(createQueue()); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, ExchangeDeclare.class, false); |
| assertNotNull("ExchangeDeclare was not sent", event); |
| } |
| |
| public void testCreateConsumer() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| try |
| { |
| session.createConsumer(createQueue()); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, MessageSubscribe.class, false); |
| assertNotNull("MessageSubscribe was not sent", event); |
| } |
| |
| public void testSync() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| try |
| { |
| session.sync(); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, ExecutionSync.class, false); |
| assertNotNull("ExecutionSync was not sent", event); |
| } |
| |
| public void testSendQueueDelete() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| try |
| { |
| session.sendQueueDelete("test"); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, QueueDelete.class, false); |
| assertNotNull("QueueDelete event was not sent", event); |
| QueueDelete exchangeDelete = (QueueDelete) event; |
| assertEquals("test", exchangeDelete.getQueue()); |
| } |
| |
| public void testSendConsume() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| try |
| { |
| BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, |
| null, null, false, true); |
| session.sendConsume(consumer, "test", true); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, MessageSubscribe.class, false); |
| assertNotNull("MessageSubscribe event was not sent", event); |
| } |
| |
| public void testCreateMessageProducer() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| try |
| { |
| session.createMessageProducer(createDestination(), true, true, 1l); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, ExchangeDeclare.class, false); |
| assertNotNull("ExchangeDeclare event was not sent", event); |
| } |
| |
| public void testSendExchangeDelete() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| try |
| { |
| session.sendExchangeDelete("test", true); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, ExchangeDelete.class, false); |
| assertNotNull("ExchangeDelete event was not sent", event); |
| ExchangeDelete exchangeDelete = (ExchangeDelete) event; |
| assertEquals("test", exchangeDelete.getExchange()); |
| } |
| |
| public void testExceptionOnMessageConsumerReceive() |
| { |
| AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); |
| try |
| { |
| BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, |
| null, null, false, true); |
| session.start(); |
| consumer.receive(1); |
| fail("JMSException should be thrown"); |
| } |
| catch (Exception e) |
| { |
| assertTrue("JMSException is expected", e instanceof JMSException); |
| assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); |
| } |
| } |
| |
| public void testMessageConsumerReceive() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| try |
| { |
| BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, |
| null, null, false, true); |
| session.start(); |
| consumer.receive(1); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, MessageFlow.class, false); |
| assertNotNull("MessageFlow event was not sent", event); |
| } |
| |
| public void testExceptionOnMessageConsumerReceiveNoWait() |
| { |
| AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); |
| try |
| { |
| BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, |
| null, null, false, true); |
| session.start(); |
| consumer.receiveNoWait(); |
| fail("JMSException should be thrown"); |
| } |
| catch (Exception e) |
| { |
| assertTrue("JMSException is expected", e instanceof JMSException); |
| assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); |
| } |
| } |
| |
| public void testMessageConsumerClose() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| try |
| { |
| BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, |
| null, null, false, true); |
| consumer.setConsumerTag(""); |
| consumer.close(); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, MessageCancel.class, false); |
| assertNotNull("MessageCancel event was not sent", event); |
| } |
| |
| public void testExceptionOnMessageConsumerClose() |
| { |
| AMQSession_0_10 session = createThrowingExceptionAMQSession_0_10(); |
| try |
| { |
| BasicMessageConsumer_0_10 consumer = session.createMessageConsumer(createDestination(), 1, 1, true, false, |
| null, null, false, true); |
| consumer.close(); |
| fail("JMSException should be thrown"); |
| } |
| catch (Exception e) |
| { |
| assertTrue("JMSException is expected", e instanceof JMSException); |
| assertEquals("541 error code is expected", "541", ((JMSException) e).getErrorCode()); |
| } |
| } |
| |
| public void testMessageProducerSend() |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| try |
| { |
| MessageProducer producer = session.createProducer(createQueue()); |
| producer.send(session.createTextMessage("Test")); |
| session.commit(); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent event = findSentProtocolEventOfClass(session, MessageTransfer.class, false); |
| assertNotNull("MessageTransfer event was not sent", event); |
| event = findSentProtocolEventOfClass(session, ExchangeDeclare.class, false); |
| assertNotNull("ExchangeDeclare event was not sent", event); |
| } |
| |
| public void testCreateStreamMessage() throws Exception |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(); |
| StreamMessage m = session.createStreamMessage(); |
| assertTrue("Legacy Stream message encoding should be the default" + m.getClass(),!(m instanceof AMQPEncodedListMessage)); |
| } |
| |
| public void testGetQueueDepthWithSync() |
| { |
| // slow down a flush thread |
| setTestSystemProperty("qpid.session.max_ack_delay", "10000"); |
| AMQSession_0_10 session = createAMQSession_0_10(false, javax.jms.Session.DUPS_OK_ACKNOWLEDGE); |
| try |
| { |
| session.acknowledgeMessage(-1, false); |
| session.getQueueDepth(createDestination(), true); |
| } |
| catch (Exception e) |
| { |
| fail("Unexpected exception is caught:" + e.getMessage()); |
| } |
| ProtocolEvent command = findSentProtocolEventOfClass(session, MessageAccept.class, false); |
| assertNotNull("MessageAccept command was not sent", command); |
| command = findSentProtocolEventOfClass(session, ExecutionSync.class, false); |
| assertNotNull("ExecutionSync command was not sent", command); |
| command = findSentProtocolEventOfClass(session, QueueQuery.class, false); |
| assertNotNull("QueueQuery command was not sent", command); |
| } |
| |
| public void testResubscribe() throws Exception |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(AMQSession_0_10.AUTO_ACKNOWLEDGE); |
| |
| AMQQueue queue1 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test1?routingkey='test1'&durable='true'")); |
| session.createProducer(queue1); |
| BasicMessageConsumer_0_10 consumer1 = (BasicMessageConsumer_0_10)session.createConsumer(queue1); |
| |
| AMQQueue queue2 = new AMQQueue(new AMQBindingURL("direct://amq.direct//test2?routingkey='test2'")); |
| session.createProducer(queue2); |
| BasicMessageConsumer_0_10 consumer2 = (BasicMessageConsumer_0_10)session.createConsumer(queue2); |
| |
| UnprocessedMessage[] messages = new UnprocessedMessage[4]; |
| for (int i =0; i< messages.length;i++ ) |
| { |
| String consumerTag = i % 2 == 0 ? consumer1.getConsumerTag() : consumer2.getConsumerTag(); |
| int deliveryTag = i + 1; |
| messages[i]= createMockMessage(deliveryTag, consumerTag); |
| session.messageReceived(messages[i]); |
| if (deliveryTag % 2 == 0) |
| { |
| session.addUnacknowledgedMessage(deliveryTag); |
| } |
| } |
| |
| assertEquals("Unexpected highest delivery tag", 4, session.getHighestDeliveryTag().get()); |
| assertFalse("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty()); |
| assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers())); |
| |
| // verify test messages were not dispatched |
| for (UnprocessedMessage message: messages ) |
| { |
| verify(message, never()).dispatch(session); |
| } |
| |
| session.resubscribe(); |
| |
| assertEquals("Unexpected highest delivery tag", -1, session.getHighestDeliveryTag().get()); |
| assertTrue("Unexpected unacknowledged message tags", session.getUnacknowledgedMessageTags().isEmpty()); |
| assertTrue("Unexpected pre-fetched message tags", session.getPrefetchedMessageTags().isEmpty()); |
| assertEquals("Unexpected consumers", new HashSet<>(Arrays.asList(consumer1, consumer2)), new HashSet<>(session.getConsumers())); |
| } |
| |
| public void testFailoverPrep() throws Exception |
| { |
| AMQSession_0_10 session = createAMQSession_0_10(AMQSession_0_10.AUTO_ACKNOWLEDGE); |
| |
| UnprocessedMessage[] messages = new UnprocessedMessage[4]; |
| for (int i =0; i< messages.length;i++ ) |
| { |
| String consumerTag = String.valueOf(i % 2); |
| int deliveryTag = i + 1; |
| messages[i]= createMockMessage(deliveryTag, consumerTag); |
| session.messageReceived(messages[i]); |
| if (deliveryTag % 2 == 0) |
| { |
| session.addUnacknowledgedMessage(deliveryTag); |
| } |
| } |
| |
| // verify test messages were not dispatched |
| for (UnprocessedMessage message: messages ) |
| { |
| verify(message, never()).dispatch(session); |
| } |
| |
| session.failoverPrep(); |
| |
| // verify dispatcher queue is drained |
| for (UnprocessedMessage message: messages ) |
| { |
| verify(message).dispatch(session); |
| } |
| } |
| |
| private UnprocessedMessage createMockMessage(long deliveryTag, String consumerTag) |
| { |
| UnprocessedMessage message = mock(UnprocessedMessage.class); |
| when(message.getConsumerTag()).thenReturn(consumerTag); |
| when(message.getDeliveryTag()).thenReturn(deliveryTag); |
| return message; |
| } |
| |
| |
| |
| private AMQAnyDestination createDestination() |
| { |
| AMQAnyDestination destination = null; |
| try |
| { |
| destination = new AMQAnyDestination("amq.direct", "direct", |
| "test", false, true, "test", true, null); |
| } |
| catch (Exception e) |
| { |
| fail("Failued to create destination:" + e.getMessage()); |
| } |
| return destination; |
| } |
| |
| private AMQQueue createQueue() |
| { |
| AMQQueue destination = null; |
| try |
| { |
| destination = new AMQQueue("amq.direct", "test", "test"); |
| } |
| catch (Exception e) |
| { |
| fail("Failed to create destination:" + e.getMessage()); |
| } |
| return destination; |
| } |
| |
| private AMQSession_0_10 createThrowingExceptionAMQSession_0_10() |
| { |
| return createAMQSession_0_10(true, javax.jms.Session.SESSION_TRANSACTED); |
| } |
| |
| private AMQSession_0_10 createThrowingExceptionAMQSession_0_10(int akcnowledgeMode) |
| { |
| return createAMQSession_0_10(true, akcnowledgeMode); |
| } |
| |
| private ProtocolEvent findSentProtocolEventOfClass(AMQSession_0_10 session, Class<? extends ProtocolEvent> class1, |
| boolean isLast) |
| { |
| ProtocolEvent found = null; |
| List<ProtocolEvent> events = ((MockSession) session.getQpidSession()).getSender().getSendEvents(); |
| assertNotNull("Events list should not be null", events); |
| assertFalse("Events list should not be empty", events.isEmpty()); |
| if (isLast) |
| { |
| ProtocolEvent event = events.get(events.size() - 1); |
| if (event.getClass().isAssignableFrom(class1)) |
| { |
| found = event; |
| } |
| } |
| else |
| { |
| for (ProtocolEvent protocolEvent : events) |
| { |
| if (protocolEvent.getClass().isAssignableFrom(class1)) |
| { |
| found = protocolEvent; |
| break; |
| } |
| } |
| |
| } |
| return found; |
| } |
| |
| private AMQSession_0_10 createAMQSession_0_10() |
| { |
| return createAMQSession_0_10(false, javax.jms.Session.SESSION_TRANSACTED); |
| } |
| |
| private AMQSession_0_10 createAMQSession_0_10(int acknowledgeMode) |
| { |
| return createAMQSession_0_10(false, acknowledgeMode); |
| } |
| |
| private AMQSession_0_10 createAMQSession_0_10(boolean throwException, int acknowledgeMode) |
| { |
| AMQConnection amqConnection = null; |
| try |
| { |
| amqConnection = new MockAMQConnection( |
| "amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'&maxprefetch='0'"); |
| } |
| catch (Exception e) |
| { |
| fail("Failure to create a mock connection:" + e.getMessage()); |
| } |
| boolean isTransacted = acknowledgeMode == javax.jms.Session.SESSION_TRANSACTED ? true : false; |
| AMQSession_0_10 session = new AMQSession_0_10(createConnection(throwException), amqConnection, 1, isTransacted, acknowledgeMode, |
| 10, 10, "test"); |
| return session; |
| } |
| |
| private Connection createConnection(final boolean throwException) |
| { |
| MockTransportConnection connection = new MockTransportConnection(); |
| connection.setState(State.OPEN); |
| connection.setSender(new MockSender()); |
| connection.setSessionFactory(new SessionFactory() |
| { |
| |
| public Session newSession(Connection conn, Binary name, long expiry, boolean isNoReplay) |
| { |
| return new MockSession(conn, new SessionDelegate(), name, expiry, throwException); |
| } |
| }); |
| return connection; |
| } |
| |
| private final class MockMessageListener implements MessageListener |
| { |
| public void onMessage(Message arg0) |
| { |
| } |
| } |
| |
| class MockSession extends Session |
| { |
| private final boolean _throwException; |
| private final Connection _connection; |
| private final SessionDelegate _delegate; |
| |
| protected MockSession(Connection connection, SessionDelegate delegate, Binary name, long expiry, |
| boolean throwException) |
| { |
| super(connection, delegate, name, expiry); |
| _throwException = throwException; |
| setState(State.OPEN); |
| _connection = connection; |
| _delegate = delegate; |
| } |
| |
| public void invoke(Method m, Runnable postIdSettingAction) |
| { |
| if (_throwException) |
| { |
| if (m instanceof SessionAttach || m instanceof SessionRequestTimeout || m instanceof TxSelect) |
| { |
| // do not throw exception for SessionAttach, |
| // SessionRequestTimeout and TxSelect |
| // session needs to be instantiated |
| return; |
| } |
| ExecutionException e = new ExecutionException(); |
| e.setErrorCode(ExecutionErrorCode.INTERNAL_ERROR); |
| throw new SessionException(e); |
| } |
| else |
| { |
| super.invoke(m, postIdSettingAction); |
| if (m instanceof SessionDetach) |
| { |
| setState(State.CLOSED); |
| } |
| } |
| } |
| |
| public void sync() |
| { |
| // to avoid recursive calls |
| setAutoSync(false); |
| // simply send sync command |
| super.executionSync(Option.SYNC); |
| } |
| |
| protected <T> Future<T> invoke(Method m, Class<T> klass) |
| { |
| int commandId = getCommandsOut(); |
| Future<T> future = super.invoke(m, klass); |
| ExecutionResult result = new ExecutionResult(); |
| result.setCommandId(commandId); |
| if (m instanceof ExchangeBound) |
| { |
| ExchangeBoundResult struc = new ExchangeBoundResult(); |
| result.setValue(struc); |
| } |
| else if (m instanceof ExchangeQuery) |
| { |
| ExchangeQueryResult struc = new ExchangeQueryResult(); |
| result.setValue(struc); |
| } |
| else if (m instanceof QueueQuery) |
| { |
| QueueQueryResult struc = new QueueQueryResult(); |
| result.setValue(struc); |
| } |
| _delegate.executionResult(this, result); |
| return future; |
| } |
| |
| public MockSender getSender() |
| { |
| return (MockSender) _connection.getSender(); |
| } |
| } |
| |
| class MockTransportConnection extends Connection |
| { |
| public void setState(State state) |
| { |
| super.setState(state); |
| } |
| } |
| |
| class MockSender implements ProtocolEventSender |
| { |
| private List<ProtocolEvent> _sendEvents = new ArrayList<ProtocolEvent>(); |
| |
| private void setIdleTimeout(int i) |
| { |
| } |
| |
| public void send(ProtocolEvent msg) |
| { |
| _sendEvents.add(msg); |
| } |
| |
| public void flush() |
| { |
| } |
| |
| public void close() |
| { |
| } |
| |
| public List<ProtocolEvent> getSendEvents() |
| { |
| return _sendEvents; |
| } |
| |
| } |
| |
| } |