blob: 14981c3f11e0220da9d79b9cae161fffc6ef416e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.avro.ipc;
import java.io.IOException;
import java.io.OutputStream;
import static org.junit.Assert.assertEquals;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import junit.framework.Assert;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.avro.test.Mail;
import org.apache.avro.test.Message;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestNettyServer {
static final long CONNECT_TIMEOUT_MILLIS = 2000; // 2 sec
private static Server server;
private static Transceiver transceiver;
private static Mail proxy;
private static MailImpl mailService;
public static class MailImpl implements Mail {
private CountDownLatch allMessages = new CountDownLatch(5);
// in this simple example just return details of the message
public String send(Message message) {
return "Sent message to ["+ message.getTo().toString() +
"] from [" + message.getFrom().toString() + "] with body [" +
message.getBody().toString() + "]";
}
public void fireandforget(Message message) {
allMessages.countDown();
}
private void awaitMessages() throws InterruptedException {
allMessages.await(2, TimeUnit.SECONDS);
}
private void assertAllMessagesReceived() {
assertEquals(0, allMessages.getCount());
}
public void reset() {
allMessages = new CountDownLatch(5);
}
}
@BeforeClass
public static void initializeConnections()throws Exception {
// start server
System.out.println("starting server...");
mailService = new MailImpl();
Responder responder = new SpecificResponder(Mail.class, mailService);
server = initializeServer(responder);
server.start();
int serverPort = server.getPort();
System.out.println("server port : " + serverPort);
transceiver = initializeTransceiver(serverPort);
proxy = SpecificRequestor.getClient(Mail.class, transceiver);
}
protected static Server initializeServer(Responder responder) {
return new NettyServer(responder, new InetSocketAddress(0));
}
protected static Transceiver initializeTransceiver(int serverPort) throws IOException {
return new NettyTransceiver(new InetSocketAddress(
serverPort), CONNECT_TIMEOUT_MILLIS);
}
@AfterClass
public static void tearDownConnections() throws Exception{
transceiver.close();
server.close();
}
@Test
public void testRequestResponse() throws Exception {
for(int x = 0; x < 5; x++) {
verifyResponse(proxy.send(createMessage()));
}
}
private void verifyResponse(String result) {
Assert.assertEquals(
"Sent message to [wife] from [husband] with body [I love you!]",
result.toString());
}
@Test
public void testOneway() throws Exception {
for (int x = 0; x < 5; x++) {
proxy.fireandforget(createMessage());
}
mailService.awaitMessages();
mailService.assertAllMessagesReceived();
}
@Test
public void testMixtureOfRequests() throws Exception {
mailService.reset();
for (int x = 0; x < 5; x++) {
Message createMessage = createMessage();
proxy.fireandforget(createMessage);
verifyResponse(proxy.send(createMessage));
}
mailService.awaitMessages();
mailService.assertAllMessagesReceived();
}
@Test
public void testConnectionsCount() throws Exception {
Transceiver transceiver2 = new NettyTransceiver(new InetSocketAddress(
server.getPort()), CONNECT_TIMEOUT_MILLIS);
Mail proxy2 = SpecificRequestor.getClient(Mail.class, transceiver2);
proxy.fireandforget(createMessage());
proxy2.fireandforget(createMessage());
Assert.assertEquals(2, ((NettyServer) server).getNumActiveConnections());
transceiver2.close();
Assert.assertEquals(1, ((NettyServer) server).getNumActiveConnections());
}
private Message createMessage() {
Message msg = Message.newBuilder().
setTo("wife").
setFrom("husband").
setBody("I love you!").
build();
return msg;
}
// send a malformed request (HTTP) to the NettyServer port
@Test
public void testBadRequest() throws IOException {
int port = server.getPort();
String msg = "GET /status HTTP/1.1\n\n";
InetSocketAddress sockAddr = new InetSocketAddress("127.0.0.1", port);
Socket sock = new Socket();
sock.connect(sockAddr);
OutputStream out = sock.getOutputStream();
out.write(msg.getBytes(Charset.forName("UTF-8")));
out.flush();
byte[] buf = new byte[2048];
int bytesRead = sock.getInputStream().read(buf);
Assert.assertTrue("Connection should have been closed", bytesRead == -1);
}
}