blob: 5002b817ae87566456736d1584f09427d7df692b [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.net.tests;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
import org.junit.Test;
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.protocols.muxdemux.ChannelControlBlock;
import edu.uci.ics.hyracks.net.protocols.muxdemux.IChannelOpenListener;
import edu.uci.ics.hyracks.net.protocols.muxdemux.MultiplexedConnection;
import edu.uci.ics.hyracks.net.protocols.muxdemux.MuxDemux;
/**
 * Round-trip test for the multiplexed network layer.
 *
 * <p>Two {@link MuxDemux} endpoints are started on the loopback interface and
 * connected. Two sender threads each open a channel on the shared connection
 * and push a fixed number of integer-filled buffers through it. For every
 * channel opened by a remote peer, the receiving endpoint starts a consumer
 * thread that sums the integers of each buffer and checks that consecutive
 * sums differ by exactly {@link #INTS_PER_BUFFER}. Any violation, channel
 * error or unexpected exception sets a shared failure flag that the test
 * asserts on once both senders are done.
 */
public class NetTest {
    /** Size in bytes of every buffer sent or posted for receiving. */
    private static final int BUFFER_SIZE = 1024;

    /** Number of 4-byte integers that fit in one buffer. */
    private static final int INTS_PER_BUFFER = BUFFER_SIZE / 4;

    /** Number of buffers each sender thread writes to its channel. */
    private static final int BUFFERS_PER_SENDER = 10000;

    /** Number of empty buffers handed to a channel's read side when it opens. */
    private static final int INITIAL_READ_BUFFERS = 50;

    /**
     * Starts two endpoints, connects the first to the second, runs two
     * concurrent senders over that connection and asserts that no failure
     * was recorded.
     *
     * @throws Exception if an endpoint cannot be started or connected, or if
     *             the test thread is interrupted while waiting for the senders
     */
    @Test
    public void test() throws Exception {
        AtomicBoolean failFlag = new AtomicBoolean();

        MuxDemux md1 = createMuxDemux("md1", failFlag);
        md1.start();
        MuxDemux md2 = createMuxDemux("md2", failFlag);
        md2.start();

        InetSocketAddress md2Address = md2.getLocalAddress();
        MultiplexedConnection md1md2 = md1.connect(md2Address);

        // Opposite factors give the two channels distinguishable payloads.
        Thread t1 = createThread(md1md2, 1, failFlag);
        Thread t2 = createThread(md1md2, -1, failFlag);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        Assert.assertFalse("Failure flag was set to true", failFlag.get());
    }

    /**
     * Builds (but does not start) a sender thread. When run, the thread opens
     * a channel on the given connection, writes {@link #BUFFERS_PER_SENDER}
     * buffers to it, closes the write side and then blocks until the read side
     * of the channel is closed or reports an error.
     *
     * @param md1md2 connection on which the channel is opened
     * @param factor multiplier applied to every integer written; buffer
     *            {@code i} contains {@code factor * (i + j)} for
     *            {@code j} in {@code [0, INTS_PER_BUFFER)}
     * @param failFlag set to {@code true} if the channel reports an error or
     *            the sender terminates with an exception
     * @return the unstarted sender thread
     */
    private Thread createThread(final MultiplexedConnection md1md2, final int factor,
            final AtomicBoolean failFlag) {
        return new Thread() {
            @Override
            public void run() {
                try {
                    ChannelControlBlock ccb = md1md2.openChannel();
                    // Zero permits: the final acquire() blocks until the read
                    // side signals completion through close() or error().
                    final Semaphore done = new Semaphore(0);

                    ccb.getWriteInterface().setEmptyBufferAcceptor(new IBufferAcceptor() {
                        @Override
                        public void accept(ByteBuffer buffer) {
                            // Returned write buffers are not recycled; a fresh
                            // buffer is allocated for every send.
                        }
                    });
                    ccb.getReadInterface().setFullBufferAcceptor(new ICloseableBufferAcceptor() {
                        @Override
                        public void accept(ByteBuffer buffer) {
                            // The sender does not consume incoming data.
                        }

                        @Override
                        public void error(int ecode) {
                            // Without this the sender would wait forever and
                            // the error would go unnoticed by the test.
                            failFlag.set(true);
                            done.release();
                        }

                        @Override
                        public void close() {
                            done.release();
                        }
                    });

                    ICloseableBufferAcceptor fba = ccb.getWriteInterface().getFullBufferAcceptor();
                    for (int i = 0; i < BUFFERS_PER_SENDER; ++i) {
                        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
                        for (int j = 0; j < INTS_PER_BUFFER; ++j) {
                            buffer.putInt(factor * (i + j));
                        }
                        buffer.flip();
                        fba.accept(buffer);
                    }
                    fba.close();
                    done.acquire();
                } catch (InterruptedException e) {
                    failFlag.set(true);
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // A crashed sender must fail the test, not just log.
                    failFlag.set(true);
                    e.printStackTrace();
                }
            }
        };
    }

    /**
     * Creates an endpoint bound to an ephemeral loopback port. For every
     * channel reported to its open listener, the endpoint posts
     * {@link #INITIAL_READ_BUFFERS} empty read buffers and starts a consumer
     * thread that validates the incoming buffers.
     *
     * @param label name passed on to the per-channel {@link ChannelIO}
     * @param failFlag set to {@code true} when a consumer detects corrupted
     *            data, a channel error or an interruption
     * @return the new, not yet started, endpoint
     */
    private MuxDemux createMuxDemux(final String label, final AtomicBoolean failFlag) {
        IChannelOpenListener openListener = new IChannelOpenListener() {
            @Override
            public void channelOpened(final ChannelControlBlock channel) {
                final ChannelIO cio = new ChannelIO(label, channel);
                channel.getReadInterface().setFullBufferAcceptor(cio.rifba);
                channel.getWriteInterface().setEmptyBufferAcceptor(cio.wieba);

                final IBufferAcceptor rieba = channel.getReadInterface().getEmptyBufferAcceptor();
                for (int i = 0; i < INITIAL_READ_BUFFERS; ++i) {
                    rieba.accept(ByteBuffer.allocate(BUFFER_SIZE));
                }

                new Thread() {
                    /** Sum of the previous buffer; 0 means "no buffer seen yet". */
                    private int prevTotal = 0;

                    @Override
                    public void run() {
                        while (true) {
                            ByteBuffer fbuf = null;
                            synchronized (channel) {
                                while (!cio.eos && cio.ecode == 0 && cio.rifq.isEmpty()) {
                                    try {
                                        channel.wait();
                                    } catch (InterruptedException e) {
                                        // Re-entering wait() with the interrupt
                                        // flag set would spin, so give up, let
                                        // the peer finish, and record failure.
                                        e.printStackTrace();
                                        failFlag.set(true);
                                        channel.getWriteInterface().getFullBufferAcceptor().close();
                                        Thread.currentThread().interrupt();
                                        return;
                                    }
                                }
                                // Drain queued data before acting on error/EOS.
                                if (!cio.rifq.isEmpty()) {
                                    fbuf = cio.rifq.poll();
                                } else if (cio.ecode != 0) {
                                    // Make the error visible to the test thread
                                    // before this thread dies.
                                    failFlag.set(true);
                                    throw new RuntimeException("Error: " + cio.ecode);
                                } else if (cio.eos) {
                                    // Closing our write side lets the peer's
                                    // read side observe close().
                                    channel.getWriteInterface().getFullBufferAcceptor().close();
                                    return;
                                }
                            }

                            int counter = 0;
                            while (fbuf.remaining() > 0) {
                                counter += fbuf.getInt();
                            }
                            // Buffer i sums to factor * (256 * i + const), so
                            // consecutive sums differ by INTS_PER_BUFFER in
                            // absolute value.
                            if (prevTotal != 0) {
                                if (Math.abs(counter - prevTotal) != INTS_PER_BUFFER) {
                                    failFlag.set(true);
                                }
                            }
                            prevTotal = counter;

                            // The buffer is fully drained; reset it and hand it
                            // back for the next read.
                            fbuf.clear();
                            rieba.accept(fbuf);
                        }
                    }
                }.start();
            }
        };
        // NOTE(review): the trailing arguments 1 and 5 are passed through
        // unchanged; their meaning is defined by the MuxDemux constructor.
        return new MuxDemux(new InetSocketAddress("127.0.0.1", 0), openListener, 1, 5);
    }

    /**
     * Per-channel state shared between the network callbacks and the consumer
     * thread. All mutable state is guarded by the monitor of {@link #channel},
     * which is also the object the consumer thread waits on.
     */
    private static class ChannelIO {
        private final ChannelControlBlock channel;

        /** Full buffers received on the read interface, in arrival order. */
        private final Queue<ByteBuffer> rifq;

        /** Empty buffers handed back by the write interface. */
        private final Queue<ByteBuffer> wieq;

        /** Set once the read interface has been closed. */
        private boolean eos;

        /** Last error code reported on the read interface; 0 means none. */
        private int ecode;

        private final ICloseableBufferAcceptor rifba;

        private final IBufferAcceptor wieba;

        /**
         * @param label endpoint label (currently unused)
         * @param channel channel whose monitor guards this object's state
         */
        public ChannelIO(final String label, ChannelControlBlock channel) {
            this.channel = channel;
            this.rifq = new LinkedList<ByteBuffer>();
            this.wieq = new LinkedList<ByteBuffer>();

            // Each callback takes the channel monitor itself rather than
            // relying on the caller to hold it: notifyAll() requires
            // ownership, and the consumer reads this state under the same
            // lock. Monitors are reentrant, so this is harmless if the caller
            // already owns it.
            rifba = new ICloseableBufferAcceptor() {
                @Override
                public void accept(ByteBuffer buffer) {
                    synchronized (ChannelIO.this.channel) {
                        rifq.add(buffer);
                        ChannelIO.this.channel.notifyAll();
                    }
                }

                @Override
                public void error(int ecode) {
                    synchronized (ChannelIO.this.channel) {
                        ChannelIO.this.ecode = ecode;
                        ChannelIO.this.channel.notifyAll();
                    }
                }

                @Override
                public void close() {
                    synchronized (ChannelIO.this.channel) {
                        eos = true;
                        ChannelIO.this.channel.notifyAll();
                    }
                }
            };
            wieba = new IBufferAcceptor() {
                @Override
                public void accept(ByteBuffer buffer) {
                    synchronized (ChannelIO.this.channel) {
                        wieq.add(buffer);
                        ChannelIO.this.channel.notifyAll();
                    }
                }
            };
        }
    }
}