Fix https://bz.apache.org/bugzilla/show_bug.cgi?id=51513
Add support for the compressionMinSize attribute to the GzipInterceptor, add optional statistics collection and expose the Interceptor over JMX.
Based on a patch by Christian Stöber.
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1800708 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java b/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java
index 21c75e0..89dddad 100644
--- a/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java
+++ b/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java
@@ -20,6 +20,8 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
@@ -36,38 +38,108 @@
/**
* @version 1.0
*/
-public class GzipInterceptor extends ChannelInterceptorBase {
+public class GzipInterceptor extends ChannelInterceptorBase implements GzipInterceptorMBean {
private static final Log log = LogFactory.getLog(GzipInterceptor.class);
protected static final StringManager sm = StringManager.getManager(GzipInterceptor.class);
public static final int DEFAULT_BUFFER_SIZE = 2048;
+ public static final int DEFAULT_OPTION_COMPRESSION_ENABLE = 0x0100;
+
+ private int compressionMinSize = 0;
+ private volatile boolean statsEnabled = false;
+ private int interval = 0;
+
+ // Stats
+ private final AtomicInteger count = new AtomicInteger();
+ private final AtomicInteger countCompressedTX = new AtomicInteger();
+ private final AtomicInteger countUncompressedTX = new AtomicInteger();
+ private final AtomicInteger countCompressedRX = new AtomicInteger();
+ private final AtomicInteger countUncompressedRX = new AtomicInteger();
+ private final AtomicLong sizeTX = new AtomicLong();
+ private final AtomicLong compressedSizeTX = new AtomicLong();
+ private final AtomicLong uncompressedSizeTX = new AtomicLong();
+ private final AtomicLong sizeRX = new AtomicLong();
+ private final AtomicLong compressedSizeRX = new AtomicLong();
+ private final AtomicLong uncompressedSizeRX = new AtomicLong();
+
+
+ public GzipInterceptor() {
+ setOptionFlag(DEFAULT_OPTION_COMPRESSION_ENABLE);
+ }
+
@Override
- public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+ public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
+ throws ChannelException {
try {
- byte[] data = compress(msg.getMessage().getBytes());
+ byte[] data = msg.getMessage().getBytes();
+ if (statsEnabled) {
+ sizeTX.addAndGet(data.length);
+ }
+
+ if (data.length > compressionMinSize) {
+ data = compress(data);
+ // Set the flag that indicates that the message is compressed
+ msg.setOptions(msg.getOptions() | getOptionFlag());
+ if (statsEnabled) {
+ countCompressedTX.incrementAndGet();
+ compressedSizeTX.addAndGet(data.length);
+ }
+ } else if (statsEnabled){
+ countUncompressedTX.incrementAndGet();
+ uncompressedSizeTX.addAndGet(data.length);
+ }
+
msg.getMessage().trim(msg.getMessage().getLength());
msg.getMessage().append(data,0,data.length);
super.sendMessage(destination, msg, payload);
+
+ int currentCount = count.incrementAndGet();
+ if (statsEnabled && interval > 0 && currentCount % interval == 0) {
+ report();
+ }
} catch ( IOException x ) {
log.error(sm.getString("gzipInterceptor.compress.failed"));
throw new ChannelException(x);
}
}
+
@Override
public void messageReceived(ChannelMessage msg) {
try {
- byte[] data = decompress(msg.getMessage().getBytes());
+ byte[] data = msg.getMessage().getBytes();
+ if ((msg.getOptions() & getOptionFlag()) > 0) {
+ if (statsEnabled) {
+ countCompressedRX.incrementAndGet();
+ compressedSizeRX.addAndGet(data.length);
+ }
+ // Message was compressed
+ data = decompress(data);
+ } else if (statsEnabled) {
+ countUncompressedRX.incrementAndGet();
+ uncompressedSizeRX.addAndGet(data.length);
+ }
+
+ if (statsEnabled) {
+ sizeRX.addAndGet(data.length);
+ }
+
msg.getMessage().trim(msg.getMessage().getLength());
msg.getMessage().append(data,0,data.length);
super.messageReceived(msg);
+
+ int currentCount = count.incrementAndGet();
+ if (statsEnabled && interval > 0 && currentCount % interval == 0) {
+ report();
+ }
} catch ( IOException x ) {
log.error(sm.getString("gzipInterceptor.decompress.failed"),x);
}
}
+
public static byte[] compress(byte[] data) throws IOException {
ByteArrayOutputStream bout = new ByteArrayOutputStream();
GZIPOutputStream gout = new GZIPOutputStream(bout);
@@ -77,6 +149,7 @@
return bout.toByteArray();
}
+
/**
* @param data Data to decompress
* @return Decompressed data
@@ -95,4 +168,134 @@
}
return bout.toByteArray();
}
+
+
+ @Override
+ public void report() {
+ log.info(sm.getString("gzipInterceptor.report", Integer.valueOf(getCount()),
+ Integer.valueOf(getCountCompressedTX()), Integer.valueOf(getCountUncompressedTX()),
+ Integer.valueOf(getCountCompressedRX()), Integer.valueOf(getCountUncompressedRX()),
+ Long.valueOf(getSizeTX()), Long.valueOf(getCompressedSizeTX()),
+ Long.valueOf(getUncompressedSizeTX()),
+ Long.valueOf(getSizeRX()), Long.valueOf(getCompressedSizeRX()),
+ Long.valueOf(getUncompressedSizeRX())));
+ }
+
+
+ @Override
+ public int getCompressionMinSize() {
+ return compressionMinSize;
+ }
+
+
+ @Override
+ public void setCompressionMinSize(int compressionMinSize) {
+ this.compressionMinSize = compressionMinSize;
+ }
+
+
+ @Override
+ public boolean getStatsEnabled() {
+ return statsEnabled;
+ }
+
+
+ @Override
+ public void setStatsEnabled(boolean statsEnabled) {
+ this.statsEnabled = statsEnabled;
+ }
+
+
+ @Override
+ public int getInterval() {
+ return interval;
+ }
+
+
+ @Override
+ public void setInterval(int interval) {
+ this.interval = interval;
+ }
+
+
+ @Override
+ public int getCount() {
+ return count.get();
+ }
+
+
+ @Override
+ public int getCountCompressedTX() {
+ return countCompressedTX.get();
+ }
+
+
+ @Override
+ public int getCountUncompressedTX() {
+ return countUncompressedTX.get();
+ }
+
+
+ @Override
+ public int getCountCompressedRX() {
+ return countCompressedRX.get();
+ }
+
+
+ @Override
+ public int getCountUncompressedRX() {
+ return countUncompressedRX.get();
+ }
+
+
+ @Override
+ public long getSizeTX() {
+ return sizeTX.get();
+ }
+
+
+ @Override
+ public long getCompressedSizeTX() {
+ return compressedSizeTX.get();
+ }
+
+
+ @Override
+ public long getUncompressedSizeTX() {
+ return uncompressedSizeTX.get();
+ }
+
+
+ @Override
+ public long getSizeRX() {
+ return sizeRX.get();
+ }
+
+
+ @Override
+ public long getCompressedSizeRX() {
+ return compressedSizeRX.get();
+ }
+
+
+ @Override
+ public long getUncompressedSizeRX() {
+ return uncompressedSizeRX.get();
+ }
+
+
+ @Override
+ public void reset() {
+ count.set(0);
+ countCompressedTX.set(0);
+ countUncompressedTX.set(0);
+ countCompressedRX.set(0);
+ countUncompressedRX.set(0);
+ sizeTX.set(0);
+ compressedSizeTX.set(0);
+ uncompressedSizeTX.set(0);
+ sizeRX.set(0);
+ compressedSizeRX.set(0);
+ uncompressedSizeRX.set(0);
+ }
}
diff --git a/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptorMBean.java b/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptorMBean.java
new file mode 100644
index 0000000..d0287ef
--- /dev/null
+++ b/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptorMBean.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+public interface GzipInterceptorMBean {
+
+ // Config
+ public int getOptionFlag();
+ public void setOptionFlag(int optionFlag);
+
+ /**
+ * @return the minimum payload size for compression to be enabled.
+ */
+ public int getCompressionMinSize();
+ /**
+ * Set the minimum payload size for compression to be enabled. A value of
+ * zero or less means compression will always be used. If not explicitly
+ * configured, a default of zero will be used.
+ *
+ * @param compressionMinSize The new minimum payload size
+ */
+ public void setCompressionMinSize(int compressionMinSize);
+
+ /**
+ * @return {@code true} if the interceptor is configured to collect
+ * statistics, otherwise {@code false}
+ */
+ public boolean getStatsEnabled();
+ /**
+ * Configure whether the interceptor collects statistics.
+ *
+ * @param statsEnabled {@code true} to enable statistics collections,
+ * otherwise {@code false}
+ */
+ public void setStatsEnabled(boolean statsEnabled);
+
+ /**
+ * @return If statistics collection is enabled, the number of messages
+ * between statistics reports being written to the log.
+ */
+ public int getInterval();
+ /**
+ * If statistics collection is enabled, set the number of messages between
+ * statistics reports being written to the log. A value of zero or less
+ * means no statistics reports are written.
+ *
+ * @param interval The new interval between reports
+ */
+ public void setInterval(int interval);
+
+ // Stats
+ public int getCount();
+ public int getCountCompressedTX();
+ public int getCountUncompressedTX();
+ public int getCountCompressedRX();
+ public int getCountUncompressedRX();
+ public long getSizeTX();
+ public long getCompressedSizeTX();
+ public long getUncompressedSizeTX();
+ public long getSizeRX();
+ public long getCompressedSizeRX();
+ public long getUncompressedSizeRX();
+ public void reset();
+ public void report();
+}
diff --git a/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties b/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties
index a0bd085..4ed114e 100644
--- a/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties
+++ b/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties
@@ -19,6 +19,19 @@
fragmentationInterceptor.fragments.missing=Fragments are missing.
gzipInterceptor.compress.failed=Unable to compress byte contents
gzipInterceptor.decompress.failed=Unable to decompress byte contents
+gzipInterceptor.report=GZip Interceptor Report[\
+ \n\tTotal Messages: {0}\
+ \n\tTx Messages Compressed: {1}\
+ \n\tTx Messages Uncompressed: {2}\
+ \n\tRx Messages Compressed: {3}\
+ \n\tRx Messages Uncompressed: {4}\
+ \n\tTotal Tx bytes: {5}\
+ \n\tCompressed Tx bytes: {6}\
+ \n\tUncompressed Tx bytes: {7}\
+ \n\tTotal Rx bytes: {8}\
+ \n\tCompressed Rx bytes: {9}\
+ \n\tUncompressed Rx bytes: {10}\
+ \n]
messageDispatchInterceptor.queue.full=Asynchronous queue is full, reached its limit of [{0}] bytes, current:[{1}] bytes.
messageDispatchInterceptor.unableAdd.queue=Unable to add the message to the async queue, queue bug?
messageDispatchInterceptor.warning.optionflag=Warning, you are overriding the asynchronous option flag, this will disable the Channel.SEND_OPTIONS_ASYNCHRONOUS that other apps might use.
@@ -54,7 +67,7 @@
\n\tSent:{2} MB (application)\
\n\tTime:{3} seconds\
\n\tTx Speed:{4} MB/sec (total)\
- \n\tTxSpeed:{5} MB/sec (application)\
+ \n\tTx Speed:{5} MB/sec (application)\
\n\tError Msg:{6}\
\n\tRx Msg:{7} messages\
\n\tRx Speed:{8} MB/sec (since 1st msg)\
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 2f957be..793e97b 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -138,6 +138,12 @@
<subsection name="Tribes">
<changelog>
<add>
+ <bug>51513</bug>: Add support for the <code>compressionMinSize</code>
+ attribute to the <code>GzipInterceptor</code>, add optional statistics
+ collection and expose the Interceptor over JMX. Based on a patch by
+ Christian Stöber. (markt)
+ </add>
+ <add>
<bug>61127</bug>Allow human-readable names for channelSendOptions and
mapSendOptions. Patch provided by Igal Sapir. (schultz)
</add>