| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.qpid.jms.test.testpeer; |
| |
| import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY; |
| import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY; |
| import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DYNAMIC_NODE_LIFETIME_POLICY; |
| import static org.apache.qpid.jms.provider.amqp.AmqpSupport.GLOBAL; |
| import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED; |
| import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS; |
| import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SOLE_CONNECTION_CAPABILITY; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.arrayContaining; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.hasEntry; |
| import static org.hamcrest.Matchers.instanceOf; |
| import static org.hamcrest.Matchers.lessThan; |
| import static org.hamcrest.Matchers.notNullValue; |
| import static org.hamcrest.Matchers.nullValue; |
| |
| import java.io.IOException; |
| import java.net.Socket; |
| import java.nio.charset.StandardCharsets; |
| import java.security.PrivilegedActionException; |
| import java.security.PrivilegedExceptionAction; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import javax.net.ssl.SSLContext; |
| import javax.security.auth.Subject; |
| import javax.security.auth.callback.Callback; |
| import javax.security.auth.callback.CallbackHandler; |
| import javax.security.auth.callback.UnsupportedCallbackException; |
| import javax.security.auth.login.AppConfigurationEntry; |
| import javax.security.auth.login.Configuration; |
| import javax.security.auth.login.LoginContext; |
| import javax.security.sasl.AuthorizeCallback; |
| import javax.security.sasl.Sasl; |
| import javax.security.sasl.SaslServer; |
| |
| import org.apache.qpid.jms.provider.amqp.AmqpSupport; |
| import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper; |
| import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError; |
| import org.apache.qpid.jms.test.testpeer.basictypes.ReceiverSettleMode; |
| import org.apache.qpid.jms.test.testpeer.basictypes.Role; |
| import org.apache.qpid.jms.test.testpeer.basictypes.SenderSettleMode; |
| import org.apache.qpid.jms.test.testpeer.basictypes.TerminusDurability; |
| import org.apache.qpid.jms.test.testpeer.basictypes.TerminusExpiryPolicy; |
| import org.apache.qpid.jms.test.testpeer.basictypes.TransactionError; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.AttachFrame; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.BeginFrame; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.CloseFrame; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Coordinator; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Declare; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Declared; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.DetachFrame; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Discharge; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.DispositionFrame; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.EndFrame; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.FlowFrame; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.FrameDescriptorMapping; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.OpenFrame; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Rejected; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Released; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.SaslChallengeFrame; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.SaslMechanismsFrame; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.SaslOutcomeFrame; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Source; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.Target; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.TransferFrame; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType; |
| import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType; |
| import org.apache.qpid.jms.test.testpeer.matchers.AttachMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.BeginMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.CloseMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.DeleteOnCloseMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.DetachMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.DispositionMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.EndMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.FlowMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.OpenMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.SaslInitMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.SaslResponseMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.TransferMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; |
| import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; |
| import org.apache.qpid.proton.amqp.Binary; |
| import org.apache.qpid.proton.amqp.DescribedType; |
| import org.apache.qpid.proton.amqp.Symbol; |
| import org.apache.qpid.proton.amqp.UnsignedByte; |
| import org.apache.qpid.proton.amqp.UnsignedInteger; |
| import org.apache.qpid.proton.amqp.UnsignedShort; |
| import org.apache.qpid.proton.codec.Data; |
| import org.apache.qpid.proton.engine.impl.AmqpHeader; |
| import org.hamcrest.BaseMatcher; |
| import org.hamcrest.Description; |
| import org.hamcrest.Matcher; |
| import org.hamcrest.Matchers; |
| import org.junit.Assert; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class TestAmqpPeer implements AutoCloseable |
| { |
| private static final Logger LOGGER = LoggerFactory.getLogger(TestAmqpPeer.class.getName()); |
| |
| public static final String MESSAGE_NUMBER = "MessageNumber"; |
| |
| private static final Symbol ANONYMOUS = Symbol.valueOf("ANONYMOUS"); |
| private static final Symbol EXTERNAL = Symbol.valueOf("EXTERNAL"); |
| private static final Symbol PLAIN = Symbol.valueOf("PLAIN"); |
| private static final Symbol GSSAPI = Symbol.valueOf("GSSAPI"); |
| private static final Symbol XOAUTH2 = Symbol.valueOf("XOAUTH2"); |
| private static final UnsignedByte SASL_OK = UnsignedByte.valueOf((byte) 0); |
| private static final UnsignedByte SASL_FAIL_AUTH = UnsignedByte.valueOf((byte) 1); |
| private static final UnsignedByte SASL_SYS_TEMP = UnsignedByte.valueOf((byte) 4); |
| private static final int CONNECTION_CHANNEL = 0; |
| private static final int DEFAULT_PRODUCER_CREDIT = 100; |
| private static final Symbol[] DEFAULT_DESIRED_CAPABILITIES = new Symbol[] { SOLE_CONNECTION_CAPABILITY, DELAYED_DELIVERY, ANONYMOUS_RELAY, SHARED_SUBS}; |
| |
| private volatile AssertionError _firstAssertionError = null; |
| private final TestAmqpPeerRunner _driverRunnable; |
| private final Thread _driverThread; |
| |
| /** |
| * Guarded by {@link #_handlersLock} |
| */ |
| private final List<Handler> _handlers = new ArrayList<Handler>(); |
| private final Object _handlersLock = new Object(); |
| |
| /** |
| * Guarded by {@link #_handlersLock} |
| */ |
| private CountDownLatch _handlersCompletedLatch; |
| |
| private byte[] _deferredBytes; |
| private int _lastInitiatedChannel = -1; |
| private UnsignedInteger _lastInitiatedLinkHandle = null; |
| private UnsignedInteger _lastInitiatedCoordinatorLinkHandle = null; |
| private int advertisedIdleTimeout = 0; |
| private AtomicInteger _emptyFrameCount = new AtomicInteger(); |
| |
| public TestAmqpPeer() throws IOException |
| { |
| this(null, false); |
| } |
| |
| public TestAmqpPeer(SSLContext context, boolean needClientCert) throws IOException |
| { |
| this(context, needClientCert, false); |
| } |
| |
| public TestAmqpPeer(SSLContext context, boolean needClientCert, boolean sendSaslHeaderPreEmptively) throws IOException |
| { |
| _driverRunnable = new TestAmqpPeerRunner(this, context, needClientCert); |
| _driverRunnable.setSendSaslHeaderPreEmptively(sendSaslHeaderPreEmptively); |
| _driverThread = new Thread(_driverRunnable, "MockAmqpPeer-" + _driverRunnable.getServerPort()); |
| _driverThread.start(); |
| } |
| |
| /** |
| * Shuts down the test peer, throwing any Throwable |
| * that occurred on the peer, or validating that no |
| * unused matchers remain. |
| */ |
| @Override |
| public void close() throws Exception |
| { |
| _driverRunnable.stop(); |
| |
| try |
| { |
| _driverThread.join(30000); |
| } |
| catch (InterruptedException e) |
| { |
| Thread.currentThread().interrupt(); |
| } |
| finally |
| { |
| AssertionError ae = _firstAssertionError; |
| if(ae != null) |
| { |
| String message = "Assertion failure during test run"; |
| if(ae.getMessage() != null) |
| { |
| message += ": " + ae.getMessage(); |
| } |
| |
| throw new AssertionError(message, _firstAssertionError); |
| } |
| |
| Throwable throwable = getThrowable(); |
| if(throwable == null) |
| { |
| synchronized(_handlersLock) |
| { |
| assertThat(_handlers, Matchers.empty()); |
| } |
| } |
| else |
| { |
| //AutoClosable can't handle throwing Throwables, so we wrap it. |
| throw new RuntimeException("TestPeer caught throwable during run", throwable); |
| } |
| } |
| } |
| |
| public Throwable getThrowable() |
| { |
| return _driverRunnable.getException(); |
| } |
| |
| public void setSuppressReadExceptionOnClose(boolean suppress) |
| { |
| _driverRunnable.setSuppressReadExceptionOnClose(suppress); |
| } |
| |
| public int getServerPort() |
| { |
| return _driverRunnable.getServerPort(); |
| } |
| |
| public Socket getClientSocket() |
| { |
| return _driverRunnable.getClientSocket(); |
| } |
| |
| public boolean isSSL() |
| { |
| return _driverRunnable.isSSL(); |
| } |
| |
| public int getAdvertisedIdleTimeout() |
| { |
| return advertisedIdleTimeout; |
| } |
| |
| public void setAdvertisedIdleTimeout(int advertisedIdleTimeout) |
| { |
| this.advertisedIdleTimeout = advertisedIdleTimeout; |
| } |
| |
| public int getEmptyFrameCount() { |
| return _emptyFrameCount.get(); |
| } |
| |
| public void purgeExpectations() { |
| synchronized (_handlersLock) { |
| _handlers.clear(); |
| } |
| } |
| |
| void receiveHeader(byte[] header) |
| { |
| Handler handler = getFirstHandler(); |
| if(handler instanceof HeaderHandler) |
| { |
| ((HeaderHandler)handler).header(header,this); |
| removeFirstHandler(); |
| } |
| else |
| { |
| throw new IllegalStateException("Received header but the next handler is a " + handler); |
| } |
| } |
| |
| void receiveFrame(int type, int channel, int frameSize, DescribedType describedType, Binary payload) |
| { |
| Handler handler = getFirstHandler(); |
| |
| while (handler instanceof FrameHandler && ((FrameHandler) handler).isOptional()) |
| { |
| FrameHandler frameHandler = (FrameHandler) handler; |
| if(frameHandler.descriptorMatches(describedType.getDescriptor())){ |
| LOGGER.info("Optional frame handler matches the descriptor, proceeding to verify it"); |
| break; |
| } else { |
| LOGGER.info("Skipping non-matching optional frame handler, received frame descriptor (" + describedType.getDescriptor() + ") does not match handler: " + frameHandler); |
| removeFirstHandler(); |
| handler = getFirstHandler(); |
| } |
| } |
| |
| if(handler == null) |
| { |
| Object actualDescriptor = describedType.getDescriptor(); |
| Object mappedDescriptor = FrameDescriptorMapping.lookupMapping(actualDescriptor); |
| |
| throw new IllegalStateException("No handler! Received frame, descriptor=" + actualDescriptor + "/" + mappedDescriptor); |
| } |
| |
| if(handler instanceof FrameHandler) |
| { |
| ((FrameHandler)handler).frame(type, channel, frameSize, describedType, payload, this); |
| removeFirstHandler(); |
| } |
| else |
| { |
| throw new IllegalStateException("Received frame but the next handler is a " + handler); |
| } |
| } |
| |
| void receiveEmptyFrame(int type, int channel) |
| { |
| _emptyFrameCount.incrementAndGet(); |
| LOGGER.debug("Received empty frame"); |
| } |
| |
| private void removeFirstHandler() |
| { |
| synchronized(_handlersLock) |
| { |
| Handler h = _handlers.remove(0); |
| if(_handlersCompletedLatch != null) |
| { |
| _handlersCompletedLatch.countDown(); |
| } |
| LOGGER.trace("Removed completed handler: {}", h); |
| } |
| } |
| |
| private void addHandler(Handler handler) |
| { |
| synchronized(_handlersLock) |
| { |
| _handlers.add(handler); |
| LOGGER.trace("Added handler: {}", handler); |
| } |
| } |
| |
| private Handler getFirstHandler() |
| { |
| synchronized(_handlersLock) |
| { |
| if(_handlers.isEmpty()) |
| { |
| return null; |
| } |
| return _handlers.get(0); |
| } |
| } |
| |
| private Handler getLastHandler() |
| { |
| synchronized(_handlersLock) |
| { |
| if(_handlers.isEmpty()) |
| { |
| throw new IllegalStateException("No handlers"); |
| } |
| return _handlers.get(_handlers.size() - 1); |
| } |
| } |
| |
| public void waitForAllHandlersToComplete(int timeoutMillis) throws InterruptedException |
| { |
| boolean countedDownOk = waitForAllHandlersToCompleteNoAssert(timeoutMillis); |
| |
| String message = "All handlers did not complete within the " + timeoutMillis + "ms timeout."; |
| Throwable t = getThrowable(); |
| if(t != null){ |
| message += System.lineSeparator() + "A *potential* reason, peer caught throwable: " + t; |
| } |
| |
| Assert.assertTrue(message, countedDownOk); |
| } |
| |
| public boolean waitForAllHandlersToCompleteNoAssert(int timeoutMillis) throws InterruptedException |
| { |
| synchronized(_handlersLock) |
| { |
| _handlersCompletedLatch = new CountDownLatch(_handlers.size()); |
| } |
| |
| return _handlersCompletedLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); |
| } |
| |
| void sendHeader(byte[] header) |
| { |
| LOGGER.debug("About to send header: {}", new Binary(header)); |
| _driverRunnable.sendBytes(header); |
| } |
| |
| public void sendPreemptiveServerAmqpHeader() { |
| // Arrange to send the AMQP header after the previous handler |
| CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); |
| comp.add(new AmqpPeerRunnable() { |
| @Override |
| public void run() { |
| sendHeader(AmqpHeader.HEADER); |
| } |
| }); |
| } |
| |
| public void sendEmptyFrame(boolean deferWrite) |
| { |
| sendFrame(FrameType.AMQP, 0, null, null, deferWrite, 0); |
| } |
| |
| void sendFrame(FrameType type, int channel, DescribedType frameDescribedType, Binary framePayload, boolean deferWrite, long sendDelay) |
| { |
| if(channel < 0) |
| { |
| throw new IllegalArgumentException("Frame must be sent on a channel >= 0"); |
| } |
| |
| LOGGER.debug("About to send: {}", frameDescribedType); |
| |
| if(sendDelay > 0) |
| { |
| LOGGER.debug("Delaying send by {} ms", sendDelay); |
| try { |
| Thread.sleep(sendDelay); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException("Interrupted before send", e); |
| } |
| } |
| |
| byte[] output = AmqpDataFramer.encodeFrame(type, channel, frameDescribedType, framePayload); |
| |
| if(deferWrite && _deferredBytes == null) |
| { |
| _deferredBytes = output; |
| } |
| else if(_deferredBytes != null) |
| { |
| int newCapacity = _deferredBytes.length + output.length; |
| //TODO: check overflow |
| |
| byte[] newOutput = new byte[newCapacity]; |
| System.arraycopy(_deferredBytes, 0, newOutput, 0, _deferredBytes.length); |
| System.arraycopy(output, 0, newOutput, _deferredBytes.length, output.length); |
| |
| _deferredBytes = newOutput; |
| output = newOutput; |
| } |
| |
| if(deferWrite) |
| { |
| LOGGER.debug("Deferring write until pipelined with future frame bytes"); |
| return; |
| } |
| else |
| { |
| //clear the deferred bytes to avoid corrupting future sends |
| _deferredBytes = null; |
| |
| _driverRunnable.sendBytes(output); |
| } |
| } |
| |
| private OpenFrame createOpenFrame() |
| { |
| OpenFrame openFrame = new OpenFrame(); |
| openFrame.setContainerId("test-amqp-peer-container-id"); |
| if(advertisedIdleTimeout != 0) |
| { |
| openFrame.setIdleTimeOut(UnsignedInteger.valueOf(advertisedIdleTimeout)); |
| } |
| |
| return openFrame; |
| } |
| |
| public void expectHeader(byte[] header, byte[] response) |
| { |
| addHandler(new HeaderHandlerImpl(header, response)); |
| } |
| |
| private void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher, |
| boolean sendSaslHeaderResponse, boolean amqpHeaderSentPreemptively) |
| { |
| SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(mechanism); |
| byte[] saslHeaderResponse = null; |
| if(sendSaslHeaderResponse) { |
| saslHeaderResponse = AmqpHeader.SASL_HEADER; |
| } |
| addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, saslHeaderResponse, |
| new FrameSender( |
| this, FrameType.SASL, 0, |
| saslMechanismsFrame, null))); |
| |
| SaslInitMatcher saslInitMatcher = new SaslInitMatcher() |
| .withMechanism(equalTo(mechanism)) |
| .withInitialResponse(initialResponseMatcher) |
| .onCompletion(new AmqpPeerRunnable() |
| { |
| @Override |
| public void run() |
| { |
| TestAmqpPeer.this.sendFrame( |
| FrameType.SASL, 0, |
| new SaslOutcomeFrame().setCode(SASL_OK), |
| null, |
| false, 0); |
| |
| // Now that we processed the SASL layer AMQP header, reset the |
| // peer to expect the non-SASL AMQP header. |
| _driverRunnable.expectHeader(); |
| } |
| }); |
| |
| if(hostnameMatcher != null) |
| { |
| saslInitMatcher.withHostname(hostnameMatcher); |
| } |
| |
| addHandler(saslInitMatcher); |
| |
| if (!amqpHeaderSentPreemptively) |
| { |
| addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER)); |
| } |
| } |
| |
| public void expectSaslGSSAPIFail() throws Exception { |
| SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(GSSAPI); |
| |
| addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, |
| new FrameSender( |
| this, FrameType.SASL, 0, |
| saslMechanismsFrame, null))); |
| } |
| |
| public void expectSaslGSSAPI(String serviceName, String keyTab, String clientAuthId) throws Exception { |
| |
| SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(GSSAPI); |
| |
| addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, |
| new FrameSender( |
| this, FrameType.SASL, 0, |
| saslMechanismsFrame, null))); |
| |
| // setup server gss context |
| final Map<String, String> options = new HashMap<>(); |
| options.put("principal", serviceName); |
| options.put("useKeyTab", "true"); |
| options.put("keyTab", keyTab); |
| options.put("storeKey", "true"); |
| options.put("isInitiator", "false"); |
| Configuration loginCconfiguration = new Configuration() { |
| @Override |
| public AppConfigurationEntry[] getAppConfigurationEntry(String name) { |
| return new AppConfigurationEntry[]{ |
| new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule", |
| AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, |
| options)}; |
| } |
| }; |
| |
| LoginContext loginContext = new LoginContext("", null, null, loginCconfiguration); |
| loginContext.login(); |
| final Subject serverSubject =loginContext.getSubject(); |
| |
| LOGGER.info("saslServer subject:" + serverSubject.getPrivateCredentials()); |
| |
| Map<String, ?> config = new HashMap<>(); |
| final CallbackHandler handler = new CallbackHandler() { |
| @Override |
| public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { |
| LOGGER.info("Here with: " + Arrays.asList(callbacks)); |
| for (Callback callback :callbacks) { |
| if (callback instanceof AuthorizeCallback) { |
| AuthorizeCallback authorizeCallback = (AuthorizeCallback) callback; |
| authorizeCallback.setAuthorized(authorizeCallback.getAuthenticationID().equals(authorizeCallback.getAuthorizationID())); |
| } |
| } |
| } |
| }; |
| final SaslServer saslServer = Subject.doAs(serverSubject, new PrivilegedExceptionAction<SaslServer>() { |
| @Override |
| public SaslServer run() throws Exception { |
| return Sasl.createSaslServer(GSSAPI.toString(), null, null, config, handler); |
| } |
| }); |
| |
| final SaslChallengeFrame challengeFrame1 = new SaslChallengeFrame(); |
| |
| SaslInitMatcher saslInitMatcher = new SaslInitMatcher() |
| .withMechanism(equalTo(GSSAPI)) |
| .withInitialResponse(new BaseMatcher<Binary>() { |
| |
| @Override |
| public void describeTo(Description description) {} |
| |
| @Override |
| public boolean matches(Object o) { |
| if (o == null) { |
| LOGGER.error("Got null initial response!"); |
| return false; |
| } |
| final Binary binary = (Binary) o; |
| // validate via sasl |
| try { |
| byte[] challenge1data = Subject.doAs(serverSubject, new PrivilegedExceptionAction<byte[]>() { |
| @Override |
| public byte[] run() throws Exception { |
| LOGGER.info("Evaluate Initial Response.. size:" + binary.getLength()); |
| return saslServer.evaluateResponse(binary.getArray()); |
| } |
| }); |
| |
| LOGGER.info("Creating challenge 1.. size: " + challenge1data.length); |
| challengeFrame1.setChallenge(new Binary(challenge1data)); |
| |
| } catch (PrivilegedActionException e) { |
| LOGGER.error("Unexpected error during processing initial response", e); |
| throw new RuntimeException("Failed to eval initial response", e); |
| } |
| LOGGER.info("Complete:" + saslServer.isComplete()); |
| |
| return true; |
| } |
| }).onCompletion(new AmqpPeerRunnable() { |
| @Override |
| public void run() { |
| LOGGER.info("Send challenge 1.."); |
| TestAmqpPeer.this.sendFrame( |
| FrameType.SASL, 0, |
| challengeFrame1, |
| null, |
| false, 0); |
| } |
| }); |
| |
| AtomicBoolean succeeded = new AtomicBoolean(false); |
| |
| final SaslChallengeFrame challengeFrame2 = new SaslChallengeFrame(); |
| |
| SaslResponseMatcher responseMatcher1 = new SaslResponseMatcher().withResponse(new BaseMatcher<Binary>() { |
| @Override |
| public void describeTo(Description description) {} |
| |
| @Override |
| public boolean matches(Object o) { |
| final Binary responseBinary1 = (Binary) o; |
| // validate via sasl |
| |
| byte[] challenge2data = null; |
| try { |
| challenge2data = Subject.doAs(serverSubject, new PrivilegedExceptionAction<byte[]>() { |
| @Override |
| public byte[] run() throws Exception { |
| LOGGER.info("Evaluate challenge response 1.. size:" + responseBinary1.getLength()); |
| return saslServer.evaluateResponse(responseBinary1.getArray()); |
| } |
| }); |
| } catch (PrivilegedActionException e) { |
| LOGGER.error("Unexpected error during processing challenge response 1", e); |
| throw new RuntimeException("failed to evaluate challenge response 1", e); |
| } |
| |
| LOGGER.info("Creating challenge 2.. size: " + challenge2data.length); |
| challengeFrame2.setChallenge(new Binary(challenge2data)); |
| |
| LOGGER.info("Complete:" + saslServer.isComplete()); |
| |
| return true; |
| } |
| }).onCompletion(new AmqpPeerRunnable() { |
| @Override |
| public void run() { |
| LOGGER.info("Send challenge 2.."); |
| TestAmqpPeer.this.sendFrame( |
| FrameType.SASL, 0, |
| challengeFrame2, |
| null, |
| false, 0); |
| } |
| }); |
| |
| SaslResponseMatcher responseMatcher2 = new SaslResponseMatcher().withResponse(new BaseMatcher<Binary>() { |
| @Override |
| public void describeTo(Description description) {} |
| |
| @Override |
| public boolean matches(Object o) { |
| final Binary binary = (Binary) o; |
| // validate via sasl |
| |
| byte[] additionalData = null; |
| try { |
| additionalData = Subject.doAs(serverSubject, new PrivilegedExceptionAction<byte[]>() { |
| @Override |
| public byte[] run() throws Exception { |
| LOGGER.info("Evaluate challenge response 2.. size:" + binary.getLength()); |
| return saslServer.evaluateResponse(binary.getArray()); |
| } |
| }); |
| } catch (PrivilegedActionException e) { |
| LOGGER.error("Unexpected error during processing challenge response 2", e); |
| throw new RuntimeException("failed to evaluate challenge response 2", e); |
| } |
| |
| boolean complete = saslServer.isComplete(); |
| boolean expectedAuthId = false; |
| if(complete) { |
| expectedAuthId = clientAuthId.equals(saslServer.getAuthorizationID()); |
| LOGGER.info("Authorized ID: " + saslServer.getAuthorizationID()); |
| } |
| |
| LOGGER.info("Complete:" + complete + ", expectedAuthID:" + expectedAuthId +", additionalData:" + Arrays.toString(additionalData)); |
| |
| if(complete && expectedAuthId && additionalData == null) { |
| succeeded.set(true); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| }).onCompletion(new AmqpPeerRunnable() { |
| @Override |
| public void run() { |
| SaslOutcomeFrame saslOutcome = new SaslOutcomeFrame(); |
| if (saslServer.isComplete() && succeeded.get()) { |
| saslOutcome.setCode(SASL_OK); |
| } else { |
| saslOutcome.setCode(SASL_FAIL_AUTH); |
| } |
| |
| LOGGER.info("Send Outcome"); |
| TestAmqpPeer.this.sendFrame( |
| FrameType.SASL, 0, |
| saslOutcome, |
| null, |
| false, 0); |
| |
| // Now that we processed the SASL layer AMQP header, reset the |
| // peer to expect the non-SASL AMQP header. |
| _driverRunnable.expectHeader(); |
| } |
| }); |
| |
| addHandler(saslInitMatcher); |
| addHandler(responseMatcher1); |
| addHandler(responseMatcher2); |
| addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER)); |
| } |
| |
| public void expectSaslPlain(String username, String password) |
| { |
| byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8); |
| byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8); |
| byte[] data = new byte[usernameBytes.length+passwordBytes.length+2]; |
| System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length); |
| System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length); |
| |
| Matcher<Binary> initialResponseMatcher = equalTo(new Binary(data)); |
| |
| expectSaslAuthentication(PLAIN, initialResponseMatcher, null, true, false); |
| } |
| |
| public void expectSaslXOauth2(String username, String password) |
| { |
| byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8); |
| byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8); |
| byte[] data = new byte[usernameBytes.length+passwordBytes.length+20]; |
| |
| System.arraycopy("user=".getBytes(StandardCharsets.US_ASCII), 0, data, 0, 5); |
| System.arraycopy(usernameBytes, 0, data, 5, usernameBytes.length); |
| data[5+usernameBytes.length] = 1; |
| System.arraycopy("auth=Bearer ".getBytes(StandardCharsets.US_ASCII), 0, data, 6+usernameBytes.length, 12); |
| System.arraycopy(passwordBytes, 0, data, 18 + usernameBytes.length, passwordBytes.length); |
| data[data.length-2] = 1; |
| data[data.length-1] = 1; |
| |
| Matcher<Binary> initialResponseMatcher = equalTo(new Binary(data)); |
| |
| expectSaslAuthentication(XOAUTH2, initialResponseMatcher, null, true, false); |
| } |
| |
| public void expectSaslExternal() |
| { |
| if(!_driverRunnable.isNeedClientCert()) |
| { |
| throw new IllegalStateException("need-client-cert must be enabled on the test peer"); |
| } |
| |
| expectSaslAuthentication(EXTERNAL, equalTo(new Binary(new byte[0])), null, true, false); |
| } |
| |
| public void expectSaslAnonymous() |
| { |
| expectSaslAnonymous(null); |
| } |
| |
| public void expectSaslAnonymous(Matcher<?> hostnameMatcher) |
| { |
| expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), hostnameMatcher, true, false); |
| } |
| |
| public void expectSaslAnonymousWithPreEmptiveServerHeader() |
| { |
| assertThat("Peer should be created with instruction to send preemptively", _driverRunnable.isSendSaslHeaderPreEmptively(), equalTo(true)); |
| boolean sendSaslHeaderResponse = false; // Must arrange for the server to have already sent it preemptively |
| expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, sendSaslHeaderResponse, false); |
| } |
| |
| public void expectSaslAnonymousWithServerAmqpHeaderSentPreemptively() |
| { |
| expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, true, true); |
| } |
| |
| public void expectSaslFailingAuthentication(Symbol[] serverMechs, Symbol clientSelectedMech) |
| { |
| expectSaslFailingExchange(serverMechs, clientSelectedMech, SASL_FAIL_AUTH); |
| } |
| |
| public void expectSaslFailingExchange(Symbol[] serverMechs, Symbol clientSelectedMech, UnsignedByte saslFailureAuthCode) |
| { |
| SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(serverMechs); |
| addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, |
| new FrameSender( |
| this, FrameType.SASL, 0, |
| saslMechanismsFrame, null))); |
| |
| if(saslFailureAuthCode.compareTo(SASL_FAIL_AUTH) < 0 || saslFailureAuthCode.compareTo(SASL_SYS_TEMP) > 0) { |
| throw new IllegalArgumentException("A valid failing SASL code must be supplied"); |
| } |
| |
| SaslInitMatcher saslInitMatcher = new SaslInitMatcher().withMechanism(equalTo(clientSelectedMech)); |
| saslInitMatcher.onCompletion(new AmqpPeerRunnable() |
| { |
| @Override |
| public void run() |
| { |
| TestAmqpPeer.this.sendFrame( |
| FrameType.SASL, 0, |
| new SaslOutcomeFrame().setCode(saslFailureAuthCode), |
| null, |
| false, 0); |
| _driverRunnable.expectHeader(); |
| } |
| }); |
| addHandler(saslInitMatcher); |
| } |
| |
| public void expectSaslMechanismNegotiationFailure(Symbol[] serverMechs) |
| { |
| SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(serverMechs); |
| FrameSender mechanismsFrameSender = new FrameSender(this, FrameType.SASL, 0, saslMechanismsFrame, null); |
| |
| addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, mechanismsFrameSender)); |
| } |
| |
| /** |
| * Expect a connection that does not use a SASL layer, but proceeds straight |
| * to the AMQP connection (useful to skip a stage for connections that don't |
| * require SASL, e.g. because of anonymous or client certificate authentication). |
| * |
| * @param maxFrameSizeMatcher |
| * The Matcher used to validate the maxFrameSize setting. |
| */ |
| public void expectSaslLayerDisabledConnect(Matcher<?> maxFrameSizeMatcher) |
| { |
| addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER)); |
| |
| OpenFrame openFrame = createOpenFrame(); |
| |
| OpenMatcher openMatcher = new OpenMatcher() |
| .withContainerId(notNullValue(String.class)); |
| |
| if(maxFrameSizeMatcher != null) { |
| openMatcher.withMaxFrameSize(maxFrameSizeMatcher); |
| } |
| |
| openMatcher.onCompletion(new FrameSender( |
| this, FrameType.AMQP, 0, |
| openFrame, |
| null)); |
| |
| addHandler(openMatcher); |
| } |
| |
| public void expectOpen() { |
| expectOpen(false); |
| } |
| |
| public void expectOpen(boolean deferOpened) { |
| expectOpen(null, null, deferOpened); |
| } |
| |
| public void expectOpen(Map<Symbol, Object> serverProperties) { |
| expectOpen(DEFAULT_DESIRED_CAPABILITIES, new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, null, serverProperties, null, null, false); |
| } |
| |
| public void expectOpen(Map<Symbol, Object> serverProperties, Symbol[] serverCapabilities) { |
| expectOpen(DEFAULT_DESIRED_CAPABILITIES, serverCapabilities, null, serverProperties, null, null, false); |
| } |
| |
| public void expectOpen(Matcher<?> clientPropertiesMatcher, Matcher<?> hostnameMatcher, boolean deferOpened) { |
| expectOpen(clientPropertiesMatcher, nullValue(), hostnameMatcher, deferOpened); |
| } |
| |
| public void expectOpen(Matcher<?> clientPropertiesMatcher, Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, boolean deferOpened) { |
| expectOpen(DEFAULT_DESIRED_CAPABILITIES, new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY }, clientPropertiesMatcher, null, null, hostnameMatcher, deferOpened); |
| } |
| |
| public void sendPreemptiveServerOpenFrame() { |
| // Arrange to send the Open frame after the previous handler |
| OpenFrame open = createOpenFrame(); |
| |
| CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); |
| comp.add(new FrameSender(this, FrameType.AMQP, 0, open, null)); |
| } |
| |
| public void expectOpen(Symbol[] desiredCapabilities, Symbol[] serverCapabilities, |
| Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties, |
| Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, boolean deferOpened) { |
| |
| OpenFrame open = createOpenFrame(); |
| if (serverCapabilities != null) { |
| open.setOfferedCapabilities(serverCapabilities); |
| } |
| |
| if (serverProperties != null) { |
| open.setProperties(serverProperties); |
| } |
| |
| OpenMatcher openMatcher = new OpenMatcher().withContainerId(notNullValue(String.class)); |
| if (!deferOpened) { |
| openMatcher.onCompletion(new FrameSender(this, FrameType.AMQP, 0, open, null)); |
| } |
| |
| if (desiredCapabilities != null) { |
| openMatcher.withDesiredCapabilities(arrayContaining(desiredCapabilities)); |
| } else { |
| openMatcher.withDesiredCapabilities(nullValue()); |
| } |
| |
| if (idleTimeoutMatcher != null) { |
| openMatcher.withIdleTimeOut(idleTimeoutMatcher); |
| } |
| |
| if (hostnameMatcher != null) { |
| openMatcher.withHostname(hostnameMatcher); |
| } |
| |
| if (clientPropertiesMatcher != null) { |
| openMatcher.withProperties(clientPropertiesMatcher); |
| } |
| |
| addHandler(openMatcher); |
| } |
| |
| public void rejectConnect(Symbol errorType, String errorMessage, Map<Symbol, Object> errorInfo) { |
| // Expect a connection, establish through the SASL negotiation and sending of the Open frame |
| Map<Symbol, Object> serverProperties = new HashMap<Symbol, Object>(); |
| serverProperties.put(AmqpSupport.CONNECTION_OPEN_FAILED, true); |
| |
| expectSaslAnonymous(); |
| expectOpen(serverProperties); |
| |
| // Now generate the Close frame with the supplied error |
| final FrameSender closeSender = createCloseFrameSender(errorType, errorMessage, errorInfo, 0); |
| |
| // Update the handler to send the Close frame after the Open frame. |
| CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); |
| comp.add(closeSender); |
| |
| addHandler(new CloseMatcher().withError(Matchers.nullValue())); |
| } |
| |
| public void expectClose() |
| { |
| expectClose(Matchers.nullValue(), true); |
| } |
| |
| public void expectClose(boolean sendReply) |
| { |
| expectClose(Matchers.nullValue(), sendReply); |
| } |
| |
| public void expectClose(Matcher<?> errorMatcher, boolean sendReply) |
| { |
| CloseMatcher closeMatcher = new CloseMatcher().withError(errorMatcher); |
| if(sendReply) { |
| closeMatcher.onCompletion(new FrameSender(this, FrameType.AMQP, 0, |
| new CloseFrame(), |
| null)); |
| } |
| |
| addHandler(closeMatcher); |
| } |
| |
| public void expectBegin() |
| { |
| expectBegin(notNullValue(), true); |
| } |
| |
| public void expectBegin(Matcher<?> outgoingWindowMatcher, boolean sendResponse) |
| { |
| final BeginMatcher beginMatcher = new BeginMatcher() |
| .withRemoteChannel(nullValue()) |
| .withNextOutgoingId(equalTo(UnsignedInteger.ONE)) |
| .withIncomingWindow(notNullValue()); |
| if(outgoingWindowMatcher != null) |
| { |
| beginMatcher.withOutgoingWindow(notNullValue()); |
| } |
| else |
| { |
| beginMatcher.withOutgoingWindow(outgoingWindowMatcher); |
| } |
| |
| if(sendResponse) { |
| // The response will have its remoteChannel field dynamically set based on incoming value |
| final BeginFrame beginResponse = new BeginFrame() |
| .setNextOutgoingId(UnsignedInteger.ONE) |
| .setIncomingWindow(UnsignedInteger.ZERO) |
| .setOutgoingWindow(UnsignedInteger.ZERO); |
| |
| // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender beginResponseSender = new FrameSender(this, FrameType.AMQP, -1, beginResponse, null); |
| beginResponseSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| int actualChannel = beginMatcher.getActualChannel(); |
| |
| beginResponseSender.setChannel(actualChannel); |
| beginResponse.setRemoteChannel( |
| UnsignedShort.valueOf((short) actualChannel)); |
| |
| _lastInitiatedChannel = actualChannel; |
| } |
| }); |
| beginMatcher.onCompletion(beginResponseSender); |
| } |
| |
| addHandler(beginMatcher); |
| } |
| |
| public void expectEnd() |
| { |
| expectEnd(true); |
| } |
| |
| public void expectEnd(boolean sendResponse) |
| { |
| final EndMatcher endMatcher = new EndMatcher(); |
| |
| if (sendResponse) { |
| final EndFrame endResponse = new EndFrame(); |
| |
| // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, -1, endResponse, null); |
| frameSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| frameSender.setChannel(endMatcher.getActualChannel()); |
| } |
| }); |
| endMatcher.onCompletion(frameSender); |
| } |
| |
| addHandler(endMatcher); |
| } |
| |
| public void expectTempQueueCreationAttach(final String dynamicAddress) |
| { |
| expectTempNodeCreationAttach(dynamicAddress, AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY, false, false, null, null); |
| } |
| |
| public void expectTempQueueCreationAttach(final String dynamicAddress, boolean sendReponse) |
| { |
| expectTempNodeCreationAttach(dynamicAddress, AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY, sendReponse, false, false, null, null); |
| } |
| |
| public void expectTempTopicCreationAttach(final String dynamicAddress) |
| { |
| expectTempNodeCreationAttach(dynamicAddress, AmqpDestinationHelper.TEMP_TOPIC_CAPABILITY, false, false, null, null); |
| } |
| |
| public void expectTempTopicCreationAttach(final String dynamicAddress, boolean sendReponse) |
| { |
| expectTempNodeCreationAttach(dynamicAddress, AmqpDestinationHelper.TEMP_TOPIC_CAPABILITY, sendReponse, false, false, null, null); |
| } |
| |
| public void expectAndRefuseTempQueueCreationAttach(Symbol errorType, String errorMessage, boolean deferAttachResponseWrite) |
| { |
| expectTempNodeCreationAttach(null, AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY, true, deferAttachResponseWrite, errorType, errorMessage); |
| } |
| |
| public void expectAndRefuseTempTopicCreationAttach(Symbol errorType, String errorMessage, boolean deferAttachResponseWrite) |
| { |
| expectTempNodeCreationAttach(null, AmqpDestinationHelper.TEMP_TOPIC_CAPABILITY, true, deferAttachResponseWrite, errorType, errorMessage); |
| } |
| |
| private void expectTempNodeCreationAttach(final String dynamicAddress, final Symbol nodeTypeCapability, final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) |
| { |
| expectTempNodeCreationAttach(dynamicAddress, nodeTypeCapability, true, refuseLink, deferAttachResponseWrite, errorType, errorMessage); |
| } |
| |
| private void expectTempNodeCreationAttach(final String dynamicAddress, final Symbol nodeTypeCapability, boolean sendResponse, final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) |
| { |
| TargetMatcher targetMatcher = new TargetMatcher(); |
| targetMatcher.withAddress(nullValue()); |
| targetMatcher.withDynamic(equalTo(true)); |
| targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); |
| targetMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.LINK_DETACH)); |
| targetMatcher.withDynamicNodeProperties(hasEntry(equalTo(DYNAMIC_NODE_LIFETIME_POLICY), new DeleteOnCloseMatcher())); |
| targetMatcher.withCapabilities(arrayContaining(nodeTypeCapability)); |
| |
| final AttachMatcher attachMatcher = new AttachMatcher() |
| .withName(notNullValue()) |
| .withHandle(notNullValue()) |
| .withRole(equalTo(Role.SENDER)) |
| .withSndSettleMode(equalTo(SenderSettleMode.UNSETTLED)) |
| .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST)) |
| .withSource(notNullValue()) |
| .withTarget(targetMatcher); |
| |
| if (sendResponse) |
| { |
| final AttachFrame attachResponse = new AttachFrame() |
| .setRole(Role.RECEIVER) |
| .setSndSettleMode(SenderSettleMode.UNSETTLED) |
| .setRcvSettleMode(ReceiverSettleMode.FIRST); |
| |
| // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender attachResponseSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null); |
| attachResponseSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| Object receivedHandle = attachMatcher.getReceivedHandle(); |
| |
| attachResponseSender.setChannel(attachMatcher.getActualChannel()); |
| attachResponse.setHandle(receivedHandle); |
| attachResponse.setName(attachMatcher.getReceivedName()); |
| attachResponse.setSource(attachMatcher.getReceivedSource()); |
| |
| if (!refuseLink) { |
| Target t = (Target) createTargetObjectFromDescribedType(attachMatcher.getReceivedTarget()); |
| t.setAddress(dynamicAddress); |
| attachResponse.setTarget(t); |
| } else { |
| attachResponse.setTarget(null); |
| } |
| |
| _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle; |
| } |
| }); |
| |
| if (deferAttachResponseWrite) |
| { |
| // Defer writing the attach frame until the subsequent frame is also ready |
| attachResponseSender.setDeferWrite(true); |
| } |
| |
| CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); |
| composite.add(attachResponseSender); |
| |
| if (!refuseLink) { |
| final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded |
| .setIncomingWindow(UnsignedInteger.valueOf(2048)) |
| .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldn't be hard coded |
| .setOutgoingWindow(UnsignedInteger.valueOf(2048)) |
| .setLinkCredit(UnsignedInteger.valueOf(100)); |
| |
| // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null); |
| flowFrameSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| flowFrameSender.setChannel(attachMatcher.getActualChannel()); |
| flowFrame.setHandle(attachMatcher.getReceivedHandle()); |
| flowFrame.setDeliveryCount(attachMatcher.getReceivedInitialDeliveryCount()); |
| } |
| }); |
| |
| composite.add(flowFrameSender); |
| } else { |
| final DetachFrame detachResponse = new DetachFrame().setClosed(true); |
| if (errorType != null) |
| { |
| org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); |
| |
| detachError.setCondition(errorType); |
| detachError.setDescription(errorMessage); |
| |
| detachResponse.setError(detachError); |
| } |
| |
| // The response frame channel will be dynamically set based on the |
| // incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null); |
| detachResonseSender.setValueProvider(new ValueProvider() { |
| @Override |
| public void setValues() { |
| detachResonseSender.setChannel(attachMatcher.getActualChannel()); |
| detachResponse.setHandle(attachMatcher.getReceivedHandle()); |
| } |
| }); |
| |
| composite.add(detachResonseSender); |
| } |
| |
| attachMatcher.onCompletion(composite); |
| } |
| |
| addHandler(attachMatcher); |
| } |
| |
| public void expectSenderAttach() |
| { |
| expectSenderAttach(notNullValue(), false, false); |
| } |
| |
| public void expectSettledSenderAttach() |
| { |
| expectSenderAttach(notNullValue(), notNullValue(), true, false, false, false, 0, DEFAULT_PRODUCER_CREDIT, null, null); |
| } |
| |
| public void expectSenderAttachWithoutGrantingCredit() |
| { |
| expectSenderAttach(notNullValue(), notNullValue(), false, false, false, 0, 0, null, null); |
| } |
| |
| public void expectSenderAttach(long creditFlowDelay) |
| { |
| expectSenderAttach(notNullValue(), notNullValue(), false, false, false, creditFlowDelay, DEFAULT_PRODUCER_CREDIT, null, null); |
| } |
| |
| public void expectSenderAttach(final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite) |
| { |
| expectSenderAttach(notNullValue(), targetMatcher, refuseLink, deferAttachResponseWrite); |
| } |
| |
| public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite) |
| { |
| expectSenderAttach(sourceMatcher, targetMatcher, refuseLink, false, deferAttachResponseWrite, 0, null, null); |
| } |
| |
| public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, Symbol errorType, String errorMessage) |
| { |
| expectSenderAttach(sourceMatcher, targetMatcher, refuseLink, omitDetach, deferAttachResponseWrite, creditFlowDelay, DEFAULT_PRODUCER_CREDIT, errorType, errorMessage); |
| } |
| |
| public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage) |
| { |
| expectSenderAttach(sourceMatcher, targetMatcher, false, refuseLink, omitDetach, deferAttachResponseWrite, creditFlowDelay, creditAmount, errorType, errorMessage); |
| } |
| |
| public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean senderSettled, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage) |
| { |
| expectSenderAttach(sourceMatcher, targetMatcher, senderSettled, refuseLink, omitDetach, deferAttachResponseWrite, creditFlowDelay, creditAmount, errorType, errorMessage, null, null); |
| } |
| |
| public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean senderSettled, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage, Matcher<?> desiredCapabilitiesMatcher, Symbol[] offeredCapabilitiesResponse) |
| { |
| final AttachMatcher attachMatcher = new AttachMatcher() |
| .withName(notNullValue()) |
| .withHandle(notNullValue()) |
| .withRole(equalTo(Role.SENDER)) |
| .withSndSettleMode(equalTo(senderSettled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED)) |
| .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST)) |
| .withSource(sourceMatcher) |
| .withTarget(targetMatcher); |
| |
| if(desiredCapabilitiesMatcher != null) { |
| attachMatcher.withDesiredCapabilities(desiredCapabilitiesMatcher); |
| } |
| |
| final AttachFrame attachResponse = new AttachFrame() |
| .setRole(Role.RECEIVER) |
| .setOfferedCapabilities(offeredCapabilitiesResponse) |
| .setSndSettleMode(senderSettled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED) |
| .setRcvSettleMode(ReceiverSettleMode.FIRST); |
| |
| expectSenderAttach(attachMatcher, attachResponse, refuseLink, omitDetach, deferAttachResponseWrite, creditFlowDelay, creditAmount, errorType, errorMessage); |
| } |
| |
| public void expectSenderAttach(final AttachMatcher attachMatcher, final AttachFrame attachResponse, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, int creditAmount, Symbol errorType, String errorMessage) |
| { |
| // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender attachResponseSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null); |
| attachResponseSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| Object receivedHandle = attachMatcher.getReceivedHandle(); |
| |
| attachResponseSender.setChannel(attachMatcher.getActualChannel()); |
| attachResponse.setHandle(receivedHandle); |
| attachResponse.setName(attachMatcher.getReceivedName()); |
| attachResponse.setSource(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource())); |
| if(refuseLink) { |
| attachResponse.setTarget(null); |
| } else { |
| attachResponse.setTarget(createTargetObjectFromDescribedType(attachMatcher.getReceivedTarget())); |
| } |
| |
| _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle; |
| |
| Object target = createTargetObjectFromDescribedType(attachMatcher.getReceivedTarget()); |
| if (target instanceof Coordinator) |
| { |
| _lastInitiatedCoordinatorLinkHandle = (UnsignedInteger) receivedHandle; |
| } |
| } |
| }); |
| |
| if(deferAttachResponseWrite) |
| { |
| // Defer writing the attach frame until the subsequent frame is also ready |
| attachResponseSender.setDeferWrite(true); |
| } |
| |
| CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); |
| composite.add(attachResponseSender); |
| if (refuseLink) { |
| if (!omitDetach) { |
| final DetachFrame detachResponse = new DetachFrame().setClosed(true); |
| if (errorType != null) |
| { |
| org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); |
| |
| detachError.setCondition(errorType); |
| detachError.setDescription(errorMessage); |
| |
| detachResponse.setError(detachError); |
| } |
| |
| // The response frame channel will be dynamically set based on the |
| // incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null); |
| detachResonseSender.setValueProvider(new ValueProvider() { |
| @Override |
| public void setValues() { |
| detachResonseSender.setChannel(attachMatcher.getActualChannel()); |
| detachResponse.setHandle(attachMatcher.getReceivedHandle()); |
| } |
| }); |
| |
| composite.add(detachResonseSender); |
| } |
| } else { |
| final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded |
| .setIncomingWindow(UnsignedInteger.valueOf(2048)) |
| .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded |
| .setOutgoingWindow(UnsignedInteger.valueOf(2048)) |
| .setLinkCredit(UnsignedInteger.valueOf(creditAmount)); |
| |
| // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null); |
| flowFrameSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| flowFrameSender.setChannel(attachMatcher.getActualChannel()); |
| flowFrame.setHandle(attachMatcher.getReceivedHandle()); |
| flowFrame.setDeliveryCount(attachMatcher.getReceivedInitialDeliveryCount()); |
| } |
| }); |
| |
| flowFrameSender.setSendDelay(creditFlowDelay); |
| |
| composite.add(flowFrameSender); |
| } |
| |
| attachMatcher.onCompletion(composite); |
| |
| addHandler(attachMatcher); |
| } |
| |
| public void expectCoordinatorAttach() |
| { |
| expectCoordinatorAttach(false, false, null, null); |
| } |
| |
| public void expectCoordinatorAttach(SourceMatcher sourceMatcher) |
| { |
| expectCoordinatorAttach(sourceMatcher, false, false, null, null); |
| } |
| |
| public void expectCoordinatorAttach(boolean refuseLink, boolean deferAttachResponseWrite) |
| { |
| expectCoordinatorAttach(refuseLink, deferAttachResponseWrite, null, null); |
| } |
| |
| public void expectCoordinatorAttach(final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) |
| { |
| expectCoordinatorAttach(notNullValue(), refuseLink, deferAttachResponseWrite, errorType, errorMessage); |
| } |
| |
| private void expectCoordinatorAttach(Matcher<Object> sourceMatcher, final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) { |
| expectSenderAttach(sourceMatcher, new CoordinatorMatcher(), refuseLink, false, deferAttachResponseWrite, 0, errorType, errorMessage); |
| } |
| |
| public void expectQueueBrowserAttach() |
| { |
| expectReceiverAttach(notNullValue(), notNullValue(), true); |
| } |
| |
| public void expectReceiverAttach() |
| { |
| expectReceiverAttach(notNullValue(), notNullValue()); |
| } |
| |
| public void expectSettledReceiverAttach() |
| { |
| expectReceiverAttach(notNullValue(), notNullValue(), true, false, false, false, null, null); |
| } |
| |
| public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher) |
| { |
| expectReceiverAttach(linkNameMatcher, sourceMatcher, false, false, false, false, null, null); |
| } |
| |
| public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled) |
| { |
| expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, false, false, false, null, null); |
| } |
| |
| public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean refuseLink, boolean deferAttachResponseWrite) |
| { |
| expectReceiverAttach(linkNameMatcher, sourceMatcher, false, refuseLink, false, deferAttachResponseWrite, null, null); |
| } |
| |
| public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, boolean deferAttachResponseWrite) |
| { |
| expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, false, deferAttachResponseWrite, null, null); |
| } |
| |
| public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, |
| boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) |
| { |
| expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, omitDetach, deferAttachResponseWrite, errorType, errorMessage, null, null, null); |
| } |
| |
| private void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, |
| boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage, final Source responseSourceOverride, |
| Matcher<?> desiredCapabilitiesMatcher, Symbol[] offeredCapabilitiesResponse) |
| { |
| final AttachMatcher attachMatcher = new AttachMatcher() |
| .withName(linkNameMatcher) |
| .withHandle(notNullValue()) |
| .withRole(equalTo(Role.RECEIVER)) |
| .withSndSettleMode(equalTo(settled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED)) |
| .withRcvSettleMode(equalTo(ReceiverSettleMode.FIRST)) |
| .withSource(sourceMatcher) |
| .withTarget(notNullValue()); |
| |
| if(desiredCapabilitiesMatcher != null) { |
| attachMatcher.withDesiredCapabilities(desiredCapabilitiesMatcher); |
| } |
| |
| final AttachFrame attachResponse = new AttachFrame() |
| .setRole(Role.SENDER) |
| .setOfferedCapabilities(offeredCapabilitiesResponse) |
| .setSndSettleMode(settled ? SenderSettleMode.SETTLED : SenderSettleMode.UNSETTLED) |
| .setRcvSettleMode(ReceiverSettleMode.FIRST) |
| .setInitialDeliveryCount(UnsignedInteger.ZERO); |
| |
| expectReceiverAttach(attachMatcher, attachResponse, refuseLink, omitDetach, deferAttachResponseWrite, errorType, errorMessage, responseSourceOverride); |
| } |
| |
| private void expectReceiverAttach(final AttachMatcher attachMatcher, final AttachFrame attachResponse, final boolean refuseLink, boolean omitDetach, |
| boolean deferAttachResponseWrite, Symbol errorType, String errorMessage, final Source responseSourceOverride) |
| { |
| // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender attachResponseSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null); |
| attachResponseSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| Object receivedHandle = attachMatcher.getReceivedHandle(); |
| |
| attachResponseSender.setChannel(attachMatcher.getActualChannel()); |
| attachResponse.setHandle(receivedHandle); |
| attachResponse.setName(attachMatcher.getReceivedName()); |
| attachResponse.setTarget(attachMatcher.getReceivedTarget()); |
| if(refuseLink) { |
| attachResponse.setSource(null); |
| } else if(responseSourceOverride != null){ |
| attachResponse.setSource(responseSourceOverride); |
| } else { |
| attachResponse.setSource(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource())); |
| } |
| |
| _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle; |
| } |
| }); |
| |
| if(deferAttachResponseWrite) |
| { |
| // Defer writing the attach frame until the subsequent frame is also ready |
| attachResponseSender.setDeferWrite(true); |
| } |
| |
| CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); |
| composite.add(attachResponseSender); |
| |
| if (refuseLink && !omitDetach) |
| { |
| final DetachFrame detachResponse = new DetachFrame().setClosed(true); |
| if (errorType != null) |
| { |
| org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); |
| |
| detachError.setCondition(errorType); |
| detachError.setDescription(errorMessage); |
| |
| detachResponse.setError(detachError); |
| } |
| |
| // The response frame channel will be dynamically set based on the |
| // incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender detachResponseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null); |
| detachResponseSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| detachResponseSender.setChannel(attachMatcher.getActualChannel()); |
| detachResponse.setHandle(attachMatcher.getReceivedHandle()); |
| } |
| }); |
| |
| composite.add(detachResponseSender); |
| } |
| |
| attachMatcher.onCompletion(composite); |
| |
| addHandler(attachMatcher); |
| } |
| |
| public void expectSharedDurableSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean clientIdSet) { |
| expectSharedSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true, false, clientIdSet, false, false); |
| } |
| |
| public void expectSharedVolatileSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean clientIdSet) { |
| expectSharedSubscriberAttach(topicName, subscriptionName, linkNameMatcher, false, false, clientIdSet, false, false); |
| } |
| |
| public void expectSharedDurableSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean refuseLink, boolean clientIdSet) { |
| expectSharedSubscriberAttach(topicName, subscriptionName, linkNameMatcher, true, refuseLink, clientIdSet, false, false); |
| } |
| |
| public void expectSharedSubscriberAttach(String topicName, String subscriptionName, Matcher<?> linkNameMatcher, boolean durable, boolean refuseLink, |
| boolean clientIdSet, boolean expectLinkCapability, boolean responseOffersLinkCapability) |
| { |
| Symbol[] sourceCapabilities; |
| if(clientIdSet) { |
| sourceCapabilities = new Symbol[] { AmqpDestinationHelper.TOPIC_CAPABILITY, AmqpSupport.SHARED }; |
| } else { |
| sourceCapabilities = new Symbol[] { AmqpDestinationHelper.TOPIC_CAPABILITY, AmqpSupport.SHARED, AmqpSupport.GLOBAL }; |
| } |
| |
| SourceMatcher sourceMatcher = new SourceMatcher(); |
| sourceMatcher.withAddress(equalTo(topicName)); |
| sourceMatcher.withDynamic(equalTo(false)); |
| if(durable) { |
| //TODO: will possibly be changed to a 1/config durability |
| sourceMatcher.withDurable(equalTo(TerminusDurability.UNSETTLED_STATE)); |
| sourceMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.NEVER)); |
| } else { |
| sourceMatcher.withDurable(equalTo(TerminusDurability.NONE)); |
| sourceMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.LINK_DETACH)); |
| } |
| |
| sourceMatcher.withCapabilities(arrayContaining(sourceCapabilities)); |
| |
| // If we don't have the connection capability set we expect a desired link capability |
| Matcher<?> linkDesiredCapabilitiesMatcher; |
| if(expectLinkCapability) { |
| linkDesiredCapabilitiesMatcher = arrayContaining(new Symbol[] { AmqpSupport.SHARED_SUBS }); |
| } else { |
| linkDesiredCapabilitiesMatcher = nullValue(); |
| } |
| |
| // Generate offered capability response if supported |
| Symbol[] linkOfferedCapabilitiesResponse = null; |
| if(responseOffersLinkCapability) { |
| linkOfferedCapabilitiesResponse = new Symbol[] { AmqpSupport.SHARED_SUBS }; |
| } |
| |
| expectReceiverAttach(linkNameMatcher, sourceMatcher, false, refuseLink, false, false, null, null, null, linkDesiredCapabilitiesMatcher, linkOfferedCapabilitiesResponse); |
| } |
| |
| public void expectDurableSubscriberAttach(String topicName, String subscriptionName) |
| { |
| SourceMatcher sourceMatcher = new SourceMatcher(); |
| sourceMatcher.withAddress(equalTo(topicName)); |
| sourceMatcher.withDynamic(equalTo(false)); |
| //TODO: will possibly be changed to a 1/config durability |
| sourceMatcher.withDurable(equalTo(TerminusDurability.UNSETTLED_STATE)); |
| sourceMatcher.withExpiryPolicy(equalTo(TerminusExpiryPolicy.NEVER)); |
| |
| sourceMatcher.withCapabilities(arrayContaining(AmqpDestinationHelper.TOPIC_CAPABILITY)); |
| |
| expectReceiverAttach(equalTo(subscriptionName), sourceMatcher); |
| } |
| |
| public void expectDurableSubUnsubscribeNullSourceLookup(boolean failLookup, boolean shared, String subscriptionName, String topicName, boolean hasClientID) { |
| String linkName = subscriptionName; |
| if(!hasClientID) { |
| linkName += AmqpSupport.SUB_NAME_DELIMITER + "global"; |
| } |
| |
| Matcher<String> linkNameMatcher = equalTo(linkName); |
| Matcher<Object> nullSourceMatcher = nullValue(); |
| |
| Source responseSourceOverride = null; |
| Symbol errorType = null; |
| String errorMessage = null; |
| |
| if(failLookup){ |
| errorType = AmqpError.NOT_FOUND; |
| errorMessage = "No subscription link found"; |
| } else { |
| responseSourceOverride = new Source(); |
| responseSourceOverride.setAddress(topicName); |
| responseSourceOverride.setDynamic(false); |
| //TODO: will possibly be changed to a 1/config durability |
| responseSourceOverride.setDurable(TerminusDurability.UNSETTLED_STATE); |
| responseSourceOverride.setExpiryPolicy(TerminusExpiryPolicy.NEVER); |
| |
| if(shared) { |
| if(hasClientID) { |
| responseSourceOverride.setCapabilities(new Symbol[]{SHARED}); |
| } else { |
| responseSourceOverride.setCapabilities(new Symbol[]{SHARED, GLOBAL}); |
| } |
| } |
| } |
| |
| // If we don't have a ClientID, expect link capabilities to hint that we are trying |
| // to reattach to a 'global' shared subscription. |
| Matcher<?> linkDesiredCapabilitiesMatcher = null; |
| if(!hasClientID) { |
| linkDesiredCapabilitiesMatcher = arrayContaining(new Symbol[] { SHARED, GLOBAL }); |
| } |
| |
| expectReceiverAttach(linkNameMatcher, nullSourceMatcher, false, failLookup, false, false, errorType, errorMessage, responseSourceOverride, linkDesiredCapabilitiesMatcher, null); |
| } |
| |
| public void expectDetach(boolean expectClosed, boolean sendResponse, boolean replyClosed) |
| { |
| expectDetach(expectClosed, sendResponse, replyClosed, null, null); |
| } |
| |
| public void expectDetach(boolean expectClosed, boolean sendResponse, boolean replyClosed, Symbol errorType, String errorMessage) |
| { |
| Matcher<Boolean> closeMatcher = null; |
| if(expectClosed) |
| { |
| closeMatcher = equalTo(true); |
| } |
| else |
| { |
| closeMatcher = Matchers.anyOf(equalTo(false), nullValue()); |
| } |
| |
| final DetachMatcher detachMatcher = new DetachMatcher().withClosed(closeMatcher); |
| |
| if (sendResponse) |
| { |
| final DetachFrame detachResponse = new DetachFrame(); |
| if(replyClosed) |
| { |
| detachResponse.setClosed(replyClosed); |
| } |
| |
| if (errorType != null) { |
| org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); |
| detachError.setCondition(errorType); |
| detachError.setDescription(errorMessage); |
| detachResponse.setError(detachError); |
| } else { |
| detachResponse.setError(null); |
| } |
| |
| // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender detachResponseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null); |
| detachResponseSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| detachResponseSender.setChannel(detachMatcher.getActualChannel()); |
| detachResponse.setHandle(detachMatcher.getReceivedHandle()); |
| } |
| }); |
| |
| detachMatcher.onCompletion(detachResponseSender); |
| } |
| |
| addHandler(detachMatcher); |
| } |
| |
| public void expectSessionFlow() |
| { |
| final FlowMatcher flowMatcher = new FlowMatcher() |
| .withLinkCredit(Matchers.nullValue()) |
| .withHandle(Matchers.nullValue()); |
| |
| addHandler(flowMatcher); |
| } |
| |
| public void expectLinkFlow() |
| { |
| expectLinkFlow(false, false, Matchers.greaterThan(UnsignedInteger.ZERO)); |
| } |
| |
| public void expectLinkFlow(boolean drain, Matcher<UnsignedInteger> creditMatcher) |
| { |
| expectLinkFlow(drain, false, creditMatcher); |
| } |
| |
| public void expectLinkFlow(boolean drain, boolean sendDrainFlowResponse, Matcher<UnsignedInteger> creditMatcher) |
| { |
| expectLinkFlowRespondWithTransfer(null, null, null, null, null, 0, drain, sendDrainFlowResponse, creditMatcher, null, false, false); |
| } |
| |
| public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType, |
| final MessageAnnotationsDescribedType messageAnnotationsDescribedType, |
| final PropertiesDescribedType propertiesDescribedType, |
| final ApplicationPropertiesDescribedType appPropertiesDescribedType, |
| final DescribedType content) |
| { |
| expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, |
| appPropertiesDescribedType, content, 1); |
| } |
| |
| public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType, |
| final MessageAnnotationsDescribedType messageAnnotationsDescribedType, |
| final PropertiesDescribedType propertiesDescribedType, |
| final ApplicationPropertiesDescribedType appPropertiesDescribedType, |
| final DescribedType content, |
| final boolean sendSettled) |
| { |
| expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, |
| appPropertiesDescribedType, content, 1, false, false, |
| Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(1)), 1, sendSettled, false); |
| } |
| |
| public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType, |
| final MessageAnnotationsDescribedType messageAnnotationsDescribedType, |
| final PropertiesDescribedType propertiesDescribedType, |
| final ApplicationPropertiesDescribedType appPropertiesDescribedType, |
| final DescribedType content, |
| final int count) |
| { |
| expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, |
| appPropertiesDescribedType, content, count, false, false, |
| Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(count)), 1, false, false); |
| } |
| |
| public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType, |
| final MessageAnnotationsDescribedType messageAnnotationsDescribedType, |
| final PropertiesDescribedType propertiesDescribedType, |
| ApplicationPropertiesDescribedType appPropertiesDescribedType, |
| final DescribedType content, |
| final int count, |
| final boolean drain, |
| final boolean sendDrainFlowResponse, |
| Matcher<UnsignedInteger> creditMatcher, |
| final Integer nextIncomingId, |
| boolean addMessageNumberProperty) |
| { |
| expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, |
| appPropertiesDescribedType, content, count, drain, sendDrainFlowResponse, |
| creditMatcher, nextIncomingId, false, addMessageNumberProperty); |
| } |
| |
| public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType, |
| final MessageAnnotationsDescribedType messageAnnotationsDescribedType, |
| final PropertiesDescribedType propertiesDescribedType, |
| ApplicationPropertiesDescribedType appPropertiesDescribedType, |
| final DescribedType content, |
| final int count, |
| final boolean drain, |
| final boolean sendDrainFlowResponse, |
| Matcher<UnsignedInteger> creditMatcher, |
| final Integer nextIncomingId, |
| final boolean sendSettled, |
| boolean addMessageNumberProperty) |
| { |
| expectLinkFlowAndSendBackMessages(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, |
| appPropertiesDescribedType, content, count, drain, sendDrainFlowResponse, |
| creditMatcher, nextIncomingId, sendSettled, addMessageNumberProperty, 0, false); |
| } |
| |
| public void expectLinkFlowAndSendBackMessages(final HeaderDescribedType headerDescribedType, |
| final MessageAnnotationsDescribedType messageAnnotationsDescribedType, |
| final PropertiesDescribedType propertiesDescribedType, |
| ApplicationPropertiesDescribedType appPropertiesDescribedType, |
| final DescribedType content, |
| final int count, |
| final boolean drain, |
| final boolean sendDrainFlowResponse, |
| Matcher<UnsignedInteger> creditMatcher, |
| final Integer nextIncomingId, |
| final boolean sendSettled, |
| boolean addMessageNumberProperty, |
| int msgPayloadPerFrame, |
| boolean sendFinalTransferFrameWithoutPayload) |
| { |
| if (nextIncomingId == null && count > 0) |
| { |
| throw new IllegalArgumentException("The remote NextIncomingId must be specified if transfers have been requested"); |
| } |
| |
| Matcher<Boolean> drainMatcher = null; |
| if(drain) |
| { |
| drainMatcher = equalTo(true); |
| } |
| else |
| { |
| drainMatcher = Matchers.anyOf(equalTo(false), nullValue()); |
| } |
| |
| Matcher<UnsignedInteger> remoteNextIncomingIdMatcher = null; |
| if(nextIncomingId != null) |
| { |
| remoteNextIncomingIdMatcher = Matchers.equalTo(UnsignedInteger.valueOf(nextIncomingId)); |
| } |
| else |
| { |
| remoteNextIncomingIdMatcher = Matchers.greaterThanOrEqualTo(UnsignedInteger.ONE); |
| } |
| |
| final FlowMatcher flowMatcher = new FlowMatcher() |
| .withLinkCredit(creditMatcher) |
| .withDrain(drainMatcher) |
| .withNextIncomingId(remoteNextIncomingIdMatcher); |
| |
| CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); |
| boolean addComposite = false; |
| |
| if (appPropertiesDescribedType == null && addMessageNumberProperty) { |
| appPropertiesDescribedType = new ApplicationPropertiesDescribedType(); |
| } |
| |
| for(int i = 0; i < count; i++) |
| { |
| final int nextId = nextIncomingId + i; |
| |
| String tagString = "theDeliveryTag" + nextId; |
| Binary dtag = new Binary(tagString.getBytes()); |
| |
| if(addMessageNumberProperty) { |
| appPropertiesDescribedType.setApplicationProperty(MESSAGE_NUMBER, i); |
| } |
| |
| |
| Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType, |
| propertiesDescribedType, appPropertiesDescribedType, content); |
| |
| int length = payload.getLength(); |
| int sent = 0; |
| |
| while (sent < length) { |
| final TransferFrame transferFrame = new TransferFrame() |
| .setDeliveryId(UnsignedInteger.valueOf(nextId)) |
| .setDeliveryTag(dtag) |
| .setMessageFormat(UnsignedInteger.ZERO) |
| .setSettled(sendSettled); |
| |
| int remaining = length - sent; |
| Binary chunk; |
| if(msgPayloadPerFrame != 0 && msgPayloadPerFrame < length) { |
| int chunkSize = Math.min(msgPayloadPerFrame, remaining); |
| chunk = payload.subBinary(sent, chunkSize); |
| sent += chunkSize; |
| } else { |
| chunk = payload; |
| sent = length; |
| } |
| |
| if(sent < length || (sent == length && sendFinalTransferFrameWithoutPayload)) { |
| // Indicate more frames if there is payload left, or we want to send a final transfer without payload |
| transferFrame.setMore(true); |
| } |
| |
| // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender transferResponseSender = new FrameSender(this, FrameType.AMQP, -1, transferFrame, chunk); |
| transferResponseSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| transferFrame.setHandle(flowMatcher.getReceivedHandle()); |
| transferResponseSender.setChannel(flowMatcher.getActualChannel()); |
| } |
| }); |
| |
| composite.add(transferResponseSender); |
| } |
| |
| if(sendFinalTransferFrameWithoutPayload) { |
| sendEmptyFinalTransfer(composite, flowMatcher, nextId, dtag, sendSettled); |
| } |
| |
| addComposite = true; |
| } |
| |
| if(drain && sendDrainFlowResponse) |
| { |
| final FlowFrame drainResponse = new FlowFrame(); |
| drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded |
| drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded |
| drainResponse.setLinkCredit(UnsignedInteger.ZERO); |
| drainResponse.setDrain(true); |
| |
| // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null); |
| flowResponseSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| flowResponseSender.setChannel(flowMatcher.getActualChannel()); |
| drainResponse.setHandle(flowMatcher.getReceivedHandle()); |
| drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher)); |
| drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, count)); |
| drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId()); |
| } |
| }); |
| |
| addComposite = true; |
| composite.add(flowResponseSender); |
| } |
| |
| if(addComposite) { |
| flowMatcher.onCompletion(composite); |
| } |
| |
| addHandler(flowMatcher); |
| } |
| |
| private void sendEmptyFinalTransfer(CompositeAmqpPeerRunnable composite, final FlowMatcher flowMatcher, final int deliveryId, Binary dTag, final boolean settled) { |
| final TransferFrame transferFrame = new TransferFrame() |
| .setDeliveryId(UnsignedInteger.valueOf(deliveryId)) |
| .setDeliveryTag(dTag) |
| .setMessageFormat(UnsignedInteger.ZERO) |
| .setSettled(settled); |
| |
| // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender finalEmptyTransferSender = new FrameSender(this, FrameType.AMQP, -1, transferFrame, null); |
| finalEmptyTransferSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| transferFrame.setHandle(flowMatcher.getReceivedHandle()); |
| finalEmptyTransferSender.setChannel(flowMatcher.getActualChannel()); |
| } |
| }); |
| |
| composite.add(finalEmptyTransferSender); |
| } |
| |
| public void expectLinkFlowThenPerformUnexpectedDeliveryCountAdvanceThenCreditTopupThenTransfers(final int prefetch, final int topUp, final int messageCount) |
| { |
| final FlowMatcher flowMatcher = new FlowMatcher() |
| .withHandle(notNullValue()) |
| .withLinkCredit(equalTo(UnsignedInteger.valueOf(prefetch))) |
| .withDrain(Matchers.anyOf(equalTo(false), nullValue())); |
| |
| final FlowFrame advancingFlowResponse = new FlowFrame(); |
| advancingFlowResponse.setOutgoingWindow(UnsignedInteger.MAX_VALUE); |
| advancingFlowResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); |
| |
| // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender advancingFlowResponseSender = new FrameSender(this, FrameType.AMQP, -1, advancingFlowResponse, null); |
| advancingFlowResponseSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| advancingFlowResponseSender.setChannel(flowMatcher.getActualChannel()); |
| advancingFlowResponse.setHandle(flowMatcher.getReceivedHandle()); |
| advancingFlowResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher, prefetch - topUp)); |
| advancingFlowResponse.setLinkCredit(UnsignedInteger.ZERO); |
| advancingFlowResponse.setNextOutgoingId(flowMatcher.getReceivedNextIncomingId()); |
| advancingFlowResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId()); |
| } |
| }); |
| |
| final FlowFrame topUpFlowResponse = new FlowFrame(); |
| topUpFlowResponse.setOutgoingWindow(UnsignedInteger.MAX_VALUE); |
| topUpFlowResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); |
| |
| // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender topUpFlowResponseSender = new FrameSender(this, FrameType.AMQP, -1, topUpFlowResponse, null); |
| topUpFlowResponseSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| topUpFlowResponseSender.setChannel(flowMatcher.getActualChannel()); |
| topUpFlowResponse.setHandle(flowMatcher.getReceivedHandle()); |
| topUpFlowResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher, prefetch - topUp)); |
| topUpFlowResponse.setLinkCredit(UnsignedInteger.valueOf(topUp)); |
| topUpFlowResponse.setNextOutgoingId(flowMatcher.getReceivedNextIncomingId()); |
| topUpFlowResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId()); |
| } |
| }); |
| |
| CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); |
| composite.add(advancingFlowResponseSender); |
| advancingFlowResponseSender.setDeferWrite(true); |
| composite.add(topUpFlowResponseSender); |
| |
| final Integer remoteNextIncomingId = 0; //TODO: make configurable |
| for(int i = 0; i < messageCount; i++) |
| { |
| final int nextId = remoteNextIncomingId + i; |
| |
| String tagString = "theDeliveryTag" + nextId; |
| Binary dtag = new Binary(tagString.getBytes()); |
| |
| final TransferFrame transferResponse = new TransferFrame() |
| .setDeliveryId(UnsignedInteger.valueOf(nextId)) |
| .setDeliveryTag(dtag) |
| .setMessageFormat(UnsignedInteger.ZERO) |
| .setSettled(false); |
| |
| Binary payload = prepareTransferPayload(null, null, null, null, new AmqpValueDescribedType("content")); |
| |
| // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender transferResponseSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload); |
| transferResponseSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| transferResponse.setHandle(flowMatcher.getReceivedHandle()); |
| transferResponseSender.setChannel(flowMatcher.getActualChannel()); |
| } |
| }); |
| |
| if(i != messageCount -1) { |
| // Ensure all but the last transfer are set to defer, ensure they go in one write. |
| transferResponseSender.setDeferWrite(true); |
| } |
| |
| composite.add(transferResponseSender); |
| } |
| |
| flowMatcher.onCompletion(composite); |
| |
| addHandler(flowMatcher); |
| } |
| |
| private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher) { |
| UnsignedInteger dc = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount(); |
| UnsignedInteger lc = (UnsignedInteger) flowMatcher.getReceivedLinkCredit(); |
| |
| return dc.add(lc); |
| } |
| |
| private UnsignedInteger calculateNewDeliveryCount(FlowMatcher flowMatcher, int creditToConsume) { |
| UnsignedInteger dc = (UnsignedInteger) flowMatcher.getReceivedDeliveryCount(); |
| UnsignedInteger lc = (UnsignedInteger) flowMatcher.getReceivedLinkCredit(); |
| |
| assertThat(UnsignedInteger.valueOf(creditToConsume), lessThan(lc)); |
| |
| return dc.add(UnsignedInteger.valueOf(creditToConsume)); |
| } |
| |
| private UnsignedInteger calculateNewOutgoingId(FlowMatcher flowMatcher, int sentCount) { |
| UnsignedInteger nid = (UnsignedInteger) flowMatcher.getReceivedNextIncomingId(); |
| |
| return nid.add(UnsignedInteger.valueOf(sentCount)); |
| } |
| |
| /** |
| * Encodes and returns transfer payload Binary, or null if no message sections were supplied. |
| */ |
| private Binary prepareTransferPayload(final HeaderDescribedType headerDescribedType, |
| final MessageAnnotationsDescribedType messageAnnotationsDescribedType, |
| final PropertiesDescribedType propertiesDescribedType, |
| final ApplicationPropertiesDescribedType appPropertiesDescribedType, |
| final DescribedType content) |
| { |
| Data payloadData = Data.Factory.create(); |
| boolean hasSection = false; |
| |
| if(headerDescribedType != null) |
| { |
| hasSection = true; |
| payloadData.putDescribedType(headerDescribedType); |
| } |
| |
| if(messageAnnotationsDescribedType != null) |
| { |
| hasSection = true; |
| payloadData.putDescribedType(messageAnnotationsDescribedType); |
| } |
| |
| if(propertiesDescribedType != null) |
| { |
| hasSection = true; |
| payloadData.putDescribedType(propertiesDescribedType); |
| } |
| |
| if(appPropertiesDescribedType != null) |
| { |
| hasSection = true; |
| payloadData.putDescribedType(appPropertiesDescribedType); |
| } |
| |
| if(content != null) |
| { |
| hasSection = true; |
| payloadData.putDescribedType(content); |
| } |
| |
| if (hasSection) |
| { |
| return payloadData.encode(); |
| } |
| else |
| { |
| return null; |
| } |
| } |
| |
| public void expectTransferButDoNotRespond(Matcher<Binary> expectedPayloadMatcher) |
| { |
| expectTransfer(expectedPayloadMatcher, nullValue(), false, false, null, false); |
| } |
| |
| public void expectTransferButDoNotRespond(Matcher<Binary> expectedPayloadMatcher, Matcher<Binary> deliveryTagMatcher) |
| { |
| expectTransfer(expectedPayloadMatcher, deliveryTagMatcher, nullValue(), false, false, null, false, 0, 0); |
| } |
| |
| public void expectTransfer(Matcher<Binary> expectedPayloadMatcher) |
| { |
| expectTransfer(expectedPayloadMatcher, nullValue(), false, true, new Accepted(), true); |
| } |
| |
| public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<Binary> deliveryTagMatcher) |
| { |
| expectTransfer(expectedPayloadMatcher, deliveryTagMatcher, nullValue(), false, true, new Accepted(), true, 0, 0); |
| } |
| |
| public void expectTransfer(int frameSize) |
| { |
| expectTransfer(null, nullValue(), false, true, new Accepted(), true, frameSize, 0); |
| } |
| |
| public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher, |
| ListDescribedType responseState, boolean responseSettled) |
| { |
| expectTransfer(expectedPayloadMatcher, stateMatcher, false, true, responseState, responseSettled); |
| } |
| |
| public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<?> stateMatcher, boolean settled, |
| boolean sendResponseDisposition, ListDescribedType responseState, boolean responseSettled) |
| { |
| expectTransfer(expectedPayloadMatcher, stateMatcher, settled, sendResponseDisposition, responseState, responseSettled, 0, 0); |
| } |
| |
| public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, |
| Matcher<?> stateMatcher, boolean settled, boolean sendResponseDisposition, ListDescribedType responseState, |
| boolean responseSettled, int frameSize, long dispositionDelay) |
| { |
| expectTransfer(expectedPayloadMatcher, null, stateMatcher, settled, sendResponseDisposition, responseState, responseSettled, frameSize, dispositionDelay); |
| } |
| |
| //TODO: fix responseState to only admit applicable types. |
| public void expectTransfer(Matcher<Binary> expectedPayloadMatcher, Matcher<Binary> deliveryTagMatcher, |
| Matcher<?> stateMatcher, boolean settled, boolean sendResponseDisposition, ListDescribedType responseState, |
| boolean responseSettled, int frameSize, long dispositionDelay) |
| { |
| Matcher<Boolean> settledMatcher = null; |
| if(settled) |
| { |
| settledMatcher = equalTo(true); |
| } |
| else |
| { |
| settledMatcher = Matchers.anyOf(equalTo(false), nullValue()); |
| } |
| |
| final TransferMatcher transferMatcher = new TransferMatcher(frameSize); |
| transferMatcher.setPayloadMatcher(expectedPayloadMatcher); |
| transferMatcher.withSettled(settledMatcher); |
| transferMatcher.withState(stateMatcher); |
| if (deliveryTagMatcher != null) { |
| transferMatcher.withDeliveryTag(deliveryTagMatcher); |
| } |
| |
| if(sendResponseDisposition) { |
| final DispositionFrame dispositionResponse = new DispositionFrame() |
| .setRole(Role.RECEIVER) |
| .setSettled(responseSettled) |
| .setState(responseState); |
| |
| // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender dispositionFrameSender = new FrameSender(this, FrameType.AMQP, -1, dispositionResponse, null); |
| dispositionFrameSender.setSendDelay(dispositionDelay); |
| dispositionFrameSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| dispositionFrameSender.setChannel(transferMatcher.getActualChannel()); |
| dispositionResponse.setFirst(transferMatcher.getReceivedDeliveryId()); |
| } |
| }); |
| |
| transferMatcher.onCompletion(dispositionFrameSender); |
| } |
| |
| addHandler(transferMatcher); |
| } |
| |
| public void expectTransferRespondWithDrain(Matcher<Binary> expectedPayloadMatcher, int sentMessages) |
| { |
| expectTransferRespondWithDrain(expectedPayloadMatcher, DEFAULT_PRODUCER_CREDIT, sentMessages); |
| } |
| |
| public void expectTransferRespondWithDrain(Matcher<Binary> expectedPayloadMatcher, int originalCredit, int sentMessages) |
| { |
| Matcher<Boolean> settledMatcher = Matchers.anyOf(equalTo(false), nullValue()); |
| |
| final TransferMatcher transferMatcher = new TransferMatcher(0); |
| transferMatcher.setPayloadMatcher(expectedPayloadMatcher); |
| transferMatcher.withSettled(settledMatcher); |
| transferMatcher.withState(nullValue()); |
| |
| CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); |
| final DispositionFrame dispositionResponse = new DispositionFrame().setRole(Role.RECEIVER).setSettled(true).setState(new Accepted()); |
| |
| // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender dispositionFrameSender = new FrameSender(this, FrameType.AMQP, -1, dispositionResponse, null); |
| dispositionFrameSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| dispositionFrameSender.setChannel(transferMatcher.getActualChannel()); |
| dispositionResponse.setFirst(transferMatcher.getReceivedDeliveryId()); |
| } |
| }); |
| |
| final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE.add(UnsignedInteger.valueOf(sentMessages))) //TODO: start point shouldnt be hard coded |
| .setIncomingWindow(UnsignedInteger.valueOf(2048)) |
| .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded |
| .setOutgoingWindow(UnsignedInteger.valueOf(2048)) |
| .setDeliveryCount(UnsignedInteger.valueOf(sentMessages)) |
| .setLinkCredit(UnsignedInteger.valueOf(originalCredit - sentMessages)) |
| .setDrain(true); |
| |
| // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null); |
| flowFrameSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| flowFrameSender.setChannel(transferMatcher.getActualChannel()); |
| flowFrame.setHandle(transferMatcher.getReceivedHandle()); |
| } |
| }); |
| |
| composite.add(flowFrameSender); |
| composite.add(dispositionFrameSender); |
| |
| transferMatcher.onCompletion(composite); |
| |
| addHandler(transferMatcher); |
| } |
| |
| public void expectDeclare(Binary txnId) |
| { |
| TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); |
| declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); |
| |
| expectTransfer(declareMatcher, nullValue(), new Declared().setTxnId(txnId), true); |
| } |
| |
| public void expectDeclareButDoNotRespond() |
| { |
| TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); |
| declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); |
| |
| expectTransfer(declareMatcher, nullValue(), false, false, null, false); |
| } |
| |
| public void expectDeclareAndReject() |
| { |
| TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); |
| declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(new Declare())); |
| |
| expectTransfer(declareMatcher, nullValue(), new Rejected(), true); |
| } |
| |
| public void expectDischarge(Binary txnId, boolean dischargeState) { |
| expectDischarge(txnId, dischargeState, new Accepted()); |
| } |
| |
| public void expectDischarge(Binary txnId, boolean dischargeState, ListDescribedType responseState) { |
| // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, |
| // and reply with given response and settled disposition to indicate the outcome. |
| Discharge discharge = new Discharge(); |
| discharge.setFail(dischargeState); |
| discharge.setTxnId(txnId); |
| |
| TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); |
| dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); |
| |
| expectTransfer(dischargeMatcher, nullValue(), responseState, true); |
| } |
| |
| public void expectDischargeButDoNotRespond(Binary txnId, boolean dischargeState) { |
| // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, |
| // and reply with given response and settled disposition to indicate the outcome. |
| Discharge discharge = new Discharge(); |
| discharge.setFail(dischargeState); |
| discharge.setTxnId(txnId); |
| |
| TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); |
| dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); |
| |
| expectTransfer(dischargeMatcher, nullValue(), false, false, null, true); |
| } |
| |
| public void remotelyCloseLastCoordinatorLink() |
| { |
| remotelyCloseLastCoordinatorLink(true, true, TransactionError.TRANSACTION_ROLLBACK, "Discharge of TX failed."); |
| } |
| |
| public void remotelyCloseLastCoordinatorLink(Symbol errorType, String errorMessage) |
| { |
| remotelyCloseLastCoordinatorLink(true, true, errorType, errorMessage); |
| } |
| |
| public void remotelyCloseLastCoordinatorLink(boolean expectDetachResponse, boolean closed, Symbol errorType, String errorMessage) |
| { |
| // Now remotely end the last attached transaction coordinator |
| synchronized (_handlersLock) { |
| CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); |
| |
| // Now generate the Detach for the appropriate link on the appropriate session |
| final DetachFrame detachFrame = new DetachFrame(); |
| detachFrame.setClosed(true); |
| if (errorType != null) { |
| org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); |
| detachError.setCondition(errorType); |
| detachError.setDescription(errorMessage); |
| detachFrame.setError(detachError); |
| } |
| |
| // The response frame channel will be dynamically set based on the previous frames. Using the -1 is an illegal placeholder. |
| final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, -1, detachFrame, null); |
| frameSender.setValueProvider(new ValueProvider() { |
| @Override |
| public void setValues() { |
| frameSender.setChannel(_lastInitiatedChannel); |
| detachFrame.setHandle(_lastInitiatedCoordinatorLinkHandle); |
| } |
| }); |
| comp.add(frameSender); |
| |
| if (expectDetachResponse) { |
| Matcher<Boolean> closeMatcher = null; |
| if (closed) { |
| closeMatcher = equalTo(true); |
| } else { |
| closeMatcher = Matchers.anyOf(equalTo(false), nullValue()); |
| } |
| |
| // Expect a response to our Detach. |
| final DetachMatcher detachMatcher = new DetachMatcher().withClosed(closeMatcher); |
| // TODO: enable matching on the channel number of the response. |
| addHandler(detachMatcher); |
| } |
| } |
| } |
| |
| public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState) |
| { |
| remotelyCloseLastCoordinatorLinkOnDischarge(txnId, dischargeState, true, true, TransactionError.TRANSACTION_ROLLBACK, "Discharge of TX failed.", false, null); |
| } |
| |
| public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState, boolean pipelinedDeclare, Binary nextTxnId) |
| { |
| remotelyCloseLastCoordinatorLinkOnDischarge(txnId, dischargeState, true, true, TransactionError.TRANSACTION_ROLLBACK, "Discharge of TX failed.", pipelinedDeclare, nextTxnId); |
| } |
| |
| public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState, Symbol errorType, String errorMessage) |
| { |
| remotelyCloseLastCoordinatorLinkOnDischarge(txnId, dischargeState, true, true, errorType, errorMessage, false, null); |
| } |
| |
| public void remotelyCloseLastCoordinatorLinkOnDischarge(Binary txnId, boolean dischargeState, boolean expectDetachResponse, boolean closed, Symbol errorType, String errorMessage, boolean pipelinedDeclare, Binary nextTxnId) { |
| // Expect an unsettled 'discharge' transfer to the txn coordinator containing the txnId, |
| // and reply with given response and settled disposition to indicate the outcome. |
| Discharge discharge = new Discharge(); |
| discharge.setFail(dischargeState); |
| discharge.setTxnId(txnId); |
| |
| TransferPayloadCompositeMatcher dischargeMatcher = new TransferPayloadCompositeMatcher(); |
| dischargeMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(discharge)); |
| |
| expectTransfer(dischargeMatcher, nullValue(), false, false, null, false); |
| |
| if (pipelinedDeclare) { |
| Declare declare = new Declare(); |
| |
| TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher(); |
| declareMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(declare)); |
| |
| expectTransfer(declareMatcher, nullValue(), false, false, new Declared().setTxnId(nextTxnId), true); |
| } |
| |
| remotelyCloseLastCoordinatorLink(expectDetachResponse, closed, errorType, errorMessage); |
| } |
| |
| public void expectDispositionThatIsAcceptedAndSettled() |
| { |
| expectDisposition(true, new DescriptorMatcher(Accepted.DESCRIPTOR_CODE, Accepted.DESCRIPTOR_SYMBOL)); |
| } |
| |
| public void expectDispositionThatIsReleasedAndSettled() |
| { |
| expectDisposition(true, new DescriptorMatcher(Released.DESCRIPTOR_CODE, Released.DESCRIPTOR_SYMBOL)); |
| } |
| |
| public void expectDisposition(boolean settled, Matcher<?> stateMatcher) |
| { |
| expectDisposition(settled, stateMatcher, null, null); |
| } |
| |
| public void expectDisposition(boolean settled, Matcher<?> stateMatcher, Integer firstDeliveryId, Integer lastDeliveryId) |
| { |
| Matcher<Boolean> settledMatcher = null; |
| if(settled) |
| { |
| settledMatcher = equalTo(true); |
| } |
| else |
| { |
| settledMatcher = Matchers.anyOf(equalTo(false), nullValue()); |
| } |
| |
| Matcher<?> firstDeliveryIdMatcher = notNullValue(); |
| if(firstDeliveryId != null) { |
| firstDeliveryIdMatcher = equalTo(UnsignedInteger.valueOf(firstDeliveryId)); |
| } |
| |
| Matcher<?> lastDeliveryIdMatcher = notNullValue(); |
| if(lastDeliveryId != null) { |
| lastDeliveryIdMatcher = equalTo(UnsignedInteger.valueOf(lastDeliveryId)); |
| } |
| |
| addHandler(new DispositionMatcher() |
| .withSettled(settledMatcher) |
| .withFirst(firstDeliveryIdMatcher) |
| .withLast(lastDeliveryIdMatcher) |
| .withState(stateMatcher)); |
| } |
| |
| private Object createTargetObjectFromDescribedType(Object o) { |
| assertThat(o, instanceOf(DescribedType.class)); |
| Object described = ((DescribedType) o).getDescribed(); |
| assertThat(described, instanceOf(List.class)); |
| @SuppressWarnings("unchecked") |
| List<Object> targetFields = (List<Object>) described; |
| Object descriptor = ((DescribedType) o).getDescriptor(); |
| if (descriptor == Target.DESCRIPTOR_CODE || descriptor.equals(Target.DESCRIPTOR_SYMBOL)) { |
| return new Target(targetFields.toArray()); |
| } else if (descriptor == Coordinator.DESCRIPTOR_CODE || descriptor.equals(Coordinator.DESCRIPTOR_SYMBOL)) { |
| return new Coordinator(targetFields.toArray()); |
| } else { |
| throw new IllegalArgumentException("Unexpected target descriptor: " + descriptor); |
| } |
| } |
| |
| private Source createSourceObjectFromDescribedType(Object o) { |
| assertThat(o, instanceOf(DescribedType.class)); |
| Object described = ((DescribedType) o).getDescribed(); |
| assertThat(described, instanceOf(List.class)); |
| @SuppressWarnings("unchecked") |
| List<Object> sourceFields = (List<Object>) described; |
| |
| return new Source(sourceFields.toArray()); |
| } |
| |
| public void remotelyEndLastOpenedSession(boolean expectEndResponse) { |
| remotelyEndLastOpenedSession(expectEndResponse, 0, null, null); |
| } |
| |
| public void remotelyEndLastOpenedSession(boolean expectEndResponse, long delayBeforeSend) { |
| remotelyEndLastOpenedSession(expectEndResponse, delayBeforeSend, null, null); |
| } |
| |
| public void remotelyEndLastOpenedSession(boolean expectEndResponse, final long delayBeforeSend, Symbol errorType, String errorMessage) { |
| synchronized (_handlersLock) { |
| CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); |
| |
| // Now generate the End for the appropriate session |
| final EndFrame endFrame = new EndFrame(); |
| if (errorType != null) { |
| org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); |
| detachError.setCondition(errorType); |
| detachError.setDescription(errorMessage); |
| endFrame.setError(detachError); |
| } |
| |
| int channel = -1; |
| final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, channel, endFrame, null); |
| frameSender.setValueProvider(new ValueProvider() { |
| @Override |
| public void setValues() { |
| frameSender.setChannel(_lastInitiatedChannel); |
| |
| //Insert a delay if requested |
| if (delayBeforeSend > 0) { |
| try { |
| Thread.sleep(delayBeforeSend); |
| } catch (InterruptedException e) { |
| // Ignore |
| } |
| } |
| } |
| }); |
| comp.add(frameSender); |
| |
| if (expectEndResponse) { |
| // Expect a response to our End. |
| final EndMatcher endMatcher = new EndMatcher(); |
| // TODO: enable matching on the channel number of the response. |
| addHandler(endMatcher); |
| } |
| } |
| } |
| |
| public void remotelyCloseConnection(boolean expectCloseResponse) { |
| remotelyCloseConnection(expectCloseResponse, null, null, null, 0); |
| } |
| |
| public void remotelyCloseConnection(boolean expectCloseResponse, Symbol errorType, String errorMessage) { |
| remotelyCloseConnection(expectCloseResponse, errorType, errorMessage, null, 0); |
| } |
| |
| public void remotelyCloseConnection(boolean expectCloseResponse, Symbol errorType, String errorMessage, long delayBeforeSend) { |
| remotelyCloseConnection(expectCloseResponse, errorType, errorMessage, null, delayBeforeSend); |
| } |
| |
| public void remotelyCloseConnection(boolean expectCloseResponse, Symbol errorType, String errorMessage, Map<Symbol, Object> info) { |
| remotelyCloseConnection(expectCloseResponse, errorType, errorMessage, info, 0); |
| } |
| |
| public void remotelyCloseConnection(boolean expectCloseResponse, Symbol errorType, String errorMessage, Map<Symbol, Object> info, long delayBeforeSend) { |
| synchronized (_handlersLock) { |
| // Prepare a composite to insert this action at the end of the handler sequence |
| CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); |
| |
| // Now generate the Close |
| final FrameSender closeSender = createCloseFrameSender(errorType, errorMessage, info, delayBeforeSend); |
| |
| comp.add(closeSender); |
| |
| if (expectCloseResponse) { |
| // Expect a response to our Close. |
| final CloseMatcher closeMatcher = new CloseMatcher(); |
| addHandler(closeMatcher); |
| } |
| } |
| } |
| |
| private FrameSender createCloseFrameSender(Symbol errorType, String errorMessage, Map<Symbol, Object> info, final long delayBeforeSend) { |
| final CloseFrame closeFrame = new CloseFrame(); |
| if (errorType != null) { |
| org.apache.qpid.jms.test.testpeer.describedtypes.Error closeError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); |
| closeError.setCondition(errorType); |
| closeError.setDescription(errorMessage); |
| closeError.setInfo(info); |
| |
| closeFrame.setError(closeError); |
| } |
| |
| final FrameSender closeSender = new FrameSender(this, FrameType.AMQP, CONNECTION_CHANNEL, closeFrame, null); |
| closeSender.setValueProvider(new ValueProvider() { |
| |
| @Override |
| public void setValues() { |
| //Insert a delay if requested |
| if (delayBeforeSend > 0) { |
| try { |
| Thread.sleep(delayBeforeSend); |
| } catch (InterruptedException e) { |
| // Ignore |
| } |
| } |
| } |
| }); |
| |
| return closeSender; |
| } |
| |
| void sendConnectionCloseImmediately(Symbol errorType, String errorMessage) { |
| FrameSender closeSender = createCloseFrameSender(errorType, errorMessage, null, 0); |
| closeSender.run(); |
| } |
| |
| public void remotelyDetachLastOpenedLinkOnLastOpenedSession(boolean expectDetachResponse, boolean closed) { |
| remotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse, closed, null, null); |
| } |
| |
| public void remotelyDetachLastOpenedLinkOnLastOpenedSession(boolean expectDetachResponse, boolean closed, Symbol errorType, String errorMessage) { |
| remotelyDetachLastOpenedLinkOnLastOpenedSession(expectDetachResponse, closed, errorType, errorMessage, 0); |
| } |
| |
| public void remotelyDetachLastOpenedLinkOnLastOpenedSession(boolean expectDetachResponse, boolean closed, Symbol errorType, String errorMessage, final long delayBeforeSend) { |
| synchronized (_handlersLock) { |
| CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); |
| |
| // Now generate the Detach for the appropriate link on the appropriate session |
| final DetachFrame detachFrame = new DetachFrame(); |
| detachFrame.setClosed(closed); |
| if (errorType != null) { |
| org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); |
| detachError.setCondition(errorType); |
| detachError.setDescription(errorMessage); |
| detachFrame.setError(detachError); |
| } |
| |
| // The response frame channel will be dynamically set based on the previous frames. Using the -1 is an illegal placeholder. |
| final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, -1, detachFrame, null); |
| frameSender.setValueProvider(new ValueProvider() { |
| @Override |
| public void setValues() { |
| frameSender.setChannel(_lastInitiatedChannel); |
| detachFrame.setHandle(_lastInitiatedLinkHandle); |
| |
| //Insert a delay if requested |
| if (delayBeforeSend > 0) { |
| try { |
| Thread.sleep(delayBeforeSend); |
| } catch (InterruptedException e) { |
| // Ignore |
| } |
| } |
| } |
| }); |
| comp.add(frameSender); |
| |
| if (expectDetachResponse) { |
| Matcher<Boolean> closeMatcher = null; |
| if (closed) { |
| closeMatcher = equalTo(true); |
| } else { |
| closeMatcher = Matchers.anyOf(equalTo(false), nullValue()); |
| } |
| |
| // Expect a response to our Detach. |
| final DetachMatcher detachMatcher = new DetachMatcher().withClosed(closeMatcher); |
| // TODO: enable matching on the channel number of the response. |
| addHandler(detachMatcher); |
| } |
| } |
| } |
| |
| private CompositeAmqpPeerRunnable insertCompsiteActionForLastHandler() { |
| CompositeAmqpPeerRunnable comp = new CompositeAmqpPeerRunnable(); |
| Handler h = getLastHandler(); |
| AmqpPeerRunnable orig = h.getOnCompletionAction(); |
| if (orig != null) { |
| comp.add(orig); |
| } |
| h.onCompletion(comp); |
| return comp; |
| } |
| |
| public void sendTransferToLastOpenedLinkOnLastOpenedSession(final HeaderDescribedType headerDescribedType, |
| final MessageAnnotationsDescribedType messageAnnotationsDescribedType, |
| final PropertiesDescribedType propertiesDescribedType, |
| final ApplicationPropertiesDescribedType appPropertiesDescribedType, |
| final DescribedType content, |
| final int nextIncomingDeliveryId) { |
| |
| sendTransferToLastOpenedLinkOnLastOpenedSession(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content, nextIncomingDeliveryId, null, null, 0); |
| } |
| |
| public void sendTransferToLastOpenedLinkOnLastOpenedSession(final HeaderDescribedType headerDescribedType, |
| final MessageAnnotationsDescribedType messageAnnotationsDescribedType, |
| final PropertiesDescribedType propertiesDescribedType, |
| final ApplicationPropertiesDescribedType appPropertiesDescribedType, |
| final DescribedType content, |
| final int nextIncomingDeliveryId, |
| final String tagAsString, |
| final Boolean more, |
| final int sendDelay) { |
| synchronized (_handlersLock) { |
| CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); |
| |
| String tagString = tagAsString; |
| if(tagString == null) { |
| tagString = "theDeliveryTag" + nextIncomingDeliveryId; |
| } |
| |
| Binary dtag = new Binary(tagString.getBytes()); |
| |
| final TransferFrame transferResponse = new TransferFrame() |
| .setDeliveryId(UnsignedInteger.valueOf(nextIncomingDeliveryId)) |
| .setDeliveryTag(dtag) |
| .setMessageFormat(UnsignedInteger.ZERO); |
| if(more != null) { |
| transferResponse.setMore(more); |
| } |
| |
| Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content); |
| |
| // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender transferSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload); |
| transferSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| transferResponse.setHandle(_lastInitiatedLinkHandle); |
| transferSender.setChannel(_lastInitiatedChannel); |
| } |
| }); |
| |
| if(sendDelay != 0) { |
| transferSender.setSendDelay(sendDelay); |
| } |
| |
| comp.add(transferSender); |
| } |
| } |
| |
| public void runAfterLastHandler(AmqpPeerRunnable action) { |
| synchronized (_handlersLock) { |
| // Prepare a composite to insert this action at the end of the handler sequence |
| CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler(); |
| comp.add(action); |
| } |
| } |
| |
| void assertionFailed(AssertionError ae) { |
| if(_firstAssertionError == null) |
| { |
| _firstAssertionError = ae; |
| } |
| } |
| |
| public void expectSaslHeaderThenDrop() { |
| AmqpPeerRunnable exitAfterHeader = new AmqpPeerRunnable() { |
| @Override |
| public void run() { |
| _driverRunnable.exitReadLoopEarly(); |
| } |
| }; |
| |
| addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, exitAfterHeader)); |
| } |
| |
| |
| public void dropAfterLastHandler() { |
| dropAfterLastHandler(0); |
| } |
| |
| public void dropAfterLastHandler(final long delay) { |
| AmqpPeerRunnable exitEarly = new AmqpPeerRunnable() { |
| @Override |
| public void run() { |
| if(delay > 0) { |
| try { |
| Thread.sleep(delay); |
| } catch (InterruptedException e) { |
| LOGGER.warn("Interrupted while delaying before read loop exit"); |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| _driverRunnable.exitReadLoopEarly(); |
| } |
| }; |
| |
| runAfterLastHandler(exitEarly); |
| } |
| |
| public void optionalFlow(final boolean drain, final boolean sendDrainFlowResponse,Matcher<UnsignedInteger> creditMatcher) |
| { |
| final FlowMatcher flowMatcher = new FlowMatcher(); |
| flowMatcher.setOptional(true); |
| |
| Matcher<Boolean> drainMatcher = null; |
| if(drain) |
| { |
| drainMatcher = equalTo(true); |
| } |
| else |
| { |
| drainMatcher = Matchers.anyOf(equalTo(false), nullValue()); |
| } |
| |
| flowMatcher.withLinkCredit(creditMatcher); |
| flowMatcher.withDrain(drainMatcher); |
| |
| if(drain && sendDrainFlowResponse) |
| { |
| final FlowFrame drainResponse = new FlowFrame(); |
| drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded |
| drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded |
| drainResponse.setLinkCredit(UnsignedInteger.ZERO); |
| drainResponse.setDrain(true); |
| |
| // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. |
| final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null); |
| flowResponseSender.setValueProvider(new ValueProvider() |
| { |
| @Override |
| public void setValues() |
| { |
| flowResponseSender.setChannel(flowMatcher.getActualChannel()); |
| drainResponse.setHandle(flowMatcher.getReceivedHandle()); |
| drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher)); |
| drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, 0)); |
| drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId()); |
| } |
| }); |
| |
| flowMatcher.onCompletion(flowResponseSender); |
| } |
| |
| addHandler(flowMatcher); |
| } |
| } |