blob: 31ce9243a4c01900f1b231fe32761fa35f05fd80 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.nc.net;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Queue;
import edu.uci.ics.hyracks.api.comm.FrameHelper;
import edu.uci.ics.hyracks.api.comm.IFrameWriter;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
public class NetworkOutputChannel implements INetworkChannel, IFrameWriter {
private final IHyracksRootContext ctx;
private final Queue<ByteBuffer> emptyQueue;
private final Queue<ByteBuffer> fullQueue;
private SelectionKey key;
private boolean aborted;
private boolean eos;
private boolean eosSent;
private boolean failed;
private ByteBuffer currentBuffer;
public NetworkOutputChannel(IHyracksRootContext ctx, int nBuffers) {
this.ctx = ctx;
emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
for (int i = 0; i < nBuffers; ++i) {
emptyQueue.add(ctx.allocateFrame());
}
fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
}
@Override
public synchronized boolean dispatchNetworkEvent() throws IOException {
if (failed || aborted) {
eos = true;
return true;
} else if (key.isWritable()) {
while (true) {
if (currentBuffer == null) {
if (eosSent) {
return true;
}
currentBuffer = fullQueue.poll();
if (currentBuffer == null) {
if (eos) {
currentBuffer = emptyQueue.poll();
currentBuffer.clear();
currentBuffer.putInt(FrameHelper.getTupleCountOffset(ctx.getFrameSize()), 0);
eosSent = true;
} else {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
return false;
}
}
}
int bytesWritten = ((SocketChannel) key.channel()).write(currentBuffer);
if (bytesWritten < 0) {
eos = true;
return true;
}
if (currentBuffer.remaining() == 0) {
emptyQueue.add(currentBuffer);
notifyAll();
currentBuffer = null;
if (eosSent) {
return true;
}
} else {
return false;
}
}
}
return false;
}
@Override
public void setSelectionKey(SelectionKey key) {
this.key = key;
}
@Override
public SelectionKey getSelectionKey() {
return key;
}
@Override
public SocketAddress getRemoteAddress() {
return ((SocketChannel) key.channel()).socket().getRemoteSocketAddress();
}
@Override
public synchronized void abort() {
aborted = true;
}
@Override
public void open() throws HyracksDataException {
currentBuffer = null;
}
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
ByteBuffer destBuffer = null;
synchronized (this) {
if (aborted) {
throw new HyracksDataException("Connection has been aborted");
}
while (true) {
destBuffer = emptyQueue.poll();
if (destBuffer != null) {
break;
}
try {
wait();
} catch (InterruptedException e) {
throw new HyracksDataException(e);
}
}
}
buffer.position(0);
buffer.limit(destBuffer.capacity());
destBuffer.clear();
destBuffer.put(buffer);
destBuffer.flip();
synchronized (this) {
fullQueue.add(destBuffer);
}
key.interestOps(SelectionKey.OP_WRITE);
key.selector().wakeup();
}
@Override
public void fail() throws HyracksDataException {
failed = true;
}
@Override
public synchronized void close() throws HyracksDataException {
eos = true;
key.interestOps(SelectionKey.OP_WRITE);
key.selector().wakeup();
}
@Override
public void notifyConnectionManagerRegistration() throws IOException {
}
}