KAFKA-2311; Make KafkaConsumer's ensureNotClosed method thread-safe

Here is the patch on github ijuma.

Acquiring the consumer lock (the single thread access controls) requires that the consumer be open. I changed the closed variable to be volatile so that another thread's writes will visible to the reading thread.

Additionally, there was an additional check if the consumer was closed after the lock was acquired. This check is no longer necessary.

This is my original work and I license it to the project under the project's open source license.

Author: Tim Brooks <tim@uncontended.net>

Reviewers: Jason Gustafson <jason@confluent.io>

Closes #1637 from tbrooks8/KAFKA-2311
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ade4243..cfa046f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -508,7 +508,7 @@
     private final Metadata metadata;
     private final long retryBackoffMs;
     private final long requestTimeoutMs;
-    private boolean closed = false;
+    private volatile boolean closed = false;
 
     // currentThread holds the threadId of the current thread accessing KafkaConsumer
     // and is used to prevent multi-threaded access
@@ -1397,7 +1397,6 @@
     public void close() {
         acquire();
         try {
-            if (closed) return;
             close(false);
         } finally {
             release();