Update to kafka 2.2.1

Remove duplication of KafkaData, KafkaSource, KafkaInputFormat in order to only retain
the variants from org.apache.crunch.kafka.record that were already mostly compatible
with kafka 2.2.1. Fix some remaining incompatibilities, in particular related to reading
offset information from the broker.

Signed-off-by: Josh Wills <jwills@apache.org>
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java
deleted file mode 100644
index 6543aad..0000000
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.kafka;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.ReadableData;
-import org.apache.crunch.SourceTarget;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-class KafkaData<K, V> implements ReadableData<Pair<K, V>> {
-
-  private static final long serialVersionUID = -6582212311361579556L;
-
-  private final Map<TopicPartition, Pair<Long, Long>> offsets;
-  private final Properties props;
-
-  public KafkaData(Properties connectionProperties,
-                   Map<TopicPartition, Pair<Long, Long>> offsets) {
-    this.props = connectionProperties;
-    this.offsets = offsets;
-  }
-
-
-  @Override
-  public Set<SourceTarget<?>> getSourceTargets() {
-    return null;
-  }
-
-  @Override
-  public void configure(Configuration conf) {
-    //no-op
-  }
-
-  @Override
-  public Iterable<Pair<K, V>> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
-    Consumer<K, V> consumer = new KafkaConsumer<K, V>(props);
-    return new KafkaRecordsIterable<>(consumer, offsets, props);
-  }
-}
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java
deleted file mode 100644
index 6f1b564..0000000
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.kafka;
-
-import org.apache.crunch.Pair;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RetriableException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Properties;
-import java.util.Set;
-
-
-class KafkaRecordsIterable<K, V> implements Iterable<Pair<K, V>> {
-
-  /**
-   * Logger
-   */
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordsIterable.class);
-
-  /**
-   * The Kafka consumer responsible for retrieving messages.
-   */
-  private final Consumer<K, V> consumer;
-
-  /**
-   * The starting positions of the iterable for the topic.
-   */
-  private final Map<TopicPartition, Pair<Long, Long>> offsets;
-
-  /**
-   * Tracks if the iterable is empty.
-   */
-  private final boolean isEmpty;
-
-  /**
-   * The poll time between each request to Kafka
-   */
-  private final long scanPollTime;
-
-  private final int maxRetryAttempts;
-
-  /**
-   * Creates the iterable that will pull values for a collection of topics using the provided {@code consumer} between
-   * the {@code startOffsets} and {@code stopOffsets}.
-   * @param consumer The consumer for pulling the data from Kafka.  The consumer will be closed automatically once all
-   *                 of the records have been consumed.
-   * @param offsets offsets for pulling data
-   * @param properties properties for tweaking the behavior of the iterable.
-   * @throws IllegalArgumentException if any of the arguments are {@code null} or empty.
-   */
-  public KafkaRecordsIterable(Consumer<K, V> consumer, Map<TopicPartition, Pair<Long, Long>> offsets,
-                              Properties properties) {
-    if (consumer == null) {
-      throw new IllegalArgumentException("The 'consumer' cannot be 'null'.");
-    }
-    this.consumer = consumer;
-
-    if (properties == null) {
-      throw new IllegalArgumentException("The 'properties' cannot be 'null'.");
-    }
-
-    String retryString = properties.getProperty(KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY,
-        KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING);
-    maxRetryAttempts = Integer.parseInt(retryString);
-
-    if (offsets == null || offsets.isEmpty()) {
-      throw new IllegalArgumentException("The 'offsets' cannot 'null' or empty.");
-    }
-
-    //filter out any topics and partitions that do not have offset ranges that will produce data.
-    Map<TopicPartition, Pair<Long, Long>> filteredOffsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
-      Pair<Long, Long> value = entry.getValue();
-      //if start is less than one less than stop then there is data to be had
-      if(value.first() < value.second()){
-        filteredOffsets.put(entry.getKey(), value);
-      }else{
-        LOG.debug("Removing offsets for {} because start is not less than the end offset.", entry.getKey());
-      }
-    }
-
-    //check to make sure that based on the offsets there is data to retrieve, otherwise false.
-    //there will be data if the start offsets are less than stop offsets
-    isEmpty = filteredOffsets.isEmpty();
-    if (isEmpty) {
-      LOG.warn("Iterable for Kafka for is empty because offsets are empty.");
-    }
-
-    //assign this
-    this.offsets = filteredOffsets;
-
-    scanPollTime = Long.parseLong(properties.getProperty(KafkaSource.CONSUMER_POLL_TIMEOUT_KEY,
-        Long.toString(KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT)));
-  }
-
-  @Override
-  public Iterator<Pair<K, V>> iterator() {
-    if (isEmpty) {
-      LOG.debug("Returning empty iterator since offsets align.");
-      return Collections.emptyIterator();
-    }
-    //Assign consumer to all of the partitions
-    LOG.debug("Assigning topics and partitions and seeking to start offsets.");
-
-    consumer.assign(new LinkedList<>(offsets.keySet()));
-    //hack so maybe look at removing this
-    consumer.poll(0);
-    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
-      consumer.seek(entry.getKey(), entry.getValue().first());
-    }
-
-    return new RecordsIterator<K, V>(consumer, offsets, scanPollTime, maxRetryAttempts);
-  }
-
-  private static class RecordsIterator<K, V> implements Iterator<Pair<K, V>> {
-
-    private final Consumer<K, V> consumer;
-    private final Map<TopicPartition, Pair<Long, Long>> offsets;
-    private final long pollTime;
-    private final int maxNumAttempts;
-    private ConsumerRecords<K, V> records;
-    private Iterator<ConsumerRecord<K, V>> currentIterator;
-    private final Set<TopicPartition> remainingPartitions;
-
-    private Pair<K, V> next;
-
-    public RecordsIterator(Consumer<K, V> consumer,
-                           Map<TopicPartition, Pair<Long, Long>> offsets, long pollTime, int maxNumRetries) {
-      this.consumer = consumer;
-      remainingPartitions = new HashSet<>(offsets.keySet());
-      this.offsets = offsets;
-      this.pollTime = pollTime;
-      this.maxNumAttempts = maxNumRetries;
-    }
-
-    @Override
-    public boolean hasNext() {
-      if (next != null)
-        return true;
-
-      //if partitions to consume then pull next value
-      if (remainingPartitions.size() > 0) {
-        next = getNext();
-      }
-
-      return next != null;
-    }
-
-    @Override
-    public Pair<K, V> next() {
-      if (next == null) {
-        next = getNext();
-      }
-
-      if (next != null) {
-        Pair<K, V> returnedNext = next;
-        //prime for next call
-        next = getNext();
-        //return the current next
-        return returnedNext;
-      } else {
-        throw new NoSuchElementException("No more elements.");
-      }
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("remove is not supported.");
-    }
-
-    /**
-     * Gets the current iterator.
-     *
-     * @return the current iterator or {@code null} if there are no more values to consume.
-     */
-    private Iterator<ConsumerRecord<K, V>> getIterator() {
-      if (!remainingPartitions.isEmpty()) {
-        if (currentIterator != null && currentIterator.hasNext()) {
-          return currentIterator;
-        }
-        LOG.debug("Retrieving next set of records.");
-        int numTries = 0;
-        boolean notSuccess = false;
-        while(!notSuccess && numTries < maxNumAttempts) {
-          try {
-            records = consumer.poll(pollTime);
-            notSuccess = true;
-          }catch(RetriableException re){
-            numTries++;
-            if(numTries < maxNumAttempts) {
-              LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries, re);
-            }else{
-              LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", maxNumAttempts, re);
-              throw re;
-            }
-          }
-        }
-        if (records == null || records.isEmpty()) {
-          LOG.debug("Retrieved empty records.");
-          currentIterator = null;
-          return null;
-        }
-        currentIterator = records.iterator();
-        return currentIterator;
-      }
-
-      LOG.debug("No more partitions to consume therefore not retrieving any more records.");
-      return null;
-    }
-
-    /**
-     * Internal method for retrieving the next value to retrieve.
-     *
-     * @return {@code null} if there are no more values to retrieve otherwise the next event.
-     */
-    private Pair<K, V> getNext() {
-      while (!remainingPartitions.isEmpty()) {
-        Iterator<ConsumerRecord<K, V>> iterator = getIterator();
-
-        while (iterator != null && iterator.hasNext()) {
-          ConsumerRecord<K, V> record = iterator.next();
-          TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
-          long offset = record.offset();
-
-          if (withinRange(topicPartition, offset)) {
-            LOG.debug("Retrieving value for {} with offset {}.", topicPartition, offset);
-            return Pair.of(record.key(), record.value());
-          }
-          LOG.debug("Value for {} with offset {} is outside of range skipping.", topicPartition, offset);
-        }
-      }
-
-      LOG.debug("Closing the consumer because there are no more remaining partitions.");
-      consumer.close();
-
-      LOG.debug("Consumed data from all partitions.");
-      return null;
-
-    }
-
-    /**
-     * Checks whether the value for {@code topicPartition} with an {@code offset} is within scan range.  If
-     * the value is not then {@code false} is returned otherwise {@code true}.
-     *
-     * @param topicPartion The partition for the offset
-     * @param offset the offset in the partition
-     * @return {@code true} if the value is within the expected consumption range, otherwise {@code false}.
-     */
-    private boolean withinRange(TopicPartition topicPartion, long offset) {
-      long endOffset = offsets.get(topicPartion).second();
-      //end offsets are one higher than the last written value.
-      boolean emit = offset < endOffset;
-      if (offset >= endOffset - 1) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Completed consuming partition {} with offset {} and ending offset {}.",
-              new Object[]{topicPartion, offset, endOffset});
-        }
-        remainingPartitions.remove(topicPartion);
-        consumer.pause(Collections.singleton(topicPartion));
-      }
-      LOG.debug("Value for partition {} and offset {} is within range.", topicPartion, offset);
-      return emit;
-    }
-  }
-}
-
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
deleted file mode 100644
index fcf002b..0000000
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.kafka;
-
-
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Pair;
-import org.apache.crunch.ReadableData;
-import org.apache.crunch.Source;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.impl.mr.run.CrunchMapper;
-import org.apache.crunch.io.CrunchInputs;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-/**
- * A Crunch Source that will retrieve events from Kafka given start and end offsets.  The source is not designed to
- * process unbounded data but instead to retrieve data between a specified range.
- * <p>
- *
- * The values retrieved from Kafka are returned as raw bytes inside of a {@link BytesWritable}.  If callers
- * need specific parsing logic based on the topic then consumers are encouraged to use multiple Kafka Sources
- * for each topic and use special {@link DoFn} to parse the payload.
- *
- * @deprecated Use {@link org.apache.crunch.kafka.record.KafkaSource} instead
- */
-@Deprecated
-public class KafkaSource
-    implements TableSource<BytesWritable, BytesWritable>, ReadableSource<Pair<BytesWritable, BytesWritable>> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
-
-  private final FormatBundle inputBundle;
-  private final Properties props;
-  private final Map<TopicPartition, Pair<Long, Long>> offsets;
-
-  /**
-   * The consistent PType describing all of the data being retrieved from Kafka as a BytesWritable.
-   */
-  private static PTableType<BytesWritable, BytesWritable> KAFKA_SOURCE_TYPE =
-      Writables.tableOf(Writables.writables(BytesWritable.class), Writables.writables(BytesWritable.class));
-
-  /**
-   * Constant to indicate how long the reader waits before timing out when retrieving data from Kafka.
-   */
-  public static final String CONSUMER_POLL_TIMEOUT_KEY = "org.apache.crunch.kafka.consumer.poll.timeout";
-
-  /**
-   * Default timeout value for {@link #CONSUMER_POLL_TIMEOUT_KEY} of 1 second.
-   */
-  public static final long CONSUMER_POLL_TIMEOUT_DEFAULT = 1000L;
-
-
-  /**
-   * Constructs a Kafka source that will read data from the Kafka cluster identified by the {@code kafkaConnectionProperties}
-   * and from the specific topics and partitions identified in the {@code offsets}
-   * @param kafkaConnectionProperties The connection properties for reading from Kafka.  These properties will be honored
-   *                                  with the exception of the {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
-   *                                  {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
-   * @param offsets A map of {@link TopicPartition} to a pair of start and end offsets respectively.  The start and end offsets
-   *                are evaluated at [start, end) where the ending offset is excluded.  Each TopicPartition must have a
-   *                non-null pair describing its offsets.  The start offset should be less than the end offset.  If the values
-   *                are equal or start is greater than the end then that partition will be skipped.
-   */
-  public KafkaSource(Properties kafkaConnectionProperties, Map<TopicPartition, Pair<Long, Long>> offsets) {
-    this.props = copyAndSetProperties(kafkaConnectionProperties);
-
-    inputBundle = createFormatBundle(props, offsets);
-
-    this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
-  }
-
-  @Override
-  public Source<Pair<BytesWritable, BytesWritable>> inputConf(String key, String value) {
-    inputBundle.set(key, value);
-    return this;
-  }
-
-  @Override
-  public Source<Pair<BytesWritable, BytesWritable>> fileSystem(FileSystem fileSystem) {
-    // not currently applicable/supported for Kafka
-    return this;
-  }
-
-  @Override
-  public FileSystem getFileSystem() {
-    // not currently applicable/supported for Kafka
-    return null;
-  }
-
-  @Override
-  public PType<Pair<BytesWritable, BytesWritable>> getType() {
-    return KAFKA_SOURCE_TYPE;
-  }
-
-  @Override
-  public Converter<?, ?, ?, ?> getConverter() {
-    return KAFKA_SOURCE_TYPE.getConverter();
-  }
-
-  @Override
-  public PTableType<BytesWritable, BytesWritable> getTableType() {
-    return KAFKA_SOURCE_TYPE;
-  }
-
-  @Override
-  public long getSize(Configuration configuration) {
-    // TODO something smarter here.
-    return 1000L * 1000L * 1000L;
-  }
-
-  @Override
-  public String toString() {
-    return "KafkaSource("+props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)+")";
-  }
-
-  @Override
-  public long getLastModifiedAt(Configuration configuration) {
-    LOG.warn("Cannot determine last modified time for source: {}", toString());
-    return -1;
-  }
-
-  private static <K, V> FormatBundle createFormatBundle(Properties kafkaConnectionProperties,
-                                                        Map<TopicPartition, Pair<Long, Long>> offsets) {
-
-    FormatBundle<KafkaInputFormat> bundle = FormatBundle.forInput(KafkaInputFormat.class);
-
-    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
-    KafkaInputFormat.writeConnectionPropertiesToBundle(kafkaConnectionProperties, bundle);
-
-    return bundle;
-  }
-
-  private static <K, V> Properties copyAndSetProperties(Properties kafkaConnectionProperties) {
-    Properties props = new Properties();
-
-    //set the default to be earliest for auto reset but allow it to be overridden if appropriate.
-    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-    props.putAll(kafkaConnectionProperties);
-
-    //Setting the key/value deserializer to ensure proper translation from Kafka to PType format.
-    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
-    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
-
-    return KafkaInputFormat.tagExistingKafkaConnectionProperties(props);
-  }
-
-
-  @Override
-  public Iterable<Pair<BytesWritable, BytesWritable>> read(Configuration conf) throws IOException {
-    // consumer will get closed when the iterable is fully consumed.
-    // skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity
-    // of parallelism when reading.
-    Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(KafkaInputFormat.filterConnectionProperties(props));
-    return new KafkaRecordsIterable<>(consumer, offsets, KafkaInputFormat.filterConnectionProperties(props));
-  }
-
-
-  @Override
-  public void configureSource(Job job, int inputId) throws IOException {
-    Configuration conf = job.getConfiguration();
-    //an id of -1 indicates that this is the only input so just use it directly
-    if (inputId == -1) {
-      job.setMapperClass(CrunchMapper.class);
-      job.setInputFormatClass(inputBundle.getFormatClass());
-      inputBundle.configure(conf);
-    } else {
-      //there are multiple inputs for this mapper so add it as a CrunchInputs and need a fake path just to
-      //make it play well with other file based inputs.
-      Path dummy = new Path("/kafka/" + inputId);
-      CrunchInputs.addInputPath(job, dummy, inputBundle, inputId);
-    }
-  }
-
-  @Override
-  public ReadableData<Pair<BytesWritable, BytesWritable>> asReadable() {
-    // skip using the inputformat/splits since this will be read in a single JVM and don't need the complexity
-    // of parallelism when reading.
-    return new KafkaData<>(props, offsets);
-  }
-
-  //exposed for testing purposes
-  FormatBundle getInputBundle() {
-    return inputBundle;
-  }
-
-  /**
-   * Basic {@link Deserializer} which simply wraps the payload as a {@link BytesWritable}.
-   */
-  public static class BytesDeserializer implements Deserializer<BytesWritable> {
-
-    @Override
-    public void configure(Map<String, ?> configProperties, boolean isKey) {
-      //no-op
-    }
-
-    @Override
-    public BytesWritable deserialize(String topic, byte[] valueBytes) {
-      return new BytesWritable(valueBytes);
-    }
-
-    @Override
-    public void close() {
-      //no-op
-    }
-  }
-}
\ No newline at end of file
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
index 0e9d750..b6793b1 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -17,50 +17,24 @@
  */
 package org.apache.crunch.kafka;
 
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.cluster.Broker;
-import kafka.cluster.BrokerEndPoint;
-import kafka.cluster.EndPoint;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.OffsetResponse;
-import kafka.javaapi.PartitionMetadata;
-import kafka.javaapi.TopicMetadata;
-import kafka.javaapi.TopicMetadataRequest;
-import kafka.javaapi.TopicMetadataResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.apache.commons.lang.StringUtils;
-import org.apache.crunch.CrunchRuntimeException;
+import com.google.common.collect.ImmutableList;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.security.auth.SecurityProtocol; 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-import scala.collection.JavaConversions;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
+import java.util.stream.Collectors;
 
 /**
  * Simple utilities for retrieving offset and Kafka information to assist in setting up and configuring a
- * {@link KafkaSource} instance.
+ * {@link org.apache.crunch.kafka.record.KafkaSource} instance.
  */
 public class KafkaUtils {
 
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
-
-  private static final String CLIENT_ID = "crunch-kafka-client";
-
-  private static final Random RANDOM = new Random();
-
   /**
    * Configuration property for the number of retry attempts that will be made to Kafka.
    */
@@ -73,20 +47,8 @@
   public static final String KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING = Integer.toString(KAFKA_RETRY_ATTEMPTS_DEFAULT);
 
   /**
-   * Configuration property for the number of retry attempts that will be made to Kafka in the event of getting empty
-   * responses.
-   */
-  public static final String KAFKA_EMPTY_RETRY_ATTEMPTS_KEY = "org.apache.crunch.kafka.retry.empty.attempts";
-
-  /**
-   * Default number of empty retry attempts.
-   */
-  public static final int KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT = 10;
-  public static final String KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT_STRING =
-    Integer.toString(KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
-
-  /**
    * Converts the provided {@code config} into a {@link Properties} object to connect with Kafka.
+   *
    * @param config the config to read properties
    * @return a properties instance populated with all of the values inside the provided {@code config}.
    */
@@ -101,8 +63,9 @@
 
   /**
    * Adds the {@code properties} to the provided {@code config} instance.
+   *
    * @param properties the properties to add to the config.
-   * @param config the configuration instance to be modified.
+   * @param config     the configuration instance to be modified.
    * @return the config instance with the populated properties
    */
   public static Configuration addKafkaConnectionProperties(Properties properties, Configuration config) {
@@ -113,220 +76,19 @@
   }
 
   /**
-   * Returns a {@link TopicMetadataRequest} from the given topics
-   *
-   * @param topics an array of topics you want metadata for
-   * @return a {@link TopicMetadataRequest} from the given topics
-   * @throws IllegalArgumentException if topics is {@code null} or empty, or if any of the topics is null, empty or blank
+   * Returns the {@link TopicPartition}s in a topic, returns an empty list if the topic does not exist.
    */
-  private static TopicMetadataRequest getTopicMetadataRequest(String... topics) {
-    if (topics == null)
-      throw new IllegalArgumentException("topics cannot be null");
-    if (topics.length == 0)
-      throw new IllegalArgumentException("topics cannot be empty");
+  public static List<TopicPartition> getTopicPartitions(Consumer<?, ?> kafkaConsumer, String topic) {
+    List<PartitionInfo> partitionInfos = kafkaConsumer.partitionsFor(topic);
 
-    for (String topic : topics)
-      if (StringUtils.isBlank(topic))
-        throw new IllegalArgumentException("No topic can be null, empty or blank");
-
-    return new TopicMetadataRequest(Arrays.asList(topics));
-  }
-
-  /**
-   * <p>
-   * Retrieves the offset values for an array of topics at the specified time.
-   * </p>
-   * <p>
-   * If the Kafka cluster does not have the logs for the partition at the specified time or if the topic did not exist
-   * at that time this will instead return the earliest offset for that partition.
-   * </p>
-   *
-   * @param properties the properties containing the configuration for kafka
-   * @param time       the time at which we want to know what the offset values were
-   * @param topics     the topics we want to know the offset values of
-   * @return the offset values for an array of topics at the specified time
-   * @throws IllegalArgumentException if properties is {@code null} or if topics is {@code null} or empty or if any of
-   *                                  the topics are {@code null}, empty or blank, or if there is an error parsing the
-   *                                  properties.
-   * @throws IllegalStateException if there is an error communicating with the Kafka cluster to retrieve information.
-   * @deprecated As of 1.0. Use beginning/end offset APIs on {@link org.apache.kafka.clients.consumer.Consumer}
-   */
-  @Deprecated
-  public static Map<TopicPartition, Long> getBrokerOffsets(Properties properties, long time, String... topics) {
-    if (properties == null)
-      throw new IllegalArgumentException("properties cannot be null");
-
-    final List<Broker> brokers = getBrokers(properties);
-    Collections.shuffle(brokers, RANDOM);
-
-    return getBrokerOffsets(brokers, time, topics);
-  }
-
-  // Visible for testing
-  static Map<TopicPartition, Long> getBrokerOffsets(List<Broker> brokers, long time, String... topics) {
-    if (topics == null)
-      throw new IllegalArgumentException("topics cannot be null");
-    if (topics.length == 0)
-      throw new IllegalArgumentException("topics cannot be empty");
-
-    for (String topic : topics)
-      if (StringUtils.isBlank(topic))
-        throw new IllegalArgumentException("No topic can be null, empty or blank");
-
-    TopicMetadataResponse topicMetadataResponse = null;
-
-    final TopicMetadataRequest topicMetadataRequest = getTopicMetadataRequest(topics);
-
-    for (final Broker broker : brokers) {
-      final SimpleConsumer consumer = getSimpleConsumer(broker);
-      try {
-        topicMetadataResponse = consumer.send(topicMetadataRequest);
-        break;
-      } catch (Exception err) {
-        EndPoint endpoint = JavaConversions.seqAsJavaList(broker.endPoints()).get(0);
-        LOG.warn(String.format("Fetching topic metadata for topic(s) '%s' from broker '%s' failed",
-            Arrays.toString(topics), endpoint.host()), err);
-      } finally {
-        consumer.close();
-      }
+    // This conversion of null to empty list is consistent with https://issues.apache.org/jira/browse/KAFKA-2358
+    if (partitionInfos == null) {
+      return ImmutableList.of();
+    } else {
+      return partitionInfos.stream()
+              .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
+              .collect(Collectors.toList());
     }
-
-    if (topicMetadataResponse == null) {
-      throw new IllegalStateException(String.format("Fetching topic metadata for topic(s) '%s' from broker(s) '%s' failed",
-          Arrays.toString(topics), Arrays.toString(brokers.toArray())));
-    }
-
-    // From the topic metadata, build a PartitionOffsetRequestInfo for each partition of each topic. It should be noted that
-    // only the leader Broker has the partition offset information[1] so save the leader Broker so we
-    // can send the request to it.
-    // [1] - https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-OffsetAPI
-    Map<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> brokerRequests =
-        new HashMap<>();
-
-    for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
-      for (PartitionMetadata partition : metadata.partitionsMetadata()) {
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
-            new HashMap<>();
-
-        BrokerEndPoint brokerEndPoint = partition.leader();
-        if(brokerEndPoint == null){
-          throw new CrunchRuntimeException("Unable to find leader for topic:"+metadata.topic()
-            +" partition:"+partition.partitionId());
-        }
-
-        EndPoint endPoint = new EndPoint(brokerEndPoint.host(), brokerEndPoint.port(),
-            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
-
-        Broker leader = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)),
-            Option.<String>empty());
-
-        if (brokerRequests.containsKey(leader))
-          requestInfo = brokerRequests.get(leader);
-
-        requestInfo.put(new TopicAndPartition(metadata.topic(), partition.partitionId()), new PartitionOffsetRequestInfo(
-            time, 1));
-
-        brokerRequests.put(leader, requestInfo);
-      }
-    }
-
-    Map<TopicPartition, Long> topicPartitionToOffset = new HashMap<>();
-
-    // Send the offset request to the leader broker
-    for (Map.Entry<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> brokerRequest : brokerRequests.entrySet()) {
-      SimpleConsumer simpleConsumer = getSimpleConsumer(brokerRequest.getKey());
-
-      OffsetResponse offsetResponse = null;
-      try {
-        OffsetRequest offsetRequest = new OffsetRequest(brokerRequest.getValue(), kafka.api.OffsetRequest.CurrentVersion(),
-            CLIENT_ID);
-        offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
-      } finally {
-        simpleConsumer.close();
-      }
-
-      Map<TopicPartition, Long> earliestOffsets = null;
-
-      // Retrieve/parse the results
-      for (Map.Entry<TopicAndPartition, PartitionOffsetRequestInfo> entry : brokerRequest.getValue().entrySet()) {
-        TopicAndPartition topicAndPartition = entry.getKey();
-        TopicPartition topicPartition = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition());
-        long[] offsets = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition());
-        long offset;
-
-        // The Kafka API will return no value if a time is given which there is no log that contains messages from that time
-        // (i.e. before a topic existed or in a log that was rolled/cleaned)
-        if (offsets.length > 0) {
-          offset = offsets[0];
-        } else {
-          LOG.info("Kafka did not have an offset for topic/partition [{}]. Returning earliest known offset instead",
-              topicAndPartition);
-
-          // This shouldn't happen but if kafka's API did not provide us with a value and we are asking for the earliest
-          // time we can't be sure what to do so quit
-          if (time == kafka.api.OffsetRequest.EarliestTime())
-            throw new IllegalStateException("We requested the earliest offsets for topic [" + topicAndPartition.topic()
-                + "] but Kafka returned no values");
-
-          // Load the earliest offsets for the topic if it hasn't been loaded already
-          if (earliestOffsets == null)
-            earliestOffsets = getBrokerOffsets(Arrays.asList(brokerRequest.getKey()),
-                kafka.api.OffsetRequest.EarliestTime(), topicAndPartition.topic());
-
-          offset = earliestOffsets.get(topicPartition);
-        }
-
-        topicPartitionToOffset.put(topicPartition, offset);
-      }
-    }
-
-    return topicPartitionToOffset;
-  }
-
-  /**
-   * Returns a {@link SimpleConsumer} connected to the given {@link Broker}
-   */
-  private static SimpleConsumer getSimpleConsumer(final Broker broker) {
-    // BrokerHost, BrokerPort, timeout, buffer size, client id
-    EndPoint endpoint = JavaConversions.seqAsJavaList(broker.endPoints()).get(0);
-    return new SimpleConsumer(endpoint.host(), endpoint.port(), 100000, 64 * 1024, CLIENT_ID);
-  }
-
-  /**
-   * Returns a {@link Broker} list from the given {@link Properties}
-   *
-   * @param properties the {@link Properties} with configuration to connect to a Kafka broker
-   */
-  private static List<Broker> getBrokers(final Properties properties) {
-    if (properties == null)
-      throw new IllegalArgumentException("props cannot be null");
-
-    String commaDelimitedBrokerList = properties.getProperty("metadata.broker.list");
-    if (commaDelimitedBrokerList == null)
-      throw new IllegalArgumentException("Unable to find 'metadata.broker.list' in given properties");
-
-    // Split broker list into host/port pairs
-    String[] brokerPortList = commaDelimitedBrokerList.split(",");
-    if (brokerPortList.length < 1)
-      throw new IllegalArgumentException("Unable to parse broker list : [" + Arrays.toString(brokerPortList) + "]");
-
-    final List<Broker> brokers = new ArrayList<Broker>(brokerPortList.length);
-    for (final String brokerHostPortString : brokerPortList) {
-      // Split host/port
-      String[] brokerHostPort = brokerHostPortString.split(":");
-      if (brokerHostPort.length != 2)
-        throw new IllegalArgumentException("Unable to parse host/port from broker string : ["
-            + Arrays.toString(brokerHostPort) + "] from broker list : [" + Arrays.toString(brokerPortList) + "]");
-      try {
-        EndPoint endPoint = new EndPoint(brokerHostPort[0], Integer.parseInt(brokerHostPort[1]),
-            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
-        brokers.add(new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)),
-            Option.<String>empty()));
-      } catch (NumberFormatException e) {
-        throw new IllegalArgumentException("Error parsing broker port : " + brokerHostPort[1], e);
-      }
-    }
-    return brokers;
   }
 
 }
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
deleted file mode 100644
index 0dadf97..0000000
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.kafka.inputformat;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.crunch.Pair;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.hadoop.conf.Configurable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-/**
- * Basic input format for reading data from Kafka. Data is read and maintained in its pure byte form and wrapped inside of a
- * {@link BytesWritable} instance.
- *
- * Populating the configuration of the input format is handled with the convenience method of
- * {@link #writeOffsetsToConfiguration(Map, Configuration)}.  This should be done to ensure
- * the Kafka offset information is available when the input format {@link #getSplits(JobContext) creates its splits}
- * and {@link #createRecordReader(InputSplit, TaskAttemptContext) readers}.
- *
- * To suppress warnings generated by unused configs in the {@link org.apache.kafka.clients.consumer.ConsumerConfig ConsumerConfig},
- * one can use {@link #tagExistingKafkaConnectionProperties(Properties) tagExistingKafkaConnectionProperties} and
- * {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey} to prefix Kafka connection properties with
- * "org.apache.crunch.kafka.connection.properties" to allow for retrieval later using {@link #getConnectionPropertyFromKey(String)
- * getConnectionPropertyFromKey} and {@link #filterConnectionProperties(Properties) filterConnectionProperties}.
- */
-
-public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> implements Configurable {
-
-  /**
-   * Constant for constructing configuration keys for the input format.
-   */
-  private static final String KAFKA_INPUT_OFFSETS_BASE = "org.apache.crunch.kafka.offsets.topic";
-
-  /**
-   * Constant used for building configuration keys and specifying partitions.
-   */
-  private static final String PARTITIONS = "partitions";
-
-  /**
-   * Constant used for building configuration keys and specifying the start of a partition.
-   */
-  private static final String START = "start";
-
-  /**
-   * Constant used for building configuration keys and specifying the end of a partition.
-   */
-  private static final String END = "end";
-
-  /**
-   * Regex to discover all of the defined partitions which should be consumed by the input format.
-   */
-  private static final String TOPIC_KEY_REGEX = KAFKA_INPUT_OFFSETS_BASE + "\\..*\\." + PARTITIONS + "$";
-
-  /**
-   * Constant for constructing configuration keys for the Kafka connection properties.
-   */
-  private static final String KAFKA_CONNECTION_PROPERTY_BASE = "org.apache.crunch.kafka.connection.properties";
-
-  /**
-   * Regex to discover all of the defined Kafka connection properties which should be passed to the ConsumerConfig.
-   */
-  private static final Pattern CONNECTION_PROPERTY_REGEX =
-      Pattern.compile(Pattern.quote(KAFKA_CONNECTION_PROPERTY_BASE) + "\\..*$");
-
-  private Configuration configuration;
-
-  @Override
-  public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
-    Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(getConf());
-    List<InputSplit> splits = new LinkedList<>();
-    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
-      TopicPartition topicPartition = entry.getKey();
-
-      long start = entry.getValue().first();
-      long end = entry.getValue().second();
-      if(start != end) {
-        splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), entry.getValue().first(),
-            entry.getValue().second()));
-      }
-    }
-
-    return splits;
-  }
-
-  @Override
-  public RecordReader<BytesWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
-      throws IOException, InterruptedException {
-    return new KafkaRecordReader<>();
-  }
-
-  @Override
-  public void setConf(Configuration configuration) {
-    this.configuration = configuration;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return configuration;
-  }
-
-
-  //The following methods are used for reading and writing Kafka Partition offset information into Hadoop's Configuration
-  //objects and into Crunch's FormatBundle.  For a specific Kafka Topic it might have one or many partitions and for
-  //each partition it will need a start and end offset.  Assuming you have a topic of "abc" and it has 2 partitions the
-  //configuration would be populated with the following:
-  // org.apache.crunch.kafka.offsets.topic.abc.partitions = [0,1]
-  // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start = <partition start>
-  // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end = <partition end>
-  // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.start = <partition start>
-  // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.end = <partition end>
-
-  /**
-   * Writes the start and end offsets for the provided topic partitions to the {@code bundle}.
-   *
-   * @param offsets The starting and ending offsets for the topics and partitions.
-   * @param bundle the bundle into which the information should be persisted.
-   */
-  public static void writeOffsetsToBundle(Map<TopicPartition, Pair<Long, Long>> offsets, FormatBundle bundle) {
-    for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) {
-      bundle.set(entry.getKey(), entry.getValue());
-    }
-  }
-
-  /**
-   * Writes the start and end offsets for the provided topic partitions to the {@code config}.
-   *
-   * @param offsets The starting and ending offsets for the topics and partitions.
-   * @param config the config into which the information should be persisted.
-   */
-  public static void writeOffsetsToConfiguration(Map<TopicPartition, Pair<Long, Long>> offsets, Configuration config) {
-    for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) {
-      config.set(entry.getKey(), entry.getValue());
-    }
-  }
-
-  /**
-   * Reads the {@code configuration} to determine which topics, partitions, and offsets should be used for reading data.
-   *
-   * @param configuration the configuration to derive the data to read.
-   * @return a map of {@link TopicPartition} to a pair of start and end offsets.
-   * @throws IllegalStateException if the {@code configuration} does not have the start and end offsets set properly
-   * for a partition.
-   */
-  public static Map<TopicPartition, Pair<Long, Long>> getOffsets(Configuration configuration) {
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    //find configuration for all of the topics with defined partitions
-    Map<String, String> topicPartitionKeys = configuration.getValByRegex(TOPIC_KEY_REGEX);
-
-    //for each topic start to process it's partitions
-    for (String key : topicPartitionKeys.keySet()) {
-      String topic = getTopicFromKey(key);
-      int[] partitions = configuration.getInts(key);
-      //for each partition find and add the start/end offset
-      for (int partitionId : partitions) {
-        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
-        long start = configuration.getLong(generatePartitionStartKey(topic, partitionId),Long.MIN_VALUE);
-        long end = configuration.getLong(generatePartitionEndKey(topic, partitionId),
-            Long.MIN_VALUE);
-
-        if(start == Long.MIN_VALUE || end == Long.MIN_VALUE){
-          throw new IllegalStateException("The "+topicPartition+ "has an invalid start:"+start+ " or end:"+end
-              +" offset configured.");
-        }
-
-        offsets.put(topicPartition, Pair.of(start, end));
-      }
-    }
-
-    return offsets;
-  }
-
-  private static Map<String, String> generateValues(Map<TopicPartition, Pair<Long, Long>> offsets) {
-    Map<String, String> offsetConfigValues = new HashMap<>();
-    Map<String, Set<Integer>> topicsPartitions = new HashMap<>();
-
-    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
-      TopicPartition topicPartition = entry.getKey();
-      String topic = topicPartition.topic();
-      int partition = topicPartition.partition();
-      String startKey = generatePartitionStartKey(topic, partition);
-      String endKey = generatePartitionEndKey(topic, partition);
-      //Add the start and end offsets for a specific partition
-      offsetConfigValues.put(startKey, Long.toString(entry.getValue().first()));
-      offsetConfigValues.put(endKey, Long.toString(entry.getValue().second()));
-
-      Set<Integer> partitions = topicsPartitions.get(topic);
-      if (partitions == null) {
-        partitions = new HashSet<>();
-        topicsPartitions.put(topic, partitions);
-      }
-      partitions.add(partition);
-    }
-
-    //generate the partitions values for each topic
-    for (Map.Entry<String, Set<Integer>> entry : topicsPartitions.entrySet()) {
-      String key = KAFKA_INPUT_OFFSETS_BASE + "." + entry.getKey() + "." + PARTITIONS;
-      Set<Integer> partitions = entry.getValue();
-      String partitionsString = StringUtils.join(partitions, ",");
-      offsetConfigValues.put(key, partitionsString);
-    }
-
-    return offsetConfigValues;
-  }
-
-  static String generatePartitionStartKey(String topic, int partition) {
-    return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + START;
-  }
-
-  static String generatePartitionEndKey(String topic, int partition) {
-    return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + END;
-  }
-
-  static String generateTopicPartitionsKey(String topic) {
-    return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS;
-  }
-
-  static String getTopicFromKey(String key) {
-    //strip off the base key + a trailing "."
-    String value = key.substring(KAFKA_INPUT_OFFSETS_BASE.length() + 1);
-    //strip off the end part + a preceding "."
-    value = value.substring(0, (value.length() - (PARTITIONS.length() + 1)));
-
-    return value;
-  }
-
-  // The following methods are convenience methods for dealing with Kafka connection properties. This includes:
-  //    - writing Kafka connection properties to a FormatBundle
-  //    - generating tagged Kafka connection properties using the prefix "org.apache.crunch.kafka.connection.properties"
-  //    - retrieving Kafka connection properties prefixed by "org.apache.crunch.kafka.connection.properties"
-  //    - filtering out Kafka connection properties from a Properties object
-  //    - tagging all properties in a Properties object with the Kafka connection properties prefix
-  // The tagging of the Kafka connection properties allows for suppression of "isn't a known config" ConsumerConfig warnings that
-  // are generated by unused properties carried over from a Hadoop configuration.
-
-  /**
-   * Writes the Kafka connection properties to the {@code bundle}.
-   *
-   * @param connectionProperties the Kafka connection properties
-   * @param bundle the bundle into which the information should be persisted.
-   */
-  public static void writeConnectionPropertiesToBundle(Properties connectionProperties, FormatBundle bundle) {
-    for (final String name : connectionProperties.stringPropertyNames()) {
-      bundle.set(name, connectionProperties.getProperty(name));
-    }
-  }
-
-  /**
-   * Prefixes a given property with "org.apache.crunch.kafka.connection.properties" to allow for filtering with
-   * {@link #filterConnectionProperties(Properties) filterConnectionProperties}.
-   *
-   * @param property the Kafka connection property that will be prefixed for retrieval at a later time.
-   * @return the property prefixed "org.apache.crunch.kafka.connection.properties"
-   */
-  static String generateConnectionPropertyKey(String property) {
-    return KAFKA_CONNECTION_PROPERTY_BASE + "." + property;
-  }
-
-  /**
-   *
-   * Retrieves the original property that was tagged using {@link #generateConnectionPropertyKey(String)
-   * generateConnectionPropertyKey}.
-   *
-   * @param key the key that was tagged using {@link #generateConnectionPropertyKey(String) generateConnectionPropertyKey}.
-   * @return the original property prior to tagging.
-   */
-  static String getConnectionPropertyFromKey(String key) {
-    // Strip off the base key + a trailing "."
-    return key.substring(KAFKA_CONNECTION_PROPERTY_BASE.length() + 1);
-  }
-
-  /**
-   * Generates a {@link Properties} object containing the properties in {@code connectionProperties}, but with every
-   * property prefixed with "org.apache.crunch.kafka.connection.properties".
-   *
-   * @param connectionProperties the properties to be prefixed with "org.apache.crunch.kafka.connection.properties"
-   * @return a {@link Properties} object representing Kafka connection properties
-   */
-  public static Properties tagExistingKafkaConnectionProperties(Properties connectionProperties) {
-    Properties taggedProperties = new Properties();
-
-    for (final String name : connectionProperties.stringPropertyNames()) {
-      taggedProperties.put(generateConnectionPropertyKey(name), connectionProperties.getProperty(name));
-    }
-
-    return taggedProperties;
-  }
-
-  /**
-   * Filters out Kafka connection properties that were tagged using {@link #generateConnectionPropertyKey(String)
-   * generateConnectionPropertyKey}.
-   *
-   * @param props the properties to be filtered.
-   * @return the properties containing Kafka connection information that were tagged using
-   *         {@link #generateConnectionPropertyKey(String)}.
-   */
-  public static Properties filterConnectionProperties(Properties props) {
-    Properties filteredProperties = new Properties();
-
-    for (final String name : props.stringPropertyNames()) {
-      if (CONNECTION_PROPERTY_REGEX.matcher(name).matches()) {
-        filteredProperties.put(getConnectionPropertyFromKey(name), props.getProperty(name));
-      }
-    }
-
-    return filteredProperties;
-  }
-}
\ No newline at end of file
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java
deleted file mode 100644
index c8ebc6a..0000000
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.kafka.inputformat;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.kafka.common.TopicPartition;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * InputSplit that represent retrieving data from a single {@link TopicPartition} between the specified start
- * and end offsets.
- */
-public class KafkaInputSplit extends InputSplit implements Writable {
-
-  private long startingOffset;
-  private long endingOffset;
-  private TopicPartition topicPartition;
-
-  /**
-   * Nullary Constructor for creating the instance inside the Mapper instance.
-   */
-  public KafkaInputSplit() {
-
-  }
-
-  /**
-   * Constructs an input split for the provided {@code topic} and {@code partition} restricting data to be between
-   * the {@code startingOffset} and {@code endingOffset}
-   * @param topic the topic for the split
-   * @param partition the partition for the topic
-   * @param startingOffset the start of the split
-   * @param endingOffset the end of the split
-   */
-  public KafkaInputSplit(String topic, int partition, long startingOffset, long endingOffset) {
-    this.startingOffset = startingOffset;
-    this.endingOffset = endingOffset;
-    topicPartition = new TopicPartition(topic, partition);
-  }
-
-  @Override
-  public long getLength() throws IOException, InterruptedException {
-    // This is just used as a hint for size of bytes so it is already inaccurate.
-    return startingOffset > 0 ? endingOffset - startingOffset : endingOffset;
-  }
-
-  @Override
-  public String[] getLocations() throws IOException, InterruptedException {
-    //Leave empty since data locality not really an issue.
-    return new String[0];
-  }
-
-  /**
-   * Returns the topic and partition for the split
-   * @return the topic and partition for the split
-   */
-  public TopicPartition getTopicPartition() {
-    return topicPartition;
-  }
-
-  /**
-   * Returns the starting offset for the split
-   * @return the starting offset for the split
-   */
-  public long getStartingOffset() {
-    return startingOffset;
-  }
-
-  /**
-   * Returns the ending offset for the split
-   * @return the ending offset for the split
-   */
-  public long getEndingOffset() {
-    return endingOffset;
-  }
-
-  @Override
-  public void write(DataOutput dataOutput) throws IOException {
-    dataOutput.writeUTF(topicPartition.topic());
-    dataOutput.writeInt(topicPartition.partition());
-    dataOutput.writeLong(startingOffset);
-    dataOutput.writeLong(endingOffset);
-  }
-
-  @Override
-  public void readFields(DataInput dataInput) throws IOException {
-    String topic = dataInput.readUTF();
-    int partition = dataInput.readInt();
-    startingOffset = dataInput.readLong();
-    endingOffset = dataInput.readLong();
-
-    topicPartition = new TopicPartition(topic, partition);
-  }
-
-  @Override
-  public String toString() {
-    return getTopicPartition() + " Start: " + startingOffset + " End: " + endingOffset;
-  }
-}
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
deleted file mode 100644
index 0c49c66..0000000
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.kafka.inputformat;
-
-import kafka.api.OffsetRequest;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.kafka.KafkaUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RetriableException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT;
-import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_KEY;
-import static org.apache.crunch.kafka.KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY;
-import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
-import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
-import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT;
-import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
-import static org.apache.crunch.kafka.inputformat.KafkaInputFormat.filterConnectionProperties;
-
-/**
- * A {@link RecordReader} for pulling data from Kafka.
- * @param <K> the key of the records from Kafka
- * @param <V> the value of the records from Kafka
- */
-public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class);
-
-  private Consumer<K, V> consumer;
-  private ConsumerRecord<K, V> record;
-  private long endingOffset;
-  private Iterator<ConsumerRecord<K, V>> recordIterator;
-  private long consumerPollTimeout;
-  private long maxNumberOfRecords;
-  private long startingOffset;
-  private long currentOffset;
-  private int maxNumberAttempts;
-  private Properties connectionProperties;
-  private TopicPartition topicPartition;
-  private int concurrentEmptyResponses;
-  private int maxConcurrentEmptyResponses;
-
-  @Override
-  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-    if(!(inputSplit instanceof KafkaInputSplit)){
-      throw new CrunchRuntimeException("InputSplit for RecordReader is not valid split type.");
-    }
-    Properties kafkaConnectionProperties = filterConnectionProperties(
-            getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
-
-    consumer = new KafkaConsumer<>(kafkaConnectionProperties);
-    KafkaInputSplit split = (KafkaInputSplit) inputSplit;
-    TopicPartition topicPartition = split.getTopicPartition();
-
-    consumer.assign(Collections.singletonList(topicPartition));
-
-    //suggested hack to gather info without gathering data
-    consumer.poll(0);
-    //now seek to the desired start location
-    startingOffset = split.getStartingOffset();
-    consumer.seek(topicPartition,startingOffset);
-
-    currentOffset = startingOffset - 1;
-    endingOffset = split.getEndingOffset();
-
-    maxNumberOfRecords = endingOffset - startingOffset;
-    if(LOG.isInfoEnabled()) {
-      LOG.info("Reading data from {} between {} and {}", new Object[]{topicPartition, startingOffset, endingOffset});
-    }
-
-    Configuration config = taskAttemptContext.getConfiguration();
-    consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, CONSUMER_POLL_TIMEOUT_DEFAULT);
-    maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT);
-    maxConcurrentEmptyResponses = config.getInt(KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
-    concurrentEmptyResponses = 0;
-  }
-
-  @Override
-  public boolean nextKeyValue() throws IOException, InterruptedException {
-    if(hasPendingData()) {
-      recordIterator = getRecords();
-      record = recordIterator.hasNext() ? recordIterator.next() : null;
-      if (record != null) {
-        LOG.debug("nextKeyValue: Retrieved record with offset {}", record.offset());
-        long oldOffset = currentOffset;
-        currentOffset = record.offset();
-        LOG.debug("Current offset will be updated to be [{}]", currentOffset);
-        if (LOG.isWarnEnabled() && (currentOffset - oldOffset > 1)) {
-          LOG.warn("Offset increment was larger than expected value of one, old {} new {}", oldOffset, currentOffset);
-        }
-        return true;
-      } else {
-        LOG.warn("nextKeyValue: Retrieved null record last offset was {} and ending offset is {}", currentOffset, endingOffset);
-      }
-    }
-    record = null;
-    return false;
-  }
-
-  @Override
-  public K getCurrentKey() throws IOException, InterruptedException {
-    return record == null ? null : record.key();
-  }
-
-  @Override
-  public V getCurrentValue() throws IOException, InterruptedException {
-    return record == null ? null : record.value();
-  }
-
-  @Override
-  public float getProgress() throws IOException, InterruptedException {
-    //not most accurate but gives reasonable estimate
-    return ((float) (currentOffset - startingOffset +1)) / maxNumberOfRecords;
-  }
-
-  private boolean hasPendingData(){
-    //offset range is exclusive at the end which means the ending offset is one higher
-    // than the actual physical last offset
-
-    boolean hasPending = currentOffset < endingOffset-1;
-
-    if(concurrentEmptyResponses > maxConcurrentEmptyResponses){
-      long earliest = getEarliestOffset();
-      if(earliest == endingOffset){
-        LOG.warn("Possible data loss for {} as earliest {} is equal to {} and greater than expected current {}.",
-          new Object[]{topicPartition, earliest, endingOffset, currentOffset});
-        return false;
-      }
-    }
-
-    return hasPending;
-  }
-
-  private Iterator<ConsumerRecord<K, V>> getRecords() {
-    if ((recordIterator == null) || !recordIterator.hasNext()) {
-      ConsumerRecords<K, V> records = null;
-      int numTries = 0;
-      boolean success = false;
-      while(!success && (numTries < maxNumberAttempts)) {
-        try {
-          records = getConsumer().poll(consumerPollTimeout);
-        } catch (RetriableException re) {
-          numTries++;
-          if (numTries < maxNumberAttempts) {
-            LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries+1, re);
-          } else {
-            LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", maxNumberAttempts, re);
-            throw re;
-          }
-        }
-        if(((records == null) || records.isEmpty()) && hasPendingData()){
-          concurrentEmptyResponses++;
-          LOG.warn("No records retrieved but pending offsets to consume therefore polling again. Attempt {}/{}",
-            concurrentEmptyResponses, maxConcurrentEmptyResponses);
-        }else{
-          success = true;
-        }
-      }
-      concurrentEmptyResponses = 0;
-
-      if((records == null) || records.isEmpty()){
-        LOG.info("No records retrieved from Kafka therefore nothing to iterate over.");
-      }else{
-        LOG.info("Retrieved records from Kafka to iterate over.");
-      }
-      return records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator();
-    }
-    return recordIterator;
-  }
-
-  protected Consumer<K,V> getConsumer(){
-    return consumer;
-  }
-
-  protected long getEarliestOffset(){
-    Map<TopicPartition, Long> brokerOffsets = KafkaUtils
-      .getBrokerOffsets(connectionProperties, OffsetRequest.EarliestTime(), topicPartition.topic());
-    Long offset = brokerOffsets.get(topicPartition);
-    if(offset == null){
-      LOG.debug("Unable to determine earliest offset for {} so returning earliest {}", topicPartition,
-        OffsetRequest.EarliestTime());
-      return OffsetRequest.EarliestTime();
-    }
-    LOG.debug("Earliest offset for {} is {}", topicPartition, offset);
-    return offset;
-  }
-
-  @Override
-  public void close() throws IOException {
-    LOG.debug("Closing the record reader.");
-    if(consumer != null) {
-      consumer.close();
-    }
-  }
-}
\ No newline at end of file
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/Offsets.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/Offsets.java
index 78034c1..ed7febb 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/Offsets.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/Offsets.java
@@ -21,8 +21,8 @@
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
-import kafka.api.OffsetRequest;
 import org.apache.commons.lang.StringUtils;
+import org.apache.kafka.common.requests.ListOffsetRequest;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -254,7 +254,7 @@
 
       private String topic;
       private int partition = -1;
-      private long offset = OffsetRequest.EarliestTime();
+      private long offset = ListOffsetRequest.EARLIEST_TIMESTAMP;
 
       /**
        * Creates a new builder instance.
@@ -299,7 +299,7 @@
 
       /**
        * Set the {@code offset} for the partition offset being built.  If the {@code offset} is not
-       * set then it defaults to {@link OffsetRequest#EarliestTime()}.
+       * set then it defaults to {@link ListOffsetRequest#EARLIEST_TIMESTAMP}.
        *
        * @param offset the topic for the partition offset being built.
        * @return builder instance
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
index 2f1d139..101034d 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/record/KafkaInputFormat.java
@@ -337,4 +337,22 @@
     // Strip off the base key + a trailing "."
     return key.substring(KAFKA_CONNECTION_PROPERTY_BASE.length() + 1);
   }
+
+  /**
+   * Generates a {@link Properties} object containing the properties in {@code connectionProperties}, but with every
+   * property prefixed with "org.apache.crunch.kafka.connection.properties".
+   *
+   * @param connectionProperties the properties to be prefixed with "org.apache.crunch.kafka.connection.properties"
+   * @return a {@link Properties} object representing Kafka connection properties
+   */
+  public static Properties tagExistingKafkaConnectionProperties(Properties connectionProperties) {
+    Properties taggedProperties = new Properties();
+
+    for (final String name : connectionProperties.stringPropertyNames()) {
+      taggedProperties.put(generateConnectionPropertyKey(name), connectionProperties.getProperty(name));
+    }
+
+    return taggedProperties;
+  }
+
 }
\ No newline at end of file
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
index 0694551..5fc6bb1 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
@@ -17,20 +17,17 @@
  */
 package org.apache.crunch.kafka;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import kafka.serializer.Decoder;
-import kafka.serializer.Encoder;
 import kafka.utils.VerifiableProperties;
 import org.apache.crunch.impl.mr.run.RuntimeParameters;
-import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
-import org.apache.crunch.kafka.inputformat.KafkaInputFormatIT;
-import org.apache.crunch.kafka.inputformat.KafkaRecordReaderIT;
+import org.apache.crunch.kafka.record.KafkaInputFormat;
 import org.apache.crunch.kafka.utils.KafkaBrokerTestHarness;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.junit.AfterClass;
@@ -47,13 +44,9 @@
 
 @RunWith(Suite.class)
 @Suite.SuiteClasses({
-    //org.apache.crunch.kafka
-    KafkaSourceIT.class, KafkaRecordsIterableIT.class, KafkaDataIT.class,
-    //org.apache.crunch.kafka.inputformat
-    KafkaRecordReaderIT.class, KafkaInputFormatIT.class, KafkaUtilsIT.class,
-    // org.apache.crunch.kafka.record
-    org.apache.crunch.kafka.record.KafkaSourceIT.class, org.apache.crunch.kafka.record.KafkaRecordsIterableIT.class,
-    org.apache.crunch.kafka.record.KafkaDataIT.class
+        // org.apache.crunch.kafka.record
+        org.apache.crunch.kafka.record.KafkaSourceIT.class, org.apache.crunch.kafka.record.KafkaRecordsIterableIT.class,
+        org.apache.crunch.kafka.record.KafkaDataIT.class
 })
 public class ClusterTest {
 
@@ -140,34 +133,30 @@
   public static Configuration getConsumerConfig() {
     Configuration kafkaConfig = new Configuration(conf);
     KafkaUtils.addKafkaConnectionProperties(KafkaInputFormat.tagExistingKafkaConnectionProperties(
-        getConsumerProperties()), kafkaConfig);
+            getConsumerProperties()), kafkaConfig);
     return kafkaConfig;
   }
 
   public static List<String> writeData(Properties props, String topic, String batch, int loops, int numValuesPerLoop) {
     Properties producerProps = new Properties();
     producerProps.putAll(props);
-    producerProps.setProperty("serializer.class", StringEncoderDecoder.class.getName());
-    producerProps.setProperty("key.serializer.class", StringEncoderDecoder.class.getName());
+    producerProps.setProperty("value.serializer", StringSerDe.class.getName());
+    producerProps.setProperty("key.serializer", StringSerDe.class.getName());
 
     // Set the default compression used to be snappy
     producerProps.setProperty("compression.codec", "snappy");
     producerProps.setProperty("request.required.acks", "1");
 
-    ProducerConfig producerConfig = new ProducerConfig(producerProps);
-
-    Producer<String, String> producer = new Producer<>(producerConfig);
+    Producer<String, String> producer = new KafkaProducer<>(producerProps);
     List<String> keys = new LinkedList<>();
     try {
       for (int i = 0; i < loops; i++) {
-        List<KeyedMessage<String, String>> events = new LinkedList<>();
         for (int j = 0; j < numValuesPerLoop; j++) {
           String key = "key" + batch + i + j;
           String value = "value" + batch + i + j;
           keys.add(key);
-          events.add(new KeyedMessage<>(topic, key, value));
+          producer.send(new ProducerRecord<>(topic, key, value));
         }
-        producer.send(events);
       }
     } finally {
       producer.close();
@@ -199,24 +188,4 @@
     }
   }
 
-  public static class StringEncoderDecoder implements Encoder<String>, Decoder<String> {
-
-    public StringEncoderDecoder() {
-
-    }
-
-    public StringEncoderDecoder(VerifiableProperties props) {
-
-    }
-
-    @Override
-    public String fromBytes(byte[] bytes) {
-      return new String(bytes);
-    }
-
-    @Override
-    public byte[] toBytes(String value) {
-      return value.getBytes();
-    }
-  }
 }
\ No newline at end of file
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java
deleted file mode 100644
index 595a94b..0000000
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.kafka;
-
-
-import kafka.api.OffsetRequest;
-import org.apache.crunch.Pair;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.apache.crunch.kafka.ClusterTest.writeData;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-public class KafkaDataIT {
-
-  @Rule
-  public TestName testName = new TestName();
-
-  private String topic;
-  private Map<TopicPartition, Long> startOffsets;
-  private Map<TopicPartition, Long> stopOffsets;
-  private Map<TopicPartition, Pair<Long, Long>> offsets;
-  private Properties props;
-
-  @BeforeClass
-  public static void init() throws Exception {
-    ClusterTest.startTest();
-  }
-
-  @AfterClass
-  public static void cleanup() throws Exception {
-    ClusterTest.endTest();
-  }
-
-  @Before
-  public void setup() {
-    topic = testName.getMethodName();
-
-    props = ClusterTest.getConsumerProperties();
-
-    startOffsets = new HashMap<>();
-    stopOffsets = new HashMap<>();
-    offsets = new HashMap<>();
-    for (int i = 0; i < 4; i++) {
-      TopicPartition tp = new TopicPartition(topic, i);
-      startOffsets.put(tp, 0L);
-      stopOffsets.put(tp, 100L);
-
-      offsets.put(tp, Pair.of(0L, 100L));
-    }
-  }
-
-  @Test
-  public void getDataIterable() throws IOException {
-    int loops = 10;
-    int numPerLoop = 100;
-    int total = loops * numPerLoop;
-    List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
-
-    startOffsets = getStartOffsets(props, topic);
-    stopOffsets = getStopOffsets(props, topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
-    }
-
-    Iterable<Pair<String, String>> data = new KafkaData<String, String>(props, offsets).read(null);
-
-    int count = 0;
-    for (Pair<String, String> event : data) {
-      assertThat(keys, hasItem(event.first()));
-      assertTrue(keys.remove(event.first()));
-      count++;
-    }
-
-    assertThat(count, is(total));
-    assertThat(keys.size(), is(0));
-  }
-
-  private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) {
-    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
-  }
-
-  private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) {
-    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
-  }
-}
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
deleted file mode 100644
index dd179ae..0000000
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
+++ /dev/null
@@ -1,411 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.kafka;
-
-import kafka.api.OffsetRequest;
-import org.apache.crunch.Pair;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.apache.crunch.kafka.ClusterTest.writeData;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.core.Is.is;
-import static org.hamcrest.core.IsNot.not;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class KafkaRecordsIterableIT {
-
-  @Mock
-  private Consumer<String, String> mockedConsumer;
-
-  @Mock
-  private ConsumerRecords<String, String> records;
-
-  @Rule
-  public TestName testName = new TestName();
-
-  private String topic;
-  private Map<TopicPartition, Long> startOffsets;
-  private Map<TopicPartition, Long> stopOffsets;
-  private Map<TopicPartition, Pair<Long, Long>> offsets;
-  private Consumer<String, String> consumer;
-  private Properties props;
-  private Properties consumerProps;
-
-  @BeforeClass
-  public static void init() throws Exception {
-    ClusterTest.startTest();
-  }
-
-  @AfterClass
-  public static void cleanup() throws Exception {
-    ClusterTest.endTest();
-  }
-
-  @Before
-  public void setup() {
-    topic = testName.getMethodName();
-
-    props = ClusterTest.getConsumerProperties();
-
-    startOffsets = new HashMap<>();
-    stopOffsets = new HashMap<>();
-    offsets = new HashMap<>();
-    for (int i = 0; i < 4; i++) {
-      TopicPartition tp = new TopicPartition(topic, i);
-      startOffsets.put(tp, 0L);
-      stopOffsets.put(tp, 100L);
-
-      offsets.put(tp, Pair.of(0L, 100L));
-    }
-
-
-    consumerProps = new Properties();
-    consumerProps.putAll(props);
-  }
-
-  @After
-  public void shutdown() {
-  }
-
-
-  @Test(expected = IllegalArgumentException.class)
-  public void nullConsumer() {
-    new KafkaRecordsIterable(null, offsets, new Properties());
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void nullOffsets() {
-    new KafkaRecordsIterable<>(consumer, null, new Properties());
-  }
-
-  @Test(expected=IllegalArgumentException.class)
-  public void emptyOffsets() {
-    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
-    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer,
-        Collections.<TopicPartition, Pair<Long, Long>>emptyMap(), new Properties());
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void nullProperties() {
-    new KafkaRecordsIterable(consumer, offsets, null);
-  }
-
-  @Test
-  public void iterateOverValues() {
-    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
-    int loops = 10;
-    int numPerLoop = 100;
-    int total = loops * numPerLoop;
-    List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
-
-    startOffsets = getStartOffsets(props, topic);
-    stopOffsets = getStopOffsets(props, topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
-    }
-
-
-    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties());
-
-    int count = 0;
-    for (Pair<String, String> event : data) {
-      assertThat(keys, hasItem(event.first()));
-      assertTrue(keys.remove(event.first()));
-      count++;
-    }
-
-    assertThat(count, is(total));
-    assertThat(keys.size(), is(0));
-  }
-
-  @Test
-  public void iterateOverOneValue() {
-    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
-    int loops = 1;
-    int numPerLoop = 1;
-    int total = loops * numPerLoop;
-    List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
-
-    startOffsets = getStartOffsets(props, topic);
-    stopOffsets = getStopOffsets(props, topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
-    }
-
-    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties());
-
-    int count = 0;
-    for (Pair<String, String> event : data) {
-      assertThat(keys, hasItem(event.first()));
-      assertTrue(keys.remove(event.first()));
-      count++;
-    }
-
-    assertThat(count, is(total));
-    assertThat(keys.size(), is(0));
-  }
-
-  @Test
-  public void iterateOverNothing() {
-    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
-    int loops = 10;
-    int numPerLoop = 100;
-    writeData(props, topic, "batch", loops, numPerLoop);
-
-    //set the start offsets equal to the stop so won't iterate over anything
-    startOffsets = getStartOffsets(props, topic);
-    stopOffsets = getStartOffsets(props, topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
-    }
-
-    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
-
-    int count = 0;
-    for (Pair<String, String> event : data) {
-      count++;
-    }
-
-    assertThat(count, is(0));
-  }
-
-  @Test
-  public void iterateOverPartial() {
-    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
-    int loops = 10;
-    int numPerLoop = 100;
-    int numPerPartition = 50;
-
-    writeData(props, topic, "batch", loops, numPerLoop);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    //set the start offsets equal to the stop so won't iterate over anything
-    startOffsets = getStartOffsets(props, topic);
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), entry.getValue() + numPerPartition));
-    }
-
-    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
-
-    int count = 0;
-    for (Pair<String, String> event : data) {
-      count++;
-    }
-
-    assertThat(count, is(startOffsets.size() * numPerPartition));
-  }
-
-  @Test
-  public void dontIteratePastStop() {
-    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
-    int loops = 10;
-    int numPerLoop = 100;
-
-    List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop);
-
-    //set the start offsets equal to the stop so won't iterate over anything
-    startOffsets = getStartOffsets(props, topic);
-    stopOffsets = getStopOffsets(props, topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
-    }
-
-    List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop);
-
-    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
-
-    int count = 0;
-    for (Pair<String, String> event : data) {
-      assertThat(keys, hasItem(event.first()));
-      assertTrue(keys.remove(event.first()));
-      assertThat(secondKeys, not(hasItem(event.first())));
-      count++;
-    }
-
-    assertThat(count, is(loops * numPerLoop));
-    assertThat(keys.size(), is(0));
-  }
-
-  @Test
-  public void iterateSkipInitialValues() {
-    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
-    int loops = 10;
-    int numPerLoop = 100;
-
-    List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop);
-
-    //set the start offsets equal to the stop so won't iterate over anything
-    startOffsets = getStopOffsets(props, topic);
-
-    List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop);
-
-    stopOffsets = getStopOffsets(props, topic);
-
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
-    }
-
-
-    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets,
-        new Properties());
-
-    int count = 0;
-    for (Pair<String, String> event : data) {
-      assertThat(secondKeys, hasItem(event.first()));
-      assertTrue(secondKeys.remove(event.first()));
-      assertThat(keys, not(hasItem(event.first())));
-      count++;
-    }
-
-    assertThat(count, is(loops * numPerLoop));
-    assertThat(secondKeys.size(), is(0));
-  }
-
-  @Test
-  public void iterateValuesWithExceptions() {
-    List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
-
-    for(int i = 0; i < 25; i++){
-      returnedRecords.add(new ConsumerRecord<String, String>(topic, 0, i, "key", null));
-      returnedRecords.add(new ConsumerRecord<String, String>(topic, 1, i, "key", null));
-      returnedRecords.add(new ConsumerRecord<String, String>(topic, 2, i, "key", null));
-      returnedRecords.add(new ConsumerRecord<String, String>(topic, 3, i, "key", null));
-    }
-
-    offsets = new HashMap<>();
-    offsets.put(new TopicPartition(topic, 0), Pair.of(0L, 25L));
-    offsets.put(new TopicPartition(topic, 1), Pair.of(0L, 25L));
-    offsets.put(new TopicPartition(topic, 2), Pair.of(0L, 25L));
-    offsets.put(new TopicPartition(topic, 3), Pair.of(0L, 25L));
-
-    when(records.isEmpty()).thenReturn(false);
-    when(records.iterator()).thenReturn(returnedRecords.iterator());
-    when(mockedConsumer.poll(Matchers.anyLong()))
-        //request for the first poll
-        .thenReturn(null)
-        //fail twice
-        .thenThrow(new TimeoutException("fail1"))
-        .thenThrow(new TimeoutException("fail2"))
-        //request that will give data
-        .thenReturn(records)
-        // shows to stop retrieving data
-        .thenReturn(null);
-
-    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
-
-    int count = 0;
-    for (Pair<String, String> event : data) {
-      count++;
-    }
-
-    //should have gotten one value per topicpartition
-    assertThat(count, is(returnedRecords.size()));
-  }
-
-  @Test
-  public void iterateValuesAfterStopOffsets() {
-    List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
-    for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) {
-      returnedRecords.add(new ConsumerRecord<String, String>(entry.getKey().topic(),
-          entry.getKey().partition(), entry.getValue() + 1, "key", null));
-    }
-
-    when(records.isEmpty()).thenReturn(false);
-    when(records.iterator()).thenReturn(returnedRecords.iterator());
-    when(mockedConsumer.poll(Matchers.anyLong())).thenReturn(records).thenReturn(records).thenReturn(null);
-
-    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
-
-    int count = 0;
-    for (Pair<String, String> event : data) {
-      count++;
-    }
-
-    assertThat(count, is(0));
-
-  }
-
-  @Test(expected = RetriableException.class)
-  public void iterateRetriableExceptionMaxExceeded() {
-    List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
-    for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) {
-      returnedRecords.add(new ConsumerRecord<String, String>(entry.getKey().topic(),
-          entry.getKey().partition(), entry.getValue() + 1, "key", null));
-    }
-
-    when(mockedConsumer.poll(Matchers.anyLong()))
-        //for the fill poll call
-        .thenReturn(null)
-        //retry 5 times then fail
-        .thenThrow(new TimeoutException("fail1"))
-        .thenThrow(new TimeoutException("fail2"))
-        .thenThrow(new TimeoutException("fail3"))
-        .thenThrow(new TimeoutException("fail4"))
-        .thenThrow(new TimeoutException("fail5"))
-        .thenThrow(new TimeoutException("fail6"));
-
-    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
-
-    data.iterator().next();
-  }
-
-  private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) {
-    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
-  }
-
-  private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) {
-    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
-  }
-}
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
deleted file mode 100644
index 16aa767..0000000
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.kafka;
-
-import kafka.api.OffsetRequest;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.PCollection;
-import org.apache.crunch.PTable;
-import org.apache.crunch.Pair;
-import org.apache.crunch.Pipeline;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.impl.mr.MRPipeline;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.io.From;
-import org.apache.crunch.io.To;
-import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
-import org.apache.crunch.test.TemporaryPath;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
-import static org.apache.crunch.kafka.inputformat.KafkaInputFormat.filterConnectionProperties;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.matchers.JUnitMatchers.hasItem;
-
-public class KafkaSourceIT {
-
-  @Rule
-  public TemporaryPath path = new TemporaryPath();
-
-  @Rule
-  public TestName testName = new TestName();
-
-  private Properties consumerProps;
-  private Configuration config;
-  private String topic;
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    ClusterTest.startTest();
-  }
-
-  @AfterClass
-  public static void cleanup() throws Exception {
-    ClusterTest.endTest();
-  }
-
-  @Before
-  public void setupTest() {
-    topic = testName.getMethodName();
-    consumerProps = ClusterTest.getConsumerProperties();
-    config = ClusterTest.getConsumerConfig();
-  }
-
-  @Test
-  public void defaultEarliestOffsetReset() {
-    Map<TopicPartition, Pair<Long, Long>> offsets = Collections.emptyMap();
-
-    Configuration config = ClusterTest.getConf();
-
-    //Remove this so should revert to default.
-    consumerProps.remove(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
-
-    KafkaSource kafkaSource = new KafkaSource(consumerProps, offsets);
-
-    FormatBundle inputBundle = kafkaSource.getInputBundle();
-    Configuration cfg = new Configuration(false);
-    inputBundle.configure(cfg);
-    Properties kafkaConnectionProperties = KafkaUtils.getKafkaConnectionProperties(cfg);
-    kafkaConnectionProperties = KafkaInputFormat.filterConnectionProperties(kafkaConnectionProperties);
-    assertThat(kafkaConnectionProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("earliest"));
-  }
-
-  @Test
-  public void offsetResetOverridable() {
-    Map<TopicPartition, Pair<Long, Long>> offsets = Collections.emptyMap();
-
-    Configuration config = ClusterTest.getConf();
-
-    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
-
-    KafkaSource kafkaSource = new KafkaSource(consumerProps, offsets);
-
-    FormatBundle inputBundle = kafkaSource.getInputBundle();
-    Configuration cfg = new Configuration(false);
-    inputBundle.configure(cfg);
-    Properties kafkaConnectionProperties = KafkaUtils.getKafkaConnectionProperties(cfg);
-    kafkaConnectionProperties = KafkaInputFormat.filterConnectionProperties(kafkaConnectionProperties);
-    assertThat(kafkaConnectionProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("latest"));
-  }
-
-  @Test
-  public void sourceReadData() {
-    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      Long endingOffset = endOffsets.get(entry.getKey());
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
-    }
-
-    Configuration config = ClusterTest.getConf();
-
-    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
-    pipeline.enableDebug();
-
-    TableSource<BytesWritable, BytesWritable> kafkaSource = new KafkaSource(consumerProps, offsets);
-
-    PTable<BytesWritable, BytesWritable> read = pipeline.read(kafkaSource);
-
-    Set<String> keysRead = new HashSet<>();
-    int numRecordsFound = 0;
-    String currentKey;
-    for (Pair<BytesWritable, BytesWritable> values : read.materialize()) {
-      currentKey = new String(values.first().getBytes());
-      assertThat(keys, hasItem(currentKey));
-      numRecordsFound++;
-      keysRead.add(new String(values.first().getBytes()));
-    }
-
-    assertThat(numRecordsFound, is(keys.size()));
-    assertThat(keysRead.size(), is(keys.size()));
-
-    pipeline.done();
-  }
-
-
-  @Test
-  public void sourceReadDataThroughPipeline() {
-    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      Long endingOffset = endOffsets.get(entry.getKey());
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
-    }
-
-    Configuration config = ClusterTest.getConf();
-
-    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
-    pipeline.enableDebug();
-
-    TableSource<BytesWritable, BytesWritable> kafkaSource = new KafkaSource(consumerProps, offsets);
-
-    PTable<BytesWritable, BytesWritable> read = pipeline.read(kafkaSource);
-    Path out = path.getPath("out");
-    read.parallelDo(new SimpleConvertFn(), Avros.strings()).write(To.textFile(out));
-
-    pipeline.run();
-
-    PCollection<String> persistedKeys = pipeline.read(From.textFile(out));
-
-    Set<String> keysRead = new HashSet<>();
-    int numRecordsFound = 0;
-    for (String value : persistedKeys.materialize()) {
-      assertThat(keys, hasItem(value));
-      numRecordsFound++;
-      keysRead.add(value);
-    }
-
-    assertThat(numRecordsFound, is(keys.size()));
-    assertThat(keysRead.size(), is(keys.size()));
-
-    pipeline.done();
-  }
-
-
-  private static class SimpleConvertFn extends MapFn<Pair<BytesWritable, BytesWritable>, String> {
-    @Override
-    public String map(Pair<BytesWritable, BytesWritable> input) {
-      return new String(input.first().getBytes());
-    }
-  }
-}
\ No newline at end of file
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
index 707f1b3..85615f8 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
@@ -17,32 +17,16 @@
  */
 package org.apache.crunch.kafka;
 
-import kafka.cluster.Broker;
-import kafka.cluster.EndPoint;
-import org.apache.crunch.kafka.ClusterTest;
-import org.apache.crunch.kafka.KafkaUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import scala.Option;
-import scala.collection.JavaConversions;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
 import java.util.Properties;
 
-import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 
 
@@ -51,25 +35,9 @@
   @Rule
   public TestName testName = new TestName();
 
-  private String topic;
-  private static Broker broker;
-
   @BeforeClass
   public static void startup() throws Exception {
     ClusterTest.startTest();
-
-    Properties props = ClusterTest.getConsumerProperties();
-    String brokerHostPorts = props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
-
-    String brokerHostPortString = brokerHostPorts.split(",")[0];
-    String[] brokerHostPort = brokerHostPortString.split(":");
-
-    String brokerHost = brokerHostPort[0];
-    int brokerPort = Integer.parseInt(brokerHostPort[1]);
-
-    EndPoint endPoint = new EndPoint(brokerHost, brokerPort,
-        ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
-    broker = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)), Option.<String>empty());
   }
 
   @AfterClass
@@ -77,11 +45,6 @@
     ClusterTest.endTest();
   }
 
-  @Before
-  public void setup() throws IOException {
-    topic = "topic-" + testName.getMethodName();
-  }
-
   @Test
   public void getKafkaProperties() {
     Configuration config = new Configuration(false);
@@ -108,89 +71,4 @@
   }
 
 
-  @Test(expected = IllegalArgumentException.class)
-  public void getBrokerOffsetsKafkaNullProperties() throws IOException {
-    KafkaUtils.getBrokerOffsets((Properties) null, kafka.api.OffsetRequest.LatestTime(), topic);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void getBrokerOffsetsKafkaNullTopics() throws IOException {
-    KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), kafka.api.OffsetRequest.LatestTime(), (String[]) null);
-  }
-
-  @Test(expected = IllegalArgumentException.class)
-  public void getBrokerOffsetsKafkaEmptyTopics() throws IOException {
-    KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), kafka.api.OffsetRequest.LatestTime());
-  }
-
-  @Test(timeout = 10000)
-  public void getLatestBrokerOffsetsKafka() throws IOException, InterruptedException {
-    ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 4);
-    while (true) {
-      Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(),
-          kafka.api.OffsetRequest.LatestTime(), topic);
-
-      assertNotNull(offsets);
-      assertThat(offsets.size(), is(4));
-      boolean allMatch = true;
-      for (int i = 0; i < 4; i++) {
-        TopicPartition tp = new TopicPartition(topic, i);
-        assertThat(offsets.keySet(), hasItem(tp));
-        allMatch &= (offsets.get(tp) == 1L);
-      }
-      if (allMatch) {
-        break;
-      }
-      Thread.sleep(100L);
-    }
-  }
-
-  @Test
-  public void getEarliestBrokerOffsetsKafka() throws IOException {
-    ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 1);
-
-    Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(),
-        kafka.api.OffsetRequest.EarliestTime(), topic);
-
-    assertNotNull(offsets);
-    //default create 4 topics
-    assertThat(offsets.size(), is(4));
-    for (int i = 0; i < 4; i++) {
-      assertThat(offsets.keySet(), hasItem(new TopicPartition(topic, i)));
-      assertThat(offsets.get(new TopicPartition(topic, i)), is(0L));
-    }
-  }
-
-  @Test
-  public void getBrokerOffsetsKafkaWithTimeBeforeTopicExists() throws IOException {
-    ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 4);
-
-    // A time of 1L (1 ms after epoch) should be before the topic was created
-    Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), 1L, topic);
-
-    assertNotNull(offsets);
-    //default create 4 topics
-    assertThat(offsets.size(), is(4));
-    for (int i = 0; i < 4; i++) {
-      assertThat(offsets.keySet(), hasItem(new TopicPartition(topic, i)));
-      assertThat(offsets.get(new TopicPartition(topic, i)), is(0L));
-    }
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void getBrokerOffsetsNoHostAvailable() throws IOException {
-    Properties testProperties = ClusterTest.getConsumerProperties();
-    testProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyBrokerHost1:0000,dummyBrokerHost2:0000");
-    testProperties.setProperty("metadata.broker.list", "dummyBrokerHost1:0000,dummyBrokerHost2:0000");
-    assertNotNull(KafkaUtils.getBrokerOffsets(testProperties, kafka.api.OffsetRequest.LatestTime(), topic));
-  }
-
-  @Test
-  public void getBrokerOffsetsSomeHostsUnavailable() throws IOException {
-    EndPoint endPoint = new EndPoint("dummyBrokerHost1", 0,
-        ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT);
-    final Broker bad = new Broker(0, JavaConversions.asScalaBuffer(Arrays.asList(endPoint)), Option.<String>empty());
-    assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(broker, bad), kafka.api.OffsetRequest.LatestTime(), topic));
-    assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(bad, broker), kafka.api.OffsetRequest.LatestTime(), topic));
-  }
 }
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
deleted file mode 100644
index 3e7ab6f..0000000
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.kafka.inputformat;
-
-
-import kafka.api.OffsetRequest;
-import org.apache.crunch.Pair;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.kafka.ClusterTest;
-import org.apache.crunch.kafka.KafkaSource;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.matchers.JUnitMatchers.hasItem;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class KafkaInputFormatIT {
-
-  @Rule
-  public TestName testName = new TestName();
-
-  @Mock
-  private TaskAttemptContext taskContext;
-
-  @Mock
-  private FormatBundle bundle;
-  private Properties consumerProps;
-  private Configuration config;
-  private String topic;
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    ClusterTest.startTest();
-  }
-
-  @AfterClass
-  public static void cleanup() throws Exception {
-    ClusterTest.endTest();
-  }
-
-  @Before
-  public void setupTest() {
-    topic = testName.getMethodName();
-    consumerProps = ClusterTest.getConsumerProperties();
-
-    consumerProps.setProperty(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
-        KafkaSource.BytesDeserializer.class.getName());
-    consumerProps.setProperty(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
-        KafkaSource.BytesDeserializer.class.getName());
-
-    config = ClusterTest.getConsumerConfig();
-
-    config.set(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG),
-        KafkaSource.BytesDeserializer.class.getName());
-    config.set(KafkaInputFormat.generateConnectionPropertyKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG),
-        KafkaSource.BytesDeserializer.class.getName());
-  }
-
-  @Test
-  public void getSplitsFromFormat() throws IOException, InterruptedException {
-    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      Long endingOffset = endOffsets.get(entry.getKey());
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
-    }
-
-    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
-
-    KafkaInputFormat inputFormat = new KafkaInputFormat();
-    inputFormat.setConf(config);
-    List<InputSplit> splits = inputFormat.getSplits(null);
-
-    assertThat(splits.size(), is(offsets.size()));
-
-    for (InputSplit split : splits) {
-      KafkaInputSplit inputSplit = (KafkaInputSplit) split;
-      Pair<Long, Long> startEnd = offsets.get(inputSplit.getTopicPartition());
-      assertThat(inputSplit.getStartingOffset(), is(startEnd.first()));
-      assertThat(inputSplit.getEndingOffset(), is(startEnd.second()));
-    }
-  }
-
-  @Test
-  public void getSplitsSameStartEnd() throws IOException, InterruptedException {
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for(int i = 0; i < 10; i++) {
-      offsets.put(new TopicPartition(topic, i), Pair.of((long)i, (long)i));
-    }
-
-    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
-
-    KafkaInputFormat inputFormat = new KafkaInputFormat();
-    inputFormat.setConf(config);
-    List<InputSplit> splits = inputFormat.getSplits(null);
-
-    assertThat(splits.size(), is(0));
-  }
-
-  @Test
-  public void getSplitsCreateReaders() throws IOException, InterruptedException {
-    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      Long endingOffset = endOffsets.get(entry.getKey());
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
-    }
-
-    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
-
-    KafkaInputFormat inputFormat = new KafkaInputFormat();
-    inputFormat.setConf(config);
-    List<InputSplit> splits = inputFormat.getSplits(null);
-
-    assertThat(splits.size(), is(offsets.size()));
-
-    for (InputSplit split : splits) {
-      KafkaInputSplit inputSplit = (KafkaInputSplit) split;
-      Pair<Long, Long> startEnd = offsets.get(inputSplit.getTopicPartition());
-      assertThat(inputSplit.getStartingOffset(), is(startEnd.first()));
-      assertThat(inputSplit.getEndingOffset(), is(startEnd.second()));
-    }
-
-    //create readers and consume the data
-    when(taskContext.getConfiguration()).thenReturn(config);
-    Set<String> keysRead = new HashSet<>();
-    //read all data from all splits
-    for (InputSplit split : splits) {
-      KafkaInputSplit inputSplit = (KafkaInputSplit) split;
-      long start = inputSplit.getStartingOffset();
-      long end = inputSplit.getEndingOffset();
-
-      RecordReader<BytesWritable, BytesWritable> recordReader = inputFormat.createRecordReader(split, taskContext);
-      recordReader.initialize(split, taskContext);
-
-      int numRecordsFound = 0;
-      String currentKey;
-      while (recordReader.nextKeyValue()) {
-        currentKey = new String(recordReader.getCurrentKey().getBytes());
-        keysRead.add(currentKey);
-        assertThat(keys, hasItem(currentKey));
-        assertThat(recordReader.getCurrentValue(), is(notNullValue()));
-        numRecordsFound++;
-      }
-      recordReader.close();
-
-      //assert that it encountered a partitions worth of data
-      assertThat(((long) numRecordsFound), is(end - start));
-    }
-
-    //validate the same number of unique keys was read as were written.
-    assertThat(keysRead.size(), is(keys.size()));
-  }
-
-  @Test
-  public void writeOffsetsToFormatBundle() {
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    String topic = testName.getMethodName();
-    int numPartitions = 10;
-    for (int i = 0; i < numPartitions; i++) {
-      TopicPartition tAndP = new TopicPartition(topic, i);
-      offsets.put(tAndP, Pair.of((long) i, i * 10L));
-    }
-
-    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
-
-    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class);
-
-    //number of Partitions * 2 for start and end + 1 for the topic
-    verify(bundle, times((numPartitions * 2) + 1)).set(keyCaptor.capture(), valueCaptor.capture());
-
-    List<String> keyValues = keyCaptor.getAllValues();
-    List<String> valueValues = valueCaptor.getAllValues();
-
-    String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic);
-    assertThat(keyValues, hasItem(partitionKey));
-
-    String partitions = valueValues.get(keyValues.indexOf(partitionKey));
-    List<String> parts = Arrays.asList(partitions.split(","));
-
-    for (int i = 0; i < numPartitions; i++) {
-      assertThat(keyValues, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic)));
-      String startKey = KafkaInputFormat.generatePartitionStartKey(topic, i);
-      String endKey = KafkaInputFormat.generatePartitionEndKey(topic, i);
-      assertThat(keyValues, hasItem(startKey));
-      assertThat(keyValues, hasItem(endKey));
-      assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i)));
-      assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L)));
-      assertThat(parts, hasItem(Long.toString(i)));
-    }
-  }
-
-  @Test
-  public void writeOffsetsToFormatBundleSpecialCharacters() {
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    String topic = "partitions." + testName.getMethodName();
-    int numPartitions = 10;
-    for (int i = 0; i < numPartitions; i++) {
-      TopicPartition tAndP = new TopicPartition(topic, i);
-      offsets.put(tAndP, Pair.of((long) i, i * 10L));
-    }
-
-    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
-
-    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class);
-
-    //number of Partitions * 2 for start and end + 1 for the topic
-    verify(bundle, times((numPartitions * 2) + 1)).set(keyCaptor.capture(), valueCaptor.capture());
-
-    List<String> keyValues = keyCaptor.getAllValues();
-    List<String> valueValues = valueCaptor.getAllValues();
-
-    String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic);
-    assertThat(keyValues, hasItem(partitionKey));
-
-    String partitions = valueValues.get(keyValues.indexOf(partitionKey));
-    List<String> parts = Arrays.asList(partitions.split(","));
-
-    for (int i = 0; i < numPartitions; i++) {
-      assertThat(keyValues, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic)));
-      String startKey = KafkaInputFormat.generatePartitionStartKey(topic, i);
-      String endKey = KafkaInputFormat.generatePartitionEndKey(topic, i);
-      assertThat(keyValues, hasItem(startKey));
-      assertThat(keyValues, hasItem(endKey));
-      assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i)));
-      assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L)));
-      assertThat(parts, hasItem(Long.toString(i)));
-    }
-  }
-
-  @Test
-  public void writeOffsetsToFormatBundleMultipleTopics() {
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    Set<String> topics = new HashSet<>();
-
-    int numPartitions = 10;
-    int numTopics = 10;
-    for (int j = 0; j < numTopics; j++) {
-      String topic = testName.getMethodName() + j;
-      topics.add(topic);
-      for (int i = 0; i < numPartitions; i++) {
-        TopicPartition tAndP = new TopicPartition(topic, i);
-        offsets.put(tAndP, Pair.of((long) i, i * 10L));
-      }
-    }
-
-    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
-
-    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
-    ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class);
-
-    //number of Partitions * 2 for start and end + num of topics
-    verify(bundle, times((numTopics * numPartitions * 2) + numTopics)).set(keyCaptor.capture(), valueCaptor.capture());
-
-    List<String> keyValues = keyCaptor.getAllValues();
-    List<String> valueValues = valueCaptor.getAllValues();
-
-    for (String topic : topics) {
-
-      String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic);
-      assertThat(keyValues, hasItem(partitionKey));
-
-      String partitions = valueValues.get(keyValues.indexOf(partitionKey));
-      List<String> parts = Arrays.asList(partitions.split(","));
-
-      for (int i = 0; i < numPartitions; i++) {
-        assertThat(keyValues, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic)));
-        String startKey = KafkaInputFormat.generatePartitionStartKey(topic, i);
-        String endKey = KafkaInputFormat.generatePartitionEndKey(topic, i);
-        assertThat(keyValues, hasItem(startKey));
-        assertThat(keyValues, hasItem(endKey));
-        assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i)));
-        assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L)));
-        assertThat(parts, hasItem(Long.toString(i)));
-      }
-    }
-  }
-
-  @Test
-  public void getOffsetsFromConfig() {
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    Set<String> topics = new HashSet<>();
-
-    int numPartitions = 10;
-    int numTopics = 10;
-    for (int j = 0; j < numTopics; j++) {
-      String topic = testName.getMethodName() + ".partitions" + j;
-      topics.add(topic);
-      for (int i = 0; i < numPartitions; i++) {
-        TopicPartition tAndP = new TopicPartition(topic, i);
-        offsets.put(tAndP, Pair.of((long) i, i * 10L));
-      }
-    }
-
-    Configuration config = new Configuration(false);
-
-    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
-
-    Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config);
-
-    assertThat(returnedOffsets.size(), is(returnedOffsets.size()));
-    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
-      Pair<Long, Long> valuePair = returnedOffsets.get(entry.getKey());
-      assertThat(valuePair, is(entry.getValue()));
-    }
-  }
-
-  @Test
-  public void generateConnectionPropertyKey() {
-    String propertyName = "some.property";
-    String actual = KafkaInputFormat.generateConnectionPropertyKey(propertyName);
-    String expected = "org.apache.crunch.kafka.connection.properties.some.property";
-    assertThat(expected, is(actual));
-  }
-
-  @Test
-  public void getConnectionPropertyFromKey() {
-    String prefixedConnectionProperty = "org.apache.crunch.kafka.connection.properties.some.property";
-    String actual = KafkaInputFormat.getConnectionPropertyFromKey(prefixedConnectionProperty);
-    String expected = "some.property";
-    assertThat(expected, is(actual));
-  }
-
-  @Test
-  public void writeConnectionPropertiesToBundle() {
-    FormatBundle<KafkaInputFormat> actual = FormatBundle.forInput(KafkaInputFormat.class);
-    Properties connectionProperties = new Properties();
-    connectionProperties.put("key1", "value1");
-    connectionProperties.put("key2", "value2");
-    KafkaInputFormat.writeConnectionPropertiesToBundle(connectionProperties, actual);
-
-    FormatBundle<KafkaInputFormat> expected = FormatBundle.forInput(KafkaInputFormat.class);
-    expected.set("key1", "value1");
-    expected.set("key2", "value2");
-
-    assertThat(expected, is(actual));
-  }
-
-  @Test
-  public void filterConnectionProperties() {
-    Properties props = new Properties();
-    props.put("org.apache.crunch.kafka.connection.properties.key1", "value1");
-    props.put("org.apache.crunch.kafka.connection.properties.key2", "value2");
-    props.put("org_apache_crunch_kafka_connection_properties.key3", "value3");
-    props.put("org.apache.crunch.another.prefix.properties.key4", "value4");
-
-    Properties actual = KafkaInputFormat.filterConnectionProperties(props);
-    Properties expected = new Properties();
-    expected.put("key1", "value1");
-    expected.put("key2", "value2");
-
-    assertThat(expected, is(actual));
-  }
-
-
-  @Test(expected=IllegalStateException.class)
-  public void getOffsetsFromConfigMissingStart() {
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    Set<String> topics = new HashSet<>();
-
-    int numPartitions = 10;
-    int numTopics = 10;
-    for (int j = 0; j < numTopics; j++) {
-      String topic = testName.getMethodName() + ".partitions" + j;
-      topics.add(topic);
-      for (int i = 0; i < numPartitions; i++) {
-        TopicPartition tAndP = new TopicPartition(topic, i);
-        offsets.put(tAndP, Pair.of((long) i, i * 10L));
-      }
-    }
-
-    Configuration config = new Configuration(false);
-
-    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
-
-    config.unset("org.apache.crunch.kafka.offsets.topic."+topics.iterator().next()+".partitions.0.start");
-
-    Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config);
-  }
-
-  @Test(expected=IllegalStateException.class)
-  public void getOffsetsFromConfigMissingEnd() {
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    Set<String> topics = new HashSet<>();
-
-    int numPartitions = 10;
-    int numTopics = 10;
-    for (int j = 0; j < numTopics; j++) {
-      String topic = testName.getMethodName() + ".partitions" + j;
-      topics.add(topic);
-      for (int i = 0; i < numPartitions; i++) {
-        TopicPartition tAndP = new TopicPartition(topic, i);
-        offsets.put(tAndP, Pair.of((long) i, i * 10L));
-      }
-    }
-
-    Configuration config = new Configuration(false);
-
-    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
-
-    config.unset("org.apache.crunch.kafka.offsets.topic."+topics.iterator().next()+".partitions.0.end");
-
-    Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config);
-  }
-}
\ No newline at end of file
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java
deleted file mode 100644
index 3833e9d..0000000
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.kafka.inputformat;
-
-import kafka.api.OffsetRequest;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-import java.io.IOException;
-
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-
-public class KafkaInputSplitTest {
-
-  @Rule
-  public TestName testName = new TestName();
-
-  @Test
-  public void createSplit() throws IOException, InterruptedException {
-    String topic = testName.getMethodName();
-    int partition = 18;
-    long startingOffet = 10;
-    long endingOffset = 23;
-
-
-    KafkaInputSplit split = new KafkaInputSplit(topic, partition, startingOffet, endingOffset);
-    assertThat(split.getStartingOffset(), is(startingOffet));
-    assertThat(split.getEndingOffset(), is(endingOffset));
-    assertThat(split.getTopicPartition(), is(new TopicPartition(topic, partition)));
-    assertThat(split.getLength(), is(endingOffset - startingOffet));
-    assertThat(split.getLocations(), is(new String[0]));
-  }
-
-  @Test
-  public void createSplitEarliestOffset() throws IOException, InterruptedException {
-    String topic = testName.getMethodName();
-    int partition = 18;
-    long endingOffset = 23;
-
-    KafkaInputSplit split = new KafkaInputSplit(topic, partition, -1L, endingOffset);
-    assertThat(split.getStartingOffset(), is(-1L));
-    assertThat(split.getEndingOffset(), is(endingOffset));
-    assertThat(split.getTopicPartition(), is(new TopicPartition(topic, partition)));
-    assertThat(split.getLength(), is(endingOffset));
-    assertThat(split.getLocations(), is(new String[0]));
-  }
-}
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
deleted file mode 100644
index c15b4d9..0000000
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.kafka.inputformat;
-
-import kafka.api.OffsetRequest;
-import org.apache.crunch.Pair;
-import org.apache.crunch.kafka.ClusterTest;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.mockito.Matchers;
-import org.mockito.Mock;
-import org.mockito.runners.MockitoJUnitRunner;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-
-import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
-import static org.hamcrest.CoreMatchers.notNullValue;
-import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.matchers.JUnitMatchers.hasItem;
-import static org.mockito.Mockito.when;
-
-@RunWith(MockitoJUnitRunner.class)
-public class KafkaRecordReaderIT {
-
-  @Mock
-  private TaskAttemptContext context;
-
-  @Mock
-  private Consumer<String, String> consumer;
-
-  @Rule
-  public TestName testName = new TestName();
-  private Properties consumerProps;
-  private Configuration config;
-
-  @BeforeClass
-  public static void setup() throws Exception {
-    ClusterTest.startTest();
-  }
-
-  @AfterClass
-  public static void cleanup() throws Exception {
-    ClusterTest.endTest();
-  }
-
-  private String topic;
-
-  @Before
-  public void setupTest() {
-    topic = testName.getMethodName();
-    consumerProps = ClusterTest.getConsumerProperties();
-    config = ClusterTest.getConsumerConfig();
-    when(context.getConfiguration()).thenReturn(config);
-    when(consumer.poll(Matchers.anyLong())).thenReturn(null);
-  }
-
-  @Test
-  public void readData() throws IOException, InterruptedException {
-    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      Long endingOffset = endOffsets.get(entry.getKey());
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
-    }
-
-    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
-
-    Set<String> keysRead = new HashSet<>();
-    //read all data from all splits
-    for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) {
-      KafkaInputSplit split = new KafkaInputSplit(partitionInfo.getKey().topic(), partitionInfo.getKey().partition(),
-          partitionInfo.getValue().first(), partitionInfo.getValue().second());
-
-      KafkaRecordReader<String, String> recordReader = new KafkaRecordReader<>();
-      recordReader.initialize(split, context);
-
-      int numRecordsFound = 0;
-      while (recordReader.nextKeyValue()) {
-        keysRead.add(recordReader.getCurrentKey());
-        assertThat(keys, hasItem(recordReader.getCurrentKey()));
-        assertThat(recordReader.getCurrentValue(), is(notNullValue()));
-        numRecordsFound++;
-      }
-      recordReader.close();
-
-      //assert that it encountered a partitions worth of data
-      assertThat(((long) numRecordsFound), is(partitionInfo.getValue().second() - partitionInfo.getValue().first()));
-    }
-
-    //validate the same number of unique keys was read as were written.
-    assertThat(keysRead.size(), is(keys.size()));
-  }
-
-  @Test
-  public void pollReturnsNullAtStart() throws IOException, InterruptedException {
-    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      Long endingOffset = endOffsets.get(entry.getKey());
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
-    }
-
-    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
-
-    Set<String> keysRead = new HashSet<>();
-    //read all data from all splits
-    for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) {
-      KafkaInputSplit split = new KafkaInputSplit(partitionInfo.getKey().topic(), partitionInfo.getKey().partition(),
-              partitionInfo.getValue().first(), partitionInfo.getValue().second());
-
-      KafkaRecordReader<String, String> recordReader = new NullAtStartKafkaRecordReader<>(consumer, 3);
-      recordReader.initialize(split, context);
-
-      int numRecordsFound = 0;
-      while (recordReader.nextKeyValue()) {
-        keysRead.add(recordReader.getCurrentKey());
-        assertThat(keys, hasItem(recordReader.getCurrentKey()));
-        assertThat(recordReader.getCurrentValue(), is(notNullValue()));
-        numRecordsFound++;
-      }
-      recordReader.close();
-
-      //assert that it encountered a partitions worth of data
-      assertThat(((long) numRecordsFound), is(partitionInfo.getValue().second() - partitionInfo.getValue().first()));
-    }
-
-    //validate the same number of unique keys was read as were written.
-    assertThat(keysRead.size(), is(keys.size()));
-  }
-
-  @Test
-  public void pollReturnsEmptyAtStart() throws IOException, InterruptedException {
-    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      Long endingOffset = endOffsets.get(entry.getKey());
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
-    }
-
-    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
-
-    Set<String> keysRead = new HashSet<>();
-    //read all data from all splits
-    for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) {
-      KafkaInputSplit split = new KafkaInputSplit(partitionInfo.getKey().topic(), partitionInfo.getKey().partition(),
-              partitionInfo.getValue().first(), partitionInfo.getValue().second());
-
-      when(consumer.poll(Matchers.anyLong())).thenReturn(ConsumerRecords.<String, String>empty());
-      KafkaRecordReader<String, String> recordReader = new NullAtStartKafkaRecordReader<>(consumer, 3);
-      recordReader.initialize(split, context);
-
-      int numRecordsFound = 0;
-      while (recordReader.nextKeyValue()) {
-        keysRead.add(recordReader.getCurrentKey());
-        assertThat(keys, hasItem(recordReader.getCurrentKey()));
-        assertThat(recordReader.getCurrentValue(), is(notNullValue()));
-        numRecordsFound++;
-      }
-      recordReader.close();
-
-      //assert that it encountered a partitions worth of data
-      assertThat(((long) numRecordsFound), is(partitionInfo.getValue().second() - partitionInfo.getValue().first()));
-    }
-
-    //validate the same number of unique keys was read as were written.
-    assertThat(keysRead.size(), is(keys.size()));
-  }
-
-  @Test
-  public void pollReturnsNullInMiddle() throws IOException, InterruptedException {
-    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      Long endingOffset = endOffsets.get(entry.getKey());
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
-    }
-
-    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
-
-    Set<String> keysRead = new HashSet<>();
-    //read all data from all splits
-    for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) {
-      KafkaInputSplit split = new KafkaInputSplit(partitionInfo.getKey().topic(), partitionInfo.getKey().partition(),
-              partitionInfo.getValue().first(), partitionInfo.getValue().second());
-
-      KafkaRecordReader<String, String> recordReader = new InjectableKafkaRecordReader<>(consumer, 1);
-      recordReader.initialize(split, context);
-
-      int numRecordsFound = 0;
-      while (recordReader.nextKeyValue()) {
-        keysRead.add(recordReader.getCurrentKey());
-        assertThat(keys, hasItem(recordReader.getCurrentKey()));
-        assertThat(recordReader.getCurrentValue(), is(notNullValue()));
-        numRecordsFound++;
-      }
-      recordReader.close();
-
-      //assert that it encountered a partitions worth of data
-      assertThat(((long) numRecordsFound), is(partitionInfo.getValue().second() - partitionInfo.getValue().first()));
-    }
-
-    //validate the same number of unique keys was read as were written.
-    assertThat(keysRead.size(), is(keys.size()));
-  }
-
-  @Test
-  public void pollReturnsEmptyInMiddle() throws IOException, InterruptedException {
-    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      Long endingOffset = endOffsets.get(entry.getKey());
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
-    }
-
-    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
-
-    Set<String> keysRead = new HashSet<>();
-    //read all data from all splits
-    for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) {
-      KafkaInputSplit split = new KafkaInputSplit(partitionInfo.getKey().topic(), partitionInfo.getKey().partition(),
-              partitionInfo.getValue().first(), partitionInfo.getValue().second());
-
-      when(consumer.poll(Matchers.anyLong())).thenReturn(ConsumerRecords.<String, String>empty());
-      KafkaRecordReader<String, String> recordReader = new InjectableKafkaRecordReader<>(consumer, 1);
-      recordReader.initialize(split, context);
-
-      int numRecordsFound = 0;
-      while (recordReader.nextKeyValue()) {
-        keysRead.add(recordReader.getCurrentKey());
-        assertThat(keys, hasItem(recordReader.getCurrentKey()));
-        assertThat(recordReader.getCurrentValue(), is(notNullValue()));
-        numRecordsFound++;
-      }
-      recordReader.close();
-
-      //assert that it encountered a partitions worth of data
-      assertThat(((long) numRecordsFound), is(partitionInfo.getValue().second() - partitionInfo.getValue().first()));
-    }
-
-    //validate the same number of unique keys was read as were written.
-    assertThat(keysRead.size(), is(keys.size()));
-  }
-
-  @Test
-  public void pollEarliestEqualsEnding() throws IOException, InterruptedException {
-    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
-
-    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
-      Long endingOffset = endOffsets.get(entry.getKey());
-      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
-    }
-
-    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
-
-    Set<String> keysRead = new HashSet<>();
-    //read all data from all splits
-    for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) {
-      KafkaInputSplit split = new KafkaInputSplit(partitionInfo.getKey().topic(), partitionInfo.getKey().partition(),
-              partitionInfo.getValue().first(), partitionInfo.getValue().second());
-
-      when(consumer.poll(Matchers.anyLong())).thenReturn(ConsumerRecords.<String, String>empty());
-      KafkaRecordReader<String, String> recordReader = new EarliestRecordReader<>(consumer,
-              partitionInfo.getValue().second());
-      recordReader.initialize(split, context);
-
-      int numRecordsFound = 0;
-      while (recordReader.nextKeyValue()) {
-        keysRead.add(recordReader.getCurrentKey());
-        numRecordsFound++;
-      }
-      recordReader.close();
-
-      //assert that it encountered a partitions worth of data
-      assertThat(numRecordsFound, is(0));
-    }
-
-    //validate the same number of unique keys was read as were written.
-    assertThat(keysRead.size(), is(0));
-  }
-
-
-  private static class NullAtStartKafkaRecordReader<K, V> extends KafkaRecordReader<K, V>{
-
-    private final Consumer consumer;
-    private final int callAttempts;
-
-    private int attempts;
-
-    public NullAtStartKafkaRecordReader(Consumer consumer, int callAttempts){
-      this.consumer = consumer;
-      this.callAttempts = callAttempts;
-      attempts = 0;
-    }
-
-    @Override
-    protected Consumer<K, V> getConsumer() {
-      if(attempts > callAttempts){
-        return super.getConsumer();
-      }
-      attempts++;
-      return consumer;
-    }
-  }
-
-  private static class InjectableKafkaRecordReader<K, V> extends KafkaRecordReader<K, V>{
-
-    private final Consumer consumer;
-    private final int failAttempt;
-
-    private int attempts;
-
-    public InjectableKafkaRecordReader(Consumer consumer, int failAttempt){
-      this.consumer = consumer;
-      this.failAttempt = failAttempt;
-      attempts = 0;
-    }
-
-    @Override
-    protected Consumer<K, V> getConsumer() {
-      if(attempts == failAttempt){
-        attempts++;
-        return consumer;
-      }
-      attempts++;
-      return super.getConsumer();
-    }
-  }
-
-  private static class EarliestRecordReader<K,V> extends KafkaRecordReader<K, V>{
-
-    private final long earliest;
-    private final Consumer consumer;
-
-    public EarliestRecordReader(Consumer consumer, long earliest){
-      this.earliest = earliest;
-      this.consumer = consumer;
-    }
-
-    @Override
-    protected Consumer<K, V> getConsumer() {
-      return consumer;
-    }
-
-    @Override
-    protected long getEarliestOffset() {
-      return earliest;
-    }
-  }
-}
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/OffsetsTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/OffsetsTest.java
index caad686..054e4d1 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/OffsetsTest.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/OffsetsTest.java
@@ -18,8 +18,8 @@
 package org.apache.crunch.kafka.offset.hdfs;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import kafka.api.OffsetRequest;
 import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -102,7 +102,7 @@
     Offsets.PartitionOffset partitionOffset = Offsets.PartitionOffset.Builder.newBuilder()
         .setTopic(testName.getMethodName()).setPartition(1).build();
 
-    assertThat(partitionOffset.getOffset(), is(OffsetRequest.EarliestTime()));
+    assertThat(partitionOffset.getOffset(), is(ListOffsetRequest.EARLIEST_TIMESTAMP));
     assertThat(partitionOffset.getPartition(), is(1));
     assertThat(partitionOffset.getTopic(), is(testName.getMethodName()));
   }
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaDataIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaDataIT.java
index 475f20f..3c167a4 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaDataIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaDataIT.java
@@ -17,24 +17,21 @@
  */
 package org.apache.crunch.kafka.record;
 
-import kafka.api.OffsetRequest;
 import org.apache.crunch.Pair;
-import org.apache.crunch.kafka.*;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.crunch.kafka.KafkaUtils;
+import org.apache.crunch.kafka.utils.KafkaTestUtils;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
 import org.junit.rules.TestName;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.UUID;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.*;
 
 import static org.apache.crunch.kafka.ClusterTest.writeData;
 import static org.hamcrest.Matchers.hasItem;
@@ -51,6 +48,7 @@
   private Map<TopicPartition, Long> stopOffsets;
   private Map<TopicPartition, Pair<Long, Long>> offsets;
   private Properties props;
+  private Consumer<String, String> consumer;
 
   @BeforeClass
   public static void init() throws Exception {
@@ -78,6 +76,8 @@
 
       offsets.put(tp, Pair.of(0L, 100L));
     }
+
+    consumer = new KafkaConsumer<>(props);
   }
 
   @Test
@@ -87,8 +87,8 @@
     int total = loops * numPerLoop;
     List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
 
-    startOffsets = getStartOffsets(props, topic);
-    stopOffsets = getStopOffsets(props, topic);
+    startOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
+    stopOffsets = KafkaTestUtils.getStopOffsets(consumer, topic);
 
     Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
     for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
@@ -107,12 +107,4 @@
     assertThat(count, is(total));
     assertThat(keys.size(), is(0));
   }
-
-  private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) {
-    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
-  }
-
-  private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) {
-    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
-  }
 }
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordsIterableIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordsIterableIT.java
index f996d96..be69530 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordsIterableIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaRecordsIterableIT.java
@@ -18,9 +18,9 @@
 package org.apache.crunch.kafka.record;
 
 import org.apache.crunch.kafka.*;
+import org.apache.crunch.kafka.utils.KafkaTestUtils;
 import org.junit.Test;
 
-import kafka.api.OffsetRequest;
 import org.apache.crunch.Pair;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -143,8 +143,8 @@
     int total = loops * numPerLoop;
     List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
 
-    startOffsets = getStartOffsets(props, topic);
-    stopOffsets = getStopOffsets(props, topic);
+    startOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
+    stopOffsets = KafkaTestUtils.getStopOffsets(consumer, topic);
 
     Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
     for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
@@ -173,8 +173,8 @@
     int total = loops * numPerLoop;
     List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
 
-    startOffsets = getStartOffsets(props, topic);
-    stopOffsets = getStopOffsets(props, topic);
+    startOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
+    stopOffsets = KafkaTestUtils.getStopOffsets(consumer, topic);
 
     Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
     for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
@@ -202,8 +202,8 @@
     writeData(props, topic, "batch", loops, numPerLoop);
 
     //set the start offsets equal to the stop so won't iterate over anything
-    startOffsets = getStartOffsets(props, topic);
-    stopOffsets = getStartOffsets(props, topic);
+    startOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
+    stopOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
 
     Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
     for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
@@ -230,8 +230,7 @@
     writeData(props, topic, "batch", loops, numPerLoop);
 
     Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
-    //set the start offsets equal to the stop so won't iterate over anything
-    startOffsets = getStartOffsets(props, topic);
+    startOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
     for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
       offsets.put(entry.getKey(), Pair.of(entry.getValue(), entry.getValue() + numPerPartition));
     }
@@ -254,9 +253,8 @@
 
     List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop);
 
-    //set the start offsets equal to the stop so won't iterate over anything
-    startOffsets = getStartOffsets(props, topic);
-    stopOffsets = getStopOffsets(props, topic);
+    startOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
+    stopOffsets = KafkaTestUtils.getStopOffsets(consumer, topic);
 
     Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
     for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
@@ -288,11 +286,11 @@
     List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop);
 
     //set the start offsets equal to the stop so won't iterate over anything
-    startOffsets = getStopOffsets(props, topic);
+    startOffsets = KafkaTestUtils.getStopOffsets(consumer, topic);
 
     List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop);
 
-    stopOffsets = getStopOffsets(props, topic);
+    stopOffsets = KafkaTestUtils.getStopOffsets(consumer, topic);
 
 
     Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
@@ -404,11 +402,4 @@
     data.iterator().next();
   }
 
-  private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) {
-    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
-  }
-
-  private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) {
-    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
-  }
 }
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
index af03d64..5ae376c 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/record/KafkaSourceIT.java
@@ -17,7 +17,6 @@
  */
 package org.apache.crunch.kafka.record;
 
-import kafka.api.OffsetRequest;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.Pair;
@@ -29,6 +28,7 @@
 import org.apache.crunch.io.To;
 import org.apache.crunch.kafka.ClusterTest;
 import org.apache.crunch.kafka.KafkaUtils;
+import org.apache.crunch.kafka.utils.KafkaTestUtils;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.conf.Configuration;
@@ -36,24 +36,13 @@
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
 import org.junit.rules.TestName;
 
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 
-import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 import static org.junit.matchers.JUnitMatchers.hasItem;
@@ -121,8 +110,9 @@
   @Test
   public void sourceReadData() {
     List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
+    Map<TopicPartition, Long> startOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
+    Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getStopOffsets(consumer, topic);
 
     Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
     for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
@@ -159,8 +149,9 @@
   @Test
   public void sourceReadDataThroughPipeline() {
     List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
+    Map<TopicPartition, Long> startOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
+    Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getStopOffsets(consumer, topic);
 
     Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
     for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
@@ -204,8 +195,9 @@
     config.setLong(KafkaInputFormat.KAFKA_MAX_RECORDS_PER_SPLIT, 7L);
 
     List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
-    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
-    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
+    Map<TopicPartition, Long> startOffsets = KafkaTestUtils.getStartOffsets(consumer, topic);
+    Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getStopOffsets(consumer, topic);
 
     Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
     for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java
index f8eb2ff..5f4b789 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java
@@ -17,10 +17,17 @@
  */
 package org.apache.crunch.kafka.utils;
 
+import org.apache.crunch.kafka.KafkaUtils;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+
 import java.io.File;
 import java.io.IOException;
 import java.net.ServerSocket;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Random;
 import java.util.Set;
 
@@ -90,5 +97,14 @@
 
     return ports;
   }
+
+  public static Map<TopicPartition, Long> getStartOffsets(Consumer<?, ?> consumer, String topic) {
+    return consumer.beginningOffsets(KafkaUtils.getTopicPartitions(consumer, topic), Duration.of(1, ChronoUnit.MINUTES));
+  }
+
+  public static Map<TopicPartition, Long> getStopOffsets(Consumer<?, ?> consumer, String topic) {
+    return consumer.endOffsets(KafkaUtils.getTopicPartitions(consumer, topic), Duration.of(1, ChronoUnit.MINUTES));
+  }
+
 }
 
diff --git a/pom.xml b/pom.xml
index 49437a7..ac7428d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,7 +104,7 @@
     <hbase.version>2.0.1</hbase.version>
     <hive.version>2.1.0</hive.version>
 
-    <kafka.version>1.1.0</kafka.version>
+    <kafka.version>2.2.1</kafka.version>
     <scala.base.version>2.12</scala.base.version>
     <scala.version>2.12.10</scala.version>
     <scalatest.version>3.0.1</scalatest.version>