blob: 5ad6d38be3a2e26d019791889f405b316d41c36f [file] [log] [blame]
/* $Id$
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.etch.util.core.io;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
import org.apache.etch.util.Monitor;
import org.apache.etch.util.Runner;
import org.apache.etch.util.RunnerHandler;
import org.apache.etch.util.Todo;
import org.apache.etch.util.TodoManager;
/**
* Implementation of runner which handles a network connection.
* @param <H> the event handler type.
*/
abstract public class Connection<H extends Session>
extends Runner implements Transport<H>, RunnerHandler
{
/////////////
// QUERIES //
/////////////
/** Source query to get the local address. */
public static final String LOCAL_ADDRESS = "LOCAL_ADDRESS";
/** Source query to get the remote address. */
public static final String REMOTE_ADDRESS = "REMOTE_ADDRESS";
////////////
// EVENTS //
////////////
/**
* Host name to specify to select listening on all interfaces.
* The value is "0.0.0.0".
*/
public static final String ALL_INTFS = "0.0.0.0";
/**
* @param s
* @return null if s is null or ALL_INTFS ("0.0.0.0").
*/
protected static String translateHost( String s )
{
if (s != null && s.equals( ALL_INTFS ))
return null;
return s;
}
/**
* Constructs the Connection.
*/
public Connection()
{
setHandler( this );
}
public void started()
{
// ignore
}
public void stopped()
{
// ignore
}
public void exception( String what, final Exception e )
{
TodoManager.addTodo( new Todo()
{
public void doit( TodoManager m ) throws Exception
{
session.sessionNotify( e );
}
public void exception( TodoManager mgr, Exception ex )
{
e.printStackTrace();
if (ex != e)
ex.printStackTrace();
}
} );
}
@Override
protected boolean run0( boolean first ) throws Exception
{
boolean ok = openSocket( !first );
// Log.report( "openSocketDone", "result", ok, "c", this, "t", Thread.currentThread() );
if (!ok)
return false;
try
{
setupSocket();
// Log.report( "setupSocketDone", "c", this );
}
catch ( Exception e )
{
fireException( "setup", e );
close( true );
return true;
}
try
{
// Log.report( "beforeFireUp", "c", this );
fireUp();
readSocket();
// Log.report( "readSocketDone", "c", this );
return true;
}
catch ( SocketException e )
{
// Log.report( "readSocketDone", "c", this, "e", e );
if ("socket closed".equalsIgnoreCase( e.getMessage() ))
return true;
fireException( "run", e );
close( true );
return true;
}
catch ( Exception e )
{
fireException( "run", e );
close( true );
return true;
}
finally
{
// Log.report( "beforeFireDown", "c", this );
fireDown();
close( false );
// Log.report( "closeDone", "c", this );
}
}
/**
* Opens the socket of a connection.
* @param reconnect true if we are trying to reconnect, false if this
* is the first time.
* @return true if we should reconnect, false if we should stop.
* @throws Exception
*/
abstract protected boolean openSocket( boolean reconnect ) throws Exception;
/**
* Sets up a newly opened socket. This may involve setting socket
* options and opening input and output streams.
* @throws Exception
*/
abstract protected void setupSocket() throws Exception;
/**
* Performs the usual and customary operations on a socket, such
* as read or accept.
* @throws Exception
*/
abstract protected void readSocket() throws Exception;
/**
* Terminates operations on the socket.
* @param reset true if the socket should be terminated immediately.
* False if the pending output data should be allowed to linger.
* @throws Exception
*/
abstract public void close( boolean reset ) throws Exception;
/**
* Terminates operations on the socket. Same as close( false );
* @throws Exception
* @see #close(boolean)
*/
public void close() throws Exception
{
close( false );
}
public Object transportQuery( Object query ) throws Exception
{
if (query == LOCAL_ADDRESS)
return localAddress();
if (query == REMOTE_ADDRESS)
return remoteAddress();
if (query instanceof WaitUp)
{
waitUp( ((WaitUp) query).maxDelay );
return null;
}
if (query instanceof WaitDown)
{
waitDown( ((WaitDown) query).maxDelay );
return null;
}
throw new UnsupportedOperationException( "unknown query: "+query );
}
/**
* @return the local address
* @throws IOException
*/
abstract public SocketAddress localAddress() throws IOException;
/**
* @return the remote address
* @throws IOException
*/
abstract public SocketAddress remoteAddress() throws IOException;
public void transportControl( Object control, Object value ) throws Exception
{
if (control == START)
{
start();
return;
}
if (control == START_AND_WAIT_UP)
{
start();
waitUp( (Integer) value );
return;
}
if (control == STOP)
{
stop();
return;
}
if (control == STOP_AND_WAIT_DOWN)
{
stop();
waitDown( (Integer) value );
return;
}
if (control == RESET)
{
close( true );
return;
}
throw new UnsupportedOperationException( "unknown control: "+control );
}
public void transportNotify( Object event ) throws Exception
{
// nothing to do.
}
private void fireUp() throws Exception
{
status.set( Session.UP );
TodoManager.addTodo( new Todo()
{
public void doit( TodoManager m ) throws Exception
{
session.sessionNotify( Session.UP );
}
public void exception( TodoManager mgr, Exception e )
{
e.printStackTrace();
}
} );
}
private void fireDown() throws Exception
{
status.set( Session.DOWN );
TodoManager.addTodo( new Todo()
{
public void doit( TodoManager m ) throws Exception
{
session.sessionNotify( Session.DOWN );
}
public void exception( TodoManager mgr, Exception e )
{
e.printStackTrace();
}
} );
}
public H getSession()
{
return session;
}
public void setSession( H session )
{
this.session = session;
}
/**
* The session for the connection.
*/
protected H session;
/**
* Waits until the connection is up.
* @param maxDelay time in milliseconds to wait.
* @throws Exception
*/
public void waitUp( int maxDelay ) throws Exception
{
status.waitUntilEq( Session.UP, maxDelay );
}
/**
* @param maxDelay time in milliseconds to wait.
* @throws Exception
*/
public void waitDown( int maxDelay ) throws Exception
{
status.waitUntilEq( Session.DOWN, maxDelay );
}
private final Monitor<String> status = new Monitor<String>( "status", Session.DOWN );
}