blob: 8ae9929fa4818d50b1a3e1cf03de7c40092107ac [file] [log] [blame]
/* $Id$
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.etch.bindings.java.transport;
import org.apache.etch.bindings.java.msg.Message;
import org.apache.etch.bindings.java.msg.Type;
import org.apache.etch.bindings.java.support.DeliveryService;
import org.apache.etch.bindings.java.support.Mailbox;
import org.apache.etch.util.Monitor;
import org.apache.etch.util.Resources;
import org.apache.etch.util.TimeoutException;
import org.apache.etch.util.URL;
import org.apache.etch.util.core.Who;
/**
* Default implementation of DeliveryService.
*/
public class DefaultDeliveryService implements DeliveryService
{
/**
* Parameter to Globally Disable Timeout.
*/
public final static String DISABLE_TIMEOUT = "DefaultDeliveryService.disableTimeout";
private boolean disableTimeout = false;
/**
* @param transport
* @param uri
* @param resources
*/
public DefaultDeliveryService( MailboxManager transport, String uri,
Resources resources )
{
this( transport, new URL( uri ), resources );
}
/**
* @param transport
* @param uri
* @param resources
*/
public DefaultDeliveryService( MailboxManager transport, URL uri,
Resources resources )
{
this.transport = transport;
transport.setSession( this );
disableTimeout = uri.getBooleanTerm( DISABLE_TIMEOUT , false );
}
private final MailboxManager transport;
/**
* @return the transport.
*/
public MailboxManager getTransport()
{
return transport;
}
public SessionMessage getSession()
{
return session;
}
public void setSession( SessionMessage session )
{
if (this.session != null)
throw new UnsupportedOperationException( "only one stub for now" );
this.session = session;
}
private SessionMessage session;
public Object sessionQuery( Object query ) throws Exception
{
return session.sessionQuery( query );
}
public void sessionControl( Object control, Object value ) throws Exception
{
session.sessionControl( control, value );
}
public void sessionNotify( Object event ) throws Exception
{
if (event == UP)
status.set( UP );
else if (event == DOWN)
status.set( DOWN );
session.sessionNotify( event );
}
private final Monitor<String> status = new Monitor<String>( "session status" );
public boolean sessionMessage( Who sender, Message msg ) throws Exception
{
return session.sessionMessage( sender, msg );
}
@Override
public String toString()
{
return transport.toString();
}
private void waitUp( int maxDelay ) throws Exception
{
status.waitUntilEq( UP, maxDelay );
}
private void waitDown( int maxDelay ) throws Exception
{
status.waitUntilEq( DOWN, maxDelay );
}
public void transportMessage( Who recipient, Message msg ) throws Exception
{
transport.transportMessage( recipient, msg );
}
public Object transportQuery( Object query ) throws Exception
{
if (query.getClass() == WaitUp.class)
{
waitUp( ((WaitUp) query).maxDelay );
return null;
}
else if (query.getClass() == WaitDown.class)
{
waitDown( ((WaitDown) query).maxDelay );
return null;
}
else
{
return transport.transportQuery( query );
}
}
public void transportControl( Object control, Object value ) throws Exception
{
if (control == START_AND_WAIT_UP)
{
transport.transportControl( START, null );
waitUp( (Integer) value );
}
else if (control == STOP_AND_WAIT_DOWN)
{
transport.transportControl( STOP, null );
waitDown( (Integer) value );
}
else
{
transport.transportControl( control, value );
}
}
public void transportNotify( Object event ) throws Exception
{
transport.transportNotify( event );
}
public Mailbox begincall( Message msg )
{
try
{
return transport.transportCall( null, msg );
}
catch ( Exception e )
{
throw new RuntimeException( "unexpected exception sending message", e );
}
}
public Object endcall( Mailbox mb, Type responseType ) throws Exception
{
try
{
int timeout = disableTimeout ? 0 : responseType.getTimeout();
Mailbox.Element mbe = mb.read( timeout);
if (mbe == null)
throw new TimeoutException( "timeout waiting for "+responseType );
Message rmsg = mbe.msg;
rmsg.checkType( responseType );
Object r = rmsg.get( responseType.getResponseField() );
if (r instanceof Exception)
{
Exception e = (Exception) r;
e.fillInStackTrace();
throw e;
}
return r;
}
finally
{
mb.closeRead();
}
}
}