blob: 269594f0f86cbabb05bb79c03c41af228d30610e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.omid.tso;
import org.apache.omid.NetworkUtils;
import org.apache.phoenix.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.omid.metrics.NullMetricsProvider;
import org.apache.omid.proto.TSOProto;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFactory;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyCollectionOf;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@SuppressWarnings({"UnusedDeclaration", "StatementWithEmptyBody"})
public class TestTSOChannelHandlerNetty {
private static final Logger LOG = LoggerFactory.getLogger(TestTSOChannelHandlerNetty.class);
@Mock
private
RequestProcessor requestProcessor;
@BeforeMethod
public void beforeTestMethod() {
MockitoAnnotations.initMocks(this);
}
private TSOChannelHandler getTSOChannelHandler(int port) {
TSOServerConfig config = new TSOServerConfig();
config.setPort(port);
return new TSOChannelHandler(config, requestProcessor, new NullMetricsProvider());
}
@Test(timeOut = 10_000)
public void testMainAPI() throws Exception {
int port = NetworkUtils.getFreePort();
TSOChannelHandler channelHandler = getTSOChannelHandler(port);
try {
// Check initial state
assertNull(channelHandler.listeningChannel);
assertNull(channelHandler.allChannels);
// Check initial connection
channelHandler.reconnect();
assertTrue(channelHandler.listeningChannel.isOpen());
assertEquals(channelHandler.allChannels.size(), 1);
assertEquals(((InetSocketAddress) channelHandler.listeningChannel.localAddress()).getPort(), port);
// Check connection close
channelHandler.closeConnection();
assertFalse(channelHandler.listeningChannel.isOpen());
assertEquals(channelHandler.allChannels.size(), 0);
// Check re-closing connection
channelHandler.closeConnection();
assertFalse(channelHandler.listeningChannel.isOpen());
assertEquals(channelHandler.allChannels.size(), 0);
// Check connection after closing
channelHandler.reconnect();
assertTrue(channelHandler.listeningChannel.isOpen());
assertEquals(channelHandler.allChannels.size(), 1);
// Check re-connection
channelHandler.reconnect();
assertTrue(channelHandler.listeningChannel.isOpen());
assertEquals(channelHandler.allChannels.size(), 1);
// Exercise closeable with re-connection trial
channelHandler.close();
assertFalse(channelHandler.listeningChannel.isOpen());
assertEquals(channelHandler.allChannels.size(), 0);
try {
channelHandler.reconnect();
fail("Can't reconnect after closing");
} catch (Exception e) {
// Expected: Can't reconnect after closing
assertFalse(channelHandler.listeningChannel.isOpen());
assertEquals(channelHandler.allChannels.size(), 0);
}
} finally {
if(channelHandler != null) channelHandler.close();
}
}
@Test(timeOut = 10_000)
public void testNettyConnectionToTSOFromClient() throws Exception {
int port = NetworkUtils.getFreePort();
TSOChannelHandler channelHandler = getTSOChannelHandler(port);
try {
Bootstrap nettyClient = createNettyClientBootstrap();
ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
// ------------------------------------------------------------------------------------------------------------
// Test the client can't connect cause the server is not there
// ------------------------------------------------------------------------------------------------------------
while (!channelF.isDone()) /** do nothing */ ;
assertFalse(channelF.isSuccess());
// ------------------------------------------------------------------------------------------------------------
// Test creation of a server connection
// ------------------------------------------------------------------------------------------------------------
channelHandler.reconnect();
assertTrue(channelHandler.listeningChannel.isOpen());
// Eventually the channel group of the server should contain the listening channel
assertEquals(channelHandler.allChannels.size(), 1);
// ------------------------------------------------------------------------------------------------------------
// Test that a client can connect now
// ------------------------------------------------------------------------------------------------------------
channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
while (!channelF.isDone()) /** do nothing */ ;
assertTrue(channelF.isSuccess());
assertTrue(channelF.channel().isActive());
// Eventually the channel group of the server should have 2 elements
while (channelHandler.allChannels.size() != 2) /** do nothing */ ;
// ------------------------------------------------------------------------------------------------------------
// Close the channel on the client side and test we have one element less in the channel group
// ------------------------------------------------------------------------------------------------------------
channelF.channel().close().await();
// Eventually the channel group of the server should have only one element
while (channelHandler.allChannels.size() != 1) /** do nothing */ ;
// ------------------------------------------------------------------------------------------------------------
// Open a new channel and test the connection closing on the server side through the channel handler
// ------------------------------------------------------------------------------------------------------------
channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
while (!channelF.isDone()) /** do nothing */ ;
assertTrue(channelF.isSuccess());
// Eventually the channel group of the server should have 2 elements again
while (channelHandler.allChannels.size() != 2) /** do nothing */ ;
channelHandler.closeConnection();
assertFalse(channelHandler.listeningChannel.isOpen());
assertEquals(channelHandler.allChannels.size(), 0);
// Wait some time and check the channel was closed
TimeUnit.SECONDS.sleep(1);
assertFalse(channelF.channel().isOpen());
// ------------------------------------------------------------------------------------------------------------
// Test server re-connections with connected clients
// ------------------------------------------------------------------------------------------------------------
// Connect first time
channelHandler.reconnect();
assertTrue(channelHandler.listeningChannel.isOpen());
// Eventually the channel group of the server should contain the listening channel
assertEquals(channelHandler.allChannels.size(), 1);
// Check the client can connect
channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
while (!channelF.isDone()) /** do nothing */ ;
assertTrue(channelF.isSuccess());
// Eventually the channel group of the server should have 2 elements
while (channelHandler.allChannels.size() != 2) /** do nothing */ ;
// Re-connect and check that client connection was gone
channelHandler.reconnect();
assertTrue(channelHandler.listeningChannel.isOpen());
// Eventually the channel group of the server should contain the listening channel
assertEquals(channelHandler.allChannels.size(), 1);
// Wait some time and check the channel was closed
TimeUnit.SECONDS.sleep(1);
assertFalse(channelF.channel().isOpen());
// ------------------------------------------------------------------------------------------------------------
// Test closeable interface with re-connection trial
// ------------------------------------------------------------------------------------------------------------
channelHandler.close();
assertFalse(channelHandler.listeningChannel.isOpen());
assertEquals(channelHandler.allChannels.size(), 0);
} finally {
if (channelHandler != null) channelHandler.close();
}
}
@Test(timeOut = 10_000)
public void testNettyChannelWriting() throws Exception {
int port = NetworkUtils.getFreePort();
TSOChannelHandler channelHandler = getTSOChannelHandler(port);
try {
// ------------------------------------------------------------------------------------------------------------
// Prepare test
// ------------------------------------------------------------------------------------------------------------
// Connect channel handler
channelHandler.reconnect();
// Create client and connect it
Bootstrap nettyClient = createNettyClientBootstrap();
ChannelFuture channelF = nettyClient.connect(new InetSocketAddress("localhost", port));
// Basic checks for connection
while (!channelF.isDone()) /** do nothing */ ;
assertTrue(channelF.isSuccess());
assertTrue(channelF.channel().isActive());
Channel channel = channelF.channel();
// Eventually the channel group of the server should have 2 elements
while (channelHandler.allChannels.size() != 2) /** do nothing */ ;
// Write first handshake request
TSOProto.HandshakeRequest.Builder handshake = TSOProto.HandshakeRequest.newBuilder();
// NOTE: Add here the required handshake capabilities when necessary
handshake.setClientCapabilities(TSOProto.Capabilities.newBuilder().build());
channelF.channel().writeAndFlush(TSOProto.Request.newBuilder().setHandshakeRequest(handshake.build()).build());
// ------------------------------------------------------------------------------------------------------------
// Test channel writing
// ------------------------------------------------------------------------------------------------------------
testWritingTimestampRequest(channel);
testWritingCommitRequest(channel);
testWritingFenceRequest(channel);
} finally {
if(channelHandler != null) channelHandler.close();
}
}
private void testWritingTimestampRequest(Channel channel) throws InterruptedException {
// Reset mock
reset(requestProcessor);
TSOProto.Request.Builder tsBuilder = TSOProto.Request.newBuilder();
TSOProto.TimestampRequest.Builder tsRequestBuilder = TSOProto.TimestampRequest.newBuilder();
tsBuilder.setTimestampRequest(tsRequestBuilder.build());
// Write into the channel
channel.writeAndFlush(tsBuilder.build()).await();
verify(requestProcessor, timeout(100).times(1)).timestampRequest(any(), any(MonitoringContext.class));
verify(requestProcessor, timeout(100).times(0))
.commitRequest(anyLong(), anyCollectionOf(Long.class), anyCollectionOf(Long.class), anyBoolean(), any(), any(MonitoringContext.class));
}
private void testWritingCommitRequest(Channel channel) throws InterruptedException {
// Reset mock
reset(requestProcessor);
TSOProto.Request.Builder commitBuilder = TSOProto.Request.newBuilder();
TSOProto.CommitRequest.Builder commitRequestBuilder = TSOProto.CommitRequest.newBuilder();
commitRequestBuilder.setStartTimestamp(666);
commitRequestBuilder.addCellId(666);
commitBuilder.setCommitRequest(commitRequestBuilder.build());
TSOProto.Request r = commitBuilder.build();
assertTrue(r.hasCommitRequest());
// Write into the channel
channel.writeAndFlush(commitBuilder.build()).await();
verify(requestProcessor, timeout(100).times(0)).timestampRequest(any(), any(MonitoringContext.class));
verify(requestProcessor, timeout(100).times(1))
.commitRequest(eq(666L), anyCollectionOf(Long.class), anyCollectionOf(Long.class), eq(false), any(), any(MonitoringContext.class));
}
private void testWritingFenceRequest(Channel channel) throws InterruptedException {
// Reset mock
reset(requestProcessor);
TSOProto.Request.Builder fenceBuilder = TSOProto.Request.newBuilder();
TSOProto.FenceRequest.Builder fenceRequestBuilder = TSOProto.FenceRequest.newBuilder();
fenceRequestBuilder.setTableId(666);
fenceBuilder.setFenceRequest(fenceRequestBuilder.build());
TSOProto.Request r = fenceBuilder.build();
assertTrue(r.hasFenceRequest());
// Write into the channel
channel.writeAndFlush(fenceBuilder.build()).await();
verify(requestProcessor, timeout(100).times(0)).timestampRequest(any(), any(MonitoringContext.class));
verify(requestProcessor, timeout(100).times(1))
.fenceRequest(eq(666L), any(), any(MonitoringContext.class));
}
// ----------------------------------------------------------------------------------------------------------------
// Helper methods
// ----------------------------------------------------------------------------------------------------------------
private Bootstrap createNettyClientBootstrap() {
ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("tsoclient-worker-%d").build();
EventLoopGroup workerGroup = new NioEventLoopGroup(1, workerThreadFactory);
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.SO_REUSEADDR, true);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100);
bootstrap.group(workerGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(8 * 1024, 0, 4, 0, 4));
pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
pipeline.addLast("protobufdecoder", new ProtobufDecoder(TSOProto.Response.getDefaultInstance()));
pipeline.addLast("protobufencoder", new ProtobufEncoder());
pipeline.addLast("testhandler", new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) {
LOG.info("Channel {} active", ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.error("Error on channel {}", ctx.channel(), cause);
}
});
}
});
return bootstrap;
}
}