blob: 0fa2b673bb8b97d53bf3b3f8fda04d481a6f5f7b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.ipc.netty;
import java.io.IOException;
import java.io.OutputStream;
import static org.junit.Assert.assertEquals;
import io.netty.channel.socket.SocketChannel;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.junit.Assert;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.avro.test.Mail;
import org.apache.avro.test.Message;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestNettyServer {
static final int CONNECT_TIMEOUT_MILLIS = 2000; // 2 sec
protected static Server server;
protected static Transceiver transceiver;
protected static Mail proxy;
protected static MailImpl mailService;
protected static Consumer<SocketChannel> channelInitializer;
public static class MailImpl implements Mail {
private CountDownLatch allMessages = new CountDownLatch(5);
// in this simple example just return details of the message
public String send(Message message) {
return "Sent message to [" + message.getTo() + "] from [" + message.getFrom() + "] with body ["
+ message.getBody() + "]";
}
public void fireandforget(Message message) {
allMessages.countDown();
}
private void awaitMessages() throws InterruptedException {
allMessages.await(2, TimeUnit.SECONDS);
}
private void assertAllMessagesReceived() {
assertEquals(0, allMessages.getCount());
}
public void reset() {
allMessages = new CountDownLatch(5);
}
}
public static void initializeConnections(Consumer<SocketChannel> initializer) throws Exception {
initializeConnections(initializer, initializer);
}
public static void initializeConnections(Consumer<SocketChannel> serverInitializer,
Consumer<SocketChannel> transceiverInitializer) throws Exception {
System.out.println("starting server...");
channelInitializer = transceiverInitializer;
mailService = new MailImpl();
Responder responder = new SpecificResponder(Mail.class, mailService);
server = new NettyServer(responder, new InetSocketAddress(0), serverInitializer);
server.start();
int serverPort = server.getPort();
System.out.println("server port : " + serverPort);
transceiver = new NettyTransceiver(new InetSocketAddress(serverPort), CONNECT_TIMEOUT_MILLIS,
transceiverInitializer, null);
proxy = SpecificRequestor.getClient(Mail.class, transceiver);
}
@BeforeClass
public static void initializeConnections() throws Exception {
initializeConnections(null);
}
@AfterClass
public static void tearDownConnections() throws Exception {
transceiver.close();
server.close();
}
@Test
public void testRequestResponse() throws Exception {
for (int x = 0; x < 5; x++) {
verifyResponse(proxy.send(createMessage()));
}
}
private void verifyResponse(String result) {
Assert.assertEquals("Sent message to [wife] from [husband] with body [I love you!]", result);
}
@Test
public void testOneway() throws Exception {
for (int x = 0; x < 5; x++) {
proxy.fireandforget(createMessage());
}
mailService.awaitMessages();
mailService.assertAllMessagesReceived();
}
@Test
public void testMixtureOfRequests() throws Exception {
mailService.reset();
for (int x = 0; x < 5; x++) {
Message createMessage = createMessage();
proxy.fireandforget(createMessage);
verifyResponse(proxy.send(createMessage));
}
mailService.awaitMessages();
mailService.assertAllMessagesReceived();
}
@Test
public void testConnectionsCount() throws Exception {
Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(server.getPort()), CONNECT_TIMEOUT_MILLIS,
channelInitializer);
Mail proxy2 = SpecificRequestor.getClient(Mail.class, transceiver2);
proxy.fireandforget(createMessage());
proxy2.fireandforget(createMessage());
Assert.assertEquals(2, ((NettyServer) server).getNumActiveConnections());
transceiver2.close();
// Check the active connections with some retries as closing at the client
// side might not take effect on the server side immediately
int numActiveConnections = ((NettyServer) server).getNumActiveConnections();
for (int i = 0; i < 50 && numActiveConnections == 2; ++i) {
System.out.println("Server still has 2 active connections; retrying...");
Thread.sleep(100);
numActiveConnections = ((NettyServer) server).getNumActiveConnections();
}
Assert.assertEquals(1, numActiveConnections);
}
private Message createMessage() {
Message msg = Message.newBuilder().setTo("wife").setFrom("husband").setBody("I love you!").build();
return msg;
}
// send a malformed request (HTTP) to the NettyServer port
@Test
public void testBadRequest() throws IOException {
int port = server.getPort();
String msg = "GET /status HTTP/1.1\n\n";
InetSocketAddress sockAddr = new InetSocketAddress("127.0.0.1", port);
try (Socket sock = new Socket()) {
sock.connect(sockAddr);
OutputStream out = sock.getOutputStream();
out.write(msg.getBytes(StandardCharsets.UTF_8));
out.flush();
byte[] buf = new byte[2048];
int bytesRead = sock.getInputStream().read(buf);
Assert.assertTrue("Connection should have been closed: " + bytesRead, bytesRead == -1);
}
}
}