blob: 324119fc7bca826910c53e37346fcc5e51e695ec [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.test.driver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
import org.apache.qpid.protonj2.test.driver.netty.NettyClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
/**
* Test Client for AMQP server testing, allows for scripting the expected inputs from
* the server and outputs from the client back to the server.
*/
public class ProtonTestClient extends ProtonTestPeer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ProtonTestClient.class);
private final AMQPTestDriver driver;
private final NettyTestDriverClient client;
/**
* Creates a Socket Test Peer using all default Server options.
*/
public ProtonTestClient() {
this(new ProtonTestClientOptions());
}
@Override
public String getPeerName() {
return "Client";
}
/**
* Creates a Test Client using the options to configure the client connection.
*
* @param options
* The options that control the behavior of the client connection.
*/
public ProtonTestClient(ProtonTestClientOptions options) {
this.driver = new NettyAwareAMQPTestDriver(this::processDriverOutput, this::processDriverAssertion, this::eventLoop);
this.client = new NettyTestDriverClient(options);
}
public void connect(String hostname, int port) throws IOException {
client.connect(hostname, port);
}
@Override
public AMQPTestDriver getDriver() {
return driver;
}
@Override
protected void processConnectionEstablished() {
LOG.trace("AMQP Client connected to remote.");
driver.handleConnectedEstablished();
}
@Override
protected void processCloseRequest() {
try {
client.close();
} catch (Throwable e) {
LOG.info("Error suppressed on client stop: ", e);
}
}
@Override
protected void processDriverOutput(ByteBuffer frame) {
LOG.trace("AMQP Client Channel writing: {}", frame);
client.write(frame);
}
protected void processChannelInput(ByteBuf input) {
LOG.trace("AMQP Test Client Channel processing: {}", input);
driver.accept(input);
}
protected void processDriverAssertion(AssertionError error) {
LOG.trace("AMQP Test Client Closing due to error: {}", error.getMessage());
close();
}
protected ScheduledExecutorService eventLoop() {
return client.eventLoop();
}
//----- Test driver Wrapper to ensure actions occur on the event loop
private final class NettyAwareAMQPTestDriver extends AMQPTestDriver {
public NettyAwareAMQPTestDriver(Consumer<ByteBuffer> frameConsumer, Consumer<AssertionError> assertionConsumer, Supplier<ScheduledExecutorService> scheduler) {
super(getPeerName(), frameConsumer, assertionConsumer, scheduler);
}
// If the send call occurs from a reaction to processing incoming data the
// call will be on the event loop but for actions requested by the test that
// are directed to happen immediately they will be running on the test thread
// and so we direct the resulting action into the event loop to avoid codec or
// other driver resources being used on two different threads.
@Override
public void sendAMQPFrame(int channel, DescribedType performative, ByteBuf payload) {
EventLoop loop = client.eventLoop();
if (loop.inEventLoop()) {
super.sendAMQPFrame(channel, performative, payload);
} else {
loop.execute(() -> {
super.sendAMQPFrame(channel, performative, payload);
});
}
}
@Override
public void sendSaslFrame(int channel, DescribedType performative) {
EventLoop loop = client.eventLoop();
if (loop.inEventLoop()) {
super.sendSaslFrame(channel, performative);
} else {
loop.execute(() -> {
super.sendSaslFrame(channel, performative);
});
}
}
@Override
public void sendHeader(AMQPHeader header) {
EventLoop loop = client.eventLoop();
if (loop.inEventLoop()) {
super.sendHeader(header);
} else {
loop.execute(() -> {
super.sendHeader(header);
});
}
}
@Override
public void sendEmptyFrame(int channel) {
EventLoop loop = client.eventLoop();
if (loop.inEventLoop()) {
super.sendEmptyFrame(channel);
} else {
loop.execute(() -> {
super.sendEmptyFrame(channel);
});
}
}
}
//----- Channel handler that drives IO for the test driver
private final class NettyTestDriverClient extends NettyClient {
public NettyTestDriverClient(ProtonTestClientOptions options) {
super(options);
}
@Override
protected ChannelHandler getClientHandler() {
return new SimpleChannelInboundHandler<ByteBuf>() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
processConnectionEstablished();
ctx.fireChannelActive();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf input) throws Exception {
LOG.trace("AMQP Test Client Channel read: {}", input);
try {
// Create a stable copy to avoid issue with retained buffer slices when input is pooled.
ByteBuf copy = Unpooled.buffer(input.readableBytes());
copy.writeBytes(input.nioBuffer());
input.skipBytes(input.readableBytes());
// Driver processes new data and may produce output based on this.
processChannelInput(copy);
} catch (Throwable e) {
LOG.error("Closed AMQP Test client channel due to error: ", e);
ctx.channel().close();
}
}
};
}
}
}