blob: 6d54148d5e309f8f28b464f88b6a32cdf97344f0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.jms.test.testpeer;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.GLOBAL;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SOLE_CONNECTION_CAPABILITY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import java.io.IOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.sasl.AuthorizeCallback;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslServer;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
import org.apache.qpid.jms.test.testpeer.basictypes.ReceiverSettleMode;
import org.apache.qpid.jms.test.testpeer.basictypes.Role;
import org.apache.qpid.jms.test.testpeer.basictypes.SenderSettleMode;
import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability;
import org.apache.qpid.jms.test.testpeer.basictypes.TerminusExpiryPolicy;
import org.apache.qpid.jms.test.testpeer.basictypes.TransactionError;
import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted;
import org.apache.qpid.jms.test.testpeer.describedtypes.AttachFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.BeginFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.CloseFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.Coordinator;
import org.apache.qpid.jms.test.testpeer.describedtypes.Declare;
import org.apache.qpid.jms.test.testpeer.describedtypes.Declared;
import org.apache.qpid.jms.test.testpeer.describedtypes.DetachFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge;
import org.apache.qpid.jms.test.testpeer.describedtypes.DispositionFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.EndFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.FlowFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.FrameDescriptorMapping;
import org.apache.qpid.jms.test.testpeer.describedtypes.OpenFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected;
import org.apache.qpid.jms.test.testpeer.describedtypes.Released;
import org.apache.qpid.jms.test.testpeer.describedtypes.SaslChallengeFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.SaslMechanismsFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.SaslOutcomeFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.Source;
import org.apache.qpid.jms.test.testpeer.describedtypes.Target;
import org.apache.qpid.jms.test.testpeer.describedtypes.TransferFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AttachMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.BeginMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.CloseMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.DeleteOnCloseMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.DetachMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.DispositionMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.EndMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.FlowMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.OpenMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.SaslInitMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.SaslResponseMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.TransferMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedShort;
import org.apache.qpid.proton.codec.Data;
import org.apache.qpid.proton.engine.impl.AmqpHeader;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestAmqpPeer implements AutoCloseable
{
// Logger used by the peer and by the anonymous callbacks it registers.
private static final Logger LOGGER = LoggerFactory.getLogger(TestAmqpPeer.class.getName());
// NOTE(review): not referenced in the visible part of this class; presumably a property key
// used when generating numbered test messages - confirm against the rest of the file.
public static final String MESSAGE_NUMBER = "MessageNumber";
// SASL mechanism names offered to, and expected back from, the client by the expectSasl* methods.
private static final Symbol ANONYMOUS = Symbol.valueOf("ANONYMOUS");
private static final Symbol EXTERNAL = Symbol.valueOf("EXTERNAL");
private static final Symbol PLAIN = Symbol.valueOf("PLAIN");
private static final Symbol GSSAPI = Symbol.valueOf("GSSAPI");
private static final Symbol XOAUTH2 = Symbol.valueOf("XOAUTH2");
// Codes placed in the SASL outcome frame. expectSaslFailingExchange accepts only codes in the
// inclusive range SASL_FAIL_AUTH..SASL_SYS_TEMP as "failing" codes.
private static final UnsignedByte SASL_OK = UnsignedByte.valueOf((byte) 0);
private static final UnsignedByte SASL_FAIL_AUTH = UnsignedByte.valueOf((byte) 1);
private static final UnsignedByte SASL_SYS_TEMP = UnsignedByte.valueOf((byte) 4);
// NOTE(review): the visible methods pass a literal 0 as the channel for connection-level
// frames rather than this constant; usage elsewhere in the file is not shown here.
private static final int CONNECTION_CHANNEL = 0;
// NOTE(review): not used in the visible part of the class; presumably the default link credit
// granted to producers - confirm.
private static final int DEFAULT_PRODUCER_CREDIT = 100;
// Desired capabilities the client's Open frame is expected to carry by the convenience expectOpen overloads.
private static final Symbol[] DEFAULT_DESIRED_CAPABILITIES = new Symbol[] { SOLE_CONNECTION_CAPABILITY, DELAYED_DELIVERY, ANONYMOUS_RELAY, SHARED_SUBS};
// First assertion failure recorded for this peer; close() rethrows it wrapped in a new AssertionError.
// Where it is assigned is outside the visible part of the file.
private volatile AssertionError _firstAssertionError = null;
// Runnable that owns the server socket / connection, and the thread it runs on (started by the constructor).
private final TestAmqpPeerRunner _driverRunnable;
private final Thread _driverThread;
/**
* Guarded by {@link #_handlersLock}
*/
private final List<Handler> _handlers = new ArrayList<Handler>();
// Monitor protecting _handlers and _handlersCompletedLatch.
private final Object _handlersLock = new Object();
/**
* Guarded by {@link #_handlersLock}
*/
private CountDownLatch _handlersCompletedLatch;
// Encoded frame bytes held back by sendFrame(..., deferWrite=true, ...) until a later non-deferred send flushes them.
private byte[] _deferredBytes;
// Channel of the most recently matched Begin; written by the Begin response value provider. -1 until then.
private int _lastInitiatedChannel = -1;
// NOTE(review): assigned outside the visible part of the file; presumably the handles of the most
// recently attached link / transaction coordinator link - confirm.
private UnsignedInteger _lastInitiatedLinkHandle = null;
private UnsignedInteger _lastInitiatedCoordinatorLinkHandle = null;
// Idle timeout placed on Open frames built by createOpenFrame(); 0 means the field is left unset.
private int advertisedIdleTimeout = 0;
// Number of empty frames received so far (incremented by receiveEmptyFrame).
private AtomicInteger _emptyFrameCount = new AtomicInteger();
/**
 * Creates a peer with no SSL context that does not ask for a client certificate.
 *
 * @throws IOException if the delegated constructor fails
 */
public TestAmqpPeer() throws IOException {
    this(null, false);
}
/**
 * Creates a peer that does not send the SASL header pre-emptively.
 *
 * @param context the SSL context handed to the driver runnable, may be null
 * @param needClientCert whether the driver is told a client certificate is needed
 * @throws IOException if the delegated constructor fails
 */
public TestAmqpPeer(SSLContext context, boolean needClientCert) throws IOException {
    this(context, needClientCert, false);
}
public TestAmqpPeer(SSLContext context, boolean needClientCert, boolean sendSaslHeaderPreEmptively) throws IOException
{
_driverRunnable = new TestAmqpPeerRunner(this, context, needClientCert);
_driverRunnable.setSendSaslHeaderPreEmptively(sendSaslHeaderPreEmptively);
_driverThread = new Thread(_driverRunnable, "MockAmqpPeer-" + _driverRunnable.getServerPort());
_driverThread.start();
}
/**
 * Stops the driver, waits up to 30 seconds for its thread to finish and then reports
 * the outcome of the run: a recorded assertion failure is rethrown (wrapped), a throwable
 * caught by the driver is rethrown wrapped in a RuntimeException, and otherwise the
 * handler list is asserted to be empty.
 */
@Override
public void close() throws Exception {
    _driverRunnable.stop();

    try {
        _driverThread.join(30000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    } finally {
        final AssertionError recorded = _firstAssertionError;
        if (recorded != null) {
            final String detail = recorded.getMessage();
            String message = "Assertion failure during test run";
            if (detail != null) {
                message = message + ": " + detail;
            }
            throw new AssertionError(message, _firstAssertionError);
        }

        final Throwable caught = getThrowable();
        if (caught != null) {
            // AutoCloseable cannot throw a bare Throwable, hence the wrapper.
            throw new RuntimeException("TestPeer caught throwable during run", caught);
        }

        synchronized (_handlersLock) {
            assertThat(_handlers, Matchers.empty());
        }
    }
}
/**
 * @return whatever the driver runnable reports from {@code getException()}, possibly null
 */
public Throwable getThrowable() {
    final Throwable recorded = _driverRunnable.getException();
    return recorded;
}
/**
 * Forwards the flag to the driver runnable.
 *
 * @param suppress value passed to the driver's {@code setSuppressReadExceptionOnClose}
 */
public void setSuppressReadExceptionOnClose(boolean suppress) {
    _driverRunnable.setSuppressReadExceptionOnClose(suppress);
}
/**
 * @return the server port reported by the driver runnable
 */
public int getServerPort() {
    final int port = _driverRunnable.getServerPort();
    return port;
}
/**
 * @return the client socket reported by the driver runnable
 */
public Socket getClientSocket() {
    final Socket client = _driverRunnable.getClientSocket();
    return client;
}
/**
 * @return the driver runnable's answer to {@code isSSL()}
 */
public boolean isSSL() {
    final boolean ssl = _driverRunnable.isSSL();
    return ssl;
}
/**
 * @return the idle timeout value currently configured for Open frames built by this peer
 */
public int getAdvertisedIdleTimeout() {
    return this.advertisedIdleTimeout;
}
/**
 * Stores the idle timeout to place on Open frames built afterwards; a value of 0 leaves
 * the field unset on those frames.
 *
 * @param advertisedIdleTimeout the value to store
 */
public void setAdvertisedIdleTimeout(int advertisedIdleTimeout) {
    this.advertisedIdleTimeout = advertisedIdleTimeout;
}
/**
 * @return the number of empty frames counted so far
 */
public int getEmptyFrameCount()
{
    final int count = _emptyFrameCount.get();
    return count;
}
/**
 * Discards every handler that is still registered.
 */
public void purgeExpectations()
{
    synchronized (_handlersLock)
    {
        _handlers.clear();
    }
}
/**
 * Hands a received protocol header to the first registered handler and then removes that handler.
 *
 * @param header the received header bytes
 * @throws IllegalStateException if the first handler is not a HeaderHandler (including when there is none)
 */
void receiveHeader(byte[] header) {
    final Handler next = getFirstHandler();
    if (!(next instanceof HeaderHandler)) {
        throw new IllegalStateException("Received header but the next handler is a " + next);
    }

    final HeaderHandler headerHandler = (HeaderHandler) next;
    headerHandler.header(header, this);
    removeFirstHandler();
}
/**
 * Dispatches a received frame to the first registered handler, first discarding any leading
 * optional frame handlers whose descriptor does not match the frame, and removes the handler
 * once it has processed the frame.
 *
 * @throws IllegalStateException if no handler remains, or the next handler is not a FrameHandler
 */
void receiveFrame(int type, int channel, int frameSize, DescribedType describedType, Binary payload) {
    Handler current = getFirstHandler();

    // Skip over optional handlers until one matches the frame or a mandatory handler is reached.
    while (current instanceof FrameHandler && ((FrameHandler) current).isOptional()) {
        final FrameHandler optional = (FrameHandler) current;
        if (optional.descriptorMatches(describedType.getDescriptor())) {
            LOGGER.info("Optional frame handler matches the descriptor, proceeding to verify it");
            break;
        }

        LOGGER.info("Skipping non-matching optional frame handler, received frame descriptor (" + describedType.getDescriptor() + ") does not match handler: " + optional);
        removeFirstHandler();
        current = getFirstHandler();
    }

    if (current == null) {
        final Object received = describedType.getDescriptor();
        final Object mapped = FrameDescriptorMapping.lookupMapping(received);
        throw new IllegalStateException("No handler! Received frame, descriptor=" + received + "/" + mapped);
    }

    if (!(current instanceof FrameHandler)) {
        throw new IllegalStateException("Received frame but the next handler is a " + current);
    }

    ((FrameHandler) current).frame(type, channel, frameSize, describedType, payload, this);
    removeFirstHandler();
}
/**
 * Counts a received empty frame; the type and channel arguments are not used.
 */
void receiveEmptyFrame(int type, int channel) {
    _emptyFrameCount.incrementAndGet();
    LOGGER.debug("Received empty frame");
}
/**
 * Removes the handler at the head of the list and, when a completion latch is installed,
 * counts it down once.
 */
private void removeFirstHandler() {
    synchronized (_handlersLock) {
        final Handler completed = _handlers.remove(0);

        final CountDownLatch latch = _handlersCompletedLatch;
        if (latch != null) {
            latch.countDown();
        }

        LOGGER.trace("Removed completed handler: {}", completed);
    }
}
/**
 * Appends a handler to the end of the expectation list.
 *
 * @param handler the handler to register
 */
private void addHandler(Handler handler) {
    synchronized (_handlersLock) {
        _handlers.add(handler);
        LOGGER.trace("Added handler: {}", handler);
    }
}
/**
 * @return the handler at the head of the list, or null when no handlers are registered
 */
private Handler getFirstHandler() {
    synchronized (_handlersLock) {
        return _handlers.isEmpty() ? null : _handlers.get(0);
    }
}
/**
 * @return the most recently registered handler
 * @throws IllegalStateException if no handlers are registered
 */
private Handler getLastHandler() {
    synchronized (_handlersLock) {
        final int count = _handlers.size();
        if (count == 0) {
            throw new IllegalStateException("No handlers");
        }
        return _handlers.get(count - 1);
    }
}
/**
 * Waits for the currently registered handlers to complete and fails with a JUnit assertion
 * if they do not do so in time. Any throwable caught by the peer is appended to the failure
 * message as a possible cause.
 *
 * @param timeoutMillis how long to wait, in milliseconds
 * @throws InterruptedException if interrupted while waiting
 */
public void waitForAllHandlersToComplete(int timeoutMillis) throws InterruptedException {
    final boolean completedInTime = waitForAllHandlersToCompleteNoAssert(timeoutMillis);

    final StringBuilder failure = new StringBuilder();
    failure.append("All handlers did not complete within the ").append(timeoutMillis).append("ms timeout.");

    final Throwable peerFailure = getThrowable();
    if (peerFailure != null) {
        failure.append(System.lineSeparator())
               .append("A *potential* reason, peer caught throwable: ")
               .append(peerFailure);
    }

    Assert.assertTrue(failure.toString(), completedInTime);
}
/**
 * Waits until every handler registered at the time of the call has been removed, or the
 * timeout elapses.
 *
 * @param timeoutMillis how long to wait, in milliseconds
 * @return true if all of those handlers completed in time, false on timeout
 * @throws InterruptedException if interrupted while waiting
 */
public boolean waitForAllHandlersToCompleteNoAssert(int timeoutMillis) throws InterruptedException
{
    // Keep a local reference to the latch created under the lock. The field is guarded by
    // _handlersLock, so re-reading it outside the lock could observe a latch installed by
    // a different caller.
    final CountDownLatch latch;
    synchronized(_handlersLock)
    {
        latch = new CountDownLatch(_handlers.size());
        _handlersCompletedLatch = latch;
    }
    return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
}
/**
 * Logs and writes the given protocol header bytes through the driver runnable.
 *
 * @param header the header bytes to send
 */
void sendHeader(byte[] header) {
    final Binary loggable = new Binary(header);
    LOGGER.debug("About to send header: {}", loggable);
    _driverRunnable.sendBytes(header);
}
/**
 * Arranges for the AMQP header to be sent once the most recently registered handler completes.
 */
public void sendPreemptiveServerAmqpHeader()
{
    final AmqpPeerRunnable headerSender = new AmqpPeerRunnable()
    {
        @Override
        public void run()
        {
            sendHeader(AmqpHeader.HEADER);
        }
    };

    // Chain the send onto whatever the last handler already does on completion.
    insertCompsiteActionForLastHandler().add(headerSender);
}
/**
 * Sends an AMQP frame on channel 0 with neither a described type nor a payload, without delay.
 *
 * @param deferWrite whether the bytes are held back for a later send rather than written now
 */
public void sendEmptyFrame(boolean deferWrite) {
    final int channel = 0;
    final long noDelay = 0;
    sendFrame(FrameType.AMQP, channel, null, null, deferWrite, noDelay);
}
/**
 * Encodes a frame and either writes it (together with any previously deferred bytes) or
 * holds it back so that it is written along with a later frame.
 *
 * @param type the frame type
 * @param channel the channel to send on, must be >= 0
 * @param frameDescribedType the frame body, may be null
 * @param framePayload the frame payload, may be null
 * @param deferWrite true to hold the bytes back until a later non-deferred send
 * @param sendDelay milliseconds to sleep before encoding and sending; values <= 0 mean no delay
 * @throws IllegalArgumentException if the channel is negative
 * @throws RuntimeException if interrupted while delaying
 */
void sendFrame(FrameType type, int channel, DescribedType frameDescribedType, Binary framePayload, boolean deferWrite, long sendDelay) {
    if (channel < 0) {
        throw new IllegalArgumentException("Frame must be sent on a channel >= 0");
    }

    LOGGER.debug("About to send: {}", frameDescribedType);

    if (sendDelay > 0) {
        LOGGER.debug("Delaying send by {} ms", sendDelay);
        try {
            Thread.sleep(sendDelay);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted before send", e);
        }
    }

    final byte[] encoded = AmqpDataFramer.encodeFrame(type, channel, frameDescribedType, framePayload);

    // Prepend anything deferred by earlier calls so the bytes go out in order.
    final byte[] pending = _deferredBytes;
    final byte[] combined;
    if (pending == null) {
        combined = encoded;
    } else {
        //TODO: check overflow
        combined = Arrays.copyOf(pending, pending.length + encoded.length);
        System.arraycopy(encoded, 0, combined, pending.length, encoded.length);
    }

    if (deferWrite) {
        _deferredBytes = combined;
        LOGGER.debug("Deferring write until pipelined with future frame bytes");
        return;
    }

    // Clear the deferred bytes before writing so they cannot leak into future sends.
    _deferredBytes = null;
    _driverRunnable.sendBytes(combined);
}
/**
 * Builds the Open frame this peer sends: a fixed container id, plus the advertised idle
 * timeout when one other than 0 has been configured.
 *
 * @return the new Open frame
 */
private OpenFrame createOpenFrame() {
    final OpenFrame frame = new OpenFrame();
    frame.setContainerId("test-amqp-peer-container-id");

    final int idleTimeout = advertisedIdleTimeout;
    if (idleTimeout != 0) {
        frame.setIdleTimeOut(UnsignedInteger.valueOf(idleTimeout));
    }

    return frame;
}
/**
 * Registers a header handler built from the given expected header and response.
 *
 * @param header the header bytes handed to the handler as the expected header
 * @param response the bytes handed to the handler as its response
 */
public void expectHeader(byte[] header, byte[] response) {
    final HeaderHandlerImpl headerHandler = new HeaderHandlerImpl(header, response);
    addHandler(headerHandler);
}
/**
 * Registers the handlers for a successful single-step SASL exchange using one mechanism:
 * a SASL header handler that sends the mechanisms frame, a sasl-init matcher that answers
 * with an OK outcome, and (unless the server header was sent pre-emptively) a handler for
 * the AMQP header.
 *
 * @param mechanism the only mechanism offered, and the one the client must select
 * @param initialResponseMatcher matcher for the client's initial response
 * @param hostnameMatcher matcher for the sasl-init hostname, or null to skip that check
 * @param sendSaslHeaderResponse whether the SASL header is passed as the header handler's response
 * @param amqpHeaderSentPreemptively true to skip registering the AMQP header handler
 */
private void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher,
                                      boolean sendSaslHeaderResponse, boolean amqpHeaderSentPreemptively) {
    final SaslMechanismsFrame mechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(mechanism);
    final byte[] saslHeaderResponse = sendSaslHeaderResponse ? AmqpHeader.SASL_HEADER : null;
    final FrameSender mechanismsSender = new FrameSender(this, FrameType.SASL, 0, mechanismsFrame, null);

    addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, saslHeaderResponse, mechanismsSender));

    final AmqpPeerRunnable sendOutcome = new AmqpPeerRunnable() {
        @Override
        public void run() {
            TestAmqpPeer.this.sendFrame(
                    FrameType.SASL, 0,
                    new SaslOutcomeFrame().setCode(SASL_OK),
                    null,
                    false, 0);

            // The SASL layer is finished, so switch the driver back to expecting
            // the non-SASL AMQP header.
            _driverRunnable.expectHeader();
        }
    };

    SaslInitMatcher saslInitMatcher = new SaslInitMatcher()
            .withMechanism(equalTo(mechanism))
            .withInitialResponse(initialResponseMatcher)
            .onCompletion(sendOutcome);

    if (hostnameMatcher != null) {
        saslInitMatcher.withHostname(hostnameMatcher);
    }

    addHandler(saslInitMatcher);

    if (amqpHeaderSentPreemptively) {
        return;
    }

    addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));
}
/**
 * Registers only a SASL header handler that sends a mechanisms frame offering GSSAPI;
 * no handler for a subsequent sasl-init is registered.
 */
public void expectSaslGSSAPIFail() throws Exception
{
    final SaslMechanismsFrame gssapiOnly = new SaslMechanismsFrame().setSaslServerMechanisms(GSSAPI);
    final FrameSender mechanismsSender = new FrameSender(this, FrameType.SASL, 0, gssapiOnly, null);
    addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, mechanismsSender));
}
/**
 * Registers the handlers for a GSSAPI (Kerberos) SASL exchange that is validated against a
 * real {@link SaslServer}.
 * <p>
 * The method logs in through the Krb5LoginModule using the given principal and keytab,
 * creates a GSSAPI SaslServer as that subject, and then registers, in order: a SASL header
 * handler that sends a mechanisms frame offering GSSAPI, a sasl-init matcher, two
 * sasl-response matchers and finally an AMQP header handler. Each matcher feeds the client's
 * bytes into the SaslServer; the first two send the resulting challenge back, and the last
 * sends the SASL outcome.
 *
 * @param serviceName value used for the login module's "principal" option
 * @param keyTab value used for the login module's "keyTab" option
 * @param clientAuthId the authorization id the completed SaslServer must report for the exchange to succeed
 * @throws Exception if the login or the creation of the SaslServer fails
 */
public void expectSaslGSSAPI(String serviceName, String keyTab, String clientAuthId) throws Exception {
SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(GSSAPI);
addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
new FrameSender(
this, FrameType.SASL, 0,
saslMechanismsFrame, null)));
// setup server gss context
final Map<String, String> options = new HashMap<>();
options.put("principal", serviceName);
options.put("useKeyTab", "true");
options.put("keyTab", keyTab);
options.put("storeKey", "true");
options.put("isInitiator", "false");
// In-memory JAAS configuration: returns the same single required Krb5LoginModule entry for any name.
Configuration loginCconfiguration = new Configuration() {
@Override
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
return new AppConfigurationEntry[]{
new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
options)};
}
};
LoginContext loginContext = new LoginContext("", null, null, loginCconfiguration);
loginContext.login();
final Subject serverSubject =loginContext.getSubject();
LOGGER.info("saslServer subject:" + serverSubject.getPrivateCredentials());
Map<String, ?> config = new HashMap<>();
// Authorizes the client only when its authentication id equals the requested authorization id;
// other callback types are ignored.
final CallbackHandler handler = new CallbackHandler() {
@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
LOGGER.info("Here with: " + Arrays.asList(callbacks));
for (Callback callback :callbacks) {
if (callback instanceof AuthorizeCallback) {
AuthorizeCallback authorizeCallback = (AuthorizeCallback) callback;
authorizeCallback.setAuthorized(authorizeCallback.getAuthenticationID().equals(authorizeCallback.getAuthorizationID()));
}
}
}
};
// The SaslServer is created while running as the logged-in server subject.
final SaslServer saslServer = Subject.doAs(serverSubject, new PrivilegedExceptionAction<SaslServer>() {
@Override
public SaslServer run() throws Exception {
return Sasl.createSaslServer(GSSAPI.toString(), null, null, config, handler);
}
});
// Step 1: the sasl-init matcher. The frame object is created up front; its challenge bytes are
// filled in by the matcher and the frame is sent by the completion action.
final SaslChallengeFrame challengeFrame1 = new SaslChallengeFrame();
SaslInitMatcher saslInitMatcher = new SaslInitMatcher()
.withMechanism(equalTo(GSSAPI))
.withInitialResponse(new BaseMatcher<Binary>() {
@Override
public void describeTo(Description description) {}
@Override
public boolean matches(Object o) {
if (o == null) {
LOGGER.error("Got null initial response!");
return false;
}
final Binary binary = (Binary) o;
// validate via sasl
try {
byte[] challenge1data = Subject.doAs(serverSubject, new PrivilegedExceptionAction<byte[]>() {
@Override
public byte[] run() throws Exception {
LOGGER.info("Evaluate Initial Response.. size:" + binary.getLength());
return saslServer.evaluateResponse(binary.getArray());
}
});
LOGGER.info("Creating challenge 1.. size: " + challenge1data.length);
challengeFrame1.setChallenge(new Binary(challenge1data));
} catch (PrivilegedActionException e) {
LOGGER.error("Unexpected error during processing initial response", e);
throw new RuntimeException("Failed to eval initial response", e);
}
LOGGER.info("Complete:" + saslServer.isComplete());
return true;
}
}).onCompletion(new AmqpPeerRunnable() {
@Override
public void run() {
LOGGER.info("Send challenge 1..");
TestAmqpPeer.this.sendFrame(
FrameType.SASL, 0,
challengeFrame1,
null,
false, 0);
}
});
// Set to true only when the final response leaves the SaslServer complete with the expected
// authorization id; read when choosing the outcome code.
AtomicBoolean succeeded = new AtomicBoolean(false);
// Step 2: the first sasl-response. Same pattern as step 1, producing the second challenge.
final SaslChallengeFrame challengeFrame2 = new SaslChallengeFrame();
SaslResponseMatcher responseMatcher1 = new SaslResponseMatcher().withResponse(new BaseMatcher<Binary>() {
@Override
public void describeTo(Description description) {}
@Override
public boolean matches(Object o) {
final Binary responseBinary1 = (Binary) o;
// validate via sasl
byte[] challenge2data = null;
try {
challenge2data = Subject.doAs(serverSubject, new PrivilegedExceptionAction<byte[]>() {
@Override
public byte[] run() throws Exception {
LOGGER.info("Evaluate challenge response 1.. size:" + responseBinary1.getLength());
return saslServer.evaluateResponse(responseBinary1.getArray());
}
});
} catch (PrivilegedActionException e) {
LOGGER.error("Unexpected error during processing challenge response 1", e);
throw new RuntimeException("failed to evaluate challenge response 1", e);
}
LOGGER.info("Creating challenge 2.. size: " + challenge2data.length);
challengeFrame2.setChallenge(new Binary(challenge2data));
LOGGER.info("Complete:" + saslServer.isComplete());
return true;
}
}).onCompletion(new AmqpPeerRunnable() {
@Override
public void run() {
LOGGER.info("Send challenge 2..");
TestAmqpPeer.this.sendFrame(
FrameType.SASL, 0,
challengeFrame2,
null,
false, 0);
}
});
// Step 3: the second sasl-response. It matches only if the SaslServer is complete, reports
// clientAuthId as the authorization id, and produced no further data to send.
SaslResponseMatcher responseMatcher2 = new SaslResponseMatcher().withResponse(new BaseMatcher<Binary>() {
@Override
public void describeTo(Description description) {}
@Override
public boolean matches(Object o) {
final Binary binary = (Binary) o;
// validate via sasl
byte[] additionalData = null;
try {
additionalData = Subject.doAs(serverSubject, new PrivilegedExceptionAction<byte[]>() {
@Override
public byte[] run() throws Exception {
LOGGER.info("Evaluate challenge response 2.. size:" + binary.getLength());
return saslServer.evaluateResponse(binary.getArray());
}
});
} catch (PrivilegedActionException e) {
LOGGER.error("Unexpected error during processing challenge response 2", e);
throw new RuntimeException("failed to evaluate challenge response 2", e);
}
boolean complete = saslServer.isComplete();
boolean expectedAuthId = false;
if(complete) {
expectedAuthId = clientAuthId.equals(saslServer.getAuthorizationID());
LOGGER.info("Authorized ID: " + saslServer.getAuthorizationID());
}
LOGGER.info("Complete:" + complete + ", expectedAuthID:" + expectedAuthId +", additionalData:" + Arrays.toString(additionalData));
if(complete && expectedAuthId && additionalData == null) {
succeeded.set(true);
return true;
} else {
return false;
}
}
}).onCompletion(new AmqpPeerRunnable() {
@Override
public void run() {
// Outcome is OK only when the exchange completed and the matcher above flagged success.
SaslOutcomeFrame saslOutcome = new SaslOutcomeFrame();
if (saslServer.isComplete() && succeeded.get()) {
saslOutcome.setCode(SASL_OK);
} else {
saslOutcome.setCode(SASL_FAIL_AUTH);
}
LOGGER.info("Send Outcome");
TestAmqpPeer.this.sendFrame(
FrameType.SASL, 0,
saslOutcome,
null,
false, 0);
// Now that we processed the SASL layer AMQP header, reset the
// peer to expect the non-SASL AMQP header.
_driverRunnable.expectHeader();
}
});
// Registration order defines the order in which the client's frames are expected.
addHandler(saslInitMatcher);
addHandler(responseMatcher1);
addHandler(responseMatcher2);
addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));
}
/**
 * Expects a SASL PLAIN exchange whose initial response is a zero byte, the UTF-8 username,
 * a zero byte and the UTF-8 password.
 *
 * @param username the expected user name
 * @param password the expected password
 */
public void expectSaslPlain(String username, String password) {
    final byte[] user = username.getBytes(StandardCharsets.UTF_8);
    final byte[] secret = password.getBytes(StandardCharsets.UTF_8);

    // Freshly allocated arrays are zero-filled, which supplies the two separator bytes.
    final byte[] expected = new byte[user.length + secret.length + 2];
    final int secretOffset = 2 + user.length;
    System.arraycopy(user, 0, expected, 1, user.length);
    System.arraycopy(secret, 0, expected, secretOffset, secret.length);

    final Matcher<Binary> initialResponseMatcher = equalTo(new Binary(expected));
    expectSaslAuthentication(PLAIN, initialResponseMatcher, null, true, false);
}
/**
 * Expects a SASL XOAUTH2 exchange whose initial response is
 * {@code "user=" + username + 0x01 + "auth=Bearer " + password + 0x01 + 0x01}.
 *
 * @param username the expected user name
 * @param password the expected bearer token
 */
public void expectSaslXOauth2(String username, String password) {
    final byte[] user = username.getBytes(StandardCharsets.UTF_8);
    final byte[] token = password.getBytes(StandardCharsets.UTF_8);
    final byte[] userPrefix = "user=".getBytes(StandardCharsets.US_ASCII);
    final byte[] authPrefix = "auth=Bearer ".getBytes(StandardCharsets.US_ASCII);

    // 5 ("user=") + 1 (separator) + 12 ("auth=Bearer ") + 2 (terminators) = 20 fixed bytes.
    final byte[] expected = new byte[user.length + token.length + 20];

    int offset = 0;
    System.arraycopy(userPrefix, 0, expected, offset, 5);
    offset += 5;
    System.arraycopy(user, 0, expected, offset, user.length);
    offset += user.length;
    expected[offset] = 1;
    offset += 1;
    System.arraycopy(authPrefix, 0, expected, offset, 12);
    offset += 12;
    System.arraycopy(token, 0, expected, offset, token.length);

    expected[expected.length - 2] = 1;
    expected[expected.length - 1] = 1;

    final Matcher<Binary> initialResponseMatcher = equalTo(new Binary(expected));
    expectSaslAuthentication(XOAUTH2, initialResponseMatcher, null, true, false);
}
/**
 * Expects a SASL EXTERNAL exchange with an empty initial response.
 *
 * @throws IllegalStateException if the driver was not configured to need a client certificate
 */
public void expectSaslExternal() {
    final boolean clientCertNeeded = _driverRunnable.isNeedClientCert();
    if (!clientCertNeeded) {
        throw new IllegalStateException("need-client-cert must be enabled on the test peer");
    }

    final Matcher<Binary> emptyResponse = equalTo(new Binary(new byte[0]));
    expectSaslAuthentication(EXTERNAL, emptyResponse, null, true, false);
}
/**
 * Expects a SASL ANONYMOUS exchange without checking the sasl-init hostname.
 */
public void expectSaslAnonymous() {
    expectSaslAnonymous(null);
}
/**
 * Expects a SASL ANONYMOUS exchange with an empty initial response.
 *
 * @param hostnameMatcher matcher for the sasl-init hostname, or null to skip that check
 */
public void expectSaslAnonymous(Matcher<?> hostnameMatcher) {
    final Matcher<Binary> emptyResponse = equalTo(new Binary(new byte[0]));
    expectSaslAuthentication(ANONYMOUS, emptyResponse, hostnameMatcher, true, false);
}
/**
 * Expects a SASL ANONYMOUS exchange for a peer that was constructed to send its SASL header
 * pre-emptively, so no SASL header response is configured on the header handler.
 */
public void expectSaslAnonymousWithPreEmptiveServerHeader() {
    final boolean configuredPreEmptive = _driverRunnable.isSendSaslHeaderPreEmptively();
    assertThat("Peer should be created with instruction to send preemptively", configuredPreEmptive, equalTo(true));

    // The server must already have sent its SASL header, hence sendSaslHeaderResponse == false.
    final Matcher<Binary> emptyResponse = equalTo(new Binary(new byte[0]));
    expectSaslAuthentication(ANONYMOUS, emptyResponse, null, false, false);
}
/**
 * Expects a SASL ANONYMOUS exchange but does not register the handler for the AMQP header
 * that normally follows it.
 */
public void expectSaslAnonymousWithServerAmqpHeaderSentPreemptively() {
    final Matcher<Binary> emptyResponse = equalTo(new Binary(new byte[0]));
    expectSaslAuthentication(ANONYMOUS, emptyResponse, null, true, true);
}
/**
 * Expects a SASL exchange that the peer fails with the authentication-failure outcome code.
 *
 * @param serverMechs the mechanisms the peer offers
 * @param clientSelectedMech the mechanism the client is expected to select
 */
public void expectSaslFailingAuthentication(Symbol[] serverMechs, Symbol clientSelectedMech) {
    final UnsignedByte outcomeCode = SASL_FAIL_AUTH;
    expectSaslFailingExchange(serverMechs, clientSelectedMech, outcomeCode);
}
/**
 * Expects a SASL exchange that the peer terminates with the given failing outcome code.
 *
 * @param serverMechs the mechanisms the peer offers
 * @param clientSelectedMech the mechanism the client is expected to select
 * @param saslFailureAuthCode the outcome code to send, between SASL_FAIL_AUTH and SASL_SYS_TEMP inclusive
 * @throws IllegalArgumentException if the code is outside the failing range
 */
public void expectSaslFailingExchange(Symbol[] serverMechs, Symbol clientSelectedMech, UnsignedByte saslFailureAuthCode)
{
    // Validate before registering anything, so a rejected call does not leave a stray
    // header handler in the expectation list.
    if(saslFailureAuthCode.compareTo(SASL_FAIL_AUTH) < 0 || saslFailureAuthCode.compareTo(SASL_SYS_TEMP) > 0) {
        throw new IllegalArgumentException("A valid failing SASL code must be supplied");
    }

    SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(serverMechs);
    addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER,
            new FrameSender(
                    this, FrameType.SASL, 0,
                    saslMechanismsFrame, null)));

    SaslInitMatcher saslInitMatcher = new SaslInitMatcher().withMechanism(equalTo(clientSelectedMech));
    saslInitMatcher.onCompletion(new AmqpPeerRunnable()
    {
        @Override
        public void run()
        {
            TestAmqpPeer.this.sendFrame(
                    FrameType.SASL, 0,
                    new SaslOutcomeFrame().setCode(saslFailureAuthCode),
                    null,
                    false, 0);
            // Switch the driver back to expecting a protocol header after the outcome.
            _driverRunnable.expectHeader();
        }
    });
    addHandler(saslInitMatcher);
}
/**
 * Registers only a SASL header handler that sends a mechanisms frame with the given
 * mechanisms; no handler for a subsequent sasl-init is registered.
 *
 * @param serverMechs the mechanisms the peer offers
 */
public void expectSaslMechanismNegotiationFailure(Symbol[] serverMechs) {
    final SaslMechanismsFrame offered = new SaslMechanismsFrame().setSaslServerMechanisms(serverMechs);
    final FrameSender sendOffered = new FrameSender(this, FrameType.SASL, 0, offered, null);
    final HeaderHandlerImpl saslHeaderHandler =
            new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, sendOffered);
    addHandler(saslHeaderHandler);
}
/**
 * Expects a connection that skips the SASL layer entirely: the AMQP header is expected
 * first, followed by an Open frame to which the peer replies with its own Open.
 *
 * @param maxFrameSizeMatcher
 *            matcher applied to the client's maxFrameSize, or null to skip that check
 */
public void expectSaslLayerDisabledConnect(Matcher<?> maxFrameSizeMatcher) {
    addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));

    final OpenFrame reply = createOpenFrame();
    final OpenMatcher openMatcher = new OpenMatcher().withContainerId(notNullValue(String.class));

    if (maxFrameSizeMatcher != null) {
        openMatcher.withMaxFrameSize(maxFrameSizeMatcher);
    }

    final FrameSender replySender = new FrameSender(this, FrameType.AMQP, 0, reply, null);
    openMatcher.onCompletion(replySender);

    addHandler(openMatcher);
}
/**
 * Expects an Open frame and replies with the peer's Open immediately.
 */
public void expectOpen()
{
    final boolean deferOpened = false;
    expectOpen(deferOpened);
}
/**
 * Expects an Open frame without checking its properties or hostname.
 *
 * @param deferOpened true to register no Open reply for when the client's Open is matched
 */
public void expectOpen(boolean deferOpened)
{
    expectOpen(null, null, deferOpened);
}
/**
 * Expects an Open frame carrying the default desired capabilities, and replies with an Open
 * that offers the sole-connection capability and carries the given properties.
 *
 * @param serverProperties the properties to place on the peer's Open, may be null
 */
public void expectOpen(Map<Symbol, Object> serverProperties)
{
    final Symbol[] offered = new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY };
    expectOpen(DEFAULT_DESIRED_CAPABILITIES, offered, null, serverProperties, null, null, false);
}
/**
 * Expects an Open frame carrying the default desired capabilities, and replies with an Open
 * built from the given properties and offered capabilities.
 *
 * @param serverProperties the properties to place on the peer's Open, may be null
 * @param serverCapabilities the capabilities the peer's Open offers, may be null
 */
public void expectOpen(Map<Symbol, Object> serverProperties, Symbol[] serverCapabilities)
{
    final Symbol[] desired = DEFAULT_DESIRED_CAPABILITIES;
    expectOpen(desired, serverCapabilities, null, serverProperties, null, null, false);
}
/**
 * Expects an Open frame, placing no constraint on the client's idle timeout.
 *
 * @param clientPropertiesMatcher matcher for the client's Open properties, or null to skip that check
 * @param hostnameMatcher matcher for the Open hostname, or null to skip that check
 * @param deferOpened true to register no Open reply for when the client's Open is matched
 */
public void expectOpen(Matcher<?> clientPropertiesMatcher, Matcher<?> hostnameMatcher, boolean deferOpened) {
    // A null idle-timeout matcher means "do not check". Passing nullValue() here would instead
    // require the client to omit its idle timeout now that the overload below forwards it.
    expectOpen(clientPropertiesMatcher, (Matcher<?>) null, hostnameMatcher, deferOpened);
}
/**
 * Expects an Open frame carrying the default desired capabilities and replies with an Open
 * offering the sole-connection capability.
 *
 * @param clientPropertiesMatcher matcher for the client's Open properties, or null to skip that check
 * @param idleTimeoutMatcher matcher for the client's idle timeout, or null to skip that check
 * @param hostnameMatcher matcher for the Open hostname, or null to skip that check
 * @param deferOpened true to register no Open reply for when the client's Open is matched
 */
public void expectOpen(Matcher<?> clientPropertiesMatcher, Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, boolean deferOpened) {
    // Forward the idle-timeout matcher; it was previously dropped in favour of a literal null.
    expectOpen(DEFAULT_DESIRED_CAPABILITIES, new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, clientPropertiesMatcher, null, idleTimeoutMatcher, hostnameMatcher, deferOpened);
}
/**
 * Arranges for the peer's Open frame to be sent once the most recently registered handler completes.
 */
public void sendPreemptiveServerOpenFrame()
{
    final OpenFrame frame = createOpenFrame();
    final FrameSender openSender = new FrameSender(this, FrameType.AMQP, 0, frame, null);

    // Chain the send onto whatever the last handler already does on completion.
    insertCompsiteActionForLastHandler().add(openSender);
}
/**
 * Expects an Open frame from the client and, unless deferred, replies with the peer's Open.
 *
 * @param desiredCapabilities capabilities the client's Open must contain; null requires the field to be null
 * @param serverCapabilities capabilities offered on the peer's Open, or null to leave them unset
 * @param clientPropertiesMatcher matcher for the client's Open properties, or null to skip that check
 * @param serverProperties properties placed on the peer's Open, or null to leave them unset
 * @param idleTimeoutMatcher matcher for the client's idle timeout, or null to skip that check
 * @param hostnameMatcher matcher for the Open hostname, or null to skip that check
 * @param deferOpened true to register no Open reply for when the client's Open is matched
 */
public void expectOpen(Symbol[] desiredCapabilities, Symbol[] serverCapabilities,
                       Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties,
                       Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, boolean deferOpened)
{
    // Build the reply first.
    final OpenFrame reply = createOpenFrame();
    if (serverCapabilities != null)
    {
        reply.setOfferedCapabilities(serverCapabilities);
    }
    if (serverProperties != null)
    {
        reply.setProperties(serverProperties);
    }

    // Then describe what the client's Open must look like.
    final OpenMatcher expected = new OpenMatcher().withContainerId(notNullValue(String.class));
    if (!deferOpened)
    {
        expected.onCompletion(new FrameSender(this, FrameType.AMQP, 0, reply, null));
    }

    if (desiredCapabilities == null)
    {
        expected.withDesiredCapabilities(nullValue());
    }
    else
    {
        expected.withDesiredCapabilities(arrayContaining(desiredCapabilities));
    }

    if (idleTimeoutMatcher != null)
    {
        expected.withIdleTimeOut(idleTimeoutMatcher);
    }
    if (hostnameMatcher != null)
    {
        expected.withHostname(hostnameMatcher);
    }
    if (clientPropertiesMatcher != null)
    {
        expected.withProperties(clientPropertiesMatcher);
    }

    addHandler(expected);
}
/**
 * Expects an anonymous connection attempt and rejects it: the peer replies to the client's
 * Open with an Open flagged as connection-open-failed, immediately followed by a Close
 * carrying the supplied error, and then expects the client's Close without an error.
 *
 * @param errorType the error condition for the Close frame
 * @param errorMessage the error description for the Close frame
 * @param errorInfo the error info map for the Close frame
 */
public void rejectConnect(Symbol errorType, String errorMessage, Map<Symbol, Object> errorInfo)
{
    // SASL negotiation followed by an Open whose reply is marked as failed.
    final Map<Symbol, Object> failedOpenProperties = new HashMap<Symbol, Object>();
    failedOpenProperties.put(AmqpSupport.CONNECTION_OPEN_FAILED, true);

    expectSaslAnonymous();
    expectOpen(failedOpenProperties);

    // Chain the Close with the supplied error directly after the Open reply.
    final FrameSender errorClose = createCloseFrameSender(errorType, errorMessage, errorInfo, 0);
    insertCompsiteActionForLastHandler().add(errorClose);

    final CloseMatcher clientClose = new CloseMatcher().withError(Matchers.nullValue());
    addHandler(clientClose);
}
/**
 * Expects a Close frame carrying no error and replies with a Close frame.
 */
public void expectClose() {
    final boolean sendReply = true;
    expectClose(Matchers.nullValue(), sendReply);
}
/**
 * Expects a Close frame carrying no error.
 *
 * @param sendReply whether to answer with a Close frame
 */
public void expectClose(boolean sendReply) {
    // A null error field is required on the incoming Close.
    expectClose(Matchers.nullValue(), sendReply);
}
/**
 * Expects a Close frame whose error field satisfies the given matcher.
 *
 * @param errorMatcher matcher applied to the error field of the incoming Close
 * @param sendReply whether to answer with an empty Close frame on channel 0
 */
public void expectClose(Matcher<?> errorMatcher, boolean sendReply) {
    final CloseMatcher expectedClose = new CloseMatcher().withError(errorMatcher);

    if (sendReply) {
        final FrameSender closeReplySender = new FrameSender(this, FrameType.AMQP, 0, new CloseFrame(), null);
        expectedClose.onCompletion(closeReplySender);
    }

    addHandler(expectedClose);
}
/**
 * Expects a Begin frame, passing a non-null matcher for the outgoing window, and sends a Begin response.
 */
public void expectBegin() {
    final boolean sendResponse = true;
    expectBegin(notNullValue(), sendResponse);
}
/**
 * Expects a Begin frame from the client and optionally answers it with a Begin response.
 *
 * <p>The incoming Begin must have a null remote-channel, a next-outgoing-id of one and a
 * non-null incoming-window. Its outgoing-window is verified with the supplied matcher; when
 * no matcher is supplied the field is only required to be non-null.
 *
 * @param outgoingWindowMatcher matcher for the outgoing-window field, or {@code null} to accept any non-null value
 * @param sendResponse whether to reply with a Begin frame on the channel the client used
 */
public void expectBegin(Matcher<?> outgoingWindowMatcher, boolean sendResponse)
{
    final BeginMatcher beginMatcher = new BeginMatcher()
        .withRemoteChannel(nullValue())
        .withNextOutgoingId(equalTo(UnsignedInteger.ONE))
        .withIncomingWindow(notNullValue());

    // Honour the caller's matcher when one is given; only fall back to the
    // generic non-null check when it is absent.
    if (outgoingWindowMatcher != null)
    {
        beginMatcher.withOutgoingWindow(outgoingWindowMatcher);
    }
    else
    {
        beginMatcher.withOutgoingWindow(notNullValue());
    }

    if (sendResponse) {
        // The response will have its remoteChannel field dynamically set based on incoming value
        final BeginFrame beginResponse = new BeginFrame()
            .setNextOutgoingId(UnsignedInteger.ONE)
            .setIncomingWindow(UnsignedInteger.ZERO)
            .setOutgoingWindow(UnsignedInteger.ZERO);

        // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
        final FrameSender beginResponseSender = new FrameSender(this, FrameType.AMQP, -1, beginResponse, null);
        beginResponseSender.setValueProvider(new ValueProvider()
        {
            @Override
            public void setValues()
            {
                int actualChannel = beginMatcher.getActualChannel();

                beginResponseSender.setChannel(actualChannel);
                beginResponse.setRemoteChannel(
                        UnsignedShort.valueOf((short) actualChannel));

                // Remember the channel so later scripted frames can target this session.
                _lastInitiatedChannel = actualChannel;
            }
        });
        beginMatcher.onCompletion(beginResponseSender);
    }

    addHandler(beginMatcher);
}
/**
 * Expects an End frame and replies with an End frame.
 */
public void expectEnd() {
    final boolean sendResponse = true;
    expectEnd(sendResponse);
}
/**
 * Expects an End frame.
 *
 * @param sendResponse whether to reply with an End frame on the channel the incoming frame arrived on
 */
public void expectEnd(boolean sendResponse) {
    final EndMatcher expectedEnd = new EndMatcher();

    if (sendResponse) {
        // Channel -1 is an illegal placeholder; the real channel is copied from the incoming frame.
        final FrameSender endReplySender = new FrameSender(this, FrameType.AMQP, -1, new EndFrame(), null);
        endReplySender.setValueProvider(new ValueProvider() {
            @Override
            public void setValues() {
                final int incomingChannel = expectedEnd.getActualChannel();
                endReplySender.setChannel(incomingChannel);
            }
        });
        expectedEnd.onCompletion(endReplySender);
    }

    addHandler(expectedEnd);
}
/**
 * Expects a sender attach that creates a dynamic temporary queue node and accepts the link.
 *
 * @param dynamicAddress the address assigned to the dynamic node in the attach response
 */
public void expectTempQueueCreationAttach(final String dynamicAddress) {
    final boolean refuseLink = false;
    final boolean deferAttachResponseWrite = false;
    expectTempNodeCreationAttach(dynamicAddress, AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY,
                                 refuseLink, deferAttachResponseWrite, null, null);
}
/**
 * Expects a sender attach that creates a dynamic temporary queue node, without refusing the link.
 *
 * @param dynamicAddress the address assigned to the dynamic node in the attach response
 * @param sendReponse whether any response is sent for the attach
 */
public void expectTempQueueCreationAttach(final String dynamicAddress, boolean sendReponse) {
    final boolean refuseLink = false;
    final boolean deferAttachResponseWrite = false;
    expectTempNodeCreationAttach(dynamicAddress, AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY, sendReponse,
                                 refuseLink, deferAttachResponseWrite, null, null);
}
/**
 * Expects a sender attach that creates a dynamic temporary topic node and accepts the link.
 *
 * @param dynamicAddress the address assigned to the dynamic node in the attach response
 */
public void expectTempTopicCreationAttach(final String dynamicAddress) {
    final boolean refuseLink = false;
    final boolean deferAttachResponseWrite = false;
    expectTempNodeCreationAttach(dynamicAddress, AmqpDestinationHelper.TEMP_TOPIC_CAPABILITY,
                                 refuseLink, deferAttachResponseWrite, null, null);
}
/**
 * Expects a sender attach that creates a dynamic temporary topic node, without refusing the link.
 *
 * @param dynamicAddress the address assigned to the dynamic node in the attach response
 * @param sendReponse whether any response is sent for the attach
 */
public void expectTempTopicCreationAttach(final String dynamicAddress, boolean sendReponse) {
    final boolean refuseLink = false;
    final boolean deferAttachResponseWrite = false;
    expectTempNodeCreationAttach(dynamicAddress, AmqpDestinationHelper.TEMP_TOPIC_CAPABILITY, sendReponse,
                                 refuseLink, deferAttachResponseWrite, null, null);
}
/**
 * Expects a sender attach that tries to create a temporary queue node and refuses the link.
 *
 * @param errorType error condition for the closing detach, or {@code null} for none
 * @param errorMessage error description for the closing detach
 * @param deferAttachResponseWrite whether the attach response write is deferred until the following frame is ready
 */
public void expectAndRefuseTempQueueCreationAttach(Symbol errorType, String errorMessage, boolean deferAttachResponseWrite) {
    final boolean refuseLink = true;
    // No dynamic address is supplied because the link is being refused.
    expectTempNodeCreationAttach(null, AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY,
                                 refuseLink, deferAttachResponseWrite, errorType, errorMessage);
}
/**
 * Expects a sender attach that tries to create a temporary topic node and refuses the link.
 *
 * @param errorType error condition for the closing detach, or {@code null} for none
 * @param errorMessage error description for the closing detach
 * @param deferAttachResponseWrite whether the attach response write is deferred until the following frame is ready
 */
public void expectAndRefuseTempTopicCreationAttach(Symbol errorType, String errorMessage, boolean deferAttachResponseWrite) {
    final boolean refuseLink = true;
    // No dynamic address is supplied because the link is being refused.
    expectTempNodeCreationAttach(null, AmqpDestinationHelper.TEMP_TOPIC_CAPABILITY,
                                 refuseLink, deferAttachResponseWrite, errorType, errorMessage);
}
/**
 * Convenience overload of the temporary node creation expectation that always sends a response.
 */
private void expectTempNodeCreationAttach(final String dynamicAddress, final Symbol nodeTypeCapability, final boolean refuseLink,
                                          boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) {
    final boolean sendResponse = true;
    expectTempNodeCreationAttach(dynamicAddress, nodeTypeCapability, sendResponse, refuseLink,
                                 deferAttachResponseWrite, errorType, errorMessage);
}
/**
 * Expects a sender attach whose target requests creation of a dynamic, non-durable,
 * delete-on-close node with the given node type capability.
 *
 * When a response is requested the attach is echoed back as a receiver. If the link is
 * accepted the response target carries the supplied dynamic address and is followed by a
 * Flow frame granting credit; if it is refused the response target is null and a closing
 * Detach (optionally carrying an error) follows.
 *
 * @param dynamicAddress address to set on the response target when the link is accepted
 * @param nodeTypeCapability the capability the incoming target must contain
 * @param sendResponse whether to send any response frames at all
 * @param refuseLink whether to refuse the link instead of accepting it
 * @param deferAttachResponseWrite whether to defer writing the attach response until the next frame is ready
 * @param errorType error condition for the detach when refusing, or null for none
 * @param errorMessage error description for the detach when refusing
 */
private void expectTempNodeCreationAttach(final String dynamicAddress, final Symbol nodeTypeCapability, boolean sendResponse, final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage)
{
// Describe the dynamic node creation request we require in the attach target.
TargetMatcher targetMatcher = new TargetMatcher();
targetMatcher.withAddress(nullValue());
targetMatcher.withDynamic(equalTo(true));
targetMatcher.withDurable(equalTo(TerminusDurability.NONE));
targetMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.LINK_DETACH));
targetMatcher.withDynamicNodeProperties(hasEntry(equalTo(DYNAMIC_NODE_LIFETIME_POLICY), new DeleteOnCloseMatcher()));
targetMatcher.withCapabilities(arrayContaining(nodeTypeCapability));
final AttachMatcher attachMatcher = new AttachMatcher()
.withName(notNullValue())
.withHandle(notNullValue())
.withRole(equalTo(Role.SENDER))
.withSndSettleMode(equalTo(SenderSettleMode.UNSETTLED))
.withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST))
.withSource(notNullValue())
.withTarget(targetMatcher);
if (sendResponse)
{
final AttachFrame attachResponse = new AttachFrame()
.setRole(Role.RECEIVER)
.setSndSettleMode(SenderSettleMode.UNSETTLED)
.setRcvSettleMode(ReceiverSettleMode.FIRST);
// The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
final FrameSender attachResponseSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null);
attachResponseSender.setValueProvider(new ValueProvider()
{
@Override
public void setValues()
{
// Mirror the handle, name and source of the received attach in the response.
Object receivedHandle = attachMatcher.getReceivedHandle();
attachResponseSender.setChannel(attachMatcher.getActualChannel());
attachResponse.setHandle(receivedHandle);
attachResponse.setName(attachMatcher.getReceivedName());
attachResponse.setSource(attachMatcher.getReceivedSource());
if (!refuseLink) {
// Accepting: return the received target with the dynamic address filled in.
Target t = (Target) createTargetObjectFromDescribedType(attachMatcher.getReceivedTarget());
t.setAddress(dynamicAddress);
attachResponse.setTarget(t);
} else {
// Refusing: the response carries a null target.
attachResponse.setTarget(null);
}
// Record the handle of the most recently initiated link.
_lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
}
});
if (deferAttachResponseWrite)
{
// Defer writing the attach frame until the subsequent frame is also ready
attachResponseSender.setDeferWrite(true);
}
// The attach response is always first; a Flow (accept) or Detach (refuse) is added after it.
CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
composite.add(attachResponseSender);
if (!refuseLink) {
final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldn't be hard coded
.setIncomingWindow(UnsignedInteger.valueOf(2048))
.setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldn't be hard coded
.setOutgoingWindow(UnsignedInteger.valueOf(2048))
.setLinkCredit(UnsignedInteger.valueOf(100));
// The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null);
flowFrameSender.setValueProvider(new ValueProvider()
{
@Override
public void setValues()
{
// Address the flow to the link just attached, echoing its initial delivery count.
flowFrameSender.setChannel(attachMatcher.getActualChannel());
flowFrame.setHandle(attachMatcher.getReceivedHandle());
flowFrame.setDeliveryCount(attachMatcher.getReceivedInitialDeliveryCount());
}
});
composite.add(flowFrameSender);
} else {
// Refusal is completed by a closing detach, with an error attached only if one was supplied.
final DetachFrame detachResponse = new DetachFrame().setClosed(true);
if (errorType != null)
{
org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
detachError.setCondition(errorType);
detachError.setDescription(errorMessage);
detachResponse.setError(detachError);
}
// The response frame channel will be dynamically set based on the
// incoming frame. Using the -1 is an illegal placeholder.
final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null);
detachResonseSender.setValueProvider(new ValueProvider() {
@Override
public void setValues() {
detachResonseSender.setChannel(attachMatcher.getActualChannel());
detachResponse.setHandle(attachMatcher.getReceivedHandle());
}
});
composite.add(detachResonseSender);
}
attachMatcher.onCompletion(composite);
}
addHandler(attachMatcher);
}
/**
 * Expects a sender attach with a non-null target, accepting the link without deferring the response.
 */
public void expectSenderAttach() {
    final boolean refuseLink = false;
    final boolean deferAttachResponseWrite = false;
    expectSenderAttach(notNullValue(), refuseLink, deferAttachResponseWrite);
}
/**
 * Expects a sender attach using the settled sender settle mode, accepting the link and
 * granting the default producer credit with no delay.
 */
public void expectSettledSenderAttach() {
    final boolean senderSettled = true;
    final boolean refuseLink = false;
    final boolean omitDetach = false;
    final boolean deferAttachResponseWrite = false;
    final int creditFlowDelay = 0;
    expectSenderAttach(notNullValue(), notNullValue(), senderSettled, refuseLink, omitDetach,
                       deferAttachResponseWrite, creditFlowDelay, DEFAULT_PRODUCER_CREDIT, null, null);
}
/**
 * Expects a sender attach and accepts the link, but the following Flow frame grants zero credit.
 */
public void expectSenderAttachWithoutGrantingCredit() {
    final boolean refuseLink = false;
    final boolean omitDetach = false;
    final boolean deferAttachResponseWrite = false;
    final int creditFlowDelay = 0;
    final int creditAmount = 0;
    expectSenderAttach(notNullValue(), notNullValue(), refuseLink, omitDetach, deferAttachResponseWrite,
                       creditFlowDelay, creditAmount, null, null);
}
/**
 * Expects a sender attach and accepts the link, delaying the credit-granting Flow frame.
 *
 * @param creditFlowDelay the send delay applied to the Flow frame that grants the default producer credit
 */
public void expectSenderAttach(long creditFlowDelay) {
    final boolean refuseLink = false;
    final boolean omitDetach = false;
    final boolean deferAttachResponseWrite = false;
    expectSenderAttach(notNullValue(), notNullValue(), refuseLink, omitDetach, deferAttachResponseWrite,
                       creditFlowDelay, DEFAULT_PRODUCER_CREDIT, null, null);
}
/**
 * Expects a sender attach with a non-null source and a target satisfying the given matcher.
 *
 * @param targetMatcher matcher for the attach target
 * @param refuseLink whether to refuse the link
 * @param deferAttachResponseWrite whether to defer writing the attach response until the next frame is ready
 */
public void expectSenderAttach(final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite) {
    // Any non-null source is accepted by this overload.
    expectSenderAttach(notNullValue(), targetMatcher, refuseLink, deferAttachResponseWrite);
}
/**
 * Expects a sender attach with the given source and target matchers, with no credit flow
 * delay and no detach error.
 *
 * @param sourceMatcher matcher for the attach source
 * @param targetMatcher matcher for the attach target
 * @param refuseLink whether to refuse the link
 * @param deferAttachResponseWrite whether to defer writing the attach response until the next frame is ready
 */
public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink,
                               boolean deferAttachResponseWrite) {
    final boolean omitDetach = false;
    final int creditFlowDelay = 0;
    expectSenderAttach(sourceMatcher, targetMatcher, refuseLink, omitDetach, deferAttachResponseWrite,
                       creditFlowDelay, null, null);
}
/**
 * Expects a sender attach, granting the default producer credit when the link is accepted.
 * All other arguments are passed through unchanged.
 */
public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink,
                               boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay,
                               Symbol errorType, String errorMessage) {
    expectSenderAttach(sourceMatcher, targetMatcher, refuseLink, omitDetach, deferAttachResponseWrite,
                       creditFlowDelay, DEFAULT_PRODUCER_CREDIT, errorType, errorMessage);
}
/**
 * Expects a sender attach using the unsettled sender settle mode. All other arguments are
 * passed through unchanged.
 */
public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink,
                               boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount,
                               Symbol errorType, String errorMessage) {
    final boolean senderSettled = false;
    expectSenderAttach(sourceMatcher, targetMatcher, senderSettled, refuseLink, omitDetach, deferAttachResponseWrite,
                       creditFlowDelay, creditAmount, errorType, errorMessage);
}
/**
 * Expects a sender attach without checking desired link capabilities and without offering
 * any capabilities in the response. All other arguments are passed through unchanged.
 */
public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean senderSettled,
                               final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite,
                               long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage) {
    // Trailing nulls: no desired-capabilities matcher, no offered capabilities in the response.
    expectSenderAttach(sourceMatcher, targetMatcher, senderSettled, refuseLink, omitDetach, deferAttachResponseWrite,
                       creditFlowDelay, creditAmount, errorType, errorMessage, null, null);
}
/**
 * Builds the attach matcher and the attach response frame for a sender link, then registers
 * the expectation.
 *
 * @param sourceMatcher matcher for the attach source
 * @param targetMatcher matcher for the attach target
 * @param senderSettled selects the settled rather than unsettled sender settle mode, both expected and returned
 * @param refuseLink whether to refuse the link
 * @param omitDetach whether to leave out the detach when refusing
 * @param deferAttachResponseWrite whether to defer writing the attach response until the next frame is ready
 * @param creditFlowDelay send delay for the credit-granting Flow frame
 * @param creditAmount link credit granted in the Flow frame
 * @param errorType error condition for the detach when refusing, or {@code null} for none
 * @param errorMessage error description for the detach when refusing
 * @param desiredCapabilitiesMatcher matcher for the desired link capabilities, or {@code null} to skip the check
 * @param offeredCapabilitiesResponse capabilities to offer in the attach response
 */
public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean senderSettled,
                               final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite,
                               long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage,
                               Matcher<?> desiredCapabilitiesMatcher, Symbol[] offeredCapabilitiesResponse) {
    // What the client's attach must look like.
    final AttachMatcher expectedAttach = new AttachMatcher()
        .withName(notNullValue())
        .withHandle(notNullValue())
        .withRole(equalTo(Role.SENDER))
        .withSndSettleMode(equalTo(senderSettled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED))
        .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST))
        .withSource(sourceMatcher)
        .withTarget(targetMatcher);

    if (desiredCapabilitiesMatcher != null) {
        expectedAttach.withDesiredCapabilities(desiredCapabilitiesMatcher);
    }

    // What the peer answers with; the remaining fields are filled in when the attach arrives.
    final AttachFrame responseFrame = new AttachFrame()
        .setRole(Role.RECEIVER)
        .setOfferedCapabilities(offeredCapabilitiesResponse)
        .setSndSettleMode(senderSettled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED)
        .setRcvSettleMode(ReceiverSettleMode.FIRST);

    expectSenderAttach(expectedAttach, responseFrame, refuseLink, omitDetach, deferAttachResponseWrite,
                       creditFlowDelay, creditAmount, errorType, errorMessage);
}
/**
 * Registers the given attach matcher and wires up the frames sent once it matches.
 *
 * The supplied attach response is always sent first, completed with the handle, name and
 * source of the received attach. If the link is refused the response target is null and,
 * unless omitted, a closing Detach (optionally carrying an error) follows. Otherwise the
 * received target is echoed and a Flow frame granting the requested credit follows.
 *
 * @param attachMatcher matcher describing the expected attach
 * @param attachResponse the attach frame to complete and send in response
 * @param refuseLink whether to refuse the link
 * @param omitDetach whether to leave out the detach when refusing
 * @param deferAttachResponseWrite whether to defer writing the attach response until the next frame is ready
 * @param creditFlowDelay send delay applied to the Flow frame
 * @param creditAmount link credit granted in the Flow frame
 * @param errorType error condition for the detach when refusing, or null for none
 * @param errorMessage error description for the detach when refusing
 */
public void expectSenderAttach(final AttachMatcher attachMatcher, final AttachFrame attachResponse, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage)
{
// The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
final FrameSender attachResponseSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null);
attachResponseSender.setValueProvider(new ValueProvider()
{
@Override
public void setValues()
{
// Mirror the handle, name and source of the received attach in the response.
Object receivedHandle = attachMatcher.getReceivedHandle();
attachResponseSender.setChannel(attachMatcher.getActualChannel());
attachResponse.setHandle(receivedHandle);
attachResponse.setName(attachMatcher.getReceivedName());
attachResponse.setSource(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource()));
if(refuseLink) {
// Refusing: the response carries a null target.
attachResponse.setTarget(null);
} else {
attachResponse.setTarget(createTargetObjectFromDescribedType(attachMatcher.getReceivedTarget()));
}
// Record the handle of the most recently initiated link.
_lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
// The target is converted again here, regardless of refusal, to detect coordinator links.
Object target = createTargetObjectFromDescribedType(attachMatcher.getReceivedTarget());
if (target instanceof Coordinator)
{
_lastInitiatedCoordinatorLinkHandle = (UnsignedInteger) receivedHandle;
}
}
});
if(deferAttachResponseWrite)
{
// Defer writing the attach frame until the subsequent frame is also ready
attachResponseSender.setDeferWrite(true);
}
// The attach response is always first; a Detach (refuse) or Flow (accept) may be added after it.
CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
composite.add(attachResponseSender);
if (refuseLink) {
// When the detach is omitted, only the attach response with the null target is sent.
if (!omitDetach) {
final DetachFrame detachResponse = new DetachFrame().setClosed(true);
if (errorType != null)
{
org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
detachError.setCondition(errorType);
detachError.setDescription(errorMessage);
detachResponse.setError(detachError);
}
// The response frame channel will be dynamically set based on the
// incoming frame. Using the -1 is an illegal placeholder.
final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null);
detachResonseSender.setValueProvider(new ValueProvider() {
@Override
public void setValues() {
detachResonseSender.setChannel(attachMatcher.getActualChannel());
detachResponse.setHandle(attachMatcher.getReceivedHandle());
}
});
composite.add(detachResonseSender);
}
} else {
final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldn't be hard coded
.setIncomingWindow(UnsignedInteger.valueOf(2048))
.setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldn't be hard coded
.setOutgoingWindow(UnsignedInteger.valueOf(2048))
.setLinkCredit(UnsignedInteger.valueOf(creditAmount));
// The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null);
flowFrameSender.setValueProvider(new ValueProvider()
{
@Override
public void setValues()
{
// Address the flow to the link just attached, echoing its initial delivery count.
flowFrameSender.setChannel(attachMatcher.getActualChannel());
flowFrame.setHandle(attachMatcher.getReceivedHandle());
flowFrame.setDeliveryCount(attachMatcher.getReceivedInitialDeliveryCount());
}
});
// The credit-granting flow is sent with the requested send delay.
flowFrameSender.setSendDelay(creditFlowDelay);
composite.add(flowFrameSender);
}
attachMatcher.onCompletion(composite);
addHandler(attachMatcher);
}
/**
 * Expects a coordinator link attach and accepts it, with no deferral and no error.
 */
public void expectCoordinatorAttach() {
    final boolean refuseLink = false;
    final boolean deferAttachResponseWrite = false;
    expectCoordinatorAttach(refuseLink, deferAttachResponseWrite, null, null);
}
/**
 * Expects a coordinator link attach whose source satisfies the given matcher, and accepts it.
 *
 * @param sourceMatcher matcher for the attach source
 */
public void expectCoordinatorAttach(SourceMatcher sourceMatcher) {
    final boolean refuseLink = false;
    final boolean deferAttachResponseWrite = false;
    expectCoordinatorAttach(sourceMatcher, refuseLink, deferAttachResponseWrite, null, null);
}
/**
 * Expects a coordinator link attach, with no error supplied for a refusal.
 *
 * @param refuseLink whether to refuse the link
 * @param deferAttachResponseWrite whether to defer writing the attach response until the next frame is ready
 */
public void expectCoordinatorAttach(boolean refuseLink, boolean deferAttachResponseWrite) {
    // No error condition or description is attached to the refusal detach.
    expectCoordinatorAttach(refuseLink, deferAttachResponseWrite, null, null);
}
/**
 * Expects a coordinator link attach with any non-null source.
 *
 * @param refuseLink whether to refuse the link
 * @param deferAttachResponseWrite whether to defer writing the attach response until the next frame is ready
 * @param errorType error condition for the detach when refusing, or {@code null} for none
 * @param errorMessage error description for the detach when refusing
 */
public void expectCoordinatorAttach(final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) {
    // Any non-null source is accepted by this overload.
    expectCoordinatorAttach(notNullValue(), refuseLink, deferAttachResponseWrite, errorType, errorMessage);
}
/**
 * Expects a sender attach whose target matches a coordinator, never omitting the detach on
 * refusal and using no credit flow delay.
 */
private void expectCoordinatorAttach(Matcher<Object> sourceMatcher, final boolean refuseLink, boolean deferAttachResponseWrite,
                                     Symbol errorType, String errorMessage) {
    final CoordinatorMatcher coordinatorTarget = new CoordinatorMatcher();
    final boolean omitDetach = false;
    final int creditFlowDelay = 0;
    expectSenderAttach(sourceMatcher, coordinatorTarget, refuseLink, omitDetach, deferAttachResponseWrite,
                       creditFlowDelay, errorType, errorMessage);
}
/**
 * Expects the receiver attach for a queue browser: any non-null link name and source,
 * using the settled sender settle mode.
 */
public void expectQueueBrowserAttach() {
    final boolean settled = true;
    expectReceiverAttach(notNullValue(), notNullValue(), settled);
}
/**
 * Expects a receiver attach with any non-null link name and source.
 */
public void expectReceiverAttach() {
    // Both the link name and the source only need to be present.
    expectReceiverAttach(notNullValue(), notNullValue());
}
/**
 * Expects a receiver attach using the settled sender settle mode and accepts the link.
 */
public void expectSettledReceiverAttach() {
    final boolean settled = true;
    final boolean refuseLink = false;
    final boolean omitDetach = false;
    final boolean deferAttachResponseWrite = false;
    expectReceiverAttach(notNullValue(), notNullValue(), settled, refuseLink, omitDetach,
                         deferAttachResponseWrite, null, null);
}
/**
 * Expects an unsettled receiver attach with the given link name and source, accepting the link.
 *
 * @param linkNameMatcher matcher for the link name
 * @param sourceMatcher matcher for the attach source
 */
public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher) {
    final boolean settled = false;
    final boolean refuseLink = false;
    final boolean omitDetach = false;
    final boolean deferAttachResponseWrite = false;
    expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, omitDetach,
                         deferAttachResponseWrite, null, null);
}
/**
 * Expects a receiver attach with the given link name and source, accepting the link.
 *
 * @param linkNameMatcher matcher for the link name
 * @param sourceMatcher matcher for the attach source
 * @param settled selects the settled rather than unsettled sender settle mode
 */
public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled) {
    final boolean refuseLink = false;
    final boolean omitDetach = false;
    final boolean deferAttachResponseWrite = false;
    expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, omitDetach,
                         deferAttachResponseWrite, null, null);
}
/**
 * Expects an unsettled receiver attach, optionally refusing the link.
 *
 * @param linkNameMatcher matcher for the link name
 * @param sourceMatcher matcher for the attach source
 * @param refuseLink whether to refuse the link
 * @param deferAttachResponseWrite whether to defer writing the attach response until the next frame is ready
 */
public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean refuseLink,
                                 boolean deferAttachResponseWrite) {
    final boolean settled = false;
    final boolean omitDetach = false;
    expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, omitDetach,
                         deferAttachResponseWrite, null, null);
}
/**
 * Expects a receiver attach, optionally refusing the link; a refusal always includes the
 * detach and carries no error.
 *
 * @param linkNameMatcher matcher for the link name
 * @param sourceMatcher matcher for the attach source
 * @param settled selects the settled rather than unsettled sender settle mode
 * @param refuseLink whether to refuse the link
 * @param deferAttachResponseWrite whether to defer writing the attach response until the next frame is ready
 */
public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled,
                                 final boolean refuseLink, boolean deferAttachResponseWrite) {
    final boolean omitDetach = false;
    expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, omitDetach,
                         deferAttachResponseWrite, null, null);
}
/**
 * Expects a receiver attach with no response source override, no desired-capabilities
 * check and no offered capabilities. All other arguments are passed through unchanged.
 */
public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled,
                                 final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite,
                                 Symbol errorType, String errorMessage) {
    // Trailing nulls: response source override, desired-capabilities matcher, offered capabilities.
    expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, omitDetach, deferAttachResponseWrite,
                         errorType, errorMessage, null, null, null);
}
/**
 * Builds the attach matcher and the attach response frame for a receiver link, then
 * registers the expectation.
 *
 * @param linkNameMatcher matcher for the link name
 * @param sourceMatcher matcher for the attach source
 * @param settled selects the settled rather than unsettled sender settle mode, both expected and returned
 * @param refuseLink whether to refuse the link
 * @param omitDetach whether to leave out the detach when refusing
 * @param deferAttachResponseWrite whether to defer writing the attach response until the next frame is ready
 * @param errorType error condition for the detach when refusing, or {@code null} for none
 * @param errorMessage error description for the detach when refusing
 * @param responseSourceOverride source to return instead of the received one, or {@code null}
 * @param desiredCapabilitiesMatcher matcher for the desired link capabilities, or {@code null} to skip the check
 * @param offeredCapabilitiesResponse capabilities to offer in the attach response
 */
private void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled,
                                  final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite,
                                  Symbol errorType, String errorMessage, final Source responseSourceOverride,
                                  Matcher<?> desiredCapabilitiesMatcher, Symbol[] offeredCapabilitiesResponse) {
    // What the client's attach must look like.
    final AttachMatcher expectedAttach = new AttachMatcher()
        .withName(linkNameMatcher)
        .withHandle(notNullValue())
        .withRole(equalTo(Role.RECEIVER))
        .withSndSettleMode(equalTo(settled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED))
        .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST))
        .withSource(sourceMatcher)
        .withTarget(notNullValue());

    if (desiredCapabilitiesMatcher != null) {
        expectedAttach.withDesiredCapabilities(desiredCapabilitiesMatcher);
    }

    // What the peer answers with; the remaining fields are filled in when the attach arrives.
    final AttachFrame responseFrame = new AttachFrame()
        .setRole(Role.SENDER)
        .setOfferedCapabilities(offeredCapabilitiesResponse)
        .setSndSettleMode(settled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED)
        .setRcvSettleMode(ReceiverSettleMode.FIRST)
        .setInitialDeliveryCount(UnsignedInteger.ZERO);

    expectReceiverAttach(expectedAttach, responseFrame, refuseLink, omitDetach, deferAttachResponseWrite,
                         errorType, errorMessage, responseSourceOverride);
}
/**
 * Registers the given attach matcher for a receiver link and wires up the frames sent once
 * it matches.
 *
 * <p>The attach response is completed with the handle, name and target of the received
 * attach. Its source is null when refusing, the override when one is given, and otherwise
 * the received source. When refusing, and unless omitted, a closing Detach (optionally
 * carrying an error) follows the attach response.
 *
 * @param attachMatcher matcher describing the expected attach
 * @param attachResponse the attach frame to complete and send in response
 * @param refuseLink whether to refuse the link
 * @param omitDetach whether to leave out the detach when refusing
 * @param deferAttachResponseWrite whether to defer writing the attach response until the next frame is ready
 * @param errorType error condition for the detach when refusing, or {@code null} for none
 * @param errorMessage error description for the detach when refusing
 * @param responseSourceOverride source to return instead of the received one, or {@code null}
 */
private void expectReceiverAttach(final AttachMatcher attachMatcher, final AttachFrame attachResponse, final boolean refuseLink,
                                  boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage,
                                  final Source responseSourceOverride) {
    // Channel -1 is an illegal placeholder; the real channel is copied from the incoming frame.
    final FrameSender attachSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null);
    attachSender.setValueProvider(new ValueProvider() {
        @Override
        public void setValues() {
            final Object handle = attachMatcher.getReceivedHandle();

            attachSender.setChannel(attachMatcher.getActualChannel());
            attachResponse.setHandle(handle);
            attachResponse.setName(attachMatcher.getReceivedName());
            attachResponse.setTarget(attachMatcher.getReceivedTarget());

            if (refuseLink) {
                // Refusing: the response carries a null source.
                attachResponse.setSource(null);
            } else if (responseSourceOverride == null) {
                attachResponse.setSource(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource()));
            } else {
                attachResponse.setSource(responseSourceOverride);
            }

            // Record the handle of the most recently initiated link.
            _lastInitiatedLinkHandle = (UnsignedInteger) handle;
        }
    });

    if (deferAttachResponseWrite) {
        // Hold the attach frame back until the subsequent frame is also ready.
        attachSender.setDeferWrite(true);
    }

    final CompositeAmqpPeerRunnable responseActions = new CompositeAmqpPeerRunnable();
    responseActions.add(attachSender);

    if (refuseLink && !omitDetach) {
        final DetachFrame closingDetach = new DetachFrame().setClosed(true);
        if (errorType != null) {
            org.apache.qpid.jms.test.testpeer.describedtypes.Error refusalError =
                new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
            refusalError.setCondition(errorType);
            refusalError.setDescription(errorMessage);
            closingDetach.setError(refusalError);
        }

        // Same placeholder channel approach as for the attach response.
        final FrameSender detachSender = new FrameSender(this, FrameType.AMQP, -1, closingDetach, null);
        detachSender.setValueProvider(new ValueProvider() {
            @Override
            public void setValues() {
                detachSender.setChannel(attachMatcher.getActualChannel());
                closingDetach.setHandle(attachMatcher.getReceivedHandle());
            }
        });
        responseActions.add(detachSender);
    }

    attachMatcher.onCompletion(responseActions);
    addHandler(attachMatcher);
}
/**
 * Expects the attach for a shared durable subscription and accepts the link, with no link
 * capability expected or offered.
 */
public void expectSharedDurableSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher,
                                                boolean clientIdSet) {
    final boolean durable = true;
    final boolean refuseLink = false;
    final boolean expectLinkCapability = false;
    final boolean responseOffersLinkCapability = false;
    expectSharedSubscriberAttach(topicName, subscriptionName, linkNameMatcher, durable, refuseLink, clientIdSet,
                                 expectLinkCapability, responseOffersLinkCapability);
}
/**
 * Expects the attach for a shared non-durable subscription and accepts the link, with no
 * link capability expected or offered.
 */
public void expectSharedVolatileSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher,
                                                 boolean clientIdSet) {
    final boolean durable = false;
    final boolean refuseLink = false;
    final boolean expectLinkCapability = false;
    final boolean responseOffersLinkCapability = false;
    expectSharedSubscriberAttach(topicName, subscriptionName, linkNameMatcher, durable, refuseLink, clientIdSet,
                                 expectLinkCapability, responseOffersLinkCapability);
}
/**
 * Expects the attach for a shared durable subscription, optionally refusing the link, with
 * no link capability expected or offered.
 */
public void expectSharedDurableSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher,
                                                boolean refuseLink, boolean clientIdSet) {
    final boolean durable = true;
    final boolean expectLinkCapability = false;
    final boolean responseOffersLinkCapability = false;
    expectSharedSubscriberAttach(topicName, subscriptionName, linkNameMatcher, durable, refuseLink, clientIdSet,
                                 expectLinkCapability, responseOffersLinkCapability);
}
/**
 * Expects the receiver attach for a shared topic subscription.
 *
 * @param topicName the address the attach source must carry
 * @param subscriptionName the subscription name (not used by this method)
 * @param linkNameMatcher matcher for the link name
 * @param durable whether the source must be durable with a never-expiring policy
 * @param refuseLink whether to refuse the link
 * @param clientIdSet when {@code false}, the source must additionally carry the global capability
 * @param expectLinkCapability whether the attach must desire the shared-subs link capability
 * @param responseOffersLinkCapability whether the response offers the shared-subs link capability
 */
public void expectSharedSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean durable,
                                         boolean refuseLink, boolean clientIdSet, boolean expectLinkCapability,
                                         boolean responseOffersLinkCapability) {
    final Symbol[] expectedSourceCapabilities = clientIdSet
        ? new Symbol[] { AmqpDestinationHelper.TOPIC_CAPABILITY, AmqpSupport.SHARED }
        : new Symbol[] { AmqpDestinationHelper.TOPIC_CAPABILITY, AmqpSupport.SHARED, AmqpSupport.GLOBAL };

    final SourceMatcher sharedSource = new SourceMatcher();
    sharedSource.withAddress(equalTo(topicName));
    sharedSource.withDynamic(equalTo(false));
    if (!durable) {
        sharedSource.withDurable(equalTo(TerminusDurability.NONE));
        sharedSource.withExpiryPolicy(equalTo(TerminusExpiryPolicy.LINK_DETACH));
    } else {
        //TODO: will possibly be changed to a 1/config durability
        sharedSource.withDurable(equalTo(TerminusDurability.UNSETTLED_STATE));
        sharedSource.withExpiryPolicy(equalTo(TerminusExpiryPolicy.NEVER));
    }
    sharedSource.withCapabilities(arrayContaining(expectedSourceCapabilities));

    // Without the connection capability set, a desired link capability is expected instead.
    final Matcher<?> desiredLinkCapabilities;
    if (!expectLinkCapability) {
        desiredLinkCapabilities = nullValue();
    } else {
        desiredLinkCapabilities = arrayContaining(new Symbol[] { AmqpSupport.SHARED_SUBS });
    }

    // Offer the capability in the response only if requested.
    final Symbol[] offeredLinkCapabilities =
        responseOffersLinkCapability ? new Symbol[] { AmqpSupport.SHARED_SUBS } : null;

    expectReceiverAttach(linkNameMatcher, sharedSource, false, refuseLink, false, false, null, null, null,
                         desiredLinkCapabilities, offeredLinkCapabilities);
}
/**
 * Expects the receiver attach for a durable topic subscription: the link name must equal
 * the subscription name and the source must be a durable, never-expiring, non-dynamic topic.
 *
 * @param topicName the address the attach source must carry
 * @param subscriptionName the expected link name
 */
public void expectDurableSubscriberAttach(String topicName, String subscriptionName) {
    final SourceMatcher durableTopicSource = new SourceMatcher();
    durableTopicSource.withAddress(equalTo(topicName));
    durableTopicSource.withDynamic(equalTo(false));
    //TODO: will possibly be changed to a 1/config durability
    durableTopicSource.withDurable(equalTo(TerminusDurability.UNSETTLED_STATE));
    durableTopicSource.withExpiryPolicy(equalTo(TerminusExpiryPolicy.NEVER));
    durableTopicSource.withCapabilities(arrayContaining(AmqpDestinationHelper.TOPIC_CAPABILITY));

    expectReceiverAttach(equalTo(subscriptionName), durableTopicSource);
}
/**
 * Expects the null-source receiver attach a client uses to look up an existing durable
 * subscription before unsubscribing.
 *
 * <p>On a failed lookup the link is refused with a not-found error. Otherwise the response
 * carries a source describing the subscription on the given topic.
 *
 * @param failLookup whether the lookup should fail
 * @param shared whether the returned source carries the shared capabilities
 * @param subscriptionName the subscription name the link name is derived from
 * @param topicName the address for the returned source
 * @param hasClientID when {@code false}, the link name gets the global suffix and the global link capabilities are expected
 */
public void expectDurableSubUnsubscribeNullSourceLookup(boolean failLookup, boolean shared, String subscriptionName,
                                                        String topicName, boolean hasClientID) {
    final String linkName = hasClientID
        ? subscriptionName
        : subscriptionName + AmqpSupport.SUB_NAME_DELIMITER + "global";

    final Matcher<String> linkNameMatcher = equalTo(linkName);
    final Matcher<Object> nullSourceMatcher = nullValue();

    final Symbol errorType = failLookup ? AmqpError.NOT_FOUND : null;
    final String errorMessage = failLookup ? "No subscription link found" : null;

    Source responseSourceOverride = null;
    if (!failLookup) {
        responseSourceOverride = new Source();
        responseSourceOverride.setAddress(topicName);
        responseSourceOverride.setDynamic(false);
        //TODO: will possibly be changed to a 1/config durability
        responseSourceOverride.setDurable(TerminusDurability.UNSETTLED_STATE);
        responseSourceOverride.setExpiryPolicy(TerminusExpiryPolicy.NEVER);

        if (shared) {
            final Symbol[] sharedCapabilities =
                hasClientID ? new Symbol[]{SHARED} : new Symbol[]{SHARED, GLOBAL};
            responseSourceOverride.setCapabilities(sharedCapabilities);
        }
    }

    // If we don't have a ClientID, expect link capabilities to hint that we are trying
    // to reattach to a 'global' shared subscription.
    Matcher<?> linkDesiredCapabilitiesMatcher = null;
    if (!hasClientID) {
        linkDesiredCapabilitiesMatcher = arrayContaining(new Symbol[] { SHARED, GLOBAL });
    }

    expectReceiverAttach(linkNameMatcher, nullSourceMatcher, false, failLookup, false, false, errorType, errorMessage,
                         responseSourceOverride, linkDesiredCapabilitiesMatcher, null);
}
/**
 * Expects a Detach frame, replying (if requested) without any error.
 *
 * @param expectClosed whether the incoming detach must have closed set to true
 * @param sendResponse whether to reply with a Detach frame
 * @param replyClosed whether the reply has closed set
 */
public void expectDetach(boolean expectClosed, boolean sendResponse, boolean replyClosed) {
    // No error condition or description is placed in the reply.
    expectDetach(expectClosed, sendResponse, replyClosed, null, null);
}
/**
 * Expects a Detach frame and optionally replies with one on the same channel and handle.
 *
 * @param expectClosed when {@code true} the incoming closed field must be true; otherwise it must be false or null
 * @param sendResponse whether to reply with a Detach frame
 * @param replyClosed whether the reply has closed set
 * @param errorType error condition for the reply, or {@code null} for none
 * @param errorMessage error description for the reply
 */
public void expectDetach(boolean expectClosed, boolean sendResponse, boolean replyClosed, Symbol errorType, String errorMessage) {
    final Matcher<Boolean> closedFieldMatcher;
    if (!expectClosed) {
        closedFieldMatcher = Matchers.anyOf(equalTo(false), nullValue());
    } else {
        closedFieldMatcher = equalTo(true);
    }

    final DetachMatcher expectedDetach = new DetachMatcher().withClosed(closedFieldMatcher);

    if (sendResponse) {
        final DetachFrame detachReply = new DetachFrame();
        if (replyClosed) {
            detachReply.setClosed(replyClosed);
        }

        if (errorType == null) {
            detachReply.setError(null);
        } else {
            org.apache.qpid.jms.test.testpeer.describedtypes.Error replyError =
                new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
            replyError.setCondition(errorType);
            replyError.setDescription(errorMessage);
            detachReply.setError(replyError);
        }

        // Channel -1 is an illegal placeholder; the real channel is copied from the incoming frame.
        final FrameSender detachReplySender = new FrameSender(this, FrameType.AMQP, -1, detachReply, null);
        detachReplySender.setValueProvider(new ValueProvider() {
            @Override
            public void setValues() {
                detachReplySender.setChannel(expectedDetach.getActualChannel());
                detachReply.setHandle(expectedDetach.getReceivedHandle());
            }
        });
        expectedDetach.onCompletion(detachReplySender);
    }

    addHandler(expectedDetach);
}
/**
 * Expects a session-level Flow frame, i.e. one whose link-credit and handle are both null.
 */
public void expectSessionFlow() {
    addHandler(new FlowMatcher()
        .withLinkCredit(Matchers.nullValue())
        .withHandle(Matchers.nullValue()));
}
/**
 * Expects a non-draining link Flow frame granting more than zero credit; no drain response is sent.
 */
public void expectLinkFlow() {
    final boolean drain = false;
    final boolean sendDrainFlowResponse = false;
    expectLinkFlow(drain, sendDrainFlowResponse, Matchers.greaterThan(UnsignedInteger.ZERO));
}
/**
 * Expects a link Flow frame without sending a drain response.
 *
 * @param drain whether the flow must have drain set
 * @param creditMatcher matcher for the link credit
 */
public void expectLinkFlow(boolean drain, Matcher<UnsignedInteger> creditMatcher) {
    final boolean sendDrainFlowResponse = false;
    expectLinkFlow(drain, sendDrainFlowResponse, creditMatcher);
}
/**
 * Expects a link Flow frame and sends no transfers in response.
 *
 * @param drain whether the flow must have drain set
 * @param sendDrainFlowResponse whether to send a Flow in response to a drain
 * @param creditMatcher matcher for the link credit
 */
public void expectLinkFlow(boolean drain, boolean sendDrainFlowResponse, Matcher<UnsignedInteger> creditMatcher) {
    // No message sections, zero transfers and no expected next-incoming-id.
    final int transferCount = 0;
    final boolean sendSettled = false;
    final boolean addMessageNumberProperty = false;
    expectLinkFlowRespondWithTransfer(null, null, null, null, null, transferCount, drain, sendDrainFlowResponse,
                                      creditMatcher, null, sendSettled, addMessageNumberProperty);
}
/**
 * Expects a link Flow frame and responds with a single transfer built from the given
 * message sections.
 */
public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
                                              final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                              final PropertiesDescribedType propertiesDescribedType,
                                              final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                              final DescribedType content) {
    final int singleTransfer = 1;
    expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
                                      appPropertiesDescribedType, content, singleTransfer);
}
/**
 * Expects a non-draining link Flow frame granting at least one credit with a
 * next-incoming-id of 1, and responds with a single transfer.
 *
 * @param sendSettled passed through as the settled flag for the transfer
 */
public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
                                              final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                              final PropertiesDescribedType propertiesDescribedType,
                                              final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                              final DescribedType content,
                                              final boolean sendSettled) {
    final int singleTransfer = 1;
    final boolean drain = false;
    final boolean sendDrainFlowResponse = false;
    final int nextIncomingId = 1;
    final boolean addMessageNumberProperty = false;
    expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
                                      appPropertiesDescribedType, content, singleTransfer, drain, sendDrainFlowResponse,
                                      Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(1)), nextIncomingId,
                                      sendSettled, addMessageNumberProperty);
}
/**
 * Expects a non-draining link Flow frame granting at least {@code count} credits with a
 * next-incoming-id of 1, and responds with {@code count} unsettled transfers.
 *
 * @param count the number of transfers to send
 */
public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
                                              final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                              final PropertiesDescribedType propertiesDescribedType,
                                              final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                              final DescribedType content,
                                              final int count) {
    final boolean drain = false;
    final boolean sendDrainFlowResponse = false;
    final int nextIncomingId = 1;
    final boolean sendSettled = false;
    final boolean addMessageNumberProperty = false;
    expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
                                      appPropertiesDescribedType, content, count, drain, sendDrainFlowResponse,
                                      Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)), nextIncomingId,
                                      sendSettled, addMessageNumberProperty);
}
/**
 * Expects a link Flow frame and responds with unsettled transfers. All other arguments are
 * passed through unchanged.
 */
public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
                                              final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                              final PropertiesDescribedType propertiesDescribedType,
                                              ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                              final DescribedType content,
                                              final int count,
                                              final boolean drain,
                                              final boolean sendDrainFlowResponse,
                                              Matcher<UnsignedInteger> creditMatcher,
                                              final Integer nextIncomingId,
                                              boolean addMessageNumberProperty) {
    final boolean sendSettled = false;
    expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
                                      appPropertiesDescribedType, content, count, drain, sendDrainFlowResponse,
                                      creditMatcher, nextIncomingId, sendSettled, addMessageNumberProperty);
}
/**
 * Expects a link Flow frame and sends back messages, passing zero for the per-frame
 * payload size and not sending a final transfer frame without payload. All other arguments
 * are passed through unchanged.
 */
public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType,
                                              final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                              final PropertiesDescribedType propertiesDescribedType,
                                              ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                              final DescribedType content,
                                              final int count,
                                              final boolean drain,
                                              final boolean sendDrainFlowResponse,
                                              Matcher<UnsignedInteger> creditMatcher,
                                              final Integer nextIncomingId,
                                              final boolean sendSettled,
                                              boolean addMessageNumberProperty) {
    final int msgPayloadPerFrame = 0;
    final boolean sendFinalTransferFrameWithoutPayload = false;
    expectLinkFlowAndSendBackMessages(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType,
                                      appPropertiesDescribedType, content, count, drain, sendDrainFlowResponse,
                                      creditMatcher, nextIncomingId, sendSettled, addMessageNumberProperty,
                                      msgPayloadPerFrame, sendFinalTransferFrameWithoutPayload);
}
/**
 * Registers a handler that expects a Flow frame and, once it has been matched, sends back
 * {@code count} messages as Transfer frames, optionally followed by a drain Flow response.
 *
 * <p>Each message is encoded from the supplied sections (null sections are skipped) and is
 * given the delivery id {@code nextIncomingId + i} and the delivery tag
 * {@code "theDeliveryTag" + deliveryId}.
 *
 * <p>NOTE(review): if every section is null and {@code addMessageNumberProperty} is false, the
 * encoded payload is null and the length lookup below throws a NullPointerException.
 *
 * @param headerDescribedType header section, may be null
 * @param messageAnnotationsDescribedType message-annotations section, may be null
 * @param propertiesDescribedType properties section, may be null
 * @param appPropertiesDescribedType application-properties section, may be null; when
 *        {@code addMessageNumberProperty} is true a new one is created if needed and its
 *        {@code MESSAGE_NUMBER} entry is overwritten with the message index for each message
 * @param content body section, may be null
 * @param count number of messages to send back
 * @param drain whether the expected Flow must carry drain=true (otherwise false or absent)
 * @param sendDrainFlowResponse when {@code drain} is also true, append a Flow frame with zero
 *        credit and drain=true after the transfers
 * @param creditMatcher matcher applied to the link credit of the expected Flow
 * @param nextIncomingId expected next-incoming-id of the Flow, also used as the first delivery
 *        id; may only be null when {@code count} is 0, in which case any value of at least 1
 *        is accepted
 * @param sendSettled value for the settled field of the transfers
 * @param addMessageNumberProperty whether to stamp each message with its index
 * @param msgPayloadPerFrame maximum payload bytes per Transfer frame; 0, or a value not smaller
 *        than the payload length, sends the whole payload in one frame
 * @param sendFinalTransferFrameWithoutPayload when true every payload-carrying frame is marked
 *        more=true and an extra payload-less Transfer frame terminates the delivery
 * @throws IllegalArgumentException if {@code nextIncomingId} is null while {@code count > 0}
 */
public void expectLinkFlowAndSendBackMessages(final HeaderDescribedType headerDescribedType,
        final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
        final PropertiesDescribedType propertiesDescribedType,
        ApplicationPropertiesDescribedType appPropertiesDescribedType,
        final DescribedType content,
        final int count,
        final boolean drain,
        final boolean sendDrainFlowResponse,
        Matcher<UnsignedInteger> creditMatcher,
        final Integer nextIncomingId,
        final boolean sendSettled,
        boolean addMessageNumberProperty,
        int msgPayloadPerFrame,
        boolean sendFinalTransferFrameWithoutPayload)
{
    if (nextIncomingId == null && count > 0)
    {
        throw new IllegalArgumentException("The remote NextIncomingId must be specified if transfers have been requested");
    }

    Matcher<Boolean> drainMatcher = null;
    if(drain)
    {
        drainMatcher = equalTo(true);
    }
    else
    {
        // Accept either an explicit false or an absent (null) drain field.
        drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
    }

    Matcher<UnsignedInteger> remoteNextIncomingIdMatcher = null;
    if(nextIncomingId != null)
    {
        remoteNextIncomingIdMatcher = Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId));
    }
    else
    {
        remoteNextIncomingIdMatcher = Matchers.greaterThanOrEqualTo(UnsignedInteger.ONE);
    }

    final FlowMatcher flowMatcher = new FlowMatcher()
                    .withLinkCredit(creditMatcher)
                    .withDrain(drainMatcher)
                    .withNextIncomingId(remoteNextIncomingIdMatcher);

    CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
    boolean addComposite = false;

    if (appPropertiesDescribedType == null && addMessageNumberProperty) {
        appPropertiesDescribedType = new ApplicationPropertiesDescribedType();
    }

    for(int i = 0; i < count; i++)
    {
        final int nextId = nextIncomingId + i;

        String tagString = "theDeliveryTag" + nextId;
        // Explicit charset so the tag bytes never depend on the platform default encoding.
        Binary dtag = new Binary(tagString.getBytes(StandardCharsets.UTF_8));

        if(addMessageNumberProperty) {
            appPropertiesDescribedType.setApplicationProperty(MESSAGE_NUMBER, i);
        }

        Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType,
                propertiesDescribedType, appPropertiesDescribedType, content);

        int length = payload.getLength();
        int sent = 0;
        while (sent < length) {
            final TransferFrame transferFrame = new TransferFrame()
            .setDeliveryId(UnsignedInteger.valueOf(nextId))
            .setDeliveryTag(dtag)
            .setMessageFormat(UnsignedInteger.ZERO)
            .setSettled(sendSettled);

            int remaining = length - sent;
            Binary chunk;
            if(msgPayloadPerFrame != 0 && msgPayloadPerFrame < length) {
                // Split the payload: this frame carries at most msgPayloadPerFrame bytes.
                int chunkSize = Math.min(msgPayloadPerFrame, remaining);
                chunk = payload.subBinary(sent, chunkSize);
                sent += chunkSize;
            } else {
                // No splitting requested (or needed): send the whole payload in one frame.
                chunk = payload;
                sent = length;
            }

            if(sent < length || (sent == length && sendFinalTransferFrameWithoutPayload)) {
                // Indicate more frames if there is payload left, or we want to send a final transfer without payload
                transferFrame.setMore(true);
            }

            // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
            final FrameSender transferResponseSender = new FrameSender(this, FrameType.AMQP, -1, transferFrame, chunk);
            transferResponseSender.setValueProvider(new ValueProvider()
            {
                @Override
                public void setValues()
                {
                    transferFrame.setHandle(flowMatcher.getReceivedHandle());
                    transferResponseSender.setChannel(flowMatcher.getActualChannel());
                }
            });

            composite.add(transferResponseSender);
        }

        if(sendFinalTransferFrameWithoutPayload) {
            sendEmptyFinalTransfer(composite, flowMatcher, nextId, dtag, sendSettled);
        }

        addComposite = true;
    }

    if(drain && sendDrainFlowResponse)
    {
        final FlowFrame drainResponse = new FlowFrame();
        drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded
        drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded
        drainResponse.setLinkCredit(UnsignedInteger.ZERO);
        drainResponse.setDrain(true);

        // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
        final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
        flowResponseSender.setValueProvider(new ValueProvider()
        {
            @Override
            public void setValues()
            {
                flowResponseSender.setChannel(flowMatcher.getActualChannel());
                drainResponse.setHandle(flowMatcher.getReceivedHandle());
                drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
                drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, count));
                drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
            }
        });

        addComposite = true;
        composite.add(flowResponseSender);
    }

    // Only attach a completion action when there is actually something to send.
    if(addComposite) {
        flowMatcher.onCompletion(composite);
    }

    addHandler(flowMatcher);
}
/**
 * Appends to {@code composite} a Transfer frame that carries no payload, using the given
 * delivery id, delivery tag and settled flag. The frame's handle and channel are filled in
 * from the values the flow matcher received.
 */
private void sendEmptyFinalTransfer(CompositeAmqpPeerRunnable composite, final FlowMatcher flowMatcher, final int deliveryId, Binary dTag, final boolean settled) {
    final UnsignedInteger id = UnsignedInteger.valueOf(deliveryId);

    final TransferFrame emptyTransfer = new TransferFrame()
            .setDeliveryId(id)
            .setDeliveryTag(dTag)
            .setMessageFormat(UnsignedInteger.ZERO)
            .setSettled(settled);

    // Channel -1 is an illegal placeholder; the real channel is supplied by the value provider below.
    final FrameSender emptyTransferSender = new FrameSender(this, FrameType.AMQP, -1, emptyTransfer, null);

    final ValueProvider lateBoundValues = new ValueProvider()
    {
        @Override
        public void setValues()
        {
            emptyTransfer.setHandle(flowMatcher.getReceivedHandle());
            emptyTransferSender.setChannel(flowMatcher.getActualChannel());
        }
    };
    emptyTransferSender.setValueProvider(lateBoundValues);

    composite.add(emptyTransferSender);
}
/**
 * Registers a handler that expects a non-drain Flow frame granting exactly {@code prefetch}
 * credit, and responds with, in order:
 * <ol>
 * <li>a Flow frame with zero link credit whose delivery count is the received delivery count
 *     advanced by {@code prefetch - topUp} (write deferred),</li>
 * <li>a Flow frame with the same delivery count and {@code topUp} link credit,</li>
 * <li>{@code messageCount} unsettled Transfer frames with delivery ids starting at 0, each
 *     carrying an AmqpValue body of {@code "content"}; all but the last have their write
 *     deferred.</li>
 * </ol>
 *
 * <p>When the response values are computed, {@code prefetch - topUp} is asserted to be less
 * than the link credit received in the Flow.
 *
 * @param prefetch link credit the incoming Flow is expected to carry
 * @param topUp link credit granted in the second Flow response
 * @param messageCount number of Transfer frames to send
 */
public void expectLinkFlowThenPerformUnexpectedDeliveryCountAdvanceThenCreditTopupThenTransfers(final int prefetch, final int topUp, final int messageCount)
{
    final FlowMatcher flowMatcher = new FlowMatcher()
                    .withHandle(notNullValue())
                    .withLinkCredit(equalTo(UnsignedInteger.valueOf(prefetch)))
                    .withDrain(Matchers.anyOf(equalTo(false), nullValue()));

    final FlowFrame advancingFlowResponse = new FlowFrame();
    advancingFlowResponse.setOutgoingWindow(UnsignedInteger.MAX_VALUE);
    advancingFlowResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE));

    // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
    final FrameSender advancingFlowResponseSender = new FrameSender(this, FrameType.AMQP, -1, advancingFlowResponse, null);
    advancingFlowResponseSender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            advancingFlowResponseSender.setChannel(flowMatcher.getActualChannel());
            advancingFlowResponse.setHandle(flowMatcher.getReceivedHandle());
            advancingFlowResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher, prefetch - topUp));
            advancingFlowResponse.setLinkCredit(UnsignedInteger.ZERO);
            advancingFlowResponse.setNextOutgoingId(flowMatcher.getReceivedNextIncomingId());
            advancingFlowResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
        }
    });

    final FlowFrame topUpFlowResponse = new FlowFrame();
    topUpFlowResponse.setOutgoingWindow(UnsignedInteger.MAX_VALUE);
    topUpFlowResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE));

    // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
    final FrameSender topUpFlowResponseSender = new FrameSender(this, FrameType.AMQP, -1, topUpFlowResponse, null);
    topUpFlowResponseSender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            topUpFlowResponseSender.setChannel(flowMatcher.getActualChannel());
            topUpFlowResponse.setHandle(flowMatcher.getReceivedHandle());
            topUpFlowResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher, prefetch - topUp));
            topUpFlowResponse.setLinkCredit(UnsignedInteger.valueOf(topUp));
            topUpFlowResponse.setNextOutgoingId(flowMatcher.getReceivedNextIncomingId());
            topUpFlowResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
        }
    });

    CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
    composite.add(advancingFlowResponseSender);
    advancingFlowResponseSender.setDeferWrite(true);
    composite.add(topUpFlowResponseSender);

    final Integer remoteNextIncomingId = 0; //TODO: make configurable
    for(int i = 0; i < messageCount; i++)
    {
        final int nextId = remoteNextIncomingId + i;

        String tagString = "theDeliveryTag" + nextId;
        // Explicit charset so the tag bytes never depend on the platform default encoding.
        Binary dtag = new Binary(tagString.getBytes(StandardCharsets.UTF_8));

        final TransferFrame transferResponse = new TransferFrame()
        .setDeliveryId(UnsignedInteger.valueOf(nextId))
        .setDeliveryTag(dtag)
        .setMessageFormat(UnsignedInteger.ZERO)
        .setSettled(false);

        Binary payload = prepareTransferPayload(null, null, null, null, new AmqpValueDescribedType("content"));

        // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
        final FrameSender transferResponseSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload);
        transferResponseSender.setValueProvider(new ValueProvider()
        {
            @Override
            public void setValues()
            {
                transferResponse.setHandle(flowMatcher.getReceivedHandle());
                transferResponseSender.setChannel(flowMatcher.getActualChannel());
            }
        });

        if(i != messageCount -1) {
            // Ensure all but the last transfer are set to defer, ensure they go in one write.
            transferResponseSender.setDeferWrite(true);
        }

        composite.add(transferResponseSender);
    }

    flowMatcher.onCompletion(composite);

    addHandler(flowMatcher);
}
/**
 * Returns the delivery count received in the matched Flow plus the link credit received in
 * that same Flow.
 */
private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) {
    final UnsignedInteger receivedDeliveryCount = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount();
    final UnsignedInteger receivedLinkCredit = (UnsignedInteger) flowMatcher.getReceivedLinkCredit();

    final UnsignedInteger advanced = receivedDeliveryCount.add(receivedLinkCredit);
    return advanced;
}
/**
 * Returns the delivery count received in the matched Flow advanced by {@code creditToConsume},
 * after asserting that {@code creditToConsume} is less than the link credit received.
 */
private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher, int creditToConsume) {
    final UnsignedInteger receivedDeliveryCount = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount();
    final UnsignedInteger receivedLinkCredit = (UnsignedInteger) flowMatcher.getReceivedLinkCredit();
    final UnsignedInteger consumed = UnsignedInteger.valueOf(creditToConsume);

    // Only part of the granted credit may be consumed here.
    assertThat(consumed, lessThan(receivedLinkCredit));

    return receivedDeliveryCount.add(consumed);
}
/**
 * Returns the next-incoming-id received in the matched Flow plus {@code sentCount}.
 */
private UnsignedInteger calculateNewOutgoingId(FlowMatcher flowMatcher, int sentCount) {
    final UnsignedInteger receivedNextIncomingId = (UnsignedInteger) flowMatcher.getReceivedNextIncomingId();
    final UnsignedInteger sent = UnsignedInteger.valueOf(sentCount);
    return receivedNextIncomingId.add(sent);
}
/**
 * Encodes the supplied message sections, in the order header, message annotations,
 * properties, application properties, content, skipping any that are null.
 *
 * @return the encoded payload, or null when every section was null
 */
private Binary prepareTransferPayload(final HeaderDescribedType headerDescribedType,
        final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
        final PropertiesDescribedType propertiesDescribedType,
        final ApplicationPropertiesDescribedType appPropertiesDescribedType,
        final DescribedType content)
{
    final Data encoder = Data.Factory.create();
    int sectionsWritten = 0;

    if (headerDescribedType != null) {
        encoder.putDescribedType(headerDescribedType);
        sectionsWritten++;
    }

    if (messageAnnotationsDescribedType != null) {
        encoder.putDescribedType(messageAnnotationsDescribedType);
        sectionsWritten++;
    }

    if (propertiesDescribedType != null) {
        encoder.putDescribedType(propertiesDescribedType);
        sectionsWritten++;
    }

    if (appPropertiesDescribedType != null) {
        encoder.putDescribedType(appPropertiesDescribedType);
        sectionsWritten++;
    }

    if (content != null) {
        encoder.putDescribedType(content);
        sectionsWritten++;
    }

    // Nothing to encode means there is no payload at all.
    if (sectionsWritten == 0) {
        return null;
    }

    return encoder.encode();
}
/**
 * Expects an unsettled Transfer with a null state whose payload satisfies the given matcher,
 * and sends no disposition in response.
 */
public void expectTransferButDoNotRespond(Matcher<Binary> expectedPayloadMatcher)
{
    final boolean settled = false;
    final boolean sendResponseDisposition = false;
    final boolean responseSettled = false;

    expectTransfer(expectedPayloadMatcher, nullValue(), settled, sendResponseDisposition, null, responseSettled);
}
/**
 * Expects an unsettled Transfer with a null state whose payload and delivery tag satisfy the
 * given matchers, and sends no disposition in response.
 */
public void expectTransferButDoNotRespond(Matcher<Binary> expectedPayloadMatcher, Matcher<Binary> deliveryTagMatcher)
{
    final boolean settled = false;
    final boolean sendResponseDisposition = false;
    final boolean responseSettled = false;

    // Trailing zeros: frame size passed to the transfer matcher, and disposition delay.
    expectTransfer(expectedPayloadMatcher, deliveryTagMatcher, nullValue(), settled, sendResponseDisposition,
            null, responseSettled, 0, 0);
}
/**
 * Expects an unsettled Transfer with a null state whose payload satisfies the given matcher,
 * and responds with a settled {@code Accepted} disposition.
 */
public void expectTransfer(Matcher<Binary> expectedPayloadMatcher)
{
    final boolean settled = false;
    final boolean sendResponseDisposition = true;
    final boolean responseSettled = true;

    expectTransfer(expectedPayloadMatcher, nullValue(), settled, sendResponseDisposition, new Accepted(), responseSettled);
}
/**
 * Expects an unsettled Transfer with a null state whose payload and delivery tag satisfy the
 * given matchers, and responds with a settled {@code Accepted} disposition.
 */
public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<Binary> deliveryTagMatcher)
{
    final boolean settled = false;
    final boolean sendResponseDisposition = true;
    final boolean responseSettled = true;

    // Trailing zeros: frame size passed to the transfer matcher, and disposition delay.
    expectTransfer(expectedPayloadMatcher, deliveryTagMatcher, nullValue(), settled, sendResponseDisposition,
            new Accepted(), responseSettled, 0, 0);
}
/**
 * Expects an unsettled Transfer with a null state, passing no payload matcher and handing
 * {@code frameSize} to the transfer matcher, and responds with a settled {@code Accepted}
 * disposition sent without delay.
 */
public void expectTransfer(int frameSize)
{
    final boolean settled = false;
    final boolean sendResponseDisposition = true;
    final boolean responseSettled = true;

    expectTransfer(null, nullValue(), settled, sendResponseDisposition, new Accepted(), responseSettled, frameSize, 0);
}
/**
 * Expects an unsettled Transfer matching the given payload and state matchers, and responds
 * with a disposition carrying {@code responseState} and {@code responseSettled}.
 */
public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher,
                           ListDescribedType responseState, boolean responseSettled)
{
    final boolean settled = false;
    final boolean sendResponseDisposition = true;

    expectTransfer(expectedPayloadMatcher, stateMatcher, settled, sendResponseDisposition, responseState, responseSettled);
}
/**
 * Forwards to the overload that additionally takes a frame size and a disposition delay,
 * supplying {@code 0} for both.
 */
public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher, boolean settled,
                           boolean sendResponseDisposition, ListDescribedType responseState, boolean responseSettled)
{
    expectTransfer(
            expectedPayloadMatcher,
            stateMatcher,
            settled,
            sendResponseDisposition,
            responseState,
            responseSettled,
            0,   // frameSize
            0);  // dispositionDelay
}
/**
 * Forwards to the overload that additionally takes a delivery tag matcher, supplying
 * {@code null} so that no delivery tag matcher is installed.
 */
public void expectTransfer(Matcher<Binary> expectedPayloadMatcher,
                           Matcher<?> stateMatcher, boolean settled, boolean sendResponseDisposition, ListDescribedType responseState,
                           boolean responseSettled, int frameSize, long dispositionDelay)
{
    expectTransfer(
            expectedPayloadMatcher,
            null, // deliveryTagMatcher
            stateMatcher,
            settled,
            sendResponseDisposition,
            responseState,
            responseSettled,
            frameSize,
            dispositionDelay);
}
//TODO: fix responseState to only admit applicable types.
/**
 * Registers a handler expecting a Transfer frame and optionally replies with a disposition.
 *
 * @param expectedPayloadMatcher matcher handed to the transfer matcher for the payload
 * @param deliveryTagMatcher matcher for the delivery tag; skipped when null
 * @param stateMatcher matcher for the transfer's state
 * @param settled when true the transfer must be settled, otherwise its settled field must be
 *        false or absent
 * @param sendResponseDisposition whether to reply with a disposition once the transfer matched
 * @param responseState state placed in the disposition
 * @param responseSettled settled flag placed in the disposition
 * @param frameSize value passed to the {@code TransferMatcher} constructor
 * @param dispositionDelay send delay configured on the disposition frame sender
 */
public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<Binary> deliveryTagMatcher,
                           Matcher<?> stateMatcher, boolean settled, boolean sendResponseDisposition, ListDescribedType responseState,
                           boolean responseSettled, int frameSize, long dispositionDelay)
{
    // Default: settled must be false or absent; tightened below when a settled transfer is expected.
    Matcher<Boolean> settledMatcher = Matchers.anyOf(equalTo(false), nullValue());
    if (settled) {
        settledMatcher = equalTo(true);
    }

    final TransferMatcher transferMatcher = new TransferMatcher(frameSize);
    transferMatcher.setPayloadMatcher(expectedPayloadMatcher);
    transferMatcher.withSettled(settledMatcher);
    transferMatcher.withState(stateMatcher);
    if (deliveryTagMatcher != null) {
        transferMatcher.withDeliveryTag(deliveryTagMatcher);
    }

    if (!sendResponseDisposition) {
        // Nothing to send back: just register the expectation.
        addHandler(transferMatcher);
        return;
    }

    final DispositionFrame disposition = new DispositionFrame()
            .setRole(Role.RECEIVER)
            .setSettled(responseSettled)
            .setState(responseState);

    // Channel -1 is an illegal placeholder; the real channel is taken from the received transfer.
    final FrameSender dispositionSender = new FrameSender(this, FrameType.AMQP, -1, disposition, null);
    dispositionSender.setSendDelay(dispositionDelay);
    dispositionSender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            dispositionSender.setChannel(transferMatcher.getActualChannel());
            disposition.setFirst(transferMatcher.getReceivedDeliveryId());
        }
    });

    transferMatcher.onCompletion(dispositionSender);
    addHandler(transferMatcher);
}
/**
 * Forwards to the three-argument overload using {@code DEFAULT_PRODUCER_CREDIT} as the
 * original credit.
 */
public void expectTransferRespondWithDrain(Matcher<Binary> expectedPayloadMatcher, int sentMessages)
{
    final int originalCredit = DEFAULT_PRODUCER_CREDIT;
    expectTransferRespondWithDrain(expectedPayloadMatcher, originalCredit, sentMessages);
}
/**
 * Registers a handler expecting an unsettled Transfer with a null state and, once matched,
 * sends a drain Flow frame followed by a settled {@code Accepted} disposition.
 *
 * @param expectedPayloadMatcher matcher for the transfer payload
 * @param originalCredit credit assumed to have been granted; the Flow carries
 *        {@code originalCredit - sentMessages} as link credit
 * @param sentMessages number of messages assumed sent so far; used for the Flow's delivery
 *        count and next-incoming-id
 */
public void expectTransferRespondWithDrain(Matcher<Binary> expectedPayloadMatcher, int originalCredit, int sentMessages)
{
    final Matcher<Boolean> unsettled = Matchers.anyOf(equalTo(false), nullValue());

    final TransferMatcher transferMatcher = new TransferMatcher(0);
    transferMatcher.setPayloadMatcher(expectedPayloadMatcher);
    transferMatcher.withSettled(unsettled);
    transferMatcher.withState(nullValue());

    final CompositeAmqpPeerRunnable responses = new CompositeAmqpPeerRunnable();

    final DispositionFrame acceptedDisposition = new DispositionFrame()
            .setRole(Role.RECEIVER)
            .setSettled(true)
            .setState(new Accepted());

    // Channel -1 is an illegal placeholder; the real channel is taken from the received transfer.
    final FrameSender dispositionSender = new FrameSender(this, FrameType.AMQP, -1, acceptedDisposition, null);
    dispositionSender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            dispositionSender.setChannel(transferMatcher.getActualChannel());
            acceptedDisposition.setFirst(transferMatcher.getReceivedDeliveryId());
        }
    });

    final UnsignedInteger nextIncomingId = UnsignedInteger.ONE.add(UnsignedInteger.valueOf(sentMessages)); //TODO: start point shouldnt be hard coded
    final UnsignedInteger window = UnsignedInteger.valueOf(2048);
    final UnsignedInteger deliveryCount = UnsignedInteger.valueOf(sentMessages);
    final UnsignedInteger remainingCredit = UnsignedInteger.valueOf(originalCredit - sentMessages);

    final FlowFrame drainFlow = new FlowFrame()
            .setNextIncomingId(nextIncomingId)
            .setIncomingWindow(window)
            .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
            .setOutgoingWindow(window)
            .setDeliveryCount(deliveryCount)
            .setLinkCredit(remainingCredit)
            .setDrain(true);

    // Channel -1 is an illegal placeholder; the real channel is taken from the received transfer.
    final FrameSender drainFlowSender = new FrameSender(this, FrameType.AMQP, -1, drainFlow, null);
    drainFlowSender.setValueProvider(new ValueProvider()
    {
        @Override
        public void setValues()
        {
            drainFlowSender.setChannel(transferMatcher.getActualChannel());
            drainFlow.setHandle(transferMatcher.getReceivedHandle());
        }
    });

    // The Flow goes out first, then the disposition.
    responses.add(drainFlowSender);
    responses.add(dispositionSender);

    transferMatcher.onCompletion(responses);
    addHandler(transferMatcher);
}
/**
 * Expects a Transfer whose body is an encoded {@code Declare}, and responds with a settled
 * disposition carrying a {@code Declared} state with the given transaction id.
 */
public void expectDeclare(Binary txnId)
{
    final TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
    final EncodedAmqpValueMatcher declareBody = new EncodedAmqpValueMatcher(new Declare());
    payloadMatcher.setMessageContentMatcher(declareBody);

    final boolean responseSettled = true;
    expectTransfer(payloadMatcher, nullValue(), new Declared().setTxnId(txnId), responseSettled);
}
/**
 * Expects an unsettled Transfer whose body is an encoded {@code Declare}, and sends no
 * disposition in response.
 */
public void expectDeclareButDoNotRespond()
{
    final TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
    final EncodedAmqpValueMatcher declareBody = new EncodedAmqpValueMatcher(new Declare());
    payloadMatcher.setMessageContentMatcher(declareBody);

    final boolean settled = false;
    final boolean sendResponseDisposition = false;
    final boolean responseSettled = false;
    expectTransfer(payloadMatcher, nullValue(), settled, sendResponseDisposition, null, responseSettled);
}
/**
 * Expects a Transfer whose body is an encoded {@code Declare}, and responds with a settled
 * disposition carrying a {@code Rejected} state.
 */
public void expectDeclareAndReject()
{
    final TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
    final EncodedAmqpValueMatcher declareBody = new EncodedAmqpValueMatcher(new Declare());
    payloadMatcher.setMessageContentMatcher(declareBody);

    final boolean responseSettled = true;
    expectTransfer(payloadMatcher, nullValue(), new Rejected(), responseSettled);
}
/**
 * Forwards to the three-argument overload, answering the discharge with an {@code Accepted}
 * state.
 */
public void expectDischarge(Binary txnId, boolean dischargeState) {
    final ListDescribedType accepted = new Accepted();
    expectDischarge(txnId, dischargeState, accepted);
}
/**
 * Expects an unsettled Transfer whose body is an encoded {@code Discharge} carrying the given
 * transaction id and fail flag, and replies with a settled disposition carrying
 * {@code responseState}.
 *
 * @param txnId transaction id expected in the discharge
 * @param dischargeState value expected for the discharge's fail field
 * @param responseState state to place in the disposition sent back
 */
public void expectDischarge(Binary txnId, boolean dischargeState, ListDescribedType responseState) {
    final Discharge expectedDischarge = new Discharge();
    expectedDischarge.setFail(dischargeState);
    expectedDischarge.setTxnId(txnId);

    final TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
    payloadMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(expectedDischarge));

    final boolean responseSettled = true;
    expectTransfer(payloadMatcher, nullValue(), responseState, responseSettled);
}
/**
 * Expects an unsettled Transfer whose body is an encoded {@code Discharge} carrying the given
 * transaction id and fail flag. No disposition is sent in response.
 *
 * @param txnId transaction id expected in the discharge
 * @param dischargeState value expected for the discharge's fail field
 */
public void expectDischargeButDoNotRespond(Binary txnId, boolean dischargeState) {
    final Discharge expectedDischarge = new Discharge();
    expectedDischarge.setFail(dischargeState);
    expectedDischarge.setTxnId(txnId);

    final TransferPayloadCompositeMatcher payloadMatcher = new TransferPayloadCompositeMatcher();
    payloadMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(expectedDischarge));

    final boolean settled = false;
    final boolean sendResponseDisposition = false;
    // Passed through as before; it is only consulted when a disposition is actually sent.
    final boolean responseSettled = true;
    expectTransfer(payloadMatcher, nullValue(), settled, sendResponseDisposition, null, responseSettled);
}
/**
 * Closes the last coordinator link with a {@code TransactionError.TRANSACTION_ROLLBACK} error
 * described as "Discharge of TX failed.", expecting a closed Detach in response.
 */
public void remotelyCloseLastCoordinatorLink()
{
    final boolean expectDetachResponse = true;
    final boolean closed = true;

    remotelyCloseLastCoordinatorLink(expectDetachResponse, closed, TransactionError.TRANSACTION_ROLLBACK, "Discharge of TX failed.");
}
/**
 * Closes the last coordinator link with the given error condition and description, expecting
 * a closed Detach in response.
 */
public void remotelyCloseLastCoordinatorLink(Symbol errorType, String errorMessage)
{
    final boolean expectDetachResponse = true;
    final boolean closed = true;

    remotelyCloseLastCoordinatorLink(expectDetachResponse, closed, errorType, errorMessage);
}
/**
 * Appends to the last registered handler's completion actions the sending of a Detach for the
 * last initiated coordinator link, and optionally registers an expectation for the peer's
 * Detach response.
 *
 * @param expectDetachResponse whether to expect a Detach in reply
 * @param closed when true the reply must be closed, otherwise its closed field must be false
 *        or absent
 * @param errorType error condition to attach to the Detach; no error is attached when null
 * @param errorMessage description for the attached error
 */
public void remotelyCloseLastCoordinatorLink(boolean expectDetachResponse, boolean closed, Symbol errorType, String errorMessage)
{
    synchronized (_handlersLock) {
        // Chain onto whatever the most recently added handler already does on completion.
        final CompositeAmqpPeerRunnable lastHandlerActions = insertCompsiteActionForLastHandler();

        // NOTE(review): the Detach sent is always closed=true; the 'closed' argument only
        // shapes the expected response below - confirm this is intended.
        final DetachFrame coordinatorDetach = new DetachFrame();
        coordinatorDetach.setClosed(true);
        if (errorType != null) {
            org.apache.qpid.jms.test.testpeer.describedtypes.Error error = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
            error.setCondition(errorType);
            error.setDescription(errorMessage);
            coordinatorDetach.setError(error);
        }

        // Channel -1 is an illegal placeholder; the real channel is filled in just before sending.
        final FrameSender detachSender = new FrameSender(this, FrameType.AMQP, -1, coordinatorDetach, null);
        detachSender.setValueProvider(new ValueProvider() {
            @Override
            public void setValues() {
                detachSender.setChannel(_lastInitiatedChannel);
                coordinatorDetach.setHandle(_lastInitiatedCoordinatorLinkHandle);
            }
        });
        lastHandlerActions.add(detachSender);

        if (!expectDetachResponse) {
            return;
        }

        Matcher<Boolean> closeMatcher = Matchers.anyOf(equalTo(false), nullValue());
        if (closed) {
            closeMatcher = equalTo(true);
        }

        // Expect a response to our Detach.
        final DetachMatcher responseMatcher = new DetachMatcher().withClosed(closeMatcher);
        // TODO: enable matching on the channel number of the response.
        addHandler(responseMatcher);
    }
}
/**
 * Forwards to the full overload: expects a closed Detach response, uses a
 * {@code TransactionError.TRANSACTION_ROLLBACK} error described as "Discharge of TX failed.",
 * and expects no pipelined declare.
 */
public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState)
{
    final boolean expectDetachResponse = true;
    final boolean closed = true;
    final boolean pipelinedDeclare = false;

    remotelyCloseLastCoordinatorLinkOnDischarge(txnId, dischargeState, expectDetachResponse, closed,
            TransactionError.TRANSACTION_ROLLBACK, "Discharge of TX failed.", pipelinedDeclare, null);
}
/**
 * Forwards to the full overload: expects a closed Detach response and uses a
 * {@code TransactionError.TRANSACTION_ROLLBACK} error described as "Discharge of TX failed.",
 * passing the pipelined-declare settings through unchanged.
 */
public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState, boolean pipelinedDeclare, Binary nextTxnId)
{
    final boolean expectDetachResponse = true;
    final boolean closed = true;

    remotelyCloseLastCoordinatorLinkOnDischarge(txnId, dischargeState, expectDetachResponse, closed,
            TransactionError.TRANSACTION_ROLLBACK, "Discharge of TX failed.", pipelinedDeclare, nextTxnId);
}
/**
 * Forwards to the full overload: expects a closed Detach response, uses the given error
 * condition and description, and expects no pipelined declare.
 */
public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState, Symbol errorType, String errorMessage)
{
    final boolean expectDetachResponse = true;
    final boolean closed = true;
    final boolean pipelinedDeclare = false;

    remotelyCloseLastCoordinatorLinkOnDischarge(txnId, dischargeState, expectDetachResponse, closed,
            errorType, errorMessage, pipelinedDeclare, null);
}
/**
 * Expects an unsettled Transfer whose body is an encoded {@code Discharge} with the given
 * transaction id and fail flag, sends no disposition for it, optionally expects a following
 * Transfer whose body is an encoded {@code Declare}, and then closes the last coordinator link.
 *
 * @param txnId transaction id expected in the discharge
 * @param dischargeState value expected for the discharge's fail field
 * @param expectDetachResponse whether a Detach response is expected after closing the link
 * @param closed passed through to the link-closing call
 * @param errorType error condition for the Detach sent
 * @param errorMessage error description for the Detach sent
 * @param pipelinedDeclare whether to also expect a declare transfer before closing the link
 * @param nextTxnId transaction id placed in the {@code Declared} state handed to the declare
 *        expectation
 */
public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState, boolean expectDetachResponse, boolean closed, Symbol errorType, String errorMessage, boolean pipelinedDeclare, Binary nextTxnId) {
    final Discharge expectedDischarge = new Discharge();
    expectedDischarge.setFail(dischargeState);
    expectedDischarge.setTxnId(txnId);

    final TransferPayloadCompositeMatcher dischargePayload = new TransferPayloadCompositeMatcher();
    dischargePayload.setMessageContentMatcher(new EncodedAmqpValueMatcher(expectedDischarge));

    // The discharge itself gets no disposition; the link is closed instead.
    expectTransfer(dischargePayload, nullValue(), false, false, null, false);

    if (pipelinedDeclare) {
        final TransferPayloadCompositeMatcher declarePayload = new TransferPayloadCompositeMatcher();
        declarePayload.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare()));

        // NOTE(review): a Declared state is supplied, but with sendResponseDisposition=false
        // no disposition is sent for the declare - confirm this is intended.
        expectTransfer(declarePayload, nullValue(), false, false, new Declared().setTxnId(nextTxnId), true);
    }

    remotelyCloseLastCoordinatorLink(expectDetachResponse, closed, errorType, errorMessage);
}
/**
 * Expects a settled Disposition whose state matches the {@code Accepted} descriptor.
 */
public void expectDispositionThatIsAcceptedAndSettled()
{
    final DescriptorMatcher acceptedState = new DescriptorMatcher(Accepted.DESCRIPTOR_CODE, Accepted.DESCRIPTOR_SYMBOL);
    expectDisposition(true, acceptedState);
}
/**
 * Expects a settled Disposition whose state matches the {@code Released} descriptor.
 */
public void expectDispositionThatIsReleasedAndSettled()
{
    final DescriptorMatcher releasedState = new DescriptorMatcher(Released.DESCRIPTOR_CODE, Released.DESCRIPTOR_SYMBOL);
    expectDisposition(true, releasedState);
}
/**
 * Forwards to the four-argument overload without constraining the first and last delivery
 * ids to specific values.
 */
public void expectDisposition(boolean settled, Matcher<?> stateMatcher)
{
    final Integer firstDeliveryId = null;
    final Integer lastDeliveryId = null;
    expectDisposition(settled, stateMatcher, firstDeliveryId, lastDeliveryId);
}
/**
 * Registers a handler expecting a Disposition frame.
 *
 * @param settled when true the disposition must be settled, otherwise its settled field must
 *        be false or absent
 * @param stateMatcher matcher for the disposition state
 * @param firstDeliveryId required value of the first field, or null to accept any non-null value
 * @param lastDeliveryId required value of the last field, or null to accept any non-null value
 */
public void expectDisposition(boolean settled, Matcher<?> stateMatcher, Integer firstDeliveryId, Integer lastDeliveryId)
{
    Matcher<Boolean> settledMatcher = Matchers.anyOf(equalTo(false), nullValue());
    if (settled) {
        settledMatcher = equalTo(true);
    }

    final Matcher<?> firstMatcher;
    if (firstDeliveryId == null) {
        firstMatcher = notNullValue();
    } else {
        firstMatcher = equalTo(UnsignedInteger.valueOf(firstDeliveryId));
    }

    final Matcher<?> lastMatcher;
    if (lastDeliveryId == null) {
        lastMatcher = notNullValue();
    } else {
        lastMatcher = equalTo(UnsignedInteger.valueOf(lastDeliveryId));
    }

    addHandler(new DispositionMatcher()
            .withSettled(settledMatcher)
            .withFirst(firstMatcher)
            .withLast(lastMatcher)
            .withState(stateMatcher));
}
/**
 * Converts a described type whose described value is a list into a {@code Target} or
 * {@code Coordinator}, chosen by its descriptor (numeric code or symbol).
 *
 * @param o the object to convert; asserted to be a {@code DescribedType} describing a List
 * @return a new {@code Target} or {@code Coordinator} built from the list's fields
 * @throws IllegalArgumentException if the descriptor is null or matches neither type
 */
private Object createTargetObjectFromDescribedType(Object o) {
    assertThat(o, instanceOf(DescribedType.class));
    Object described = ((DescribedType) o).getDescribed();
    assertThat(described, instanceOf(List.class));
    @SuppressWarnings("unchecked")
    List<Object> targetFields = (List<Object>) described;
    Object descriptor = ((DescribedType) o).getDescriptor();

    // Compare by value rather than identity: a descriptor that is equal to the constant but
    // is a different instance must still match. Using the constants as the receiver also
    // turns a null descriptor into the IllegalArgumentException below instead of an NPE.
    if (Target.DESCRIPTOR_CODE.equals(descriptor) || Target.DESCRIPTOR_SYMBOL.equals(descriptor)) {
        return new Target(targetFields.toArray());
    } else if (Coordinator.DESCRIPTOR_CODE.equals(descriptor) || Coordinator.DESCRIPTOR_SYMBOL.equals(descriptor)) {
        return new Coordinator(targetFields.toArray());
    } else {
        throw new IllegalArgumentException("Unexpected target descriptor: " + descriptor);
    }
}
/**
 * Converts a described type whose described value is a list into a {@code Source} built from
 * that list's fields. Both conditions are asserted.
 */
private Source createSourceObjectFromDescribedType(Object o) {
    assertThat(o, instanceOf(DescribedType.class));
    final DescribedType describedType = (DescribedType) o;

    final Object fieldList = describedType.getDescribed();
    assertThat(fieldList, instanceOf(List.class));

    @SuppressWarnings("unchecked")
    final List<Object> fields = (List<Object>) fieldList;
    final Object[] fieldArray = fields.toArray();
    return new Source(fieldArray);
}
/**
 * Ends the last opened session without delay and without an error.
 */
public void remotelyEndLastOpenedSession(boolean expectEndResponse) {
    final long delayBeforeSend = 0;
    remotelyEndLastOpenedSession(expectEndResponse, delayBeforeSend, null, null);
}
/**
 * Ends the last opened session after the given delay, without an error.
 */
public void remotelyEndLastOpenedSession(boolean expectEndResponse, long delayBeforeSend) {
    remotelyEndLastOpenedSession(
            expectEndResponse,
            delayBeforeSend,
            null,   // errorType
            null);  // errorMessage
}
/**
 * Appends to the last registered handler's completion actions the sending of an End frame on
 * the last initiated channel, and optionally registers an expectation for the peer's End
 * response.
 *
 * @param expectEndResponse whether to expect an End frame in reply
 * @param delayBeforeSend milliseconds to sleep before the End is sent; no sleep when not positive
 * @param errorType error condition to attach to the End; no error is attached when null
 * @param errorMessage description for the attached error
 */
public void remotelyEndLastOpenedSession(boolean expectEndResponse, final long delayBeforeSend, Symbol errorType, String errorMessage) {
    synchronized (_handlersLock) {
        CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();

        // Now generate the End for the appropriate session
        final EndFrame endFrame = new EndFrame();
        if (errorType != null) {
            org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
            detachError.setCondition(errorType);
            detachError.setDescription(errorMessage);
            endFrame.setError(detachError);
        }

        // The frame channel will be dynamically set based on the previous frames. Using the -1 is an illegal placeholder.
        final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, -1, endFrame, null);
        frameSender.setValueProvider(new ValueProvider() {
            @Override
            public void setValues() {
                frameSender.setChannel(_lastInitiatedChannel);

                //Insert a delay if requested
                if (delayBeforeSend > 0) {
                    try {
                        Thread.sleep(delayBeforeSend);
                    } catch (InterruptedException e) {
                        // Cut the delay short, but keep the interrupt visible to the calling thread.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
        comp.add(frameSender);

        if (expectEndResponse) {
            // Expect a response to our End.
            final EndMatcher endMatcher = new EndMatcher();
            // TODO: enable matching on the channel number of the response.
            addHandler(endMatcher);
        }
    }
}
/**
 * Closes the connection without an error, without error info and without delay.
 */
public void remotelyCloseConnection(boolean expectCloseResponse) {
    remotelyCloseConnection(
            expectCloseResponse,
            null,  // errorType
            null,  // errorMessage
            null,  // info
            0);    // delayBeforeSend
}
/**
 * Closes the connection with the given error condition and description, without error info
 * and without delay.
 */
public void remotelyCloseConnection(boolean expectCloseResponse, Symbol errorType, String errorMessage) {
    remotelyCloseConnection(
            expectCloseResponse,
            errorType,
            errorMessage,
            null,  // info
            0);    // delayBeforeSend
}
/**
 * Closes the connection with the given error condition and description after the given
 * delay, without error info.
 */
public void remotelyCloseConnection(boolean expectCloseResponse, Symbol errorType, String errorMessage, long delayBeforeSend) {
    remotelyCloseConnection(
            expectCloseResponse,
            errorType,
            errorMessage,
            null,  // info
            delayBeforeSend);
}
/**
 * Closes the connection with the given error condition, description and info map, without
 * delay.
 */
public void remotelyCloseConnection(boolean expectCloseResponse, Symbol errorType, String errorMessage, Map<Symbol, Object> info) {
    remotelyCloseConnection(
            expectCloseResponse,
            errorType,
            errorMessage,
            info,
            0);  // delayBeforeSend
}
/**
 * Appends to the last registered handler's completion actions the sending of a Close frame,
 * and optionally registers an expectation for the peer's Close response.
 *
 * @param expectCloseResponse whether to expect a Close frame in reply
 * @param errorType error condition for the Close; no error is attached when null
 * @param errorMessage description for the attached error
 * @param info info map for the attached error
 * @param delayBeforeSend milliseconds to sleep before the Close is sent
 */
public void remotelyCloseConnection(boolean expectCloseResponse, Symbol errorType, String errorMessage, Map<Symbol, Object> info, long delayBeforeSend) {
    synchronized (_handlersLock) {
        // Chain the Close onto whatever the most recently added handler already does on completion.
        final CompositeAmqpPeerRunnable lastHandlerActions = insertCompsiteActionForLastHandler();
        lastHandlerActions.add(createCloseFrameSender(errorType, errorMessage, info, delayBeforeSend));

        if (!expectCloseResponse) {
            return;
        }

        // Expect a response to our Close.
        addHandler(new CloseMatcher());
    }
}
/**
 * Builds a sender for a Close frame on {@code CONNECTION_CHANNEL}.
 *
 * @param errorType error condition for the Close; no error is attached when null
 * @param errorMessage description for the attached error
 * @param info info map for the attached error
 * @param delayBeforeSend milliseconds to sleep when the sender's values are resolved; no
 *        sleep when not positive
 * @return the configured frame sender
 */
private FrameSender createCloseFrameSender(Symbol errorType, String errorMessage, Map<Symbol, Object> info, final long delayBeforeSend) {
    final CloseFrame closeFrame = new CloseFrame();
    if (errorType != null) {
        org.apache.qpid.jms.test.testpeer.describedtypes.Error closeError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
        closeError.setCondition(errorType);
        closeError.setDescription(errorMessage);
        closeError.setInfo(info);
        closeFrame.setError(closeError);
    }

    final FrameSender closeSender = new FrameSender(this, FrameType.AMQP, CONNECTION_CHANNEL, closeFrame, null);
    closeSender.setValueProvider(new ValueProvider() {
        @Override
        public void setValues() {
            //Insert a delay if requested
            if (delayBeforeSend > 0) {
                try {
                    Thread.sleep(delayBeforeSend);
                } catch (InterruptedException e) {
                    // Cut the delay short, but keep the interrupt visible to the calling thread.
                    Thread.currentThread().interrupt();
                }
            }
        }
    });

    return closeSender;
}
/**
 * Builds a Close frame sender with the given error condition and description (no info map,
 * no delay) and runs it straight away.
 */
void sendConnectionCloseImmediately(Symbol errorType, String errorMessage) {
    final FrameSender immediateClose = createCloseFrameSender(
            errorType,
            errorMessage,
            null,  // info
            0);    // delayBeforeSend

    immediateClose.run();
}
/**
 * Detaches the last opened link without attaching an error.
 */
public void remotelyDetachLastOpenedLinkOnLastOpenedSession(boolean expectDetachResponse, boolean closed) {
    remotelyDetachLastOpenedLinkOnLastOpenedSession(
            expectDetachResponse,
            closed,
            null,   // errorType
            null);  // errorMessage
}
/**
 * Detaches the last opened link with the given error condition and description, without
 * delay.
 */
public void remotelyDetachLastOpenedLinkOnLastOpenedSession(boolean expectDetachResponse, boolean closed, Symbol errorType, String errorMessage) {
    remotelyDetachLastOpenedLinkOnLastOpenedSession(
            expectDetachResponse,
            closed,
            errorType,
            errorMessage,
            0);  // delayBeforeSend
}
/**
 * Appends to the last registered handler's completion actions the sending of a Detach for the
 * last initiated link on the last initiated channel, and optionally registers an expectation
 * for the peer's Detach response.
 *
 * @param expectDetachResponse whether to expect a Detach in reply
 * @param closed value for the closed field of the Detach sent; also decides whether the reply
 *        must be closed (true) or have a false or absent closed field
 * @param errorType error condition to attach to the Detach; no error is attached when null
 * @param errorMessage description for the attached error
 * @param delayBeforeSend milliseconds to sleep before the Detach is sent; no sleep when not
 *        positive
 */
public void remotelyDetachLastOpenedLinkOnLastOpenedSession(boolean expectDetachResponse, boolean closed, Symbol errorType, String errorMessage, final long delayBeforeSend) {
    synchronized (_handlersLock) {
        CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();

        // Now generate the Detach for the appropriate link on the appropriate session
        final DetachFrame detachFrame = new DetachFrame();
        detachFrame.setClosed(closed);
        if (errorType != null) {
            org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error();
            detachError.setCondition(errorType);
            detachError.setDescription(errorMessage);
            detachFrame.setError(detachError);
        }

        // The response frame channel will be dynamically set based on the previous frames. Using the -1 is an illegal placeholder.
        final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, -1, detachFrame, null);
        frameSender.setValueProvider(new ValueProvider() {
            @Override
            public void setValues() {
                frameSender.setChannel(_lastInitiatedChannel);
                detachFrame.setHandle(_lastInitiatedLinkHandle);

                //Insert a delay if requested
                if (delayBeforeSend > 0) {
                    try {
                        Thread.sleep(delayBeforeSend);
                    } catch (InterruptedException e) {
                        // Cut the delay short, but keep the interrupt visible to the calling thread.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
        comp.add(frameSender);

        if (expectDetachResponse) {
            Matcher<Boolean> closeMatcher = null;
            if (closed) {
                closeMatcher = equalTo(true);
            } else {
                closeMatcher = Matchers.anyOf(equalTo(false), nullValue());
            }

            // Expect a response to our Detach.
            final DetachMatcher detachMatcher = new DetachMatcher().withClosed(closeMatcher);
            // TODO: enable matching on the channel number of the response.
            addHandler(detachMatcher);
        }
    }
}
/**
 * Replaces the completion action of the last handler with a new composite that first runs the
 * handler's previous completion action (if it had one), and returns that composite so further
 * actions can be appended to it.
 */
private CompositeAmqpPeerRunnable insertCompsiteActionForLastHandler() {
    final CompositeAmqpPeerRunnable wrapper = new CompositeAmqpPeerRunnable();
    final Handler lastHandler = getLastHandler();

    final AmqpPeerRunnable previousAction = lastHandler.getOnCompletionAction();
    final boolean hadAction = previousAction != null;
    if (hadAction) {
        // Keep what was already scheduled; new actions run after it.
        wrapper.add(previousAction);
    }

    lastHandler.onCompletion(wrapper);
    return wrapper;
}
/**
 * Convenience overload that delegates to the full variant with no explicit delivery
 * tag, no explicit 'more' flag and a send delay of zero.
 */
public void sendTransferToLastOpenedLinkOnLastOpenedSession(final HeaderDescribedType headerDescribedType,
                                                            final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                                            final PropertiesDescribedType propertiesDescribedType,
                                                            final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                                            final DescribedType content,
                                                            final int nextIncomingDeliveryId) {
    final String defaultTag = null;
    final Boolean unsetMore = null;
    final int noSendDelay = 0;

    sendTransferToLastOpenedLinkOnLastOpenedSession(
            headerDescribedType,
            messageAnnotationsDescribedType,
            propertiesDescribedType,
            appPropertiesDescribedType,
            content,
            nextIncomingDeliveryId,
            defaultTag,
            unsetMore,
            noSendDelay);
}
/**
 * Queues a Transfer frame to be sent once the last handler completes, targeting the
 * last initiated link handle on the last initiated channel.
 *
 * @param headerDescribedType             header section passed to the payload preparation
 * @param messageAnnotationsDescribedType message annotations section passed to the payload preparation
 * @param propertiesDescribedType         properties section passed to the payload preparation
 * @param appPropertiesDescribedType      application properties section passed to the payload preparation
 * @param content                         body section passed to the payload preparation
 * @param nextIncomingDeliveryId          delivery id set on the Transfer frame; also used to build
 *                                        the default delivery tag
 * @param tagAsString                     delivery tag text, or null to use
 *                                        {@code "theDeliveryTag" + nextIncomingDeliveryId}
 * @param more                            value for the Transfer 'more' field, or null to leave it unset
 * @param sendDelay                       send delay handed to the frame sender when non-zero
 */
public void sendTransferToLastOpenedLinkOnLastOpenedSession(final HeaderDescribedType headerDescribedType,
                                                            final MessageAnnotationsDescribedType messageAnnotationsDescribedType,
                                                            final PropertiesDescribedType propertiesDescribedType,
                                                            final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                                            final DescribedType content,
                                                            final int nextIncomingDeliveryId,
                                                            final String tagAsString,
                                                            final Boolean more,
                                                            final int sendDelay) {
    synchronized (_handlersLock) {
        CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();

        String tagString = tagAsString;
        if (tagString == null) {
            tagString = "theDeliveryTag" + nextIncomingDeliveryId;
        }

        // Encode with an explicit charset so the tag bytes do not vary with the platform default
        Binary dtag = new Binary(tagString.getBytes(StandardCharsets.UTF_8));

        final TransferFrame transferResponse = new TransferFrame()
            .setDeliveryId(UnsignedInteger.valueOf(nextIncomingDeliveryId))
            .setDeliveryTag(dtag)
            .setMessageFormat(UnsignedInteger.ZERO);

        if (more != null) {
            transferResponse.setMore(more);
        }

        Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content);

        // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
        final FrameSender transferSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload);
        transferSender.setValueProvider(new ValueProvider() {
            @Override
            public void setValues() {
                transferResponse.setHandle(_lastInitiatedLinkHandle);
                transferSender.setChannel(_lastInitiatedChannel);
            }
        });

        if (sendDelay != 0) {
            transferSender.setSendDelay(sendDelay);
        }

        comp.add(transferSender);
    }
}
/**
 * Schedules the given action to run as part of the completion of the last handler,
 * after any completion action already registered on it.
 *
 * @param action the action to add to the last handler's composite completion action
 */
public void runAfterLastHandler(AmqpPeerRunnable action) {
    synchronized (_handlersLock) {
        // Wrap the last handler's completion action in a composite and add this action to it
        insertCompsiteActionForLastHandler().add(action);
    }
}
/**
 * Records the given assertion error if none has been recorded yet; later errors are ignored.
 *
 * @param ae the assertion error that occurred
 */
void assertionFailed(AssertionError ae) {
    if (_firstAssertionError != null) {
        // Only the first failure is retained
        return;
    }
    _firstAssertionError = ae;
}
/**
 * Adds a header handler built from the SASL header (passed for both header arguments)
 * whose completion action asks the driver runnable to exit its read loop early.
 */
public void expectSaslHeaderThenDrop() {
    final AmqpPeerRunnable dropAfterHeader = new AmqpPeerRunnable() {
        @Override
        public void run() {
            _driverRunnable.exitReadLoopEarly();
        }
    };

    final HeaderHandlerImpl saslHeaderHandler =
            new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, dropAfterHeader);
    addHandler(saslHeaderHandler);
}
/**
 * Makes the driver exit its read loop early once the last handler completes,
 * without any delay beforehand.
 */
public void dropAfterLastHandler() {
    final int noDelay = 0;
    dropAfterLastHandler(noDelay);
}
/**
 * Makes the driver exit its read loop early once the last handler completes,
 * optionally sleeping first.
 *
 * @param delay time in milliseconds to sleep before exiting the read loop;
 *              values of zero or less mean no delay
 */
public void dropAfterLastHandler(final long delay) {
    runAfterLastHandler(new AmqpPeerRunnable() {
        @Override
        public void run() {
            pauseBeforeExit();
            _driverRunnable.exitReadLoopEarly();
        }

        // Sleeps for the requested delay, if any, keeping the interrupt status on interruption
        private void pauseBeforeExit() {
            if (delay <= 0) {
                return;
            }

            try {
                Thread.sleep(delay);
            } catch (InterruptedException e) {
                LOGGER.warn("Interrupted while delaying before read loop exit");
                Thread.currentThread().interrupt();
            }
        }
    });
}
/**
 * Adds an optional Flow matcher and, when draining with a response requested,
 * arranges for a Flow frame to be sent back once the matcher completes.
 *
 * @param drain                 whether the matched Flow must have drain set to true; when false,
 *                              drain must be false or absent
 * @param sendDrainFlowResponse whether to send a Flow in response; only takes effect when
 *                              {@code drain} is also true
 * @param creditMatcher         matcher applied to the link credit of the Flow
 */
public void optionalFlow(final boolean drain, final boolean sendDrainFlowResponse,Matcher<UnsignedInteger> creditMatcher)
{
    final FlowMatcher flowMatcher = new FlowMatcher();
    flowMatcher.setOptional(true);

    final Matcher<Boolean> expectedDrain;
    if (!drain) {
        // An absent drain field is treated the same as an explicit false
        expectedDrain = Matchers.anyOf(equalTo(false), nullValue());
    } else {
        expectedDrain = equalTo(true);
    }

    flowMatcher.withLinkCredit(creditMatcher);
    flowMatcher.withDrain(expectedDrain);

    final boolean respondToDrain = drain && sendDrainFlowResponse;
    if (respondToDrain) {
        final FlowFrame drainResponse = new FlowFrame();
        drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldn't be hard coded
        drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldn't be hard coded
        drainResponse.setLinkCredit(UnsignedInteger.ZERO);
        drainResponse.setDrain(true);

        // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
        final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
        flowResponseSender.setValueProvider(new ValueProvider() {
            @Override
            public void setValues() {
                // Fill in the remaining fields from what the matcher actually received
                flowResponseSender.setChannel(flowMatcher.getActualChannel());
                drainResponse.setHandle(flowMatcher.getReceivedHandle());
                drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
                drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, 0));
                drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
            }
        });

        flowMatcher.onCompletion(flowResponseSender);
    }

    addHandler(flowMatcher);
}
}