(TWILL-265) Upgrading kafka version and removing lz4 dependency

This closes #84 on Github.

Signed-off-by: Terence Yim <terencey@google.com>
diff --git a/pom.xml b/pom.xml
index b76b1b3..401d97c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,7 +165,7 @@
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <surefire.redirectTestOutputToFile>true</surefire.redirectTestOutputToFile>
-        <slf4j.version>1.7.5</slf4j.version>
+        <slf4j.version>1.7.30</slf4j.version>
         <logback.version>1.0.9</logback.version>
         <guava.version>13.0.1</guava.version>
         <gson.version>2.2.4</gson.version>
@@ -174,7 +174,7 @@
         <snappy-java.version>1.0.5</snappy-java.version>
         <jcl-over-slf4j.version>1.7.2</jcl-over-slf4j.version>
         <asm.version>7.1</asm.version>
-        <kafka.version>0.8.0</kafka.version>
+        <kafka.version>0.8.2.2</kafka.version>
         <zkclient.version>0.10</zkclient.version>
         <zookeeper.version>3.4.5</zookeeper.version>
         <junit.version>4.11</junit.version>
@@ -754,8 +754,17 @@
                         <groupId>com.sun.jmx</groupId>
                         <artifactId>jmxri</artifactId>
                     </exclusion>
+                    <exclusion>
+                        <groupId>net.jpountz.lz4</groupId>
+                        <artifactId>lz4</artifactId>
+                    </exclusion>
                 </exclusions>
             </dependency>
+            <dependency>
+                <groupId>org.slf4j</groupId>
+                <artifactId>log4j-over-slf4j</artifactId>
+                <version>${slf4j.version}</version>
+            </dependency>
 	    <dependency>
 	      <groupId>net.sf.jopt-simple</groupId>
 	      <artifactId>jopt-simple</artifactId>
diff --git a/twill-core/pom.xml b/twill-core/pom.xml
index 249c061..603e809 100644
--- a/twill-core/pom.xml
+++ b/twill-core/pom.xml
@@ -89,6 +89,10 @@
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.10</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+        </dependency>
 	<!-- https://mvnrepository.com/artifact/net.sf.jopt-simple/jopt-simple -->
 	<dependency>
 	  <groupId>net.sf.jopt-simple</groupId>
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java
deleted file mode 100644
index 4aa7940..0000000
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/IntegerPartitioner.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.twill.internal.kafka.client;
-
-import kafka.producer.Partitioner;
-import kafka.utils.VerifiableProperties;
-
-/**
- * A kafka {@link kafka.producer.Partitioner} using integer key to compute partition id.
- */
-public final class IntegerPartitioner implements Partitioner<Integer> {
-
-  public IntegerPartitioner(VerifiableProperties properties) {
-  }
-
-  public int partition(Integer key, int numPartitions) {
-    return key % numPartitions;
-  }
-}
diff --git a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
index e5d0f8d..832faa8 100644
--- a/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
+++ b/twill-core/src/main/java/org/apache/twill/internal/kafka/client/SimpleKafkaPublisher.java
@@ -47,6 +47,7 @@
 final class SimpleKafkaPublisher implements KafkaPublisher {
 
   private static final Logger LOG = LoggerFactory.getLogger(SimpleKafkaPublisher.class);
+  private static final int MAX_MESSAGE_BYTES = 1024 * 1024 * 10;
 
   private final BrokerService brokerService;
   private final Ack ack;
@@ -171,9 +172,9 @@
         props.put("metadata.broker.list", newBrokerList);
         props.put("serializer.class", ByteBufferEncoder.class.getName());
         props.put("key.serializer.class", IntegerEncoder.class.getName());
-        props.put("partitioner.class", IntegerPartitioner.class.getName());
         props.put("request.required.acks", Integer.toString(ack.getAck()));
         props.put("compression.codec", compression.getCodec());
+        props.put("message.max.bytes", Integer.toString(MAX_MESSAGE_BYTES));
 
         ProducerConfig config = new ProducerConfig(props);
         newProducer = new Producer<>(config);
diff --git a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
index 958925c..e7e7e7c 100644
--- a/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
+++ b/twill-core/src/test/java/org/apache/twill/kafka/client/KafkaTest.java
@@ -135,6 +135,9 @@
           server = new EmbeddedKafkaServer(kafkaServerConfig);
           server.startAndWait();
 
+          // Wait a little while to make sure the changes are reflected in the broker service
+          TimeUnit.SECONDS.sleep(3);
+
           // Publish another message
           createPublishThread(kafkaClient, topic, Compression.NONE, "Second message", 1).start();
 
@@ -308,16 +311,14 @@
 
   private Thread createPublishThread(final KafkaClient kafkaClient, final String topic, final Compression compression,
                                      final String message, final int count, final int base) {
-    return new Thread() {
-      public void run() {
-        KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, compression);
-        KafkaPublisher.Preparer preparer = publisher.prepare(topic);
-        for (int i = 0; i < count; i++) {
-          preparer.add(Charsets.UTF_8.encode((base + i) + " " + message), 0);
-        }
-        Futures.getUnchecked(preparer.send());
+    return new Thread(() -> {
+      KafkaPublisher publisher = kafkaClient.getPublisher(KafkaPublisher.Ack.ALL_RECEIVED, compression);
+      KafkaPublisher.Preparer preparer = publisher.prepare(topic);
+      for (int i = 0; i < count; i++) {
+        preparer.add(Charsets.UTF_8.encode((base + i) + " " + message), 0);
       }
-    };
+      Futures.getUnchecked(preparer.send());
+    });
   }
 
 
diff --git a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
index b27dcd8..ca0bc08 100644
--- a/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
+++ b/twill-yarn/src/main/java/org/apache/twill/internal/ServiceMain.java
@@ -18,8 +18,11 @@
 package org.apache.twill.internal;
 
 import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.classic.util.ContextInitializer;
+import ch.qos.logback.core.filter.Filter;
 import ch.qos.logback.core.joran.spi.JoranException;
+import ch.qos.logback.core.spi.FilterReply;
 import com.google.common.base.Throwables;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.Futures;
@@ -211,7 +214,14 @@
       }
     }
 
+    KafkaAppender kafkaAppender = getKafkaAppender(context);
+    kafkaAppender.start();
+
     // Attach the KafkaAppender to the root logger
+    context.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).addAppender(kafkaAppender);
+  }
+
+  private KafkaAppender getKafkaAppender(LoggerContext context) {
     KafkaAppender kafkaAppender = new KafkaAppender();
     kafkaAppender.setName("KAFKA");
     kafkaAppender.setTopic(Constants.LOG_TOPIC);
@@ -223,10 +233,15 @@
       kafkaAppender.setRunnableName(runnableName);
     }
 
+    kafkaAppender.addFilter(new Filter<ILoggingEvent>() {
+      @Override
+      public FilterReply decide(ILoggingEvent event) {
+        return event.getLoggerName().startsWith("kafka.") ? FilterReply.DENY : FilterReply.ACCEPT;
+      }
+    });
     kafkaAppender.setContext(context);
-    kafkaAppender.start();
 
-    context.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME).addAppender(kafkaAppender);
+    return kafkaAppender;
   }
 
   /**