blob: 3dec7e09d32f5a6627f1e8b3b6506a06bf93b15d [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.net.protocols.muxdemux;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import edu.uci.ics.hyracks.net.buffers.IBufferAcceptor;
import edu.uci.ics.hyracks.net.buffers.ICloseableBufferAcceptor;
import edu.uci.ics.hyracks.net.exceptions.NetException;
/**
* Handle to a channel that represents a logical full-duplex communication end-point.
*
* @author vinayakb
*/
public class ChannelControlBlock {
private static final Logger LOGGER = Logger.getLogger(ChannelControlBlock.class.getName());
private final ChannelSet cSet;
private final int channelId;
private final ReadInterface ri;
private final WriteInterface wi;
private final AtomicBoolean localClose;
private final AtomicBoolean localCloseAck;
private final AtomicBoolean remoteClose;
private final AtomicBoolean remoteCloseAck;
ChannelControlBlock(ChannelSet cSet, int channelId) {
this.cSet = cSet;
this.channelId = channelId;
this.ri = new ReadInterface();
this.wi = new WriteInterface();
localClose = new AtomicBoolean();
localCloseAck = new AtomicBoolean();
remoteClose = new AtomicBoolean();
remoteCloseAck = new AtomicBoolean();
}
int getChannelId() {
return channelId;
}
/**
* Get the read inderface of this channel.
*
* @return the read interface.
*/
public IChannelReadInterface getReadInterface() {
return ri;
}
/**
* Get the write interface of this channel.
*
* @return the write interface.
*/
public IChannelWriteInterface getWriteInterface() {
return wi;
}
private final class ReadInterface implements IChannelReadInterface {
private final Deque<ByteBuffer> riEmptyStack;
private final IBufferAcceptor eba = new IBufferAcceptor() {
@Override
public void accept(ByteBuffer buffer) {
int delta = buffer.remaining();
synchronized (ChannelControlBlock.this) {
if (remoteClose.get()) {
return;
}
riEmptyStack.push(buffer);
}
cSet.addPendingCredits(channelId, delta);
}
};
private ICloseableBufferAcceptor fba;
private volatile int credits;
private ByteBuffer currentReadBuffer;
ReadInterface() {
riEmptyStack = new ArrayDeque<ByteBuffer>();
credits = 0;
}
@Override
public void setFullBufferAcceptor(ICloseableBufferAcceptor fullBufferAcceptor) {
fba = fullBufferAcceptor;
}
@Override
public IBufferAcceptor getEmptyBufferAcceptor() {
return eba;
}
int read(SocketChannel sc, int size) throws IOException, NetException {
while (true) {
if (size <= 0) {
return size;
}
if (currentReadBuffer == null) {
currentReadBuffer = riEmptyStack.poll();
assert currentReadBuffer != null;
}
int rSize = Math.min(size, currentReadBuffer.remaining());
if (rSize > 0) {
currentReadBuffer.limit(currentReadBuffer.position() + rSize);
int len;
try {
len = sc.read(currentReadBuffer);
if (len < 0) {
throw new NetException("Socket Closed");
}
} finally {
currentReadBuffer.limit(currentReadBuffer.capacity());
}
size -= len;
if (len < rSize) {
return size;
}
} else {
return size;
}
if (currentReadBuffer.remaining() <= 0) {
flush();
}
}
}
void flush() {
if (currentReadBuffer != null) {
currentReadBuffer.flip();
fba.accept(currentReadBuffer);
currentReadBuffer = null;
}
}
}
private final class WriteInterface implements IChannelWriteInterface {
private final Queue<ByteBuffer> wiFullQueue;
private boolean channelWritabilityState;
private final ICloseableBufferAcceptor fba = new ICloseableBufferAcceptor() {
@Override
public void accept(ByteBuffer buffer) {
synchronized (ChannelControlBlock.this) {
wiFullQueue.add(buffer);
adjustChannelWritability();
}
}
@Override
public void close() {
synchronized (ChannelControlBlock.this) {
if (eos) {
if (LOGGER.isLoggable(Level.WARNING)) {
LOGGER.warning("Received duplicate close() on channel: " + channelId);
}
return;
}
eos = true;
adjustChannelWritability();
}
}
@Override
public void error(int ecode) {
synchronized (ChannelControlBlock.this) {
WriteInterface.this.ecode = ecode;
adjustChannelWritability();
}
}
};
private IBufferAcceptor eba;
private int credits;
private boolean eos;
private boolean eosSent;
private int ecode;
private boolean ecodeSent;
private ByteBuffer currentWriteBuffer;
WriteInterface() {
wiFullQueue = new ArrayDeque<ByteBuffer>();
credits = 0;
eos = false;
eosSent = false;
ecode = -1;
ecodeSent = false;
}
@Override
public void setEmptyBufferAcceptor(IBufferAcceptor emptyBufferAcceptor) {
eba = emptyBufferAcceptor;
}
@Override
public ICloseableBufferAcceptor getFullBufferAcceptor() {
return fba;
}
void write(MultiplexedConnection.WriterState writerState) throws NetException {
if (currentWriteBuffer == null) {
currentWriteBuffer = wiFullQueue.poll();
}
if (currentWriteBuffer != null) {
int size = Math.min(currentWriteBuffer.remaining(), credits);
if (size > 0) {
credits -= size;
writerState.command.setChannelId(channelId);
writerState.command.setCommandType(MuxDemuxCommand.CommandType.DATA);
writerState.command.setData(size);
writerState.reset(currentWriteBuffer, size, ChannelControlBlock.this);
} else {
adjustChannelWritability();
}
} else if (ecode >= 0 && !ecodeSent) {
writerState.command.setChannelId(channelId);
writerState.command.setCommandType(MuxDemuxCommand.CommandType.ERROR);
writerState.command.setData(ecode);
writerState.reset(null, 0, null);
ecodeSent = true;
localClose.set(true);
adjustChannelWritability();
} else if (eos && !eosSent) {
writerState.command.setChannelId(channelId);
writerState.command.setCommandType(MuxDemuxCommand.CommandType.CLOSE_CHANNEL);
writerState.command.setData(0);
writerState.reset(null, 0, null);
eosSent = true;
localClose.set(true);
adjustChannelWritability();
}
}
void writeComplete() {
if (currentWriteBuffer.remaining() <= 0) {
currentWriteBuffer.clear();
eba.accept(currentWriteBuffer);
currentWriteBuffer = null;
adjustChannelWritability();
}
}
private boolean computeWritability() {
boolean writableDataPresent = currentWriteBuffer != null || !wiFullQueue.isEmpty();
if (writableDataPresent) {
return credits > 0;
}
if (eos && !eosSent) {
return true;
}
if (ecode >= 0 && !ecodeSent) {
return true;
}
return false;
}
void adjustChannelWritability() {
boolean writable = computeWritability();
if (writable) {
if (!channelWritabilityState) {
cSet.markPendingWrite(channelId);
}
} else {
if (channelWritabilityState) {
cSet.unmarkPendingWrite(channelId);
}
}
channelWritabilityState = writable;
}
}
synchronized void write(MultiplexedConnection.WriterState writerState) throws NetException {
wi.write(writerState);
}
synchronized void writeComplete() {
wi.writeComplete();
}
synchronized int read(SocketChannel sc, int size) throws IOException, NetException {
return ri.read(sc, size);
}
int getReadCredits() {
return ri.credits;
}
void setReadCredits(int credits) {
this.ri.credits = credits;
}
synchronized void addWriteCredits(int delta) {
wi.credits += delta;
wi.adjustChannelWritability();
}
synchronized void reportRemoteEOS() {
ri.flush();
ri.fba.close();
remoteClose.set(true);
}
void reportRemoteEOSAck() {
remoteCloseAck.set(true);
}
boolean getRemoteEOS() {
return remoteClose.get();
}
void reportLocalEOSAck() {
localCloseAck.set(true);
}
synchronized void reportRemoteError(int ecode) {
ri.flush();
ri.fba.error(ecode);
remoteClose.set(true);
}
boolean completelyClosed() {
return localCloseAck.get() && remoteCloseAck.get();
}
@Override
public String toString() {
return "Channel:" + channelId + "[localClose: " + localClose + " localCloseAck: " + localCloseAck
+ " remoteClose: " + remoteClose + " remoteCloseAck:" + remoteCloseAck + " readCredits: " + ri.credits
+ " writeCredits: " + wi.credits + "]";
}
}