blob: 58f70881491f062d650a5ae707622583f68570be [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.control.nc.net;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.api.channels.IInputChannel;
import edu.uci.ics.hyracks.api.channels.IInputChannelMonitor;
import edu.uci.ics.hyracks.api.comm.FrameHelper;
import edu.uci.ics.hyracks.api.context.IHyracksRootContext;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.exceptions.HyracksException;
import edu.uci.ics.hyracks.api.partitions.PartitionId;
public class NetworkInputChannel implements IInputChannel, INetworkChannel {
private static final Logger LOGGER = Logger.getLogger(NetworkInputChannel.class.getName());
private final ConnectionManager connectionManager;
private final SocketAddress remoteAddress;
private final PartitionId partitionId;
private final Queue<ByteBuffer> emptyQueue;
private final Queue<ByteBuffer> fullQueue;
private SocketChannel socketChannel;
private SelectionKey key;
private ByteBuffer currentBuffer;
private boolean eos;
private boolean aborted;
private IInputChannelMonitor monitor;
private Object attachment;
private ByteBuffer writeBuffer;
public NetworkInputChannel(IHyracksRootContext ctx, ConnectionManager connectionManager,
SocketAddress remoteAddress, PartitionId partitionId, int nBuffers) {
this.connectionManager = connectionManager;
this.remoteAddress = remoteAddress;
this.partitionId = partitionId;
this.emptyQueue = new ArrayDeque<ByteBuffer>(nBuffers);
for (int i = 0; i < nBuffers; ++i) {
emptyQueue.add(ctx.allocateFrame());
}
fullQueue = new ArrayDeque<ByteBuffer>(nBuffers);
aborted = false;
eos = false;
}
@Override
public void registerMonitor(IInputChannelMonitor monitor) throws HyracksException {
this.monitor = monitor;
}
@Override
public void setAttachment(Object attachment) {
this.attachment = attachment;
}
@Override
public Object getAttachment() {
return attachment;
}
@Override
public synchronized ByteBuffer getNextBuffer() {
return fullQueue.poll();
}
@Override
public synchronized void recycleBuffer(ByteBuffer buffer) {
buffer.clear();
emptyQueue.add(buffer);
if (!eos && !aborted) {
int ops = key.interestOps();
if ((ops & SelectionKey.OP_READ) == 0) {
key.interestOps(ops | SelectionKey.OP_READ);
key.selector().wakeup();
if (currentBuffer == null) {
currentBuffer = emptyQueue.poll();
}
}
}
}
@Override
public void open() throws HyracksDataException {
currentBuffer = emptyQueue.poll();
try {
connectionManager.connect(this);
} catch (IOException e) {
throw new HyracksDataException(e);
}
}
@Override
public void close() throws HyracksDataException {
}
@Override
public synchronized boolean dispatchNetworkEvent() throws IOException {
if (aborted) {
eos = true;
monitor.notifyEndOfStream(this);
return true;
}
if (key.isConnectable()) {
if (socketChannel.finishConnect()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
prepareForWrite();
}
} else if (key.isWritable()) {
socketChannel.write(writeBuffer);
if (writeBuffer.remaining() == 0) {
key.interestOps(SelectionKey.OP_READ);
}
} else if (key.isReadable()) {
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.finer("Before read: " + currentBuffer.position() + " " + currentBuffer.limit());
}
int bytesRead = socketChannel.read(currentBuffer);
if (bytesRead < 0) {
eos = true;
monitor.notifyEndOfStream(this);
return true;
}
if (LOGGER.isLoggable(Level.FINER)) {
LOGGER.finer("After read: " + currentBuffer.position() + " " + currentBuffer.limit());
}
currentBuffer.flip();
int dataLen = currentBuffer.remaining();
if (dataLen >= currentBuffer.capacity() || aborted()) {
if (LOGGER.isLoggable(Level.FINEST)) {
LOGGER.finest("NetworkInputChannel: frame received: sender = " + partitionId.getSenderIndex());
}
if (currentBuffer.getInt(FrameHelper.getTupleCountOffset(currentBuffer.capacity())) == 0) {
eos = true;
monitor.notifyEndOfStream(this);
return true;
}
fullQueue.add(currentBuffer);
currentBuffer = emptyQueue.poll();
if (currentBuffer == null && key.isValid()) {
int ops = key.interestOps();
key.interestOps(ops & ~SelectionKey.OP_READ);
}
monitor.notifyDataAvailability(this, 1);
return false;
}
currentBuffer.compact();
}
return false;
}
private void prepareForConnect() {
key.interestOps(SelectionKey.OP_CONNECT);
}
private void prepareForWrite() {
writeBuffer = ByteBuffer.allocate(ConnectionManager.INITIAL_MESSAGE_SIZE);
writeBuffer.putLong(partitionId.getJobId().getId());
writeBuffer.putInt(partitionId.getConnectorDescriptorId().getId());
writeBuffer.putInt(partitionId.getSenderIndex());
writeBuffer.putInt(partitionId.getReceiverIndex());
writeBuffer.flip();
key.interestOps(SelectionKey.OP_WRITE);
}
@Override
public void setSelectionKey(SelectionKey key) {
this.key = key;
socketChannel = (SocketChannel) key.channel();
}
@Override
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
@Override
public SelectionKey getSelectionKey() {
return key;
}
public PartitionId getPartitionId() {
return partitionId;
}
public void abort() {
aborted = true;
}
public boolean aborted() {
return aborted;
}
@Override
public void notifyConnectionManagerRegistration() throws IOException {
if (socketChannel.connect(remoteAddress)) {
prepareForWrite();
} else {
prepareForConnect();
}
}
}