// blob: 3d1a9e8d2e446712ba63f0c32a9ff4dc50808a48 [file] [log] [blame]
/*
* Copyright 1999-2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jk.common;
import java.net.URLEncoder;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import javax.management.ObjectName;
import org.apache.commons.modeler.Registry;
import org.apache.jk.core.JkHandler;
import org.apache.jk.core.Msg;
import org.apache.jk.core.MsgContext;
import org.apache.jk.core.JkChannel;
import org.apache.jk.core.WorkerEnv;
import org.apache.coyote.Request;
import org.apache.coyote.RequestGroupInfo;
import org.apache.coyote.RequestInfo;
import org.apache.tomcat.util.threads.ThreadPool;
import org.apache.tomcat.util.threads.ThreadPoolRunnable;
/** Pass messages using unix domain sockets.
 *
 * The socket itself is handled by native code reached through
 * {@link JniHandler}; this class configures the native component, runs an
 * accept loop on the thread pool and hands every accepted connection to a
 * pool thread that feeds received messages to the next handler.
 *
 * @author Costin Manolache
 */
public class ChannelUn extends JniHandler implements JkChannel {
    // Operation codes passed to nativeDispatch().
    static final int CH_OPEN=4;
    static final int CH_CLOSE=5;
    static final int CH_READ=6;
    static final int CH_WRITE=7;

    /** Path of the unix socket file; the channel is disabled when null. */
    String file;
    ThreadPool tp = ThreadPool.createThreadPool(true);

    /* ==================== Channel configuration ==================== */

    public ThreadPool getThreadPool() {
        return tp;
    }

    public void setFile( String f ) {
        file=f;
    }

    public String getFile() {
        return file;
    }

    /* ==================== ==================== */
    int socketNote=1;
    int isNote=2;
    int osNote=3;

    int localId=0;

    /**
     * Prepares the socket file, initializes the native channel, registers
     * the JMX components and starts the accept loop on the thread pool.
     *
     * The method returns quietly, leaving the channel disabled, when no file
     * is configured, when the socket file cannot be created or removed, or
     * when APR is not available.
     *
     * @throws IOException declared by the handler contract
     */
    public void init() throws IOException {
        if( file==null ) {
            log.debug("No file, disabling unix channel");
            return;
        }
        if( wEnv!=null && wEnv.getLocalId() != 0 ) {
            localId=wEnv.getLocalId();
        }
        if( localId != 0 ) {
            file=file+ localId;
        }

        File socketFile=new File( file );
        if( !socketFile.isAbsolute() ) {
            // wEnv is treated as optional elsewhere in this method, so do
            // not dereference it blindly here.
            String home=( wEnv==null ) ? null : wEnv.getJkHome();
            if( home==null ) {
                log.debug("No jkhome");
            } else {
                File homef=new File( home );
                socketFile=new File( homef, file );
                log.debug( "Making the file absolute " +socketFile);
            }
        }

        // Check that the location is usable by creating the file when it
        // does not exist yet.
        if( ! socketFile.exists() ) {
            try {
                writeProbeFile( socketFile );
            } catch( Throwable t ) {
                log.error("Attempting to create the file failed, disabling channel "
                          + socketFile, t);
                return;
            }
        }
        // A leftover file must be removed before the native side can use
        // the path; give up when that is not possible.
        if (!socketFile.delete()) {
            log.error( "Can't remove socket file " + socketFile);
            return;
        }

        super.initNative( "channel.un:" + file );

        if( apr==null || ! apr.isLoaded() ) {
            log.debug("Apr is not available, disabling unix channel ");
            apr=null;
            return;
        }

        // Set properties and call init.
        setNativeAttribute( "file", file );
        // unixListenSocket=apr.unSocketListen( file, 10 );
        setNativeAttribute( "listen", "10" );
        // setNativeAttribute( "debug", "10" );

        // Initialize the thread pool and execution chain
        if( next==null && wEnv!=null ) {
            if( nextName!=null ) {
                setNext( wEnv.getHandler( nextName ) );
            }
            if( next==null ) {
                next=wEnv.getHandler( "dispatch" );
            }
            if( next==null ) {
                next=wEnv.getHandler( "request" );
            }
        }

        super.initJkComponent();

        if( wEnv!=null ) {
            JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
        }

        // Expose the thread pool and the request group through JMX.
        if( this.domain != null ) {
            try {
                tpOName=new ObjectName(domain + ":type=ThreadPool,name=" +
                                       getChannelName());
                Registry.getRegistry(null, null)
                    .registerComponent(tp, tpOName, null);

                rgOName = new ObjectName
                    (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
                Registry.getRegistry(null, null)
                    .registerComponent(global, rgOName, null);
            } catch (Exception e) {
                log.error("Can't register threadpool", e );
            }
        }

        // Run a thread that will accept connections.
        tp.start();
        AprAcceptor acceptAjp=new AprAcceptor( this );
        tp.runIt( acceptAjp);
        log.info("JK: listening on unix socket: " + file );
    }

    /**
     * Creates the given file by writing a single byte to it. The stream is
     * always closed, so a failed write does not leave an open handle behind.
     */
    private static void writeProbeFile( File f ) throws IOException {
        FileOutputStream fos=new FileOutputStream( f );
        try {
            fos.write( 1 );
        } finally {
            fos.close();
        }
    }

    ObjectName tpOName;
    ObjectName rgOName;
    RequestGroupInfo global=new RequestGroupInfo();
    int count = 0;
    int JMXRequestNote;

    public void start() throws IOException {
    }

    /**
     * Shuts down the thread pool, destroys the native component and
     * unregisters the JMX components created by {@link #init()}. Does
     * nothing when the channel was never enabled (no APR).
     */
    public void destroy() throws IOException {
        if( apr==null ) {
            return;
        }
        try {
            if( tp != null ) {
                tp.shutdown();
            }

            //apr.unSocketClose( unixListenSocket,3);
            super.destroyJkComponent();

            if(tpOName != null) {
                Registry.getRegistry().unregisterComponent(tpOName);
            }
            if(rgOName != null) {
                Registry.getRegistry().unregisterComponent(rgOName);
            }
        } catch(Exception e) {
            log.error("Error in destroy",e);
        }
    }

    /**
     * Registers the request processor of {@code req} in JMX and remembers
     * the generated name as a note on {@code ep} so it can be unregistered
     * when the connection is released. No-op when no JMX domain is set.
     */
    public void registerRequest(Request req, MsgContext ep, int count) {
        if(this.domain != null) {
            try {
                RequestInfo rp=req.getRequestProcessor();
                rp.setGlobalProcessor(global);
                ObjectName roname = new ObjectName
                    (getDomain() + ":type=RequestProcessor,worker="+
                     getChannelName()+",name=JkRequest" +count);
                ep.setNote(JMXRequestNote, roname);

                Registry.getRegistry().registerComponent( rp, roname, null);
            } catch( Exception ex ) {
                log.warn("Error registering request", ex);
            }
        }
    }

    /** Open a connection - since we're listening that will block in
        accept
    */
    public int open(MsgContext ep) throws IOException {
        // Will associate a jk_endpoint with ep and call open() on it.
        // jk_channel_un will accept a connection and set the socket info
        // in the endpoint. MsgContext will represent an active connection.
        return super.nativeDispatch( ep.getMsg(0), ep, CH_OPEN, 1 );
    }

    /** Close the connection represented by {@code ep}. */
    public void close(MsgContext ep) throws IOException {
        super.nativeDispatch( ep.getMsg(0), ep, CH_CLOSE, 1 );
    }

    /** Send {@code msg} through the native channel; returns the native status. */
    public int send( Msg msg, MsgContext ep)
        throws IOException
    {
        return super.nativeDispatch( msg, ep, CH_WRITE, 0 );
    }

    /**
     * Read one message from the native channel into {@code msg}.
     *
     * @return the message length, or -1 when the native read reported an error
     */
    public int receive( Msg msg, MsgContext ep )
        throws IOException
    {
        int rc=super.nativeDispatch( msg, ep, CH_READ, 1 );

        if( rc!=0 ) {
            // The Throwable is only there to record the calling stack.
            log.error("receive error: " + rc, new Throwable());
            return -1;
        }

        msg.processHeader();

        if (log.isDebugEnabled()) {
            log.debug("receive: total read = " + msg.getLen());
        }

        return msg.getLen();
    }

    /** Nothing to flush on this channel; always returns {@code OK}. */
    public int flush( Msg msg, MsgContext ep) throws IOException {
        return OK;
    }

    public boolean isSameAddress( MsgContext ep ) {
        return false; // Not supporting shutdown on this channel.
    }

    boolean running=true;

    /** Accept incoming connections, dispatch to the thread pool
     */
    void acceptConnections() {
        if( apr==null ) {
            return;
        }

        if( log.isDebugEnabled() ) {
            log.debug("Accepting ajp connections on " + file);
        }

        while( running ) {
            try {
                MsgContext ep=this.createMsgContext();

                // blocking - opening a server connection.
                int status=this.open(ep);
                // Only the status values 0 and 2 let the loop continue.
                if( status != 0 && status != 2 ) {
                    log.error( "Error acceptin connection on " + file );
                    break;
                }

                // if( log.isDebugEnabled() )
                //     log.debug("Accepted ajp connections ");

                AprConnection ajpConn= new AprConnection(this, ep);
                tp.runIt( ajpConn );
            } catch( Exception ex ) {
                // Use the class logger instead of stderr.
                log.error( "Error accepting ajp connection", ex );
            }
        }
    }

    /** Process a single ajp connection.
     *
     * Messages are received and passed to the next handler until the
     * channel stops running or a receive reports end of stream. The
     * connection is released even when processing fails.
     */
    void processConnection(MsgContext ep) {
        if( log.isDebugEnabled() ) {
            log.debug( "New ajp connection ");
        }
        try {
            MsgAjp recv=new MsgAjp();
            while( running ) {
                int res=this.receive( recv, ep );
                if( res<0 ) {
                    // EOS
                    break;
                }
                ep.setType(0);
                if( log.isDebugEnabled() ) {
                    log.debug( "Process msg ");
                }
                next.invoke( recv, ep );
            }
            if( log.isDebugEnabled() ) {
                log.debug( "Closing un channel");
            }
        } catch( Exception ex ) {
            log.error( "Error processing ajp connection", ex );
        } finally {
            // Always release, otherwise a failure in the loop above would
            // leave the connection open and its MBean registered.
            releaseConnection( ep );
        }
    }

    /**
     * Unregisters the request MBean recorded on {@code ep} (if any) and
     * closes the connection. Failures are logged and never propagated.
     */
    private void releaseConnection(MsgContext ep) {
        try {
            Request req = (Request)ep.getRequest();
            if( req != null ) {
                ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
                // registerRequest() sets the note only when a JMX domain
                // is configured, so it may be absent.
                if( roname != null ) {
                    Registry.getRegistry().unregisterComponent(roname);
                }
                req.getRequestProcessor().setGlobalProcessor(null);
            }
        } catch( Exception ee) {
            log.error( "Error, releasing connection",ee);
        }
        try {
            this.close( ep );
        } catch( Exception ex ) {
            log.error( "Error closing un channel", ex );
        }
    }

    /**
     * Dispatches on the type stored in {@code ep} to receive, send or
     * flush; any other type is ignored and {@code OK} is returned.
     */
    public int invoke( Msg msg, MsgContext ep ) throws IOException {
        int type=ep.getType();

        switch( type ) {
        case JkHandler.HANDLE_RECEIVE_PACKET:
            return receive( msg, ep );
        case JkHandler.HANDLE_SEND_PACKET:
            return send( msg, ep );
        case JkHandler.HANDLE_FLUSH:
            return flush( msg, ep );
        }

        // return next.invoke( msg, ep );
        return OK;
    }

    /**
     * Builds the name used for this channel in JMX object names:
     * {@code "jk-"} followed by the URL-encoded socket file path without its
     * leading slash, or just {@code "jk-"} when no file is set.
     */
    public String getChannelName() {
        String encodedAddr = "";
        String address = file;
        if (address != null) {
            encodedAddr = address;
            if (encodedAddr.startsWith("/")) {
                encodedAddr = encodedAddr.substring(1);
            }
            try {
                // Explicit charset: the one-argument encode() is deprecated
                // and depends on the platform default encoding.
                encodedAddr = URLEncoder.encode(encodedAddr, "UTF-8");
            } catch (java.io.UnsupportedEncodingException e) {
                // UTF-8 support is mandatory for every Java platform.
                throw new IllegalStateException("UTF-8 is not supported: " + e);
            }
        }
        return ("jk-" + encodedAddr);
    }

    private static final org.apache.commons.logging.Log log=
        org.apache.commons.logging.LogFactory.getLog( ChannelUn.class );
}
/**
 * Thread pool task that runs the accept loop of a {@link ChannelUn}.
 */
class AprAcceptor implements ThreadPoolRunnable {
    /** Channel whose connections are accepted by this task. */
    ChannelUn wajp;

    AprAcceptor(ChannelUn channel) {
        wajp = channel;
    }

    /** This task supplies no per-thread data. */
    public Object[] getInitData() {
        return null;
    }

    /** Delegates to the channel's accept loop; the argument is not used. */
    public void runIt(Object[] threadData) {
        wajp.acceptConnections();
    }
}
/**
 * Thread pool task that processes one accepted connection of a
 * {@link ChannelUn}.
 */
class AprConnection implements ThreadPoolRunnable {
    /** Channel that accepted the connection. */
    ChannelUn wajp;
    /** Context representing the accepted connection. */
    MsgContext ep;

    AprConnection(ChannelUn channel, MsgContext context) {
        wajp = channel;
        ep = context;
    }

    /** This task supplies no per-thread data. */
    public Object[] getInitData() {
        return null;
    }

    /** Lets the channel process the connection; the argument is not used. */
    public void runIt(Object[] threadData) {
        wajp.processConnection(ep);
    }
}