| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.mina.transport.socket.nio; |
| |
| import edu.emory.mathcs.backport.java.util.concurrent.Executor; |
| import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock; |
| import org.apache.mina.common.ByteBuffer; |
| import org.apache.mina.common.ExceptionMonitor; |
| import org.apache.mina.common.IdleStatus; |
| import org.apache.mina.common.IoFilter.WriteRequest; |
| import org.apache.mina.common.WriteTimeoutException; |
| import org.apache.mina.util.IdentityHashSet; |
| import org.apache.mina.util.NamePreservingRunnable; |
| import org.apache.mina.util.Queue; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.Selector; |
| import java.nio.channels.SocketChannel; |
| import java.util.Iterator; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| |
| /** |
| * Performs all I/O operations for sockets that are connected or bound. This class is used by MINA internally. |
| * |
| * @author The Apache Directory Project (mina-dev@directory.apache.org) |
| * @version $Rev: 619823 $, $Date: 2008-02-08 10:09:37 +0000 (Fri, 08 Feb 2008) $, |
| */ |
| class MultiThreadSocketIoProcessor extends SocketIoProcessor |
| { |
// General-purpose logger for this processor.
Logger _logger = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class);
// Dedicated categories for the reader and writer sides. Class.getName() is used on purpose:
// concatenating the Class object itself goes through Class.toString(), which would produce
// a category starting with "class " instead of "<fully.qualified.Name>.Reader".
Logger _loggerRead = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class.getName() + ".Reader");
Logger _loggerWrite = LoggerFactory.getLogger(MultiThreadSocketIoProcessor.class.getName() + ".Writer");

// Timeout (milliseconds) handed to Selector.select(long) by both worker loops.
private static final long SELECTOR_TIMEOUT = 1000L;

// Per-session byte budgets consulted by read(...) and doFlush(SocketSessionImpl).
private int MAX_READ_BYTES_PER_SESSION = 524288; //512K
private int MAX_FLUSH_BYTES_PER_SESSION = 524288; //512K

// readLock guards the read worker/selector lifecycle and changes to read selection keys;
// writeLock does the same for the write worker/selector and write selection keys.
private final Object readLock = new Object();
private final Object writeLock = new Object();

private final String threadName;
private final Executor executor;

// Serialises doUpdateTrafficMask(), which is invoked from both worker threads.
private ReentrantLock trafficMaskUpdateLock = new ReentrantLock();

// Both selectors are opened in startupWorker() and reset to null when their worker exits.
/** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
private volatile Selector selector, writeSelector;

// Sessions waiting to be registered / deregistered; accessed under their own monitors.
private final Queue newSessions = new Queue();
private final Queue removingSessions = new Queue();
// Sessions waiting to be flushed, plus an identity set used to avoid queueing one twice.
private final BlockingQueue flushingSessions = new LinkedBlockingQueue();
private final IdentityHashSet flushingSessionsSet = new IdentityHashSet();

// Sessions whose traffic mask changed and whose interest ops must be recomputed.
private final Queue trafficControllingSessions = new Queue();

// Currently running workers; null when the corresponding worker is not running.
private ReadWorker readWorker;
private WriteWorker writeWorker;
// Timestamps of the last idle sweep performed by each worker.
private long lastIdleReadCheckTime = System.currentTimeMillis();
private long lastIdleWriteCheckTime = System.currentTimeMillis();
| |
/**
 * Creates a processor whose reader and writer workers are run on the given executor.
 *
 * @param threadName base name; the workers append "Reader" / "Writer" to it
 * @param executor   executor used to run the two worker loops
 */
MultiThreadSocketIoProcessor(String threadName, Executor executor)
{
    super(threadName, executor);
    // Kept locally as well because the workers in this class read them directly.
    this.executor = executor;
    this.threadName = threadName;
}
| |
| void addNew(SocketSessionImpl session) throws IOException |
| { |
| synchronized (newSessions) |
| { |
| newSessions.push(session); |
| } |
| |
| startupWorker(); |
| |
| selector.wakeup(); |
| writeSelector.wakeup(); |
| } |
| |
| void remove(SocketSessionImpl session) throws IOException |
| { |
| scheduleRemove(session); |
| startupWorker(); |
| selector.wakeup(); |
| } |
| |
| private void startupWorker() throws IOException |
| { |
| synchronized (readLock) |
| { |
| if (readWorker == null) |
| { |
| selector = Selector.open(); |
| readWorker = new ReadWorker(); |
| executor.execute(new NamePreservingRunnable(readWorker)); |
| } |
| } |
| |
| synchronized (writeLock) |
| { |
| if (writeWorker == null) |
| { |
| writeSelector = Selector.open(); |
| writeWorker = new WriteWorker(); |
| executor.execute(new NamePreservingRunnable(writeWorker)); |
| } |
| } |
| |
| } |
| |
| void flush(SocketSessionImpl session) |
| { |
| scheduleFlush(session); |
| Selector selector = this.writeSelector; |
| |
| if (selector != null) |
| { |
| selector.wakeup(); |
| } |
| } |
| |
| void updateTrafficMask(SocketSessionImpl session) |
| { |
| scheduleTrafficControl(session); |
| Selector selector = this.selector; |
| if (selector != null) |
| { |
| selector.wakeup(); |
| } |
| } |
| |
| private void scheduleRemove(SocketSessionImpl session) |
| { |
| synchronized (removingSessions) |
| { |
| removingSessions.push(session); |
| } |
| } |
| |
| private void scheduleFlush(SocketSessionImpl session) |
| { |
| synchronized (flushingSessionsSet) |
| { |
| //if flushingSessions grows to contain Integer.MAX_VALUE sessions |
| // then this will fail. |
| if (flushingSessionsSet.add(session)) |
| { |
| flushingSessions.offer(session); |
| } |
| } |
| } |
| |
| private void scheduleTrafficControl(SocketSessionImpl session) |
| { |
| synchronized (trafficControllingSessions) |
| { |
| trafficControllingSessions.push(session); |
| } |
| } |
| |
| private void doAddNewReader() throws InterruptedException |
| { |
| if (newSessions.isEmpty()) |
| { |
| return; |
| } |
| |
| for (; ;) |
| { |
| MultiThreadSocketSessionImpl session; |
| |
| synchronized (newSessions) |
| { |
| session = (MultiThreadSocketSessionImpl) newSessions.peek(); |
| } |
| |
| if (session == null) |
| { |
| break; |
| } |
| |
| SocketChannel ch = session.getChannel(); |
| |
| |
| try |
| { |
| |
| ch.configureBlocking(false); |
| session.setSelectionKey(ch.register(selector, |
| SelectionKey.OP_READ, |
| session)); |
| |
| //System.out.println("ReadDebug:"+"Awaiting Registration"); |
| session.awaitRegistration(); |
| sessionCreated(session); |
| } |
| catch (IOException e) |
| { |
| // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute |
| // and call ConnectFuture.setException(). |
| session.getFilterChain().fireExceptionCaught(session, e); |
| } |
| } |
| } |
| |
| |
| private void doAddNewWrite() throws InterruptedException |
| { |
| if (newSessions.isEmpty()) |
| { |
| return; |
| } |
| |
| for (; ;) |
| { |
| MultiThreadSocketSessionImpl session; |
| |
| synchronized (newSessions) |
| { |
| session = (MultiThreadSocketSessionImpl) newSessions.peek(); |
| } |
| |
| if (session == null) |
| { |
| break; |
| } |
| |
| SocketChannel ch = session.getChannel(); |
| |
| try |
| { |
| ch.configureBlocking(false); |
| synchronized (flushingSessionsSet) |
| { |
| flushingSessionsSet.add(session); |
| } |
| |
| session.setWriteSelectionKey(ch.register(writeSelector, |
| SelectionKey.OP_WRITE, |
| session)); |
| |
| //System.out.println("WriteDebug:"+"Awaiting Registration"); |
| session.awaitRegistration(); |
| sessionCreated(session); |
| } |
| catch (IOException e) |
| { |
| |
| // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute |
| // and call ConnectFuture.setException(). |
| session.getFilterChain().fireExceptionCaught(session, e); |
| } |
| } |
| } |
| |
| |
| private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException |
| { |
| MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; |
| synchronized (newSessions) |
| { |
| if (!session.created()) |
| { |
| _logger.debug("Popping new session"); |
| newSessions.pop(); |
| |
| // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here |
| // in AbstractIoFilterChain.fireSessionOpened(). |
| session.getServiceListeners().fireSessionCreated(session); |
| |
| session.doneCreation(); |
| } |
| } |
| } |
| |
| private void doRemove() |
| { |
| if (removingSessions.isEmpty()) |
| { |
| return; |
| } |
| |
| for (; ;) |
| { |
| MultiThreadSocketSessionImpl session; |
| |
| synchronized (removingSessions) |
| { |
| session = (MultiThreadSocketSessionImpl) removingSessions.pop(); |
| } |
| |
| if (session == null) |
| { |
| break; |
| } |
| |
| SocketChannel ch = session.getChannel(); |
| SelectionKey key = session.getReadSelectionKey(); |
| SelectionKey writeKey = session.getWriteSelectionKey(); |
| |
| // Retry later if session is not yet fully initialized. |
| // (In case that Session.close() is called before addSession() is processed) |
| if (key == null || writeKey == null) |
| { |
| scheduleRemove(session); |
| break; |
| } |
| // skip if channel is already closed |
| if (!key.isValid() || !writeKey.isValid()) |
| { |
| continue; |
| } |
| |
| try |
| { |
| //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session)); |
| synchronized (readLock) |
| { |
| key.cancel(); |
| } |
| synchronized (writeLock) |
| { |
| writeKey.cancel(); |
| } |
| ch.close(); |
| } |
| catch (IOException e) |
| { |
| session.getFilterChain().fireExceptionCaught(session, e); |
| } |
| finally |
| { |
| releaseWriteBuffers(session); |
| session.getServiceListeners().fireSessionDestroyed(session); |
| } |
| } |
| } |
| |
| private void processRead(Set selectedKeys) |
| { |
| Iterator it = selectedKeys.iterator(); |
| |
| while (it.hasNext()) |
| { |
| SelectionKey key = (SelectionKey) it.next(); |
| MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment(); |
| |
| synchronized (readLock) |
| { |
| if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable()) |
| { |
| read(session); |
| } |
| } |
| |
| } |
| |
| selectedKeys.clear(); |
| } |
| |
| private void processWrite(Set selectedKeys) |
| { |
| Iterator it = selectedKeys.iterator(); |
| |
| while (it.hasNext()) |
| { |
| SelectionKey key = (SelectionKey) it.next(); |
| SocketSessionImpl session = (SocketSessionImpl) key.attachment(); |
| |
| synchronized (writeLock) |
| { |
| if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable()) |
| { |
| |
| // Clear OP_WRITE |
| key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); |
| |
| synchronized (flushingSessionsSet) |
| { |
| flushingSessions.offer(session); |
| } |
| } |
| } |
| } |
| |
| selectedKeys.clear(); |
| } |
| |
| private void read(SocketSessionImpl session) |
| { |
| |
| //if (_loggerWrite.isDebugEnabled()) |
| { |
| //System.out.println("WriteDebug:"+"Starting read for Session:" + System.identityHashCode(session)); |
| } |
| |
| int totalReadBytes = 0; |
| |
| while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION) |
| { |
| ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize()); |
| SocketChannel ch = session.getChannel(); |
| |
| try |
| { |
| buf.clear(); |
| |
| int readBytes = 0; |
| int ret; |
| |
| try |
| { |
| while ((ret = ch.read(buf.buf())) > 0) |
| { |
| readBytes += ret; |
| totalReadBytes += ret; |
| } |
| } |
| finally |
| { |
| buf.flip(); |
| } |
| |
| |
| if (readBytes > 0) |
| { |
| session.increaseReadBytes(readBytes); |
| |
| session.getFilterChain().fireMessageReceived(session, buf); |
| buf = null; |
| } |
| |
| if (ret <= 0) |
| { |
| if (ret == 0) |
| { |
| if (readBytes == session.getReadBufferSize()) |
| { |
| continue; |
| } |
| } |
| else |
| { |
| scheduleRemove(session); |
| } |
| |
| break; |
| } |
| } |
| catch (Throwable e) |
| { |
| if (e instanceof IOException) |
| { |
| scheduleRemove(session); |
| } |
| session.getFilterChain().fireExceptionCaught(session, e); |
| |
| //Stop Reading this session. |
| return; |
| } |
| finally |
| { |
| if (buf != null) |
| { |
| buf.release(); |
| } |
| } |
| }//for |
| |
| // if (_loggerWrite.isDebugEnabled()) |
| { |
| //System.out.println("WriteDebug:"+"Read for Session:" + System.identityHashCode(session) + " got: " + totalReadBytes); |
| } |
| } |
| |
| |
| private void notifyReadIdleness() |
| { |
| // process idle sessions |
| long currentTime = System.currentTimeMillis(); |
| if ((currentTime - lastIdleReadCheckTime) >= 1000) |
| { |
| lastIdleReadCheckTime = currentTime; |
| Set keys = selector.keys(); |
| if (keys != null) |
| { |
| for (Iterator it = keys.iterator(); it.hasNext();) |
| { |
| SelectionKey key = (SelectionKey) it.next(); |
| SocketSessionImpl session = (SocketSessionImpl) key.attachment(); |
| notifyReadIdleness(session, currentTime); |
| } |
| } |
| } |
| } |
| |
| private void notifyWriteIdleness() |
| { |
| // process idle sessions |
| long currentTime = System.currentTimeMillis(); |
| if ((currentTime - lastIdleWriteCheckTime) >= 1000) |
| { |
| lastIdleWriteCheckTime = currentTime; |
| Set keys = writeSelector.keys(); |
| if (keys != null) |
| { |
| for (Iterator it = keys.iterator(); it.hasNext();) |
| { |
| SelectionKey key = (SelectionKey) it.next(); |
| SocketSessionImpl session = (SocketSessionImpl) key.attachment(); |
| notifyWriteIdleness(session, currentTime); |
| } |
| } |
| } |
| } |
| |
| private void notifyReadIdleness(SocketSessionImpl session, long currentTime) |
| { |
| notifyIdleness0( |
| session, currentTime, |
| session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), |
| IdleStatus.BOTH_IDLE, |
| Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); |
| notifyIdleness0( |
| session, currentTime, |
| session.getIdleTimeInMillis(IdleStatus.READER_IDLE), |
| IdleStatus.READER_IDLE, |
| Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE))); |
| |
| notifyWriteTimeout(session, currentTime, session |
| .getWriteTimeoutInMillis(), session.getLastWriteTime()); |
| } |
| |
| private void notifyWriteIdleness(SocketSessionImpl session, long currentTime) |
| { |
| notifyIdleness0( |
| session, currentTime, |
| session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), |
| IdleStatus.BOTH_IDLE, |
| Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE))); |
| notifyIdleness0( |
| session, currentTime, |
| session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), |
| IdleStatus.WRITER_IDLE, |
| Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE))); |
| |
| notifyWriteTimeout(session, currentTime, session |
| .getWriteTimeoutInMillis(), session.getLastWriteTime()); |
| } |
| |
| private void notifyIdleness0(SocketSessionImpl session, long currentTime, |
| long idleTime, IdleStatus status, |
| long lastIoTime) |
| { |
| if (idleTime > 0 && lastIoTime != 0 |
| && (currentTime - lastIoTime) >= idleTime) |
| { |
| session.increaseIdleCount(status); |
| session.getFilterChain().fireSessionIdle(session, status); |
| } |
| } |
| |
| private void notifyWriteTimeout(SocketSessionImpl session, |
| long currentTime, |
| long writeTimeout, long lastIoTime) |
| { |
| |
| MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session; |
| SelectionKey key = sesh.getWriteSelectionKey(); |
| |
| synchronized (writeLock) |
| { |
| if (writeTimeout > 0 |
| && (currentTime - lastIoTime) >= writeTimeout |
| && key != null && key.isValid() |
| && (key.interestOps() & SelectionKey.OP_WRITE) != 0) |
| { |
| session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException()); |
| } |
| } |
| } |
| |
| private SocketSessionImpl getNextFlushingSession() |
| { |
| return (SocketSessionImpl) flushingSessions.poll(); |
| } |
| |
| private void releaseSession(SocketSessionImpl session) |
| { |
| synchronized (session.getWriteRequestQueue()) |
| { |
| synchronized (flushingSessionsSet) |
| { |
| if (session.getScheduledWriteRequests() > 0) |
| { |
| if (_loggerWrite.isDebugEnabled()) |
| { |
| //System.out.println("WriteDebug:"+"Reflush" + System.identityHashCode(session)); |
| } |
| flushingSessions.offer(session); |
| } |
| else |
| { |
| if (_loggerWrite.isDebugEnabled()) |
| { |
| //System.out.println("WriteDebug:"+"Releasing session " + System.identityHashCode(session)); |
| } |
| flushingSessionsSet.remove(session); |
| } |
| } |
| } |
| } |
| |
| private void releaseWriteBuffers(SocketSessionImpl session) |
| { |
| Queue writeRequestQueue = session.getWriteRequestQueue(); |
| WriteRequest req; |
| |
| //Should this be synchronized? |
| synchronized (writeRequestQueue) |
| { |
| while ((req = (WriteRequest) writeRequestQueue.pop()) != null) |
| { |
| try |
| { |
| ((ByteBuffer) req.getMessage()).release(); |
| } |
| catch (IllegalStateException e) |
| { |
| session.getFilterChain().fireExceptionCaught(session, e); |
| } |
| finally |
| { |
| req.getFuture().setWritten(false); |
| } |
| } |
| } |
| } |
| |
| private void doFlush() |
| { |
| MultiThreadSocketSessionImpl session; |
| |
| while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null) |
| { |
| if (!session.isConnected()) |
| { |
| releaseWriteBuffers(session); |
| releaseSession(session); |
| continue; |
| } |
| |
| SelectionKey key = session.getWriteSelectionKey(); |
| // Retry later if session is not yet fully initialized. |
| // (In case that Session.write() is called before addSession() is processed) |
| if (key == null) |
| { |
| scheduleFlush(session); |
| releaseSession(session); |
| continue; |
| } |
| // skip if channel is already closed |
| if (!key.isValid()) |
| { |
| releaseSession(session); |
| continue; |
| } |
| |
| try |
| { |
| if (doFlush(session)) |
| { |
| releaseSession(session); |
| } |
| } |
| catch (IOException e) |
| { |
| releaseSession(session); |
| scheduleRemove(session); |
| session.getFilterChain().fireExceptionCaught(session, e); |
| } |
| |
| } |
| |
| } |
| |
| private boolean doFlush(SocketSessionImpl sessionParam) throws IOException |
| { |
| MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam; |
| // Clear OP_WRITE |
| SelectionKey key = session.getWriteSelectionKey(); |
| synchronized (writeLock) |
| { |
| key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE)); |
| } |
| SocketChannel ch = session.getChannel(); |
| Queue writeRequestQueue = session.getWriteRequestQueue(); |
| |
| long totalFlushedBytes = 0; |
| while (true) |
| { |
| WriteRequest req; |
| |
| synchronized (writeRequestQueue) |
| { |
| req = (WriteRequest) writeRequestQueue.first(); |
| } |
| |
| if (req == null) |
| { |
| break; |
| } |
| |
| ByteBuffer buf = (ByteBuffer) req.getMessage(); |
| if (buf.remaining() == 0) |
| { |
| synchronized (writeRequestQueue) |
| { |
| writeRequestQueue.pop(); |
| } |
| |
| session.increaseWrittenMessages(); |
| |
| buf.reset(); |
| session.getFilterChain().fireMessageSent(session, req); |
| continue; |
| } |
| |
| |
| int writtenBytes = 0; |
| |
| // Reported as DIRMINA-362 |
| //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it. |
| if (key.isWritable()) |
| { |
| writtenBytes = ch.write(buf.buf()); |
| totalFlushedBytes += writtenBytes; |
| } |
| |
| if (writtenBytes > 0) |
| { |
| session.increaseWrittenBytes(writtenBytes); |
| } |
| |
| if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION)) |
| { |
| // Kernel buffer is full |
| synchronized (writeLock) |
| { |
| key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); |
| } |
| if (_loggerWrite.isDebugEnabled()) |
| { |
| //System.out.println("WriteDebug:"+"Written BF: " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); |
| } |
| return false; |
| } |
| } |
| |
| if (_loggerWrite.isDebugEnabled()) |
| { |
| //System.out.println("WriteDebug:"+"Written : " + (session.getWrittenBytes() - totalFlushedBytes) + " bytes"); |
| } |
| return true; |
| } |
| |
| private void doUpdateTrafficMask() |
| { |
| if (trafficControllingSessions.isEmpty() || trafficMaskUpdateLock.isLocked()) |
| { |
| return; |
| } |
| |
| // Synchronize over entire operation as this method should be called |
| // from both read and write thread and we don't want the order of the |
| // updates to get changed. |
| trafficMaskUpdateLock.lock(); |
| try |
| { |
| for (; ;) |
| { |
| MultiThreadSocketSessionImpl session; |
| |
| session = (MultiThreadSocketSessionImpl) trafficControllingSessions.pop(); |
| |
| if (session == null) |
| { |
| break; |
| } |
| |
| SelectionKey key = session.getReadSelectionKey(); |
| // Retry later if session is not yet fully initialized. |
| // (In case that Session.suspend??() or session.resume??() is |
| // called before addSession() is processed) |
| if (key == null) |
| { |
| scheduleTrafficControl(session); |
| break; |
| } |
| // skip if channel is already closed |
| if (!key.isValid()) |
| { |
| continue; |
| } |
| |
| // The normal is OP_READ and, if there are write requests in the |
| // session's write queue, set OP_WRITE to trigger flushing. |
| |
| //Sset to Read and Write if there is nothing then the cost |
| // is one loop through the flusher. |
| int ops = SelectionKey.OP_READ; |
| |
| // Now mask the preferred ops with the mask of the current session |
| int mask = session.getTrafficMask().getInterestOps(); |
| synchronized (readLock) |
| { |
| key.interestOps(ops & mask); |
| } |
| //Change key to the WriteSelection Key |
| key = session.getWriteSelectionKey(); |
| if (key != null && key.isValid()) |
| { |
| Queue writeRequestQueue = session.getWriteRequestQueue(); |
| synchronized (writeRequestQueue) |
| { |
| if (!writeRequestQueue.isEmpty()) |
| { |
| ops = SelectionKey.OP_WRITE; |
| synchronized (writeLock) |
| { |
| key.interestOps(ops & mask); |
| } |
| } |
| } |
| } |
| } |
| } |
| finally |
| { |
| trafficMaskUpdateLock.unlock(); |
| } |
| |
| } |
| |
| private class WriteWorker implements Runnable |
| { |
| |
| public void run() |
| { |
| Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Writer"); |
| |
| //System.out.println("WriteDebug:"+"Startup"); |
| for (; ;) |
| { |
| try |
| { |
| int nKeys = writeSelector.select(SELECTOR_TIMEOUT); |
| |
| doAddNewWrite(); |
| doUpdateTrafficMask(); |
| |
| if (nKeys > 0) |
| { |
| //System.out.println("WriteDebug:"+nKeys + " keys from writeselector"); |
| processWrite(writeSelector.selectedKeys()); |
| } |
| else |
| { |
| //System.out.println("WriteDebug:"+"No keys from writeselector"); |
| } |
| |
| doRemove(); |
| notifyWriteIdleness(); |
| |
| if (flushingSessionsSet.size() > 0) |
| { |
| doFlush(); |
| } |
| |
| if (writeSelector.keys().isEmpty()) |
| { |
| synchronized (writeLock) |
| { |
| |
| if (writeSelector.keys().isEmpty() && newSessions.isEmpty()) |
| { |
| writeWorker = null; |
| try |
| { |
| writeSelector.close(); |
| } |
| catch (IOException e) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught(e); |
| } |
| finally |
| { |
| writeSelector = null; |
| } |
| |
| break; |
| } |
| } |
| } |
| |
| } |
| catch (Throwable t) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught(t); |
| |
| try |
| { |
| Thread.sleep(1000); |
| } |
| catch (InterruptedException e1) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught(e1); |
| } |
| } |
| } |
| //System.out.println("WriteDebug:"+"Shutdown"); |
| } |
| |
| } |
| |
| private class ReadWorker implements Runnable |
| { |
| |
| public void run() |
| { |
| Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader"); |
| |
| //System.out.println("ReadDebug:"+"Startup"); |
| for (; ;) |
| { |
| try |
| { |
| int nKeys = selector.select(SELECTOR_TIMEOUT); |
| |
| doAddNewReader(); |
| doUpdateTrafficMask(); |
| |
| if (nKeys > 0) |
| { |
| //System.out.println("ReadDebug:"+nKeys + " keys from selector"); |
| |
| processRead(selector.selectedKeys()); |
| } |
| else |
| { |
| //System.out.println("ReadDebug:"+"No keys from selector"); |
| } |
| |
| |
| doRemove(); |
| notifyReadIdleness(); |
| |
| if (selector.keys().isEmpty()) |
| { |
| |
| synchronized (readLock) |
| { |
| if (selector.keys().isEmpty() && newSessions.isEmpty()) |
| { |
| readWorker = null; |
| try |
| { |
| selector.close(); |
| } |
| catch (IOException e) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught(e); |
| } |
| finally |
| { |
| selector = null; |
| } |
| |
| break; |
| } |
| } |
| } |
| } |
| catch (Throwable t) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught(t); |
| |
| try |
| { |
| Thread.sleep(1000); |
| } |
| catch (InterruptedException e1) |
| { |
| ExceptionMonitor.getInstance().exceptionCaught(e1); |
| } |
| } |
| } |
| //System.out.println("ReadDebug:"+"Shutdown"); |
| } |
| |
| } |
| } |