| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.yoko.orb.OB; |
| |
| import org.apache.yoko.orb.CORBA.InputStream; |
| import org.apache.yoko.orb.OBPortableServer.POAManager_impl; |
| import org.apache.yoko.orb.OCI.ConnectorInfo; |
| import org.apache.yoko.orb.OCI.GiopVersion; |
| import org.apache.yoko.orb.OCI.Transport; |
| import org.omg.CONV_FRAME.CodeSetContext; |
| import org.omg.CONV_FRAME.CodeSetContextHolder; |
| import org.omg.CORBA.COMM_FAILURE; |
| import org.omg.CORBA.CompletionStatus; |
| import org.omg.CORBA.NO_IMPLEMENT; |
| import org.omg.CORBA.SystemException; |
| import org.omg.CORBA.SystemExceptionHelper; |
| import org.omg.CORBA.TRANSIENT; |
| import org.omg.CORBA.UNKNOWN; |
| import org.omg.GIOP.LocateStatusType_1_2; |
| import org.omg.GIOP.ReplyStatusType_1_2; |
| import org.omg.IOP.CodeSets; |
| import org.omg.IOP.IOR; |
| import org.omg.IOP.IORHelper; |
| import org.omg.IOP.IORHolder; |
| import org.omg.IOP.ServiceContext; |
| import org.omg.IOP.UnknownExceptionInfo; |
| import org.omg.PortableServer.POAManager; |
| import org.omg.SendingContext.CodeBase; |
| |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| abstract public class GIOPConnection implements DowncallEmitter, UpcallReturn { |
| static final java.util.logging.Logger logger = java.util.logging.Logger.getLogger(GIOPConnection.class.getName()); |
| |
| // ---------------------------------------------------------------- |
| // Inner classes |
| // ---------------------------------------------------------------- |
| |
| /* access operations class */ |
| public static final class AccessOp { |
| public static final int Nil = 0; |
| |
| public static final int Read = 1; |
| |
| public static final int Write = 2; |
| |
| public static final int Close = 4; |
| |
| public static final int All = 7; |
| } |
| |
| /* connection properties */ |
| public static final class Property { |
| public static final int RequestSent = 1; |
| |
| public static final int ReplySent = 2; |
| |
| public static final int Destroyed = 4; |
| |
| public static final int CreatedByClient = 8; |
| |
| public static final int ClientEnabled = 16; |
| |
| public static final int ServerEnabled = 32; |
| |
| public static final int ClosingLogged = 64; |
| } |
| |
| /* connection states */ |
| public static final class State { |
| public static final int Active = 1; |
| |
| public static final int Holding = 2; |
| |
| public static final int Closing = 3; |
| |
| public static final int Error = 4; |
| |
| public static final int Closed = 5; |
| } |
| |
| /* task to execute when ACM timer signal arrives */ |
| final class ACMTask extends java.util.TimerTask { |
| GIOPConnection connection_; |
| |
| public ACMTask(GIOPConnection parent) { |
| connection_ = parent; |
| } |
| |
| public void run() { |
| // |
| // execute the callback method |
| // |
| connection_.ACM_callback(); |
| |
| // |
| // break cyclic dependency |
| // |
| connection_ = null; |
| } |
| } |
| |
| // ---------------------------------------------------------------- |
| // Member data |
| // ---------------------------------------------------------------- |
| |
| /** the next request id */ |
| private final AtomicInteger nextRequestId; |
| |
| /** the ORB instance this connection is bound with */ |
| protected ORBInstance orbInstance_ = null; |
| |
| /** transport this connection represents */ |
| protected Transport transport_ = null; |
| |
| /** Client parent (null if server-side only) */ |
| private final ConnectorInfo outboundConnectionKey; |
| |
| /** Object-adapter interface (null if client-side only) */ |
| protected OAInterface oaInterface_ = null; |
| |
| /** storage space for unsent/pending messages */ |
| protected MessageQueue messageQueue_ = new MessageQueue(); |
| |
| /** enabled processing operations */ |
| protected int enabledOps_ = AccessOp.Nil; |
| |
| /** enabled connection property flags */ |
| protected int properties_ = 0; |
| |
| /** state of this connection */ |
| protected int state_ = State.Holding; |
| |
| /** number of upcalls in progress */ |
| protected int upcallsInProgress_ = 0; |
| |
| /** code converters used by the connection */ |
| protected CodeConverters codeConverters_ = null; |
| |
| /** maximum GIOP version encountered during message transactions */ |
| protected org.omg.GIOP.Version giopVersion_ = new org.omg.GIOP.Version( |
| (byte) 0, (byte) 0); |
| |
| /** ACM timeout variables */ |
| protected int shutdownTimeout_ = 2; |
| |
| protected int idleTimeout_ = 0; |
| |
| /** timer used for ACM management */ |
| protected java.util.Timer acmTimer_ = null; |
| |
| private CodeBase serverRuntime_; |
| |
| protected ACMTask acmTask_ = null; |
| |
| // ---------------------------------------------------------------- |
| // Protected methods |
| // ---------------------------------------------------------------- |
| |
| // |
| // check if its compliant for this connection to send a |
| // CloseConnection message to its peer |
| // |
| synchronized protected boolean canSendCloseConnection() { |
| // |
| // any GIOP versioned server can send a CloseConnection |
| // |
| if ((properties_ & Property.ServerEnabled) != 0) |
| return true; |
| |
| // |
| // anything >= than GIOP 1.2 can send a CloseConnection |
| // |
| if (giopVersion_.major > 1 |
| || (giopVersion_.major == 1 && giopVersion_.minor >= 2)) |
| return true; |
| |
| // |
| // otherwise we can't send it |
| // |
| return false; |
| } |
| |
| /** read the codeset information from the SCL */ |
| protected void readCodeConverters(org.omg.IOP.ServiceContext[] scl) { |
| if (codeConverters_ != null) |
| return; |
| |
| for (ServiceContext aScl : scl) { |
| if (aScl.context_id == CodeSets.value) { |
| CodeSetContextHolder codeSetContextH = new CodeSetContextHolder(); |
| CodeSetUtil.extractCodeSetContext(aScl, codeSetContextH); |
| CodeSetContext codeSetContext = codeSetContextH.value; |
| |
| CodeSetDatabase db = CodeSetDatabase.instance(); |
| |
| codeConverters_ = new CodeConverters(); |
| codeConverters_.inputCharConverter = db.getConverter( |
| orbInstance_.getNativeCs(), codeSetContext.char_data); |
| codeConverters_.outputCharConverter = db.getConverter( |
| codeSetContext.char_data, orbInstance_.getNativeCs()); |
| codeConverters_.inputWcharConverter = db.getConverter( |
| orbInstance_.getNativeWcs(), codeSetContext.wchar_data); |
| codeConverters_.outputWcharConverter = db.getConverter( |
| codeSetContext.wchar_data, orbInstance_.getNativeWcs()); |
| |
| CoreTraceLevels coreTraceLevels = orbInstance_ |
| .getCoreTraceLevels(); |
| if (coreTraceLevels.traceConnections() >= 2) { |
| String msg = "receiving transmission code sets"; |
| msg += "\nchar code set: "; |
| if (codeConverters_.inputCharConverter != null) |
| msg += codeConverters_.inputCharConverter.getFrom().description; |
| else { |
| if (codeSetContext.char_data == 0) |
| msg += "none"; |
| else { |
| CodeSetInfo info = db.getCodeSetInfo(orbInstance_ |
| .getNativeCs()); |
| msg += info != null ? info.description : null; |
| } |
| } |
| msg += "\nwchar code set: "; |
| if (codeConverters_.inputWcharConverter != null) |
| msg += codeConverters_.inputWcharConverter.getFrom().description; |
| else { |
| if (codeSetContext.wchar_data == 0) |
| msg += "none"; |
| else { |
| CodeSetInfo info = db.getCodeSetInfo(orbInstance_ |
| .getNativeWcs()); |
| msg += info != null ? info.description : null; |
| } |
| } |
| |
| orbInstance_.getLogger().trace("incoming", msg); |
| } |
| break; |
| } |
| } |
| } |
| |
| /** |
| * set the OAInterface used by BiDir clients to handle requests |
| * @return true iff an OAInterface is found |
| */ |
| protected boolean setOAInterface(org.apache.yoko.orb.OCI.ProfileInfo pi) { |
| // |
| // Release the old OAInterface |
| // |
| oaInterface_ = null; |
| |
| // |
| // make sure we're allowed to do server processing as well as |
| // being bidir enabled. A server's OAInterface should not |
| // change whereas a bidir client would need to change regularly |
| // |
| Assert._OB_assert((properties_ & Property.CreatedByClient) != 0); |
| Assert._OB_assert((properties_ & Property.ServerEnabled) != 0); |
| Assert._OB_assert(orbInstance_ != null); |
| |
| org.apache.yoko.orb.OBPortableServer.POAManagerFactory poamanFactory = orbInstance_ |
| .getPOAManagerFactory(); |
| Assert._OB_assert(poamanFactory != null); |
| |
| org.omg.PortableServer.POAManager[] poaManagers = poamanFactory.list(); |
| |
| for (POAManager poaManager : poaManagers) { |
| try { |
| POAManager_impl poamanImpl = (POAManager_impl) poaManager; |
| |
| OAInterface oaImpl = poamanImpl |
| ._OB_getOAInterface(); |
| |
| IORHolder refIOR = new IORHolder(); |
| if (oaImpl.findByKey(pi.key, refIOR) == OAInterface.OBJECT_HERE) { |
| oaInterface_ = oaImpl; |
| return true; |
| } |
| } catch (ClassCastException ignore) {} |
| } |
| |
| return false; |
| } |
| |
| /** log the closing of this connection */ |
| synchronized protected void logClose(boolean initiatedClosure) { |
| if ((properties_ & Property.ClosingLogged) != 0) |
| return; |
| |
| properties_ |= Property.ClosingLogged; |
| |
| CoreTraceLevels coreTraceLevels = orbInstance_.getCoreTraceLevels(); |
| if (coreTraceLevels.traceConnections() > 0) { |
| org.apache.yoko.orb.OCI.TransportInfo info = transport_.get_info(); |
| String msg = "closing connection\n"; |
| msg += info.describe(); |
| |
| if (initiatedClosure) |
| orbInstance_.getLogger().trace("outgoing", msg); |
| else |
| orbInstance_.getLogger().trace("incoming", msg); |
| |
| } |
| } |
| |
| /** main entry point into message processing - delegate to a specific methods */ |
| protected Upcall processMessage(GIOPIncomingMessage msg) { |
| // |
| // update the version of GIOP found |
| // |
| synchronized (this) { |
| if (msg.version().major > giopVersion_.major) { |
| giopVersion_.major = msg.version().major; |
| giopVersion_.minor = msg.version().minor; |
| } else if (msg.version().major == giopVersion_.major |
| && msg.version().minor > giopVersion_.minor) { |
| giopVersion_.minor = msg.version().minor; |
| } |
| } |
| |
| // |
| // hand off message type processing |
| // |
| switch (msg.type().value()) { |
| case org.omg.GIOP.MsgType_1_1._Reply: |
| processReply(msg); |
| break; |
| |
| case org.omg.GIOP.MsgType_1_1._Request: |
| return processRequest(msg); |
| |
| case org.omg.GIOP.MsgType_1_1._LocateRequest: |
| processLocateRequest(msg); |
| break; |
| |
| case org.omg.GIOP.MsgType_1_1._CancelRequest: |
| break; |
| |
| case org.omg.GIOP.MsgType_1_1._LocateReply: |
| processLocateReply(msg); |
| break; |
| |
| case org.omg.GIOP.MsgType_1_1._CloseConnection: |
| processCloseConnection(msg); |
| break; |
| |
| case org.omg.GIOP.MsgType_1_1._MessageError: |
| processMessageError(msg); |
| break; |
| |
| case org.omg.GIOP.MsgType_1_1._Fragment: |
| processFragment(msg); |
| break; |
| |
| default: |
| processException( |
| State.Error, |
| new COMM_FAILURE( |
| MinorCodes.describeCommFailure(MinorCodes.MinorUnknownMessage), |
| MinorCodes.MinorUnknownMessage, |
| CompletionStatus.COMPLETED_MAYBE), |
| false); |
| break; |
| } |
| |
| return null; |
| } |
| |
| /** process a request message */ |
| synchronized protected Upcall processRequest(GIOPIncomingMessage msg) { |
| if ((properties_ & Property.ServerEnabled) == 0) { |
| processException(State.Error, new COMM_FAILURE( |
| MinorCodes |
| .describeCommFailure(MinorCodes.MinorWrongMessage), |
| MinorCodes.MinorWrongMessage, |
| CompletionStatus.COMPLETED_MAYBE), false); |
| return null; |
| } |
| |
| if (state_ != State.Active) |
| return null; |
| |
| int reqId; |
| org.omg.CORBA.BooleanHolder response = new org.omg.CORBA.BooleanHolder(); |
| org.omg.CORBA.StringHolder op = new org.omg.CORBA.StringHolder(); |
| org.omg.IOP.ServiceContextListHolder scl = new org.omg.IOP.ServiceContextListHolder(); |
| org.omg.GIOP.TargetAddressHolder target = new org.omg.GIOP.TargetAddressHolder(); |
| |
| try { |
| reqId = msg.readRequestHeader(response, target, op, scl); |
| if (target.value.discriminator() != org.omg.GIOP.KeyAddr.value) { |
| processException( |
| State.Error, |
| new NO_IMPLEMENT( |
| MinorCodes |
| .describeNoImplement(MinorCodes.MinorNotSupportedByLocalObject), |
| MinorCodes.MinorNotSupportedByLocalObject, |
| CompletionStatus.COMPLETED_NO), |
| false); |
| return null; |
| } |
| } catch (SystemException ex) { |
| processException(State.Error, ex, false); |
| return null; |
| } |
| |
| // |
| // Setup upcall data |
| // |
| org.omg.GIOP.Version version = msg.version(); |
| |
| org.apache.yoko.orb.OCI.ProfileInfo profileInfo = new org.apache.yoko.orb.OCI.ProfileInfo(); |
| profileInfo.major = version.major; |
| profileInfo.minor = version.minor; |
| profileInfo.key = target.value.object_key(); |
| |
| org.apache.yoko.orb.CORBA.InputStream in = msg.input(); |
| |
| // |
| // We have some decision making to do here if BiDir is |
| // enabled: |
| // - If this is a client then make sure to properly |
| // evaluate the message and obtain the correct OAInterface |
| // to create the upcalls |
| // - If this is a server then take the listen points from |
| // the SCL (if the POA has the BiDir policy enabled) and |
| // store them in this' transport for connection reuse |
| // |
| if ((properties_ & Property.CreatedByClient) != 0) { |
| if (setOAInterface(profileInfo) == false) { |
| // |
| // we can't find an appropriate OAInterface in order |
| // to direct the upcall so we must simply not handle |
| // this request |
| // |
| return null; |
| } |
| } |
| |
| // |
| // Parse the SCL, examining it for various codeset info |
| // |
| readCodeConverters(scl.value); |
| in._OB_codeConverters(codeConverters_, GiopVersion.get(version.major, version.minor)); |
| |
| // |
| // read in the peer's sending context runtime object |
| // |
| assignSendingContextRuntime(in, scl.value); |
| |
| // |
| // New upcall will be started |
| // |
| if (response.value) |
| upcallsInProgress_++; |
| |
| orbInstance_.getLogger().debug("Processing request reqId=" + reqId + " op=" + op.value); |
| |
| return oaInterface_.createUpcall( |
| response.value ? upcallReturnInterface() : null, profileInfo, |
| transport_.get_info(), reqId, op.value, in, scl.value); |
| } |
| |
| /** process a reply message */ |
| synchronized protected void processReply(GIOPIncomingMessage msg) { |
| if ((properties_ & Property.ClientEnabled) == 0) { |
| processException(State.Error, new COMM_FAILURE( |
| MinorCodes |
| .describeCommFailure(MinorCodes.MinorWrongMessage), |
| MinorCodes.MinorWrongMessage, |
| CompletionStatus.COMPLETED_MAYBE), false); |
| return; |
| } |
| |
| int reqId; |
| org.omg.GIOP.ReplyStatusType_1_2Holder status = new org.omg.GIOP.ReplyStatusType_1_2Holder(); |
| org.omg.IOP.ServiceContextListHolder scl = new org.omg.IOP.ServiceContextListHolder(); |
| |
| try { |
| reqId = msg.readReplyHeader(status, scl); |
| } catch (SystemException ex) { |
| processException(State.Error, ex, false); |
| return; |
| } |
| |
| Downcall down = messageQueue_.findAndRemovePending(reqId); |
| if (down == null) { |
| // |
| // Request id is unknown |
| // |
| processException(State.Error, new COMM_FAILURE( |
| MinorCodes |
| .describeCommFailure(MinorCodes.MinorUnknownReqId) |
| + ": " + reqId, MinorCodes.MinorUnknownReqId, |
| CompletionStatus.COMPLETED_MAYBE), false); |
| return; |
| } |
| |
| down.setReplySCL(scl.value); |
| org.apache.yoko.orb.CORBA.InputStream in = msg.input(); |
| |
| // |
| // read in the peer's sendig context runtime object |
| // |
| assignSendingContextRuntime(in, scl.value); |
| |
| orbInstance_.getLogger().debug("Processing reply for reqId=" + reqId + " status=" + status.value.value()); |
| |
| switch (status.value.value()) { |
| case ReplyStatusType_1_2._NO_EXCEPTION: |
| down.setNoException(in); |
| break; |
| |
| case ReplyStatusType_1_2._USER_EXCEPTION: |
| down.setUserException(in); |
| break; |
| |
| case ReplyStatusType_1_2._SYSTEM_EXCEPTION: { |
| try { |
| SystemException ex = Util.unmarshalSystemException(in); |
| ex = convertToUnknownExceptionIfAppropriate(ex, in, scl.value); |
| down.setSystemException(ex); |
| } catch (SystemException ex) { |
| processException(State.Error, ex, false); |
| } |
| |
| break; |
| } |
| |
| case ReplyStatusType_1_2._LOCATION_FORWARD: { |
| try { |
| IOR ior = IORHelper.read(in); |
| down.setLocationForward(ior, false); |
| } catch (SystemException ex) { |
| processException(State.Error, ex, false); |
| } |
| |
| break; |
| } |
| |
| case ReplyStatusType_1_2._LOCATION_FORWARD_PERM: { |
| try { |
| IOR ior = IORHelper.read(in); |
| down.setLocationForward(ior, true); |
| break; |
| } catch (SystemException ex) { |
| processException(State.Error, ex, false); |
| } |
| } |
| |
| case ReplyStatusType_1_2._NEEDS_ADDRESSING_MODE: |
| // |
| // TODO: implement |
| // |
| processException( |
| State.Error, |
| new NO_IMPLEMENT( |
| MinorCodes.describeNoImplement(MinorCodes.MinorNotSupportedByLocalObject), |
| MinorCodes.MinorNotSupportedByLocalObject, |
| CompletionStatus.COMPLETED_NO), false); |
| break; |
| |
| default: |
| processException( |
| State.Error, |
| new COMM_FAILURE( |
| MinorCodes.describeCommFailure(MinorCodes.MinorUnknownReplyMessage), |
| MinorCodes.MinorUnknownReplyMessage, |
| CompletionStatus.COMPLETED_MAYBE), |
| false); |
| break; |
| } |
| } |
| |
| private SystemException convertToUnknownExceptionIfAppropriate(SystemException ex, InputStream is, |
| ServiceContext[] scl) { |
| if (ex instanceof UNKNOWN) { |
| for (ServiceContext sc : scl) { |
| if (sc.context_id == UnknownExceptionInfo.value) { |
| return new UnresolvedException((UNKNOWN) ex, sc.context_data, is); |
| } |
| } |
| } |
| return ex; |
| } |
| |
| private void assignSendingContextRuntime(InputStream in, ServiceContext[] scl) { |
| if (serverRuntime_ == null) { |
| serverRuntime_ |
| = Util.getSendingContextRuntime (orbInstance_, scl); |
| } |
| |
| in.__setSendingContextRuntime(serverRuntime_); |
| |
| } |
| |
| /** process a LocateRequest message */ |
| synchronized protected void processLocateRequest(GIOPIncomingMessage msg) { |
| if ((properties_ & Property.ServerEnabled) == 0) { |
| processException(State.Error, new COMM_FAILURE( |
| MinorCodes.describeCommFailure(MinorCodes.MinorWrongMessage), |
| MinorCodes.MinorWrongMessage, |
| CompletionStatus.COMPLETED_MAYBE), false); |
| return; |
| } |
| Assert._OB_assert(state_ == State.Active); |
| |
| // |
| // Make sure the transport can send a reply |
| // |
| if (transport_.mode() == org.apache.yoko.orb.OCI.SendReceiveMode.ReceiveOnly) { |
| String message = "Discarding locate request - transport " |
| + "does not support twoway invocations"; |
| |
| org.apache.yoko.orb.OCI.TransportInfo transportInfo = transport_ |
| .get_info(); |
| if (transportInfo != null) { |
| String desc = transportInfo.describe(); |
| message += '\n'; |
| message += desc; |
| } else { |
| message += "\nCollocated method call"; |
| } |
| |
| Logger logger = orbInstance_.getLogger(); |
| logger.warning(message); |
| |
| return; |
| } |
| |
| try { |
| int reqId; |
| org.omg.GIOP.TargetAddressHolder target = new org.omg.GIOP.TargetAddressHolder(); |
| reqId = msg.readLocateRequestHeader(target); |
| |
| if (target.value.discriminator() != org.omg.GIOP.KeyAddr.value) { |
| processException( |
| State.Error, |
| new NO_IMPLEMENT( |
| MinorCodes.describeNoImplement(MinorCodes.MinorNotSupportedByLocalObject), |
| MinorCodes.MinorNotSupportedByLocalObject, |
| CompletionStatus.COMPLETED_NO), |
| false); |
| return; |
| } |
| |
| // |
| // Get the key |
| // |
| byte[] key = target.value.object_key(); |
| |
| // |
| // Find the IOR for the key |
| // |
| org.omg.IOP.IORHolder ior = new org.omg.IOP.IORHolder(); |
| int val = oaInterface_.findByKey(key, ior); |
| LocateStatusType_1_2 status = LocateStatusType_1_2 |
| .from_int(val); |
| |
| // |
| // Send back locate reply message |
| // |
| org.apache.yoko.orb.OCI.Buffer buf = new org.apache.yoko.orb.OCI.Buffer( |
| 12); |
| buf.pos(12); |
| org.apache.yoko.orb.CORBA.OutputStream out = new org.apache.yoko.orb.CORBA.OutputStream( |
| buf); |
| org.apache.yoko.orb.OCI.ProfileInfo profileInfo = new org.apache.yoko.orb.OCI.ProfileInfo(); |
| profileInfo.major = msg.version().major; |
| profileInfo.minor = msg.version().minor; |
| GIOPOutgoingMessage outgoing = new GIOPOutgoingMessage( |
| orbInstance_, out, profileInfo); |
| outgoing.writeLocateReplyHeader(reqId, status); |
| |
| // |
| // If the reply status is OBJECT_FORWARD or |
| // OBJECT_FORWARD_PERM the IOR is appended to the end of the |
| // LocateReply. |
| // |
| if (status == LocateStatusType_1_2.OBJECT_FORWARD |
| || status == LocateStatusType_1_2.OBJECT_FORWARD_PERM) |
| IORHelper.write(out, ior.value); |
| |
| // |
| // TODO: |
| // LOC_SYSTEM_EXCEPTION, |
| // LOC_NEEDS_ADDRESSING_MODE |
| // |
| int pos = out._OB_pos(); |
| out._OB_pos(0); |
| outgoing.writeMessageHeader(org.omg.GIOP.MsgType_1_1.LocateReply, |
| false, pos - 12); |
| out._OB_pos(pos); |
| |
| // |
| // A locate request is treated just like an upcall |
| // |
| upcallsInProgress_++; |
| |
| // |
| // Send the locate reply |
| // |
| sendUpcallReply(out._OB_buffer()); |
| } catch (SystemException ex) { |
| processException(State.Error, ex, false); |
| } |
| } |
| |
| /** process a LocateReply message */ |
| synchronized protected void processLocateReply(GIOPIncomingMessage msg) { |
| if ((properties_ & Property.ClientEnabled) == 0) { |
| processException(State.Closed, new COMM_FAILURE( |
| MinorCodes.describeCommFailure(MinorCodes.MinorWrongMessage), |
| MinorCodes.MinorWrongMessage, |
| CompletionStatus.COMPLETED_MAYBE), false); |
| return; |
| } |
| |
| int reqId; |
| org.omg.GIOP.LocateStatusType_1_2Holder status = new org.omg.GIOP.LocateStatusType_1_2Holder(); |
| |
| try { |
| reqId = msg.readLocateReplyHeader(status); |
| } catch (SystemException ex) { |
| processException(State.Error, ex, false); |
| return; |
| } |
| |
| Downcall down = messageQueue_.findAndRemovePending(reqId); |
| if (down == null) { |
| // |
| // Request id is unknown |
| // |
| processException(State.Error, new COMM_FAILURE( |
| MinorCodes.describeCommFailure(MinorCodes.MinorUnknownReqId), |
| MinorCodes.MinorUnknownReqId, |
| CompletionStatus.COMPLETED_MAYBE), false); |
| return; |
| } |
| |
| // |
| // Was this a LocateRequest? |
| // |
| String op = down.operation(); |
| if (!op.equals("_locate")) { |
| processException(State.Error, new COMM_FAILURE( |
| MinorCodes.describeCommFailure(MinorCodes.MinorWrongMessage), |
| MinorCodes.MinorWrongMessage, |
| CompletionStatus.COMPLETED_MAYBE), false); |
| return; |
| } |
| |
| org.apache.yoko.orb.CORBA.InputStream in = msg.input(); |
| Logger logger = orbInstance_.getLogger(); |
| |
| switch (status.value.value()) { |
| case LocateStatusType_1_2._UNKNOWN_OBJECT: |
| down.setSystemException(new org.omg.CORBA.OBJECT_NOT_EXIST()); |
| break; |
| |
| case LocateStatusType_1_2._OBJECT_HERE: |
| down.setNoException(in); |
| break; |
| |
| case LocateStatusType_1_2._OBJECT_FORWARD: |
| try { |
| IOR ior = IORHelper.read(in); |
| down.setLocationForward(ior, false); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Locate request forwarded to " + IORDump.PrintObjref(orbInstance_.getORB(), ior)); |
| } |
| } catch (SystemException ex) { |
| logger.warning("An error occurred while reading a " |
| + "locate reply, possibly indicating\n" |
| + "an interoperability problem. You may " |
| + "need to set the LocateRequestPolicy\n" |
| + "to false."); |
| down.setSystemException(ex); |
| processException(State.Error, ex, false); |
| } |
| break; |
| |
| case LocateStatusType_1_2._OBJECT_FORWARD_PERM: |
| try { |
| IOR ior = IORHelper.read(in); |
| down.setLocationForward(ior, true); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Locate request forwarded to " + IORDump.PrintObjref(orbInstance_.getORB(), ior)); |
| } |
| } catch (SystemException ex) { |
| logger.warning("An error occurred while reading a " |
| + "locate reply, possibly indicating\n" |
| + "an interoperability problem. You may " |
| + "need to set the LocateRequestPolicy\n" |
| + "to false."); |
| down.setSystemException(ex); |
| processException(State.Error, ex, false); |
| } |
| |
| break; |
| |
| case LocateStatusType_1_2._LOC_SYSTEM_EXCEPTION: |
| try { |
| SystemException ex = SystemExceptionHelper.read(in); |
| down.setSystemException(ex); |
| } catch (SystemException ex) { |
| down.setSystemException(ex); |
| processException(State.Error, ex, false); |
| } |
| |
| break; |
| |
| case LocateStatusType_1_2._LOC_NEEDS_ADDRESSING_MODE: |
| // TODO: implement |
| processException(State.Error, new NO_IMPLEMENT(), false); |
| break; |
| } |
| } |
| |
| /** process a CloseConnection message */ |
| protected void processCloseConnection(GIOPIncomingMessage msg) { |
| orbInstance_.getLogger().debug("Close connection request received from peer"); |
| if ((properties_ & Property.ClientEnabled) != 0) { |
| // |
| // If the peer closes the connection, all outstanding |
| // requests can safely be reissued. Thus we send all |
| // of them a TRANSIENT exception with a completion |
| // status of COMPLETED_NO. This is done by calling |
| // exception() with the notCompleted parameter set to |
| // true. |
| // |
| processException( |
| State.Closed, |
| new TRANSIENT( |
| MinorCodes.describeTransient(MinorCodes.MinorCloseConnection), |
| MinorCodes.MinorCloseConnection, |
| CompletionStatus.COMPLETED_NO), true); |
| } else { |
| setState(State.Closed); |
| } |
| } |
| |
| /** process a MessageError message */ |
| protected void processMessageError(GIOPIncomingMessage msg) { |
| processException( |
| State.Error, |
| new COMM_FAILURE( |
| MinorCodes.describeCommFailure(MinorCodes.MinorMessageError), |
| MinorCodes.MinorMessageError, |
| CompletionStatus.COMPLETED_NO), false); |
| } |
| |
| /** process a Fragment message */ |
| protected void processFragment(GIOPIncomingMessage msg) { |
| // At this point there should be no fragments, only complete messages. |
| Assert._OB_assert(false); |
| } |
| |
| /** process a system exception */ |
| protected boolean processException(int state,SystemException ex, boolean completed) { |
| Assert._OB_assert(state == State.Error || state == State.Closed); |
| |
| orbInstance_.getLogger().debug("processing an exception, state=" + state, ex); |
| |
| synchronized (this) { |
| // |
| // Don't do anything if there is no state change and it is |
| // not possible to transition backwards. |
| // |
| if (state <= state_) |
| return false; |
| |
| // |
| // update the state |
| // |
| state_ = state; |
| |
| // change the enabled/disable operations and break the cyclic dependency with GIOPClient |
| switch (state) { |
| case State.Error: |
| enabledOps_ &= ~(AccessOp.Read | AccessOp.Write); |
| enabledOps_ |= AccessOp.Close; |
| break; |
| |
| case State.Closed: |
| enabledOps_ = AccessOp.Nil; |
| break; |
| } |
| orbInstance_.getOutboundConnectionCache().remove(outboundConnectionKey, this); |
| |
| // |
| // propogate any exceptions to the message queue |
| // |
| messageQueue_.setException(state, ex, completed); |
| } |
| |
| // |
| // apply the shutdown |
| // |
| switch (state) { |
| case State.Error: |
| abortiveShutdown(); |
| break; |
| case State.Closed: |
| logClose(true); |
| transport_.close(); |
| break; |
| } |
| |
| // |
| // set 'this' properties |
| // |
| synchronized (this) { |
| properties_ |= Property.Destroyed; |
| } |
| |
| // |
| // update the connection status |
| // |
| refresh(); |
| return true; |
| } |
| |
| /** transmits a reply back once the upcall completes */ |
| protected void sendUpcallReply(org.apache.yoko.orb.OCI.Buffer buf) { |
| synchronized (this) { |
| // |
| // no need to do anything if we are closed |
| // |
| if (state_ == State.Closed) |
| return; |
| |
| // |
| // decrement the number of upcalls in progress |
| // |
| Assert._OB_assert(upcallsInProgress_ > 0); |
| upcallsInProgress_--; |
| |
| // |
| // add this message to the message Queue |
| // |
| messageQueue_.add(orbInstance_, buf); |
| } |
| |
| // |
| // refresh the connection status |
| // |
| refresh(); |
| |
| // |
| // if that was the last upcall and we are in the closing state |
| // then shutdown now |
| // |
| synchronized (this) { |
| if (upcallsInProgress_ == 0 && state_ == State.Closing) |
| gracefulShutdown(); |
| } |
| } |
| |
| /** shutdown the connection forcefully and immediately */ |
| abstract protected void abortiveShutdown(); |
| |
| /** shutdown the connection gracefully */ |
| abstract protected void gracefulShutdown(); |
| |
| /** turn on ACM idle connection monitoring */ |
| synchronized protected void ACM_enableIdleMonitor() { |
| if (idleTimeout_ > 0) { |
| acmTimer_ = new java.util.Timer(true); |
| acmTask_ = new ACMTask(this); |
| |
| acmTimer_.schedule(acmTask_, idleTimeout_ * 1000); |
| } |
| } |
| |
| /** turn off ACM idle connection monitoring */ |
| synchronized protected void ACM_disableIdleMonitor() { |
| if (acmTimer_ != null) { |
| acmTimer_.cancel(); |
| acmTimer_ = null; |
| } |
| |
| if (acmTask_ != null) { |
| acmTask_.cancel(); |
| acmTask_ = null; |
| } |
| } |
| |
| // ---------------------------------------------------------------- |
| // Public methods |
| // ---------------------------------------------------------------- |
| |
| /** client-side constructor */ |
| public GIOPConnection(ORBInstance orbInstance, Transport transport, GIOPClient client) { |
| // |
| // set member properties |
| // |
| nextRequestId = new AtomicInteger(0xA); |
| orbInstance_ = orbInstance; |
| transport_ = transport; |
| outboundConnectionKey = client.connectorInfo(); |
| state_ = State.Active; |
| properties_ = Property.CreatedByClient | Property.ClientEnabled; |
| enabledOps_ = AccessOp.Read | AccessOp.Write; |
| |
| // |
| // read ACM properties |
| // |
| String value; |
| java.util.Properties properties = orbInstance_.getProperties(); |
| |
| // |
| // the shutdown timeout for the client |
| // |
| value = properties.getProperty("yoko.orb.client_shutdown_timeout"); |
| if (value != null) |
| shutdownTimeout_ = Integer.parseInt(value); |
| |
| // |
| // the idle timeout for the client |
| // |
| value = properties.getProperty("yoko.orb.client_timeout"); |
| if (value != null) |
| idleTimeout_ = Integer.parseInt(value); |
| |
| // |
| // Trace new outgoing connection |
| // |
| CoreTraceLevels coreTraceLevels = orbInstance_.getCoreTraceLevels(); |
| if (coreTraceLevels.traceConnections() > 0) { |
| org.apache.yoko.orb.OCI.TransportInfo info = transport_.get_info(); |
| String msg = "new connection\n"; |
| msg += info.describe(); |
| orbInstance_.getLogger().trace("client-side", msg); |
| } |
| } |
| |
| /** server-side constructor */ |
| public GIOPConnection(ORBInstance orbInstance, Transport transport, OAInterface oa) { |
| // |
| // set members |
| // |
| nextRequestId = new AtomicInteger(0xB); |
| orbInstance_ = orbInstance; |
| transport_ = transport; |
| outboundConnectionKey = null; |
| oaInterface_ = oa; |
| properties_ = Property.ServerEnabled; |
| |
| // |
| // read ACM properties |
| // |
| String value; |
| java.util.Properties properties = orbInstance_.getProperties(); |
| |
| // |
| // the shutdown timeout for the client |
| // |
| value = properties.getProperty("yoko.orb.server_shutdown_timeout"); |
| if (value != null) |
| shutdownTimeout_ = Integer.parseInt(value); |
| |
| // |
| // the idle timeout for the client |
| // |
| value = properties.getProperty("yoko.orb.server_timeout"); |
| if (value != null) |
| idleTimeout_ = Integer.parseInt(value); |
| } |
| |
| /** @return true iff this connection was initiated by the other party */ |
| public final boolean isInbound() { |
| return (properties_ & Property.CreatedByClient) == 0; |
| } |
| |
| /** @return true iff this connection was initiated by this party */ |
| public final boolean isOutbound() { |
| return !!! isInbound(); |
| } |
| |
| /** @return the next request id to use */ |
| public int getNewRequestId() { |
| // In the case of BiDir connections, the client should use |
| // even numbered requestIds and the server should use odd |
| // numbered requestIds... the += 2 keeps this pattern intact |
| // assuming its correct at startup |
| return nextRequestId.getAndAdd(2); |
| } |
| |
| /** start populating the reply data */ |
| public void upcallBeginReply(Upcall upcall, org.omg.IOP.ServiceContext[] scl) { |
| upcall.createOutputStream(12); |
| org.apache.yoko.orb.CORBA.OutputStream out = upcall.output(); |
| org.apache.yoko.orb.OCI.ProfileInfo profileInfo = upcall.profileInfo(); |
| GIOPOutgoingMessage outgoing = new GIOPOutgoingMessage(orbInstance_, |
| out, profileInfo); |
| |
| int reqId = upcall.requestId(); |
| |
| try { |
| synchronized (this) { |
| outgoing.writeReplyHeader(reqId, |
| ReplyStatusType_1_2.NO_EXCEPTION, scl); |
| } |
| } catch (SystemException ex) { |
| // |
| // Nothing may go wrong here, otherwise we might have a |
| // recursion |
| // |
| Assert._OB_assert(ex); |
| } |
| } |
| |
| /** finished reply construction; ready its return */ |
| public void upcallEndReply(Upcall upcall) { |
| // |
| // Make sure the transport can send a reply |
| // |
| if (transport_.mode() == org.apache.yoko.orb.OCI.SendReceiveMode.ReceiveOnly) { |
| String msg = "Discarding reply - transport does not " |
| + "support twoway invocations"; |
| |
| msg += "\noperation name: \""; |
| msg += upcall.operation(); |
| msg += '"'; |
| |
| org.apache.yoko.orb.OCI.TransportInfo transportInfo = transport_ |
| .get_info(); |
| if (transportInfo != null) { |
| String desc = transportInfo.describe(); |
| msg += '\n'; |
| msg += desc; |
| } else { |
| msg += "\nCollocated method call"; |
| } |
| |
| Logger logger = orbInstance_.getLogger(); |
| logger.warning(msg); |
| |
| return; |
| } |
| |
| org.apache.yoko.orb.CORBA.OutputStream out = upcall.output(); |
| org.apache.yoko.orb.OCI.ProfileInfo profileInfo = upcall.profileInfo(); |
| GIOPOutgoingMessage outgoing = new GIOPOutgoingMessage(orbInstance_, |
| out, profileInfo); |
| |
| int pos = out._OB_pos(); |
| out._OB_pos(0); |
| try { |
| outgoing.writeMessageHeader(org.omg.GIOP.MsgType_1_1.Reply, false, |
| pos - 12); |
| } catch (SystemException ex) { |
| // |
| // Nothing may go wrong here, otherwise we might have a |
| // recursion |
| // |
| Assert._OB_assert(ex); |
| } |
| |
| sendUpcallReply(out._OB_buffer()); |
| } |
| |
| /** start populating the reply with a user exception */ |
| public void upcallBeginUserException(Upcall upcall, |
| org.omg.IOP.ServiceContext[] scl) { |
| upcall.createOutputStream(12); |
| |
| org.apache.yoko.orb.CORBA.OutputStream out = upcall.output(); |
| org.apache.yoko.orb.OCI.ProfileInfo profileInfo = upcall.profileInfo(); |
| GIOPOutgoingMessage outgoing = new GIOPOutgoingMessage(orbInstance_, |
| out, profileInfo); |
| |
| int reqId = upcall.requestId(); |
| |
| try { |
| outgoing.writeReplyHeader(reqId, |
| ReplyStatusType_1_2.USER_EXCEPTION, scl); |
| } catch (SystemException ex) { |
| // |
| // Nothing may go wrong here, otherwise we might have a |
| // recursion |
| // |
| Assert._OB_assert(ex); |
| } |
| } |
| |
| /** finished reply construction; ready its return */ |
| public void upcallEndUserException(Upcall upcall) { |
| upcallEndReply(upcall); |
| } |
| |
| /** populate and send the reply with a UserException */ |
| public void upcallUserException(Upcall upcall, |
| org.omg.CORBA.UserException ex, org.omg.IOP.ServiceContext[] scl) { |
| upcall.createOutputStream(12); |
| |
| org.apache.yoko.orb.CORBA.OutputStream out = upcall.output(); |
| org.apache.yoko.orb.OCI.ProfileInfo profileInfo = upcall.profileInfo(); |
| GIOPOutgoingMessage outgoing = new GIOPOutgoingMessage(orbInstance_, |
| out, profileInfo); |
| |
| int reqId = upcall.requestId(); |
| |
| try { |
| outgoing.writeReplyHeader(reqId, |
| ReplyStatusType_1_2.USER_EXCEPTION, scl); |
| |
| // |
| // Cannot marshal the exception without the Helper |
| // |
| // ex._OB_marshal(out); |
| Assert._OB_assert(false); |
| } catch (SystemException e) { |
| // |
| // Nothing may go wrong here, otherwise we might have a |
| // recursion |
| // |
| Assert._OB_assert(ex); |
| } |
| |
| upcallEndReply(upcall); |
| } |
| |
| /** populate and end the reply with a system exception */ |
| public void upcallSystemException(Upcall upcall, |
| SystemException ex, org.omg.IOP.ServiceContext[] scl) { |
| upcall.createOutputStream(12); |
| |
| org.apache.yoko.orb.CORBA.OutputStream out = upcall.output(); |
| org.apache.yoko.orb.OCI.ProfileInfo profileInfo = upcall.profileInfo(); |
| GIOPOutgoingMessage outgoing = new GIOPOutgoingMessage(orbInstance_, |
| out, profileInfo); |
| |
| int reqId = upcall.requestId(); |
| try { |
| // print this exception out here so applications have at stack trace to work |
| // cwith for problem determination. |
| |
| orbInstance_.getLogger().debug("upcall exception", ex); |
| outgoing.writeReplyHeader(reqId, |
| ReplyStatusType_1_2.SYSTEM_EXCEPTION, scl); |
| Util.marshalSystemException(out, ex); |
| } catch (SystemException e) { |
| // |
| // Nothing may go wrong here, otherwise we might have a |
| // recursion |
| // |
| Assert._OB_assert(ex); |
| } |
| |
| upcallEndReply(upcall); |
| } |
| |
| /** prepare the reply for location forwarding */ |
| public void upcallForward(Upcall upcall, IOR ior, boolean perm, |
| org.omg.IOP.ServiceContext[] scl) { |
| upcall.createOutputStream(12); |
| |
| org.apache.yoko.orb.CORBA.OutputStream out = upcall.output(); |
| org.apache.yoko.orb.OCI.ProfileInfo profileInfo = upcall.profileInfo(); |
| GIOPOutgoingMessage outgoing = new GIOPOutgoingMessage(orbInstance_, |
| out, profileInfo); |
| |
| int reqId = upcall.requestId(); |
| ReplyStatusType_1_2 status = perm ? ReplyStatusType_1_2.LOCATION_FORWARD_PERM |
| : ReplyStatusType_1_2.LOCATION_FORWARD; |
| try { |
| outgoing.writeReplyHeader(reqId, status, scl); |
| Logger logger = orbInstance_.getLogger(); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Sending forward reply to " + IORDump.PrintObjref(orbInstance_.getORB(), ior)); |
| } |
| |
| IORHelper.write(out, ior); |
| } catch (SystemException ex) { |
| // |
| // Nothing may go wrong here, otherwise we might have a |
| // recursion |
| // |
| Assert._OB_assert(ex); |
| } |
| |
| upcallEndReply(upcall); |
| } |
| |
| /** enable this connection for processing as a client */ |
| synchronized public void activateClientSide() { |
| properties_ |= Property.ClientEnabled; |
| enableConnectionModes(true, true); |
| } |
| |
| /** enable this connection for processing as a server */ |
| synchronized public void activateServerSide() { |
| Assert._OB_assert((properties_ & Property.CreatedByClient) != 0); |
| |
| if ((properties_ & Property.ServerEnabled) == 0) { |
| properties_ |= Property.ServerEnabled; |
| enableConnectionModes(true, true); |
| } |
| } |
| |
| /** @return a reference to the DowncallEmitter interface */ |
| public DowncallEmitter emitterInterface() { |
| Assert._OB_assert((properties_ & Property.ClientEnabled) != 0); |
| return this; |
| } |
| |
| /** @return a reference to the UpcallReturn interface */ |
| public UpcallReturn upcallReturnInterface() { |
| Assert._OB_assert((properties_ & Property.ServerEnabled) != 0); |
| return this; |
| } |
| |
| /** return the transport we represent */ |
| public Transport transport() { |
| return transport_; |
| } |
| |
| /** get the state of this connection */ |
| synchronized public int state() { |
| Assert._OB_assert(state_ >= State.Active && state_ <= State.Closed); |
| return state_; |
| } |
| |
| /** check if a request has been sent yet */ |
| synchronized public boolean requestSent() { |
| return (properties_ & Property.RequestSent) != 0; |
| } |
| |
| /** check if a reply has been sent yet */ |
| synchronized public boolean replySent() { |
| return (properties_ & Property.ReplySent) != 0; |
| } |
| |
| /** check if this connection was already destroyed */ |
| synchronized public boolean destroyed() { |
| return (properties_ & Property.Destroyed) != 0; |
| } |
| |
| /** change the state of this connection */ |
| public void setState(int newState) { |
| synchronized (this) { |
| if (state_ == newState |
| || (state_ != State.Holding && newState < state_)) { |
| logger.fine("No state change from " +state_ + " to " + newState); |
| return; |
| } |
| |
| // |
| // make sure to update the state since some of the actions |
| // will key off this new state |
| // |
| state_ = newState; |
| } |
| |
| switch (newState) { |
| case State.Active: |
| |
| // |
| // set the new accessable operations |
| // |
| synchronized (this) { |
| enabledOps_ = AccessOp.Read | AccessOp.Write; |
| } |
| |
| // |
| // start and refresh the connection |
| start(); |
| refresh(); |
| break; |
| |
| case State.Holding: |
| |
| // |
| // holding connections can't read new messages but can write |
| // pending messages |
| // |
| synchronized (this) { |
| enabledOps_ &= ~AccessOp.Read; |
| } |
| |
| // |
| // pause the connection |
| // |
| pause(); |
| break; |
| |
| case State.Closing: |
| |
| // |
| // during the closing, the connection can read/write/close |
| // |
| synchronized (this) { |
| enabledOps_ = AccessOp.All; |
| } |
| |
| // |
| // gracefully shutdown by sending off pending messages, |
| // reading any messages left on the wire and then closing |
| // |
| gracefulShutdown(); |
| |
| // |
| // refresh this status |
| // |
| refresh(); |
| break; |
| |
| case State.Error: |
| |
| // |
| // we can't read or write in the error state but we can |
| // close ourself down |
| // |
| synchronized (this) { |
| enabledOps_ = AccessOp.Close; |
| } |
| |
| // |
| // there is an error so shutdown abortively |
| // |
| abortiveShutdown(); |
| |
| // |
| // mark the connection as destroyed now |
| // |
| synchronized (this) { |
| properties_ |= Property.Destroyed; |
| } |
| |
| // |
| // refresh the connection status |
| // |
| refresh(); |
| break; |
| |
| case State.Closed: |
| |
| // |
| // once closed, nothing else can take place |
| // |
| synchronized (this) { |
| enabledOps_ = AccessOp.Nil; |
| } |
| |
| // |
| // log the connection closure |
| // |
| logClose(true); |
| |
| // |
| // close the transport |
| // |
| transport_.close(); |
| |
| // |
| // mark the connection as destroyed |
| // |
| synchronized (this) { |
| properties_ |= Property.Destroyed; |
| } |
| |
| // |
| // and refresh the connection |
| // |
| refresh(); |
| break; |
| |
| default: |
| Assert._OB_assert(false); |
| break; |
| } |
| } |
| |
| /** destroy this connection */ |
| public void destroy() { |
| setState(State.Closing); |
| } |
| |
| /** callback method when the ACM signals a timeout */ |
| abstract public void ACM_callback(); |
| |
| /** activate the connection */ |
| abstract public void start(); |
| |
| /** refresh the connection status after a change in internal state */ |
| abstract public void refresh(); |
| |
| /** tell the connection to stop processing; resumable with a refresh() */ |
| abstract public void pause(); |
| |
| /** change the connection mode to [client, server, both] */ |
| abstract public void enableConnectionModes(boolean enableClient, boolean enableServer); |
| } |