| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| package org.apache.qpid.tests.protocol.v1_0; |
| |
| import static org.apache.qpid.server.security.auth.manager.AbstractScramAuthenticationManager.PLAIN; |
| |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.Consumer; |
| import java.util.stream.Collectors; |
| |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.MoreExecutors; |
| |
| import org.apache.qpid.server.bytebuffer.QpidByteBuffer; |
| import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame; |
| import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame; |
| import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; |
| import org.apache.qpid.server.protocol.v1_0.type.BaseSource; |
| import org.apache.qpid.server.protocol.v1_0.type.BaseTarget; |
| import org.apache.qpid.server.protocol.v1_0.type.Binary; |
| import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; |
| import org.apache.qpid.server.protocol.v1_0.type.FrameBody; |
| import org.apache.qpid.server.protocol.v1_0.type.Outcome; |
| import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody; |
| import org.apache.qpid.server.protocol.v1_0.type.Symbol; |
| import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; |
| import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValueSection; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; |
| import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; |
| import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode; |
| import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit; |
| import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms; |
| import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome; |
| import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse; |
| import org.apache.qpid.server.protocol.v1_0.type.transaction.Coordinator; |
| import org.apache.qpid.server.protocol.v1_0.type.transaction.Declare; |
| import org.apache.qpid.server.protocol.v1_0.type.transaction.Declared; |
| import org.apache.qpid.server.protocol.v1_0.type.transaction.Discharge; |
| import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Close; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.End; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Open; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Role; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode; |
| import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; |
| import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManager; |
| import org.apache.qpid.server.util.StringUtil; |
| import org.apache.qpid.tests.protocol.AbstractInteraction; |
| import org.apache.qpid.tests.protocol.Response; |
| import org.apache.qpid.tests.utils.BrokerAdmin; |
| |
| public class Interaction extends AbstractInteraction<Interaction> |
| { |
| private static final byte[] SASL_AMQP_HEADER_BYTES = "AMQP\3\1\0\0".getBytes(StandardCharsets.UTF_8); |
| |
| private static final FrameBody EMPTY_FRAME = (channel, conn) -> { |
| throw new UnsupportedOperationException(); |
| }; |
| |
| private static final SaslFrameBody SASL_EMPTY_FRAME = (channel, conn) -> { |
| throw new UnsupportedOperationException(); |
| }; |
| |
| private static final Set<String> CONTAINER_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>()); |
| private final Begin _begin; |
| private final End _end; |
| private final Open _open; |
| private final Close _close; |
| private final Attach _attach; |
| private final Detach _detach; |
| private final Flow _flow; |
| private final Transfer _transfer; |
| private final Disposition _disposition; |
| private final SaslInit _saslInit; |
| private final SaslResponse _saslResponse; |
| private final BrokerAdmin _brokerAdmin; |
| private final BrokerAdmin.PortType _portType; |
| private byte[] _protocolHeader; |
| private UnsignedShort _connectionChannel; |
| private UnsignedShort _sessionChannel; |
| private int _deliveryIdCounter; |
| private List<Transfer> _latestDelivery; |
| private Object _decodedLatestDelivery; |
| private UnsignedInteger _latestDeliveryId; |
| private Map<String, Object> _latestDeliveryApplicationProperties; |
| private Map<Class, FrameBody> _latestResponses = new HashMap<>(); |
| private AtomicLong _receivedDeliveryCount = new AtomicLong(); |
| private AtomicLong _coordinatorCredits = new AtomicLong(); |
| private InteractionTransactionalState _transactionalState; |
| |
| Interaction(final FrameTransport frameTransport, BrokerAdmin brokerAdmin, BrokerAdmin.PortType portType) |
| { |
| super(frameTransport); |
| _brokerAdmin = brokerAdmin; |
| _portType = portType; |
| final UnsignedInteger defaultLinkHandle = UnsignedInteger.ZERO; |
| |
| _protocolHeader = frameTransport.getProtocolHeader(); |
| |
| _saslInit = new SaslInit(); |
| _saslResponse = new SaslResponse(); |
| |
| _open = new Open(); |
| _open.setContainerId(getConnectionId()); |
| _close = new Close(); |
| _connectionChannel = UnsignedShort.valueOf(0); |
| |
| _begin = new Begin(); |
| _begin.setNextOutgoingId(UnsignedInteger.ZERO); |
| _begin.setIncomingWindow(UnsignedInteger.ZERO); |
| _begin.setOutgoingWindow(UnsignedInteger.ZERO); |
| _end = new End(); |
| _sessionChannel = UnsignedShort.valueOf(1); |
| |
| _attach = new Attach(); |
| _attach.setName("testLink"); |
| _attach.setHandle(defaultLinkHandle); |
| _attach.setInitialDeliveryCount(UnsignedInteger.ZERO); |
| _attach.setSource(new Source()); |
| _attach.setTarget(new Target()); |
| |
| _detach = new Detach(); |
| _detach.setHandle(_attach.getHandle()); |
| |
| _flow = new Flow(); |
| _flow.setNextIncomingId(UnsignedInteger.ZERO); |
| _flow.setIncomingWindow(UnsignedInteger.ZERO); |
| _flow.setNextOutgoingId(UnsignedInteger.ZERO); |
| _flow.setOutgoingWindow(UnsignedInteger.ZERO); |
| |
| _transfer = new Transfer(); |
| _transfer.setHandle(defaultLinkHandle); |
| _transfer.setDeliveryTag(new Binary("testDeliveryTag".getBytes(StandardCharsets.UTF_8))); |
| _transfer.setDeliveryId(UnsignedInteger.valueOf(_deliveryIdCounter)); |
| _transfer.setMessageFormat(UnsignedInteger.ZERO); |
| |
| _disposition = new Disposition(); |
| _disposition.setFirst(UnsignedInteger.ZERO); |
| } |
| |
| public void doCloseConnection() throws Exception |
| { |
| Close close = new Close(); |
| |
| sendPerformative(close, UnsignedShort.valueOf((short) 0)); |
| Response<?> response = consumeResponse().getLatestResponse(); |
| if (!(response.getBody() instanceof Close)) |
| { |
| throw new IllegalStateException(String.format( |
| "Unexpected response to connection Close. Expected Close got '%s'", response.getBody())); |
| } |
| } |
| |
| ///////////////////////// |
| // Protocol Negotiation // |
| ///////////////////////// |
| |
| @Override |
| public Interaction protocolHeader(byte[] header) |
| { |
| _protocolHeader = header; |
| return this; |
| } |
| |
| @Override |
| protected byte[] getProtocolHeader() |
| { |
| return _protocolHeader; |
| } |
| |
| ////////// |
| // SASL // |
| ////////// |
| |
| public Interaction saslMechanism(final Symbol mechanism) |
| { |
| _saslInit.setMechanism(mechanism); |
| return this; |
| } |
| |
| public Interaction saslInitialResponse(final Binary initialResponse) |
| { |
| _saslInit.setInitialResponse(initialResponse); |
| return this; |
| } |
| |
| public Interaction saslInit() throws Exception |
| { |
| sendPerformativeAndChainFuture(copySaslInit(_saslInit)); |
| return this; |
| } |
| |
| public Interaction saslEmptyFrame() throws Exception |
| { |
| sendPerformative(SASL_EMPTY_FRAME); |
| return this; |
| } |
| |
| private SaslInit copySaslInit(final SaslInit saslInit) |
| { |
| final SaslInit saslInitCopy = new SaslInit(); |
| saslInitCopy.setMechanism(saslInit.getMechanism()); |
| saslInitCopy.setInitialResponse(saslInit.getInitialResponse()); |
| saslInitCopy.setHostname(saslInit.getHostname()); |
| return saslInitCopy; |
| } |
| |
| public Interaction saslResponseResponse(Binary response) |
| { |
| _saslResponse.setResponse(response); |
| return this; |
| } |
| |
| public Interaction saslResponse() throws Exception |
| { |
| sendPerformativeAndChainFuture(copySaslResponse(_saslResponse)); |
| return this; |
| } |
| |
| private SaslResponse copySaslResponse(final SaslResponse saslResponse) |
| { |
| final SaslResponse saslResponseCopy = new SaslResponse(); |
| saslResponseCopy.setResponse(saslResponse.getResponse()); |
| return saslResponseCopy; |
| } |
| |
| //////////////// |
| // Connection // |
| //////////////// |
| |
| public Interaction connectionChannel(UnsignedShort connectionChannel) |
| { |
| _connectionChannel = connectionChannel; |
| return this; |
| } |
| |
| public Interaction openContainerId(String containerId) |
| { |
| _open.setContainerId(containerId); |
| return this; |
| } |
| |
| public Interaction openHostname(String hostname) |
| { |
| _open.setHostname(hostname); |
| return this; |
| } |
| |
| public Interaction openMaxFrameSize(final UnsignedInteger maxFrameSize) |
| { |
| _open.setMaxFrameSize(maxFrameSize); |
| return this; |
| } |
| |
| public Interaction openChannelMax(UnsignedShort channelMax) |
| { |
| _open.setChannelMax(channelMax); |
| return this; |
| } |
| |
| public Interaction openDesiredCapabilities(final Symbol... desiredCapabilities) |
| { |
| _open.setDesiredCapabilities(desiredCapabilities); |
| return this; |
| } |
| |
| public Interaction openIdleTimeOut(final int idleTimeOut) |
| { |
| _open.setIdleTimeOut(UnsignedInteger.valueOf(idleTimeOut)); |
| return this; |
| } |
| |
| public Interaction openProperties(final Map<Symbol, Object> properties) |
| { |
| _open.setProperties(properties); |
| return this; |
| } |
| |
| public Interaction open() throws Exception |
| { |
| sendPerformativeAndChainFuture(copyOpen(_open), _connectionChannel); |
| return this; |
| } |
| |
| private Open copyOpen(final Open open) |
| { |
| final Open openCopy = new Open(); |
| openCopy.setContainerId(open.getContainerId()); |
| openCopy.setHostname(open.getHostname()); |
| openCopy.setMaxFrameSize(open.getMaxFrameSize()); |
| openCopy.setChannelMax(open.getChannelMax()); |
| openCopy.setIdleTimeOut(open.getIdleTimeOut()); |
| openCopy.setOutgoingLocales(open.getOutgoingLocales()); |
| openCopy.setIncomingLocales(open.getIncomingLocales()); |
| openCopy.setOfferedCapabilities(open.getOfferedCapabilities()); |
| openCopy.setDesiredCapabilities(open.getDesiredCapabilities()); |
| if (open.getProperties() != null) |
| { |
| openCopy.setProperties(new LinkedHashMap<>(open.getProperties())); |
| } |
| return openCopy; |
| } |
| |
| public Interaction close() throws Exception |
| { |
| sendPerformativeAndChainFuture(copyClose(_close), _connectionChannel); |
| return this; |
| } |
| |
| private Close copyClose(final Close close) |
| { |
| final Close closeCopy = new Close(); |
| closeCopy.setError(close.getError()); |
| return closeCopy; |
| } |
| |
| private String getConnectionId() |
| { |
| int index = 1; |
| String containerId = String.format("testContainer-%d", index); |
| while (CONTAINER_IDS.contains(containerId)) |
| { |
| ++index; |
| containerId = String.format("testContainer-%d", index); |
| } |
| CONTAINER_IDS.add(containerId); |
| return containerId; |
| } |
| |
| ///////////// |
| // Session // |
| ///////////// |
| |
| public Interaction sessionChannel(UnsignedShort sessionChannel) |
| { |
| _sessionChannel = sessionChannel; |
| return this; |
| } |
| |
| public Interaction beginNextOutgoingId(UnsignedInteger nextOutgoingId) |
| { |
| _begin.setNextOutgoingId(nextOutgoingId); |
| return this; |
| } |
| |
| public Interaction beginIncomingWindow(UnsignedInteger incomingWindow) |
| { |
| _begin.setIncomingWindow(incomingWindow); |
| return this; |
| } |
| |
| public Interaction beginOutgoingWindow(UnsignedInteger outgoingWindow) |
| { |
| _begin.setOutgoingWindow(outgoingWindow); |
| return this; |
| } |
| |
| public Interaction begin() throws Exception |
| { |
| sendPerformativeAndChainFuture(copyBegin(_begin), _sessionChannel); |
| return this; |
| } |
| |
| private Begin copyBegin(final Begin begin) |
| { |
| final Begin beginCopy = new Begin(); |
| beginCopy.setRemoteChannel(begin.getRemoteChannel()); |
| beginCopy.setNextOutgoingId(begin.getNextOutgoingId()); |
| beginCopy.setIncomingWindow(begin.getIncomingWindow()); |
| beginCopy.setOutgoingWindow(begin.getOutgoingWindow()); |
| beginCopy.setHandleMax(begin.getHandleMax()); |
| beginCopy.setOfferedCapabilities(begin.getOfferedCapabilities()); |
| beginCopy.setDesiredCapabilities(begin.getDesiredCapabilities()); |
| if (begin.getProperties() != null) |
| { |
| beginCopy.setProperties(new LinkedHashMap<>(begin.getProperties())); |
| } |
| return beginCopy; |
| } |
| |
| public Interaction end() throws Exception |
| { |
| sendPerformativeAndChainFuture(copyEnd(_end), _sessionChannel); |
| return this; |
| } |
| |
| private End copyEnd(final End end) |
| { |
| final End endCopy = new End(); |
| endCopy.setError(end.getError()); |
| return endCopy; |
| } |
| |
| ////////// |
| // Link // |
| ////////// |
| |
| |
| public Interaction attachName(String linkName) |
| { |
| _attach.setName(linkName); |
| return this; |
| } |
| |
| public Interaction attachRole(Role role) |
| { |
| _attach.setRole(role); |
| return this; |
| } |
| |
| public Interaction attachHandle(UnsignedInteger handle) |
| { |
| _attach.setHandle(handle); |
| _detach.setHandle(handle); |
| return this; |
| } |
| |
| public Interaction attachInitialDeliveryCount(UnsignedInteger initialDeliveryCount) |
| { |
| _attach.setInitialDeliveryCount(initialDeliveryCount); |
| return this; |
| } |
| |
| public Interaction attachRcvSettleMode(final ReceiverSettleMode rcvSettleMode) |
| { |
| _attach.setRcvSettleMode(rcvSettleMode); |
| return this; |
| } |
| |
| public Interaction attachSndSettleMode(final SenderSettleMode senderSettleMode) |
| { |
| _attach.setSndSettleMode(senderSettleMode); |
| return this; |
| } |
| |
| public Interaction attachSource(Source source) |
| { |
| _attach.setSource(source); |
| return this; |
| } |
| |
| public Interaction attachTarget(BaseTarget target) |
| { |
| _attach.setTarget(target); |
| return this; |
| } |
| |
| public Interaction attachSourceAddress(String address) |
| { |
| Source source = (Source) _attach.getSource(); |
| source.setAddress(address); |
| _attach.setSource(source); |
| return this; |
| } |
| |
| public Interaction attachSourceOutcomes(final Symbol... outcomes) |
| { |
| Source source = ((Source) _attach.getSource()); |
| source.setOutcomes(outcomes); |
| _attach.setSource(source); |
| return this; |
| } |
| |
| public Interaction attachSourceDefaultOutcome(final Outcome defaultOutcome) |
| { |
| Source source = ((Source) _attach.getSource()); |
| source.setDefaultOutcome(defaultOutcome); |
| _attach.setSource(source); |
| return this; |
| } |
| |
| |
| public Interaction attachSourceFilter(final Map<Symbol, Filter> filters) |
| { |
| Source source = ((Source) _attach.getSource()); |
| source.setFilter(filters); |
| _attach.setSource(source); |
| return this; |
| } |
| |
| public Interaction attachTargetAddress(final String address) |
| { |
| final Target target = ((Target) _attach.getTarget()); |
| target.setAddress(address); |
| _attach.setTarget(target); |
| return this; |
| } |
| |
| public Interaction attachUnsettled(final Map<Binary, DeliveryState> unsettled) |
| { |
| _attach.setUnsettled(unsettled); |
| return this; |
| } |
| |
| public Interaction attachIncompleteUnsettled(final Boolean incompleteUnsettled) |
| { |
| _attach.setIncompleteUnsettled(incompleteUnsettled); |
| return this; |
| } |
| |
| public Interaction attach() throws Exception |
| { |
| sendPerformativeAndChainFuture(copyAttach(_attach), _sessionChannel); |
| return this; |
| } |
| |
| private Attach copyAttach(final Attach attach) |
| { |
| final Attach attachCopy = new Attach(); |
| attachCopy.setName(attach.getName()); |
| attachCopy.setHandle(attach.getHandle()); |
| attachCopy.setRole(attach.getRole()); |
| attachCopy.setSndSettleMode(attach.getSndSettleMode()); |
| attachCopy.setRcvSettleMode(attach.getRcvSettleMode()); |
| final BaseSource baseSource = attach.getSource(); |
| if (baseSource != null && baseSource instanceof Source) |
| { |
| final Source source = (Source) baseSource; |
| final Source sourceCopy = new Source(); |
| sourceCopy.setAddress(source.getAddress()); |
| sourceCopy.setDurable(source.getDurable()); |
| sourceCopy.setExpiryPolicy(source.getExpiryPolicy()); |
| sourceCopy.setTimeout(source.getTimeout()); |
| sourceCopy.setDynamic(source.getDynamic()); |
| if (source.getDynamicNodeProperties() != null) |
| { |
| sourceCopy.setDynamicNodeProperties(new LinkedHashMap<>(source.getDynamicNodeProperties())); |
| } |
| sourceCopy.setFilter(source.getFilter()); |
| sourceCopy.setDefaultOutcome(source.getDefaultOutcome()); |
| sourceCopy.setOutcomes(source.getOutcomes()); |
| sourceCopy.setCapabilities(source.getCapabilities()); |
| attachCopy.setSource(sourceCopy); |
| } |
| else |
| { |
| attachCopy.setSource(baseSource); |
| } |
| final BaseTarget baseTarget = attach.getTarget(); |
| if (baseTarget != null && baseTarget instanceof Target) |
| { |
| final Target target = (Target) baseTarget; |
| final Target targetCopy = new Target(); |
| targetCopy.setAddress(target.getAddress()); |
| targetCopy.setDurable(target.getDurable()); |
| targetCopy.setExpiryPolicy(target.getExpiryPolicy()); |
| targetCopy.setTimeout(target.getTimeout()); |
| targetCopy.setDynamic(target.getDynamic()); |
| if (target.getDynamicNodeProperties() != null) |
| { |
| targetCopy.setDynamicNodeProperties(new LinkedHashMap<>(target.getDynamicNodeProperties())); |
| } |
| targetCopy.setCapabilities(target.getCapabilities()); |
| attachCopy.setTarget(targetCopy); |
| } |
| else |
| { |
| attachCopy.setTarget(baseTarget); |
| } |
| if (attach.getUnsettled() != null) |
| { |
| attachCopy.setUnsettled(new LinkedHashMap<>(attach.getUnsettled())); |
| } |
| attachCopy.setIncompleteUnsettled(attach.getIncompleteUnsettled()); |
| attachCopy.setInitialDeliveryCount(attach.getInitialDeliveryCount()); |
| attachCopy.setMaxMessageSize(attach.getMaxMessageSize()); |
| attachCopy.setOfferedCapabilities(attach.getOfferedCapabilities()); |
| attachCopy.setDesiredCapabilities(attach.getDesiredCapabilities()); |
| if (attach.getProperties() != null) |
| { |
| attachCopy.setProperties(new LinkedHashMap<>(attach.getProperties())); |
| } |
| return attachCopy; |
| } |
| |
| public Interaction detachClose(Boolean close) |
| { |
| _detach.setClosed(close); |
| return this; |
| } |
| |
| public Interaction detachHandle(UnsignedInteger handle) |
| { |
| _detach.setHandle(handle); |
| return this; |
| } |
| |
| public Interaction detach() throws Exception |
| { |
| sendPerformativeAndChainFuture(copyDetach(_detach), _sessionChannel); |
| return this; |
| } |
| |
| private Detach copyDetach(final Detach detach) |
| { |
| final Detach detachCopy = new Detach(); |
| detachCopy.setHandle(detach.getHandle()); |
| detachCopy.setClosed(detach.getClosed()); |
| detachCopy.setError(detach.getError()); |
| return detachCopy; |
| } |
| |
| ////////// |
| // FLow // |
| ////////// |
| |
| public Interaction flowIncomingWindow(final UnsignedInteger incomingWindow) |
| { |
| _flow.setIncomingWindow(incomingWindow); |
| return this; |
| } |
| |
| public Interaction flowNextIncomingId(final UnsignedInteger nextIncomingId) |
| { |
| _flow.setNextIncomingId(nextIncomingId); |
| return this; |
| } |
| |
| public Interaction flowNextIncomingIdFromPeerLatestSessionBeginAndDeliveryCount() |
| { |
| final Begin begin = getCachedResponse(Begin.class); |
| return flowNextIncomingId(begin.getNextOutgoingId().add(UnsignedInteger.valueOf(_receivedDeliveryCount.get()))); |
| } |
| |
| <T extends FrameBody> T getCachedResponse(final Class<T> responseClass) |
| { |
| return (T)_latestResponses.get(responseClass); |
| } |
| |
| public Interaction flowOutgoingWindow(final UnsignedInteger outgoingWindow) |
| { |
| _flow.setOutgoingWindow(outgoingWindow); |
| return this; |
| } |
| |
| public Interaction flowNextOutgoingId(final UnsignedInteger nextNextOutgoingId) |
| { |
| _flow.setNextOutgoingId(nextNextOutgoingId); |
| return this; |
| } |
| |
| public Interaction flowNextOutgoingId() |
| { |
| _flow.setNextOutgoingId(UnsignedInteger.valueOf(_deliveryIdCounter)); |
| return this; |
| } |
| |
| public Interaction flowEcho(final Boolean echo) |
| { |
| _flow.setEcho(echo); |
| return this; |
| } |
| |
| public Interaction flowHandle(final UnsignedInteger handle) |
| { |
| _flow.setHandle(handle); |
| return this; |
| } |
| |
| public Interaction flowAvailable(final UnsignedInteger available) |
| { |
| _flow.setAvailable(available); |
| return this; |
| } |
| |
| public Interaction flowDeliveryCount(final UnsignedInteger deliveryCount) |
| { |
| _flow.setDeliveryCount(deliveryCount); |
| return this; |
| } |
| |
| public Interaction flowDeliveryCount() |
| { |
| _flow.setDeliveryCount(UnsignedInteger.valueOf(_receivedDeliveryCount.get())); |
| return this; |
| } |
| |
| public Interaction flowLinkCredit(final UnsignedInteger linkCredit) |
| { |
| _flow.setLinkCredit(linkCredit); |
| return this; |
| } |
| |
| public Interaction flowDrain(final Boolean drain) |
| { |
| _flow.setDrain(drain); |
| return this; |
| } |
| |
| public Interaction flowProperties(Map<Symbol, Object> properties) |
| { |
| _flow.setProperties(properties); |
| return this; |
| } |
| |
| public Interaction flow() throws Exception |
| { |
| sendPerformativeAndChainFuture(copyFlow(_flow), _sessionChannel); |
| return this; |
| } |
| |
| private Flow copyFlow(final Flow flow) |
| { |
| final Flow flowCopy = new Flow(); |
| flowCopy.setNextIncomingId(flow.getNextIncomingId()); |
| flowCopy.setIncomingWindow(flow.getIncomingWindow()); |
| flowCopy.setNextOutgoingId(flow.getNextOutgoingId()); |
| flowCopy.setOutgoingWindow(flow.getOutgoingWindow()); |
| flowCopy.setHandle(flow.getHandle()); |
| flowCopy.setDeliveryCount(flow.getDeliveryCount()); |
| flowCopy.setLinkCredit(flow.getLinkCredit()); |
| flowCopy.setAvailable(flow.getAvailable()); |
| flowCopy.setDrain(flow.getDrain()); |
| flowCopy.setEcho(flow.getEcho()); |
| if (flow.getProperties() != null) |
| { |
| flowCopy.setProperties(new LinkedHashMap<>(flow.getProperties())); |
| } |
| return flowCopy; |
| } |
| |
| ////////////// |
| // Transfer // |
| ////////////// |
| |
| public Interaction transferHandle(UnsignedInteger transferHandle) |
| { |
| _transfer.setHandle(transferHandle); |
| return this; |
| } |
| |
| public Interaction transferDeliveryId(final UnsignedInteger deliveryId) |
| { |
| _transfer.setDeliveryId(deliveryId); |
| return this; |
| } |
| |
| public Interaction transferDeliveryId() |
| { |
| _transfer.setDeliveryId(getNextDeliveryId()); |
| return this; |
| } |
| |
| public Interaction transferDeliveryTag(final Binary deliveryTag) |
| { |
| _transfer.setDeliveryTag(deliveryTag); |
| return this; |
| } |
| |
| public Interaction transferMessageFormat(final UnsignedInteger messageFormat) |
| { |
| _transfer.setMessageFormat(messageFormat); |
| return this; |
| } |
| |
| public Interaction transferSettled(final Boolean settled) |
| { |
| _transfer.setSettled(settled); |
| return this; |
| } |
| |
| public Interaction transferMore(final Boolean more) |
| { |
| _transfer.setMore(more); |
| return this; |
| } |
| |
| public Interaction transferRcvSettleMode(final ReceiverSettleMode receiverSettleMode) |
| { |
| _transfer.setRcvSettleMode(receiverSettleMode); |
| return this; |
| } |
| |
| public Interaction transferState(final DeliveryState state) |
| { |
| _transfer.setState(state); |
| return this; |
| } |
| |
| public Interaction transferTransactionalState(final Binary transactionalId) |
| { |
| TransactionalState transactionalState = new TransactionalState(); |
| transactionalState.setTxnId(transactionalId); |
| return transferState(transactionalState); |
| } |
| |
| public Interaction transferTransactionalStateFromCurrentTransaction() |
| { |
| return transferTransactionalState(getCurrentTransactionId()); |
| } |
| |
| public Interaction transferResume(final Boolean resume) |
| { |
| _transfer.setResume(resume); |
| return this; |
| } |
| |
| public Interaction transferAborted(final Boolean aborted) |
| { |
| _transfer.setAborted(aborted); |
| return this; |
| } |
| |
| public Interaction transferPayload(final QpidByteBuffer payload) |
| { |
| _transfer.setPayload(payload); |
| return this; |
| } |
| |
| public Interaction transferPayloadData(final Object payload) |
| { |
| transferPayload(_transfer, payload); |
| return this; |
| } |
| |
| private void transferPayload(final Transfer transfer, final Object payload) |
| { |
| AmqpValue amqpValue = new AmqpValue(payload); |
| final AmqpValueSection section = amqpValue.createEncodingRetainingSection(); |
| try (QpidByteBuffer encodedForm = section.getEncodedForm()) |
| { |
| transfer.setPayload(encodedForm); |
| } |
| finally |
| { |
| section.dispose(); |
| } |
| } |
| |
| public Interaction transfer() throws Exception |
| { |
| sendPerformativeAndChainFuture(copyTransfer(_transfer), _sessionChannel); |
| return this; |
| } |
| |
| private Transfer copyTransfer(final Transfer transfer) |
| { |
| final Transfer transferCopy = new Transfer(); |
| transferCopy.setHandle(transfer.getHandle()); |
| transferCopy.setDeliveryId(transfer.getDeliveryId()); |
| transferCopy.setDeliveryTag(transfer.getDeliveryTag()); |
| transferCopy.setMessageFormat(transfer.getMessageFormat()); |
| transferCopy.setSettled(transfer.getSettled()); |
| transferCopy.setMore(transfer.getMore()); |
| transferCopy.setRcvSettleMode(transfer.getRcvSettleMode()); |
| transferCopy.setState(transfer.getState()); |
| transferCopy.setResume(transfer.getResume()); |
| transferCopy.setAborted(transfer.getAborted()); |
| transferCopy.setBatchable(transfer.getBatchable()); |
| try (QpidByteBuffer payload = transfer.getPayload()) |
| { |
| if (payload != null) |
| { |
| transferCopy.setPayload(payload); |
| } |
| } |
| return transferCopy; |
| } |
| |
| ///////////////// |
| // disposition // |
| ///////////////// |
| |
| public Interaction dispositionSettled(final boolean settled) |
| { |
| _disposition.setSettled(settled); |
| return this; |
| } |
| |
| public Interaction dispositionState(final DeliveryState state) |
| { |
| _disposition.setState(state); |
| return this; |
| } |
| |
| |
| public Interaction dispositionTransactionalState(final Binary transactionId, final Outcome outcome) |
| { |
| TransactionalState state = new TransactionalState(); |
| state.setTxnId(transactionId); |
| state.setOutcome(outcome); |
| return dispositionState(state); |
| } |
| |
| public Interaction dispositionTransactionalStateFromCurrentTransaction(final Outcome outcome) |
| { |
| return dispositionTransactionalState(getCurrentTransactionId(), outcome); |
| } |
| |
| public Interaction dispositionRole(final Role role) |
| { |
| _disposition.setRole(role); |
| return this; |
| } |
| |
| public Interaction dispositionFirst(final UnsignedInteger deliveryId) |
| { |
| _disposition.setFirst(deliveryId); |
| return this; |
| } |
| |
| |
| public Interaction dispositionLast(final UnsignedInteger last) |
| { |
| _disposition.setLast(last); |
| return this; |
| } |
| |
| public Interaction dispositionFirstFromLatestDelivery() |
| { |
| _disposition.setFirst(_latestDeliveryId); |
| return this; |
| } |
| |
| public Interaction disposition() throws Exception |
| { |
| sendPerformativeAndChainFuture(copyDisposition(_disposition), _sessionChannel); |
| return this; |
| } |
| |
| private Disposition copyDisposition(final Disposition disposition) |
| { |
| final Disposition dispositionCopy = new Disposition(); |
| dispositionCopy.setRole(disposition.getRole()); |
| dispositionCopy.setFirst(disposition.getFirst()); |
| dispositionCopy.setLast(disposition.getLast()); |
| dispositionCopy.setSettled(disposition.getSettled()); |
| dispositionCopy.setState(disposition.getState()); |
| dispositionCopy.setBatchable(disposition.getBatchable()); |
| return dispositionCopy; |
| } |
| |
| ///////////////// |
| // transaction // |
| //////////////// |
| |
| |
| public UnsignedInteger getCoordinatorHandle() |
| { |
| return _transactionalState == null ? null : _transactionalState.getHandle(); |
| } |
| |
| public Binary getCurrentTransactionId() |
| { |
| return _transactionalState == null ? null : _transactionalState.getCurrentTransactionId(); |
| } |
| |
| public DeliveryState getCoordinatorLatestDeliveryState() |
| { |
| return _transactionalState == null ? null : _transactionalState.getDeliveryState(); |
| } |
| |
| public Interaction txnAttachCoordinatorLink(final UnsignedInteger handle) throws Exception |
| { |
| return txnAttachCoordinatorLink(handle, |
| this::txDefaultUnexpectedResponseHandler, |
| Accepted.ACCEPTED_SYMBOL, |
| Rejected.REJECTED_SYMBOL); |
| } |
| |
| public Interaction txnAttachCoordinatorLink(final UnsignedInteger handle, |
| final Consumer<Response<?>> unexpectedResponseHandler) throws Exception |
| { |
| return txnAttachCoordinatorLink(handle, |
| unexpectedResponseHandler, |
| Accepted.ACCEPTED_SYMBOL, |
| Rejected.REJECTED_SYMBOL); |
| } |
| |
| public Interaction txnAttachCoordinatorLink(final UnsignedInteger handle, |
| final Consumer<Response<?>> unexpectedResponseHandler, |
| final Symbol... outcomes) throws Exception |
| { |
| Attach attach = new Attach(); |
| attach.setName("testTransactionCoordinator-" + handle); |
| attach.setHandle(handle); |
| attach.setInitialDeliveryCount(UnsignedInteger.ZERO); |
| attach.setTarget(new Coordinator()); |
| attach.setRole(Role.SENDER); |
| Source source = new Source(); |
| attach.setSource(source); |
| source.setOutcomes(outcomes); |
| _transactionalState = new InteractionTransactionalState(handle); |
| sendPerformativeAndChainFuture(attach, _sessionChannel); |
| |
| // attach expected |
| Consumer<Response<?>> handler = unexpectedResponseHandler == null |
| ? this::txDefaultUnexpectedResponseHandler |
| : unexpectedResponseHandler; |
| consumeResponse().assertLatestResponse(handler); |
| |
| // flow expected |
| consumeResponse().assertLatestResponse(handler); |
| |
| Flow flow = getLatestResponse(Flow.class); |
| _coordinatorCredits.set(flow.getLinkCredit().longValue()); |
| return this; |
| } |
| |
| private void txDefaultUnexpectedResponseHandler(Response<?> r) |
| { |
| if (r == null || r.getBody() == null || !(r.getBody() instanceof Attach || r.getBody() instanceof Flow)) |
| { |
| throw new IllegalStateException(String.format("Could not attach coordinator link. Got %s", r)); |
| } |
| } |
| |
| public Interaction txnDeclare() throws Exception |
| { |
| sendPayloadToCoordinator(new Declare(), _transactionalState.getHandle()); |
| final DeliveryState state = handleCoordinatorResponse(); |
| _transactionalState.setDeliveryState(state); |
| final Binary transactionId = ((Declared) state).getTxnId(); |
| _transactionalState.setLastTransactionId(transactionId); |
| return this; |
| } |
| |
| public Interaction txnSendDischarge(final boolean failed) |
| throws Exception |
| { |
| return txnSendDischarge(_transactionalState.getCurrentTransactionId(), failed); |
| } |
| |
| public Interaction txnSendDischarge(Binary transactionId, final boolean failed) |
| throws Exception |
| { |
| final Discharge discharge = new Discharge(); |
| discharge.setTxnId(transactionId); |
| discharge.setFail(failed); |
| sendPayloadToCoordinator(discharge, _transactionalState.getHandle()); |
| return this; |
| } |
| |
| public Interaction txnDischarge(boolean failed) throws Exception |
| { |
| return txnDischarge(_transactionalState.getCurrentTransactionId(), failed); |
| } |
| |
| public Interaction txnDischarge(Binary transactionId, boolean failed) throws Exception |
| { |
| txnSendDischarge(transactionId, failed); |
| final DeliveryState state = handleCoordinatorResponse(); |
| _transactionalState.setDeliveryState(state); |
| return this; |
| } |
| |
| private void sendPayloadToCoordinator(final Object payload, final UnsignedInteger handle) |
| throws Exception |
| { |
| final Transfer transfer = createTransactionTransfer(handle); |
| transferPayload(transfer, payload); |
| sendPerformativeAndChainFuture(transfer, _sessionChannel); |
| } |
| |
| private DeliveryState handleCoordinatorResponse() throws Exception |
| { |
| final Set<Class<?>> expected = new HashSet<>(Collections.singletonList(Disposition.class)); |
| if (_coordinatorCredits.decrementAndGet() == 0) |
| { |
| expected.add(Flow.class); |
| } |
| |
| final Map<Class<?>, ?> responses = consumeResponses(expected, Collections.singleton(Flow.class)); |
| |
| final Disposition disposition = (Disposition) responses.get(Disposition.class); |
| if (expected.contains(Flow.class)) |
| { |
| Flow flow = (Flow) responses.get(Flow.class); |
| if (flow.getHandle().equals(getCoordinatorHandle())) |
| { |
| final UnsignedInteger linkCredit = flow.getLinkCredit(); |
| if (linkCredit != null) |
| { |
| _coordinatorCredits.set(linkCredit.longValue()); |
| } |
| } |
| } |
| if (!Boolean.TRUE.equals(disposition.getSettled())) |
| { |
| throw new IllegalStateException("Coordinator disposition is not settled"); |
| } |
| return disposition.getState(); |
| } |
| |
| private Map<Class<?>, ?> consumeResponses(final Set<Class<?>> responseTypes, Set<Class<?>> ignore) |
| throws Exception |
| { |
| final Map<Class<?>, Object> results = new HashMap<>(); |
| final Set<Class<?>> expected = new HashSet<>(responseTypes); |
| expected.addAll(ignore); |
| do |
| { |
| Response<?> response = consumeResponse(expected).getLatestResponse(); |
| if (response != null && response.getBody() instanceof FrameBody) |
| { |
| Class<?> bodyClass = response.getBody().getClass(); |
| results.put(bodyClass, response.getBody()); |
| } |
| } |
| while (!results.keySet().containsAll(responseTypes)); |
| return results; |
| } |
| |
| private Transfer createTransactionTransfer(final UnsignedInteger handle) |
| { |
| Transfer transfer = new Transfer(); |
| transfer.setHandle(handle); |
| transfer.setDeliveryId(getNextDeliveryId()); |
| transfer.setDeliveryTag(new Binary(("transaction-" |
| + transfer.getDeliveryId()).getBytes(StandardCharsets.UTF_8))); |
| return transfer; |
| } |
| |
| ////////// |
| // misc // |
| ////////// |
| |
| public Interaction sendPerformative(final FrameBody frameBody, |
| final UnsignedShort channel) throws Exception |
| { |
| sendPerformativeAndChainFuture(frameBody, channel); |
| return this; |
| } |
| |
| public Interaction sendPerformative(final SaslFrameBody saslFrameBody) throws Exception |
| { |
| sendPerformativeAndChainFuture(saslFrameBody); |
| return this; |
| } |
| |
| private void sendPerformativeAndChainFuture(final SaslFrameBody frameBody) throws Exception |
| { |
| SASLFrame transportFrame = new SASLFrame(frameBody); |
| sendPerformativeAndChainFuture(transportFrame); |
| } |
| |
| private void sendPerformativeAndChainFuture(final FrameBody frameBody, final UnsignedShort channel) throws Exception |
| { |
| final TransportFrame transportFrame; |
| try (QpidByteBuffer payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null) |
| { |
| final QpidByteBuffer duplicate; |
| if (payload == null) |
| { |
| duplicate = null; |
| } |
| else |
| { |
| duplicate = payload.duplicate(); |
| } |
| transportFrame = new TransportFrame(channel.shortValue(), frameBody, duplicate); |
| ListenableFuture<Void> listenableFuture = sendPerformativeAndChainFuture(transportFrame); |
| if (frameBody instanceof Transfer) |
| { |
| listenableFuture.addListener(() -> ((Transfer) frameBody).dispose(), MoreExecutors.directExecutor()); |
| } |
| if (duplicate != null) |
| { |
| listenableFuture.addListener(() -> duplicate.dispose(), MoreExecutors.directExecutor()); |
| } |
| } |
| } |
| |
| public Interaction flowHandleFromLinkHandle() |
| { |
| _flow.setHandle(_attach.getHandle()); |
| return this; |
| } |
| |
| private UnsignedInteger getNextDeliveryId() |
| { |
| return UnsignedInteger.valueOf(_deliveryIdCounter++); |
| } |
| |
| public Interaction receiveDelivery(Class<?>... ignore) throws Exception |
| { |
| sync(); |
| _latestDelivery = receiveAllTransfers(ignore); |
| _latestDeliveryId = _latestDelivery.size() > 0 ? _latestDelivery.get(0).getDeliveryId() : null; |
| _receivedDeliveryCount.incrementAndGet(); |
| return this; |
| } |
| |
| public UnsignedInteger getLatestDeliveryId() |
| { |
| return _latestDeliveryId; |
| } |
| |
| public Interaction decodeLatestDelivery() throws AmqpErrorException |
| { |
| MessageDecoder messageDecoder = new MessageDecoder(); |
| _latestDelivery.forEach(transfer -> |
| { |
| messageDecoder.addTransfer(transfer); |
| transfer.dispose(); |
| }); |
| _decodedLatestDelivery = messageDecoder.getData(); |
| _latestDeliveryApplicationProperties = messageDecoder.getApplicationProperties(); |
| _latestDelivery = null; |
| return this; |
| } |
| |
| public List<Transfer> getLatestDelivery() |
| { |
| return _latestDelivery; |
| } |
| |
| public Object getDecodedLatestDelivery() |
| { |
| return _decodedLatestDelivery; |
| } |
| |
| public Map<String, Object> getLatestDeliveryApplicationProperties() |
| { |
| return _latestDeliveryApplicationProperties; |
| } |
| |
| private List<Transfer> receiveAllTransfers(final Class<?>... ignore) throws Exception |
| { |
| List<Transfer> transfers = new ArrayList<>(); |
| boolean hasMore = true; |
| do |
| { |
| Set<Class<?>> responseTypesSet = new HashSet<>(Arrays.asList(ignore)); |
| responseTypesSet.add(Transfer.class); |
| Class<?>[] responseTypes = responseTypesSet.toArray(new Class<?>[responseTypesSet.size()]); |
| Response<?> latestResponse = consumeResponse(responseTypes).getLatestResponse(); |
| if (latestResponse.getBody() instanceof Transfer) |
| { |
| Transfer responseTransfer = (Transfer) latestResponse.getBody(); |
| hasMore = Boolean.TRUE.equals(responseTransfer.getMore()); |
| transfers.add(responseTransfer); |
| } |
| } |
| while (hasMore); |
| |
| return transfers; |
| } |
| |
| /////////// |
| // Empty // |
| /////////// |
| |
| public Interaction emptyFrame() throws Exception |
| { |
| sendPerformative(EMPTY_FRAME, UnsignedShort.ZERO); |
| return this; |
| } |
| |
| public <T> T consume(final Class<T> expected, final Class<?>... ignore) |
| throws Exception |
| { |
| final Class<?>[] expectedResponses = Arrays.copyOf(ignore, ignore.length + 1); |
| expectedResponses[ignore.length] = expected; |
| |
| T completed = null; |
| do |
| { |
| Response<?> response = consumeResponse(expectedResponses).getLatestResponse(); |
| if (expected.isAssignableFrom(response.getBody().getClass())) |
| { |
| completed = (T) response.getBody(); |
| } |
| } |
| while (completed == null); |
| return completed; |
| } |
| |
| @Override |
| protected Response<?> getNextResponse() throws Exception |
| { |
| Response<?> response = super.getNextResponse(); |
| if (response != null && response.getBody() instanceof FrameBody) |
| { |
| _latestResponses.put(response.getBody().getClass(), (FrameBody)response.getBody()); |
| } |
| return response; |
| } |
| |
| public <T> Interaction assertLatestResponse(Class<T> type, Consumer<T> assertion) |
| { |
| T latestResponse = getLatestResponse(type); |
| assertion.accept(latestResponse); |
| return this; |
| } |
| |
| public Interaction assertLatestResponse(Consumer<Response<?>> assertion) |
| { |
| Response<?> latestResponse = getLatestResponse(); |
| assertion.accept(latestResponse); |
| return this; |
| } |
| |
| public void detachEndCloseUnconditionally() throws Exception |
| { |
| detachClose(true).detach().end().close().sync(); |
| } |
| |
| public Interaction closeUnconditionally() throws Exception |
| { |
| close().sync(); |
| return this; |
| } |
| |
| public Interaction negotiateOpen() throws Exception |
| { |
| sendOpen().consumeResponse(Open.class); |
| return this; |
| } |
| |
| public Interaction sendOpen() throws Exception |
| { |
| if ((_portType == BrokerAdmin.PortType.ANONYMOUS_AMQP || _portType == BrokerAdmin.PortType.ANONYMOUS_AMQPWS) |
| && _brokerAdmin.isAnonymousSupported()) |
| { |
| sendProtocolAndOpen(); |
| } |
| else if (_portType == BrokerAdmin.PortType.AMQP |
| && _brokerAdmin.isSASLSupported() |
| && _brokerAdmin.isSASLMechanismSupported(PLAIN)) |
| { |
| sendSasl(); |
| protocolHeader(getTransport().getProtocolHeader()).sendProtocolAndOpen(); |
| } |
| else |
| { |
| throw new IllegalStateException("Only ANONYMOUS or PLAIN authentication currently supported by the tests"); |
| } |
| return this; |
| } |
| |
| private void sendProtocolAndOpen() throws Exception |
| { |
| negotiateProtocol().consumeResponse().open(); |
| } |
| |
| private void sendSasl() throws Exception |
| { |
| final byte[] protocolResponse = protocolHeader(SASL_AMQP_HEADER_BYTES) |
| .negotiateProtocol().consumeResponse() |
| .getLatestResponse(byte[].class); |
| if (!Arrays.equals(SASL_AMQP_HEADER_BYTES, protocolResponse)) |
| { |
| throw new IllegalStateException(String.format( |
| "Unexpected protocol '%s' is reported from broker supporting SASL", |
| StringUtil.toHex(protocolResponse))); |
| } |
| |
| final SaslMechanisms mechanisms = consumeResponse().getLatestResponse(SaslMechanisms.class); |
| final Symbol[] supportedMechanisms = mechanisms.getSaslServerMechanisms(); |
| if (Arrays.stream(supportedMechanisms).noneMatch(m -> m.toString().equalsIgnoreCase(PLAIN))) |
| { |
| if (Arrays.stream(supportedMechanisms) |
| .noneMatch(m -> m.toString().equalsIgnoreCase(AnonymousAuthenticationManager.MECHANISM_NAME))) |
| { |
| throw new IllegalStateException(String.format( |
| "PLAIN or ANONYMOUS SASL mechanism is not listed among supported '%s'", |
| Arrays.stream(supportedMechanisms) |
| .map(String::valueOf) |
| .collect(Collectors.joining(",")))); |
| } |
| else |
| { |
| authenticate(AnonymousAuthenticationManager.MECHANISM_NAME, new byte[0]); |
| } |
| } |
| else |
| { |
| byte[] initialResponseBytes = |
| String.format("\0%s\0%s", _brokerAdmin.getValidUsername(), _brokerAdmin.getValidPassword()) |
| .getBytes(StandardCharsets.US_ASCII); |
| authenticate(PLAIN, initialResponseBytes); |
| } |
| } |
| |
| private void authenticate(final String mechanism, final byte[] initialResponseBytes) throws Exception |
| { |
| final SaslOutcome saslOutcome = saslMechanism(Symbol.getSymbol(mechanism)) |
| .saslInitialResponse(new Binary(initialResponseBytes)) |
| .saslInit().consumeResponse() |
| .getLatestResponse(SaslOutcome.class); |
| |
| if (!SaslCode.OK.equals(saslOutcome.getCode())) |
| { |
| throw new IllegalStateException("Authentication failed."); |
| } |
| } |
| } |