Cache outbound connections with reference counting.
diff --git a/pom.xml b/pom.xml
index cec4645..9eaf6ae 100644
--- a/pom.xml
+++ b/pom.xml
@@ -71,11 +71,12 @@
<packaging>pom</packaging>
<modules>
- <module>yoko-core</module>
+ <module>yoko-osgi</module>
<module>yoko-spec-corba</module>
<module>yoko-rmi-spec</module>
- <module>yoko-rmi-impl</module>
<module>yoko-util</module>
+ <module>yoko-rmi-impl</module>
+ <module>yoko-core</module>
</modules>
<dependencyManagement>
@@ -83,7 +84,7 @@
<!-- Yoko modules -->
<dependency>
<groupId>org.apache.yoko</groupId>
- <artifactId>yoko-core</artifactId>
+ <artifactId>yoko-osgi</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -93,19 +94,24 @@
</dependency>
<dependency>
<groupId>org.apache.yoko</groupId>
- <artifactId>yoko-util</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.yoko</groupId>
<artifactId>yoko-rmi-spec</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.yoko</groupId>
+ <artifactId>yoko-util</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.yoko</groupId>
<artifactId>yoko-rmi-impl</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.yoko</groupId>
+ <artifactId>yoko-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- External dependencies -->
<dependency>
<groupId>junit</groupId>
@@ -117,16 +123,6 @@
<artifactId>org.apache.servicemix.bundles.bcel</artifactId>
<version>5.2_2</version>
</dependency>
- <!--<dependency>-->
- <!--<groupId>org.apache.maven</groupId>-->
- <!--<artifactId>maven-plugin-api</artifactId>-->
- <!--<version>${maven.version}</version>-->
- <!--</dependency>-->
- <!--<dependency>-->
- <!--<groupId>org.apache.maven</groupId>-->
- <!--<artifactId>maven-project</artifactId>-->
- <!--<version>${maven.version}</version>-->
- <!--</dependency>-->
<dependency>
<groupId>ant</groupId>
<artifactId>ant</artifactId>
@@ -209,7 +205,6 @@
</compilerArguments>
</configuration>
</plugin>
-
</plugins>
<pluginManagement>
@@ -226,24 +221,6 @@
</dependency>
</dependencies>
</plugin>
- <!--<plugin>-->
- <!--<groupId>org.apache.maven.plugins</groupId>-->
- <!--<artifactId>maven-surefire-plugin</artifactId>-->
- <!--<configuration>-->
- <!--<includes>-->
- <!--<include>**/*Test.java</include>-->
- <!--<include>**/*TestCase.java</include>-->
- <!--</includes>-->
- <!--<excludes>-->
- <!--<exclude>**/*$*</exclude>-->
- <!--</excludes>-->
- <!--<reportFormat>brief</reportFormat>-->
- <!--<useFile>false</useFile>-->
- <!--<forkMode>once</forkMode>-->
- <!--<childDelegation>false</childDelegation>-->
- <!--<argLine>-ea</argLine>-->
- <!--</configuration>-->
- <!--</plugin>-->
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
@@ -272,9 +249,6 @@
<artifactId>maven-clover-plugin</artifactId>
<version>2.4</version>
</plugin>
- <!--plugin>
- <artifactId>maven-pmd-plugin</artifactId>
- </plugin!-->
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
</plugin>
diff --git a/yoko-core/pom.xml b/yoko-core/pom.xml
index eb8b4ff..54af256 100644
--- a/yoko-core/pom.xml
+++ b/yoko-core/pom.xml
@@ -38,6 +38,11 @@
</dependency>
<dependency>
<groupId>org.apache.yoko</groupId>
+ <artifactId>yoko-osgi</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.yoko</groupId>
<artifactId>yoko-spec-corba</artifactId>
<scope>provided</scope>
</dependency>
@@ -48,20 +53,19 @@
</dependency>
<dependency>
<groupId>org.apache.yoko</groupId>
- <artifactId>yoko-rmi-impl</artifactId>
- <scope>test</scope>
+ <artifactId>yoko-util</artifactId>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.yoko</groupId>
- <artifactId>yoko-util</artifactId>
- <scope>provided</scope>
+ <artifactId>yoko-rmi-impl</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
-
</dependencies>
<build>
@@ -102,7 +106,7 @@
</artifactItem>
<artifactItem>
<groupId>org.apache.yoko</groupId>
- <artifactId>yoko-util</artifactId>
+ <artifactId>yoko-osgi</artifactId>
<type>jar</type>
<overWrite>true</overWrite>
<outputDirectory>${project.build.directory}/endorsed</outputDirectory>
@@ -127,7 +131,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <argLine>-Djava.endorsed.dirs=${basedir}/target/endorsed</argLine>
+ <argLine>-Xmx128m</argLine>
+ <argLine>-Djava.endorsed.dirs=${project.build.directory}/endorsed</argLine>
<includes>
<include>**/org/apache/yoko/*Test.java</include>
</includes>
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/CORBA/Delegate.java b/yoko-core/src/main/java/org/apache/yoko/orb/CORBA/Delegate.java
index db5868f..acc5015 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/CORBA/Delegate.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/CORBA/Delegate.java
@@ -1057,11 +1057,4 @@
return downcallStub_;
}
- public synchronized void _OB_closeConnection(boolean terminate) {
- if (downcallStub_ == null) {
- return;
- }
- downcallStub_._OB_closeConnection(terminate);
- downcallStub_ = null;
- }
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/CORBA/ObjectImpl.java b/yoko-core/src/main/java/org/apache/yoko/orb/CORBA/ObjectImpl.java
index 780a695..5a456c2 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/CORBA/ObjectImpl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/CORBA/ObjectImpl.java
@@ -19,8 +19,7 @@
import static org.apache.yoko.orb.OCI.GiopVersion.GIOP1_2;
-import org.apache.yoko.orb.OCI.GiopVersion;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
//
// ObjectImpl is the base class for proprietary stubs with full
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/Client.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/Client.java
index 3cab63b..c97df61 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/Client.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/Client.java
@@ -53,7 +53,7 @@
//
// Destroy the client
//
- public abstract void destroy(boolean terminate);
+ public abstract void destroy();
//
// Increment usage (not mutex protected)
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/ClientManager.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/ClientManager.java
index 644ba01..e5f6ff7 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/ClientManager.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/ClientManager.java
@@ -82,7 +82,7 @@
//
// Destroy all clients
//
- for (Client c : allClients_) c.destroy(false);
+ for (Client c : allClients_) c.destroy();
//
// Reset internal data
@@ -284,7 +284,7 @@
}
if (matched) {
- newClient.destroy(false);
+ newClient.destroy();
continue;
}
}
@@ -372,7 +372,7 @@
return pairs;
}
- public synchronized void releaseClient(Client client, boolean terminate) {
+ public synchronized void releaseClient(Client client) {
//
// The ORB destroys this object, so it's an initialization error
// if this operation is called after ORB destruction
@@ -400,7 +400,7 @@
reusableClients_.remove(client);
if (allClients_.remove(client)) {
- client.destroy(terminate);
+ client.destroy();
} else {
Assert._OB_assert("Release called on unknown client");
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/CollocatedClient.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/CollocatedClient.java
index d3f26d4..5cb1242 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/CollocatedClient.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/CollocatedClient.java
@@ -36,7 +36,7 @@
//
// Destroy the client
//
- public void destroy(boolean terminate) {
+ public void destroy() {
// Nothing to do here
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/DispatchRequest_impl.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/DispatchRequest_impl.java
index 3cb2d8c..949cb84 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/DispatchRequest_impl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/DispatchRequest_impl.java
@@ -54,7 +54,6 @@
public void invoke() {
poa_._OB_dispatch(oid_, upcall_);
-
upcall_ = null;
}
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/DowncallStub.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/DowncallStub.java
index ad989ce..f947285 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/DowncallStub.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/DowncallStub.java
@@ -136,7 +136,7 @@
return clientProfilePair.client;
}
- private void destroy(boolean terminate) {
+ private void destroy() {
//
// If the ORB has been destroyed then the clientManager can be nil
//
@@ -144,7 +144,7 @@
if (clientManager != null && clientProfilePairs_ != null) {
for (ClientProfilePair pair: clientProfilePairs_) {
- clientManager.releaseClient(pair.client, terminate);
+ clientManager.releaseClient(pair.client);
}
}
@@ -152,7 +152,7 @@
}
protected void finalize() throws Throwable {
- destroy(false);
+ destroy();
super.finalize();
}
@@ -355,7 +355,7 @@
for (ClientProfilePair pair : clientProfilePairs_) {
if (pair.client == client && pair.profile == profile) {
- clientManager.releaseClient(pair.client, false);
+ clientManager.releaseClient(pair.client);
clientProfilePairs_.remove(pair);
break;
}
@@ -1026,10 +1026,6 @@
return delivered;
}
- public void _OB_closeConnection(boolean terminate) {
- destroy(terminate);
- }
-
//
// Need to be able to access the ORB instance from a stub for AMI
// polling
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPClient.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPClient.java
index 6756e8e..2d58692 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPClient.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPClient.java
@@ -17,9 +17,19 @@
package org.apache.yoko.orb.OB;
+import org.apache.yoko.orb.OBPortableServer.POAManager_impl;
+import org.apache.yoko.orb.OCI.ConnectorInfo;
import org.apache.yoko.orb.OCI.GiopVersion;
+import org.apache.yoko.util.Cache;
+import org.apache.yoko.util.Factory;
+import org.apache.yoko.util.Reference;
+import org.omg.CORBA.INITIALIZE;
+import org.omg.PortableServer.POAManager;
import org.omg.SendingContext.CodeBaseHelper;
+import static org.apache.yoko.orb.OB.MinorCodes.*;
+import static org.omg.CORBA.CompletionStatus.*;
+
final class GIOPClient extends Client {
protected ORBInstance orbInstance_; // The ORB instance
@@ -44,9 +54,8 @@
protected boolean bidirWorker_; // is the worker bidir?
- protected boolean ownsWorker_; // does 'this' own the worker?
-
- protected boolean destroy_; // True if destroy() was called
+ protected volatile boolean destroy_; // True if destroy() was called
+ private Reference<GIOPConnection> connectionRef;
// ----------------------------------------------------------------------
// GIOPClient private and protected member implementations
@@ -66,60 +75,14 @@
//
protected GIOPConnection find_bidir_worker() {
try {
- //
- // Any transport that we want to query should exist when the
- // server first receives a request from the client-side. This
- // transport will have the ListenPointList populated inside of
- // its TransportInfo. So we query the list of
- // GIOPServerStarters for the correct transport and hopefully
- // find a match if we want to use bidir
- //
- org.apache.yoko.orb.OBPortableServer.POAManagerFactory pmFactoryImpl = orbInstance_
- .getPOAManagerFactory();
-
- //
- // Obtain a list of POAs for this POAManager
- //
- org.omg.PortableServer.POAManager[] pmSeq = pmFactoryImpl.list();
-
- for (int i = 0; i < pmSeq.length; i++) {
- org.apache.yoko.orb.OBPortableServer.POAManager_impl poaImpl = (org.apache.yoko.orb.OBPortableServer.POAManager_impl) pmSeq[i];
-
- //
- // Get the server manager from the POA
- //
- org.apache.yoko.orb.OB.ServerManager sm = poaImpl
- ._OB_getServerManager();
-
- //
- // get the list of servers from the server manager
- //
- org.apache.yoko.orb.OB.Server[] servSeq = sm.getServers();
-
- //
- // iterate these servers obtaining the GIOPServerStarter
- //
- for (int j = 0; j < servSeq.length; j++) {
- org.apache.yoko.orb.OB.GIOPServer giopServer = (org.apache.yoko.orb.OB.GIOPServer) servSeq[j];
-
- org.apache.yoko.orb.OB.GIOPServerStarter servStarter = giopServer
- ._OB_getGIOPServerStarter();
-
- //
- // get the matching worker from the GIOPServerStarter
- //
- GIOPConnection gw = servStarter.getWorker(connectorInfo());
-
+ for (POAManager poaManager : orbInstance_.getPOAManagerFactory().list()) {
+ for (Server aServSeq : ((POAManager_impl) poaManager)._OB_getServerManager().getServers()) {
+ GIOPConnection gw = ((GIOPServer) aServSeq)._OB_getGIOPServerStarter().getMatchingConnection(connectorInfo());
if (gw != null)
return gw;
}
}
- } catch (ClassCastException ex) {
- }
-
- //
- // nothing was found to return
- //
+ } catch (ClassCastException ignored) {}
return null;
}
@@ -128,108 +91,20 @@
// a new worker is created, with the timeout specified as second
// parameter.
//
- protected synchronized GIOPConnection getWorker(boolean create, int t) {
+ protected synchronized GIOPConnection getWorker(boolean create, final int timeout) {
if (destroy_)
- throw new org.omg.CORBA.INITIALIZE(org.apache.yoko.orb.OB.MinorCodes
- .describeInitialize(org.apache.yoko.orb.OB.MinorCodes.MinorORBDestroyed),
- org.apache.yoko.orb.OB.MinorCodes.MinorORBDestroyed,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO);
+ throw new INITIALIZE(describeInitialize(MinorORBDestroyed), MinorORBDestroyed, COMPLETED_NO);
- //
- // first attempt to locate a reusable bidir connection
+ if (connection_ == null)
+ reuseInboundConnection();
+
+
//
- if (connection_ == null) {
- connection_ = find_bidir_worker();
-
- if (connection_ != null) {
- //
- // adjust the requestID to match the spec (even for
- // clients, odd for servers)
- //
- if ((nextRequestId_ & 1) == 0)
- nextRequestId_++;
- ownsWorker_ = false;
- connection_.activateClientSide(this);
-
- //
- // log the reusing of the connection
- //
- CoreTraceLevels coreTraceLevels = orbInstance_
- .getCoreTraceLevels();
- if (coreTraceLevels.traceConnections() > 0) {
- org.apache.yoko.orb.OCI.TransportInfo info = connection_
- .transport().get_info();
- String msg = "reusing established bidir connection\n";
- msg += info.describe();
- orbInstance_.getLogger().trace("outgoing", msg);
- }
- }
- }
-
- //
- // no bidir connection resolved so create one if the request
- // calls for it
- //
- if (connection_ == null && create) {
- //
- // Trace connection attempt
- //
- CoreTraceLevels coreTraceLevels = orbInstance_.getCoreTraceLevels();
- if (coreTraceLevels.traceConnections() > 0) {
- org.apache.yoko.orb.OCI.ConnectorInfo info = connector_
- .get_info();
- String msg = "trying to establish connection\n";
- msg += "timeout: ";
- if (t >= 0) {
- msg += t;
- msg += "ms\n";
- } else
- msg += "none\n";
- msg += info.describe();
- orbInstance_.getLogger().trace("outgoing", msg);
- }
-
- //
- // Create new transport, using the connector
- //
- // For symetry reasons, GIOPClientStarterThreaded should also be
- // added, even though these classes only have a trivial
- // functionality. Or perhaps the GIOPClientStarterThreaded tries to
- // connect() in the backgound? Just an idea...
- //
-
- org.apache.yoko.orb.OCI.Transport transport;
-
- if (t >= 0) {
- transport = connector_.connect_timeout(t);
-
- //
- // Was there a timeout?
- //
- if (transport == null)
- throw new org.omg.CORBA.NO_RESPONSE("Connection timeout",
- 0, org.omg.CORBA.CompletionStatus.COMPLETED_NO);
- } else {
- transport = connector_.connect();
- Assert._OB_assert(transport != null);
- }
-
- //
- // Create new worker
- //
- Assert._OB_assert(concModel_ == Threaded);
- connection_ = new GIOPConnectionThreaded(orbInstance_, transport,
- this);
- ownsWorker_ = true;
-
- //
- // bidirWorker_ means that this connection may be used to
- // service requests so we need to set ourselves up as a
- // server (to correct map the OAInterfaces)
- //
- if (bidirWorker_)
- connection_.activateServerSide();
- }
+ // no inbound bidir connection resolved so lookup an existing outbound connection
+ // or create one if the request calls for it
+ //
+ if (connection_ == null)
+ reuseOrCreateOutboundConnection(create, timeout);
//
// Lazy initialization of codeSetSC_. We don't want to
@@ -242,6 +117,100 @@
return connection_;
}
+ private synchronized void reuseOrCreateOutboundConnection(boolean create, final int timeout) {
+ Cache<ConnectorInfo, GIOPConnection> connCache = orbInstance_.getOutboundConnectionCache();
+ if (create) {
+ connectionRef = connCache.getOrCreate(connector_.get_info(), new Factory<GIOPConnection>() {
+ @Override public GIOPConnection create() {return createOutboundConnection(timeout);}
+ });
+ } else {
+ connectionRef = connCache.get(connector_.get_info());
+ }
+ connCache.clean();
+ connection_ = connectionRef.get();
+
+ //
+ // bidirWorker_ means that this connection may be used to
+ // service requests so we need to set ourselves up as a
+ // server (to correct map the OAInterfaces)
+ //
+ if (bidirWorker_)
+ connection_.activateServerSide();
+ }
+
+ private synchronized void reuseInboundConnection() {
+ //
+ // first attempt to locate a reusable bidir connection
+ //
+ connection_ = find_bidir_worker();
+
+ if (connection_ == null) return;
+
+ //
+ // adjust the requestID to match the spec (even for
+ // clients, odd for servers)
+ //
+ nextRequestId_ |= 1;
+ connection_.activateClientSide(this);
+
+ //
+ // log the reusing of the connection
+ //
+ if (orbInstance_.getCoreTraceLevels().traceConnections() > 0) {
+ String msg = "reusing established bidir connection\n" + connection_.transport().get_info().describe();
+ orbInstance_.getLogger().trace("outgoing", msg);
+ }
+ }
+
+ private GIOPConnectionThreaded createOutboundConnection(int t) {
+ //
+ // Trace connection attempt
+ //
+ CoreTraceLevels coreTraceLevels = orbInstance_.getCoreTraceLevels();
+ if (coreTraceLevels.traceConnections() > 0) {
+ String msg = "trying to establish connection\n";
+ msg += "timeout: ";
+ if (t >= 0) {
+ msg += t;
+ msg += "ms\n";
+ } else
+ msg += "none\n";
+ msg += connector_.get_info().describe();
+ orbInstance_.getLogger().trace("outgoing", msg);
+ }
+
+ //
+ // Create new transport, using the connector
+ //
+ // For symetry reasons, GIOPClientStarterThreaded should also be
+ // added, even though these classes only have a trivial
+ // functionality. Or perhaps the GIOPClientStarterThreaded tries to
+ // connect() in the backgound? Just an idea...
+ //
+
+ org.apache.yoko.orb.OCI.Transport transport;
+
+ if (t >= 0) {
+ transport = connector_.connect_timeout(t);
+
+ //
+ // Was there a timeout?
+ //
+ if (transport == null)
+ throw new org.omg.CORBA.NO_RESPONSE("Connection timeout",
+ 0, org.omg.CORBA.CompletionStatus.COMPLETED_NO);
+ } else {
+ transport = connector_.connect();
+ Assert._OB_assert(transport != null);
+ }
+
+ //
+ // Create new worker
+ //
+ Assert._OB_assert(concModel_ == Threaded);
+ return new GIOPConnectionThreaded(orbInstance_, transport, this);
+ }
+
//
// initialize internal service contexts
private void initServiceContexts() {
@@ -325,7 +294,6 @@
connection_ = null;
destroy_ = false;
bidirWorker_ = bidirEnable;
- ownsWorker_ = true;
}
// ----------------------------------------------------------------------
@@ -335,39 +303,20 @@
//
// Destroy the client
//
- public void destroy(boolean terminate) {
- GIOPConnection c = null;
-
- synchronized (this) {
- //
- // Don't destroy twice
- //
- if (destroy_)
- return;
-
- //
- // Set the destroy flag
- //
- destroy_ = true;
-
- //
- // Use a copy of the worker, and destroy the worker outside
- // the synchronization, to avoid deadlocks
- //
- c = connection_;
- connection_ = null;
- }
-
- //
- // If there is a worker (and we exclusively own it) destroy it
- //
- if (c != null && ownsWorker_)
- c.destroy(terminate);
+ public synchronized void destroy() {
+ if (destroy_) return;
+ destroy_ = true;
+ connection_ = null;
+ // release the reference if this is an outbound connection
+ if (connectionRef != null) connectionRef.close();
}
public synchronized void removeConnection(GIOPConnection connection) {
- if (connection_ == connection)
- connection_ = null;
+ if (connection != connection_) return;
+ connection_ = null;
+ // purge the reference from the cache if this is an outbound connection
+ if (connectionRef != null)
+ orbInstance_.getOutboundConnectionCache().remove(connectionRef);
}
//
@@ -601,12 +550,4 @@
org.apache.yoko.orb.OCI.Transport transport = connection.transport();
return transport.mode() == org.apache.yoko.orb.OCI.SendReceiveMode.SendReceive;
}
-
- //
- // determines whether this GIOPClient exclusively owns its worker or
- // if its shared with another Client/Server
- //
- public boolean sharedConnection() {
- return !ownsWorker_;
- }
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnection.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnection.java
index 2280516..77aec72 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnection.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPConnection.java
@@ -17,19 +17,14 @@
package org.apache.yoko.orb.OB;
-import static org.apache.yoko.orb.OCI.GiopVersion.GIOP1_2;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
+import static org.apache.yoko.orb.OB.MinorCodes.*;
+import static org.omg.CORBA.CompletionStatus.*;
import org.apache.yoko.orb.CORBA.InputStream;
-import org.apache.yoko.orb.OB.Logger;
-import org.apache.yoko.orb.OCI.Buffer;
import org.apache.yoko.orb.OCI.GiopVersion;
import org.omg.CORBA.SystemException;
+import org.omg.CORBA.TRANSIENT;
import org.omg.CORBA.UNKNOWN;
-import org.omg.CORBA.portable.UnknownException;
import org.omg.IOP.ServiceContext;
import org.omg.IOP.UnknownExceptionInfo;
import org.omg.SendingContext.CodeBase;
@@ -489,8 +484,7 @@
// Parse the SCL, examining it for various codeset info
//
readCodeConverters(scl.value);
- if (codeConverters_ != null)
- in._OB_codeConverters(codeConverters_, GiopVersion.get(version.major, version.minor));
+ in._OB_codeConverters(codeConverters_, GiopVersion.get(version.major, version.minor));
//
// read in the peer's sending context runtime object
@@ -502,7 +496,7 @@
//
if (response.value)
upcallsInProgress_++;
-
+
orbInstance_.getLogger().debug("Processing request reqId=" + reqId + " op=" + op.value);
return oaInterface_.createUpcall(
@@ -1162,6 +1156,16 @@
idleTimeout_ = Integer.parseInt(value);
}
+ /** @returns true iff this connection was initiated by the other party */
+ public final boolean isInbound() {
+ return (properties_ & Property.CreatedByClient) == 0;
+ }
+
+ /** @returns true iff this connection was initiated by this party */
+ public final boolean isOutbound() {
+ return !!! isInbound();
+ }
+
//
// start populating the reply data
//
@@ -1450,15 +1454,6 @@
}
//
- // check if this connection is enabled for BiDir communication
- //
- synchronized public boolean bidirConnection() {
- if (client_ == null)
- return false;
- return client_.sharedConnection();
- }
-
- //
// change the state of this connection
//
public void setState(int newState) {
@@ -1598,15 +1593,8 @@
//
// destroy this connection
//
- public void destroy(boolean terminateNow) {
- if (!terminateNow)
- setState(State.Closing);
- else
- processException(State.Closed, new org.omg.CORBA.TRANSIENT(
- MinorCodes
- .describeTransient(org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown),
- org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown,
- org.omg.CORBA.CompletionStatus.COMPLETED_MAYBE), false);
+ public void destroy() {
+ setState(State.Closing);
}
//
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPIncomingMessage.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPIncomingMessage.java
index 1c5157e..bd5cc94 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPIncomingMessage.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPIncomingMessage.java
@@ -232,8 +232,7 @@
org.apache.yoko.orb.OB.MinorCodes.MinorVersion,
org.omg.CORBA.CompletionStatus.COMPLETED_MAYBE);
- org.apache.yoko.orb.CORBA.InputStream in = new org.apache.yoko.orb.CORBA.InputStream(
- buf, 0, false);
+ org.apache.yoko.orb.CORBA.InputStream in = new org.apache.yoko.orb.CORBA.InputStream(buf, 0, false);
switch (version_.minor) {
case 0: {
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarter.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarter.java
index 2cb4be4..ccbe630 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarter.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarter.java
@@ -17,18 +17,21 @@
package org.apache.yoko.orb.OB;
+import org.apache.yoko.orb.OCI.Acceptor;
+
+import java.util.Vector;
import java.util.logging.Logger;
abstract class GIOPServerStarter {
static final Logger logger = Logger.getLogger(GIOPServerStarter.class.getName());
- protected ORBInstance orbInstance_; // The ORBInstance
+ protected final ORBInstance orbInstance_; // The ORBInstance
- protected org.apache.yoko.orb.OCI.Acceptor acceptor_; // The acceptor
+ protected final Acceptor acceptor_; // The acceptor
- protected OAInterface oaInterface_; // The OA interface
+ protected final OAInterface oaInterface_; // The OA interface
- protected java.util.Vector connections_ = new java.util.Vector(); // Workers
+ protected final Vector connections_ = new java.util.Vector(); // Workers
public static final int StateActive = 0;
@@ -63,8 +66,7 @@
protected void reapWorkers() {
for (int i = 0; i < connections_.size();) {
- GIOPConnection connection = (GIOPConnection) connections_
- .elementAt(i);
+ GIOPConnection connection = (GIOPConnection) connections_.elementAt(i);
if (connection.destroyed())
connections_.removeElementAt(i);
else
@@ -110,11 +112,11 @@
//
// given a host/port this will search the workers of this
- // GIOPServerStarter for a transport which matches the specific
- // connection information. It returns null if not found.
+ // GIOPServerStarter for an inbound connection transport
+ // which matches the specific connection information.
+ // It returns null if not found.
//
- public synchronized GIOPConnection getWorker(
- org.apache.yoko.orb.OCI.ConnectorInfo connInfo) {
+ public synchronized GIOPConnection getMatchingConnection(org.apache.yoko.orb.OCI.ConnectorInfo connInfo) {
//
// reap the workers first since we don't want to return a
// destroyed transport
@@ -127,11 +129,14 @@
for (int i = 0; i < connections_.size(); i++) {
GIOPConnection worker = (GIOPConnection) connections_.elementAt(i);
+ // we only want to find inbound connections
+ if (worker.isOutbound())
+ continue;
+
org.apache.yoko.orb.OCI.Transport transport = worker.transport();
- if (transport != null)
- if (transport.get_info().endpoint_alias_match(connInfo))
- return worker;
+ if (transport != null && transport.get_info().endpoint_alias_match(connInfo))
+ return worker;
}
//
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarterThreaded.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarterThreaded.java
index 81303b4..4701484 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarterThreaded.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/GIOPServerStarterThreaded.java
@@ -17,6 +17,9 @@
package org.apache.yoko.orb.OB;
+import org.apache.yoko.orb.OCI.Acceptor;
+import org.apache.yoko.orb.OCI.Transport;
+
import java.util.concurrent.ExecutorService;
import java.util.logging.Level;
@@ -44,29 +47,19 @@
//
// Accept all connections which might have queued up in the
// listen() backlog
- while (true) {
- org.apache.yoko.orb.OCI.Transport transport = null;
+ do {
try {
- transport = acceptor_.accept(false);
- } catch (org.omg.CORBA.SystemException ex) {
- }
-
- if (transport == null) {
- logger.fine("Null transport received from a connect");
- break;
- }
-
- try {
- GIOPConnection connection = new GIOPConnectionThreaded(
- orbInstance_, transport,
- oaInterface_);
-
+ Transport t = acceptor_.accept(false);
+ if (t == null) {
+ logger.fine("Null transport received from a connect");
+ break;
+ }
+ GIOPConnection connection = new GIOPConnectionThreaded(orbInstance_, t, oaInterface_);
connection.setState(GIOPConnection.State.Closing);
} catch (org.omg.CORBA.SystemException ex) {
- // Ignore SystemExceptions
}
- }
+ } while (true);
//
// Close the acceptor
@@ -80,8 +73,7 @@
// GIOPServerStarterThreaded package member implementation
// ----------------------------------------------------------------------
- GIOPServerStarterThreaded(ORBInstance orbInstance,
- org.apache.yoko.orb.OCI.Acceptor acceptor, OAInterface oaInterface) {
+ GIOPServerStarterThreaded(ORBInstance orbInstance, Acceptor acceptor, OAInterface oaInterface) {
super(orbInstance, acceptor, oaInterface);
logger.fine("GIOPServer thread started " + this + " using acceptor " + acceptor);
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/ORBInstance.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/ORBInstance.java
index a7493ef..8e6c0e3 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/ORBInstance.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/ORBInstance.java
@@ -17,28 +17,27 @@
package org.apache.yoko.orb.OB;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.yoko.orb.OB.BootManager;
-import org.apache.yoko.orb.OB.DispatchStrategyFactory;
-import org.apache.yoko.orb.OB.Logger;
-import org.apache.yoko.orb.OB.URLRegistry;
-import org.apache.yoko.orb.OB.UnknownExceptionStrategy;
+import org.apache.yoko.orb.OCI.ConnectorInfo;
+import org.apache.yoko.util.Cache;
+import org.apache.yoko.util.concurrent.ReferenceCountedCache;
+import org.apache.yoko.util.concurrent.WeakCountedCache;
public final class ORBInstance {
private boolean destroy_; // True if destroy() was called
- //
- // Reference to ORB is needed in Java
- //
+ private static final Cache.Cleaner<GIOPConnection> CLEANER = new Cache.Cleaner<GIOPConnection>() {
+ @Override
+ public void clean(GIOPConnection conn) {
+ conn.destroy();
+ }
+ };
+
+ private final Cache<ConnectorInfo, GIOPConnection> outboundConnectionCache = new WeakCountedCache<>(CLEANER, 0, 100);
+
private org.omg.CORBA.ORB orb_;
//
@@ -485,4 +484,5 @@
return asyncHandler_;
}
+ public Cache<org.apache.yoko.orb.OCI.ConnectorInfo,GIOPConnection> getOutboundConnectionCache() {return outboundConnectionCache;}
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/PluginManager.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/PluginManager.java
index b7ca0bf..28e0d8f 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/PluginManager.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/PluginManager.java
@@ -17,7 +17,7 @@
package org.apache.yoko.orb.OB;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
public final class PluginManager {
//
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/Util.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/Util.java
index 49aa61f..19bbdd5 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/Util.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/Util.java
@@ -17,7 +17,7 @@
package org.apache.yoko.orb.OB;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
import org.omg.IOP.ServiceContext;
import org.omg.SendingContext.CodeBase;
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OB/ValueWriter.java b/yoko-core/src/main/java/org/apache/yoko/orb/OB/ValueWriter.java
index f9ee9f6..bc9a6b5 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OB/ValueWriter.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OB/ValueWriter.java
@@ -21,9 +21,8 @@
import javax.rmi.CORBA.ValueHandler;
-import org.apache.yoko.orb.CORBA.ORB;
import org.apache.yoko.util.cmsf.RepIds;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
import org.omg.CORBA.WStringValueHelper;
import org.omg.CORBA.portable.BoxedValueHelper;
import org.omg.CORBA.portable.IDLEntity;
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OBCORBA/ORB_impl.java b/yoko-core/src/main/java/org/apache/yoko/orb/OBCORBA/ORB_impl.java
index 3376368..36e0044 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OBCORBA/ORB_impl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OBCORBA/ORB_impl.java
@@ -19,8 +19,6 @@
import java.security.AccessController;
import java.util.Properties;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.yoko.orb.cmsf.CmsfClientInterceptor;
import org.apache.yoko.orb.cmsf.CmsfIORInterceptor;
@@ -31,7 +29,7 @@
import org.apache.yoko.orb.yasf.YasfClientInterceptor;
import org.apache.yoko.orb.yasf.YasfIORInterceptor;
import org.apache.yoko.orb.yasf.YasfServerInterceptor;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
import org.omg.CORBA.OBJECT_NOT_EXIST;
// This class must be public and not final
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OBMessaging/ExceptionHolder_impl.java b/yoko-core/src/main/java/org/apache/yoko/orb/OBMessaging/ExceptionHolder_impl.java
index 982725f..b9461a6 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OBMessaging/ExceptionHolder_impl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OBMessaging/ExceptionHolder_impl.java
@@ -18,9 +18,7 @@
package org.apache.yoko.orb.OBMessaging;
import static org.apache.yoko.orb.OCI.GiopVersion.GIOP1_2;
-import org.apache.yoko.orb.OCI.GiopVersion;
-import org.apache.yoko.util.osgi.ProviderLocator;
-import org.omg.CORBA.Any;
+import org.apache.yoko.osgi.ProviderLocator;
public class ExceptionHolder_impl extends org.omg.Messaging._ExceptionHolder {
//
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OBMessaging/UserExceptionRaiseProxy.java b/yoko-core/src/main/java/org/apache/yoko/orb/OBMessaging/UserExceptionRaiseProxy.java
index b08ceb7..dd9667b 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OBMessaging/UserExceptionRaiseProxy.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OBMessaging/UserExceptionRaiseProxy.java
@@ -17,7 +17,7 @@
package org.apache.yoko.orb.OBMessaging;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
public class UserExceptionRaiseProxy {
public void raise(org.omg.Messaging._ExceptionHolder execptHolder)
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/AcceptorInfoOperations.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/AcceptorInfoOperations.java
index 64b390e..3931b51 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/AcceptorInfoOperations.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/AcceptorInfoOperations.java
@@ -66,31 +66,8 @@
//
// IDL:orb.yoko.apache.org/OCI/AcceptorInfo/add_accept_cb:1.0
//
- /**
- *
- * Add a callback that is called whenever a new connection is
- * accepted. If the callback has already been registered, this
- * method has no effect.
- *
- * @param cb The callback to add.
- *
- **/
-
- void
- add_accept_cb(AcceptCB cb);
//
// IDL:orb.yoko.apache.org/OCI/AcceptorInfo/remove_accept_cb:1.0
//
- /**
- *
- * Remove an accept callback. If the callback was not registered,
- * this method has no effect.
- *
- * @param cb The callback to remove.
- *
- **/
-
- void
- remove_accept_cb(AcceptCB cb);
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/Buffer.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/Buffer.java
index 77f1827..6cdee79 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/Buffer.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/Buffer.java
@@ -17,6 +17,8 @@
package org.apache.yoko.orb.OCI;
+import org.apache.yoko.orb.OB.IORUtil;
+
public final class Buffer {
private int max_; // The maximum size of the buffer
@@ -155,4 +157,14 @@
public Buffer(int len) {
alloc(len);
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ int pos = pos_, len = len_;
+ IORUtil.dump_octets(data_, 0, pos, sb);
+ sb.append(String.format("------------------ pos = 0x%08X -------------------%n", pos));
+ IORUtil.dump_octets(data_, pos, len_ - pos, sb);
+ return sb.toString();
+ }
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/ConnectorInfoOperations.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/ConnectorInfoOperations.java
index 5841a3f..7edb247 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/ConnectorInfoOperations.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/ConnectorInfoOperations.java
@@ -62,35 +62,4 @@
String
describe();
-
- //
- // IDL:orb.yoko.apache.org/OCI/ConnectorInfo/add_connect_cb:1.0
- //
- /**
- *
- * Add a callback that is called whenever a new connection is
- * established. If the callback has already been registered, this
- * method has no effect.
- *
- * @param cb The callback to add.
- *
- **/
-
- void
- add_connect_cb(ConnectCB cb);
-
- //
- // IDL:orb.yoko.apache.org/OCI/ConnectorInfo/remove_connect_cb:1.0
- //
- /**
- *
- * Remove a connect callback. If the callback was not registered,
- * this method has no effect.
- *
- * @param cb The callback to remove.
- *
- **/
-
- void
- remove_connect_cb(ConnectCB cb);
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/CurrentOperations.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/CurrentOperations.java
index 580d592..a0d51be 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/CurrentOperations.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/CurrentOperations.java
@@ -47,16 +47,4 @@
//
// IDL:orb.yoko.apache.org/OCI/Current/get_oci_acceptor_info:1.0
//
- /**
- *
- * This method returns the Acceptor information object for the
- * Acceptor which created the Transport used to invoke the current
- * request.
- *
- * @returns The Acceptor information object.
- *
- **/
-
- AcceptorInfo
- get_oci_acceptor_info();
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/Current_impl.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/Current_impl.java
index c013f11..c19db57 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/Current_impl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/Current_impl.java
@@ -68,14 +68,6 @@
.firstElement();
}
- public org.apache.yoko.orb.OCI.AcceptorInfo get_oci_acceptor_info() {
- org.apache.yoko.orb.OCI.TransportInfo info = get_oci_transport_info();
- if (info != null)
- return info.acceptor_info();
- else
- return null;
- }
-
// ------------------------------------------------------------------
// Yoko internal functions
// Application programs must not use these functions directly
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/AcceptorInfo_impl.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/AcceptorInfo_impl.java
index bd46fc7..48c2411 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/AcceptorInfo_impl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/AcceptorInfo_impl.java
@@ -19,16 +19,11 @@
import org.apache.yoko.orb.OCI.IIOP.AcceptorInfo;
import org.apache.yoko.orb.OCI.IIOP.PLUGIN_ID;
+import org.omg.CORBA.LocalObject;
-public final class AcceptorInfo_impl extends org.omg.CORBA.LocalObject
- implements AcceptorInfo {
+public final class AcceptorInfo_impl extends LocalObject implements AcceptorInfo {
private Acceptor_impl acceptor_; // The associated acceptor
- //
- // All accept callback objects
- //
- private java.util.Vector acceptCBVec_ = new java.util.Vector();
-
// ------------------------------------------------------------------
// Standard IDL to Java Mapping
// ------------------------------------------------------------------
@@ -62,24 +57,6 @@
return desc;
}
- public synchronized void add_accept_cb(org.apache.yoko.orb.OCI.AcceptCB cb) {
- int length = acceptCBVec_.size();
- for (int i = 0; i < length; i++)
- if (acceptCBVec_.elementAt(i) == cb)
- return; // Already registered
- acceptCBVec_.addElement(cb);
- }
-
- public synchronized void remove_accept_cb(
- org.apache.yoko.orb.OCI.AcceptCB cb) {
- int length = acceptCBVec_.size();
- for (int i = 0; i < length; i++)
- if (acceptCBVec_.elementAt(i) == cb) {
- acceptCBVec_.removeElementAt(i);
- return;
- }
- }
-
public synchronized String[] hosts() {
if (acceptor_ == null)
throw new org.omg.CORBA.NO_RESOURCES();
@@ -115,16 +92,6 @@
acceptor_ = acceptor;
}
- synchronized void _OB_callAcceptCB(
- org.apache.yoko.orb.OCI.TransportInfo info) {
- int length = acceptCBVec_.size();
- for (int i = 0; i < length; i++) {
- org.apache.yoko.orb.OCI.AcceptCB cb = (org.apache.yoko.orb.OCI.AcceptCB) acceptCBVec_
- .elementAt(i);
- cb.accept_cb(info);
- }
- }
-
synchronized void _OB_destroy() {
acceptor_ = null;
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Acceptor_impl.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Acceptor_impl.java
index 1793988..0cc863a 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Acceptor_impl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Acceptor_impl.java
@@ -17,13 +17,20 @@
package org.apache.yoko.orb.OCI.IIOP;
+import static org.apache.yoko.orb.OCI.IIOP.Exceptions.*;
+import static org.apache.yoko.orb.OB.MinorCodes.*;
+
+import java.io.InterruptedIOException;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.yoko.orb.CORBA.OutputStream;
import org.apache.yoko.orb.OB.Assert;
-import org.apache.yoko.orb.OCI.IIOP.PLUGIN_ID;
+import org.apache.yoko.orb.OB.MinorCodes;
+import org.apache.yoko.orb.OCI.Transport;
+import org.omg.CORBA.COMM_FAILURE;
import org.omg.IOP.Codec;
+import org.omg.IOP.TAG_INTERNET_IOP;
final class Acceptor_impl extends org.omg.CORBA.LocalObject implements
org.apache.yoko.orb.OCI.Acceptor {
@@ -63,7 +70,7 @@
}
public int tag() {
- return org.omg.IOP.TAG_INTERNET_IOP.value;
+ return TAG_INTERNET_IOP.value;
}
public int handle() {
@@ -123,22 +130,11 @@
return null; // Timeout
else {
logger.log(Level.FINE, "Failure accepting connection for host=" + localAddress_ + ", port=" + port_, ex);
-
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorAccept)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorAccept,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw asCommFailure(ex, MinorAccept);
}
} catch (java.io.IOException ex) {
logger.log(Level.FINE, "Failure accepting connection for host=" + localAddress_ + ", port=" + port_, ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorAccept)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorAccept,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw asCommFailure(ex, MinorAccept);
}
//
@@ -150,18 +146,13 @@
socket.setKeepAlive(true);
} catch (java.net.SocketException ex) {
logger.log(Level.FINE, "Failure configuring server connection for host=" + localAddress_ + ", port=" + port_, ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSetsockopt)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorSetsockopt,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw asCommFailure(ex, MinorSetsockopt);
}
//
// Create new transport
//
- org.apache.yoko.orb.OCI.Transport tr = null;
+ Transport tr = null;
try {
tr = new Transport_impl(this, socket, listenMap_);
logger.fine("Inbound connection received from " + socket.getInetAddress());
@@ -175,24 +166,12 @@
}
//
- // Call callbacks
- //
- org.apache.yoko.orb.OCI.TransportInfo trInfo = tr.get_info();
- try {
- info_._OB_callAcceptCB(trInfo);
- } catch (org.omg.CORBA.SystemException ex) {
- tr.close();
- logger.log(Level.FINE, "error calling connection callbacks", ex);
- throw ex;
- }
-
- //
// Return new transport
//
return tr;
}
- public org.apache.yoko.orb.OCI.Transport connect_self() {
+ public Transport connect_self() {
//
// Create socket and connect to local address
//
@@ -205,20 +184,10 @@
}
} catch (java.net.ConnectException ex) {
logger.log(Level.FINE, "Failure making self connection for host=" + localAddress_ + ", port=" + port_, ex);
- throw new org.omg.CORBA.TRANSIENT(
- org.apache.yoko.orb.OB.MinorCodes
- .describeTransient(org.apache.yoko.orb.OB.MinorCodes.MinorConnectFailed)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorConnectFailed,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO);
+ throw asTransient(ex, MinorConnectFailed);
} catch (java.io.IOException ex) {
logger.log(Level.FINE, "Failure making self connection for host=" + localAddress_ + ", port=" + port_, ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSocket)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorSocket,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw asCommFailure(ex, MinorSocket);
}
//
@@ -232,20 +201,14 @@
socket.close();
} catch (java.io.IOException e) {
}
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSetsockopt)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorSetsockopt,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw asCommFailure(ex);
}
//
// Create and return new transport
//
- org.apache.yoko.orb.OCI.Transport tr = null;
try {
- tr = new Transport_impl(this, socket, listenMap_);
+ return new Transport_impl(this, socket, listenMap_);
} catch (org.omg.CORBA.SystemException ex) {
try {
socket.close();
@@ -253,7 +216,6 @@
}
throw ex;
}
- return tr;
}
public void add_profiles(org.apache.yoko.orb.OCI.ProfileInfo profileInfo,
@@ -471,12 +433,7 @@
}
} catch (java.net.UnknownHostException ex) {
logger.log(Level.FINE, "Host resolution failure", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorGethostbyname)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorGethostbyname,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw asCommFailure(ex);
}
//
@@ -513,12 +470,7 @@
org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
} catch (java.io.IOException ex) {
logger.log(Level.FINE, "Failure creating server socket for host=" + localAddress_ + ", port=" + port, ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSocket)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorSocket,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw asCommFailure(ex, MinorSocket);
}
//
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/ConnectorInfoOperations.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/ConnectorInfoOperations.java
index c222f8b..f6eeea5 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/ConnectorInfoOperations.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/ConnectorInfoOperations.java
@@ -20,6 +20,9 @@
//
// IDL:orb.yoko.apache.org/OCI/IIOP/ConnectorInfo:1.0
//
+
+import org.apache.yoko.orb.OCI.Connector;
+
/**
*
* Information on an IIOP OCI Connector object.
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/ConnectorInfo_impl.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/ConnectorInfo_impl.java
index a076a8d..21411a8 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/ConnectorInfo_impl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/ConnectorInfo_impl.java
@@ -17,114 +17,88 @@
package org.apache.yoko.orb.OCI.IIOP;
-import org.apache.yoko.orb.OCI.IIOP.ConnectorInfo;
-import org.apache.yoko.orb.OCI.IIOP.PLUGIN_ID;
+import static org.apache.yoko.orb.OCI.IIOP.Exceptions.*;
-public final class ConnectorInfo_impl extends org.omg.CORBA.LocalObject
- implements ConnectorInfo {
- private Connector_impl connector_; // The associated connector
+import org.apache.yoko.orb.OCI.ConnectCB;
+import org.omg.CORBA.LocalObject;
+import org.omg.IOP.TAG_INTERNET_IOP;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.*;
+
+/**
+ * Immutable memo of the endpoint details for a connection. The InetAddress
+ * is looked up when first needed but never changed again. It is used in
+ * hashcode() and equals().
+ */
+public final class ConnectorInfo_impl extends LocalObject implements ConnectorInfo {
+ private final String host;
+ private final short port;
+ private volatile InetAddress addr; // initialised lazily
//
// All connect callback objects
//
- private java.util.Vector connectCBVec_ = new java.util.Vector();
+ private final List<ConnectCB> callbacks;
// ------------------------------------------------------------------
// Standard IDL to Java Mapping
// ------------------------------------------------------------------
- public String id() {
- return PLUGIN_ID.value;
- }
+ public String id() {return PLUGIN_ID.value;}
- public int tag() {
- return org.omg.IOP.TAG_INTERNET_IOP.value;
- }
+ public int tag() {return TAG_INTERNET_IOP.value;}
- public synchronized String describe() {
- short remotePort = remote_port();
+ public String describe() {return String.format("id: %s%nremote address: %s:%d", PLUGIN_ID.value, remote_addr(), port);}
- String desc = "id: " + PLUGIN_ID.value;
- desc += "\nremote address: ";
- desc += remote_addr();
- desc += ":";
- desc += (remotePort < 0 ? 0xffff + (int) remotePort + 1 : remotePort);
+ public String remote_addr() {return getInetAddress().getHostAddress();}
- return desc;
- }
-
- public synchronized void add_connect_cb(org.apache.yoko.orb.OCI.ConnectCB cb) {
- int length = connectCBVec_.size();
- for (int i = 0; i < length; i++)
- if (connectCBVec_.elementAt(i) == cb)
- return; // Already registered
- connectCBVec_.addElement(cb);
- }
-
- public synchronized void remove_connect_cb(
- org.apache.yoko.orb.OCI.ConnectCB cb) {
- int length = connectCBVec_.size();
- for (int i = 0; i < length; i++)
- if (connectCBVec_.elementAt(i) == cb) {
- connectCBVec_.removeElementAt(i);
- return;
- }
- }
-
- public synchronized String remote_addr() {
- if (connector_ == null)
- throw new org.omg.CORBA.NO_RESOURCES("No connector");
-
- try {
- java.net.InetAddress address = java.net.InetAddress
- .getByName(connector_.host_);
- return address.getHostAddress();
- } catch (java.net.UnknownHostException ex) {
- throw new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorGethostbyname)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorGethostbyname,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO);
- }
- }
-
- public synchronized short remote_port() {
- if (connector_ == null)
- throw new org.omg.CORBA.NO_RESOURCES("No connector");
-
- int port = connector_.port_;
-
- if (port >= 0x8000)
- return (short) (port - 0xffff - 1);
- else
- return (short) port;
- }
+ public short remote_port() {return port;}
// ------------------------------------------------------------------
// Yoko internal functions
// Application programs must not use these functions directly
// ------------------------------------------------------------------
- ConnectorInfo_impl(Connector_impl connector,
- org.apache.yoko.orb.OCI.ConnectCB[] cb) {
- connector_ = connector;
-
- for (int i = 0; i < cb.length; i++)
- connectCBVec_.addElement(cb[i]);
+ ConnectorInfo_impl(String host, int port, ConnectCB...cb) {
+ this.host = host;
+ this.port = (short)port;
+ if (cb == null || cb.length == 0)
+ callbacks = Collections.emptyList();
+ else
+ callbacks = Collections.unmodifiableList(new ArrayList<ConnectCB>(Arrays.asList(cb)));
}
- synchronized void _OB_callConnectCB(
- org.apache.yoko.orb.OCI.TransportInfo info) {
- int length = connectCBVec_.size();
- for (int i = 0; i < length; i++) {
- org.apache.yoko.orb.OCI.ConnectCB cb = (org.apache.yoko.orb.OCI.ConnectCB) connectCBVec_
- .elementAt(i);
- cb.connect_cb(info);
+ String getHost() { return host; }
+
+ int getPort() { return (char)port; }
+
+ private InetAddress getInetAddress() {
+ if (addr == null) synchronized (this) {
+ if (addr == null) try {
+ addr = InetAddress.getByName(host);
+ } catch (UnknownHostException ex) {
+ throw asCommFailure(ex);
+ }
}
+ return addr;
}
- synchronized void _OB_destroy() {
- connector_ = null;
+ synchronized void _OB_callConnectCB(org.apache.yoko.orb.OCI.TransportInfo info) {
+ for (ConnectCB cb : callbacks) cb.connect_cb(info);
}
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) return true;
+ if (!!! (other instanceof ConnectorInfo_impl)) return false;
+
+ ConnectorInfo_impl that = (ConnectorInfo_impl) other;
+
+ return (this.port == that.port) && this.getInetAddress().equals(that.getInetAddress());
+ }
+
+ @Override
+ public int hashCode() {return 31*port + getInetAddress().hashCode();}
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Connector_impl.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Connector_impl.java
index a886a35..e2a737e 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Connector_impl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Connector_impl.java
@@ -17,10 +17,14 @@
package org.apache.yoko.orb.OCI.IIOP;
+import static org.apache.yoko.orb.OCI.IIOP.Exceptions.*;
+
import java.util.Arrays;
import java.util.logging.Level;
import java.util.logging.Logger;
+import org.apache.yoko.orb.OCI.ConnectCB;
+import org.apache.yoko.orb.OCI.Connector;
import org.apache.yoko.orb.OCI.ProfileInfo;
import org.apache.yoko.orb.OCI.ProfileInfoHolder;
import org.omg.CORBA.Policy;
@@ -29,25 +33,18 @@
import org.omg.IOP.IOR;
import org.omg.IOP.TaggedComponent;
-final class Connector_impl extends org.omg.CORBA.LocalObject implements
- org.apache.yoko.orb.OCI.Connector {
+final class Connector_impl extends org.omg.CORBA.LocalObject implements Connector {
// the real logger backing instance. We use the interface class as the locator
- static final Logger logger = Logger.getLogger(org.apache.yoko.orb.OCI.Connector.class.getName());
+ static final Logger logger = Logger.getLogger(Connector.class.getName());
private final IOR ior_; // the target IOR we're connecting with
private final Policy[] policies_; // the policies used for the connection.
- // Some data member must not be private because the info object
- // must be able to access them
- public String host_; // The host
-
- public int port_; // The port
-
private boolean keepAlive_; // The keepalive flag
- private ConnectorInfo_impl info_; // Connector information
+ private final ConnectorInfo_impl info_; // Connector information
private java.net.Socket socket_; // The socket
@@ -67,11 +64,7 @@
// ------------------------------------------------------------------
private void close() {
- logger.fine("Closing connection to host=" + host_ + ", port=" + port_);
- //
- // Destroy the info object
- //
- info_._OB_destroy();
+ logger.fine("Closing connection to host=" + this.info_.getHost() + ", port=" + this.info_.getPort());
//
// Close the socket
@@ -105,15 +98,10 @@
//
java.net.InetAddress address;
try {
- address = java.net.InetAddress.getByName(host_);
+ address = java.net.InetAddress.getByName(this.info_.getHost());
} catch (java.net.UnknownHostException ex) {
logger.log(Level.FINE, "Host resolution error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorGethostbyname)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorGethostbyname,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw asCommFailure(ex);
}
@@ -121,27 +109,27 @@
// Create socket and connect
//
try {
- logger.fine("Connecting to host=" + address + ", port=" + port_);
+ logger.fine("Connecting to host=" + address + ", port=" + this.info_.getPort());
if (connectionHelper_ != null) {
- socket_ = connectionHelper_.createSocket(ior_, policies_, address, port_);
+ socket_ = connectionHelper_.createSocket(ior_, policies_, address, this.info_.getPort());
} else {
- socket_ = extendedConnectionHelper_.createSocket(ior_, policies_, address, port_);
+ socket_ = extendedConnectionHelper_.createSocket(ior_, policies_, address, this.info_.getPort());
}
- logger.fine("Connection created with socket " + socket_);
+ logger.fine("Connection created with socket " + socket_);
} catch (java.net.ConnectException ex) {
- logger.log(Level.FINE, "Error connecting to host=" + address + ", port=" + port_, ex);
+ logger.log(Level.FINE, "Error connecting to host=" + address + ", port=" + this.info_.getPort(), ex);
throw new org.omg.CORBA.TRANSIENT(
org.apache.yoko.orb.OB.MinorCodes
.describeTransient(org.apache.yoko.orb.OB.MinorCodes.MinorConnectFailed)
- + "Error connecting to host=" + address + ", port=" + port_+ ": " + ex.getMessage(),
+ + "Error connecting to host=" + address + ", port=" + this.info_.getPort() + ": " + ex.getMessage(),
org.apache.yoko.orb.OB.MinorCodes.MinorConnectFailed,
org.omg.CORBA.CompletionStatus.COMPLETED_NO);
} catch (java.io.IOException ex) {
- logger.log(Level.FINE, "Error connecting to host=" + address + ", port=" + port_, ex);
+ logger.log(Level.FINE, "Error connecting to host=" + address + ", port=" + this.info_.getPort(), ex);
throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
org.apache.yoko.orb.OB.MinorCodes
.describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSocket)
- + "Error connecting to host=" + address + ", port=" + port_ + ": " + ex.getMessage(),
+ + "Error connecting to host=" + address + ", port=" + this.info_.getPort() + ": " + ex.getMessage(),
org.apache.yoko.orb.OB.MinorCodes.MinorSocket,
org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
}
@@ -160,12 +148,7 @@
socket_.close();
} catch (java.io.IOException e) {
}
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSetsockopt)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorSetsockopt,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw Exceptions.asCommFailure(ex);
}
//
@@ -173,7 +156,7 @@
//
org.apache.yoko.orb.OCI.Transport tr = null;
try {
- tr = new Transport_impl(this, socket_, listenMap_);
+ tr = new Transport_impl(socket_, listenMap_);
socket_ = null;
} catch (org.omg.CORBA.SystemException ex) {
logger.log(Level.FINE, "Transport creation error", ex);
@@ -223,9 +206,9 @@
public void run() {
try {
if (connectionHelper_ != null) {
- so_ = connectionHelper_.createSocket(ior_, policies_, address_, port_);
+ so_ = connectionHelper_.createSocket(ior_, policies_, address_, Connector_impl.this.info_.getPort());
} else {
- so_ = extendedConnectionHelper_.createSocket(ior_, policies_, address_, port_);
+ so_ = extendedConnectionHelper_.createSocket(ior_, policies_, address_, Connector_impl.this.info_.getPort());
}
} catch (java.io.IOException ex) {
logger.log(Level.FINE, "Socket creation error", ex);
@@ -284,15 +267,10 @@
//
java.net.InetAddress address = null;
try {
- address = java.net.InetAddress.getByName(host_);
+ address = java.net.InetAddress.getByName(this.info_.getHost());
} catch (java.net.UnknownHostException ex) {
logger.log(Level.FINE, "Host resolution error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorGethostbyname)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorGethostbyname,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw asCommFailure(ex);
}
//
@@ -338,12 +316,7 @@
socket_.close();
} catch (java.io.IOException e) {
}
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSetsockopt)
- + ": " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorSetsockopt,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw Exceptions.asCommFailure(ex);
}
//
@@ -351,7 +324,7 @@
//
org.apache.yoko.orb.OCI.Transport tr = null;
try {
- tr = new Transport_impl(this, socket_, listenMap_);
+ tr = new Transport_impl(socket_, listenMap_);
socket_ = null;
} catch (org.omg.CORBA.SystemException ex) {
logger.log(Level.FINE, "Transport setup error", ex);
@@ -396,7 +369,7 @@
org.apache.yoko.orb.OCI.ProfileInfoSeqHolder profileInfoSeq = new org.apache.yoko.orb.OCI.ProfileInfoSeqHolder();
profileInfoSeq.value = new org.apache.yoko.orb.OCI.ProfileInfo[0];
- Util.extractAllProfileInfos(ior, profileInfoSeq, true, host_, port_,
+ Util.extractAllProfileInfos(ior, profileInfoSeq, true, this.info_.getHost(), this.info_.getPort(),
false, codec_);
//check that the transport info matches ours.
@@ -429,23 +402,23 @@
//
// Compare ports
//
- if (port_ != impl.port_)
+ if (this.info_.getPort() != impl.info_.getPort())
return false;
//
// Direct host name comparison
//
- if (!host_.equals(impl.host_)) {
+ if (!this.info_.getHost().equals(impl.info_.getHost())) {
//
// Direct host name comparision failed - must look up
// addresses to be really sure if the hosts differ
//
try {
java.net.InetAddress addr1 = java.net.InetAddress
- .getByName(host_);
+ .getByName(this.info_.getHost());
java.net.InetAddress addr2 = java.net.InetAddress
- .getByName(impl.host_);
+ .getByName(impl.info_.getHost());
if (!addr1.equals(addr2))
return false;
@@ -458,11 +431,6 @@
}
return Arrays.equals(transportInfo, impl.transportInfo);
-
- //
- // OK, connectors are the same
- //
-// return true;
}
byte[] extractTransportInfo(IOR ior) {
@@ -490,40 +458,27 @@
// Application programs must not use these functions directly
// ------------------------------------------------------------------
- public Connector_impl(org.omg.IOP.IOR ior, org.omg.CORBA.Policy[] policies, String host, int port, boolean keepAlive,
- org.apache.yoko.orb.OCI.ConnectCB[] cb, ListenerMap lm, ConnectionHelper helper, Codec codec) {
- // System.out.println("Connector_impl");
+ private Connector_impl(IOR ior, Policy[] policies, String host, int port, boolean keepAlive, ConnectCB[] cb, ListenerMap lm, ConnectionHelper helper, ExtendedConnectionHelper xhelper, Codec codec) {
ior_ = ior;
policies_ = policies;
- host_ = host;
- port_ = port;
keepAlive_ = keepAlive;
- info_ = new ConnectorInfo_impl(this, cb);
+ info_ = new ConnectorInfo_impl(host, port, cb);
listenMap_ = lm;
connectionHelper_ = helper;
- extendedConnectionHelper_ = null;
+ extendedConnectionHelper_ = xhelper;
codec_ = codec;
transportInfo = extractTransportInfo(ior);
}
- public Connector_impl(org.omg.IOP.IOR ior, org.omg.CORBA.Policy[] policies, String host, int port, boolean keepAlive,
- org.apache.yoko.orb.OCI.ConnectCB[] cb, ListenerMap lm, ExtendedConnectionHelper helper, Codec codec) {
- // System.out.println("Connector_impl");
- ior_ = ior;
- policies_ = policies;
- host_ = host;
- port_ = port;
- keepAlive_ = keepAlive;
- info_ = new ConnectorInfo_impl(this, cb);
- listenMap_ = lm;
- connectionHelper_ = null;
- extendedConnectionHelper_ = helper;
- codec_ = codec;
- transportInfo = extractTransportInfo(ior);
+ public Connector_impl(IOR ior, Policy[] policies, String host, int port, boolean keepAlive, ConnectCB[] cb, ListenerMap lm, ConnectionHelper helper, Codec codec) {
+ this(ior, policies, host, port, keepAlive, cb, lm, helper, null, codec);
+ }
+
+ public Connector_impl(IOR ior, Policy[] policies, String host, int port, boolean keepAlive, ConnectCB[] cb, ListenerMap lm, ExtendedConnectionHelper helper, Codec codec) {
+ this(ior, policies, host, port, keepAlive, cb, lm, null, helper, codec);
}
public void finalize() throws Throwable {
- // System.out.println("~Connector_impl");
if (socket_ != null)
close();
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Exceptions.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Exceptions.java
new file mode 100644
index 0000000..16b7d46
--- /dev/null
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Exceptions.java
@@ -0,0 +1,35 @@
+package org.apache.yoko.orb.OCI.IIOP;
+
+import static org.apache.yoko.orb.OB.MinorCodes.*;
+
+import org.omg.CORBA.COMM_FAILURE;
+import org.omg.CORBA.TRANSIENT;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+
+import static org.apache.yoko.orb.OB.MinorCodes.MinorSetsockopt;
+import static org.apache.yoko.orb.OB.MinorCodes.describeCommFailure;
+import static org.omg.CORBA.CompletionStatus.COMPLETED_NO;
+
+enum Exceptions {;
+ static COMM_FAILURE asCommFailure(SocketException e) {return asCommFailure(e, MinorSetsockopt);}
+ static COMM_FAILURE asCommFailure(UnknownHostException e) {return asCommFailure(e, MinorGethostbyname);}
+
+ static COMM_FAILURE asCommFailure(IOException e, int minor) {
+ String msg = String.format("%s: %s", describeCommFailure(minor), e.getMessage());
+ return (COMM_FAILURE) new COMM_FAILURE(msg, minor, COMPLETED_NO).initCause(e);
+ }
+
+ static COMM_FAILURE asCommFailure(Exception e, int minor, String message) {
+ String msg = String.format("%s: %s: %s", describeCommFailure(minor), message, e.getMessage());
+ return (COMM_FAILURE) new COMM_FAILURE(msg, minor, COMPLETED_NO).initCause(e);
+ }
+
+ static TRANSIENT asTransient(Exception e, int minor) {
+ String msg = String.format("%s: %s", describeTransient(minor), e.getMessage());
+ return (TRANSIENT) new TRANSIENT(msg, minor, COMPLETED_NO);
+ }
+}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/TransportInfo_impl.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/TransportInfo_impl.java
index 54ce73e..353de16 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/TransportInfo_impl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/TransportInfo_impl.java
@@ -18,27 +18,32 @@
package org.apache.yoko.orb.OCI.IIOP;
import java.net.Socket;
+import java.util.Objects;
-import org.apache.yoko.orb.OCI.IIOP.PLUGIN_ID;
-import org.apache.yoko.orb.OCI.IIOP.TransportInfo;
+import org.apache.yoko.orb.CORBA.InputStream;
+import org.apache.yoko.orb.OB.Net;
+import org.apache.yoko.orb.OCI.*;
+import org.omg.BiDirPolicy.BIDIRECTIONAL_POLICY_TYPE;
+import org.omg.BiDirPolicy.BOTH;
+import org.omg.BiDirPolicy.BidirectionalPolicy;
+import org.omg.BiDirPolicy.BidirectionalPolicyHelper;
+import org.omg.CORBA.LocalObject;
+import org.omg.CORBA.NO_RESOURCES;
+import org.omg.CORBA.Policy;
+import org.omg.IIOP.BiDirIIOPServiceContext;
+import org.omg.IIOP.BiDirIIOPServiceContextHelper;
+import org.omg.IIOP.ListenPoint;
+import org.omg.IOP.BI_DIR_IIOP;
+import org.omg.IOP.ServiceContext;
+import org.omg.IOP.TAG_INTERNET_IOP;
-public final class TransportInfo_impl extends org.omg.CORBA.LocalObject
- implements TransportInfo {
- private org.apache.yoko.orb.OCI.ConnectorInfo connectorInfo_; // connector
- // info
+public final class TransportInfo_impl extends LocalObject implements TransportInfo {
+ private enum Origin{CLIENT(CLIENT_SIDE.value), SERVER(SERVER_SIDE.value); final short value; Origin(int v) {value = (short)v;}}
+ private final Socket socket;
+ private final Origin origin;
+ private final ListenerMap listenMap_;
+ private volatile ListenPoint[] listenPoints_ = null;
- private org.apache.yoko.orb.OCI.AcceptorInfo acceptorInfo_; // acceptor info
-
- private Transport_impl transport_; // associated transport
-
- private org.omg.IIOP.ListenPoint[] listenPoints_ = null;
-
- private ListenerMap listenMap_;
-
- //
- // All close callback objects
- //
- private java.util.Vector closeCBVec_ = new java.util.Vector();
// ------------------------------------------------------------------
// Standard IDL to Java Mapping
@@ -49,14 +54,11 @@
}
public int tag() {
- return org.omg.IOP.TAG_INTERNET_IOP.value;
+ return TAG_INTERNET_IOP.value;
}
public short origin() {
- if (acceptorInfo_ == null)
- return org.apache.yoko.orb.OCI.CLIENT_SIDE.value;
- else
- return org.apache.yoko.orb.OCI.SERVER_SIDE.value;
+ return origin.value;
}
public synchronized String describe() {
@@ -79,94 +81,32 @@
return desc;
}
- public synchronized org.apache.yoko.orb.OCI.ConnectorInfo connector_info() {
- return connectorInfo_;
- }
+ public Socket getSocket() {return socket;}
- public synchronized org.apache.yoko.orb.OCI.AcceptorInfo acceptor_info() {
- return acceptorInfo_;
- }
+ public String addr() {return socket.getLocalAddress().getHostAddress();}
- public synchronized void add_close_cb(org.apache.yoko.orb.OCI.CloseCB cb) {
- int length = closeCBVec_.size();
- for (int i = 0; i < length; i++)
- if (closeCBVec_.elementAt(i) == cb)
- return; // Already registered
- closeCBVec_.addElement(cb);
- }
+ public short port() {return (short)socket.getLocalPort();}
- public synchronized void remove_close_cb(org.apache.yoko.orb.OCI.CloseCB cb) {
- int length = closeCBVec_.size();
- for (int i = 0; i < length; i++)
- if (closeCBVec_.elementAt(i) == cb) {
- closeCBVec_.removeElementAt(i);
- return;
- }
- }
+ public String remote_addr() {return socket.getInetAddress().getHostAddress();}
- public synchronized java.net.Socket socket() {
- if (transport_ == null)
- throw new org.omg.CORBA.NO_RESOURCES();
+ public short remote_port() {return (short)socket.getPort();}
- return transport_.socket_;
- }
-
-
- public synchronized String addr() {
- if (transport_ == null)
- throw new org.omg.CORBA.NO_RESOURCES();
-
- return transport_.socket_.getLocalAddress().getHostAddress();
- }
-
- public synchronized short port() {
- if (transport_ == null)
- throw new org.omg.CORBA.NO_RESOURCES();
-
- int port = transport_.socket_.getLocalPort();
-
- if (port >= 0x8000)
- return (short) (port - 0xffff - 1);
- else
- return (short) port;
- }
-
- public synchronized String remote_addr() {
- if (transport_ == null)
- throw new org.omg.CORBA.NO_RESOURCES();
-
- return transport_.socket_.getInetAddress().getHostAddress();
- }
-
- public synchronized short remote_port() {
- if (transport_ == null)
- throw new org.omg.CORBA.NO_RESOURCES();
-
- int port = transport_.socket_.getPort();
-
- if (port >= 0x8000)
- return (short) (port - 0xffff - 1);
- else
- return (short) port;
- }
-
- public org.omg.IOP.ServiceContext[] get_service_contexts(
- org.omg.CORBA.Policy[] policies) {
- org.omg.IOP.ServiceContext[] scl;
+ public ServiceContext[] get_service_contexts(Policy[] policies) {
+ ServiceContext[] scl;
boolean bHaveBidir = false;
- for (int i = 0; i < policies.length; i++) {
- if (policies[i].policy_type() == org.omg.BiDirPolicy.BIDIRECTIONAL_POLICY_TYPE.value) {
- org.omg.BiDirPolicy.BidirectionalPolicy p = org.omg.BiDirPolicy.BidirectionalPolicyHelper
- .narrow(policies[i]);
- if (p.value() == org.omg.BiDirPolicy.BOTH.value)
+ for (Policy policy : policies) {
+ if (policy.policy_type() == BIDIRECTIONAL_POLICY_TYPE.value) {
+ BidirectionalPolicy p = BidirectionalPolicyHelper
+ .narrow(policy);
+ if (p.value() == BOTH.value)
bHaveBidir = true;
break;
}
}
if (bHaveBidir) {
- org.omg.IIOP.BiDirIIOPServiceContext biDirCtxt = new org.omg.IIOP.BiDirIIOPServiceContext();
+ BiDirIIOPServiceContext biDirCtxt = new BiDirIIOPServiceContext();
biDirCtxt.listen_points = listenMap_.getListenPoints();
org.apache.yoko.orb.OCI.Buffer buf = new org.apache.yoko.orb.OCI.Buffer();
@@ -200,22 +140,19 @@
}
public void handle_service_contexts(org.omg.IOP.ServiceContext[] contexts) {
- for (int i = 0; i < contexts.length; i++) {
- if (contexts[i].context_id == org.omg.IOP.BI_DIR_IIOP.value) {
- byte[] pOct = contexts[i].context_data;
- int len = contexts[i].context_data.length;
+ for (ServiceContext context : contexts) {
+ if (context.context_id == BI_DIR_IIOP.value) {
+ byte[] pOct = context.context_data;
+ int len = context.context_data.length;
- org.apache.yoko.orb.OCI.Buffer buf = new org.apache.yoko.orb.OCI.Buffer(
- pOct, len);
- org.apache.yoko.orb.CORBA.InputStream in = new org.apache.yoko.orb.CORBA.InputStream(
- buf, 0, false);
+ Buffer buf = new Buffer(pOct, len);
+ InputStream in = new InputStream(buf, 0, false);
in._OB_readEndian();
//
// unmarshal the octets back to the bidir format
//
- org.omg.IIOP.BiDirIIOPServiceContext biDirCtxt = org.omg.IIOP.BiDirIIOPServiceContextHelper
- .read(in);
+ BiDirIIOPServiceContext biDirCtxt = BiDirIIOPServiceContextHelper.read(in);
//
// save the listening points in the transport
@@ -228,14 +165,10 @@
}
public synchronized boolean received_bidir_SCL() {
- if (listenPoints_ == null)
- return false;
-
- return (listenPoints_.length > 0);
+ return listenPoints_ != null && (listenPoints_.length > 0);
}
- public synchronized boolean endpoint_alias_match(
- org.apache.yoko.orb.OCI.ConnectorInfo connInfo) {
+ public synchronized boolean endpoint_alias_match(org.apache.yoko.orb.OCI.ConnectorInfo connInfo) {
//
// we only deal with Connectors that are of our specific type,
// namely IIOP connectors (and ConnectorInfos)
@@ -257,10 +190,10 @@
short port = infoImpl.remote_port();
String host = infoImpl.remote_addr();
- for (int i = 0; i < listenPoints_.length; i++) {
- if ((listenPoints_[i].port == port)
- && org.apache.yoko.orb.OB.Net.CompareHosts(
- listenPoints_[i].host, host))
+ for (ListenPoint aListenPoints_ : listenPoints_) {
+ if ((aListenPoints_.port == port)
+ && Net.CompareHosts(
+ aListenPoints_.host, host))
return true;
}
@@ -279,31 +212,20 @@
// Yoko internal functions
// Application programs must not use these functions directly
// ------------------------------------------------------------------
-
- TransportInfo_impl(Transport_impl transport,
- org.apache.yoko.orb.OCI.Connector connector, ListenerMap lm) {
- transport_ = transport;
- connectorInfo_ = connector.get_info();
+ private TransportInfo_impl(Socket socket, Origin origin, ListenerMap lm) {
+ this.socket = socket;
+ this.origin = origin;
listenMap_ = lm;
}
- TransportInfo_impl(Transport_impl transport,
- org.apache.yoko.orb.OCI.Acceptor acceptor, ListenerMap lm) {
- transport_ = transport;
- acceptorInfo_ = acceptor.get_info();
- listenMap_ = lm;
+
+ // client-side constructor
+ TransportInfo_impl(Transport_impl transport, ListenerMap lm) {
+ this(transport.socket_, Origin.CLIENT, lm);
}
- synchronized void _OB_callCloseCB(org.apache.yoko.orb.OCI.TransportInfo info) {
- int length = closeCBVec_.size();
- for (int i = 0; i < length; i++) {
- org.apache.yoko.orb.OCI.CloseCB cb = (org.apache.yoko.orb.OCI.CloseCB) closeCBVec_
- .elementAt(i);
- cb.close_cb(info);
- }
- }
-
- synchronized void _OB_destroy() {
- transport_ = null;
+ //server-side constructor
+ TransportInfo_impl(Transport_impl transport, Acceptor acceptor, ListenerMap lm) {
+ this(transport.socket_, Origin.SERVER, lm);
}
}
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Transport_impl.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Transport_impl.java
index ec24f27..e986502 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Transport_impl.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/IIOP/Transport_impl.java
@@ -17,16 +17,29 @@
package org.apache.yoko.orb.OCI.IIOP;
+import static org.apache.yoko.orb.OCI.IIOP.Exceptions.*;
+import static org.apache.yoko.orb.OB.MinorCodes.*;
+
+import org.apache.yoko.orb.OB.MinorCodes;
+import org.apache.yoko.orb.OCI.Acceptor;
+import org.apache.yoko.orb.OCI.SendReceiveMode;
+import org.omg.CORBA.COMM_FAILURE;
+import org.omg.CORBA.CompletionStatus;
+import org.omg.CORBA.NO_IMPLEMENT;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.yoko.orb.OCI.IIOP.PLUGIN_ID;
+import static org.apache.yoko.orb.OCI.SendReceiveMode.*;
final public class Transport_impl extends org.omg.CORBA.LocalObject implements
org.apache.yoko.orb.OCI.Transport {
// This data member must not be private because the info object
// must be able to access it
- public java.net.Socket socket_; // The socket
+ public final java.net.Socket socket_; // The socket
private java.io.InputStream in_; // The socket's input stream
@@ -54,7 +67,7 @@
} catch (java.net.SocketException ex) {
logger.log(Level.FINE, "Socket setup error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
+ throw (COMM_FAILURE)new COMM_FAILURE(
org.apache.yoko.orb.OB.MinorCodes
.describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSetSoTimeout)
+ ": socket error during setSoTimeout: "
@@ -63,7 +76,7 @@
org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
} catch (java.lang.NullPointerException ex) {
logger.log(Level.FINE, "Socket setup error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
+ throw (COMM_FAILURE)new COMM_FAILURE(
org.apache.yoko.orb.OB.MinorCodes
.describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSetSoTimeout)
+ ": NullPointerException error during setSoTimeout: "
@@ -86,41 +99,27 @@
// shutdown the receiving side. If how == 1, shutdown the sending
// side. If how == 2, shutdown both.
//
- private static void shutdownSocket(java.net.Socket socket, int how) {
- if (socket == null) // Socket already closed
- return;
-
- if (how == 2) {
- shutdownSocket(socket, 0);
- shutdownSocket(socket, 1);
- return;
- }
-
+ private void shutdownSocket() {
try {
- if (how == 0) {
try {
- socket.shutdownInput();
+ socket_.shutdownInput();
} catch (UnsupportedOperationException e) {
// if we're using an SSL connection, this is an unsupported operation.
// just ignore the error and proceed to the close.
}
- } else if (how == 1) {
try {
- socket.shutdownOutput();
+ socket_.shutdownOutput();
} catch (UnsupportedOperationException e) {
// if we're using an SSL connection, this is an unsupported operation.
// just ignore the error and proceed to the close.
}
- } else {
- throw new InternalError();
- }
} catch (java.net.SocketException ex) {
//
// Some VMs (namely JRockit) raise a SocketException if
// the socket has already been closed.
// This exception can be ignored.
//
- } catch (java.io.IOException ex) {
+ } catch (IOException ex) {
logger.log(Level.FINE, "Socket shutdown error", ex);
throw (InternalError)new InternalError().initCause(ex);
}
@@ -138,28 +137,15 @@
return org.omg.IOP.TAG_INTERNET_IOP.value;
}
- public org.apache.yoko.orb.OCI.SendReceiveMode mode() {
- return org.apache.yoko.orb.OCI.SendReceiveMode.SendReceive;
+ public SendReceiveMode mode() {
+ return SendReceive;
}
public int handle() {
- throw new org.omg.CORBA.NO_IMPLEMENT();
+ throw new NO_IMPLEMENT();
}
public void close() {
- if (socket_ == null) // shutdown() may call close()
- return;
-
- //
- // Call callbacks
- //
- info_._OB_callCloseCB(info_);
-
- //
- // Destroy the info object
- //
- info_._OB_destroy();
-
//
// I must set socket_ to null *before* the close or the code
// below, to avoid a race condition with send/receive
@@ -168,26 +154,22 @@
//
// Close the socket
//
- java.net.Socket saveSocket = socket_;
- socket_ = null; // Must be set to null before the shutdown/close
- shutdownSocket(saveSocket, 2); // This helps to unblock threads
+ shutdownSocket(); // This helps to unblock threads
// blocking in recv()
try {
- saveSocket.close();
- } catch (java.io.IOException ex) {
+ socket_.close();
+ } catch (IOException ex) {
}
}
public void shutdown() {
logger.info("shutdown: " + this);
shutdown_ = true;
- shutdownSocket(socket_, 2); // Shutdown send side only
- if (socket_ != null) {
- // blocking in recv()
- try {
- socket_.close();
- } catch (java.io.IOException ex) {
- }
+ shutdownSocket(); // Shutdown send side only
+ // blocking in recv()
+ try {
+ socket_.close();
+ } catch (IOException ex) {
}
}
@@ -199,47 +181,28 @@
try {
int result = in_.read(buf.data(), buf.pos(), buf.rest_length());
if (result <= 0) {
- throw new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorRecvZero),
- org.apache.yoko.orb.OB.MinorCodes.MinorRecvZero,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO);
+ throw new COMM_FAILURE(describeCommFailure(MinorRecvZero), MinorRecvZero, CompletionStatus.COMPLETED_NO);
}
buf.advance(result);
- } catch (java.io.InterruptedIOException ex) {
+ } catch (InterruptedIOException ex) {
logger.log(Level.FINE, "Received interrupted exception", ex);
buf.advance(ex.bytesTransferred);
if (!block)
return;
if (shutdown_)
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorRecv)
- + ": IOInterrupted exception during shutdown: " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorRecv,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
-
+ throw asCommFailure(ex, MinorCodes.MinorRecv, "Interrupted I/O exception during shutdown");
} catch (java.io.IOException ex) {
logger.log(Level.FINE, "Socket read error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorRecv)
- + ": I/O error during read: " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorRecv,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw asCommFailure(ex, MinorCodes.MinorRecv, "I/O error during read");
} catch (java.lang.NullPointerException ex) {
- logger.log(Level.FINE, "Socket read error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorRecv)
- + ": NullPointerException during read",
- org.apache.yoko.orb.OB.MinorCodes.MinorRecv,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ logger.log(Level.FINE, "Socket read error", ex);
+ throw asCommFailure(ex, MinorCodes.MinorRecv, "NullPointerException during read");
}
}
}
+
public boolean receive_detect(org.apache.yoko.orb.OCI.Buffer buf,
boolean block) {
setBlock(block);
@@ -280,11 +243,7 @@
try {
int result = in_.read(buf.data(), buf.pos(), buf.rest_length());
if (result <= 0) {
- throw new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorRecvZero),
- org.apache.yoko.orb.OB.MinorCodes.MinorRecvZero,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO);
+ throw new COMM_FAILURE(describeCommFailure(MinorRecvZero), MinorRecvZero, CompletionStatus.COMPLETED_NO);
}
buf.advance(result);
} catch (java.io.InterruptedIOException ex) {
@@ -292,20 +251,10 @@
return;
} catch (java.io.IOException ex) {
logger.log(Level.FINE, "Socket read error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorRecv)
- + ": I/O error during read: " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorRecv,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw asCommFailure(ex, MinorRecv, "I/O error during read");
} catch (java.lang.NullPointerException ex) {
logger.log(Level.FINE, "Socket read error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorRecv)
- + ": NullPointerException during read",
- org.apache.yoko.orb.OB.MinorCodes.MinorRecv,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ throw asCommFailure(ex, MinorRecv, "NullPointerException during read");
}
}
}
@@ -355,21 +304,11 @@
if (!block)
return;
} catch (java.io.IOException ex) {
- logger.log(Level.FINE, "Socket write error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSend)
- + ": I/O error during write: " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorSend,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ logger.log(Level.FINE, "Socket write error", ex);
+ throw asCommFailure(ex, MinorSend, "I/O error during write");
} catch (java.lang.NullPointerException ex) {
- logger.log(Level.FINE, "Socket write error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSend)
- + ": NullPointerException during write",
- org.apache.yoko.orb.OB.MinorCodes.MinorSend,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ logger.log(Level.FINE, "Socket write error", ex);
+ throw asCommFailure(ex, MinorSend, "NullPointerException during write");
}
}
}
@@ -417,21 +356,11 @@
buf.advance(ex.bytesTransferred);
return;
} catch (java.io.IOException ex) {
- logger.log(Level.FINE, "Socket write error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSend)
- + ": I/O error during write: " + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorSend,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ logger.log(Level.FINE, "Socket write error", ex);
+ throw asCommFailure(ex, MinorSend, "I/O error during write");
} catch (java.lang.NullPointerException ex) {
- logger.log(Level.FINE, "Socket write error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSend)
- + ": NullPointerException during write",
- org.apache.yoko.orb.OB.MinorCodes.MinorSend,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ logger.log(Level.FINE, "Socket write error", ex);
+ throw asCommFailure(ex, MinorSend, "NullPointerException during write");
}
}
}
@@ -472,8 +401,7 @@
// Application programs must not use these functions directly
// ------------------------------------------------------------------
- public Transport_impl(org.apache.yoko.orb.OCI.Connector connector,
- java.net.Socket socket, ListenerMap lm) {
+ public Transport_impl(java.net.Socket socket, ListenerMap lm) {
socket_ = socket;
shutdown_ = false;
@@ -485,25 +413,18 @@
in_ = socket_.getInputStream();
out_ = socket_.getOutputStream();
} catch (java.io.IOException ex) {
- logger.log(Level.FINE, "Socket setup error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSocket)
- + ": unable to obtain socket InputStream: "
- + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorSocket,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ logger.log(Level.FINE, "Socket setup error", ex);
+ throw asCommFailure(ex, MinorSocket, "unable to obtain socket InputStream");
}
//
// Since the Constructor of TransportInfo uses this object create
// it after all members are initialized
//
- info_ = new TransportInfo_impl(this, connector, lm);
+ info_ = new TransportInfo_impl(this, lm);
}
- public Transport_impl(org.apache.yoko.orb.OCI.Acceptor acceptor,
- java.net.Socket socket, ListenerMap lm) {
+ public Transport_impl(Acceptor acceptor, Socket socket, ListenerMap lm) {
socket_ = socket;
shutdown_ = false;
@@ -517,14 +438,8 @@
in_ = socket_.getInputStream();
out_ = socket_.getOutputStream();
} catch (java.io.IOException ex) {
- logger.log(Level.FINE, "Socket setup error", ex);
- throw (org.omg.CORBA.COMM_FAILURE)new org.omg.CORBA.COMM_FAILURE(
- org.apache.yoko.orb.OB.MinorCodes
- .describeCommFailure(org.apache.yoko.orb.OB.MinorCodes.MinorSocket)
- + ": unable to obtain socket InputStream: "
- + ex.getMessage(),
- org.apache.yoko.orb.OB.MinorCodes.MinorSocket,
- org.omg.CORBA.CompletionStatus.COMPLETED_NO).initCause(ex);
+ logger.log(Level.FINE, "Socket setup error", ex);
+ throw asCommFailure(ex, MinorSocket, "unable to obtain socket InputStream");
}
//
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/TransportInfoOperations.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/TransportInfoOperations.java
index c2f423a..5f26fda 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/TransportInfoOperations.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/TransportInfoOperations.java
@@ -53,32 +53,10 @@
//
// IDL:orb.yoko.apache.org/OCI/TransportInfo/connector_info:1.0
//
- /**
- *
- * The ConnectorInfo object for the Connector that created the
- * Transport object that this TransportInfo object belongs to.
- * If the Transport for this TransportInfo was not created by a
- * Connector, this attribute is set to the nil object reference.
- *
- **/
-
- ConnectorInfo
- connector_info();
//
// IDL:orb.yoko.apache.org/OCI/TransportInfo/acceptor_info:1.0
//
- /**
- *
- * The AcceptorInfo object for the Acceptor that created the
- * Transport object that this TransportInfo object belongs to.
- * If the Transport for this TransportInfo was not created by an
- * Acceptor, this attribute is set to the nil object reference.
- *
- **/
-
- AcceptorInfo
- acceptor_info();
//
// IDL:orb.yoko.apache.org/OCI/TransportInfo/origin:1.0
@@ -118,33 +96,10 @@
//
// IDL:orb.yoko.apache.org/OCI/TransportInfo/add_close_cb:1.0
//
- /**
- *
- * Add a callback that is called before a connection is closed. If
- * the callback has already been registered, this method has no
- * effect.
- *
- * @param cb The callback to add.
- *
- **/
-
- void
- add_close_cb(CloseCB cb);
//
// IDL:orb.yoko.apache.org/OCI/TransportInfo/remove_close_cb:1.0
//
- /**
- *
- * Remove a close callback. If the callback was not registered,
- * this method has no effect.
- *
- * @param cb The callback to remove.
- *
- **/
-
- void
- remove_close_cb(CloseCB cb);
//
// IDL:orb.yoko.apache.org/OCI/TransportInfo/get_service_contexts:1.0
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/iiop.java b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/iiop.java
index aacf2cd..2f43cc7 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/OCI/iiop.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/OCI/iiop.java
@@ -17,14 +17,12 @@
package org.apache.yoko.orb.OCI;
-import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.yoko.orb.OB.Assert;
import org.apache.yoko.orb.OB.AssertionFailed;
import org.apache.yoko.orb.OCI.IIOP.ConnectionHelper;
import org.apache.yoko.orb.OCI.IIOP.ExtendedConnectionHelper;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
public class iiop implements PluginInit {
static final Logger logger = Logger.getLogger(iiop.class.getName());
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/activator/Activator.java b/yoko-core/src/main/java/org/apache/yoko/orb/activator/Activator.java
index a80bc65..a6a2d0d 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/activator/Activator.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/activator/Activator.java
@@ -1,6 +1,6 @@
package org.apache.yoko.orb.activator;
-import org.apache.yoko.util.osgi.locator.activator.AbstractBundleActivator;
+import org.apache.yoko.osgi.locator.activator.AbstractBundleActivator;
public class Activator extends AbstractBundleActivator {
diff --git a/yoko-core/src/main/java/org/apache/yoko/orb/csi/SecurityContext.java b/yoko-core/src/main/java/org/apache/yoko/orb/csi/SecurityContext.java
index 5c7a2ac..d501f28 100644
--- a/yoko-core/src/main/java/org/apache/yoko/orb/csi/SecurityContext.java
+++ b/yoko-core/src/main/java/org/apache/yoko/orb/csi/SecurityContext.java
@@ -23,7 +23,7 @@
import java.security.AccessController;
import org.apache.yoko.orb.util.GetSystemPropertyAction;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
public abstract class SecurityContext {
diff --git a/yoko-core/src/test/java/org/apache/yoko/ConnectionCachingTest.java b/yoko-core/src/test/java/org/apache/yoko/ConnectionCachingTest.java
new file mode 100644
index 0000000..230c4e4
--- /dev/null
+++ b/yoko-core/src/test/java/org/apache/yoko/ConnectionCachingTest.java
@@ -0,0 +1,203 @@
+package org.apache.yoko;
+
+import org.apache.yoko.orb.OBPortableServer.POA;
+import org.apache.yoko.orb.OBPortableServer.POAHelper;
+import org.apache.yoko.orb.OBPortableServer.POAManager;
+import org.apache.yoko.orb.OBPortableServer.POAManagerHelper;
+import org.apache.yoko.orb.OCI.Acceptor;
+import org.apache.yoko.orb.OCI.IIOP.AcceptorInfo;
+import org.apache.yoko.orb.OCI.IIOP.AcceptorInfoHelper;
+import org.apache.yoko.orb.spi.naming.NameServiceInitializer;
+import org.apache.yoko.orb.spi.naming.RemoteAccess;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.omg.CORBA.BAD_OPERATION;
+import org.omg.CORBA.LocalObject;
+import org.omg.CORBA.ORB;
+import org.omg.CORBA.portable.OutputStream;
+import org.omg.CORBA.portable.ResponseHandler;
+import org.omg.CORBA_2_3.portable.InputStream;
+import org.omg.CosNaming.*;
+import org.omg.PortableInterceptor.*;
+import org.omg.PortableInterceptor.ORBInitInfoPackage.DuplicateName;
+import test.util.Skellington;
+
+import java.rmi.Remote;
+import java.rmi.RemoteException;
+import java.util.Properties;
+
+import static javax.rmi.PortableRemoteObject.narrow;
+
+public class ConnectionCachingTest {
+ private static final NameComponent[] OBJECT_NAME = { new NameComponent("object", "")};
+ ORB serverORB;
+ ORB clientORB;
+
+ @Before
+ public void setup() throws Exception {
+ serverORB = Util.createServerOrb();
+ clientORB = Util.createClientORB(serverORB);
+ // make a GIOP 1.0 call first
+ NamingContext ctx = NamingContextHelper.narrow(clientORB.string_to_object(Util.getNameServerUrl(serverORB)));
+ ctx.new_context();
+ }
+
+ @Test
+ public void testSingleNull() throws Exception {
+ Assert.assertEquals(null, newRemoteImpl(clientORB).bounce(null));
+ }
+
+ @Test
+ public void testSingleNullSameOrb() throws Exception {
+ Assert.assertEquals(null, newRemoteImpl(serverORB).bounce(null));
+ }
+
+ @Test
+ public void testSingleEmptyString() throws Exception {
+ Assert.assertEquals("", newRemoteImpl(clientORB).bounce(""));
+ }
+
+ @Test
+ public void testSingleEmptyStringSameOrb() throws Exception {
+ Assert.assertEquals("", newRemoteImpl(serverORB).bounce(""));
+ }
+
+ @Test
+ public void testSingleNonEmptyString() throws Exception {
+ Assert.assertEquals("hello", newRemoteImpl(clientORB).bounce("hello"));
+ }
+
+ @Test
+ public void testSingleNonEmptyStringSameOrb() throws Exception {
+ Assert.assertEquals("hello", newRemoteImpl(serverORB).bounce("hello"));
+ }
+
+ @Test
+ public void testLotsOfInvocations() throws Exception {
+ Assert.assertEquals(null, newRemoteImpl(clientORB).bounce(null));
+ Assert.assertEquals("", newRemoteImpl(clientORB).bounce(""));
+ Assert.assertEquals("a", newRemoteImpl(clientORB).bounce("a"));
+ Assert.assertEquals("ab", newRemoteImpl(clientORB).bounce("ab"));
+ Assert.assertEquals("abc", newRemoteImpl(clientORB).bounce("abc"));
+ Assert.assertEquals("abcd", newRemoteImpl(clientORB).bounce("abcd"));
+ Assert.assertEquals("abcde", newRemoteImpl(clientORB).bounce("abcde"));
+ }
+
+ @Test
+ public void testLotsOfInvocationsSameOrb() throws Exception {
+ Assert.assertEquals(null, newRemoteImpl(serverORB).bounce(null));
+ Assert.assertEquals("", newRemoteImpl(serverORB).bounce(""));
+ Assert.assertEquals("a", newRemoteImpl(serverORB).bounce("a"));
+ Assert.assertEquals("ab", newRemoteImpl(serverORB).bounce("ab"));
+ Assert.assertEquals("abc", newRemoteImpl(serverORB).bounce("abc"));
+ Assert.assertEquals("abcd", newRemoteImpl(serverORB).bounce("abcd"));
+ Assert.assertEquals("abcde", newRemoteImpl(serverORB).bounce("abcde"));
+ }
+
+ private TheInterface newRemoteImpl(ORB callerOrb) throws Exception {
+ TheImpl theImpl = new TheImpl();
+ theImpl.publish(serverORB);
+ // bind it into the naming context
+ Util.getNameService(serverORB).rebind(OBJECT_NAME, theImpl.thisObject());
+ // look it up from the caller orb
+ Object stub = Util.getNameService(callerOrb).resolve(OBJECT_NAME);
+ return (TheInterface)narrow(stub, TheInterface.class);
+ }
+
+ public interface TheInterface extends Remote {
+ String bounce(String text) throws RemoteException;
+ }
+
+ private static class TheImpl extends Skellington implements TheInterface {
+ @Override
+ protected OutputStream dispatch(String method, InputStream in, ResponseHandler reply) throws RemoteException {
+ switch (method) {
+ case "bounce":
+ String result = bounce((String) in.read_value(String.class));
+ OutputStream out = reply.createReply();
+ ((org.omg.CORBA_2_3.portable.OutputStream) out).write_value(result, String.class);
+ return out;
+ default:
+ throw new BAD_OPERATION();
+ }
+ }
+
+ @Override
+ public String bounce(String s) {return s;}
+ }
+
+ public static class DummyInterceptor extends LocalObject implements ORBInitializer, ServerRequestInterceptor {
+
+ @Override
+ public String name() {
+ return "DummyInterceptor";
+ }
+
+ @Override
+ public void destroy() {}
+
+ @Override
+ public void pre_init(ORBInitInfo info) {}
+
+ @Override
+ public void post_init(ORBInitInfo info) {
+ try {
+ info.add_server_request_interceptor(this);
+ } catch (DuplicateName duplicateName) {
+ throw new Error(duplicateName);
+ }
+ }
+
+ @Override
+ public void receive_request_service_contexts(ServerRequestInfo ri) throws ForwardRequest {}
+
+ @Override
+ public void receive_request(ServerRequestInfo ri) throws ForwardRequest {}
+
+ @Override
+ public void send_reply(ServerRequestInfo ri) {}
+
+ @Override
+ public void send_exception(ServerRequestInfo ri) throws ForwardRequest {}
+
+ @Override
+ public void send_other(ServerRequestInfo ri) throws ForwardRequest {}
+ }
+
+
+ private static class Util {
+
+ private static int getPort(ORB orb) throws Exception {
+ POA rootPoa = POAHelper.narrow(orb.resolve_initial_references("RootPOA"));
+ POAManager poaMgr = POAManagerHelper.narrow(rootPoa.the_POAManager());
+ for (Acceptor acceptor : poaMgr.get_acceptors()) {
+ AcceptorInfo info = AcceptorInfoHelper.narrow(acceptor.get_info());
+ if (info != null) return (char) info.port();
+ }
+ throw new Error("No IIOP Acceptor found");
+ }
+
+ private static String getNameServerUrl(ORB orb) throws Exception {
+ return "corbaname::localhost:" + getPort(orb);
+ }
+
+ private static ORB createServerOrb() throws Exception {
+ Properties serverProps = new Properties();
+ serverProps.put(NameServiceInitializer.NS_ORB_INIT_PROP, "");
+ serverProps.put(NameServiceInitializer.NS_REMOTE_ACCESS_ARG, RemoteAccess.readWrite.toString());
+ serverProps.put(ORBInitializer.class.getName() + "Class." + DummyInterceptor.class.getName(), "");
+ ORB orb = ORB.init((String[])null, serverProps);
+ POAHelper.narrow(orb.resolve_initial_references("RootPOA")).the_POAManager().activate();
+ return orb;
+ }
+
+ private static ORB createClientORB(ORB targetORB) throws Exception {
+ return ORB.init(new String[]{"-ORBInitRef", "NameService=" + getNameServerUrl(targetORB)}, null);
+ }
+
+ private static NamingContextExt getNameService(ORB orb) throws Exception {
+ return NamingContextExtHelper.narrow(orb.resolve_initial_references("NameService"));
+ }
+ }
+}
diff --git a/yoko-core/src/test/java/org/apache/yoko/ConnectionCleanupTest.java b/yoko-core/src/test/java/org/apache/yoko/ConnectionCleanupTest.java
new file mode 100644
index 0000000..564abfc
--- /dev/null
+++ b/yoko-core/src/test/java/org/apache/yoko/ConnectionCleanupTest.java
@@ -0,0 +1,150 @@
+package org.apache.yoko;
+
+import static javax.rmi.PortableRemoteObject.narrow;
+
+import org.apache.yoko.orb.OBPortableServer.POAHelper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.omg.CORBA.BAD_OPERATION;
+import org.omg.CORBA.ORB;
+import org.omg.CORBA.ORBPackage.InvalidName;
+import org.omg.CORBA.portable.OutputStream;
+import org.omg.CORBA.portable.ResponseHandler;
+import org.omg.CORBA_2_3.portable.InputStream;
+import org.omg.PortableServer.POA;
+import org.omg.PortableServer.POAManagerPackage.AdapterInactive;
+import org.omg.PortableServer.POAPackage.ServantAlreadyActive;
+import org.omg.PortableServer.POAPackage.WrongPolicy;
+import test.util.MultiException;
+import test.util.Skellington;
+
+import java.rmi.Remote;
+import java.rmi.RemoteException;
+import java.util.*;
+import java.util.concurrent.*;
+
+public class ConnectionCleanupTest {
+ ORB serverORB;
+ ORB clientORB;
+
+ @Before
+ public void setup() throws Exception {
+ serverORB = ORB.init((String[])null, null);
+ clientORB = ORB.init((String[])null, null);
+ }
+
+ @Test
+ public void testOneClient() throws Exception {
+ newRemoteImpl().gcAndSleep(1000);
+ }
+
+ private TheInterface newRemoteImpl() {
+ try {
+ return (TheInterface)narrow(clientORB.string_to_object(new TheImpl().publish(serverORB)), TheInterface.class);
+ } catch (InvalidName | AdapterInactive | WrongPolicy | ServantAlreadyActive e) {
+ e.printStackTrace();
+ throw new AssertionError(e);
+ }
+ }
+
+ @Test
+ public void testRecursiveClient() throws Exception {
+ newRemoteImpl().gcAndSleepRecursive(10000, 100);
+ }
+
+ //@Test
+ public void doNotTestOneHundredClients() throws Throwable {
+ // To avoid multiple threads initializing the singleton at once
+ // during the first call to PRO.narrow() force it to be called
+ // once before the other threads start.
+ testOneClient();
+ // OK - now do a hundred threads.
+ List<Future<Throwable>> futures = new ArrayList<>();
+ ExecutorService xs = Executors.newFixedThreadPool(100);
+ final CyclicBarrier cb = new CyclicBarrier(101);
+ for (int i = 0; i < 100; i++) {
+ futures.add(xs.submit(new Callable<Throwable>() {
+ @Override
+ public Throwable call() throws Exception {
+ try {
+ cb.await();
+ recurse(10); // use recursion so stack trace tells us how far in we failed
+ return null;
+ } catch (Throwable t) {
+ return t;
+ }
+ }
+ private void recurse(int times) throws Exception {
+ testOneClient();
+ if (times > 0) recurse(times - 1);
+ }
+ }));
+ }
+ cb.await();
+
+ MultiException me = new MultiException(futures);
+ if (me.isEmpty()) return;
+ throw me;
+ }
+
+
+ public interface TheInterface extends Remote {
+ void gcAndSleep(long millis) throws RemoteException;
+ void gcAndSleepRecursive(long millis, int depth) throws RemoteException;
+ }
+
+ private class TheImpl extends Skellington implements TheInterface {
+ @Override
+ public void gcAndSleep(long millis) {
+ //forceGarbageCollection();
+ try {
+ Thread.sleep(millis);
+ } catch (InterruptedException ignored) {}
+ }
+
+ @Override
+ public void gcAndSleepRecursive(long millis, int depth) throws RemoteException {
+ if (depth == 1)
+ newRemoteImpl().gcAndSleep(millis);
+ else
+ newRemoteImpl().gcAndSleepRecursive(millis, depth - 1);
+ }
+
+ @Override
+ protected OutputStream dispatch(String method, InputStream in, ResponseHandler reply) throws RemoteException {
+ switch (method) {
+ case "gcAndSleep":
+ gcAndSleep(in.read_longlong());
+ return reply.createReply();
+ case "gcAndSleepRecursive":
+ gcAndSleepRecursive(in.read_longlong(), in.read_long());
+ return reply.createReply();
+ default:
+ throw new BAD_OPERATION();
+ }
+ }
+ }
+
+ private static long forceGarbageCollection() {
+ List<byte[]> extents = new ArrayList<>();
+ long tally = 0;
+ // allocate as much as possible, halve the size and try again
+ for (int i = 30; i >= 0; i--) {
+ try {
+ do {
+ int alloc = 1 << i;
+ extents.add(new byte[alloc]);
+ tally += alloc;
+ } while (true);
+ } catch (OutOfMemoryError oom) {}
+ }
+ // now the heap should be full so even the smallest allocation should fail
+ try {
+ for (int i = 0; i < 1024; i++) extents.add(new byte[128]);
+ Assert.fail("this allocation should have failed");
+ } catch (OutOfMemoryError e) {}
+ System.gc();
+ return tally;
+ }
+}
diff --git a/yoko-core/src/test/java/org/apache/yoko/processmanager/internal/ProcessAgentImpl.java b/yoko-core/src/test/java/org/apache/yoko/processmanager/internal/ProcessAgentImpl.java
index a9e9bd4..b9faca1 100755
--- a/yoko-core/src/test/java/org/apache/yoko/processmanager/internal/ProcessAgentImpl.java
+++ b/yoko-core/src/test/java/org/apache/yoko/processmanager/internal/ProcessAgentImpl.java
@@ -27,7 +27,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
public class ProcessAgentImpl extends UnicastRemoteObject implements ProcessAgent {
private static final long serialVersionUID = 1L;
diff --git a/yoko-core/src/test/java/test/iiopplugin/ClientPlugin.java b/yoko-core/src/test/java/test/iiopplugin/ClientPlugin.java
index 95a29a7..6745f05 100644
--- a/yoko-core/src/test/java/test/iiopplugin/ClientPlugin.java
+++ b/yoko-core/src/test/java/test/iiopplugin/ClientPlugin.java
@@ -30,7 +30,6 @@
import java.net.Socket;
import org.apache.yoko.orb.OCI.IIOP.ConnectionHelper;
-import org.omg.CORBA.DynAnyPackage.Invalid;
import org.omg.CORBA.ORB;
import org.omg.CORBA.Policy;
import org.omg.IOP.IOR;
diff --git a/yoko-core/src/test/java/test/iiopplugin/ServerPlugin.java b/yoko-core/src/test/java/test/iiopplugin/ServerPlugin.java
index 28c7355..2f8a923 100644
--- a/yoko-core/src/test/java/test/iiopplugin/ServerPlugin.java
+++ b/yoko-core/src/test/java/test/iiopplugin/ServerPlugin.java
@@ -30,7 +30,6 @@
import java.net.Socket;
import org.apache.yoko.orb.OCI.IIOP.ConnectionHelper;
-import org.omg.CORBA.DynAnyPackage.Invalid;
import org.omg.CORBA.Policy;
import org.omg.CORBA.ORB;
import org.omg.IOP.IOR;
diff --git a/yoko-core/src/test/java/test/iiopplugin/ServiceContextInterceptor.java b/yoko-core/src/test/java/test/iiopplugin/ServiceContextInterceptor.java
index faeb14a..64c2841 100644
--- a/yoko-core/src/test/java/test/iiopplugin/ServiceContextInterceptor.java
+++ b/yoko-core/src/test/java/test/iiopplugin/ServiceContextInterceptor.java
@@ -46,9 +46,9 @@
ServerRequestInfoExt riExt = (ServerRequestInfoExt) ri;
TransportInfo_impl connection = (TransportInfo_impl)riExt.getTransportInfo();
if (connection != null) {
- Socket socket = connection.socket();
- if (socket != null) {
- System.out.println("Retrieved socket successfully");
+ String remoteHost = connection.remote_addr();
+ if (remoteHost != null && remoteHost.length() > 0) {
+ System.out.println("Retrieved remote host successfully");
return;
}
}
diff --git a/yoko-core/src/test/java/test/ins/Server.java b/yoko-core/src/test/java/test/ins/Server.java
index d7c2840..fa81ee2 100644
--- a/yoko-core/src/test/java/test/ins/Server.java
+++ b/yoko-core/src/test/java/test/ins/Server.java
@@ -114,10 +114,8 @@
// Create POA
//
Policy[] policies = new Policy[2];
- policies[0] = poa
- .create_id_assignment_policy(IdAssignmentPolicyValue.USER_ID);
- policies[1] = poa
- .create_lifespan_policy(LifespanPolicyValue.PERSISTENT);
+ policies[0] = poa.create_id_assignment_policy(IdAssignmentPolicyValue.USER_ID);
+ policies[1] = poa.create_lifespan_policy(LifespanPolicyValue.PERSISTENT);
POA testPOA = poa.create_POA("testPOA", manager, policies);
//
diff --git a/yoko-core/src/test/java/test/util/MultiException.java b/yoko-core/src/test/java/test/util/MultiException.java
new file mode 100644
index 0000000..23f9984
--- /dev/null
+++ b/yoko-core/src/test/java/test/util/MultiException.java
@@ -0,0 +1,61 @@
+package test.util;
+
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class MultiException extends RuntimeException {
+ private static final String SEP = "--------------------------------------------------------------------------------";
+ private static final String NULL_COUNT_FORMAT = SEP + "%n%d \u2715 null%n" + SEP + "%n";
+ private static final String ENTRY_FORMAT = "%n" + SEP + "%n%d \u2715 %s" + SEP + "%n";
+ private Map<String, Integer> map = new TreeMap<>();
+ private int nullCount;
+
+ public boolean isEmpty() {
+ return map.isEmpty();
+ }
+
+ public Integer add(Throwable t) {
+ if (t == null) return nullCount++;
+ String desc = getDescription(t);
+ Integer count = map.get(desc);
+ return count == null ?
+ map.put(desc, 1) :
+ map.put(desc, ++count);
+ }
+
+ private String getDescription(Throwable t) {
+ StringWriter sw = new StringWriter();
+ PrintWriter pw = new PrintWriter(sw);
+ pw.println(t);
+ pw.println(SEP);
+ t.printStackTrace(pw);
+ pw.flush();
+ return sw.getBuffer().toString();
+ }
+
+ public <T extends Throwable, F extends Future<T>> MultiException(Iterable<F> results) {
+ for (F f : results)
+ try {
+ add(f.get());
+ } catch (InterruptedException | ExecutionException e) {
+ add(e);
+ }
+ }
+
+ @Override
+ public void printStackTrace(PrintStream s) {
+ s.printf(NULL_COUNT_FORMAT, nullCount);
+ for (Map.Entry<String, Integer> e : map.entrySet()) s.printf(ENTRY_FORMAT, e.getValue(), e.getKey());
+ }
+
+ @Override
+ public void printStackTrace(PrintWriter s) {
+ s.printf(NULL_COUNT_FORMAT, nullCount);
+ for (Map.Entry<String, Integer> e : map.entrySet()) s.printf(ENTRY_FORMAT, e.getValue(), e.getKey());
+ }
+}
diff --git a/yoko-core/src/test/java/test/util/Skellington.java b/yoko-core/src/test/java/test/util/Skellington.java
new file mode 100644
index 0000000..5e2f0b5
--- /dev/null
+++ b/yoko-core/src/test/java/test/util/Skellington.java
@@ -0,0 +1,127 @@
+package test.util;
+
+import org.apache.yoko.orb.OBPortableServer.POAHelper;
+import org.junit.Assert;
+import org.omg.CORBA.BAD_PARAM;
+import org.omg.CORBA.ORB;
+import org.omg.CORBA.ORBPackage.InvalidName;
+import org.omg.CORBA.SystemException;
+import org.omg.CORBA.portable.InputStream;
+import org.omg.CORBA.portable.OutputStream;
+import org.omg.CORBA.portable.ResponseHandler;
+import org.omg.CORBA.portable.UnknownException;
+import org.omg.PortableServer.POA;
+import org.omg.PortableServer.POAManagerPackage.AdapterInactive;
+import org.omg.PortableServer.POAPackage.ObjectNotActive;
+import org.omg.PortableServer.POAPackage.ServantAlreadyActive;
+import org.omg.PortableServer.POAPackage.ServantNotActive;
+import org.omg.PortableServer.POAPackage.WrongPolicy;
+import org.omg.PortableServer.Servant;
+
+import javax.rmi.CORBA.Tie;
+import javax.rmi.CORBA.Util;
+import javax.rmi.CORBA.ValueHandler;
+import java.lang.reflect.Method;
+import java.rmi.NoSuchObjectException;
+import java.rmi.Remote;
+import java.rmi.RemoteException;
+import java.util.*;
+
+public abstract class Skellington extends Servant implements Tie, Remote {
+ private final Collection<Class<? extends Remote>> interfaceClasses;
+ private final String[] ids;
+
+ public Skellington() {
+ Set<Class<? extends Remote>> ifaces = new HashSet<>();
+ for (Class<?> c = this.getClass(); c != Object.class; c = c.getSuperclass()) {
+ NEXT_CLASS: for (Class<?> iface: c.getInterfaces()) {
+ if (Remote.class.isAssignableFrom(iface)) {
+ for (Method m : iface.getMethods()) {
+ if (Arrays.asList(m.getExceptionTypes()).contains(RemoteException.class))
+ continue;
+ continue NEXT_CLASS;
+ }
+ // there were no non-remote methods, so add the interface
+ ifaces.add((Class<? extends Remote>)iface);
+ }
+ }
+ }
+ final ValueHandler vh = Util.createValueHandler();
+ this.interfaceClasses = Collections.unmodifiableSet(ifaces);
+ this.ids = new String[interfaceClasses.size()];
+ int index = 0;
+ for (Class<?> c : interfaceClasses)
+ this.ids[index++] = vh.getRMIRepositoryID(c);
+ }
+
+ public Skellington(Class<? extends Remote>... interfaces) {
+ final ValueHandler vh = Util.createValueHandler();
+ ids = new String[interfaces.length];
+ List<Class<? extends Remote>> iflst = new ArrayList<>();
+ for (int i = 0; i < interfaces.length; i++) {
+ Assert.assertTrue(interfaces[i].isInterface());
+ iflst.add(interfaces[i]);
+ ids[i] = vh.getRMIRepositoryID(interfaces[i]);
+ }
+ this.interfaceClasses = Collections.unmodifiableList(iflst);
+ }
+
+ @Override
+ public String[] _all_interfaces(POA poa, byte[] objectId) {
+ return ids.clone();
+ }
+
+ @Override
+ public org.omg.CORBA.Object thisObject() {
+ return _this_object();
+ }
+
+ @Override
+ public void deactivate() throws NoSuchObjectException {
+ try{
+ _poa().deactivate_object(_poa().servant_to_id(this));
+ } catch (WrongPolicy |ObjectNotActive |ServantNotActive ignored){}
+ }
+
+ @Override
+ public ORB orb() {return _orb();}
+
+ @Override
+ public void orb(ORB orb) {
+ try {
+ ((org.omg.CORBA_2_3.ORB)orb).set_delegate(this);
+ } catch(ClassCastException e) {
+ throw new BAD_PARAM("POA Servant requires an instance of org.omg.CORBA_2_3.ORB");
+ }
+ }
+
+ @Override
+ public void setTarget(Remote target) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Remote getTarget() {
+ return this;
+ }
+
+ @Override
+ public OutputStream _invoke(String method, InputStream _in, ResponseHandler reply) throws SystemException {
+ try {
+ return dispatch(method, (org.omg.CORBA_2_3.portable.InputStream) _in, reply);
+ } catch (SystemException ex) {
+ throw ex;
+ } catch (Throwable ex) {
+ throw new UnknownException(ex);
+ }
+ }
+
+ public String publish(ORB serverORB) throws InvalidName, AdapterInactive, ServantAlreadyActive, WrongPolicy {
+ POA rootPOA = POAHelper.narrow(serverORB.resolve_initial_references("RootPOA"));
+ rootPOA.the_POAManager().activate();
+ rootPOA.activate_object(this);
+ return serverORB.object_to_string(thisObject());
+ }
+
+ protected abstract OutputStream dispatch(String method, org.omg.CORBA_2_3.portable.InputStream in, ResponseHandler reply) throws RemoteException;
+}
diff --git a/yoko-osgi/pom.xml b/yoko-osgi/pom.xml
new file mode 100644
index 0000000..8f9468a
--- /dev/null
+++ b/yoko-osgi/pom.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+* -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>yoko</artifactId>
+ <groupId>org.apache.yoko</groupId>
+ <version>1.5-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>yoko-osgi</artifactId>
+
+ <name>Apache Yoko OSGi Utilities</name>
+
+ <packaging>bundle</packaging>
+
+ <dependencies>
+ <!-- this contains the osgi-relevant classes in the endorsed dir -->
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ <version>5.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ <version>5.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <configuration>
+ <instructions>
+ <Export-Package>
+ org.apache.yoko.osgi
+ </Export-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/osgi/ProviderLocator.java b/yoko-osgi/src/main/java/org/apache/yoko/osgi/ProviderLocator.java
similarity index 99%
rename from yoko-util/src/main/java/org/apache/yoko/util/osgi/ProviderLocator.java
rename to yoko-osgi/src/main/java/org/apache/yoko/osgi/ProviderLocator.java
index dc8fffc..1f7d28d 100644
--- a/yoko-util/src/main/java/org/apache/yoko/util/osgi/ProviderLocator.java
+++ b/yoko-osgi/src/main/java/org/apache/yoko/osgi/ProviderLocator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.yoko.util.osgi;
+package org.apache.yoko.osgi;
import java.io.BufferedReader;
import java.io.File;
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/osgi/ProviderRegistry.java b/yoko-osgi/src/main/java/org/apache/yoko/osgi/ProviderRegistry.java
similarity index 98%
rename from yoko-util/src/main/java/org/apache/yoko/util/osgi/ProviderRegistry.java
rename to yoko-osgi/src/main/java/org/apache/yoko/osgi/ProviderRegistry.java
index 8978c2f..3fc69d4 100644
--- a/yoko-util/src/main/java/org/apache/yoko/util/osgi/ProviderRegistry.java
+++ b/yoko-osgi/src/main/java/org/apache/yoko/osgi/ProviderRegistry.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.yoko.util.osgi;
+package org.apache.yoko.osgi;
import java.util.List;
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/BundleProviderLoader.java b/yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/BundleProviderLoader.java
similarity index 98%
rename from yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/BundleProviderLoader.java
rename to yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/BundleProviderLoader.java
index ec50364..20b7c47 100644
--- a/yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/BundleProviderLoader.java
+++ b/yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/BundleProviderLoader.java
@@ -18,7 +18,7 @@
*/
-package org.apache.yoko.util.osgi.locator;
+package org.apache.yoko.osgi.locator;
import org.osgi.framework.Bundle;
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/ProviderBean.java b/yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/ProviderBean.java
similarity index 97%
rename from yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/ProviderBean.java
rename to yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/ProviderBean.java
index f2271f9..688ff61 100644
--- a/yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/ProviderBean.java
+++ b/yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/ProviderBean.java
@@ -18,7 +18,7 @@
*/
-package org.apache.yoko.util.osgi.locator;
+package org.apache.yoko.osgi.locator;
import java.util.logging.Logger;
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/ProviderRegistryImpl.java b/yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/ProviderRegistryImpl.java
similarity index 97%
rename from yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/ProviderRegistryImpl.java
rename to yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/ProviderRegistryImpl.java
index 7a1feec..31dfa31 100644
--- a/yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/ProviderRegistryImpl.java
+++ b/yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/ProviderRegistryImpl.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.yoko.util.osgi.locator;
+package org.apache.yoko.osgi.locator;
import java.util.ArrayList;
import java.util.Collections;
@@ -24,13 +24,14 @@
import java.util.logging.Level;
import java.util.logging.Logger;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderRegistry;
/**
* The implementation of the provider registry used to store
* the bundle registrations.
*/
-public class ProviderRegistryImpl implements org.apache.yoko.util.osgi.ProviderRegistry, Register {
+public class ProviderRegistryImpl implements ProviderRegistry, Register {
private static final Logger log = Logger.getLogger(ProviderRegistryImpl.class.getName());
// our mapping between a provider id and the implementation information. There
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/Register.java b/yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/Register.java
similarity index 95%
rename from yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/Register.java
rename to yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/Register.java
index 2868a93..030b390 100644
--- a/yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/Register.java
+++ b/yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/Register.java
@@ -18,7 +18,7 @@
*/
-package org.apache.yoko.util.osgi.locator;
+package org.apache.yoko.osgi.locator;
/**
* @version $Rev$ $Date$
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/ServiceBean.java b/yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/ServiceBean.java
similarity index 97%
rename from yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/ServiceBean.java
rename to yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/ServiceBean.java
index 8f3fcb6..7876495 100644
--- a/yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/ServiceBean.java
+++ b/yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/ServiceBean.java
@@ -18,7 +18,7 @@
*/
-package org.apache.yoko.util.osgi.locator;
+package org.apache.yoko.osgi.locator;
import java.util.logging.Logger;
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/activator/AbstractBundleActivator.java b/yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/activator/AbstractBundleActivator.java
similarity index 94%
rename from yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/activator/AbstractBundleActivator.java
rename to yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/activator/AbstractBundleActivator.java
index 2e82ffc..c21cf42 100644
--- a/yoko-util/src/main/java/org/apache/yoko/util/osgi/locator/activator/AbstractBundleActivator.java
+++ b/yoko-osgi/src/main/java/org/apache/yoko/osgi/locator/activator/AbstractBundleActivator.java
@@ -1,10 +1,10 @@
-package org.apache.yoko.util.osgi.locator.activator;
+package org.apache.yoko.osgi.locator.activator;
import java.util.ArrayList;
import java.util.List;
-import org.apache.yoko.util.osgi.locator.BundleProviderLoader;
-import org.apache.yoko.util.osgi.locator.Register;
+import org.apache.yoko.osgi.locator.BundleProviderLoader;
+import org.apache.yoko.osgi.locator.Register;
import org.osgi.framework.Bundle;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;
diff --git a/yoko-rmi-impl/pom.xml b/yoko-rmi-impl/pom.xml
index a436390..7b8d098 100755
--- a/yoko-rmi-impl/pom.xml
+++ b/yoko-rmi-impl/pom.xml
@@ -32,6 +32,11 @@
an endorsed standard -->
<dependency>
<groupId>org.apache.yoko</groupId>
+ <artifactId>yoko-osgi</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.yoko</groupId>
<artifactId>yoko-spec-corba</artifactId>
<scope>provided</scope>
</dependency>
diff --git a/yoko-rmi-impl/src/main/java/org/apache/yoko/rmi/api/PortableRemoteObjectExt.java b/yoko-rmi-impl/src/main/java/org/apache/yoko/rmi/api/PortableRemoteObjectExt.java
index 94a5aa5..6d0e16e 100755
--- a/yoko-rmi-impl/src/main/java/org/apache/yoko/rmi/api/PortableRemoteObjectExt.java
+++ b/yoko-rmi-impl/src/main/java/org/apache/yoko/rmi/api/PortableRemoteObjectExt.java
@@ -21,7 +21,7 @@
import java.security.AccessController;
import org.apache.yoko.rmi.util.GetSystemPropertyAction;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
public class PortableRemoteObjectExt {
diff --git a/yoko-rmi-impl/src/main/java/org/apache/yoko/rmi/impl/UtilImpl.java b/yoko-rmi-impl/src/main/java/org/apache/yoko/rmi/impl/UtilImpl.java
index 55ba438..2131bb7 100755
--- a/yoko-rmi-impl/src/main/java/org/apache/yoko/rmi/impl/UtilImpl.java
+++ b/yoko-rmi-impl/src/main/java/org/apache/yoko/rmi/impl/UtilImpl.java
@@ -47,7 +47,7 @@
import javax.rmi.CORBA.ValueHandler;
import org.apache.yoko.rmi.util.GetSystemPropertyAction;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
import org.omg.CORBA.Any;
import org.omg.CORBA.BAD_PARAM;
import org.omg.CORBA.COMM_FAILURE;
diff --git a/yoko-rmi-impl/src/main/java/org/apache/yoko/rmi/osgi/activator/Activator.java b/yoko-rmi-impl/src/main/java/org/apache/yoko/rmi/osgi/activator/Activator.java
index 6c37409..573112f 100644
--- a/yoko-rmi-impl/src/main/java/org/apache/yoko/rmi/osgi/activator/Activator.java
+++ b/yoko-rmi-impl/src/main/java/org/apache/yoko/rmi/osgi/activator/Activator.java
@@ -2,9 +2,9 @@
import javax.rmi.CORBA.Stub;
-import org.apache.yoko.util.osgi.locator.ProviderRegistryImpl;
-import org.apache.yoko.util.osgi.locator.Register;
-import org.apache.yoko.util.osgi.locator.activator.AbstractBundleActivator;
+import org.apache.yoko.osgi.locator.ProviderRegistryImpl;
+import org.apache.yoko.osgi.locator.Register;
+import org.apache.yoko.osgi.locator.activator.AbstractBundleActivator;
import org.omg.stub.java.rmi._Remote_Stub;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
diff --git a/yoko-rmi-spec/pom.xml b/yoko-rmi-spec/pom.xml
index 3ac0cea..4506f7b 100644
--- a/yoko-rmi-spec/pom.xml
+++ b/yoko-rmi-spec/pom.xml
@@ -35,6 +35,10 @@
<artifactId>yoko-spec-corba</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.yoko</groupId>
+ <artifactId>yoko-osgi</artifactId>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
diff --git a/yoko-rmi-spec/src/main/java/org/apache/yoko/rmispec/util/UtilLoader.java b/yoko-rmi-spec/src/main/java/org/apache/yoko/rmispec/util/UtilLoader.java
index 54f2da9..cbcd57d 100644
--- a/yoko-rmi-spec/src/main/java/org/apache/yoko/rmispec/util/UtilLoader.java
+++ b/yoko-rmi-spec/src/main/java/org/apache/yoko/rmispec/util/UtilLoader.java
@@ -18,7 +18,7 @@
package org.apache.yoko.rmispec.util;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
import java.net.MalformedURLException;
import java.net.URL;
diff --git a/yoko-spec-corba/pom.xml b/yoko-spec-corba/pom.xml
index 3d95e9b..cb2f75d 100755
--- a/yoko-spec-corba/pom.xml
+++ b/yoko-spec-corba/pom.xml
@@ -31,7 +31,7 @@
<dependencies>
<dependency>
<groupId>org.apache.yoko</groupId>
- <artifactId>yoko-util</artifactId>
+ <artifactId>yoko-osgi</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
diff --git a/yoko-spec-corba/src/main/java/org/omg/CORBA/ORB.java b/yoko-spec-corba/src/main/java/org/omg/CORBA/ORB.java
index 2063a27..b880190 100755
--- a/yoko-spec-corba/src/main/java/org/omg/CORBA/ORB.java
+++ b/yoko-spec-corba/src/main/java/org/omg/CORBA/ORB.java
@@ -16,7 +16,7 @@
*/
package org.omg.CORBA;
-import org.apache.yoko.util.osgi.ProviderLocator;
+import org.apache.yoko.osgi.ProviderLocator;
import java.security.PrivilegedAction;
import java.security.AccessController;
diff --git a/yoko-spec-corba/src/main/java/org/omg/IIOP/ListenPoint.java b/yoko-spec-corba/src/main/java/org/omg/IIOP/ListenPoint.java
index e76566a..157173a 100755
--- a/yoko-spec-corba/src/main/java/org/omg/IIOP/ListenPoint.java
+++ b/yoko-spec-corba/src/main/java/org/omg/IIOP/ListenPoint.java
@@ -26,19 +26,11 @@
{
private static final String _ob_id = "IDL:omg.org/IIOP/ListenPoint:1.0";
- public
- ListenPoint()
- {
- }
-
- public
- ListenPoint(String host,
- short port)
- {
+ public ListenPoint(String host, short port) {
this.host = host;
this.port = port;
}
- public String host;
- public short port;
+ public final String host;
+ public final short port;
}
diff --git a/yoko-spec-corba/src/main/java/org/omg/IIOP/ListenPointHelper.java b/yoko-spec-corba/src/main/java/org/omg/IIOP/ListenPointHelper.java
index bc679da..5e7d62c 100755
--- a/yoko-spec-corba/src/main/java/org/omg/IIOP/ListenPointHelper.java
+++ b/yoko-spec-corba/src/main/java/org/omg/IIOP/ListenPointHelper.java
@@ -72,10 +72,7 @@
public static ListenPoint
read(org.omg.CORBA.portable.InputStream in)
{
- ListenPoint _ob_v = new ListenPoint();
- _ob_v.host = in.read_string();
- _ob_v.port = in.read_ushort();
- return _ob_v;
+ return new ListenPoint(in.read_string(), in.read_ushort());
}
public static void
diff --git a/yoko-util/pom.xml b/yoko-util/pom.xml
index 9bdabfa..2c6c625 100644
--- a/yoko-util/pom.xml
+++ b/yoko-util/pom.xml
@@ -26,24 +26,33 @@
<artifactId>yoko-util</artifactId>
- <name>Apache Yoko Utilities</name>
+ <name>Apache Yoko Implementation Utilities</name>
<packaging>bundle</packaging>
<dependencies>
<!-- this contains the osgi-relevant classes in the endorsed dir -->
<dependency>
- <groupId>org.osgi</groupId>
- <artifactId>org.osgi.core</artifactId>
- <version>5.0.0</version>
+ <groupId>org.apache.yoko</groupId>
+ <artifactId>yoko-spec-corba</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.osgi</groupId>
- <artifactId>org.osgi.compendium</artifactId>
- <version>5.0.0</version>
+ <groupId>org.apache.yoko</groupId>
+ <artifactId>yoko-rmi-spec</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>1.9.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -59,6 +68,15 @@
</instructions>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>**/*Test.java</include>
+ </includes>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/Cache.java b/yoko-util/src/main/java/org/apache/yoko/util/Cache.java
new file mode 100644
index 0000000..8a4f3d9
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/Cache.java
@@ -0,0 +1,47 @@
+package org.apache.yoko.util;
+
+import java.util.Map;
+
+public interface Cache<K, V> {
+ /** Get the number of cached values */
+ int size();
+
+ /** Get the number of cached values not currently in use */
+ int idleCount();
+
+ /**
+ * Retrieve the value for the given key.
+ * The caller must ensure the returned reference is closed
+ */
+ Reference<V> get(K key);
+
+ /**
+ * Retrieve or compute the value for the given key.
+ * The caller must ensure that the returned reference is closed.
+ * @return an auto-closeable reference to the cached value
+ */
+ Reference<V> getOrCreate(K key, KeyedFactory<K, V> keyedFactory);
+
+ /**
+ * Retrieve or compute the value for the given key.
+ * The caller must ensure that the returned reference is closed.
+ * @return an auto-closeable reference to the cached value
+ */
+ Reference<V> getOrCreate(K key, Factory<V> factory);
+
+ /**
+ * Uncache an item. No cleanup will be performed.
+ * @throws IllegalStateException if valueRef has already been closed
+ */
+ void remove(Reference<V> ref);
+
+ /**
+ * Remove some idle entries.
+ * @return the number of entries removed
+ */
+ int clean();
+
+ Map<K, V> snapshot();
+
+ interface Cleaner<V> {void clean(V value);}
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/Factory.java b/yoko-util/src/main/java/org/apache/yoko/util/Factory.java
new file mode 100644
index 0000000..0c86537
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/Factory.java
@@ -0,0 +1,5 @@
+package org.apache.yoko.util;
+
+public interface Factory<V> {
+ V create();
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/Fifa.java b/yoko-util/src/main/java/org/apache/yoko/util/Fifa.java
new file mode 100644
index 0000000..403c397
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/Fifa.java
@@ -0,0 +1,6 @@
+package org.apache.yoko.util;
+
+/** A first-in, first-accessed holder of stuff */
+public interface Fifa<T> extends Sequential<T> {
+ T peek();
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/Fifo.java b/yoko-util/src/main/java/org/apache/yoko/util/Fifo.java
new file mode 100644
index 0000000..e85376e
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/Fifo.java
@@ -0,0 +1,6 @@
+package org.apache.yoko.util;
+
+/** A first-in, first-out holder of stuff */
+public interface Fifo<T> extends Sequential<T> {
+ Object remove();
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/KeyedFactory.java b/yoko-util/src/main/java/org/apache/yoko/util/KeyedFactory.java
new file mode 100644
index 0000000..4212dd7
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/KeyedFactory.java
@@ -0,0 +1,5 @@
+package org.apache.yoko.util;
+
+public interface KeyedFactory<K, V> {
+ V create(K key);
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/Reference.java b/yoko-util/src/main/java/org/apache/yoko/util/Reference.java
new file mode 100644
index 0000000..5d15034
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/Reference.java
@@ -0,0 +1,9 @@
+package org.apache.yoko.util;
+
+public interface Reference<T> extends AutoCloseable {
+ /** Get the referent, which is guaranteed to be non-null */
+ T get();
+ /** Finish using the reference */
+ @Override
+ void close();
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/Sequential.java b/yoko-util/src/main/java/org/apache/yoko/util/Sequential.java
new file mode 100644
index 0000000..ad1cd20
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/Sequential.java
@@ -0,0 +1,18 @@
+package org.apache.yoko.util;
+
+/** A holder of stuff */
+public interface Sequential<T> {
+ int size();
+
+ Place<T> put(T elem);
+
+ interface Place<T> {
+ /**
+ * Relinquish this place in the sequence.
+ *
+ * @return the element if it is successfully removed from the sequence<br>
+ * or <code>null</code> if the element has already been removed by another operation
+ */
+ T relinquish();
+ }
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/ConcurrentFifo.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/ConcurrentFifo.java
new file mode 100644
index 0000000..67d2eb3
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/ConcurrentFifo.java
@@ -0,0 +1,164 @@
+package org.apache.yoko.util.concurrent;
+
+import org.apache.yoko.util.Fifa;
+import org.apache.yoko.util.Fifo;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A thread-safe queue that allows concurrent modification of non-adjacent elements.
+ */
+class ConcurrentFifo<T> implements Fifo<T>, Fifa<T> {
+ /*
+ * This class relies on consistent lock ordering. Locks are ALWAYS obtained
+ * in the order of elements in the queue. The locks used are the monitors
+ * of the node elements, and each node's monitor guards the relationship
+ * between that node and its successor. By implication, it guards the other
+ * node's back reference as well.
+ *
+ * So, for a delete operation, two nodes must be locked: the node to be
+ * deleted, and the previous node, but NOT IN THAT ORDER! Moreover, after
+ * the previous node is locked, the relationship must be checked to ensure
+ * it is still current. The convention observed is that next() is only
+ * accessed while the lock is held, and it is cross-checked against any
+ * unguarded calls to prev().
+ *
+ * NOTE: this is not double-check locking (DCL) because the early access
+ * never obviates a synchronized block, and results are always checked
+ * within the guarded section. Therefore, it is not necessary for any of
+ * the non-final fields to be volatile.
+ *
+ * DCL: https://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html#dcl
+ *
+ * If inconsistency is detected, all locks are to be released and the
+ * operation restarted from scratch, unless it can be determined the
+ * operation has definitively failed due to concurrent modification.
+ *
+ * Operations on non-adjacent nodes are concurrent. Concurrent operations
+ * on a node or on adjacent nodes will contend for monitors, but never
+ * deadlock.
+ */
+
+ private final Head<T> head = new Head<>();
+ private final Foot<T> foot = new Foot<>(head);
+ protected final AtomicInteger size = new AtomicInteger(0);
+
+ @Override
+ public int size() { return size.get(); }
+
+ /**
+ * Get, without removing it, the oldest remaining element from this FIFO.
+ * This method does not block and returns an answer consistent with the state of the FIFO at some point.
+ *
+ * @return the oldest remaining element or <code>null</code> if the FIFO was empty
+ */
+ @Override
+ public T peek() {
+ return recursivePeek(head);
+ }
+
+ /**
+ * Find the first non-null value.
+ */
+ private T recursivePeek(PNode<T> start) {
+ synchronized (start) {
+ NNode<T> nn = start.next();
+ if (nn == foot) return null;
+ VNode<T> node = (VNode<T>) nn;
+ T result = node.get();
+ return (result == null) ? recursivePeek(node) : result;
+ }
+ }
+
+ /**
+ * Add an element to the end of this FIFO.
+ *
+ * @param elem must not be <code>null</code>
+ * @return an object representing the place in the queue
+ */
+ @Override
+ public Place<T> put(T elem) {
+ do {
+ final PNode<T> pnode = foot.prev();
+ // lock penultimate node
+ synchronized (pnode) {
+ // RETRY if structure changed
+ if (pnode.next() != foot) continue;
+ // create a new node
+ final VNode<T> node = createNode(elem);
+ // insert new node
+ synchronized (node) {
+ node.insertAfter(pnode);
+ size.incrementAndGet();
+ }
+ // return place in queue
+ return new Place<T>() {
+ @Override
+ public T relinquish() {
+ return remove(node);
+ }
+ };
+ }
+ } while (true);
+ }
+
+ protected VNode<T> createNode(T elem) {
+ return new StrongNode<>(requireNonNull(elem));
+ }
+
+ /**
+ * Remove the least recently added element.
+ *
+ * @return the removed element, or <code>null</code> if the FIFO was empty.
+ */
+ @Override
+ public T remove() {
+ return recursiveRemove(head);
+ }
+
+ /**
+ * Find and remove the first non-null value
+ */
+ private T recursiveRemove(PNode<T> start) {
+ synchronized (start) {
+ NNode<T> nn = start.next();
+ if (nn == foot) return null;
+ VNode<T> node = (VNode<T>) nn;
+ T result = node.get();
+ if (result == null)
+ return recursiveRemove(node);
+ synchronized (node) {
+ node.delete();
+ size.decrementAndGet();
+ return result;
+ }
+ }
+ }
+
+ /**
+ * Remove the specified node from the FIFO, if present.
+ * @return the element if it was successfully removed,
+ * otherwise <code>null</code>
+ */
+ protected T remove(VNode<T> node) {
+ do {
+ // retrieve previous node
+ final PNode<T> pNode = node.prev();
+ // FAIL if node already deleted
+ if (pNode == null) return null;
+ // lock previous node
+ synchronized (pNode) {
+ // RETRY if structure has changed
+ if (pNode.next() != node) continue;
+ // Remove node from chain and report success
+ synchronized (node) {
+ node.delete();
+ size.decrementAndGet();
+ }
+ return node.get();
+ }
+ } while (true);
+ }
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/CountedEntry.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/CountedEntry.java
new file mode 100644
index 0000000..0177f5f
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/CountedEntry.java
@@ -0,0 +1,153 @@
+package org.apache.yoko.util.concurrent;
+
+import org.apache.yoko.util.Reference;
+import org.apache.yoko.util.Sequential;
+
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A thread-safe, reference-counting entry for use in a cache.
+ * If threads race to call @link{#clear} and @link{#obtain},
+ * one or other method will return <code>null</code>.
+ * <br>
+ * Entries with a reference count of zero will be put onto the
+ * provided @link{Sequential} object, and removed on successful
+ * calls to either @link{#clear} or @link{#obtain}.
+ */
+class CountedEntry<K, V> {
+ private static final int CLEANED = Integer.MIN_VALUE;
+ private static final int NOT_READY = -1;
+ private static final int IDLE = -2;
+ private final AtomicInteger refCount = new AtomicInteger(NOT_READY);
+ private final Sequential<CountedEntry<K, V>> idleEntries;
+ private Sequential.Place<?> idlePlace;
+ private V value;
+ final K key;
+
+ /** Create a not-yet-ready CountedEntry - the next operation must be to call setValue() or abort() */
+ CountedEntry(K key, Sequential<CountedEntry<K, V>> idleEntries) {
+ this.key = key;
+ this.idleEntries = idleEntries;
+ }
+
+ ValueReference setValue(V value) {
+ this.value = Objects.requireNonNull(value);
+ notifyReady(1);
+ return new ValueReference();
+ }
+
+ void abort() {
+ assert value == null;
+ notifyReady(CLEANED);
+ }
+
+ private synchronized void notifyReady(int newCount) {
+ boolean success = refCount.compareAndSet(NOT_READY, newCount);
+ assert success;
+ this.notifyAll();
+ }
+
+ private synchronized void blockWhileNotReady() {
+ while (refCount.get() == NOT_READY) {
+ try {
+ this.wait();
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+
+ // Acquire a reference to this entry.
+ private boolean acquire() {
+ RESPIN: do {
+ int oldCount = refCount.get();
+ switch (oldCount) {
+ case CLEANED:
+ // terminal state - must fail
+ return false;
+ case NOT_READY:
+ blockWhileNotReady();
+ continue RESPIN;
+ case IDLE:
+ // grab the ref while it's idle or start again
+ if (!!!refCount.compareAndSet(IDLE, NOT_READY)) continue RESPIN;
+ // remove from the idle list
+ Object self = idlePlace.relinquish();
+ assert this == self;
+ idlePlace = null;
+ // let other threads know this entry is accessible again
+ notifyReady(1);
+ return true;
+ default:
+ // increment the value retrieved or start again
+ if (!!!refCount.compareAndSet(oldCount, oldCount + 1)) continue RESPIN;
+ return true;
+ }
+ } while (true);
+ }
+
+ // Release a reference to this entry. Only the owner of the reference should call this method.
+ private boolean release() {
+ int newCount = refCount.decrementAndGet();
+ if (newCount != 0) return true;
+
+ // try to IDLE this entry
+ if (!!!refCount.compareAndSet(0, NOT_READY))
+ // some other thread revived or purged this entry, so no need to IDLE it now
+ return true;
+
+ idlePlace = idleEntries.put(this);
+ notifyReady(IDLE);
+ return true;
+ }
+
+ // Mark this entry unusable. Return value if entry is modified, null otherwise.
+ V clear() {
+ if (!!! refCount.compareAndSet(IDLE, CLEANED)) return null;
+ // safe to read/update idlePlace since this is the only thread that has moved it from IDLE
+ try {
+ Object self = idlePlace.relinquish();
+ assert self == this;
+ return value;
+ } finally {
+ value = null;
+ idlePlace = null;
+ }
+ }
+
+ ValueReference obtain() {return acquire() ? new ValueReference() : null;}
+
+ /** Clear an entry that still has valid references */
+ private CountedEntry<K, V> purge() {
+ RESPIN: do {
+ int oldCount = refCount.get();
+ if (oldCount == CLEANED) return null;
+ if (oldCount < 1) throw new IllegalStateException();
+ if (!!! refCount.compareAndSet(oldCount, CLEANED)) continue RESPIN;
+ return this;
+ } while (true);
+ }
+
+ final class ValueReference implements Reference<V> {
+ private final ReferenceCloserTask closer = new ReferenceCloserTask();
+ public V get() {return value;}
+ public void close() {closer.run();}
+ CountedEntry<K, V> invalidateAndGetEntry() {return closer.purge();}
+ Runnable getCloserTask() {return closer;}
+ }
+
+ /**
+ * In order to drive cleanup after a ValueReference becomes unreachable,
+ * we need to store the clean up details in a separate object that holds
+ * no strong reference back to the ValueReference object
+ */
+ final class ReferenceCloserTask implements Runnable {
+ boolean closed;
+ public synchronized void run() {closed = closed || release();}
+ synchronized CountedEntry<K,V> purge() {
+ if (closed) throw new IllegalStateException();
+ closed = true;
+ return CountedEntry.this.purge();
+ }
+ }
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/Foot.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/Foot.java
new file mode 100644
index 0000000..cc433f8
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/Foot.java
@@ -0,0 +1,15 @@
+package org.apache.yoko.util.concurrent;
+
+import static java.util.Objects.requireNonNull;
+
+final class Foot<T> implements NNode<T> {
+ private PNode<T> prev;
+
+ public PNode<T> prev() {return prev;}
+ public void prev(PNode<T> pnode) {prev = pnode;}
+
+ Foot(Head<T> head) {
+ this.prev = requireNonNull(head);
+ head.next(this);
+ }
+}
\ No newline at end of file
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/Head.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/Head.java
new file mode 100644
index 0000000..9f52e63
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/Head.java
@@ -0,0 +1,7 @@
+package org.apache.yoko.util.concurrent;
+
+final class Head<T> implements PNode<T> {
+ private NNode<T> next;
+ public NNode<T> next() {return next;}
+ public void next(NNode<T> nnode) {next = nnode;}
+}
\ No newline at end of file
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/NNode.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/NNode.java
new file mode 100644
index 0000000..51863fd
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/NNode.java
@@ -0,0 +1,7 @@
+package org.apache.yoko.util.concurrent;
+
+interface NNode<T> {
+ PNode<T> prev();
+
+ void prev(PNode<T> pnode);
+}
\ No newline at end of file
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/NoOpRunnableFactory.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/NoOpRunnableFactory.java
new file mode 100644
index 0000000..6d71a32
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/NoOpRunnableFactory.java
@@ -0,0 +1,10 @@
+package org.apache.yoko.util.concurrent;
+
+import org.apache.yoko.util.Factory;
+import org.apache.yoko.util.KeyedFactory;
+
+public enum NoOpRunnableFactory implements Runnable, KeyedFactory<Object, Runnable> {
+ INSTANCE;
+ public Runnable create(Object key) {return this;}
+ public void run() {}
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/PNode.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/PNode.java
new file mode 100644
index 0000000..9ee2fdf
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/PNode.java
@@ -0,0 +1,7 @@
+package org.apache.yoko.util.concurrent;
+
+interface PNode<T> {
+ NNode<T> next();
+
+ void next(NNode<T> nnode);
+}
\ No newline at end of file
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/ReferenceCountedCache.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/ReferenceCountedCache.java
new file mode 100644
index 0000000..b031a3a
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/ReferenceCountedCache.java
@@ -0,0 +1,124 @@
+package org.apache.yoko.util.concurrent;
+
+import org.apache.yoko.util.*;
+
+import java.lang.ref.ReferenceQueue;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+public class ReferenceCountedCache<K, V> implements Cache<K,V> {
+
+ private final ConcurrentMap<K, CountedEntry<K, V>> map = new ConcurrentHashMap<>();
+ private final Fifa<CountedEntry<K, V>> idleEntries = new ConcurrentFifo<>();
+ private volatile int threshold;
+ private volatile int sweep;
+ private final Cleaner<V> cleaner;
+ private final ReferenceQueue<Reference<V>> gcQueue;
+
+ /**
+ * Create a new cache
+ * @param cleaner the object to use to clean entries
+ * @param threshold the number of values above which to start cleaning up
+ * @param sweep the number of unused values to clear up
+ */
+ public ReferenceCountedCache(Cleaner<V> cleaner, int threshold, int sweep) {
+ this.threshold = threshold;
+ this.sweep = sweep;
+ this.cleaner = cleaner;
+ gcQueue = new ReferenceQueue<>();
+ }
+
+ @Override
+ public int size() {
+ return map.size();
+ }
+
+ @Override
+ public int idleCount() {
+ return idleEntries.size();
+ }
+
+ @Override
+ public Reference<V> get(K key) {
+ CountedEntry<K, V> entry = map.get(key);
+ if (entry == null) return null;
+ return entry == null ? null : track(entry.obtain());
+ }
+
+ @Override
+ public Reference<V> getOrCreate(K key, KeyedFactory<K, V> valueFactory) {
+ CountedEntry<K,V>.ValueReference result;
+ do {
+ CountedEntry<K, V> entry = map.get(key);
+ if (entry == null) {
+ // try putting a new entry in the map
+ CountedEntry<K, V> newEntry = new CountedEntry<>(key, idleEntries);
+ entry = map.putIfAbsent(key, newEntry);
+ if (entry == null) {
+ // this thread won the race to create the new entry
+ V value = null;
+ try {
+ value = valueFactory.create(key);
+ return track(newEntry.setValue(value));
+ } finally {
+ if (value == null) {
+ // create() threw an exception, so clean up
+ // and make sure no-one else tries to use this entry
+ newEntry.abort();
+ map.remove(key, newEntry);
+ }
+ }
+ }
+ }
+ result = entry.obtain();
+ } while (result == null); // the entry was cleared - try again
+ return track(result);
+ }
+
+ protected CountedEntry<K,V>.ValueReference track(CountedEntry<K,V>.ValueReference ref) {return ref;}
+
+ @Override
+ public final Reference<V> getOrCreate(K key, final Factory<V> factory) {
+ return getOrCreate(key, new KeyedFactory<K, V>() {
+ @Override
+ public V create(K key) {
+ return factory.create();
+ }
+ });
+ }
+
+ @Override
+ public void remove(Reference<V> ref) {remove(((CountedEntry<K,V>.ValueReference) ref).invalidateAndGetEntry());}
+
+ protected void remove(CountedEntry<K,V> entry) {if (entry != null) map.remove(entry.key, entry);}
+
+ @Override
+ public int clean() {
+ if (size() <= threshold) return 0;
+ int removed = 0;
+ while (removed < sweep) {
+ CountedEntry<K, V> e = idleEntries.peek();
+ if (e == null) break;
+ V clearedValue = e.clear();
+ if (clearedValue == null) continue;
+ if (!!!map.remove(e.key, e))
+ throw new IllegalStateException("Entry already removed");
+ cleaner.clean(clearedValue);
+ removed++;
+ }
+ return removed;
+ }
+
+ @Override
+ public Map<K, V> snapshot() {
+ Map<K, V> result = new HashMap<>();
+ for (Map.Entry<K,CountedEntry<K, V>> entry : map.entrySet()) {
+ try (Reference<V> ref = entry.getValue().obtain()){
+ result.put(entry.getKey(), ref.get());
+ } catch (NullPointerException ignored) {}
+ }
+ return result;
+ }
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/StrongNode.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/StrongNode.java
new file mode 100644
index 0000000..b870ca4
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/StrongNode.java
@@ -0,0 +1,29 @@
+package org.apache.yoko.util.concurrent;
+
+final class StrongNode<T> implements VNode<T> {
+ private final T value;
+ private PNode<T> prev;
+ private NNode<T> next;
+
+ StrongNode(T value) {this.value = value;}
+ public PNode<T> prev() {return prev;}
+ public NNode<T> next() {return next;}
+ public void prev(PNode<T> pnode) {prev = pnode;}
+ public void next(NNode<T> nnode) {next = nnode;}
+ public T get() {return value;}
+
+ public void insertAfter(PNode<T> pnode) {
+ NNode<T> nnode = pnode.next();
+ this.next = nnode;
+ this.prev = pnode;
+ nnode.prev(this);
+ pnode.next(this);
+ }
+
+ public void delete() {
+ this.prev.next(this.next);
+ this.next.prev(this.prev);
+ this.prev = null;
+ this.next = null;
+ }
+}
\ No newline at end of file
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/VNode.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/VNode.java
new file mode 100644
index 0000000..1aef732
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/VNode.java
@@ -0,0 +1,10 @@
+package org.apache.yoko.util.concurrent;
+
+public interface VNode<T> extends PNode<T>, NNode<T> {
+ T get();
+
+ void insertAfter(PNode<T> pnode);
+
+ void delete();
+
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/WeakConcurrentFifo.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/WeakConcurrentFifo.java
new file mode 100644
index 0000000..3dd9592
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/WeakConcurrentFifo.java
@@ -0,0 +1,73 @@
+package org.apache.yoko.util.concurrent;
+
+import org.apache.yoko.util.KeyedFactory;
+import org.apache.yoko.util.Sequential;
+
+import java.lang.ref.ReferenceQueue;
+
+public class WeakConcurrentFifo<T> extends ConcurrentFifo<T> {
+ private final ReferenceQueue<T> refQueue = new ReferenceQueue<>();
+ private final KeyedFactory<? super T, Runnable> cleanupFactory;
+
+ WeakConcurrentFifo() {
+ this(NoOpRunnableFactory.INSTANCE);
+ }
+
+ WeakConcurrentFifo(KeyedFactory<? super T, Runnable> cleanupFactory) {
+ this.cleanupFactory = cleanupFactory;
+ }
+
+ @Override
+ public int size() {
+ cleanup();
+ return super.size();
+ }
+
+ @Override
+ public T peek() {
+ cleanup();
+ return super.peek();
+ }
+
+ @Override
+ public Place<T> put(T elem) {
+ cleanup();
+ return super.put(elem);
+ }
+
+ @Override
+ public T remove() {
+ cleanup();
+ return super.remove();
+ }
+
+ @Override
+ protected VNode<T> createNode(T elem) {
+ return new WeakNode<>(elem, refQueue, cleanupFactory.create(elem));
+ }
+
+ private void cleanup() {
+ do {
+ @SuppressWarnings("unchecked")
+ WeakNode<T> wn = (WeakNode<T>) refQueue.poll();
+ if (wn == null) return;
+ cleanup(wn);
+ } while (true);
+ }
+
+ private void cleanup(WeakNode<T> wn) {
+ RESPIN: do {
+ PNode<T> prev = wn.prev();
+ if (prev == null) return; // this node is already removed
+ synchronized (prev) {
+ if (wn.prev() != prev) continue RESPIN; // something changed!
+ synchronized (wn) {
+ wn.delete();
+ size.decrementAndGet();
+ wn.cleanup.run();
+ return;
+ }
+ }
+ } while (true);
+ }
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/WeakCountedCache.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/WeakCountedCache.java
new file mode 100644
index 0000000..6c0dffe
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/WeakCountedCache.java
@@ -0,0 +1,35 @@
+package org.apache.yoko.util.concurrent;
+
+import org.apache.yoko.util.KeyedFactory;
+
+public class WeakCountedCache<K, V> extends ReferenceCountedCache<K, V> {
+ private final WeakConcurrentFifo<CountedEntry<K,V>.ValueReference> referenceTracker
+ = new WeakConcurrentFifo<>(new CleanupFactory());
+
+ /**
+ * Create a new cache
+ *
+ * @param cleaner the object to use to clean entries
+ * @param threshold the number of values above which to start cleaning up
+ * @param sweep the number of unused values to clear up
+ */
+ public WeakCountedCache(Cleaner<V> cleaner, int threshold, int sweep) {
+ super(cleaner, threshold, sweep);
+ }
+
+ @Override
+ protected CountedEntry<K,V>.ValueReference track(CountedEntry<K,V>.ValueReference ref) {
+ if (ref != null)
+ referenceTracker.put(ref);
+ return ref;
+ }
+
+ private final class CleanupFactory implements KeyedFactory<CountedEntry<K,V>.ValueReference, Runnable> {
+ public Runnable create(CountedEntry<K, V>.ValueReference key) {
+ // Do NOT keep the key around, since this must only be held weakly.
+ // Instead, keep the closer task so we can clean up after the key is collected.
+ // Subsequent calls to close do nothing, so it's safe to run this multiple times.
+ return key.getCloserTask();
+ }
+ }
+}
diff --git a/yoko-util/src/main/java/org/apache/yoko/util/concurrent/WeakNode.java b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/WeakNode.java
new file mode 100644
index 0000000..1235b5f
--- /dev/null
+++ b/yoko-util/src/main/java/org/apache/yoko/util/concurrent/WeakNode.java
@@ -0,0 +1,34 @@
+package org.apache.yoko.util.concurrent;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+
+final class WeakNode<T> extends WeakReference<T> implements VNode<T> {
+ final Runnable cleanup;
+ private PNode<T> prev;
+ private NNode<T> next;
+
+ WeakNode(T value, ReferenceQueue<T> q, Runnable cleanup) {
+ super(value, q);
+ this.cleanup = cleanup;
+ }
+ public PNode<T> prev() {return prev;}
+ public NNode<T> next() {return next;}
+ public void prev(PNode<T> pnode) {prev = pnode;}
+ public void next(NNode<T> nnode) {next = nnode;}
+
+ public void insertAfter(PNode<T> pnode) {
+ NNode<T> nnode = pnode.next();
+ this.next = nnode;
+ this.prev = pnode;
+ nnode.prev(this);
+ pnode.next(this);
+ }
+
+ public void delete() {
+ this.prev.next(this.next);
+ this.next.prev(this.prev);
+ this.prev = null;
+ this.next = null;
+ }
+}
\ No newline at end of file
diff --git a/yoko-util/src/test/java/org/apache/yoko/util/concurrent/ConcurrentFifoTest.java b/yoko-util/src/test/java/org/apache/yoko/util/concurrent/ConcurrentFifoTest.java
new file mode 100644
index 0000000..4818250
--- /dev/null
+++ b/yoko-util/src/test/java/org/apache/yoko/util/concurrent/ConcurrentFifoTest.java
@@ -0,0 +1,274 @@
+package org.apache.yoko.util.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.yoko.util.Sequential;
+import org.junit.Before;
+import org.junit.Test;
+
+@SuppressWarnings("unused")
+public class ConcurrentFifoTest {
+ private static final List<String> ELEMS = new ArrayList<>();
+
+ static {
+ for (char c1 = 'A'; c1 <= 'Z'; c1++)
+ for (char c2 = 'a'; c2 <= 'z'; c2++)
+ ELEMS.add("" + c1 + c2);
+ }
+
+ ConcurrentFifo<String> fifo;
+ Set<Sequential.Place<String>> places;
+ volatile CyclicBarrier startBarrier;
+
+ @Before
+ public void setupFifo() {
+ fifo = new ConcurrentFifo<>();
+ }
+
+ @Before
+ public void setupPlaces() {
+ places = Collections.newSetFromMap(new ConcurrentHashMap<Sequential.Place<String>, Boolean>());
+ }
+
+ @Before
+ public void nullifyStartBarrier() {
+ startBarrier = null;
+ }
+
+ @Test
+ public void testPuttingStuff() throws Exception {
+ // create tasks
+ List<Adder> tasks = new ArrayList<>();
+ for (String dummy : ELEMS)
+ tasks.add(new Adder());
+
+ // run the tasks concurrently
+ List<List<String>> expectedOrders = runConcurrently(tasks);
+
+ // convert the fifo to a list
+ List<String> actualOrder = drainFifo();
+
+ assertEquals(ELEMS.size() * ELEMS.size(), actualOrder.size());
+
+ // check correct ordering per Adder:
+ // as we tally the elements in order,
+ // the number of ELEM[n] encountered
+ // should always be greater than or
+ // equal to the number of ELEM[n+1]
+ // encountered
+ TreeMap<String, Integer> tallySheet = new TreeMap<>();
+ // start with zero for every element
+ for (String elem : ELEMS) tallySheet.put(elem, 0);
+ // place known elephant in Cairo
+ tallySheet.put("", Integer.MAX_VALUE);
+ int index = -1;
+ for (String elem : actualOrder) {
+ index ++;
+ int newTally = tallySheet.get(elem) + 1;
+ tallySheet.put(elem, newTally);
+ Entry<String, Integer> lowerEntry = tallySheet.lowerEntry(elem);
+ String msg = String.format("Element out of order at index %d: %d \"%s\" found but only %d \"%s\"", index, newTally, elem, lowerEntry.getValue(), lowerEntry.getKey());
+ assertTrue(msg, lowerEntry.getValue() >= newTally);
+ }
+ }
+
+ @Test
+ public void testPickingStuff() throws Exception {
+ // pre-populate elements
+ for (String elem : ELEMS)
+ places.add(fifo.put(elem));
+
+ // create tasks
+ List<Constrictor> tasks = new ArrayList<>();
+ for (String elem : ELEMS)
+ tasks.add(new Constrictor());
+
+ // run the tasks concurrently
+ List<List<String>> removalLists = runConcurrently(tasks);
+ for (List<String> list : removalLists) if (!!!list.isEmpty()) System.out.println(list);
+
+ // convert the queue to a list
+ List<String> remainingElements = drainFifo();
+
+ // check for the right number of entries
+ assertEquals(Collections.emptyList(), remainingElements);
+
+ // check everything was removed exactly once
+ List<String> checkedRemovals = concatenate(removalLists);
+ Collections.sort(checkedRemovals);
+ assertEquals(ELEMS, checkedRemovals);
+ }
+
+ @Test
+ public void testGettingStuff() throws Exception {
+ // pre-populate elements
+ for (String elem : ELEMS)
+ places.add(fifo.put(elem));
+
+ // create tasks
+ List<Wiper> tasks = new ArrayList<>();
+ for (String elem : ELEMS)
+ tasks.add(new Wiper());
+
+ // run the tasks concurrently
+ List<List<String>> removalLists = runConcurrently(tasks);
+ for (List<String> list : removalLists) if (!!!list.isEmpty()) System.out.println(list);
+
+ // convert the queue to a list
+ List<String> remainingElements = drainFifo();
+
+ // check for the right number of entries
+ assertEquals(Collections.emptyList(), remainingElements);
+
+ // check everything was removed exactly once
+ List<String> checkedRemovals = concatenate(removalLists);
+ Collections.sort(checkedRemovals);
+ assertEquals(ELEMS, checkedRemovals);
+
+ // check no-one removed anything out of order
+ // i.e. each list of removed elements should remain unchanged by sorting
+ for (List<String> removed : removalLists)
+ assertEquals(new ArrayList<>(new TreeSet<>(removed)), removed);
+ }
+
+ @Test
+ public void testAllOpsConcurrently() throws Exception {
+ // pre-populate all the possible elements
+ for (String elem : ELEMS)
+ places.add(fifo.put(elem));
+
+ // create tasks
+ List<Callable<List<String>>> tasks = new ArrayList<>();
+ for (int i = 0; i < 1000; i++) {
+ switch (i % 4) {
+ case 0:
+ case 2:
+ tasks.add(new Adder());
+ break;
+ case 1:
+ tasks.add(new Constrictor());
+ break;
+ case 3:
+ tasks.add(new Wiper());
+ break;
+ }
+ }
+
+ List<String> added = new ArrayList<>(), removed = new ArrayList<>();
+
+ // collate additions and subtractions
+ Iterator<Callable<List<String>>> iterator = tasks.iterator();
+ for (List<String> results : runConcurrently(tasks))
+ (iterator.next() instanceof Adder ? added : removed).addAll(results);
+ added.addAll(ELEMS); // these were added up front
+ Collections.sort(added);
+ List<String> remainder = drainFifo();
+ removed.addAll(remainder);
+ Collections.sort(removed);
+
+ assertEquals(added, removed);
+
+ }
+
+ private <T> List<T> concatenate(List<List<T>> lists) {
+ List<T> result = new ArrayList<>();
+ for (List<T> list : lists)
+ result.addAll(list);
+ return result;
+ }
+
+ private List<String> drainFifo() {
+ List<String> queuedOrder = new ArrayList<>();
+ do {
+ String o = fifo.remove();
+ if (o == null) break;
+ queuedOrder.add(o);
+ } while (true);
+ return queuedOrder;
+ }
+
+ private <T> List<T> runConcurrently(List<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ // start all the threads
+ startBarrier = new CyclicBarrier(tasks.size());
+ ExecutorService xs = Executors.newFixedThreadPool(tasks.size());
+ try {
+ List<Future<T>> futures = new ArrayList<>();
+ for (Callable<T> task : tasks)
+ futures.add(xs.submit(task));
+
+ // collect the results
+ List<T> results = new ArrayList<>();
+ for (Future<T> future : futures)
+ results.add(future.get());
+ return results;
+ } finally {
+ xs.shutdown();
+ }
+ }
+
+ /** Adds stuff to the FIFO */
+ class Adder implements Callable<List<String>> {
+ @Override
+ public List<String> call() throws Exception {
+ List<String> added = new ArrayList<>();
+ startBarrier.await();
+ for (String elem : new ArrayList<>(ELEMS)) {
+ places.add(fifo.put(elem));
+ added.add(elem);
+ }
+ return added;
+ }
+ }
+
+ /** Constrains the FIFO by removing specific elements */
+ class Constrictor implements Callable<List<String>> {
+ @Override
+ public List<String> call() throws Exception {
+ List<String> constricted = new ArrayList<>();
+ startBarrier.await();
+ while (!!! places.isEmpty()) {
+ // get a copy of the known places in the fifo and shuffle it
+ List<Sequential.Place<String>> myPlaces = new ArrayList<>(places);
+ Collections.shuffle(myPlaces);
+ // remove everything we can
+ for (Sequential.Place<String> place : myPlaces) {
+ String elem = place.relinquish();
+ if (elem != null)
+ constricted.add(elem);
+ places.remove(place);
+ }
+ }
+ return constricted;
+ }
+ }
+
+ /** Wipes out the FIFO by getting all the elements */
+ class Wiper implements Callable<List<String>> {
+ @Override
+ public List<String> call() throws Exception {
+ List<String> wiped = new ArrayList<>();
+ startBarrier.await();
+ for (String elem = fifo.remove(); elem != null; elem = fifo.remove())
+ wiped.add(elem);
+ return wiped;
+ }
+ }
+}
diff --git a/yoko-util/src/test/java/org/apache/yoko/util/concurrent/ReferenceCountedCacheTest.java b/yoko-util/src/test/java/org/apache/yoko/util/concurrent/ReferenceCountedCacheTest.java
new file mode 100644
index 0000000..65c304e
--- /dev/null
+++ b/yoko-util/src/test/java/org/apache/yoko/util/concurrent/ReferenceCountedCacheTest.java
@@ -0,0 +1,255 @@
+package org.apache.yoko.util.concurrent;
+
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+import org.apache.yoko.util.Cache;
+import org.apache.yoko.util.KeyedFactory;
+import org.apache.yoko.util.Reference;
+import org.junit.After;
+import org.junit.Test;
+
+import org.junit.runner.RunWith;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ReferenceCountedCacheTest {
+ private static final ConcurrentLinkedQueue<Integer> createdInts = new ConcurrentLinkedQueue<>();
+ private static final ConcurrentLinkedQueue<Integer> deletedInts = new ConcurrentLinkedQueue<>();
+ private static class StringToInteger implements KeyedFactory<String, Integer>, Cache.Cleaner<Integer> {
+ @Override
+ public Integer create(String key) {
+ Integer result = new Integer(key);
+ createdInts.add(result);
+ return result;
+ }
+
+ @Override
+ public void clean(Integer value) {
+ deletedInts.add(value);
+ }
+ }
+ private static class BadFactory implements KeyedFactory<String, Integer> {
+ @Override
+ public Integer create(String key) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ @Spy
+ StringToInteger factory;
+ @Spy
+ BadFactory badFactory;
+ ReferenceCountedCache<String, Integer> cache;
+ volatile CyclicBarrier startBarrier, endBarrier;
+ volatile boolean retrieving = true;
+
+ @After
+ public void setup() {
+ createdInts.clear();
+ deletedInts.clear();
+ cache = null;
+ retrieving = true;
+ startBarrier = endBarrier = null;
+ }
+
+ @Test
+ public void testGetAndCreate() {
+ cache = new ReferenceCountedCache<>(factory, 0, 5);
+ assertEquals(null, cache.get("1"));
+ try (Reference<Integer> ref = cache.getOrCreate("1", factory);)
+ {assertEquals(Integer.valueOf(1), ref.get());}
+ try (Reference<Integer> ref = cache.get("1");)
+ {assertEquals(Integer.valueOf(1), ref.get());}
+ cache.clean();
+ assertEquals(null, cache.get("1"));
+ }
+
+ @Test
+ public void testFailedCreateDoesNotPolluteCache() {
+ cache = new ReferenceCountedCache<>(factory, 0, 5);
+ assertEquals(null, cache.get("1"));
+ try (Reference<Integer> ref = cache.getOrCreate("1", badFactory);) {
+ fail("getOrCreate() should throw an exception");
+ } catch (UnsupportedOperationException expected) {}
+
+ assertEquals(null, cache.get("1"));
+
+ try (Reference<Integer> ref = cache.getOrCreate("1", factory);)
+ {assertEquals(Integer.valueOf(1), ref.get());}
+ try (Reference<Integer> ref = cache.get("1");)
+ {assertEquals(Integer.valueOf(1), ref.get());}
+ cache.clean();
+ assertEquals(null, cache.get("1"));
+ }
+
+ @Test
+ public void testCreateAndClean() {
+ cache = new ReferenceCountedCache<>(factory, 3, 5);
+ cache.getOrCreate("0", factory).close();
+ cache.getOrCreate("1", factory).close();
+ assertEquals(2, cache.snapshot().size());
+ cache.getOrCreate("2", factory).close();
+ assertEquals(3, cache.snapshot().size());
+ cache.getOrCreate("3", factory).close();
+ assertEquals(4, cache.snapshot().size());
+ long cleaned = cache.clean();
+ assertEquals(4, cleaned);
+ assertEquals(0, cache.snapshot().size());
+ }
+
+ @Test
+ public void testCreateEntries() {
+ cache = new ReferenceCountedCache<>(factory, 0, 0);
+ // new entries should result in factory invocations
+ cache.getOrCreate("0", factory);
+ verify(factory).create("0");
+ cache.getOrCreate("1", factory);
+ verify(factory).create("1");
+ // existing entries should not invoke the factory further
+ cache.getOrCreate("0", factory);
+ cache.getOrCreate("1", factory);
+ verify(factory, times(2)).create(anyString());
+ System.out.println(createdInts);
+ }
+
+ @Test
+ public void testReleaseResults() {
+ cache = new ReferenceCountedCache<>(factory, 3, 5);
+ Reference<Integer> r0, r1, r2, r3, r4, r5;
+ r0 = cache.getOrCreate("0", factory);
+ r1 = cache.getOrCreate("1", factory);
+ r2 = cache.getOrCreate("2", factory);
+ // check the references are to the right values
+ assertEquals(Integer.valueOf(0), r0.get());
+ assertEquals(Integer.valueOf(1), r1.get());
+ assertEquals(Integer.valueOf(2), r2.get());
+ // check the size is correct
+ assertEquals(3, cache.size());
+ assertEquals(0, cache.idleCount());
+ r0.close();
+ // after releasing one ref, we should see only the unused count go up.
+ assertEquals(3, cache.size());
+ assertEquals(1, cache.idleCount());
+ r1.close();
+
+ assertEquals(3, cache.size());
+ assertEquals(2, cache.idleCount());
+
+ // this should do nothing
+ cache.clean();
+
+ assertEquals(3, cache.size());
+ assertEquals(2, cache.idleCount());
+
+ // this should force a cleanup
+ r3 = cache.getOrCreate("3", factory);
+ cache.clean();
+ assertEquals(2, cache.size());
+ assertEquals(0, cache.idleCount());
+
+ // check the expected methods were called on the factory
+ verify(factory).clean(0);
+ verify(factory).clean(1);
+ verify(factory, times(2)).clean(anyInt());
+ }
+
+ @Test
+ public void testMultiThreaded() throws Exception {
+ cache = new ReferenceCountedCache<>(factory, 15, 7);
+ int retrievers = 50;
+ int cleaners = 5;
+ startBarrier = new CyclicBarrier(retrievers + 1);
+ endBarrier = new CyclicBarrier(retrievers);
+ retrieving = true;
+ ExecutorService xs = Executors.newFixedThreadPool(retrievers + cleaners);
+ List<Future<List<Integer>>> retrievals = new ArrayList<>();
+ List<Future<Long>> cleanTallies = new ArrayList<>();
+
+ for (int i = 0; i < retrievers; i++)
+ retrievals.add(xs.submit(new Retriever(20)));
+ for (int i = 0; i < cleaners; i++)
+ cleanTallies.add(xs.submit(new Cleaner()));
+
+ startBarrier.await();
+ long cleaned = 0;
+ for (Future<Long> cleanTally : cleanTallies)
+ cleaned += cleanTally.get();
+
+ Set<Integer> results = newIdentityHashSet();
+ for (Future<List<Integer>> retrieval : retrievals)
+ results.addAll(retrieval.get());
+
+ Set<Integer> created = newIdentityHashSet(createdInts);
+
+ Set<Integer> deleted = newIdentityHashSet(deletedInts);
+
+ System.out.printf("%ncreated %d values", created.size());
+ System.out.printf("%ndeleted %d values", deleted.size());
+ System.out.printf("%nfetched %d values", results.size());
+ System.out.printf("%ncleaned %d values", cleaned);
+ System.out.printf("%nremaining entries: %s%n", cache.snapshot());
+
+ assertEquals(deleted.size(), cleaned);
+ assertEquals(created, unionByIdentity(deleted, cache.snapshot().values()));
+ assertEquals(created, results);
+ }
+
+ private static <T> Set<T> newIdentityHashSet() {
+ return Collections.newSetFromMap(new IdentityHashMap<T, Boolean>());
+ }
+
+ private static <T> Set<T> newIdentityHashSet(Collection<? extends T> c) {
+ Set<T> result = newIdentityHashSet();
+ result.addAll(c);
+ return result;
+ }
+
+ private static <T> Set<T> unionByIdentity(Collection<T>...collections) {
+ Set<T> result = newIdentityHashSet();
+ for(Collection<T> c : collections)
+ result.addAll(c);
+ return result;
+ }
+
+
+ class Retriever implements Callable<List<Integer>> {
+ final int bound;
+ final Random random = new Random();
+
+ Retriever(int bound) { this.bound = bound; }
+
+ @Override
+ public List<Integer> call() throws Exception {
+ List<Integer> list = new ArrayList<>();
+ try {
+ startBarrier.await();
+ for (int i = 0; i < 1_000; i++) {
+ try (Reference<Integer> ref = cache.getOrCreate("" + random.nextInt(bound), factory)) {
+ list.add(ref.get());
+ }
+ }
+ endBarrier.await();
+ retrieving = false;
+ } catch (Throwable t) {
+ System.out.printf("Retriever aborted with %s.%n", t);
+ t.printStackTrace(System.out);
+ }
+ return list;
+
+ }
+ }
+
+ class Cleaner implements Callable<Long> {
+ @Override
+ public Long call() throws Exception {
+ long cleaned = 0;
+ while (retrieving) cleaned += cache.clean();
+ return cleaned;
+ }
+ }
+}
diff --git a/yoko-util/src/test/java/org/apache/yoko/util/concurrent/WeakConcurrentFifoTest.java b/yoko-util/src/test/java/org/apache/yoko/util/concurrent/WeakConcurrentFifoTest.java
new file mode 100644
index 0000000..75a0727
--- /dev/null
+++ b/yoko-util/src/test/java/org/apache/yoko/util/concurrent/WeakConcurrentFifoTest.java
@@ -0,0 +1,105 @@
+package org.apache.yoko.util.concurrent;
+
+import org.apache.yoko.util.KeyedFactory;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.lang.ref.WeakReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+@RunWith(MockitoJUnitRunner.class)
+public class WeakConcurrentFifoTest extends ConcurrentFifoTest {
+ private KeyedFactory<String, Runnable> factory = new KeyedFactory<String, Runnable>() {
+ public Runnable create(String key) {
+ return cleanup;
+ }
+ };
+
+ @Mock
+ private Runnable cleanup;
+
+ @Override
+ @Before
+ public void setupFifo() {
+ fifo = new WeakConcurrentFifo<>(factory);
+ }
+
+ @Test
+ public void testWeakRefsGetCollectedOnRemove() {
+ WeakReference[] refs;
+
+ refs = enqueueStringsCollectably("foo", "foo", "bar", "bar", "bar", "bar");
+ fifo.put("baz"); // strings in constant pool are never GC'd
+
+ assertEquals(refs.length + 1, fifo.size());
+ assertEquals("foo", fifo.remove());
+ assertEquals(refs.length, fifo.size());
+
+ gcUntilCleared(refs);
+
+ verify(cleanup, times(0)).run();
+ assertEquals("baz", fifo.remove());
+ verify(cleanup, times(refs.length - 1)).run();
+ }
+
+ @Test
+ public void testWeakRefsGetCollectedOnPut() {
+ WeakReference[] refs;
+
+ refs = enqueueStringsCollectably("foo", "foo", "bar", "bar", "bar", "bar");
+
+ assertEquals(refs.length, fifo.size());
+ assertEquals("foo", fifo.remove());
+ assertEquals(refs.length - 1, fifo.size());
+
+ gcUntilCleared(refs);
+
+ verify(cleanup, times(0)).run();
+ fifo.put("baz");
+ verify(cleanup, times(refs.length - 1)).run();
+ assertEquals("baz", fifo.remove());
+ }
+
+ @Test
+ public void testWeakRefsGetCollectedOnSize() {
+ WeakReference[] refs;
+
+ refs = enqueueStringsCollectably("foo", "foo", "bar", "bar", "bar", "bar");
+
+ assertEquals(refs.length, fifo.size());
+ assertEquals("foo", fifo.remove());
+ assertEquals(refs.length - 1, fifo.size());
+
+ gcUntilCleared(refs);
+
+ verify(cleanup, times(0)).run();
+ assertEquals(0, fifo.size());
+ verify(cleanup, times(refs.length - 1)).run();
+ }
+
+ private WeakReference[] enqueueStringsCollectably(String... strings) {
+ WeakReference[] refs = new WeakReference[strings.length];
+ for (int i = 0 ; i < strings.length; i++) {
+ String s = new String(strings[i]);
+ refs[i] = new WeakReference(s);
+ fifo.put(s);
+ }
+ return refs;
+ }
+
+ public static void gcUntilCleared(WeakReference<?>... refs) {
+ for (WeakReference<?> ref : refs) {
+ while (ref.get() != null) {
+ System.out.print("gc ");
+ System.gc();
+ }
+ System.out.println();
+ }
+ }
+}