KAFKA-3704; Revert "Remove hard-coded block size in KafkaProducer"

This is not an exact revert as the code changed a bit since the
original commit. We also include a note in `upgrade.html`.

The original commit is 1182d61deb23b5cd86cbe462471f7df583a796e1.

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Gwen Shapira, Guozhang Wang

Closes #1391 from ijuma/kafka-3704-revert and squashes the following commits:

7891b67 [Ismael Juma] Tweak upgrade note based on Gwen's feedback
1673cd0 [Ismael Juma] Revert "KAFKA-3704: Remove hard-coded block size in KafkaProducer"

(cherry picked from commit 9a3fcf41350ddda9a41db18cde718f892b95177c)
Signed-off-by: Gwen Shapira <cshapi@gmail.com>
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
index 60c15e6..e23a52e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Compressor.java
@@ -33,6 +33,7 @@
 
     static private final float COMPRESSION_RATE_DAMPING_FACTOR = 0.9f;
     static private final float COMPRESSION_RATE_ESTIMATION_FACTOR = 1.05f;
+    static private final int COMPRESSION_DEFAULT_BUFFER_SIZE = 1024;
 
     private static final float[] TYPE_TO_RATE;
 
@@ -52,7 +53,7 @@
         @Override
         public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
             return Class.forName("org.xerial.snappy.SnappyOutputStream")
-                .getConstructor(OutputStream.class);
+                .getConstructor(OutputStream.class, Integer.TYPE);
         }
     });
 
@@ -107,7 +108,7 @@
 
         // create the stream
         bufferStream = new ByteBufferOutputStream(buffer);
-        appendStream = wrapForOutput(bufferStream, type);
+        appendStream = wrapForOutput(bufferStream, type, COMPRESSION_DEFAULT_BUFFER_SIZE);
     }
 
     public ByteBuffer buffer() {
@@ -241,16 +242,16 @@
 
     // the following two functions also need to be public since they are used in MemoryRecords.iteration
 
-    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type) {
+    static public DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, int bufferSize) {
         try {
             switch (type) {
                 case NONE:
                     return new DataOutputStream(buffer);
                 case GZIP:
-                    return new DataOutputStream(new GZIPOutputStream(buffer));
+                    return new DataOutputStream(new GZIPOutputStream(buffer, bufferSize));
                 case SNAPPY:
                     try {
-                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer);
+                        OutputStream stream = (OutputStream) snappyOutputStreamSupplier.get().newInstance(buffer, bufferSize);
                         return new DataOutputStream(stream);
                     } catch (Exception e) {
                         throw new KafkaException(e);
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 2972f26..d09b9d7 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -68,10 +68,18 @@
     message throughput degradation because of the increased overhead.
     Likewise, replication now transmits an additional 8 bytes per message.
     If you're running close to the network capacity of your cluster, it's possible that you'll overwhelm the network cards
-    and see failures and performance issues due to the overload. When receiving compressed messages, 0.10.0
+    and see failures and performance issues due to the overload.
+</p>
+    <b>Note:</b> If you have enabled compression on producers, you may notice reduced producer throughput and/or
+    lower compression rate on the broker in some cases. When receiving compressed messages, 0.10.0
     brokers avoid recompressing the messages, which in general reduces the latency and improves the throughput. In
-    certain cases, this may reduce the batching size on the producer, which could lead to worse throughput. If this
-    happens, users can tune linger.ms and batch.size of the producer for better throughput.
+    certain cases, however, this may reduce the batching size on the producer, which could lead to worse throughput. If this
+    happens, users can tune linger.ms and batch.size of the producer for better throughput. In addition, the producer buffer
+    used for compressing messages with snappy is smaller than the one used by the broker, which may have a negative
+    impact on the compression ratio for the messages on disk. We intend to make this configurable in a future Kafka
+    release.
+<p>
+
 </p>
 
 <h5><a id="upgrade_10_breaking" href="#upgrade_10_breaking">Potential breaking changes in 0.10.0.0</a></h5>
@@ -101,7 +109,6 @@
 
 <ul>
     <li> Starting from Kafka 0.10.0.0, a new client library named <b>Kafka Streams</b> is available for stream processing on data stored in Kafka topics. This new client library only works with 0.10.x and upward versioned brokers due to message format changes mentioned above. For more information please read <a href="#streams_overview">this section</a>.</li>
-    <li> If compression with snappy or gzip is enabled, the new producer will use the compression scheme's default buffer size (this is already the case for LZ4) instead of 1 KB in order to improve the compression ratio. Note that the default buffer sizes for gzip, snappy and LZ4 are 0.5 KB, 2x32 KB and 2x64KB respectively. For the snappy case, a producer with 5000 partitions will require an additional 315 MB of JVM heap.</li>
     <li> The default value of the configuration parameter <code>receive.buffer.bytes</code> is now 64K for the new consumer.</li>
     <li> The new consumer now exposes the configuration parameter <code>exclude.internal.topics</code> to restrict internal topics (such as the consumer offsets topic) from accidentally being included in regular expression subscriptions. By default, it is enabled.</li>
     <li> The old Scala producer has been deprecated. Users should migrate their code to the Java producer included in the kafka-clients JAR as soon as possible. </li>