blob: 91b0454b97f424b3f606f1319a72d856759981c1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.tests.protocol;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import java.net.InetSocketAddress;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.JdkFutureAdapters;
import com.google.common.util.concurrent.ListenableFuture;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public abstract class AbstractFrameTransport<I extends AbstractInteraction<I>> implements AutoCloseable
{
static final long RESPONSE_TIMEOUT =
Long.getLong("qpid.tests.protocol.frameTransport.responseTimeout", 6000);
private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse();
private final BlockingQueue<Response<?>> _queue = new ArrayBlockingQueue<>(1000);
private final EventLoopGroup _workerGroup;
private final InetSocketAddress _brokerAddress;
private final InputHandler _inputHandler;
private final OutputHandler _outputHandler;
private volatile Channel _channel;
private volatile boolean _channelClosedSeen = false;
public AbstractFrameTransport(final InetSocketAddress brokerAddress, InputDecoder inputDecoder, OutputEncoder outputEncoder)
{
_brokerAddress = brokerAddress;
_inputHandler = new InputHandler(_queue, inputDecoder);
_outputHandler = new OutputHandler(outputEncoder);
_workerGroup = new NioEventLoopGroup();
}
public InetSocketAddress getBrokerAddress()
{
return _brokerAddress;
}
public AbstractFrameTransport<I> connect()
{
try
{
Bootstrap b = new Bootstrap();
b.group(_workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>()
{
@Override
public void initChannel(SocketChannel ch) throws Exception
{
ChannelPipeline pipeline = ch.pipeline();
buildInputOutputPipeline(pipeline);
}
});
_channel = b.connect(_brokerAddress).sync().channel();
_channel.closeFuture().addListener(future ->
{
_channelClosedSeen = true;
_queue.add(CHANNEL_CLOSED_RESPONSE);
});
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
return this;
}
protected void buildInputOutputPipeline(final ChannelPipeline pipeline)
{
pipeline.addLast(_inputHandler).addLast(_outputHandler);
}
@Override
public void close() throws Exception
{
try
{
if (_channel != null)
{
_channel.disconnect().sync();
_channel.close().sync();
_channel = null;
}
}
finally
{
_workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).sync();
}
}
ListenableFuture<Void> sendProtocolHeader(final byte[] bytes) throws Exception
{
return sendBytes(bytes);
}
public ListenableFuture<Void> sendBytes(final byte[] bytes)
{
Preconditions.checkState(_channel != null, "Not connected");
ChannelPromise promise = _channel.newPromise();
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(bytes);
_channel.write(buffer, promise);
return JdkFutureAdapters.listenInPoolThread(promise);
}
public ListenableFuture<Void> sendPerformative(final Object data) throws Exception
{
Preconditions.checkState(_channel != null, "Not connected");
ChannelPromise promise = _channel.newPromise();
_channel.write(data, promise);
return JdkFutureAdapters.listenInPoolThread(promise);
}
<T extends Response<?>> T getNextResponse() throws Exception
{
return (T) _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
}
public void assertNoMoreResponses() throws Exception
{
Response response = getNextResponse();
assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class)));
}
public void assertNoMoreResponsesAndChannelClosed() throws Exception
{
assertNoMoreResponses();
assertThat(_channelClosedSeen, is(true));
}
public void flush()
{
_channel.flush();
}
public abstract byte[] getProtocolHeader();
public abstract I newInteraction();
}