// blob: 5675cbfddf9633f46a99887e0907b966339918aa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Random;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.net.NetUtils;
/**
 * This test provokes partial writes in the server, which is
 * serving multiple clients.
 *
 * <p>The test server echoes every request back to the caller and is
 * configured with a socket send buffer of half the maximum payload size,
 * so that responses cannot always be written in one go.
 */
public class TestIPCServerResponder extends TestCase {

  public static final Log LOG =
      LogFactory.getLog(TestIPCServerResponder.class);

  /**
   * Configuration shared by the server and the clients. Not final because
   * {@link #testResponseBuffer()} replaces it with a fresh instance when done.
   */
  private static Configuration conf = new Configuration();

  public TestIPCServerResponder(final String name) {
    super(name);
  }

  private static final Random RANDOM = new Random();
  private static final String ADDRESS = "0.0.0.0";

  /** Upper bound (exclusive) on the size of a request payload, in bytes. */
  private static final int BYTE_COUNT = 1024;

  /** Source data for payloads: the letters 'a'..'z' repeated. */
  private static final byte[] BYTES = new byte[BYTE_COUNT];

  static {
    for (int i = 0; i < BYTE_COUNT; i++) {
      BYTES[i] = (byte) ('a' + (i % 26));
    }
  }

  /** Server that echoes each request, optionally after a short random sleep. */
  private static class TestServer extends Server {

    /** Whether {@link #call} sleeps for a random 0-19 ms before answering. */
    private final boolean sleep;

    /**
     * @param handlerCount number of handlers passed to the {@link Server}
     * @param sleep whether each call should sleep briefly before returning
     * @throws IOException if the underlying server cannot be created
     */
    public TestServer(final int handlerCount, final boolean sleep)
        throws IOException {
      super(ADDRESS, 0, BytesWritable.class, handlerCount, conf);
      // Set the buffer size to half of the maximum parameter/result size
      // to force the socket to block
      this.setSocketSendBufSize(BYTE_COUNT / 2);
      this.sleep = sleep;
    }

    /**
     * Returns {@code param} unchanged, after an optional short sleep.
     */
    @Override
    public Writable call(RpcKind rpcKind, String protocol, Writable param,
        long receiveTime) throws IOException {
      if (sleep) {
        try {
          Thread.sleep(RANDOM.nextInt(20)); // sleep a bit
        } catch (InterruptedException e) {
          // Do not swallow the interrupt: restore the flag so the calling
          // thread can still observe that it was interrupted.
          Thread.currentThread().interrupt();
        }
      }
      return param;
    }
  }

  /**
   * Thread that issues {@code count} calls with random-sized payloads
   * through one client, recording whether any of them threw.
   */
  private static class Caller extends Thread {
    private final Client client;
    private final int count;
    private final InetSocketAddress address;

    /**
     * Set when any call throws. Read by the test thread only after
     * {@link Thread#join()} on this caller.
     */
    private boolean failed;

    public Caller(final Client client, final InetSocketAddress address,
        final int count) {
      this.client = client;
      this.address = address;
      this.count = count;
    }

    @Override
    public void run() {
      for (int i = 0; i < count; i++) {
        try {
          // Payload is a random-length prefix of BYTES.
          int byteSize = RANDOM.nextInt(BYTE_COUNT);
          byte[] bytes = new byte[byteSize];
          System.arraycopy(BYTES, 0, bytes, 0, byteSize);
          Writable param = new BytesWritable(bytes);
          client.call(param, address);
          Thread.sleep(RANDOM.nextInt(20));
        } catch (Exception e) {
          // Pass the throwable as well so the stack trace is not lost.
          LOG.fatal("Caught: " + e, e);
          failed = true;
        }
      }
    }
  }

  /**
   * Runs a small scenario with the initial response buffer size and the
   * configured maximum response size both set to 1. The static buffer size
   * and the shared configuration are restored afterwards, even on failure,
   * so the settings do not leak into other tests.
   */
  public void testResponseBuffer() throws Exception {
    final int savedInitialRespBufSize = Server.INITIAL_RESP_BUF_SIZE;
    Server.INITIAL_RESP_BUF_SIZE = 1;
    conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
        1);
    try {
      testServerResponder(1, true, 1, 1, 5);
    } finally {
      Server.INITIAL_RESP_BUF_SIZE = savedInitialRespBufSize;
      conf = new Configuration(); // reset configuration
    }
  }

  /** Runs the scenario with 10 handlers, 1 client, 10 callers, 200 calls each. */
  public void testServerResponder() throws Exception {
    testServerResponder(10, true, 1, 10, 200);
  }

  /**
   * Starts a {@link TestServer}, drives it from several caller threads and
   * asserts that no caller saw an exception.
   *
   * @param handlerCount number of server handlers
   * @param handlerSleep whether server handlers sleep briefly per call
   * @param clientCount number of {@link Client} instances to create
   * @param callerCount number of caller threads; caller {@code i} uses
   *        client {@code i % clientCount}
   * @param callCount number of calls each caller makes
   * @throws Exception if setup fails or the test thread is interrupted
   */
  public void testServerResponder(final int handlerCount,
      final boolean handlerSleep,
      final int clientCount,
      final int callerCount,
      final int callCount) throws Exception {
    Server server = new TestServer(handlerCount, handlerSleep);
    server.start();
    Client[] clients = new Client[clientCount];
    try {
      InetSocketAddress address = NetUtils.getConnectAddress(server);
      for (int i = 0; i < clientCount; i++) {
        clients[i] = new Client(BytesWritable.class, conf);
      }

      Caller[] callers = new Caller[callerCount];
      for (int i = 0; i < callerCount; i++) {
        callers[i] = new Caller(clients[i % clientCount], address, callCount);
        callers[i].start();
      }

      // Wait for every caller before asserting, so that a failure in one
      // caller does not leave the others running.
      for (int i = 0; i < callerCount; i++) {
        callers[i].join();
      }
      for (int i = 0; i < callerCount; i++) {
        assertFalse(callers[i].failed);
      }
    } finally {
      // Always release clients and the server, even when the test fails.
      try {
        for (Client client : clients) {
          if (client != null) {
            client.stop();
          }
        }
      } finally {
        server.stop();
      }
    }
  }
}