blob: 910345624fa075486ad40bd1af895ebaacbf9e82 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.mina;
import org.apache.log4j.Logger;
import org.apache.mina.common.*;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import junit.framework.TestCase;
public class WriterTest extends TestCase
{
private static final Logger _logger = Logger.getLogger(WriterTest.class);
private static class RunnableWriterTest implements Runnable
{
private Logger _logger;
private IoSession _session;
private long _startTime;
private long[] _chunkTimes;
private int _chunkCount = 500000;
private int _chunkSize = 1024;
private CountDownLatch _notifier;
public RunnableWriterTest(Logger logger)
{
_logger = logger;
}
public void run()
{
_startTime = System.currentTimeMillis();
_notifier = new CountDownLatch(1);
for (int i = 0; i < _chunkCount; i++)
{
ByteBuffer buf = ByteBuffer.allocate(_chunkSize, false);
byte check = (byte) (i % 128);
buf.put(check);
buf.fill((byte)88, buf.remaining());
buf.flip();
_session.write(buf);
}
try
{
_logger.info("All buffers sent; waiting for receipt from server");
_notifier.await();
}
catch (InterruptedException e)
{
}
_logger.info("Completed");
long totalTime = System.currentTimeMillis() - _startTime;
_logger.info("Total time: " + totalTime);
_logger.info("MB per second: " + (_chunkSize * _chunkCount)/totalTime);
long lastChunkTime = _startTime;
double average = 0;
for (int i = 0; i < _chunkTimes.length; i++)
{
if (i == 0)
{
average = _chunkTimes[i] - _startTime;
}
else
{
long delta = _chunkTimes[i] - lastChunkTime;
if (delta != 0)
{
average = (average + delta)/2;
}
}
lastChunkTime = _chunkTimes[i];
}
_logger.info("Average chunk time: " + average + "ms");
CloseFuture cf = _session.close();
cf.join();
}
private class WriterHandler extends IoHandlerAdapter
{
private int _chunksReceived = 0;
private int _partialBytesRead = 0;
private byte _partialCheckNumber;
private int _totalBytesReceived = 0;
public void messageReceived(IoSession session, Object message) throws Exception
{
ByteBuffer result = (ByteBuffer) message;
_totalBytesReceived += result.remaining();
int size = result.remaining();
long now = System.currentTimeMillis();
if (_partialBytesRead > 0)
{
int offset = _chunkSize - _partialBytesRead;
if (size >= offset)
{
_chunkTimes[_chunksReceived++] = now;
result.position(offset);
}
else
{
// have not read even one chunk, including the previous partial bytes
_partialBytesRead += size;
return;
}
}
int chunkCount = result.remaining()/_chunkSize;
for (int i = 0; i < chunkCount; i++)
{
_chunkTimes[_chunksReceived++] = now;
byte check = result.get();
_logger.debug("Check number " + check + " read");
if (check != (byte)((_chunksReceived - 1)%128))
{
_logger.error("Check number " + check + " read when expected " + (_chunksReceived%128));
}
_logger.debug("Chunk times recorded");
try
{
result.skip(_chunkSize - 1);
}
catch (IllegalArgumentException e)
{
_logger.error("Position was: " + result.position());
_logger.error("Tried to skip to: " + (_chunkSize * i));
_logger.error("limit was; " + result.limit());
}
}
_logger.debug("Chunks received now " + _chunksReceived);
_logger.debug("Bytes received: " + _totalBytesReceived);
_partialBytesRead = result.remaining();
if (_partialBytesRead > 0)
{
_partialCheckNumber = result.get();
}
if (_chunksReceived >= _chunkCount)
{
_notifier.countDown();
}
}
public void exceptionCaught(IoSession session, Throwable cause) throws Exception
{
_logger.error("Error: " + cause, cause);
}
}
public void startWriter(int chunkSize) throws IOException, InterruptedException
{
_chunkSize = chunkSize;
IoConnector ioConnector = null;
ioConnector = new SocketConnector();
SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig();
cfg.setThreadModel(ThreadModel.MANUAL);
SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig();
scfg.setTcpNoDelay(true);
scfg.setSendBufferSize(32768);
scfg.setReceiveBufferSize(32768);
final InetSocketAddress address = new InetSocketAddress("localhost", AcceptorTest.PORT);
_logger.info("Attempting connection to " + address);
ConnectFuture future = ioConnector.connect(address, new WriterHandler());
// wait for connection to complete
future.join();
_logger.info("Connection completed");
// we call getSession which throws an IOException if there has been an error connecting
_session = future.getSession();
_chunkTimes = new long[_chunkCount];
Thread t = new Thread(this);
t.start();
t.join();
_logger.info("Test completed");
}
}
private RunnableWriterTest _runnableWriterTest = new RunnableWriterTest(_logger);
public void test1k() throws IOException, InterruptedException
{
_logger.info("Starting 1k test");
_runnableWriterTest.startWriter(1024);
}
public void test2k() throws IOException, InterruptedException
{
_logger.info("Starting 2k test");
_runnableWriterTest.startWriter(2048);
}
public void test4k() throws IOException, InterruptedException
{
_logger.info("Starting 4k test");
_runnableWriterTest.startWriter(4096);
}
public void test8k() throws IOException, InterruptedException
{
_logger.info("Starting 8k test");
_runnableWriterTest.startWriter(8192);
}
public void test16k() throws IOException, InterruptedException
{
_logger.info("Starting 16k test");
_runnableWriterTest.startWriter(16384);
}
public void test32k() throws IOException, InterruptedException
{
_logger.info("Starting 32k test");
_runnableWriterTest.startWriter(32768);
}
public static void main(String[] args) throws IOException, InterruptedException
{
WriterTest w = new WriterTest();
//w.test1k();
//w.test2k();
//w.test4k();
w.test8k();
//w.test16k();
//w.test32k();
}
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(WriterTest.class);
}
}