work on shutdown logic; wait to close down pool until GIOPConnectionThreaded shutdowns have been run
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnection.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnection.java
index cea456e..a095a6f 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnection.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnection.java
@@ -23,6 +23,7 @@
import org.omg.SendingContext.CodeBase;
abstract public class GIOPConnection implements DowncallEmitter, UpcallReturn {
+ static final java.util.logging.Logger logger = java.util.logging.Logger.getLogger(GIOPConnection.class.getName());
// ----------------------------------------------------------------
// Inner classes
// ----------------------------------------------------------------
@@ -1440,8 +1441,10 @@
public void setState(int newState) {
synchronized (this) {
if (state_ == newState
- || (state_ != State.Holding && newState < state_))
+ || (state_ != State.Holding && newState < state_)) {
+ logger.fine("No state change from " +state_ + " to " + newState);
return;
+ }
//
// make sure to update the state since some of the actions
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnectionThreaded.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnectionThreaded.java
index 6bf2337..269edf5 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnectionThreaded.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnectionThreaded.java
@@ -20,6 +20,8 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -168,6 +170,7 @@
.describeTransient(org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown),
org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown,
org.omg.CORBA.CompletionStatus.COMPLETED_MAYBE), false);
+ arrive();
}
@@ -183,8 +186,11 @@
//
// don't shutdown if there are pending upcalls
//
- if (upcallsInProgress_ > 0 || state_ != State.Closing)
+ if (upcallsInProgress_ > 0 || state_ != State.Closing) {
+ logger.fine("pending upcalls: " + upcallsInProgress_ + " state: " + state_);
+
return;
+ }
//
// send a CloseConnection if we can
@@ -205,10 +211,12 @@
org.omg.GIOP.MsgType_1_1.CloseConnection, false, 0);
messageQueue_.add(orbInstance_, out._OB_buffer());
+ } else {
+ logger.fine("could not send close connection message");
}
//
- // now create the startup thread
+ // now create the shutdown thread
//
try {
if (shuttingDown)
@@ -218,15 +226,29 @@
//
// start the shutdown thread
//
- getExecutor().submit(new Shutdown());
+ try {
+ getExecutor().submit(new Shutdown());
+ } catch (RejectedExecutionException ree) {
+ logger.log(Level.WARNING, "Could not submit shutdown task", ree);
+ }
} catch (OutOfMemoryError ex) {
processException(State.Closed, new org.omg.CORBA.IMP_LIMIT(
org.apache.yoko.orb.OB.MinorCodes.describeImpLimit(org.apache.yoko.orb.OB.MinorCodes.MinorThreadLimit),
org.apache.yoko.orb.OB.MinorCodes.MinorThreadLimit,
org.omg.CORBA.CompletionStatus.COMPLETED_NO), false);
+ } finally {
+ arrive();
}
}
+
+ private void arrive() {
+ if ((properties_ & Property.CreatedByClient) != 0)
+ orbInstance_.getClientPhaser().arrive();
+ else
+ orbInstance_.getServerPhaser().arrive();
+ }
+
// ----------------------------------------------------------------
// Public Methods
// ----------------------------------------------------------------
@@ -237,6 +259,7 @@
public GIOPConnectionThreaded(ORBInstance orbInstance,
org.apache.yoko.orb.OCI.Transport transport, GIOPClient client) {
super(orbInstance, transport, client);
+ orbInstance.getClientPhaser().register();
start();
}
@@ -246,13 +269,14 @@
public GIOPConnectionThreaded(ORBInstance orbInstance,
org.apache.yoko.orb.OCI.Transport transport, OAInterface oa) {
super(orbInstance, transport, oa);
+ orbInstance.getServerPhaser().register();
}
private ExecutorService getExecutor() {
if ((properties_ & Property.CreatedByClient) != 0)
- return orbInstance_.getClientExecutor();
- else
- return orbInstance_.getServerExecutor();
+ return orbInstance_.getClientExecutor();
+ else
+ return orbInstance_.getServerExecutor();
}
//
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarter.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarter.java
index c54a153..2cb4be4 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarter.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarter.java
@@ -17,11 +17,8 @@
package org.apache.yoko.orb.OB;
-import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.yoko.orb.PortableServer.*;
-
abstract class GIOPServerStarter {
static final Logger logger = Logger.getLogger(GIOPServerStarter.class.getName());
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarterThreaded.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarterThreaded.java
index 6382893..fb2097c 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarterThreaded.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarterThreaded.java
@@ -18,6 +18,7 @@
package org.apache.yoko.orb.OB;
import java.util.concurrent.ExecutorService;
+import java.util.logging.Level;
final class GIOPServerStarterThreaded extends GIOPServerStarter {
//
@@ -237,11 +238,13 @@
// StateClosing for proper connection shutdown
//
Assert._OB_assert(state_ == StateClosed);
-
+ logger.fine("Processing an inbound connection because state is closed");
GIOPConnection connection = new GIOPConnectionThreaded(
orbInstance_, transport, oaInterface_);
+ logger.fine("Created connection " + connection);
connection.setState(GIOPConnection.State.Closing);
+ logger.fine("set connection state to closing");
}
} catch (org.omg.CORBA.SystemException ex) {
String msg = "can't accept connection\n" + ex.getMessage();
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/ORBControl.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/ORBControl.java
index 03ff95f..3a0ae1a 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/ORBControl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/ORBControl.java
@@ -26,7 +26,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
import org.apache.yoko.orb.OBPortableServer.POAManagerFactory_impl;
import org.apache.yoko.orb.OBPortableServer.POA_impl;
@@ -110,7 +113,7 @@
}
private void waitForServerThreads() {
- shutdownExecutor(orbInstance_.getServerExecutor());
+ shutdownExecutor(orbInstance_.getServerPhaser(), orbInstance_.getServerExecutor());
//
// Get the DispatchStrategyFactory implementation and
@@ -417,7 +420,7 @@
// Wait for all the threads in the client worker group to
// terminate
//
- shutdownExecutor(orbInstance_.getClientExecutor());
+ shutdownExecutor(orbInstance_.getClientPhaser(), orbInstance_.getClientExecutor());
}
//
@@ -428,12 +431,16 @@
notifyAll();
}
- private void shutdownExecutor(ExecutorService executor) {
- executor.shutdown();
+ private void shutdownExecutor(Phaser phaser, ExecutorService executor) {
+ int phase = phaser.arrive();//release the system's "lock"
+ //phaser advances after all GIOPConnectionThreaded have shut down (gracefully or abort)
try {
- executor.awaitTermination(shutdownTimeout_, TimeUnit.SECONDS);
- } catch (InterruptedException e) {
+ phaser.awaitAdvanceInterruptibly(phase, shutdownTimeout_, TimeUnit.SECONDS);
+ } catch (InterruptedException e1) {
Thread.currentThread().interrupt();
+ } catch (TimeoutException e) {
+ } finally {
+ phaser.forceTermination();
}
executor.shutdownNow();
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/ORBInstance.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/ORBInstance.java
index 129cf06..a7493ef 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/ORBInstance.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/ORBInstance.java
@@ -17,9 +17,15 @@
package org.apache.yoko.orb.OB;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.yoko.orb.OB.BootManager;
import org.apache.yoko.orb.OB.DispatchStrategyFactory;
@@ -93,8 +99,10 @@
private RecursiveMutex orbSyncMutex_ = new RecursiveMutex();
private ExecutorService serverExecutor_;
+ private Phaser serverPhaser = new Phaser(1);
private ExecutorService clientExecutor_;
+ private Phaser clientPhaser = new Phaser(1);
private org.apache.yoko.orb.OCI.ConFactoryRegistry conFactoryRegistry_;
@@ -418,9 +426,17 @@
return serverExecutor_;
}
+ public Phaser getServerPhaser() {
+ return serverPhaser;
+ }
+
public ExecutorService getClientExecutor() {
return clientExecutor_;
}
+
+ public Phaser getClientPhaser() {
+ return clientPhaser;
+ }
public org.apache.yoko.orb.OCI.ConFactoryRegistry getConFactoryRegistry() {
return conFactoryRegistry_;
@@ -468,4 +484,5 @@
public OrbAsyncHandler getAsyncHandler() {
return asyncHandler_;
}
+
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Acceptor_impl.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Acceptor_impl.java
index 75bcefe..1793988 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Acceptor_impl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Acceptor_impl.java
@@ -71,7 +71,7 @@
}
public void close() {
- logger.fine("Closing connection to host=" + localAddress_ + ", port=" + port_);
+ logger.log(Level.FINE, "Closing server socket with host=" + localAddress_ + ", port=" + port_, new Exception("Stack trace"));
//
// Destroy the info object
//
@@ -83,7 +83,9 @@
try {
socket_.close();
socket_ = null;
+ logger.log(Level.FINE, "Closed server socket with host=" + localAddress_ + ", port=" + port_);
} catch (java.io.IOException ex) {
+ logger.log(Level.FINE, "Exception closing server socket with host=" + localAddress_ + ", port=" + port_, ex);
}
}