blob: e49f435d6058803db0bd081e4a335f0ed12e5f01 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.nc.comm;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.comm.IConnectionEntry;
import edu.uci.ics.hyracks.api.comm.IDataReceiveListener;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
public class ConnectionEntry implements IConnectionEntry {
private static final Logger LOGGER = Logger.getLogger(ConnectionEntry.class.getName());
private SocketChannel socketChannel;
private final ByteBuffer readBuffer;
private final ByteBuffer writeBuffer;
private IDataReceiveListener recvListener;
private Object attachment;
private final SelectionKey key;
private UUID jobId;
private UUID stageId;
private boolean aborted;
public ConnectionEntry(IHyracksRootContext ctx, SocketChannel socketChannel, SelectionKey key) {
this.socketChannel = socketChannel;
readBuffer = ctx.allocateFrame();
readBuffer.clear();
writeBuffer = ctx.allocateFrame();
writeBuffer.clear();
this.key = key;
}
public SocketChannel getSocketChannel() {
return socketChannel;
}
public boolean dispatch(SelectionKey key) throws IOException {
if (aborted) {
recvListener.dataReceived(this);
} else {
if (key.isReadable()) {
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.finer("Before read: " + readBuffer.position() + " " + readBuffer.limit());
}
int bytesRead = socketChannel.read(readBuffer);
if (bytesRead < 0) {
recvListener.eos(this);
return true;
}
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.finer("After read: " + readBuffer.position() + " " + readBuffer.limit());
}
recvListener.dataReceived(this);
} else if (key.isWritable()) {
synchronized (this) {
writeBuffer.flip();
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.finer("Before write: " + writeBuffer.position() + " " + writeBuffer.limit());
}
int bytesWritten = socketChannel.write(writeBuffer);
if (bytesWritten < 0) {
return true;
}
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.finer("After write: " + writeBuffer.position() + " " + writeBuffer.limit());
}
if (writeBuffer.remaining() <= 0) {
int ops = key.interestOps();
key.interestOps(ops & ~SelectionKey.OP_WRITE);
}
writeBuffer.compact();
notifyAll();
}
} else {
LOGGER.warning("Spurious event triggered: " + key.readyOps());
return true;
}
}
return false;
}
@Override
public ByteBuffer getReadBuffer() {
return readBuffer;
}
@Override
public synchronized void write(ByteBuffer buffer) {
while (buffer.remaining() > 0) {
while (writeBuffer.remaining() <= 0) {
try {
wait();
} catch (InterruptedException e) {
}
}
int oldLimit = buffer.limit();
buffer.limit(Math.min(oldLimit, writeBuffer.remaining()));
writeBuffer.put(buffer);
buffer.limit(oldLimit);
int ops = key.interestOps();
key.interestOps(ops | SelectionKey.OP_WRITE);
key.selector().wakeup();
}
}
@Override
public void setDataReceiveListener(IDataReceiveListener listener) {
this.recvListener = listener;
}
@Override
public void attach(Object attachment) {
this.attachment = attachment;
}
@Override
public Object getAttachment() {
return attachment;
}
@Override
public void close() {
try {
socketChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public SelectionKey getSelectionKey() {
return key;
}
@Override
public UUID getJobId() {
return jobId;
}
@Override
public void setJobId(UUID jobId) {
this.jobId = jobId;
}
@Override
public UUID getStageId() {
return stageId;
}
@Override
public void setStageId(UUID stageId) {
this.stageId = stageId;
}
@Override
public void abort() {
aborted = true;
}
@Override
public boolean aborted() {
return aborted;
}
}