blob: af5f69622314d1a3a7e3abb6827d8a1f8df6348b [file] [log] [blame]
package org.apache.jcs.auxiliary.lateral.socket.tcp;
/*
* Copyright 2001-2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.Socket;
import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
import org.apache.jcs.auxiliary.lateral.LateralElementDescriptor;
import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
import org.apache.jcs.auxiliary.lateral.socket.tcp.utils.SocketOpener;
import org.apache.jcs.engine.CacheElement;
import org.apache.jcs.engine.behavior.ICacheElement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* This class is based on the log4j SocketAppender class. I'm using a differnet
* repair structure, so it is significant;y different.
*
* @version $Id$
*/
public class LateralTCPSender
{
private final static Log log =
LogFactory.getLog( LateralTCPSender.class );
private ILateralCacheAttributes ilca;
private String remoteHost;
private InetAddress address;
int port = 1111;
private ObjectOutputStream oos;
private Socket socket;
int counter = 0;
private int sendCnt = 0;
// reset the ObjectOutputStream every 70 calls
//private static final int RESET_FREQUENCY = 70;
private final static int RESET_FREQUENCY = 70;
/**
* Only block for 10 seconds before timing out on a read. TODO: make
* configurable. The default 10 it way too long.
*/
private final static int timeOut = 10000;
/** Only block for 5 seconds before timing out on startup. */
private final static int openTimeOut = 5000;
/** Use to synchronize multiple threads that may be trying to get.*/
private Object getLock = new int[0];
/**
* Constructor for the LateralTCPSender object
*
* @param lca
* @exception IOException
*/
public LateralTCPSender( ILateralCacheAttributes lca )
throws IOException
{
String p1 = lca.getTcpServer();
String h2 = p1.substring( 0, p1.indexOf( ":" ) );
int po = Integer.parseInt( p1.substring( p1.indexOf( ":" ) + 1 ) );
if ( log.isDebugEnabled() )
{
log.debug( "h2 = " + h2 );
log.debug( "po = " + po );
}
if ( h2 == null )
{
throw new IOException( "Cannot connect to invalid address " + h2 + ":" + po );
}
init( h2, po );
this.ilca = lca;
}
/** Description of the Method */
protected void init( String host, int port )
throws IOException
{
this.port = port;
this.address = getAddressByName( host );
this.remoteHost = host;
try
{
log.debug( "Attempting connection to " + address.getHostName() );
//socket = new Socket( address, port );
// have time out socket open do this for us
socket = SocketOpener.openSocket( host, port, openTimeOut );
if ( socket == null )
{
throw new IOException( "Socket is null" );
}
socket.setSoTimeout( LateralTCPSender.timeOut );
synchronized ( this )
{
oos = new ObjectOutputStream( socket.getOutputStream() );
}
}
catch ( java.net.ConnectException e )
{
log.debug( "Remote host " + address.getHostName() + " refused connection." );
throw e;
}
catch ( IOException e )
{
log.debug( "Could not connect to " + address.getHostName() +
". Exception is " + e );
throw e;
}
}
// end constructor
/**
* Gets the addressByName attribute of the LateralTCPSender object
*
* @return The addressByName value
*/
private InetAddress getAddressByName( String host )
throws IOException
{
try
{
return InetAddress.getByName( host );
}
catch ( Exception e )
{
log.error( "Could not find address of [" + host + "]", e );
throw new IOException( "Could not find address of [" + host + "]" + e.getMessage() );
//return null;
}
}
/** Sends commands to the lateral cache listener. */
public void send( LateralElementDescriptor led )
throws IOException
{
sendCnt++;
if ( log.isInfoEnabled() ) {
if ( sendCnt % 100 == 0 ) {
log.info( "Send Count = " + sendCnt );
}
}
log.debug( "sending LateralElementDescriptor" );
if ( led == null )
{
return;
}
if ( address == null )
{
throw new IOException( "No remote host is set for LateralTCPSender." );
//return;
}
if ( oos != null )
{
try
{
oos.writeObject( led );
oos.flush();
if ( ++counter >= RESET_FREQUENCY )
{
counter = 0;
// Failing to reset the object output stream every now and
// then creates a serious memory leak.
log.debug( "Doing oos.reset()" );
oos.reset();
}
}
catch ( IOException e )
{
oos = null;
log.error( "Detected problem with connection: " + e );
throw e;
}
}
}
/**
* Sends commands to the lateral cache listener and gets a response. I'm
* afraid that we could get into a pretty bad blocking situation here. This
* needs work. I just wanted to get some form of get working. However, get
* is not recommended for performance reasons. If you have 10 laterals, then
* you have to make 10 failed gets to find out none of the caches have the
* item.
*/
public ICacheElement sendAndReceive( LateralElementDescriptor led )
throws IOException
{
ICacheElement ice = null;
if ( led == null )
{
return null;
}
if ( address == null )
{
throw new IOException( "No remote host is set for LateralTCPSender." );
}
if ( oos != null )
{
// Synchronized to insure that the get requests to server from this
// sender and the responses are processed in order, else you could
// return the wrong item from the cache.
// This is a big block of code. May need to rethink this strategy.
// This may not be necessary.
// Normal puts, etc to laterals do not have to be synchronized.
synchronized ( this.getLock )
{
try
{
try
{
// clean up input stream, nothing should be there yet.
if ( socket.getInputStream().available() > 0 )
{
socket.getInputStream().read( new byte[socket.getInputStream().available()] );
}
}
catch ( IOException ioe )
{
log.error( "Problem cleaning socket before send " + socket, ioe );
throw ioe;
}
// write object to listener
oos.writeObject( led );
oos.flush();
try
{
ObjectInputStream ois = new ObjectInputStream( socket.getInputStream() );
Object obj = ois.readObject();
ice = ( ICacheElement ) obj;
if ( ice == null )
{
//p( "ice is null" );
// TODO: count misses
}
}
catch ( IOException ioe )
{
log.error( "Could not open ObjectInputStream to " + socket, ioe );
throw ioe;
}
catch ( Exception e )
{
log.error( e );
}
if ( ++counter >= RESET_FREQUENCY )
{
counter = 0;
// Failing to reset the object output stream every now and
// then creates a serious memory leak.
log.info( "Doing oos.reset()" );
oos.reset();
}
}
catch ( IOException e )
{
oos = null;
log.error( "Detected problem with connection: " + e );
throw e;
}
}
} // end synchronized block
return ice;
}// end sendAndReceive
// Service Methods //
/** Description of the Method */
public void update( ICacheElement item, long requesterId )
throws IOException
{
LateralElementDescriptor led = new LateralElementDescriptor( item );
led.requesterId = requesterId;
led.command = LateralElementDescriptor.UPDATE;
send( led );
}
/** Description of the Method */
public void remove( String cacheName, Serializable key )
throws IOException
{
remove( cacheName, key, LateralCacheInfo.listenerId );
}
/** Description of the Method */
public void remove( String cacheName, Serializable key, long requesterId )
throws IOException
{
CacheElement ce = new CacheElement( cacheName, key, null );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
led.requesterId = requesterId;
led.command = LateralElementDescriptor.REMOVE;
send( led );
}
/** Description of the Method */
public void release()
throws IOException
{
// nothing needs to be done
}
/**
* Closes connection used by all LateralTCPSenders for this lateral
* conneciton. Dispose request should come into the facade and be sent to
* all lateral cache sevices. The lateral cache service will then call this
* method.
*/
public void dispose( String cache )
throws IOException
{
// WILL CLOSE CONNECTION USED BY ALL
oos.close();
}
/** Description of the Method */
public void removeAll( String cacheName )
throws IOException
{
removeAll( cacheName, LateralCacheInfo.listenerId );
}
/** Description of the Method */
public void removeAll( String cacheName, long requesterId )
throws IOException
{
CacheElement ce = new CacheElement( cacheName, "ALL", null );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
led.requesterId = requesterId;
led.command = LateralElementDescriptor.REMOVEALL;
send( led );
}
/** Description of the Method */
public static void main( String args[] )
{
try
{
LateralTCPSender lur = null;
//new LateralTCPSender( "localhost", 1111 );
// process user input till done
boolean notDone = true;
String message = null;
// wait to dispose
BufferedReader br = new BufferedReader( new InputStreamReader( System.in ) );
while ( notDone )
{
System.out.println( "enter mesage:" );
message = br.readLine();
CacheElement ce = new CacheElement( "test", "test", message );
LateralElementDescriptor led = new LateralElementDescriptor( ce );
lur.send( led );
}
}
catch ( Exception e )
{
System.out.println( e.toString() );
}
}
}
// end class