Fix https://bz.apache.org/bugzilla/show_bug.cgi?id=51513
Add support for the compressionMinSize attribute to the GzipInterceptor, add optional statistics collection and expose the Interceptor over JMX.
Based on a patch by Christian Stöber.
git-svn-id: https://svn.apache.org/repos/asf/tomcat/trunk@1800708 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java b/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java
index 21c75e0..89dddad 100644
--- a/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java
+++ b/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptor.java
@@ -20,6 +20,8 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
 
@@ -36,38 +38,108 @@
 /**
  * @version 1.0
  */
-public class GzipInterceptor extends ChannelInterceptorBase {
+public class GzipInterceptor extends ChannelInterceptorBase implements GzipInterceptorMBean {
 
     private static final Log log = LogFactory.getLog(GzipInterceptor.class);
     protected static final StringManager sm = StringManager.getManager(GzipInterceptor.class);
 
     public static final int DEFAULT_BUFFER_SIZE = 2048;
+    public static final int DEFAULT_OPTION_COMPRESSION_ENABLE = 0x0100;
+
+    private int compressionMinSize = 0;
+    private volatile boolean statsEnabled = false;
+    private int interval = 0;
+
+    // Stats
+    private final AtomicInteger count = new AtomicInteger();
+    private final AtomicInteger countCompressedTX = new AtomicInteger();
+    private final AtomicInteger countUncompressedTX = new AtomicInteger();
+    private final AtomicInteger countCompressedRX = new AtomicInteger();
+    private final AtomicInteger countUncompressedRX = new AtomicInteger();
+    private final AtomicLong sizeTX = new AtomicLong();
+    private final AtomicLong compressedSizeTX = new AtomicLong();
+    private final AtomicLong uncompressedSizeTX = new AtomicLong();
+    private final AtomicLong sizeRX = new AtomicLong();
+    private final AtomicLong compressedSizeRX = new AtomicLong();
+    private final AtomicLong uncompressedSizeRX = new AtomicLong();
+
+
+    public GzipInterceptor() {
+        setOptionFlag(DEFAULT_OPTION_COMPRESSION_ENABLE);
+    }
+
 
     @Override
-    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException {
+    public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload)
+            throws ChannelException {
         try {
-            byte[] data = compress(msg.getMessage().getBytes());
+            byte[] data = msg.getMessage().getBytes();
+            if (statsEnabled) {
+                sizeTX.addAndGet(data.length);
+            }
+
+            if (data.length > compressionMinSize) {
+                data = compress(data);
+                // Set the flag that indicates that the message is compressed
+                msg.setOptions(msg.getOptions() | getOptionFlag());
+                if (statsEnabled) {
+                    countCompressedTX.incrementAndGet();
+                    compressedSizeTX.addAndGet(data.length);
+                }
+            } else if (statsEnabled){
+                countUncompressedTX.incrementAndGet();
+                uncompressedSizeTX.addAndGet(data.length);
+            }
+
             msg.getMessage().trim(msg.getMessage().getLength());
             msg.getMessage().append(data,0,data.length);
             super.sendMessage(destination, msg, payload);
+
+            int currentCount = count.incrementAndGet();
+            if (statsEnabled && interval > 0 && currentCount % interval == 0) {
+                report();
+            }
         } catch ( IOException x ) {
             log.error(sm.getString("gzipInterceptor.compress.failed"));
             throw new ChannelException(x);
         }
     }
 
+
     @Override
     public void messageReceived(ChannelMessage msg) {
         try {
-            byte[] data = decompress(msg.getMessage().getBytes());
+            byte[] data = msg.getMessage().getBytes();
+            if ((msg.getOptions() & getOptionFlag()) > 0) {
+                if (statsEnabled) {
+                    countCompressedRX.incrementAndGet();
+                    compressedSizeRX.addAndGet(data.length);
+                }
+                // Message was compressed
+                data = decompress(data);
+            } else if (statsEnabled) {
+                countUncompressedRX.incrementAndGet();
+                uncompressedSizeRX.addAndGet(data.length);
+            }
+
+            if (statsEnabled) {
+                sizeRX.addAndGet(data.length);
+            }
+
             msg.getMessage().trim(msg.getMessage().getLength());
             msg.getMessage().append(data,0,data.length);
             super.messageReceived(msg);
+
+            int currentCount = count.incrementAndGet();
+            if (statsEnabled && interval > 0 && currentCount % interval == 0) {
+                report();
+            }
         } catch ( IOException x ) {
             log.error(sm.getString("gzipInterceptor.decompress.failed"),x);
         }
     }
 
+
     public static byte[] compress(byte[] data) throws IOException {
         ByteArrayOutputStream bout = new ByteArrayOutputStream();
         GZIPOutputStream gout = new GZIPOutputStream(bout);
@@ -77,6 +149,7 @@
         return bout.toByteArray();
     }
 
+
     /**
      * @param data  Data to decompress
      * @return      Decompressed data
@@ -95,4 +168,134 @@
         }
         return bout.toByteArray();
     }
+
+
+    @Override
+    public void report() {
+        log.info(sm.getString("gzipInterceptor.report", Integer.valueOf(getCount()),
+                Integer.valueOf(getCountCompressedTX()), Integer.valueOf(getCountUncompressedTX()),
+                Integer.valueOf(getCountCompressedRX()), Integer.valueOf(getCountUncompressedRX()),
+                Long.valueOf(getSizeTX()), Long.valueOf(getCompressedSizeTX()),
+                Long.valueOf(getUncompressedSizeTX()),
+                Long.valueOf(getSizeRX()), Long.valueOf(getCompressedSizeRX()),
+                Long.valueOf(getUncompressedSizeRX())));
+    }
+
+
+    @Override
+    public int getCompressionMinSize() {
+        return compressionMinSize;
+    }
+
+
+    @Override
+    public void setCompressionMinSize(int compressionMinSize) {
+        this.compressionMinSize = compressionMinSize;
+    }
+
+
+    @Override
+    public boolean getStatsEnabled() {
+        return statsEnabled;
+    }
+
+
+    @Override
+    public void setStatsEnabled(boolean statsEnabled) {
+        this.statsEnabled = statsEnabled;
+    }
+
+
+    @Override
+    public int getInterval() {
+        return interval;
+    }
+
+
+    @Override
+    public void setInterval(int interval) {
+        this.interval = interval;
+    }
+
+
+    @Override
+    public int getCount() {
+        return count.get();
+    }
+
+
+    @Override
+    public int getCountCompressedTX() {
+        return countCompressedTX.get();
+    }
+
+
+    @Override
+    public int getCountUncompressedTX() {
+        return countUncompressedTX.get();
+    }
+
+
+    @Override
+    public int getCountCompressedRX() {
+        return countCompressedRX.get();
+    }
+
+
+    @Override
+    public int getCountUncompressedRX() {
+        return countUncompressedRX.get();
+    }
+
+
+    @Override
+    public long getSizeTX() {
+        return sizeTX.get();
+    }
+
+
+    @Override
+    public long getCompressedSizeTX() {
+        return compressedSizeTX.get();
+    }
+
+
+    @Override
+    public long getUncompressedSizeTX() {
+        return uncompressedSizeTX.get();
+    }
+
+
+    @Override
+    public long getSizeRX() {
+        return sizeRX.get();
+    }
+
+
+    @Override
+    public long getCompressedSizeRX() {
+        return compressedSizeRX.get();
+    }
+
+
+    @Override
+    public long getUncompressedSizeRX() {
+        return uncompressedSizeRX.get();
+    }
+
+
+    @Override
+    public void reset() {
+        count.set(0);
+        countCompressedTX.set(0);
+        countUncompressedTX.set(0);
+        countCompressedRX.set(0);
+        countUncompressedRX.set(0);
+        sizeTX.set(0);
+        compressedSizeTX.set(0);
+        uncompressedSizeTX.set(0);
+        sizeRX.set(0);
+        compressedSizeRX.set(0);
+        uncompressedSizeRX.set(0);
+    }
 }
diff --git a/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptorMBean.java b/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptorMBean.java
new file mode 100644
index 0000000..d0287ef
--- /dev/null
+++ b/java/org/apache/catalina/tribes/group/interceptors/GzipInterceptorMBean.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.catalina.tribes.group.interceptors;
+
+public interface GzipInterceptorMBean {
+
+    // Config
+    public int getOptionFlag();
+    public void setOptionFlag(int optionFlag);
+
+    /**
+     * @return the minimum payload size for compression to be enabled.
+     */
+    public int getCompressionMinSize();
+    /**
+     * Set the minimum payload size for compression to be enabled. A value of
+     * zero or less means compression will always be used. If not explicitly
+     * configured, a default of zero will be used.
+     *
+     * @param compressionMinSize The new minimum payload size
+     */
+    public void setCompressionMinSize(int compressionMinSize);
+
+    /**
+     * @return {@code true} if the interceptor is configured to collect
+     *         statistics, otherwise {@code false}
+     */
+    public boolean getStatsEnabled();
+    /**
+     * Configure whether the interceptor collects statistics.
+     *
+     * @param statsEnabled {@code true} to enable statistics collections,
+     *        otherwise {@code false}
+     */
+    public void setStatsEnabled(boolean statsEnabled);
+
+    /**
+     * @return If statistics collection is enabled, the number of messages
+     *         between statistics reports being written to the log.
+     */
+    public int getInterval();
+    /**
+     * If statistics collection is enabled, set the number of messages between
+     * statistics reports being written to the log. A value of zero or less
+     * means no statistics reports are written.
+     *
+     * @param interval The new interval between reports
+     */
+    public void setInterval(int interval);
+
+    // Stats
+    public int getCount();
+    public int getCountCompressedTX();
+    public int getCountUncompressedTX();
+    public int getCountCompressedRX();
+    public int getCountUncompressedRX();
+    public long getSizeTX();
+    public long getCompressedSizeTX();
+    public long getUncompressedSizeTX();
+    public long getSizeRX();
+    public long getCompressedSizeRX();
+    public long getUncompressedSizeRX();
+    public void reset();
+    public void report();
+}
diff --git a/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties b/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties
index a0bd085..4ed114e 100644
--- a/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties
+++ b/java/org/apache/catalina/tribes/group/interceptors/LocalStrings.properties
@@ -19,6 +19,19 @@
 fragmentationInterceptor.fragments.missing=Fragments are missing.
 gzipInterceptor.compress.failed=Unable to compress byte contents
 gzipInterceptor.decompress.failed=Unable to decompress byte contents
+gzipInterceptor.report=GZip Interceptor Report[\
+  \n\tTotal Messages: {0}\
+  \n\tTx Messages Compressed: {1}\
+  \n\tTx Messages Uncompressed: {2}\
+  \n\tRx Messages Compressed: {3}\
+  \n\tRx Messages Uncompressed: {4}\
+  \n\tTotal Tx bytes: {5}\
+  \n\tCompressed Tx bytes: {6}\
+  \n\tUncompressed Tx bytes: {7}\
+  \n\tTotal Rx bytes: {8}\
+  \n\tCompressed Rx bytes: {9}\
+  \n\tUncompressed Rx bytes: {10}\
+  \n]
 messageDispatchInterceptor.queue.full=Asynchronous queue is full, reached its limit of [{0}] bytes, current:[{1}] bytes.
 messageDispatchInterceptor.unableAdd.queue=Unable to add the message to the async queue, queue bug?
 messageDispatchInterceptor.warning.optionflag=Warning, you are overriding the asynchronous option flag, this will disable the Channel.SEND_OPTIONS_ASYNCHRONOUS that other apps might use.
@@ -54,7 +67,7 @@
   \n\tSent:{2} MB (application)\
   \n\tTime:{3} seconds\
   \n\tTx Speed:{4} MB/sec (total)\
-  \n\tTxSpeed:{5} MB/sec (application)\
+  \n\tTx Speed:{5} MB/sec (application)\
   \n\tError Msg:{6}\
   \n\tRx Msg:{7} messages\
   \n\tRx Speed:{8} MB/sec (since 1st msg)\
diff --git a/webapps/docs/changelog.xml b/webapps/docs/changelog.xml
index 2f957be..793e97b 100644
--- a/webapps/docs/changelog.xml
+++ b/webapps/docs/changelog.xml
@@ -138,6 +138,12 @@
   <subsection name="Tribes">
     <changelog>
       <add>
+        <bug>51513</bug>: Add support for the <code>compressionMinSize</code>
+        attribute to the <code>GzipInterceptor</code>, add optional statistics
+        collection and expose the Interceptor over JMX. Based on a patch by
+        Christian Stöber. (markt)
+      </add>
+      <add>
         <bug>61127</bug>Allow human-readable names for channelSendOptions and
         mapSendOptions. Patch provided by Igal Sapir. (schultz)
       </add>