/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
| package org.apache.qpid.mina; |
| |
| import org.apache.log4j.Logger; |
| import org.apache.mina.common.*; |
| import org.apache.mina.transport.socket.nio.SocketConnector; |
| import org.apache.mina.transport.socket.nio.SocketConnectorConfig; |
| import org.apache.mina.transport.socket.nio.SocketSessionConfig; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.util.concurrent.CountDownLatch; |
| |
| import junit.framework.TestCase; |
| |
| public class WriterTest extends TestCase |
| { |
| private static final Logger _logger = Logger.getLogger(WriterTest.class); |
| |
| private static class RunnableWriterTest implements Runnable |
| { |
| private Logger _logger; |
| |
| private IoSession _session; |
| |
| private long _startTime; |
| |
| private long[] _chunkTimes; |
| |
| private int _chunkCount = 500000; |
| |
| private int _chunkSize = 1024; |
| |
| private CountDownLatch _notifier; |
| |
| public RunnableWriterTest(Logger logger) |
| { |
| _logger = logger; |
| } |
| |
| public void run() |
| { |
| _startTime = System.currentTimeMillis(); |
| _notifier = new CountDownLatch(1); |
| for (int i = 0; i < _chunkCount; i++) |
| { |
| ByteBuffer buf = ByteBuffer.allocate(_chunkSize, false); |
| byte check = (byte) (i % 128); |
| buf.put(check); |
| buf.fill((byte)88, buf.remaining()); |
| buf.flip(); |
| _session.write(buf); |
| } |
| |
| try |
| { |
| _logger.info("All buffers sent; waiting for receipt from server"); |
| _notifier.await(); |
| } |
| catch (InterruptedException e) |
| { |
| } |
| _logger.info("Completed"); |
| long totalTime = System.currentTimeMillis() - _startTime; |
| _logger.info("Total time: " + totalTime); |
| _logger.info("MB per second: " + (_chunkSize * _chunkCount)/totalTime); |
| long lastChunkTime = _startTime; |
| double average = 0; |
| for (int i = 0; i < _chunkTimes.length; i++) |
| { |
| if (i == 0) |
| { |
| average = _chunkTimes[i] - _startTime; |
| } |
| else |
| { |
| long delta = _chunkTimes[i] - lastChunkTime; |
| if (delta != 0) |
| { |
| average = (average + delta)/2; |
| } |
| } |
| lastChunkTime = _chunkTimes[i]; |
| } |
| _logger.info("Average chunk time: " + average + "ms"); |
| CloseFuture cf = _session.close(); |
| cf.join(); |
| } |
| |
| private class WriterHandler extends IoHandlerAdapter |
| { |
| private int _chunksReceived = 0; |
| |
| private int _partialBytesRead = 0; |
| |
| private byte _partialCheckNumber; |
| |
| private int _totalBytesReceived = 0; |
| |
| public void messageReceived(IoSession session, Object message) throws Exception |
| { |
| ByteBuffer result = (ByteBuffer) message; |
| _totalBytesReceived += result.remaining(); |
| int size = result.remaining(); |
| long now = System.currentTimeMillis(); |
| if (_partialBytesRead > 0) |
| { |
| int offset = _chunkSize - _partialBytesRead; |
| if (size >= offset) |
| { |
| _chunkTimes[_chunksReceived++] = now; |
| result.position(offset); |
| } |
| else |
| { |
| // have not read even one chunk, including the previous partial bytes |
| _partialBytesRead += size; |
| return; |
| } |
| } |
| |
| int chunkCount = result.remaining()/_chunkSize; |
| |
| for (int i = 0; i < chunkCount; i++) |
| { |
| _chunkTimes[_chunksReceived++] = now; |
| byte check = result.get(); |
| _logger.debug("Check number " + check + " read"); |
| if (check != (byte)((_chunksReceived - 1)%128)) |
| { |
| _logger.error("Check number " + check + " read when expected " + (_chunksReceived%128)); |
| } |
| _logger.debug("Chunk times recorded"); |
| |
| try |
| { |
| result.skip(_chunkSize - 1); |
| } |
| catch (IllegalArgumentException e) |
| { |
| _logger.error("Position was: " + result.position()); |
| _logger.error("Tried to skip to: " + (_chunkSize * i)); |
| _logger.error("limit was; " + result.limit()); |
| } |
| } |
| _logger.debug("Chunks received now " + _chunksReceived); |
| _logger.debug("Bytes received: " + _totalBytesReceived); |
| _partialBytesRead = result.remaining(); |
| |
| if (_partialBytesRead > 0) |
| { |
| _partialCheckNumber = result.get(); |
| } |
| |
| if (_chunksReceived >= _chunkCount) |
| { |
| _notifier.countDown(); |
| } |
| |
| } |
| |
| public void exceptionCaught(IoSession session, Throwable cause) throws Exception |
| { |
| _logger.error("Error: " + cause, cause); |
| } |
| } |
| |
| public void startWriter(int chunkSize) throws IOException, InterruptedException |
| { |
| _chunkSize = chunkSize; |
| |
| IoConnector ioConnector = null; |
| |
| ioConnector = new SocketConnector(); |
| |
| SocketConnectorConfig cfg = (SocketConnectorConfig) ioConnector.getDefaultConfig(); |
| cfg.setThreadModel(ThreadModel.MANUAL); |
| SocketSessionConfig scfg = (SocketSessionConfig) cfg.getSessionConfig(); |
| scfg.setTcpNoDelay(true); |
| scfg.setSendBufferSize(32768); |
| scfg.setReceiveBufferSize(32768); |
| |
| final InetSocketAddress address = new InetSocketAddress("localhost", AcceptorTest.PORT); |
| _logger.info("Attempting connection to " + address); |
| ConnectFuture future = ioConnector.connect(address, new WriterHandler()); |
| // wait for connection to complete |
| future.join(); |
| _logger.info("Connection completed"); |
| // we call getSession which throws an IOException if there has been an error connecting |
| _session = future.getSession(); |
| _chunkTimes = new long[_chunkCount]; |
| Thread t = new Thread(this); |
| t.start(); |
| t.join(); |
| _logger.info("Test completed"); |
| } |
| } |
| |
| private RunnableWriterTest _runnableWriterTest = new RunnableWriterTest(_logger); |
| |
| public void test1k() throws IOException, InterruptedException |
| { |
| _logger.info("Starting 1k test"); |
| _runnableWriterTest.startWriter(1024); |
| } |
| |
| public void test2k() throws IOException, InterruptedException |
| { |
| _logger.info("Starting 2k test"); |
| _runnableWriterTest.startWriter(2048); |
| } |
| |
| public void test4k() throws IOException, InterruptedException |
| { |
| _logger.info("Starting 4k test"); |
| _runnableWriterTest.startWriter(4096); |
| } |
| |
| public void test8k() throws IOException, InterruptedException |
| { |
| _logger.info("Starting 8k test"); |
| _runnableWriterTest.startWriter(8192); |
| } |
| |
| public void test16k() throws IOException, InterruptedException |
| { |
| _logger.info("Starting 16k test"); |
| _runnableWriterTest.startWriter(16384); |
| } |
| |
| public void test32k() throws IOException, InterruptedException |
| { |
| _logger.info("Starting 32k test"); |
| _runnableWriterTest.startWriter(32768); |
| } |
| |
| public static void main(String[] args) throws IOException, InterruptedException |
| { |
| WriterTest w = new WriterTest(); |
| //w.test1k(); |
| //w.test2k(); |
| //w.test4k(); |
| w.test8k(); |
| //w.test16k(); |
| //w.test32k(); |
| } |
| |
| public static junit.framework.Test suite() |
| { |
| return new junit.framework.TestSuite(WriterTest.class); |
| } |
| } |