blob: a13d9c3a95481b3a58f910e3b0d7bf5e78792277 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.test.driver;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.qpid.protonj2.test.driver.actions.ScriptCompleteAction;
import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
import org.apache.qpid.protonj2.test.driver.codec.security.SaslDescribedType;
import org.apache.qpid.protonj2.test.driver.codec.security.SaslOutcome;
import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
import org.apache.qpid.protonj2.test.driver.codec.transport.HeartBeat;
import org.apache.qpid.protonj2.test.driver.codec.transport.Open;
import org.apache.qpid.protonj2.test.driver.codec.transport.PerformativeDescribedType;
import org.apache.qpid.protonj2.test.driver.exceptions.UnexpectedPerformativeError;
import org.apache.qpid.protonj2.test.driver.expectations.ConnectionDropExpectation;
import org.apache.qpid.protonj2.test.driver.expectations.SaslOutcomeExpectation;
import org.apache.qpid.protonj2.test.driver.netty.NettyEventLoop;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test driver object used to drive inputs and inspect outputs of an Engine.
*/
public class AMQPTestDriver implements Consumer<ByteBuffer> {
private static final Logger LOG = LoggerFactory.getLogger(AMQPTestDriver.class);
private final String driverName;
private final FrameDecoder frameParser;
private final FrameEncoder frameEncoder;
private Open localOpen;
private Open remoteOpen;
private final DriverSessions sessions = new DriverSessions(this);
private final Consumer<ByteBuffer> frameConsumer;
private final Consumer<AssertionError> assertionConsumer;
private final Supplier<NettyEventLoop> schedulerSupplier;
private volatile List<ByteBuffer> deferredWrites = new ArrayList<>();
private volatile AssertionError failureCause;
private int advertisedIdleTimeout = 0;
private volatile int emptyFrameCount;
private volatile int performativeCount;
private volatile int saslPerformativeCount;
private int inboundMaxFrameSize = Integer.MAX_VALUE;
private int outboundMaxFrameSize = Integer.MAX_VALUE;
/**
* Holds the expectations for processing of data from the peer under test.
* Uses a thread safe queue to avoid contention on adding script entries
* and processing incoming data (although you should probably not do that).
*/
private final Queue<ScriptedElement> script = new ArrayDeque<>();
/**
* Create a test driver instance connected to the given Engine instance.
*
* @param name
* A unique test driver name that should allow log inspection of sends and receiver for a driver instance.
* @param frameConsumer
* A {@link Consumer} that will accept encoded frames in ProtonBuffer instances.
* @param scheduler
* A {@link Supplier} that will provide this driver with a scheduler service for delayed actions
*/
public AMQPTestDriver(String name, Consumer<ByteBuffer> frameConsumer, Supplier<NettyEventLoop> scheduler) {
this(name, frameConsumer, null, scheduler);
}
/**
* Create a test driver instance connected to the given Engine instance.
*
* @param name
* A unique test driver name that should allow log inspection of sends and receiver for a driver instance.
* @param frameConsumer
* A {@link Consumer} that will accept encoded frames in ProtonBuffer instances.
* @param assertionConsumer
* A {@link Consumer} that will handle test assertions from the scripted expectations
* @param scheduler
* A {@link Supplier} that will provide this driver with a scheduler service for delayed actions
*/
public AMQPTestDriver(String name, Consumer<ByteBuffer> frameConsumer, Consumer<AssertionError> assertionConsumer, Supplier<NettyEventLoop> scheduler) {
this.frameConsumer = frameConsumer;
this.assertionConsumer = assertionConsumer;
this.schedulerSupplier = scheduler;
this.driverName = name;
// Configure test driver resources
this.frameParser = new FrameDecoder(this);
this.frameEncoder = new FrameEncoder(this);
}
/**
* @return the Sessions tracking manager for this driver.
*/
public DriverSessions sessions() {
return sessions;
}
/**
* @return the assigned name of this AMQP test driver
*/
public Object getName() {
return driverName;
}
//----- View the test driver state
/**
* @return the idle timeout value that will be set on outgoing Open frames.
*/
public int getAdvertisedIdleTimeout() {
return advertisedIdleTimeout;
}
/**
* Sets the idle timeout which the test driver should supply to remote in its Open frame.
*
* @param advertisedIdleTimeout
* The idle timeout value to supply to remote peers.
*/
public void setAdvertisedIdleTimeout(int advertisedIdleTimeout) {
this.advertisedIdleTimeout = advertisedIdleTimeout;
}
/**
* @return the number of empty frames that the remote has sent for idle timeout processing.
*/
public int getEmptyFrameCount() {
return emptyFrameCount;
}
/**
* @return the number of performative frames that this test peer has processed from incoming bytes.
*/
public int getPerformativeCount() {
return performativeCount;
}
/**
* @return the number of SASL performative frames that this test peer has processed from incoming bytes.
*/
public int getSaslPerformativeCount() {
return saslPerformativeCount;
}
/**
* @return the maximum allowed inbound frame size.
*/
public int getInboundMaxFrameSize() {
return inboundMaxFrameSize;
}
/**
* Sets a limiting value on the size of incoming frames after which the peer will signal an error.
*
* @param maxSize
* The maximum incoming frame size limit for this peer.
*/
public void setInboundMaxFrameSize(int maxSize) {
this.inboundMaxFrameSize = maxSize;
}
/**
* @return the maximum allowed outbound frame size.
*/
public int getOutboundMaxFrameSize() {
return outboundMaxFrameSize;
}
/**
* Sets a limiting value on the size of outgoing frames after which the peer will signal an error.
*
* @param maxSize
* The maximum outgoing frame size limit for this peer.
*/
public void setOutboundMaxFrameSize(int maxSize) {
this.outboundMaxFrameSize = maxSize;
}
//----- Accepts encoded AMQP frames for processing
/**
* Supply incoming bytes read to the test driver via an Netty {@link ByteBuffer} instance.
*
* @param buffer
* The Netty {@link ByteBuffer} that contains new incoming bytes read.
*/
@Override
public void accept(ByteBuffer buffer) {
LOG.trace("{} processing new inbound buffer of size: {}", driverName, buffer.remaining());
try {
// Process off all encoded frames from this buffer one at a time.
while (buffer.remaining() > 0 && failureCause == null) {
LOG.trace("{} ingestion of {} bytes starting now.", driverName, buffer.remaining());
frameParser.ingest(buffer);
LOG.trace("{} ingestion completed cycle, remaining bytes in buffer: {}", driverName, buffer.remaining());
}
} catch (AssertionError e) {
signalFailure(e);
}
}
/**
* @return the remote {@link Open} that this peer received (if any).
*/
public Open getRemoteOpen() {
return remoteOpen;
}
/**
* @return the local {@link Open} that this peer sent (if any).
*/
public Open getLocalOpen() {
return localOpen;
}
//----- Test driver handling of decoded AMQP frames
void handleConnectedEstablished() throws AssertionError {
synchronized (script) {
ScriptedElement peekNext = script.peek();
if (peekNext instanceof ScriptedAction) {
processScript(peekNext);
}
}
}
void handleConnectedDropped() throws AssertionError {
synchronized (script) {
// For now we just reset the parser as any new connection would need to
// send an AMQP header, other validation could be added if we expand
// processing on client disconnect events.
resetToExpectingAMQPHeader();
// Reset connection tracking state to empty as the connection is gone
// and we want new connection to be able to reuse handles etc.
sessions.reset();
// Check if the currently pending scripted expectation is for the connection
// to drop in which case we remove it and unblock any waiters on the script
// to complete, if this is the final scripted entry.
final ScriptedElement scriptEntry = script.peek();
if (scriptEntry instanceof ConnectionDropExpectation) {
processScript(script.poll());
}
}
}
void handleHeader(AMQPHeader header) throws AssertionError {
synchronized (script) {
final ScriptedElement scriptEntry = script.poll();
if (scriptEntry == null) {
signalFailure(new AssertionError("Received header when not expecting any input."));
}
try {
header.invoke(scriptEntry, this);
} catch (Throwable t) {
if (scriptEntry.isOptional()) {
handleHeader(header);
} else {
LOG.warn(t.getMessage());
signalFailure(t);
throw t;
}
}
processScript(scriptEntry);
}
}
void handleSaslPerformative(int frameSize, SaslDescribedType sasl, int channel, ByteBuffer payload) throws AssertionError {
synchronized (script) {
final ScriptedElement scriptEntry = script.poll();
if (scriptEntry == null) {
signalFailure(new AssertionError("Received performative[" + sasl + "] when not expecting any input."));
}
try {
sasl.invoke(scriptEntry, frameSize, this);
} catch (UnexpectedPerformativeError e) {
if (scriptEntry.isOptional()) {
handleSaslPerformative(frameSize, sasl, channel, payload);
} else {
signalFailure(e);
throw e;
}
} catch (AssertionError assertion) {
LOG.warn(assertion.getMessage());
signalFailure(assertion);
throw assertion;
}
processScript(scriptEntry);
}
}
void handlePerformative(int frameSize, PerformativeDescribedType amqp, int channel, ByteBuffer payload) throws AssertionError {
switch (amqp.getPerformativeType()) {
case HEARTBEAT:
break;
case OPEN:
remoteOpen = (Open) amqp;
default:
performativeCount++;
break;
}
synchronized (script) {
final ScriptedElement scriptEntry = script.poll();
if (scriptEntry == null) {
signalFailure(new AssertionError("Received performative[" + amqp + "] when not expecting any input."));
}
try {
amqp.invoke(scriptEntry, frameSize, payload, channel, this);
} catch (UnexpectedPerformativeError e) {
if (scriptEntry.isOptional()) {
handlePerformative(frameSize, amqp, channel, payload);
} else {
signalFailure(e);
throw e;
}
} catch (AssertionError assertion) {
LOG.warn(assertion.getMessage());
signalFailure(assertion);
throw assertion;
}
processScript(scriptEntry);
}
}
void handleHeartbeat(int frameSize, int channel) {
emptyFrameCount++;
handlePerformative(frameSize, HeartBeat.INSTANCE, channel, null);
}
/**
* Submits a new {@link ScriptedAction} which should be performed after waiting for the
* given delay period in milliseconds.
*
* @param delay
* The time in milliseconds to wait before performing the action
* @param action
* The action that should be performed after the given delay.
*/
public synchronized void afterDelay(int delay, ScriptedAction action) {
Objects.requireNonNull(schedulerSupplier, "This driver cannot schedule delayed events, no scheduler available");
NettyEventLoop scheduler = schedulerSupplier.get();
Objects.requireNonNull(scheduler, "This driver cannot schedule delayed events, no scheduler available");
scheduler.schedule(() -> {
LOG.trace("{} running delayed action: {}", driverName, action);
action.perform(this);
}, delay, TimeUnit.MILLISECONDS);
}
//----- Test driver actions
/**
* Resets the frame parser to a state where it expects to read an AMQP Header type instead
* of a normal AMQP frame type. By default the parser starts in this state and then switches
* to frames after the first header is read. In cases where SASL is in use this needs to be
* reset to the header read state when the outcome is known. Normally the script should handle
* this via a {@link SaslOutcomeExpectation} but other script variations might want to drive
* this manually.
*/
public void resetToExpectingAMQPHeader() {
this.frameParser.resetToExpectingHeader();
}
/**
* Waits indefinitely for the scripted expectations and actions to be performed. If the script
* execution encounters an error this method will throw an {@link AssertionError} that describes
* the error.
*/
public void waitForScriptToComplete() {
checkFailed();
ScriptCompleteAction possibleWait = null;
synchronized (script) {
checkFailed();
if (!script.isEmpty()) {
possibleWait = new ScriptCompleteAction(this).queue();
}
}
if (possibleWait != null) {
try {
possibleWait.await();
} catch (InterruptedException e) {
Thread.interrupted();
signalFailure("Interrupted while waiting for script to complete");
}
}
checkFailed();
}
/**
* Waits indefinitely for the scripted expectations and actions to be performed. If the script
* execution encounters an error this method will not throw an {@link AssertionError} that describes
* the error but simply ignore it and return.
*/
public void waitForScriptToCompleteIgnoreErrors() {
ScriptCompleteAction possibleWait = null;
synchronized (script) {
if (!script.isEmpty()) {
possibleWait = new ScriptCompleteAction(this).queue();
}
}
if (possibleWait != null) {
try {
possibleWait.await();
} catch (InterruptedException e) {
Thread.interrupted();
signalFailure("Interrupted while waiting for script to complete");
}
}
}
/**
* Waits for the given amount of time for the scripted expectations and actions to be performed. If
* the script execution encounters an error this method will throw an {@link AssertionError} that
* describes the error.
*
* @param timeout
* The time in milliseconds to wait for the scripted expectations to complete.
*/
public void waitForScriptToComplete(long timeout) {
waitForScriptToComplete(timeout, TimeUnit.SECONDS);
}
/**
* Waits for the given amount of time for the scripted expectations and actions to be performed. If
* the script execution encounters an error this method will throw an {@link AssertionError} that
* describes the error.
*
* @param timeout
* The time to wait for the scripted expectations to complete.
* @param units
* The {@link TimeUnit} instance that converts the given time value.
*/
public void waitForScriptToComplete(long timeout, TimeUnit units) {
checkFailed();
ScriptCompleteAction possibleWait = null;
synchronized (script) {
checkFailed();
if (!script.isEmpty()) {
possibleWait = new ScriptCompleteAction(this).queue();
}
}
if (possibleWait != null) {
try {
possibleWait.await(timeout, units);
} catch (InterruptedException e) {
Thread.interrupted();
signalFailure("Interrupted while waiting for script to complete");
}
}
checkFailed();
}
/**
* Adds the script element to the list of scripted expectations and actions that should occur
* in order to complete the test outcome.
*
* @param element
* The element to add to the script.
*/
public void addScriptedElement(ScriptedElement element) {
checkFailed();
synchronized (script) {
checkFailed();
script.offer(element);
}
}
/**
* Encodes the given frame data into a ProtonBuffer and injects it into the configured consumer.
*
* @param channel
* The channel to use when writing the frame
* @param performative
* The AMQP Performative to write
* @param payload
* The payload to include in the encoded frame.
*/
public final void sendAMQPFrame(int channel, DescribedType performative, ByteBuffer payload) {
sendAMQPFrame(channel, performative, payload, false);
}
/**
* Encodes the given frame data into a ProtonBuffer but does not write the data until the next non-deferred
* send if initiated.
*
* @param channel
* The channel to use when writing the frame
* @param performative
* The AMQP Performative to write
* @param payload
* The payload to include in the encoded frame.
* @param splitWrite
* Should the data be written in multiple chunks
*/
public void deferAMQPFrame(int channel, DescribedType performative, ByteBuffer payload, boolean splitWrite) {
LOG.trace("{} Deferring write of performative: {}", driverName, performative);
try {
deferredWrites.add(frameEncoder.handleWrite(performative, channel, payload, null));
} catch (Throwable t) {
signalFailure(new AssertionError("Frame was not written due to error.", t));
}
}
/**
* Encodes the given frame data into a ProtonBuffer but does not write the data until the next non-deferred
* send if initiated.
*
* @param channel
* The channel to use when writing the frame
* @param performative
* The SASL Performative to write
*/
public void deferSaslFrame(int channel, DescribedType performative) {
// When the outcome of SASL is written the decoder should revert to initial state
// as the only valid next incoming value is an AMQP header.
if (performative instanceof SaslOutcome) {
frameParser.resetToExpectingHeader();
}
LOG.trace("{} Deferring SASL performative write: {}", driverName, performative);
try {
deferredWrites.add(frameEncoder.handleWrite(performative, channel));
} catch (Throwable t) {
signalFailure(new AssertionError("Frame was not written due to error.", t));
}
}
/**
* Encodes the given Header data into a ProtonBuffer but does not write the data until the next non-deferred
* send if initiated.
*
* @param header
* The byte array to send as the AMQP Header.
*/
public void deferHeader(AMQPHeader header) {
LOG.trace("{} Deferring AMQP Header write: {}", driverName, header);
try {
deferredWrites.add(ByteBuffer.wrap(Arrays.copyOf(header.getBuffer(), AMQPHeader.HEADER_SIZE_BYTES)));
} catch (Throwable t) {
signalFailure(new AssertionError("Frame was not consumed due to error.", t));
}
}
/**
* Encodes the given frame data into a ProtonBuffer and injects it into the configured consumer.
*
* @param channel
* The channel to use when writing the frame
* @param performative
* The AMQP Performative to write
* @param payload
* The payload to include in the encoded frame.
* @param splitWrite
* Should the data be written in multiple chunks
*/
public void sendAMQPFrame(int channel, DescribedType performative, ByteBuffer payload, boolean splitWrite) {
LOG.trace("{} Sending performative: {}", driverName, performative);
if (performative instanceof PerformativeDescribedType) {
switch (((PerformativeDescribedType) performative).getPerformativeType()) {
case OPEN:
localOpen = (Open) performative;
default:
break;
}
}
final ByteBuffer output;
final ByteBuffer buffer = frameEncoder.handleWrite(performative, channel, payload, null);
if (!deferredWrites.isEmpty()) {
deferredWrites.add(buffer);
try {
output = composeDefferedWrites(deferredWrites).asReadOnlyBuffer();
} finally {
deferredWrites.clear();
}
LOG.trace("{} appending deferred buffer {} to next write.", driverName, output);
} else {
output = buffer;
}
try {
LOG.trace("{} Writing out buffer {} to consumer: {}", driverName, output, frameConsumer);
if (splitWrite) {
final int bufferSplitPoint = output.remaining() / 2;
final byte[] front = new byte[bufferSplitPoint - output.position()];
final byte[] rear = new byte[output.remaining() - bufferSplitPoint];
output.get(front);
output.get(rear);
frameConsumer.accept(ByteBuffer.wrap(front));
frameConsumer.accept(ByteBuffer.wrap(rear));
} else {
frameConsumer.accept(output);
}
} catch (Throwable t) {
signalFailure(new AssertionError("Frame was not written due to error.", t));
}
}
/**
* Encodes the given frame data into a ProtonBuffer and injects it into the configured consumer.
*
* @param channel
* The channel to use when writing the frame
* @param performative
* The SASL Performative to write
*/
public void sendSaslFrame(int channel, DescribedType performative) {
// When the outcome of SASL is written the decoder should revert to initial state
// as the only valid next incoming value is an AMQP header.
if (performative instanceof SaslOutcome) {
frameParser.resetToExpectingHeader();
}
final ByteBuffer output;
try {
final ByteBuffer buffer = frameEncoder.handleWrite(performative, channel);
if (!deferredWrites.isEmpty()) {
deferredWrites.add(buffer);
try {
output = composeDefferedWrites(deferredWrites).asReadOnlyBuffer();
} finally {
deferredWrites.clear();
}
LOG.trace("{} appending deferred buffer {} to next write.", driverName, output);
} else {
output = buffer;
}
LOG.trace("{} Sending SASL performative: {}", driverName, performative);
frameConsumer.accept(output);
} catch (Throwable t) {
signalFailure(new AssertionError("Frame was not written due to error.", t));
}
}
/**
* Send the specific header bytes to the remote frame consumer.
* @param header
* The byte array to send as the AMQP Header.
*/
public void sendHeader(AMQPHeader header) {
try {
final ByteBuffer output;
if (!deferredWrites.isEmpty()) {
LOG.trace("{} appending deferred buffer {} to next write.", driverName, deferredWrites);
deferredWrites.add(ByteBuffer.wrap(header.getBuffer()).asReadOnlyBuffer());
try {
output = composeDefferedWrites(deferredWrites).asReadOnlyBuffer();
} finally {
deferredWrites.clear();
}
} else {
output = ByteBuffer.wrap(header.getBuffer()).asReadOnlyBuffer();
}
LOG.trace("{} Sending AMQP Header: {}", driverName, header);
frameConsumer.accept(output);
} catch (Throwable t) {
signalFailure(new AssertionError("Frame was not consumed due to error.", t));
}
}
/**
* Send an Empty Frame on the given channel to the remote consumer.
*
* @param channel
* the channel on which to send the empty frame.
*/
public void sendEmptyFrame(int channel) {
try {
frameConsumer.accept(frameEncoder.handleWrite(null, channel, null, null));
} catch (Throwable t) {
signalFailure(new AssertionError("Frame was not consumed due to error.", t));
}
}
/**
* Send the specific ProtonBuffer bytes to the remote frame consumer.
* @param buffer
* The buffer whose contents are to be written to the frame consumer.
*/
public void sendBytes(ByteBuffer buffer) {
LOG.trace("{} Sending bytes from ByteBuffer: {}", driverName, buffer);
try {
frameConsumer.accept(buffer.duplicate());
} catch (Throwable t) {
signalFailure(new AssertionError("Buffer was not consumed due to error.", t));
}
}
/**
* Throw an exception from processing incoming data which should be handled by the peer under test.
*
* @param ex
* The exception that triggered this call.
*
* @throws AssertionError indicating the first error that cause the driver to report test failure.
*/
public void signalFailure(Throwable ex) throws AssertionError {
if (this.failureCause == null) {
if (ex instanceof AssertionError) {
LOG.trace("{} sending failure assertion due to: ", driverName, ex);
this.failureCause = (AssertionError) ex;
} else {
LOG.trace("{} sending failure assertion due to: ", driverName, ex);
this.failureCause = new AssertionError(ex);
}
searchForScriptioCompletionAndTrigger();
if (assertionConsumer != null) {
assertionConsumer.accept(failureCause);
}
}
}
/**
* Throw an exception from processing incoming data which should be handled by the peer under test.
*
* @param message
* The error message that describes what triggered this call.
*
* @throws AssertionError that indicates the first error that failed for this driver.
*/
public void signalFailure(String message) throws AssertionError {
signalFailure(new AssertionError(message));
}
//----- Internal implementation
private static final ByteBuffer composeDefferedWrites(List<ByteBuffer> deferredWrites) {
int totalSize = 0;
for (ByteBuffer component : deferredWrites) {
totalSize += component.remaining();
}
final ByteBuffer composite = ByteBuffer.allocate(totalSize);
for (ByteBuffer component : deferredWrites) {
composite.put(component);
}
deferredWrites.clear();
return composite.flip().asReadOnlyBuffer();
}
private void searchForScriptioCompletionAndTrigger() {
script.forEach(element -> {
if (element instanceof ScriptCompleteAction) {
ScriptCompleteAction completed = (ScriptCompleteAction) element;
completed.perform(this);
}
});
}
private void processScript(ScriptedElement current) {
if (current.performAfterwards() != null && failureCause == null) {
current.performAfterwards().perform(this);
}
ScriptedElement peekNext = script.peek();
do {
if (peekNext instanceof ScriptedAction) {
script.poll();
((ScriptedAction) peekNext).perform(this);
} else {
return;
}
peekNext = script.peek();
} while (peekNext != null && failureCause == null);
}
private void checkFailed() {
if (failureCause != null) {
throw failureCause;
}
}
}