QPID-564 QPID-92 Tidied up a few points and fixed infinite loop in Read IO Thread

git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1.1@599533 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
index 11c54bb..03838ca 100644
--- a/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
+++ b/java/common/src/main/java/org/apache/mina/transport/socket/nio/MultiThreadSocketIoProcessor.java
@@ -66,9 +66,7 @@
 
     private ReentrantLock trafficMaskUpdateLock = new ReentrantLock();
 
-    /**
-     * @noinspection FieldAccessedSynchronizedAndUnsynchronized
-     */
+    /** @noinspection FieldAccessedSynchronizedAndUnsynchronized */
     private volatile Selector selector, writeSelector;
 
     private final Queue newSessions = new Queue();
@@ -90,11 +88,11 @@
         this.executor = executor;
     }
 
-    void addNew( SocketSessionImpl session ) throws IOException
+    void addNew(SocketSessionImpl session) throws IOException
     {
-        synchronized( newSessions )
+        synchronized (newSessions)
         {
-            newSessions.push( session );
+            newSessions.push(session);
         }
 
         startupWorker();
@@ -103,16 +101,16 @@
         writeSelector.wakeup();
     }
 
-    void remove( SocketSessionImpl session ) throws IOException
+    void remove(SocketSessionImpl session) throws IOException
     {
-        scheduleRemove( session );
+        scheduleRemove(session);
         startupWorker();
         selector.wakeup();
     }
 
     private void startupWorker() throws IOException
     {
-        synchronized(readLock)
+        synchronized (readLock)
         {
             if (readWorker == null)
             {
@@ -122,7 +120,7 @@
             }
         }
 
-        synchronized(writeLock)
+        synchronized (writeLock)
         {
             if (writeWorker == null)
             {
@@ -134,38 +132,38 @@
 
     }
 
-    void flush( SocketSessionImpl session )
+    void flush(SocketSessionImpl session)
     {
-        scheduleFlush( session );
+        scheduleFlush(session);
         Selector selector = this.writeSelector;
 
-        if( selector != null )
+        if (selector != null)
         {
             selector.wakeup();
         }
     }
 
-    void updateTrafficMask( SocketSessionImpl session )
+    void updateTrafficMask(SocketSessionImpl session)
     {
-        scheduleTrafficControl( session );
+        scheduleTrafficControl(session);
         Selector selector = this.selector;
-        if( selector != null )
+        if (selector != null)
         {
             selector.wakeup();
         }
     }
 
-    private void scheduleRemove( SocketSessionImpl session )
+    private void scheduleRemove(SocketSessionImpl session)
     {
-        synchronized( removingSessions )
+        synchronized (removingSessions)
         {
-            removingSessions.push( session );
+            removingSessions.push(session);
         }
     }
 
-    private void scheduleFlush( SocketSessionImpl session )
+    private void scheduleFlush(SocketSessionImpl session)
     {
-        synchronized(flushingSessionsSet)
+        synchronized (flushingSessionsSet)
         {
             //if flushingSessions grows to contain Integer.MAX_VALUE sessions
             // then this will fail.
@@ -176,31 +174,31 @@
         }
     }
 
-    private void scheduleTrafficControl( SocketSessionImpl session )
+    private void scheduleTrafficControl(SocketSessionImpl session)
     {
-        synchronized( trafficControllingSessions )
+        synchronized (trafficControllingSessions)
         {
-            trafficControllingSessions.push( session );
+            trafficControllingSessions.push(session);
         }
     }
 
     private void doAddNewReader() throws InterruptedException
     {
-        if( newSessions.isEmpty() )
+        if (newSessions.isEmpty())
         {
             return;
         }
 
-        for( ; ; )
+        for (; ;)
         {
             MultiThreadSocketSessionImpl session;
 
-            synchronized( newSessions )
+            synchronized (newSessions)
             {
                 session = (MultiThreadSocketSessionImpl) newSessions.peek();
             }
 
-            if( session == null )
+            if (session == null)
             {
                 break;
             }
@@ -211,21 +209,20 @@
             try
             {
 
-                ch.configureBlocking( false );
-                session.setSelectionKey( ch.register( selector,
-                                                      SelectionKey.OP_READ,
-                                                      session ) );
-
+                ch.configureBlocking(false);
+                session.setSelectionKey(ch.register(selector,
+                                                    SelectionKey.OP_READ,
+                                                    session));
 
                 //System.out.println("ReadDebug:"+"Awaiting Registration");
                 session.awaitRegistration();
                 sessionCreated(session);
             }
-            catch( IOException e )
+            catch (IOException e)
             {
                 // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
                 // and call ConnectFuture.setException().
-                session.getFilterChain().fireExceptionCaught( session, e );
+                session.getFilterChain().fireExceptionCaught(session, e);
             }
         }
     }
@@ -242,7 +239,7 @@
         {
             MultiThreadSocketSessionImpl session;
 
-            synchronized(newSessions)
+            synchronized (newSessions)
             {
                 session = (MultiThreadSocketSessionImpl) newSessions.peek();
             }
@@ -257,7 +254,7 @@
             try
             {
                 ch.configureBlocking(false);
-                synchronized(flushingSessionsSet)
+                synchronized (flushingSessionsSet)
                 {
                     flushingSessionsSet.add(session);
                 }
@@ -275,17 +272,16 @@
 
                 // Clear the AbstractIoFilterChain.CONNECT_FUTURE attribute
                 // and call ConnectFuture.setException().
-                session.getFilterChain().fireExceptionCaught( session, e );
+                session.getFilterChain().fireExceptionCaught(session, e);
             }
         }
     }
 
 
-
     private void sessionCreated(SocketSessionImpl sessionParam) throws InterruptedException
     {
         MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
-        synchronized(newSessions)
+        synchronized (newSessions)
         {
             if (!session.created())
             {
@@ -294,7 +290,7 @@
 
                 // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
                 // in AbstractIoFilterChain.fireSessionOpened().
-                session.getServiceListeners().fireSessionCreated( session );
+                session.getServiceListeners().fireSessionCreated(session);
 
                 session.doneCreation();
             }
@@ -303,21 +299,21 @@
 
     private void doRemove()
     {
-        if( removingSessions.isEmpty() )
+        if (removingSessions.isEmpty())
         {
             return;
         }
 
-        for( ; ; )
+        for (; ;)
         {
             MultiThreadSocketSessionImpl session;
 
-            synchronized( removingSessions )
+            synchronized (removingSessions)
             {
                 session = (MultiThreadSocketSessionImpl) removingSessions.pop();
             }
 
-            if( session == null )
+            if (session == null)
             {
                 break;
             }
@@ -330,7 +326,7 @@
             // (In case that Session.close() is called before addSession() is processed)
             if (key == null || writeKey == null)
             {
-                scheduleRemove( session );
+                scheduleRemove(session);
                 break;
             }
             // skip if channel is already closed
@@ -342,24 +338,24 @@
             try
             {
                 //System.out.println("ReadDebug:"+"Removing Session: " + System.identityHashCode(session));
-                synchronized(readLock)
+                synchronized (readLock)
                 {
                     key.cancel();
                 }
-                synchronized(writeLock)
+                synchronized (writeLock)
                 {
                     writeKey.cancel();
                 }
                 ch.close();
             }
-            catch( IOException e )
+            catch (IOException e)
             {
-                session.getFilterChain().fireExceptionCaught( session, e );
+                session.getFilterChain().fireExceptionCaught(session, e);
             }
             finally
             {
-                releaseWriteBuffers( session );
-                session.getServiceListeners().fireSessionDestroyed( session );
+                releaseWriteBuffers(session);
+                session.getServiceListeners().fireSessionDestroyed(session);
             }
         }
     }
@@ -368,16 +364,16 @@
     {
         Iterator it = selectedKeys.iterator();
 
-        while( it.hasNext() )
+        while (it.hasNext())
         {
-            SelectionKey key = ( SelectionKey ) it.next();
+            SelectionKey key = (SelectionKey) it.next();
             MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) key.attachment();
 
-            synchronized(readLock)
+            synchronized (readLock)
             {
                 if (key.isValid() && key.isReadable() && session.getTrafficMask().isReadable())
                 {
-                    read( session );
+                    read(session);
                 }
             }
 
@@ -395,7 +391,7 @@
             SelectionKey key = (SelectionKey) it.next();
             SocketSessionImpl session = (SocketSessionImpl) key.attachment();
 
-            synchronized(writeLock)
+            synchronized (writeLock)
             {
                 if (key.isValid() && key.isWritable() && session.getTrafficMask().isWritable())
                 {
@@ -403,7 +399,7 @@
                     // Clear OP_WRITE
                     key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
 
-                    synchronized(flushingSessionsSet)
+                    synchronized (flushingSessionsSet)
                     {
                         flushingSessions.offer(session);
                     }
@@ -424,7 +420,7 @@
 
         int totalReadBytes = 0;
 
-        for (; totalReadBytes <= MAX_READ_BYTES_PER_SESSION;)
+        while (totalReadBytes <= MAX_READ_BYTES_PER_SESSION)
         {
             ByteBuffer buf = ByteBuffer.allocate(session.getReadBufferSize());
             SocketChannel ch = session.getChannel();
@@ -482,6 +478,9 @@
                     scheduleRemove(session);
                 }
                 session.getFilterChain().fireExceptionCaught(session, e);
+
+                //Stop Reading this session.
+                return;
             }
             finally
             {
@@ -507,12 +506,12 @@
         {
             lastIdleReadCheckTime = currentTime;
             Set keys = selector.keys();
-            if( keys != null )
+            if (keys != null)
             {
-                for( Iterator it = keys.iterator(); it.hasNext(); )
+                for (Iterator it = keys.iterator(); it.hasNext();)
                 {
-                    SelectionKey key = ( SelectionKey ) it.next();
-                    SocketSessionImpl session = ( SocketSessionImpl ) key.attachment();
+                    SelectionKey key = (SelectionKey) it.next();
+                    SocketSessionImpl session = (SocketSessionImpl) key.attachment();
                     notifyReadIdleness(session, currentTime);
                 }
             }
@@ -542,15 +541,15 @@
     private void notifyReadIdleness(SocketSessionImpl session, long currentTime)
     {
         notifyIdleness0(
-            session, currentTime,
-            session.getIdleTimeInMillis( IdleStatus.BOTH_IDLE ),
-            IdleStatus.BOTH_IDLE,
-            Math.max( session.getLastIoTime(), session.getLastIdleTime( IdleStatus.BOTH_IDLE ) ) );
+                session, currentTime,
+                session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
+                IdleStatus.BOTH_IDLE,
+                Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
         notifyIdleness0(
-            session, currentTime,
-            session.getIdleTimeInMillis( IdleStatus.READER_IDLE ),
-            IdleStatus.READER_IDLE,
-            Math.max( session.getLastReadTime(), session.getLastIdleTime( IdleStatus.READER_IDLE ) ) );
+                session, currentTime,
+                session.getIdleTimeInMillis(IdleStatus.READER_IDLE),
+                IdleStatus.READER_IDLE,
+                Math.max(session.getLastReadTime(), session.getLastIdleTime(IdleStatus.READER_IDLE)));
 
         notifyWriteTimeout(session, currentTime, session
                 .getWriteTimeoutInMillis(), session.getLastWriteTime());
@@ -559,51 +558,51 @@
     private void notifyWriteIdleness(SocketSessionImpl session, long currentTime)
     {
         notifyIdleness0(
-            session, currentTime,
+                session, currentTime,
                 session.getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
                 IdleStatus.BOTH_IDLE,
                 Math.max(session.getLastIoTime(), session.getLastIdleTime(IdleStatus.BOTH_IDLE)));
         notifyIdleness0(
                 session, currentTime,
-            session.getIdleTimeInMillis( IdleStatus.WRITER_IDLE ),
-            IdleStatus.WRITER_IDLE,
-            Math.max( session.getLastWriteTime(), session.getLastIdleTime( IdleStatus.WRITER_IDLE ) ) );
+                session.getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
+                IdleStatus.WRITER_IDLE,
+                Math.max(session.getLastWriteTime(), session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
 
-        notifyWriteTimeout( session, currentTime, session
-            .getWriteTimeoutInMillis(), session.getLastWriteTime() );
+        notifyWriteTimeout(session, currentTime, session
+                .getWriteTimeoutInMillis(), session.getLastWriteTime());
     }
 
-    private void notifyIdleness0( SocketSessionImpl session, long currentTime,
-                                  long idleTime, IdleStatus status,
-                                  long lastIoTime )
+    private void notifyIdleness0(SocketSessionImpl session, long currentTime,
+                                 long idleTime, IdleStatus status,
+                                 long lastIoTime)
     {
-        if( idleTime > 0 && lastIoTime != 0
-            && ( currentTime - lastIoTime ) >= idleTime )
+        if (idleTime > 0 && lastIoTime != 0
+            && (currentTime - lastIoTime) >= idleTime)
         {
-            session.increaseIdleCount( status );
-            session.getFilterChain().fireSessionIdle( session, status );
+            session.increaseIdleCount(status);
+            session.getFilterChain().fireSessionIdle(session, status);
         }
     }
 
-    private void notifyWriteTimeout( SocketSessionImpl session,
-                                     long currentTime,
-                                     long writeTimeout, long lastIoTime )
+    private void notifyWriteTimeout(SocketSessionImpl session,
+                                    long currentTime,
+                                    long writeTimeout, long lastIoTime)
     {
 
         MultiThreadSocketSessionImpl sesh = (MultiThreadSocketSessionImpl) session;
         SelectionKey key = sesh.getWriteSelectionKey();
 
-        synchronized(writeLock)
+        synchronized (writeLock)
         {
-        if( writeTimeout > 0
-            && ( currentTime - lastIoTime ) >= writeTimeout
-            && key != null && key.isValid()
-            && ( key.interestOps() & SelectionKey.OP_WRITE ) != 0 )
-        {
-            session.getFilterChain().fireExceptionCaught( session, new WriteTimeoutException() );
+            if (writeTimeout > 0
+                && (currentTime - lastIoTime) >= writeTimeout
+                && key != null && key.isValid()
+                && (key.interestOps() & SelectionKey.OP_WRITE) != 0)
+            {
+                session.getFilterChain().fireExceptionCaught(session, new WriteTimeoutException());
+            }
         }
     }
-    }
 
     private SocketSessionImpl getNextFlushingSession()
     {
@@ -612,9 +611,9 @@
 
     private void releaseSession(SocketSessionImpl session)
     {
-        synchronized(session.getWriteRequestQueue())
+        synchronized (session.getWriteRequestQueue())
         {
-            synchronized(flushingSessionsSet)
+            synchronized (flushingSessionsSet)
             {
                 if (session.getScheduledWriteRequests() > 0)
                 {
@@ -642,7 +641,7 @@
         WriteRequest req;
 
         //Should this be synchronized?
-        synchronized(writeRequestQueue)
+        synchronized (writeRequestQueue)
         {
             while ((req = (WriteRequest) writeRequestQueue.pop()) != null)
             {
@@ -668,9 +667,9 @@
 
         while ((session = (MultiThreadSocketSessionImpl) getNextFlushingSession()) != null)
         {
-            if( !session.isConnected() )
+            if (!session.isConnected())
             {
-                releaseWriteBuffers( session );
+                releaseWriteBuffers(session);
                 releaseSession(session);
                 continue;
             }
@@ -678,14 +677,14 @@
             SelectionKey key = session.getWriteSelectionKey();
             // Retry later if session is not yet fully initialized.
             // (In case that Session.write() is called before addSession() is processed)
-            if( key == null )
+            if (key == null)
             {
-                scheduleFlush( session );
+                scheduleFlush(session);
                 releaseSession(session);
                 continue;
             }
             // skip if channel is already closed
-            if( !key.isValid() )
+            if (!key.isValid())
             {
                 releaseSession(session);
                 continue;
@@ -698,11 +697,11 @@
                     releaseSession(session);
                 }
             }
-            catch( IOException e )
+            catch (IOException e)
             {
                 releaseSession(session);
-                scheduleRemove( session );
-                session.getFilterChain().fireExceptionCaught( session, e );
+                scheduleRemove(session);
+                session.getFilterChain().fireExceptionCaught(session, e);
             }
 
         }
@@ -714,32 +713,32 @@
         MultiThreadSocketSessionImpl session = (MultiThreadSocketSessionImpl) sessionParam;
         // Clear OP_WRITE
         SelectionKey key = session.getWriteSelectionKey();
-        synchronized(writeLock)
+        synchronized (writeLock)
         {
-            key.interestOps( key.interestOps() & ( ~SelectionKey.OP_WRITE ) );
+            key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
         }
         SocketChannel ch = session.getChannel();
         Queue writeRequestQueue = session.getWriteRequestQueue();
 
         long totalFlushedBytes = 0;
-        for( ; ; )
+        while (true)
         {
             WriteRequest req;
 
-            synchronized( writeRequestQueue )
+            synchronized (writeRequestQueue)
             {
-                req = ( WriteRequest ) writeRequestQueue.first();
+                req = (WriteRequest) writeRequestQueue.first();
             }
 
-            if( req == null )
+            if (req == null)
             {
                 break;
             }
 
-            ByteBuffer buf = ( ByteBuffer ) req.getMessage();
-            if( buf.remaining() == 0 )
+            ByteBuffer buf = (ByteBuffer) req.getMessage();
+            if (buf.remaining() == 0)
             {
-                synchronized( writeRequestQueue )
+                synchronized (writeRequestQueue)
                 {
                     writeRequestQueue.pop();
                 }
@@ -747,7 +746,7 @@
                 session.increaseWrittenMessages();
 
                 buf.reset();
-                session.getFilterChain().fireMessageSent( session, req );
+                session.getFilterChain().fireMessageSent(session, req);
                 continue;
             }
 
@@ -755,23 +754,16 @@
             int writtenBytes = 0;
 
             // Reported as DIRMINA-362
-            //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.            
-//            if (key.isWritable())
+            //note: todo: fixme: Not sure it is important but if we see NoyYetConnected exceptions or 100% CPU in the kernel then this is it.
+            if (key.isWritable())
             {
-                try
-                {
-                    writtenBytes = ch.write(buf.buf());
-                    totalFlushedBytes += writtenBytes;
-                }
-                catch (IOException ioe)
-                {
-                    throw ioe;
-                }
+                writtenBytes = ch.write(buf.buf());
+                totalFlushedBytes += writtenBytes;
             }
 
-            if( writtenBytes > 0 )
+            if (writtenBytes > 0)
             {
-                session.increaseWrittenBytes( writtenBytes );
+                session.increaseWrittenBytes(writtenBytes);
             }
 
             if (buf.hasRemaining() || (totalFlushedBytes <= MAX_FLUSH_BYTES_PER_SESSION))
@@ -911,7 +903,7 @@
 
                     if (writeSelector.keys().isEmpty())
                     {
-                        synchronized(writeLock)
+                        synchronized (writeLock)
                         {
 
                             if (writeSelector.keys().isEmpty() && newSessions.isEmpty())
@@ -963,7 +955,7 @@
             Thread.currentThread().setName(MultiThreadSocketIoProcessor.this.threadName + "Reader");
 
             //System.out.println("ReadDebug:"+"Startup");
-            for( ; ; )
+            for (; ;)
             {
                 try
                 {
@@ -972,7 +964,7 @@
                     doAddNewReader();
                     doUpdateTrafficMask();
 
-                    if( nKeys > 0 )
+                    if (nKeys > 0)
                     {
                         //System.out.println("ReadDebug:"+nKeys + " keys from selector");
 
@@ -987,21 +979,21 @@
                     doRemove();
                     notifyReadIdleness();
 
-                    if( selector.keys().isEmpty() )
+                    if (selector.keys().isEmpty())
                     {
 
-                        synchronized(readLock)
+                        synchronized (readLock)
                         {
-                            if( selector.keys().isEmpty() && newSessions.isEmpty() )
+                            if (selector.keys().isEmpty() && newSessions.isEmpty())
                             {
                                 readWorker = null;
                                 try
                                 {
                                     selector.close();
                                 }
-                                catch( IOException e )
+                                catch (IOException e)
                                 {
-                                    ExceptionMonitor.getInstance().exceptionCaught( e );
+                                    ExceptionMonitor.getInstance().exceptionCaught(e);
                                 }
                                 finally
                                 {
@@ -1013,17 +1005,17 @@
                         }
                     }
                 }
-                catch( Throwable t )
+                catch (Throwable t)
                 {
-                    ExceptionMonitor.getInstance().exceptionCaught( t );
+                    ExceptionMonitor.getInstance().exceptionCaught(t);
 
                     try
                     {
-                        Thread.sleep( 1000 );
+                        Thread.sleep(1000);
                     }
-                    catch( InterruptedException e1 )
+                    catch (InterruptedException e1)
                     {
-                        ExceptionMonitor.getInstance().exceptionCaught( e1 );
+                        ExceptionMonitor.getInstance().exceptionCaught(e1);
                     }
                 }
             }