blob: 8e42d530fc250edc08f8346950a9eb5af4c6622e [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.ipc.impl;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
public class IPCConnectionManager {
private static final Logger LOGGER = Logger.getLogger(IPCConnectionManager.class.getName());
private final IPCSystem system;
private final NetworkThread networkThread;
private final ServerSocketChannel serverSocketChannel;
private final Map<InetSocketAddress, IPCHandle> ipcHandleMap;
private final List<IPCHandle> pendingConnections;
private final List<IPCHandle> workingPendingConnections;
private final List<Message> sendList;
private final List<Message> workingSendList;
private final InetSocketAddress address;
private volatile boolean stopped;
IPCConnectionManager(IPCSystem system, InetSocketAddress socketAddress) throws IOException {
this.system = system;
this.networkThread = new NetworkThread();
this.serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannel.configureBlocking(false);
ServerSocket socket = serverSocketChannel.socket();
socket.bind(socketAddress);
address = new InetSocketAddress(socket.getInetAddress(), socket.getLocalPort());
ipcHandleMap = new HashMap<InetSocketAddress, IPCHandle>();
pendingConnections = new ArrayList<IPCHandle>();
workingPendingConnections = new ArrayList<IPCHandle>();
sendList = new ArrayList<Message>();
workingSendList = new ArrayList<Message>();
}
InetSocketAddress getAddress() {
return address;
}
void start() {
stopped = false;
networkThread.start();
}
void stop() throws IOException {
stopped = true;
serverSocketChannel.close();
}
IPCHandle getIPCHandle(InetSocketAddress remoteAddress) throws IOException, InterruptedException {
IPCHandle handle;
synchronized (this) {
handle = ipcHandleMap.get(remoteAddress);
if (handle == null) {
handle = new IPCHandle(system, remoteAddress);
pendingConnections.add(handle);
networkThread.selector.wakeup();
}
}
handle.waitTillConnected();
return handle;
}
synchronized void registerHandle(IPCHandle handle) {
ipcHandleMap.put(handle.getRemoteAddress(), handle);
}
synchronized void write(Message msg) {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Enqueued message: " + msg);
}
sendList.add(msg);
networkThread.selector.wakeup();
}
private synchronized void collectOutstandingWork() {
if (!pendingConnections.isEmpty()) {
moveAll(pendingConnections, workingPendingConnections);
}
if (!sendList.isEmpty()) {
moveAll(sendList, workingSendList);
}
}
private Message createInitialReqMessage(IPCHandle handle) {
Message msg = new Message(handle);
msg.setMessageId(system.createMessageId());
msg.setRequestMessageId(-1);
msg.setFlag(Message.INITIAL_REQ);
msg.setPayload(address);
return msg;
}
private Message createInitialAckMessage(IPCHandle handle, Message req) {
Message msg = new Message(handle);
msg.setMessageId(system.createMessageId());
msg.setRequestMessageId(req.getMessageId());
msg.setFlag(Message.INITIAL_ACK);
msg.setPayload(null);
return msg;
}
void ack(IPCHandle handle, Message req) {
write(createInitialAckMessage(handle, req));
}
private class NetworkThread extends Thread {
private final Selector selector;
public NetworkThread() {
super("IPC Network Listener Thread");
setDaemon(true);
try {
selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void run() {
try {
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (ClosedChannelException e) {
throw new RuntimeException(e);
}
BitSet unsentMessagesBitmap = new BitSet();
List<Message> tempUnsentMessages = new ArrayList<Message>();
while (!stopped) {
try {
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Starting Select");
}
int n = selector.select();
collectOutstandingWork();
if (!workingPendingConnections.isEmpty()) {
for (IPCHandle handle : workingPendingConnections) {
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
SelectionKey cKey = null;
if (channel.connect(handle.getRemoteAddress())) {
cKey = channel.register(selector, SelectionKey.OP_READ);
handle.setState(HandleState.CONNECT_SENT);
write(createInitialReqMessage(handle));
} else {
cKey = channel.register(selector, SelectionKey.OP_CONNECT);
}
handle.setKey(cKey);
cKey.attach(handle);
}
workingPendingConnections.clear();
}
if (!workingSendList.isEmpty()) {
unsentMessagesBitmap.clear();
int len = workingSendList.size();
for (int i = 0; i < len; ++i) {
Message msg = workingSendList.get(i);
if (LOGGER.isLoggable(Level.FINE)) {
LOGGER.fine("Processing send of message: " + msg);
}
IPCHandle handle = msg.getIPCHandle();
if (handle.getState() != HandleState.CLOSED) {
if (!handle.full()) {
while (true) {
ByteBuffer buffer = handle.getOutBuffer();
buffer.compact();
boolean success = msg.write(buffer);
buffer.flip();
if (success) {
system.getPerformanceCounters().addMessageSentCount(1);
SelectionKey key = handle.getKey();
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else {
if (!buffer.hasRemaining()) {
handle.resizeOutBuffer();
continue;
}
handle.markFull();
unsentMessagesBitmap.set(i);
}
break;
}
} else {
unsentMessagesBitmap.set(i);
}
}
}
copyUnsentMessages(unsentMessagesBitmap, tempUnsentMessages);
}
if (n > 0) {
for (Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext();) {
SelectionKey key = i.next();
i.remove();
SelectableChannel sc = key.channel();
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) sc;
IPCHandle handle = (IPCHandle) key.attachment();
ByteBuffer readBuffer = handle.getInBuffer();
int len = channel.read(readBuffer);
system.getPerformanceCounters().addMessageBytesReceived(len);
if (len < 0) {
key.cancel();
channel.close();
handle.close();
} else {
handle.processIncomingMessages();
if (!readBuffer.hasRemaining()) {
handle.resizeInBuffer();
}
}
} else if (key.isWritable()) {
SocketChannel channel = (SocketChannel) sc;
IPCHandle handle = (IPCHandle) key.attachment();
ByteBuffer writeBuffer = handle.getOutBuffer();
int len = channel.write(writeBuffer);
system.getPerformanceCounters().addMessageBytesSent(len);
if (len < 0) {
key.cancel();
channel.close();
handle.close();
} else if (!writeBuffer.hasRemaining()) {
key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
}
if (handle.full()) {
handle.clearFull();
selector.wakeup();
}
} else if (key.isAcceptable()) {
assert sc == serverSocketChannel;
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
IPCHandle handle = new IPCHandle(system, null);
SelectionKey cKey = channel.register(selector, SelectionKey.OP_READ);
handle.setKey(cKey);
cKey.attach(handle);
handle.setState(HandleState.CONNECT_RECEIVED);
} else if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) sc;
if (channel.finishConnect()) {
IPCHandle handle = (IPCHandle) key.attachment();
handle.setState(HandleState.CONNECT_SENT);
registerHandle(handle);
key.interestOps(SelectionKey.OP_READ);
write(createInitialReqMessage(handle));
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void copyUnsentMessages(BitSet unsentMessagesBitmap, List<Message> tempUnsentMessages) {
assert tempUnsentMessages.isEmpty();
for (int i = unsentMessagesBitmap.nextSetBit(0); i >= 0; i = unsentMessagesBitmap.nextSetBit(i + 1)) {
tempUnsentMessages.add(workingSendList.get(i));
}
workingSendList.clear();
moveAll(tempUnsentMessages, workingSendList);
}
}
private <T> void moveAll(List<T> source, List<T> target) {
int len = source.size();
for (int i = 0; i < len; ++i) {
target.add(source.get(i));
}
source.clear();
}
}