| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.catalina.cluster.tcp; |
| |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.zip.GZIPInputStream; |
| |
| import javax.management.MBeanServer; |
| import javax.management.ObjectName; |
| |
| import org.apache.catalina.Container; |
| import org.apache.catalina.cluster.CatalinaCluster; |
| import org.apache.catalina.cluster.ClusterMessage; |
| import org.apache.catalina.cluster.ClusterReceiver; |
| import org.apache.catalina.cluster.io.ListenCallback; |
| import org.apache.catalina.cluster.session.ClusterSessionListener; |
| import org.apache.catalina.cluster.session.ReplicationStream; |
| import org.apache.catalina.core.StandardHost; |
| import org.apache.catalina.util.StringManager; |
| |
| /** |
| * FIXME i18n log messages |
| * @author Peter Rossbach |
| * @version $Revision$ $Date$ |
| */ |
| |
| public abstract class ClusterReceiverBase implements Runnable, ClusterReceiver,ListenCallback { |
| |
| protected static org.apache.commons.logging.Log log = |
| org.apache.commons.logging.LogFactory.getLog( ClusterReceiverBase.class ); |
| |
| /** |
| * The string manager for this package. |
| */ |
| protected StringManager sm = StringManager.getManager(Constants.Package); |
| |
| private CatalinaCluster cluster; |
| private java.net.InetAddress bind; |
| private String tcpListenAddress; |
| private int tcpListenPort; |
| private boolean sendAck; |
| protected boolean doListen = false; |
| |
| /** |
| * total bytes to recevied |
| */ |
| protected long totalReceivedBytes = 0; |
| |
| /** |
| * doProcessingStats |
| */ |
| protected boolean doReceivedProcessingStats = false; |
| |
| /** |
| * proessingTime |
| */ |
| protected long receivedProcessingTime = 0; |
| |
| /** |
| * min proessingTime |
| */ |
| protected long minReceivedProcessingTime = Long.MAX_VALUE ; |
| |
| /** |
| * max proessingTime |
| */ |
| protected long maxReceivedProcessingTime = 0; |
| |
| /** |
| * Sending Stats |
| */ |
| private long nrOfMsgsReceived = 0; |
| |
| private long receivedTime = 0; |
| |
| private long lastChecked = System.currentTimeMillis(); |
| |
| |
| /** |
| * Compress message data bytes |
| */ |
| private boolean compress = true ; |
| |
| /** |
| * Transmitter Mbean name |
| */ |
| private ObjectName objectName; |
| |
| /** |
| * @return Returns the doListen. |
| */ |
| public boolean isDoListen() { |
| return doListen; |
| } |
| |
| /** |
| * @return Returns the bind. |
| */ |
| public java.net.InetAddress getBind() { |
| if (bind == null) { |
| try { |
| if (log.isDebugEnabled()) |
| log.debug("Starting replication listener on address:" |
| + tcpListenAddress); |
| bind = java.net.InetAddress.getByName(tcpListenAddress); |
| } catch (IOException ioe) { |
| log.error("Failed bind replication listener on address:" |
| + tcpListenAddress, ioe); |
| } |
| } |
| return bind; |
| } |
| |
| /** |
| * @param bind The bind to set. |
| */ |
| public void setBind(java.net.InetAddress bind) { |
| this.bind = bind; |
| } |
| public void setCatalinaCluster(CatalinaCluster cluster) { |
| this.cluster = cluster; |
| } |
| |
| public CatalinaCluster getCatalinaCluster() { |
| return (CatalinaCluster) cluster; |
| } |
| |
| /** |
| * set Receiver ObjectName |
| * |
| * @param name |
| */ |
| public void setObjectName(ObjectName name) { |
| objectName = name; |
| } |
| |
| /** |
| * Receiver ObjectName |
| * |
| */ |
| public ObjectName getObjectName() { |
| return objectName; |
| } |
| |
| /** |
| * @return Returns the compress. |
| */ |
| public boolean isCompress() { |
| return compress; |
| } |
| |
| /** |
| * @param compressMessageData The compress to set. |
| */ |
| public void setCompress(boolean compressMessageData) { |
| this.compress = compressMessageData; |
| } |
| |
| /** |
| * Send ACK to sender |
| * |
| * @return True if sending ACK |
| */ |
| public boolean isSendAck() { |
| return sendAck; |
| } |
| |
| /** |
| * set ack mode or not! |
| * |
| * @param sendAck |
| */ |
| public void setSendAck(boolean sendAck) { |
| this.sendAck = sendAck; |
| } |
| |
| /** |
| * get tcp listen recevier ip address |
| * @return listen address |
| */ |
| public String getTcpListenAddress() { |
| return tcpListenAddress; |
| } |
| |
| /** |
| * Set TCP listen ip address. If arg auto use InetAddress.getLocalHost() |
| * otherwise arg converted with InetAddress.getByName(). |
| * |
| * @param tcpListenAddress |
| */ |
| public void setTcpListenAddress(String tcpListenAddress) { |
| try { |
| if ("auto".equals(tcpListenAddress)) { |
| this.tcpListenAddress = |
| java.net.InetAddress.getLocalHost().getHostAddress(); |
| } else { |
| this.tcpListenAddress = |
| java.net.InetAddress.getByName(tcpListenAddress).getHostAddress(); |
| } |
| if (log.isDebugEnabled()) |
| log.debug("Set replication listener on address:" |
| + this.tcpListenAddress); |
| } catch (IOException ioe) { |
| log.error("Failed get Inet address at replication listener on address:" |
| + tcpListenAddress, ioe); |
| } |
| } |
| |
| |
| public int getTcpListenPort() { |
| return tcpListenPort; |
| } |
| |
| public void setTcpListenPort(int tcpListenPort) { |
| this.tcpListenPort = tcpListenPort; |
| } |
| |
| public String getHost() { |
| return getTcpListenAddress(); |
| } |
| |
| public int getPort() { |
| return getTcpListenPort(); |
| } |
| // ------------------------------------------------------------- stats |
| |
| /** |
| * @return Returns the doReceivedProcessingStats. |
| */ |
| public boolean isDoReceivedProcessingStats() { |
| return doReceivedProcessingStats; |
| } |
| /** |
| * @param doReceiverProcessingStats The doReceivedProcessingStats to set. |
| */ |
| public void setDoReceivedProcessingStats(boolean doReceiverProcessingStats) { |
| this.doReceivedProcessingStats = doReceiverProcessingStats; |
| } |
| /** |
| * @return Returns the maxReceivedProcessingTime. |
| */ |
| public long getMaxReceivedProcessingTime() { |
| return maxReceivedProcessingTime; |
| } |
| /** |
| * @return Returns the minReceivedProcessingTime. |
| */ |
| public long getMinReceivedProcessingTime() { |
| return minReceivedProcessingTime; |
| } |
| /** |
| * @return Returns the receivedProcessingTime. |
| */ |
| public long getReceivedProcessingTime() { |
| return receivedProcessingTime; |
| } |
| /** |
| * @return Returns the totalReceivedBytes. |
| */ |
| public long getTotalReceivedBytes() { |
| return totalReceivedBytes; |
| } |
| |
| /** |
| * @return Returns the avg receivedProcessingTime/nrOfMsgsReceived. |
| */ |
| public double getAvgReceivedProcessingTime() { |
| if ( nrOfMsgsReceived > 0 ) { |
| return ((double)receivedProcessingTime) / nrOfMsgsReceived; |
| } else { |
| return 0; |
| } |
| } |
| |
| /** |
| * @return Returns the avg totalReceivedBytes/nrOfMsgsReceived. |
| */ |
| public long getAvgTotalReceivedBytes() { |
| if ( nrOfMsgsReceived > 0 ) { |
| return ((long)totalReceivedBytes) / nrOfMsgsReceived; |
| } else { |
| return 0; |
| } |
| } |
| |
| /** |
| * @return Returns the receivedTime. |
| */ |
| public long getReceivedTime() { |
| return receivedTime; |
| } |
| |
| /** |
| * @return Returns the lastChecked. |
| */ |
| public long getLastChecked() { |
| return lastChecked; |
| } |
| |
| /** |
| * @return Returns the nrOfMsgsReceived. |
| */ |
| public long getNrOfMsgsReceived() { |
| return nrOfMsgsReceived; |
| } |
| |
| /** |
| * start cluster receiver |
| * |
| * @see org.apache.catalina.cluster.ClusterReceiver#start() |
| */ |
| public void start() { |
| try { |
| getBind(); |
| Thread t = new Thread(this, "ClusterReceiver"); |
| t.setDaemon(true); |
| t.start(); |
| } catch (Exception x) { |
| log.fatal("Unable to start cluster receiver", x); |
| } |
| registerReceiverMBean(); |
| } |
| |
| |
| /** |
| * Stop accept |
| * |
| * @see org.apache.catalina.cluster.ClusterReceiver#stop() |
| * @see #stopListening() |
| */ |
| public void stop() { |
| stopListening(); |
| unregisterRecevierMBean(); |
| |
| } |
| |
| /** |
| * Register Recevier MBean |
| * <domain>:type=ClusterReceiver,host=<host> |
| */ |
| protected void registerReceiverMBean() { |
| if (cluster != null && cluster instanceof SimpleTcpCluster) { |
| SimpleTcpCluster scluster = (SimpleTcpCluster) cluster; |
| ObjectName clusterName = scluster.getObjectName(); |
| try { |
| MBeanServer mserver = scluster.getMBeanServer(); |
| Container container = cluster.getContainer(); |
| String name = clusterName.getDomain() + ":type=ClusterReceiver"; |
| if (container instanceof StandardHost) { |
| name += ",host=" + clusterName.getKeyProperty("host"); |
| } |
| ObjectName receiverName = new ObjectName(name); |
| if (mserver.isRegistered(receiverName)) { |
| if (log.isWarnEnabled()) |
| log.warn(sm.getString( |
| "cluster.mbean.register.already", |
| receiverName)); |
| return; |
| } |
| setObjectName(receiverName); |
| mserver.registerMBean(scluster.getManagedBean(this), |
| getObjectName()); |
| } catch (Exception e) { |
| log.warn(e); |
| } |
| } |
| } |
| |
| /** |
| * UnRegister Recevier MBean |
| * <domain>:type=ClusterReceiver,host=<host> |
| */ |
| protected void unregisterRecevierMBean() { |
| if (cluster != null && getObjectName() != null |
| && cluster instanceof SimpleTcpCluster) { |
| SimpleTcpCluster scluster = (SimpleTcpCluster) cluster; |
| try { |
| MBeanServer mserver = scluster.getMBeanServer(); |
| mserver.unregisterMBean(getObjectName()); |
| } catch (Exception e) { |
| log.error(e); |
| } |
| } |
| } |
| |
| /** |
| * stop Listener sockets |
| */ |
| protected abstract void stopListening() ; |
| |
| /** |
| * Start Listener |
| * @throws Exception |
| */ |
| protected abstract void listen () |
| throws Exception ; |
| |
| |
| /** |
| * Start thread and listen |
| */ |
| public void run() |
| { |
| try |
| { |
| listen(); |
| } |
| catch ( Exception x ) |
| { |
| log.error("Unable to start cluster listener.",x); |
| } |
| } |
| |
| // --------------------------------------------------------- receiver messages |
| |
| /** |
| * receiver Message from other node. |
| * All SessionMessage forward to ClusterManager and other message dispatch to all accept MessageListener. |
| * |
| * @see ClusterSessionListener#messageReceived(ClusterMessage) |
| */ |
| public void messageDataReceived(ClusterData data) { |
| //public void messageDataReceived(byte[] data) { |
| long timeSent = 0 ; |
| if (doReceivedProcessingStats) { |
| timeSent = System.currentTimeMillis(); |
| } |
| try { |
| ClusterMessage message = deserialize(data); |
| cluster.receive(message); |
| } catch (Exception x) { |
| log |
| .error( |
| "Unable to deserialize session message or unexpected exception from message listener.", |
| x); |
| } finally { |
| if (doReceivedProcessingStats) { |
| addReceivedProcessingStats(timeSent); |
| } |
| } |
| } |
| |
| /** |
| * deserialize the receieve cluster message |
| * @param data uncompress data |
| * @return The message |
| * @throws IOException |
| * @throws ClassNotFoundException |
| */ |
| //protected ClusterMessage deserialize(byte[] data) |
| protected ClusterMessage deserialize(ClusterData data) |
| throws IOException, ClassNotFoundException { |
| Object message = null; |
| if (data != null) { |
| InputStream instream; |
| if (isCompress() || data.getCompress() == ClusterMessage.FLAG_ALLOWED ) { |
| instream = new GZIPInputStream(new ByteArrayInputStream(data.getMessage())); |
| } else { |
| instream = new ByteArrayInputStream(data.getMessage()); |
| } |
| ReplicationStream stream = new ReplicationStream(instream, |
| getClass().getClassLoader()); |
| message = stream.readObject(); |
| // calc stats really received bytes |
| totalReceivedBytes += data.getMessage().length; |
| //totalReceivedBytes += data.length; |
| nrOfMsgsReceived++; |
| instream.close(); |
| } |
| if (message instanceof ClusterMessage) |
| return (ClusterMessage) message; |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Message " + message.toString() + " from type " |
| + message.getClass().getName() |
| + " transfered but is not a cluster message"); |
| return null; |
| } |
| } |
| |
| // --------------------------------------------- Performance Stats |
| |
| /** |
| * Reset sender statistics |
| */ |
| public synchronized void resetStatistics() { |
| nrOfMsgsReceived = 0; |
| totalReceivedBytes = 0; |
| minReceivedProcessingTime = Long.MAX_VALUE ; |
| maxReceivedProcessingTime = 0 ; |
| receivedProcessingTime = 0 ; |
| receivedTime = 0 ; |
| } |
| |
| /** |
| * Add receiver processing stats times |
| * @param startTime |
| */ |
| protected void addReceivedProcessingStats(long startTime) { |
| long current = System.currentTimeMillis() ; |
| long time = current - startTime ; |
| synchronized(this) { |
| if(time < minReceivedProcessingTime) |
| minReceivedProcessingTime = time ; |
| if( time > maxReceivedProcessingTime) |
| maxReceivedProcessingTime = time ; |
| receivedProcessingTime += time ; |
| } |
| if (log.isDebugEnabled()) { |
| if ((current - lastChecked) > 5000) { |
| log.debug("Calc msg send time total=" + receivedTime |
| + "ms num request=" + nrOfMsgsReceived |
| + " average per msg=" |
| + (receivedTime / nrOfMsgsReceived) + "ms."); |
| lastChecked=current ; |
| } |
| } |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.catalina.cluster.io.ListenCallback#sendAck() |
| */ |
| public void sendAck() throws IOException { |
| // do nothing |
| } |
| |
| } |