blob: 162b2041b461f45b91716a658230a5e2181cfe71 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.jms.test.testpeer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.jms.test.testpeer.basictypes.ReceiverSettleMode;
import org.apache.qpid.jms.test.testpeer.basictypes.Role;
import org.apache.qpid.jms.test.testpeer.basictypes.SenderSettleMode;
import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
import org.apache.qpid.jms.test.testpeer.basictypes.TerminusExpiryPolicy;
import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
import org.apache.qpid.jms.test.testpeer.describedtypes.AttachFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.BeginFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.CloseFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.DetachFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.DispositionFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.EndFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.FlowFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.OpenFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.SaslMechanismsFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.SaslOutcomeFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.Target;
import org.apache.qpid.jms.test.testpeer.describedtypes.TransferFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AttachMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.BeginMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.CloseMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.DetachMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.DispositionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.EndMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.FlowMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.OpenMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.SaslInitMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TransferMatcher;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedShort;
import org.apache.qpid.proton.codec.Data;
import org.apache.qpid.proton.engine.impl.AmqpHeader;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// TODO should expectXXXYYYZZZ methods just be expect(matcher)?
public class TestAmqpPeer implements AutoCloseable
{
// Initial value of both link-handle counters below; also the amount that
// calculateLinkHandle(...) adds to a handle received in a flow frame.
private static final int LINK_HANDLE_OFFSET = 100;
private static final Logger LOGGER = LoggerFactory.getLogger(TestAmqpPeer.class.getName());
// Runnable created in the constructor with the port and this peer. This class uses it to
// send bytes, to request the next header, to stop the run, and to fetch any recorded exception.
private final TestAmqpPeerRunner _driverRunnable;
// Thread that executes _driverRunnable; started in the constructor and joined in close().
private final Thread _driverThread;
/**
* Ordered list of pending expectations; the element at index 0 is the one consulted
* for the next received header or frame.
* Guarded by {@link #_handlersLock}
*/
private final List<Handler> _handlers = new ArrayList<Handler>();
// Monitor protecting _handlers and _handlersCompletedLatch.
private final Object _handlersLock = new Object();
/**
* Latch installed by waitForAllHandlersToComplete(int) and counted down each time a
* completed handler is removed; null until that method is first called.
* Guarded by {@link #_handlersLock}
*/
private CountDownLatch _handlersCompletedLatch;
// Next handle used in attach responses by the sender/receiver attach expectations.
// NOTE: volatile only gives visibility; the "++" applied to it in the expect* methods
// is a separate read and write, not an atomic increment.
private volatile int _nextLinkHandle = LINK_HANDLE_OFFSET;
// Next handle used in the attach response by expectTempQueueCreationAttach(...).
// Starts at the same value as _nextLinkHandle; the same "++" caveat applies.
private volatile int _tempDestLinkHandle = LINK_HANDLE_OFFSET;
// Encoded frame bytes held back by sendFrame(..., deferWrite=true) until a later,
// non-deferred sendFrame call writes them together with its own bytes; null when nothing is pending.
private byte[] _deferredBytes;
public TestAmqpPeer(int port) throws IOException
{
_driverRunnable = new TestAmqpPeerRunner(port, this);
_driverThread = new Thread(_driverRunnable, "MockAmqpPeerThread");
_driverThread.start();
}
/**
 * Stops the driver, waits up to 30 seconds for its thread to finish, and then
 * reports the outcome: if the driver recorded a Throwable it is rethrown wrapped
 * in a RuntimeException, otherwise the handler list is asserted to be empty
 * (i.e. no expectation was left unused).
 *
 * If the wait is interrupted the interrupt flag is restored and the outcome check
 * still runs.
 */
@Override
public void close() throws Exception
{
    _driverRunnable.stop();

    try
    {
        _driverThread.join(30000);
    }
    catch (InterruptedException e)
    {
        // Preserve the interrupt for the caller; the finally block still validates the peer.
        Thread.currentThread().interrupt();
    }
    finally
    {
        final Throwable failure = getThrowable();
        if(failure != null)
        {
            // AutoCloseable.close() cannot throw an arbitrary Throwable, so wrap it.
            throw new RuntimeException("TestPeer caught throwable during run", failure);
        }

        synchronized(_handlersLock)
        {
            assertThat(_handlers, Matchers.empty());
        }
    }
}
/**
 * Returns whatever the driver runnable reports from its getException() accessor.
 *
 * @return the value supplied by the driver runnable (may be null, as close() checks for)
 */
public Throwable getThrowable()
{
    final Throwable recorded = _driverRunnable.getException();
    return recorded;
}
/**
 * Passes a received header to the first pending handler, which must be a
 * {@link HeaderHandler}; the handler is removed once it reports completion.
 *
 * @param header the received header bytes
 * @throws IllegalStateException if there are no handlers, or the first handler is not a HeaderHandler
 */
public void receiveHeader(byte[] header)
{
    final Handler next = getFirstHandler();
    if(!(next instanceof HeaderHandler))
    {
        throw new IllegalStateException("Received header but the next handler is a " + next);
    }

    final HeaderHandler headerHandler = (HeaderHandler) next;
    headerHandler.header(header, this);

    if(next.isComplete())
    {
        removeFirstHandler();
    }
}
/**
 * Passes a received frame to the first pending handler, which must be a
 * {@link FrameHandler}; the handler is removed once it reports completion.
 *
 * @param type          the frame type value
 * @param channel       the channel the frame arrived on
 * @param describedType the decoded frame body
 * @param payload       the frame payload, passed through to the handler unchanged
 * @throws IllegalStateException if there are no handlers, or the first handler is not a FrameHandler
 */
public void receiveFrame(int type, int channel, DescribedType describedType, Binary payload)
{
    final Handler next = getFirstHandler();
    if(!(next instanceof FrameHandler))
    {
        throw new IllegalStateException("Received frame but the next handler is a " + next);
    }

    final FrameHandler frameHandler = (FrameHandler) next;
    frameHandler.frame(type, channel, describedType, payload, this);

    if(next.isComplete())
    {
        removeFirstHandler();
    }
}
/**
 * Removes the handler at the head of the list and, if a completion latch has been
 * installed, counts it down once. Runs entirely under the handlers lock.
 */
private void removeFirstHandler()
{
    synchronized(_handlersLock)
    {
        final Handler completed = _handlers.remove(0);

        final CountDownLatch latch = _handlersCompletedLatch;
        if(latch != null)
        {
            latch.countDown();
        }

        LOGGER.trace("Removed completed handler: {}", completed);
    }
}
/**
 * Appends an expectation to the end of the handler list, under the handlers lock.
 *
 * @param handler the handler to queue
 */
private void addHandler(Handler handler)
{
    final Handler queued = handler;
    synchronized(_handlersLock)
    {
        _handlers.add(queued);
        LOGGER.trace("Added handler: {}", queued);
    }
}
/**
 * Returns (without removing) the handler at the head of the list.
 *
 * @return the first pending handler
 * @throws IllegalStateException if no handlers are queued
 */
private Handler getFirstHandler()
{
    synchronized(_handlersLock)
    {
        if(!_handlers.isEmpty())
        {
            return _handlers.get(0);
        }

        throw new IllegalStateException("No handlers");
    }
}
/**
 * Waits until every handler that is queued at the time of the call has been
 * removed as complete, failing the test if that does not happen in time.
 *
 * A latch sized to the current handler count is installed under the handlers lock;
 * removeFirstHandler() counts it down. The latch is awaited through a local
 * reference captured under the lock, so the guarded field is never read outside
 * the lock and a concurrent call cannot swap the latch being awaited.
 *
 * @param timeoutMillis maximum time to wait, in milliseconds
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
public void waitForAllHandlersToComplete(int timeoutMillis) throws InterruptedException
{
    final CountDownLatch latch;
    synchronized(_handlersLock)
    {
        latch = new CountDownLatch(_handlers.size());
        _handlersCompletedLatch = latch;
    }

    boolean countedDownOk = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);

    Assert.assertTrue(
        "All handlers should have completed within the " + timeoutMillis + "ms timeout", countedDownOk);
}
/**
 * Logs the header and writes its bytes via the driver runnable.
 *
 * @param header the header bytes to send
 */
void sendHeader(byte[] header)
{
    final Binary loggable = new Binary(header);
    LOGGER.debug("About to send header: {}", loggable);

    _driverRunnable.sendBytes(header);
}
/**
 * Encodes a frame and either writes it or holds it back for pipelining.
 *
 * Any bytes held back by earlier deferred calls are always prepended to the newly
 * encoded frame. When {@code deferWrite} is true the combined bytes are stored and
 * nothing is written; otherwise the stored bytes are cleared and the combined bytes
 * are written through the driver runnable.
 *
 * @param type               the frame type to encode
 * @param channel            the channel to send on; must be >= 0
 * @param frameDescribedType the frame body
 * @param framePayload       the frame payload, may be null
 * @param deferWrite         true to hold the bytes until a later non-deferred send
 * @throws IllegalArgumentException if channel is negative
 */
public void sendFrame(FrameType type, int channel, DescribedType frameDescribedType, Binary framePayload, boolean deferWrite)
{
    if(channel < 0)
    {
        throw new IllegalArgumentException("Frame must be sent on a channel >= 0");
    }

    LOGGER.debug("About to send: {}", frameDescribedType);
    final byte[] encoded = AmqpDataFramer.encodeFrame(type, channel, frameDescribedType, framePayload);

    // Combine with whatever an earlier deferred send left behind.
    final byte[] pending = _deferredBytes;
    final byte[] combined;
    if(pending == null)
    {
        combined = encoded;
    }
    else
    {
        //TODO: check overflow
        combined = new byte[pending.length + encoded.length];
        System.arraycopy(pending, 0, combined, 0, pending.length);
        System.arraycopy(encoded, 0, combined, pending.length, encoded.length);
    }

    if(deferWrite)
    {
        _deferredBytes = combined;
        LOGGER.debug("Deferring write until pipelined with future frame bytes");
        return;
    }

    //clear the deferred bytes to avoid corrupting future sends
    _deferredBytes = null;
    _driverRunnable.sendBytes(combined);
}
/**
 * Queues the expectations for a connection that authenticates with SASL ANONYMOUS:
 * the SASL header (answered with the SASL header and a mechanisms frame offering
 * ANONYMOUS), a sasl-init naming ANONYMOUS (answered with an outcome of code 0,
 * after which the driver is told to expect a header), the AMQP header (echoed),
 * and an Open with a non-null container id (answered with this peer's Open).
 *
 * @param authorize not read by this method
 */
public void expectAnonymousConnect(boolean authorize)
{
    final Symbol anonymous = Symbol.valueOf("ANONYMOUS");

    // SASL header exchange, followed by the mechanisms offer.
    final SaslMechanismsFrame mechanisms = new SaslMechanismsFrame().setSaslServerMechanisms(anonymous);
    final FrameSender mechanismsSender = new FrameSender(this, FrameType.SASL, 0, mechanisms, null);
    addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, mechanismsSender));

    // On a matching sasl-init: send the outcome, then switch the driver back to header mode.
    final AmqpPeerRunnable sendOutcomeThenExpectHeader = new AmqpPeerRunnable()
    {
        @Override
        public void run()
        {
            TestAmqpPeer.this.sendFrame(
                FrameType.SASL, 0,
                new SaslOutcomeFrame().setCode(UnsignedByte.valueOf((byte)0)),
                null,
                false);
            _driverRunnable.expectHeader();
        }
    };
    addHandler(new SaslInitMatcher()
        .withMechanism(equalTo(anonymous))
        .onSuccess(sendOutcomeThenExpectHeader));

    // AMQP header exchange.
    addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));

    // Open exchange.
    final OpenFrame openResponse = new OpenFrame().setContainerId("test-amqp-peer-container-id");
    addHandler(new OpenMatcher()
        .withContainerId(notNullValue(String.class))
        .onSuccess(new FrameSender(this, FrameType.AMQP, 0, openResponse, null)));
}
/**
 * Queues the expectations for a connection that authenticates with SASL PLAIN:
 * the SASL header (answered with the SASL header and a mechanisms frame offering
 * PLAIN), a sasl-init naming PLAIN whose initial response carries the given
 * credentials (answered with an outcome of code 0, after which the driver is told
 * to expect a header), the AMQP header (echoed), and an Open with a non-null
 * container id (answered with this peer's Open).
 *
 * The expected initial response is built as NUL + username + NUL + password, i.e.
 * an empty authorization identity. The credentials are encoded as UTF-8, as the
 * PLAIN mechanism requires, rather than with the platform default charset.
 *
 * @param username           the user name the client is expected to send; must not be null
 * @param password           the password the client is expected to send; must not be null
 * @param serverCapabilities offered-capabilities to put in the Open response, or null to leave unset
 * @param serverProperties   properties to put in the Open response, or null to leave unset
 */
public void expectPlainConnect(String username, String password, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties)
{
    SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(Symbol.valueOf("PLAIN"));
    addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
        new FrameSender(
            this, FrameType.SASL, 0,
            saslMechanismsFrame, null)));

    byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
    byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);

    // Layout: [0]=NUL (empty authzid), username, NUL, password. The array is
    // zero-initialised, so the two separator bytes need no explicit write.
    byte[] data = new byte[usernameBytes.length+passwordBytes.length+2];
    System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length);
    System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length);

    addHandler(new SaslInitMatcher()
        .withMechanism(equalTo(Symbol.valueOf("PLAIN")))
        .withInitialResponse(equalTo(new Binary(data)))
        .onSuccess(new AmqpPeerRunnable()
        {
            @Override
            public void run()
            {
                TestAmqpPeer.this.sendFrame(
                    FrameType.SASL, 0,
                    new SaslOutcomeFrame().setCode(UnsignedByte.valueOf((byte)0)),
                    null,
                    false);
                // After the outcome the driver is switched back to expecting a header.
                _driverRunnable.expectHeader();
            }
        }));

    addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));

    OpenFrame open = new OpenFrame();
    open.setContainerId("test-amqp-peer-container-id");
    if(serverCapabilities != null)
    {
        open.setOfferedCapabilities(serverCapabilities);
    }
    if(serverProperties != null)
    {
        open.setProperties(serverProperties);
    }

    addHandler(new OpenMatcher()
        .withContainerId(notNullValue(String.class))
        .onSuccess(new FrameSender(
            this, FrameType.AMQP, 0,
            open,
            null)));
}
/**
 * Queues an expectation for a Close frame carrying no error, answered with an
 * empty Close frame on channel 0.
 */
public void expectClose()
{
    final CloseFrame closeResponse = new CloseFrame();
    final FrameSender closeResponseSender = new FrameSender(this, FrameType.AMQP, 0, closeResponse, null);

    addHandler(new CloseMatcher()
        .withError(Matchers.nullValue())
        .onSuccess(closeResponseSender));
}
/**
 * Queues an expectation for a Begin frame (no remote channel, next-outgoing-id of 1,
 * non-null windows) and replies with a Begin on the same channel whose remote-channel
 * is the channel the incoming frame arrived on.
 *
 * @param expectSessionFlow when true, a session-level flow expectation is queued afterwards
 */
public void expectBegin(boolean expectSessionFlow)
{
    final BeginMatcher incomingBegin = new BeginMatcher()
        .withRemoteChannel(nullValue())
        .withNextOutgoingId(equalTo(UnsignedInteger.ONE))
        .withIncomingWindow(notNullValue())
        .withOutgoingWindow(notNullValue());

    // remoteChannel is filled in later, once the incoming channel is known.
    final BeginFrame reply = new BeginFrame()
        .setNextOutgoingId(UnsignedInteger.ONE)
        .setIncomingWindow(UnsignedInteger.ZERO)
        .setOutgoingWindow(UnsignedInteger.ZERO);

    // -1 is a deliberately illegal placeholder channel, replaced by the value provider.
    final FrameSender replySender = new FrameSender(this, FrameType.AMQP, -1, reply, null);
    replySender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            replySender.setChannel(incomingBegin.getActualChannel());
            reply.setRemoteChannel(
                UnsignedShort.valueOf((short) incomingBegin.getActualChannel()));
        }
    });

    incomingBegin.onSuccess(replySender);
    addHandler(incomingBegin);

    if(!expectSessionFlow)
    {
        return;
    }
    expectSessionFlow();
}
/**
 * Queues an expectation for an End frame, answered with an empty End frame on the
 * channel the incoming frame arrived on.
 */
public void expectEnd()
{
    final EndMatcher incomingEnd = new EndMatcher();

    // -1 is a deliberately illegal placeholder channel, replaced by the value provider.
    final FrameSender replySender = new FrameSender(this, FrameType.AMQP, -1, new EndFrame(), null);
    replySender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            replySender.setChannel(incomingEnd.getActualChannel());
        }
    });

    incomingEnd.onSuccess(replySender);
    addHandler(incomingEnd);
}
/**
 * Queues an expectation for a sender Attach whose target is dynamic, has no address,
 * no durability and a link-detach expiry policy. On match the peer replies with an
 * Attach (echoing name and source, and the received target with its address set to
 * {@code dynamicAddress}) followed by a Flow granting 100 credits on the same link.
 *
 * @param dynamicAddress the address written into the target of the Attach response
 */
public void expectTempQueueCreationAttach(final String dynamicAddress)
{
    // What the client's requested target must look like.
    TargetMatcher dynamicTarget = new TargetMatcher();
    dynamicTarget.withAddress(nullValue());
    dynamicTarget.withDynamic(equalTo(true));
    dynamicTarget.withDurable(equalTo(TerminusDurability.NONE));
    dynamicTarget.withExpiryPolicy(equalTo(TerminusExpiryPolicy.LINK_DETACH));

    final AttachMatcher incomingAttach = new AttachMatcher()
        .withName(notNullValue())
        .withHandle(notNullValue())
        .withRole(equalTo(Role.SENDER))
        .withSndSettleMode(equalTo(SenderSettleMode.UNSETTLED))
        .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST))
        .withSource(notNullValue())
        .withTarget(dynamicTarget);

    final UnsignedInteger peerHandle = UnsignedInteger.valueOf(_tempDestLinkHandle++);

    // --- Attach response ---
    final AttachFrame attachReply = new AttachFrame()
        .setHandle(peerHandle)
        .setRole(Role.RECEIVER)
        .setSndSettleMode(SenderSettleMode.UNSETTLED)
        .setRcvSettleMode(ReceiverSettleMode.FIRST);

    // -1 is a deliberately illegal placeholder channel, replaced by the value provider.
    final FrameSender attachReplySender = new FrameSender(this, FrameType.AMQP, -1, attachReply, null);
    attachReplySender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            attachReplySender.setChannel(incomingAttach.getActualChannel());
            attachReply.setName(incomingAttach.getReceivedName());
            attachReply.setSource(incomingAttach.getReceivedSource());

            Target assignedTarget = createTargetObjectFromDescribedType(incomingAttach.getReceivedTarget());
            assignedTarget.setAddress(dynamicAddress);
            attachReply.setTarget(assignedTarget);
        }
    });

    // --- Flow granting credit ---
    final FlowFrame creditFlow = new FlowFrame()
        .setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
        .setIncomingWindow(UnsignedInteger.valueOf(2048))
        .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
        .setOutgoingWindow(UnsignedInteger.valueOf(2048))
        .setLinkCredit(UnsignedInteger.valueOf(100))
        .setHandle(peerHandle);

    // -1 is a deliberately illegal placeholder channel, replaced by the value provider.
    final FrameSender creditFlowSender = new FrameSender(this, FrameType.AMQP, -1, creditFlow, null);
    creditFlowSender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            creditFlowSender.setChannel(incomingAttach.getActualChannel());
            creditFlow.setDeliveryCount(incomingAttach.getReceivedInitialDeliveryCount());
        }
    });

    // Attach first, then flow.
    CompositeAmqpPeerRunnable responses = new CompositeAmqpPeerRunnable();
    responses.add(attachReplySender);
    responses.add(creditFlowSender);

    incomingAttach.onSuccess(responses);
    addHandler(incomingAttach);
}
/**
 * Expects a sender Attach with any non-null source and target, accepts the link and
 * writes the Attach response immediately.
 */
public void expectSenderAttach()
{
    final Matcher<?> anyTarget = notNullValue();
    expectSenderAttach(anyTarget, false, false);
}

/**
 * Expects a sender Attach with any non-null source and a target satisfying the
 * supplied matcher.
 *
 * @param targetMatcher            matcher applied to the Attach target
 * @param refuseLink               true to refuse the link (null target, then a closing Detach)
 * @param deferAttachResponseWrite true to hold the Attach response until the following frame is sent
 */
public void expectSenderAttach(final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite)
{
    final Matcher<?> anySource = notNullValue();
    expectSenderAttach(anySource, targetMatcher, refuseLink, deferAttachResponseWrite);
}

/**
 * Queues an expectation for a sender Attach and the peer's response to it.
 *
 * The response is an Attach echoing the received name and source. If the link is
 * accepted the received target is echoed too and a Flow granting 100 credits follows;
 * if it is refused the target is set to null and a Detach with closed=true follows.
 *
 * @param sourceMatcher            matcher applied to the Attach source
 * @param targetMatcher            matcher applied to the Attach target
 * @param refuseLink               true to refuse the link as described above
 * @param deferAttachResponseWrite true to hold the Attach response until the following frame is sent
 */
public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite)
{
    final AttachMatcher incomingAttach = new AttachMatcher()
        .withName(notNullValue())
        .withHandle(notNullValue())
        .withRole(equalTo(Role.SENDER))
        .withSndSettleMode(equalTo(SenderSettleMode.UNSETTLED))
        .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST))
        .withSource(sourceMatcher)
        .withTarget(targetMatcher);

    final UnsignedInteger peerHandle = UnsignedInteger.valueOf(_nextLinkHandle++);

    // --- Attach response ---
    final AttachFrame attachReply = new AttachFrame()
        .setHandle(peerHandle)
        .setRole(Role.RECEIVER)
        .setSndSettleMode(SenderSettleMode.UNSETTLED)
        .setRcvSettleMode(ReceiverSettleMode.FIRST);

    // -1 is a deliberately illegal placeholder channel, replaced by the value provider.
    final FrameSender attachReplySender = new FrameSender(this, FrameType.AMQP, -1, attachReply, null);
    attachReplySender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            attachReplySender.setChannel(incomingAttach.getActualChannel());
            attachReply.setName(incomingAttach.getReceivedName());
            attachReply.setSource(incomingAttach.getReceivedSource());
            if(refuseLink)
            {
                attachReply.setTarget(null);
            }
            else
            {
                attachReply.setTarget(incomingAttach.getReceivedTarget());
            }
        }
    });

    if(deferAttachResponseWrite)
    {
        // Defer writing the attach frame until the subsequent frame is also ready
        attachReplySender.setDeferWrite(true);
    }

    // --- Flow sent after the Attach when the link is accepted ---
    final FlowFrame creditFlow = new FlowFrame()
        .setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
        .setIncomingWindow(UnsignedInteger.valueOf(2048))
        .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
        .setOutgoingWindow(UnsignedInteger.valueOf(2048))
        .setLinkCredit(UnsignedInteger.valueOf(100))
        .setHandle(peerHandle);

    // -1 is a deliberately illegal placeholder channel, replaced by the value provider.
    final FrameSender creditFlowSender = new FrameSender(this, FrameType.AMQP, -1, creditFlow, null);
    creditFlowSender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            creditFlowSender.setChannel(incomingAttach.getActualChannel());
            creditFlow.setDeliveryCount(incomingAttach.getReceivedInitialDeliveryCount());
        }
    });

    // --- Detach sent after the Attach when the link is refused ---
    final DetachFrame detachReply = new DetachFrame().setHandle(peerHandle).setClosed(true);

    // -1 is a deliberately illegal placeholder channel, replaced by the value provider.
    final FrameSender detachReplySender = new FrameSender(this, FrameType.AMQP, -1, detachReply, null);
    detachReplySender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            detachReplySender.setChannel(incomingAttach.getActualChannel());
        }
    });

    // Attach always goes first; the follow-up depends on whether the link is refused.
    final FrameSender followUpSender = refuseLink ? detachReplySender : creditFlowSender;

    CompositeAmqpPeerRunnable responses = new CompositeAmqpPeerRunnable();
    responses.add(attachReplySender);
    responses.add(followUpSender);

    incomingAttach.onSuccess(responses);
    addHandler(incomingAttach);
}
/**
 * Queues an expectation for a receiver Attach and replies with a sender-role Attach
 * that echoes the received name, source and target and has an initial delivery
 * count of zero.
 *
 * @param linkNameMatcher matcher applied to the Attach link name
 * @param sourceMatcher   matcher applied to the Attach source
 */
public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher)
{
    final AttachMatcher incomingAttach = new AttachMatcher()
        .withName(linkNameMatcher)
        .withHandle(notNullValue())
        .withRole(equalTo(Role.RECEIVER))
        .withSndSettleMode(equalTo(SenderSettleMode.UNSETTLED))
        .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST))
        .withSource(sourceMatcher)
        .withTarget(notNullValue());

    final UnsignedInteger peerHandle = UnsignedInteger.valueOf(_nextLinkHandle++);

    final AttachFrame attachReply = new AttachFrame()
        .setHandle(peerHandle)
        .setRole(Role.SENDER)
        .setSndSettleMode(SenderSettleMode.UNSETTLED)
        .setRcvSettleMode(ReceiverSettleMode.FIRST)
        .setInitialDeliveryCount(UnsignedInteger.ZERO);

    // -1 is a deliberately illegal placeholder channel, replaced by the value provider.
    final FrameSender attachReplySender = new FrameSender(this, FrameType.AMQP, -1, attachReply, null);
    attachReplySender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            attachReplySender.setChannel(incomingAttach.getActualChannel());
            attachReply.setName(incomingAttach.getReceivedName());
            attachReply.setSource(incomingAttach.getReceivedSource());
            attachReply.setTarget(incomingAttach.getReceivedTarget());
        }
    });

    incomingAttach.onSuccess(attachReplySender);
    addHandler(incomingAttach);
}

/**
 * Expects a receiver Attach with any non-null link name and any non-null source.
 */
public void expectReceiverAttach()
{
    final Matcher<?> anyLinkName = notNullValue();
    final Matcher<?> anySource = notNullValue();
    expectReceiverAttach(anyLinkName, anySource);
}
/**
 * Expects a receiver Attach for a durable subscription: the link name must equal the
 * subscription name, and the source must address the topic, be non-dynamic, have
 * unsettled-state durability and a never-expiring policy.
 *
 * @param topicName        the expected source address
 * @param subscriptionName the expected link name
 */
public void expectDurableSubscriberAttach(String topicName, String subscriptionName)
{
    SourceMatcher durableSource = new SourceMatcher();
    durableSource.withAddress(equalTo(topicName));
    durableSource.withDynamic(equalTo(false));
    //TODO: will possibly be changed to a 1/config durability
    durableSource.withDurable(equalTo(TerminusDurability.UNSETTLED_STATE));
    durableSource.withExpiryPolicy(equalTo(TerminusExpiryPolicy.NEVER));

    final Matcher<?> linkName = equalTo(subscriptionName);
    expectReceiverAttach(linkName, durableSource);
}
/**
 * Queues an expectation for a Detach frame and, optionally, a Detach response on the
 * same channel.
 *
 * The response handle is the most recently allocated value of the sender/receiver
 * link-handle counter, not a value tracked per link.
 *
 * @param expectClosed true if the incoming Detach must have closed=true; otherwise closed must be false or absent
 * @param sendResponse true to reply with a Detach
 * @param replyClosed  true to set closed=true on the reply
 */
public void expectDetach(boolean expectClosed, boolean sendResponse, boolean replyClosed)
{
    final Matcher<Boolean> closedFieldMatcher;
    if(expectClosed)
    {
        closedFieldMatcher = equalTo(true);
    }
    else
    {
        closedFieldMatcher = Matchers.anyOf(equalTo(false), nullValue());
    }

    final DetachMatcher incomingDetach = new DetachMatcher().withClosed(closedFieldMatcher);

    if(!sendResponse)
    {
        addHandler(incomingDetach);
        return;
    }

    final DetachFrame detachReply = new DetachFrame();
    detachReply.setHandle(UnsignedInteger.valueOf(_nextLinkHandle - 1)); // TODO: this needs to be the value used in the attach response
    if(replyClosed)
    {
        detachReply.setClosed(replyClosed);
    }

    // -1 is a deliberately illegal placeholder channel, replaced by the value provider.
    final FrameSender detachReplySender = new FrameSender(this, FrameType.AMQP, -1, detachReply, null);
    detachReplySender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            detachReplySender.setChannel(incomingDetach.getActualChannel());
        }
    });

    incomingDetach.onSuccess(detachReplySender);
    addHandler(incomingDetach);
}
/**
 * Queues an expectation for a session-level Flow frame, i.e. one with neither a
 * link-credit nor a handle. No response is sent.
 */
public void expectSessionFlow()
{
    final FlowMatcher sessionFlow = new FlowMatcher()
        .withLinkCredit(Matchers.nullValue())
        .withHandle(Matchers.nullValue());

    addHandler(sessionFlow);
}
/**
 * Expects a non-draining link Flow, passing a "credit greater than zero" matcher and
 * requesting no drain response.
 */
public void expectLinkFlow()
{
    final Matcher<UnsignedInteger> positiveCredit = Matchers.greaterThan(UnsignedInteger.ZERO);
    expectLinkFlow(false, false, positiveCredit);
}

/**
 * Expects a link Flow without sending any transfers in response; delegates to the
 * full expectLinkFlowRespondWithTransfer overload with no message sections, a
 * transfer count of zero and no expected next-incoming-id.
 *
 * @param drain                 whether the incoming Flow is expected to have drain set
 * @param sendDrainFlowResponse whether to answer a draining Flow with a Flow response
 * @param creditMatcher         matcher for the link credit, passed through to the delegate
 */
public void expectLinkFlow(boolean drain, boolean sendDrainFlowResponse, Matcher<UnsignedInteger> creditMatcher)
{
    final int noTransfers = 0;
    final Integer anyNextIncomingId = null;

    expectLinkFlowRespondWithTransfer(null, null, null, null, null,
        noTransfers, drain, sendDrainFlowResponse, creditMatcher, anyNextIncomingId);
}
/**
 * Expects a link Flow and responds with a single Transfer built from the given
 * message sections.
 *
 * @param headerDescribedType             header section, or null to omit
 * @param messageAnnotationsDescribedType message-annotations section, or null to omit
 * @param propertiesDescribedType         properties section, or null to omit
 * @param appPropertiesDescribedType      application-properties section, or null to omit
 * @param content                         body section, or null to omit
 */
public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
                                              final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                              final PropertiesDescribedType propertiesDescribedType,
                                              final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                              final DescribedType content)
{
    expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
                                      appPropertiesDescribedType, content, 1);
}

/**
 * Expects a non-draining link Flow carrying at least {@code count} credits and a
 * next-incoming-id of 1, and responds with {@code count} Transfers built from the
 * given message sections.
 *
 * @param headerDescribedType             header section, or null to omit
 * @param messageAnnotationsDescribedType message-annotations section, or null to omit
 * @param propertiesDescribedType         properties section, or null to omit
 * @param appPropertiesDescribedType      application-properties section, or null to omit
 * @param content                         body section, or null to omit
 * @param count                           number of Transfers to send
 */
public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
                                              final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                              final PropertiesDescribedType propertiesDescribedType,
                                              final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                              final DescribedType content,
                                              final int count)
{
    expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
                                      appPropertiesDescribedType, content, count, false, false,
                                      Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)), 1);
}

/**
 * Queues an expectation for a link Flow and the peer's responses to it.
 *
 * On match the peer sends {@code count} Transfers (delivery ids starting at
 * {@code nextIncomingId}, tags "theDeliveryTag" + id), and then, if {@code drain}
 * and {@code sendDrainFlowResponse} are both true, a Flow with drain set and zero
 * link credit.
 *
 * The supplied {@code creditMatcher} is applied to the link-credit field of the
 * incoming Flow. If it is null, the credit is instead required to be at least
 * {@code count}.
 *
 * @param headerDescribedType             header section, or null to omit
 * @param messageAnnotationsDescribedType message-annotations section, or null to omit
 * @param propertiesDescribedType         properties section, or null to omit
 * @param appPropertiesDescribedType      application-properties section, or null to omit
 * @param content                         body section, or null to omit
 * @param count                           number of Transfers to send
 * @param drain                           whether the incoming Flow must have drain=true (otherwise false or absent)
 * @param sendDrainFlowResponse           whether to answer a draining Flow with a Flow response
 * @param creditMatcher                   matcher for the incoming link credit, or null for "at least count"
 * @param nextIncomingId                  expected next-incoming-id and first delivery id; may be null only when count is 0,
 *                                        in which case any next-incoming-id of at least 1 is accepted
 * @throws IllegalArgumentException if transfers are requested but nextIncomingId is null
 */
public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
                                              final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                              final PropertiesDescribedType propertiesDescribedType,
                                              final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                              final DescribedType content,
                                              final int count,
                                              final boolean drain,
                                              final boolean sendDrainFlowResponse,
                                              Matcher<UnsignedInteger> creditMatcher,
                                              final Integer nextIncomingId)
{
    if (nextIncomingId == null && count > 0)
    {
        throw new IllegalArgumentException("The remote NextIncomingId must be specified if transfers have been requested");
    }

    Matcher<Boolean> drainMatcher = null;
    if(drain)
    {
        drainMatcher = equalTo(true);
    }
    else
    {
        drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
    }

    Matcher<UnsignedInteger> remoteNextIncomingIdMatcher = null;
    if(nextIncomingId != null)
    {
        remoteNextIncomingIdMatcher = Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId));
    }
    else
    {
        remoteNextIncomingIdMatcher = Matchers.greaterThanOrEqualTo(UnsignedInteger.ONE);
    }

    // Honour the caller's credit matcher; fall back to the count-based check only when none is given.
    Matcher<UnsignedInteger> linkCreditMatcher = creditMatcher;
    if(linkCreditMatcher == null)
    {
        linkCreditMatcher = Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count));
    }

    final FlowMatcher flowMatcher = new FlowMatcher()
        .withLinkCredit(linkCreditMatcher)
        .withDrain(drainMatcher)
        .withNextIncomingId(remoteNextIncomingIdMatcher);

    CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
    boolean addComposite = false;

    for(int i = 0; i < count; i++)
    {
        // nextIncomingId is non-null here: the guard above rejects null whenever count > 0.
        final int nextId = nextIncomingId + i;

        String tagString = "theDeliveryTag" + nextId;
        Binary dtag = new Binary(tagString.getBytes());

        final TransferFrame transferResponse = new TransferFrame()
            .setDeliveryId(UnsignedInteger.valueOf(nextId))
            .setDeliveryTag(dtag)
            .setMessageFormat(UnsignedInteger.ZERO)
            .setSettled(false);

        Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType,
                                                propertiesDescribedType, appPropertiesDescribedType, content);

        // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
        final FrameSender transferResponseSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload);
        transferResponseSender.setValueProvider(new ValueProvider()
        {
            @Override
            public void setValues()
            {
                transferResponse.setHandle(calculateLinkHandle(flowMatcher));
                transferResponseSender.setChannel(flowMatcher.getActualChannel());
            }
        });

        addComposite = true;
        composite.add(transferResponseSender);
    }

    if(drain && sendDrainFlowResponse)
    {
        final FlowFrame drainResponse = new FlowFrame();
        drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded
        drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded
        drainResponse.setLinkCredit(UnsignedInteger.ZERO);
        drainResponse.setDrain(true);

        // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
        final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
        flowResponseSender.setValueProvider(new ValueProvider()
        {
            @Override
            public void setValues()
            {
                flowResponseSender.setChannel(flowMatcher.getActualChannel());
                drainResponse.setHandle(calculateLinkHandle(flowMatcher));
                drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
                drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, count));
                drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
            }
        });

        addComposite = true;
        composite.add(flowResponseSender);
    }

    // Only register a success action when there is something to send.
    if(addComposite)
    {
        flowMatcher.onSuccess(composite);
    }

    addHandler(flowMatcher);
}
/**
 * Derives the peer-side link handle from the handle received in a Flow frame by
 * adding LINK_HANDLE_OFFSET to it.
 *
 * @param flowMatcher the matcher holding the received Flow's handle
 * @return the received handle plus LINK_HANDLE_OFFSET
 */
private UnsignedInteger calculateLinkHandle(final FlowMatcher flowMatcher)
{
    final UnsignedInteger receivedHandle = (UnsignedInteger) flowMatcher.getReceivedHandle();
    final UnsignedInteger offset = UnsignedInteger.valueOf(LINK_HANDLE_OFFSET);

    return receivedHandle.add(offset);
}
/**
 * Computes the delivery count for a drain response: the received delivery count
 * plus the received link credit.
 *
 * @param flowMatcher the matcher holding the received Flow's fields
 * @return received delivery-count + received link-credit
 */
private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher)
{
    final UnsignedInteger receivedDeliveryCount = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount();
    final UnsignedInteger receivedCredit = (UnsignedInteger) flowMatcher.getReceivedLinkCredit();

    final UnsignedInteger advanced = receivedDeliveryCount.add(receivedCredit);
    return advanced;
}
/**
 * Computes the next-outgoing-id for a drain response: the received next-incoming-id
 * plus the number of transfers sent.
 *
 * @param flowMatcher the matcher holding the received Flow's fields
 * @param sentCount   the number of transfers sent in response to the Flow
 * @return received next-incoming-id + sentCount
 */
private UnsignedInteger calculateNewOutgoingId(FlowMatcher flowMatcher, int sentCount)
{
    final UnsignedInteger receivedNextIncomingId = (UnsignedInteger) flowMatcher.getReceivedNextIncomingId();
    final UnsignedInteger sent = UnsignedInteger.valueOf(sentCount);

    return receivedNextIncomingId.add(sent);
}
/**
 * Encodes the supplied message sections into a single payload, writing them in the
 * order header, message-annotations, properties, application-properties, content
 * and skipping any that are null.
 *
 * @param headerDescribedType             header section, or null to omit
 * @param messageAnnotationsDescribedType message-annotations section, or null to omit
 * @param propertiesDescribedType         properties section, or null to omit
 * @param appPropertiesDescribedType      application-properties section, or null to omit
 * @param content                         body section, or null to omit
 * @return the encoded sections
 */
private Binary prepareTransferPayload(final HeaderDescribedType headerDescribedType,
                                      final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                      final PropertiesDescribedType propertiesDescribedType,
                                      final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                      final DescribedType content)
{
    final DescribedType[] sectionsInOrder = new DescribedType[] {
        headerDescribedType,
        messageAnnotationsDescribedType,
        propertiesDescribedType,
        appPropertiesDescribedType,
        content
    };

    final Data encoder = Proton.data(1024);
    for(DescribedType section : sectionsInOrder)
    {
        if(section == null)
        {
            continue;
        }
        encoder.putDescribedType(section);
    }

    return encoder.encode();
}
/**
 * Expects an unsettled Transfer whose payload satisfies the matcher, and replies
 * with a settled Disposition carrying an Accepted state.
 *
 * @param expectedPayloadMatcher matcher applied to the Transfer payload
 */
public void expectTransfer(Matcher<Binary> expectedPayloadMatcher)
{
    final Accepted acceptedState = new Accepted();
    expectTransfer(expectedPayloadMatcher, false, acceptedState, true);
}

//TODO: fix responseState to only admit applicable types.
/**
 * Queues an expectation for a Transfer and replies with a receiver-role Disposition
 * on the same channel whose "first" field is the received delivery id.
 *
 * @param expectedPayloadMatcher matcher applied to the Transfer payload
 * @param settled                true if the Transfer must have settled=true; otherwise settled must be false or absent
 * @param responseState          the state to put in the Disposition
 * @param responseSettled        the settled flag to put in the Disposition
 */
public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, boolean settled, ListDescribedType responseState, boolean responseSettled)
{
    final Matcher<Boolean> settledFieldMatcher;
    if(settled)
    {
        settledFieldMatcher = equalTo(true);
    }
    else
    {
        settledFieldMatcher = Matchers.anyOf(equalTo(false), nullValue());
    }

    final TransferMatcher incomingTransfer = new TransferMatcher();
    incomingTransfer.setPayloadMatcher(expectedPayloadMatcher);
    incomingTransfer.withSettled(settledFieldMatcher);

    final DispositionFrame dispositionReply = new DispositionFrame()
        .setRole(Role.RECEIVER)
        .setSettled(responseSettled)
        .setState(responseState);

    // -1 is a deliberately illegal placeholder channel, replaced by the value provider.
    final FrameSender dispositionReplySender = new FrameSender(this, FrameType.AMQP, -1, dispositionReply, null);
    dispositionReplySender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            dispositionReplySender.setChannel(incomingTransfer.getActualChannel());
            dispositionReply.setFirst(incomingTransfer.getReceivedDeliveryId());
        }
    });

    incomingTransfer.onSuccess(dispositionReplySender);
    addHandler(incomingTransfer);
}
/**
 * Expects a settled Disposition whose state matches the Accepted descriptor
 * (by code or symbol, as judged by DescriptorMatcher).
 */
public void expectDispositionThatIsAcceptedAndSettled()
{
    final Matcher<?> acceptedState = new DescriptorMatcher(Accepted.DESCRIPTOR_CODE, Accepted.DESCRIPTOR_SYMBOL);
    expectDisposition(true, acceptedState);
}
/**
 * Queues an expectation for a Disposition frame. No response is sent.
 *
 * @param settled      true if the Disposition must have settled=true; otherwise settled must be false or absent
 * @param stateMatcher matcher applied to the Disposition state
 */
public void expectDisposition(boolean settled, Matcher<?> stateMatcher)
{
    final Matcher<Boolean> settledFieldMatcher;
    if(settled)
    {
        settledFieldMatcher = equalTo(true);
    }
    else
    {
        settledFieldMatcher = Matchers.anyOf(equalTo(false), nullValue());
    }

    addHandler(new DispositionMatcher()
        .withSettled(settledFieldMatcher)
        .withState(stateMatcher));
}
/**
 * Builds a {@link Target} from a received described type, asserting first that the
 * object is a DescribedType and that its described value is a List of fields.
 *
 * @param o the received target object
 * @return a Target constructed from the described field list
 */
private Target createTargetObjectFromDescribedType(Object o)
{
    assertThat(o, instanceOf(DescribedType.class));
    final DescribedType receivedTarget = (DescribedType) o;

    final Object fields = receivedTarget.getDescribed();
    assertThat(fields, instanceOf(List.class));

    // Safe to cast: the assertion above has just verified the value is a List.
    @SuppressWarnings("unchecked")
    final List<Object> fieldList = (List<Object>) fields;

    return new Target(fieldList);
}
}