KAFKA-1981; Make log compaction point configurable

Now uses LogSegment.largestTimestamp to determine age of segment's messages.

Author: Eric Wasserman <eric.wasserman@gmail.com>

Reviewers: Jun Rao <junrao@gmail.com>

Closes #1794 from ewasserman/feat-1981
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 5e3e662..820d123 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -36,7 +36,9 @@
  * A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
  * 
  * Each log can be thought of being split into two sections of segments: a "clean" section which has previously been cleaned followed by a
- * "dirty" section that has not yet been cleaned. The active log segment is always excluded from cleaning.
+ * "dirty" section that has not yet been cleaned. The dirty section is further divided into the "cleanable" section followed by an "uncleanable" section.
+ * The uncleanable section is excluded from cleaning. The active log segment is always uncleanable. If there is a
+ * compaction lag time set, segments whose largest message timestamp is within the compaction lag time of the cleaning operation are also uncleanable.
  *
  * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "dedupe" retention policy 
  * and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log. 
@@ -227,7 +229,7 @@
      * Clean a log if there is a dirty log available, otherwise sleep for a bit
      */
     private def cleanOrSleep() {
-      val cleaned = cleanerManager.grabFilthiestCompactedLog() match {
+      val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
         case None =>
           false
         case Some(cleanable) =>
@@ -339,7 +341,7 @@
 
     // build the offset map
     info("Building offset map for %s...".format(cleanable.log.name))
-    val upperBoundOffset = log.activeSegment.baseOffset
+    val upperBoundOffset = cleanable.firstUncleanableOffset
     buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap)
     val endOffset = offsetMap.latestOffset + 1
     stats.indexDone()
@@ -351,9 +353,13 @@
         case None => 0L
         case Some(seg) => seg.largestTimestamp - log.config.deleteRetentionMs
     }
-        
+
+    // determine the timestamp up to which the log will be cleaned
+    // this is the lower of the last active segment and the compaction lag
+    val cleanableHorizionMs = log.logSegments(0, cleanable.firstUncleanableOffset).lastOption.map(_.largestTimestamp).getOrElse(0L)
+
     // group the segments and clean the groups
-    info("Cleaning log %s (discarding tombstones prior to %s)...".format(log.name, new Date(deleteHorizonMs)))
+    info("Cleaning log %s (cleaning prior to %s, discarding tombstones prior to %s)...".format(log.name, new Date(cleanableHorizionMs), new Date(deleteHorizonMs)))
     for (group <- groupSegmentsBySize(log.logSegments(0, endOffset), log.config.segmentSize, log.config.maxIndexSize))
       cleanSegments(log, group, offsetMap, deleteHorizonMs)
 
@@ -627,7 +633,7 @@
   }
 
   /**
-   * Build a map of key_hash => offset for the keys in the dirty portion of the log to use in cleaning.
+   * Build a map of key_hash => offset for the keys in the cleanable dirty portion of the log to use in cleaning.
    * @param log The log to use
    * @param start The offset at which dirty messages begin
    * @param end The ending offset for the map that is being built
@@ -638,7 +644,7 @@
     val dirty = log.logSegments(start, end).toBuffer
     info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
     
-    // Add all the dirty segments. We must take at least map.slots * load_factor,
+    // Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
     // but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
     var full = false
     for (segment <- dirty if !full) {
@@ -749,12 +755,14 @@
 }
 
 /**
- * Helper class for a log, its topic/partition, and the last clean position
+ * Helper class for a log, its topic/partition, the first cleanable position, and the first uncleanable dirty position
  */
-private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long) extends Ordered[LogToClean] {
+private case class LogToClean(topicPartition: TopicAndPartition, log: Log, firstDirtyOffset: Long, uncleanableOffset: Long) extends Ordered[LogToClean] {
   val cleanBytes = log.logSegments(-1, firstDirtyOffset).map(_.size).sum
-  val dirtyBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, log.activeSegment.baseOffset)).map(_.size).sum
-  val cleanableRatio = dirtyBytes / totalBytes.toDouble
-  def totalBytes = cleanBytes + dirtyBytes
+  private[this] val firstUncleanableSegment = log.logSegments(uncleanableOffset, log.activeSegment.baseOffset).headOption.getOrElse(log.activeSegment)
+  val firstUncleanableOffset = firstUncleanableSegment.baseOffset
+  val cleanableBytes = log.logSegments(firstDirtyOffset, math.max(firstDirtyOffset, firstUncleanableOffset)).map(_.size).sum
+  val totalBytes = cleanBytes + cleanableBytes
+  val cleanableRatio = cleanableBytes / totalBytes.toDouble
   override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
 }
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index a601ede..b3e6e72 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -18,15 +18,17 @@
 package kafka.log
 
 import java.io.File
-import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
-import kafka.utils.{Logging, Pool}
-import kafka.server.OffsetCheckpoint
-import collection.mutable
-import java.util.concurrent.locks.ReentrantLock
-import kafka.utils.CoreUtils._
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
+
+import com.yammer.metrics.core.Gauge
 import kafka.common.{LogCleaningAbortedException, TopicAndPartition}
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.OffsetCheckpoint
+import kafka.utils.CoreUtils._
+import kafka.utils.{Logging, Pool, Time}
+
+import scala.collection.{immutable, mutable}
 
 private[log] sealed trait LogCleaningState
 private[log] case object LogCleaningInProgress extends LogCleaningState
@@ -43,6 +45,8 @@
  */
 private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[TopicAndPartition, Log]) extends Logging with KafkaMetricsGroup {
 
+  import LogCleanerManager._
+
   override val loggerName = classOf[LogCleaner].getName
 
   // package-private for testing
@@ -67,39 +71,27 @@
   /**
    * @return the position processed for all logs.
    */
-  def allCleanerCheckpoints(): Map[TopicAndPartition, Long] =
+  def allCleanerCheckpoints: Map[TopicAndPartition, Long] =
     checkpoints.values.flatMap(_.read()).toMap
 
    /**
     * Choose the log to clean next and add it to the in-progress set. We recompute this
-    * every time off the full set of logs to allow logs to be dynamically added to the pool of logs
+    * each time from the full set of logs to allow logs to be dynamically added to the pool of logs
     * the log manager maintains.
     */
-  def grabFilthiestCompactedLog(): Option[LogToClean] = {
+  def grabFilthiestCompactedLog(time: Time): Option[LogToClean] = {
     inLock(lock) {
-      val lastClean = allCleanerCheckpoints()
+      val now = time.milliseconds
+      val lastClean = allCleanerCheckpoints
       val dirtyLogs = logs.filter {
         case (_, log) => log.config.compact  // match logs that are marked as compacted
       }.filterNot {
         case (topicAndPartition, _) => inProgress.contains(topicAndPartition) // skip any logs already in-progress
       }.map {
         case (topicAndPartition, log) => // create a LogToClean instance for each
-          // if the log segments are abnormally truncated and hence the checkpointed offset
-          // is no longer valid, reset to the log starting offset and log the error event
-          val logStartOffset = log.logSegments.head.baseOffset
-          val firstDirtyOffset = {
-            val offset = lastClean.getOrElse(topicAndPartition, logStartOffset)
-            if (offset < logStartOffset) {
-              // don't bother with the warning if compact and delete are enabled.
-              if (!isCompactAndDelete(log))
-                warn("Resetting first dirty offset for %s to log start offset %d since the checkpointed offset %d is invalid."
-                    .format(topicAndPartition, logStartOffset, offset))
-              logStartOffset
-            } else {
-              offset
-            }
-          }
-          LogToClean(topicAndPartition, log, firstDirtyOffset)
+          val (firstDirtyOffset, firstUncleanableDirtyOffset) = LogCleanerManager.cleanableOffsets(log, topicAndPartition,
+            lastClean, now)
+          LogToClean(topicAndPartition, log, firstDirtyOffset, firstUncleanableDirtyOffset)
       }.filter(ltc => ltc.totalBytes > 0) // skip any empty logs
 
       this.dirtiestLogCleanableRatio = if (dirtyLogs.nonEmpty) dirtyLogs.max.cleanableRatio else 0
@@ -131,10 +123,6 @@
 
   }
 
-  def isCompactAndDelete(log: Log): Boolean = {
-    log.config.compact && log.config.delete
-  }
-
   /**
    *  Abort the cleaning of a particular partition, if it's in progress. This call blocks until the cleaning of
    *  the partition is aborted.
@@ -145,7 +133,7 @@
       abortAndPauseCleaning(topicAndPartition)
       resumeCleaning(topicAndPartition)
     }
-    info("The cleaning for partition %s is aborted".format(topicAndPartition))
+    info(s"The cleaning for partition $topicAndPartition is aborted")
   }
 
   /**
@@ -168,14 +156,13 @@
             case LogCleaningInProgress =>
               inProgress.put(topicAndPartition, LogCleaningAborted)
             case s =>
-              throw new IllegalStateException("Compaction for partition %s cannot be aborted and paused since it is in %s state."
-                                              .format(topicAndPartition, s))
+              throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be aborted and paused since it is in $s state.")
           }
       }
       while (!isCleaningInState(topicAndPartition, LogCleaningPaused))
         pausedCleaningCond.await(100, TimeUnit.MILLISECONDS)
     }
-    info("The cleaning for partition %s is aborted and paused".format(topicAndPartition))
+    info(s"The cleaning for partition $topicAndPartition is aborted and paused")
   }
 
   /**
@@ -185,19 +172,17 @@
     inLock(lock) {
       inProgress.get(topicAndPartition) match {
         case None =>
-          throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is not paused."
-                                          .format(topicAndPartition))
+          throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be resumed since it is not paused.")
         case Some(state) =>
           state match {
             case LogCleaningPaused =>
               inProgress.remove(topicAndPartition)
             case s =>
-              throw new IllegalStateException("Compaction for partition %s cannot be resumed since it is in %s state."
-                                              .format(topicAndPartition, s))
+              throw new IllegalStateException(s"Compaction for partition $topicAndPartition cannot be resumed since it is in $s state.")
           }
       }
     }
-    info("Compaction for partition %s is resumed".format(topicAndPartition))
+    info(s"Compaction for partition $topicAndPartition is resumed")
   }
 
   /**
@@ -257,7 +242,7 @@
           inProgress.put(topicAndPartition, LogCleaningPaused)
           pausedCleaningCond.signalAll()
         case s =>
-          throw new IllegalStateException("In-progress partition %s cannot be in %s state.".format(topicAndPartition, s))
+          throw new IllegalStateException(s"In-progress partition $topicAndPartition cannot be in $s state.")
       }
     }
   }
@@ -268,3 +253,68 @@
     }
   }
 }
+
+private[log] object LogCleanerManager extends Logging {
+
+  def isCompactAndDelete(log: Log): Boolean = {
+    log.config.compact && log.config.delete
+  }
+
+
+  /**
+    * Returns the range of dirty offsets that can be cleaned.
+    *
+    * @param log the log
+    * @param lastClean the map of checkpointed offsets
+    * @param now the current time in milliseconds of the cleaning operation
+    * @return the lower (inclusive) and upper (exclusive) offsets
+    */
+  def cleanableOffsets(log: Log, topicAndPartition: TopicAndPartition, lastClean: immutable.Map[TopicAndPartition, Long], now: Long): (Long, Long) = {
+
+    // the checkpointed offset, ie., the first offset of the next dirty segment
+    val lastCleanOffset: Option[Long] = lastClean.get(topicAndPartition)
+
+    // If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid;
+    // reset to the log starting offset and log the error
+    val logStartOffset = log.logSegments.head.baseOffset
+    val firstDirtyOffset = {
+      val offset = lastCleanOffset.getOrElse(logStartOffset)
+      if (offset < logStartOffset) {
+        // don't bother with the warning if compact and delete are enabled.
+        if (!isCompactAndDelete(log))
+          warn(s"Resetting first dirty offset to log start offset $logStartOffset since the checkpointed offset $offset is invalid.")
+        logStartOffset
+      } else {
+        offset
+      }
+    }
+
+    // dirty log segments
+    val dirtyNonActiveSegments = log.logSegments(firstDirtyOffset, log.activeSegment.baseOffset).toArray
+
+    val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
+
+    // find first segment that cannot be cleaned
+    // neither the active segment, nor segments with any messages closer to the head of the log than the minimum compaction lag time
+    // may be cleaned
+    val firstUncleanableDirtyOffset: Long = Seq (
+
+        // the active segment is always uncleanable
+        Option(log.activeSegment.baseOffset),
+
+        // the first segment whose largest message timestamp is within a minimum time lag from now
+        if (compactionLagMs > 0) {
+          dirtyNonActiveSegments.find {
+            s =>
+              val isUncleanable = s.largestTimestamp > now - compactionLagMs
+              debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")
+              isUncleanable
+          } map(_.baseOffset)
+        } else None
+      ).flatten.min
+
+    debug(s"Finding range of cleanable offsets for log=${log.name} topicAndPartition=$topicAndPartition. Last clean offset=$lastCleanOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=${log.activeSegment.baseOffset}")
+
+    (firstDirtyOffset, firstUncleanableDirtyOffset)
+  }
+}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index a01ecc4..b8efcc3 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -45,6 +45,7 @@
   val IndexInterval = kafka.server.Defaults.LogIndexIntervalBytes
   val FileDeleteDelayMs = kafka.server.Defaults.LogDeleteDelayMs
   val DeleteRetentionMs = kafka.server.Defaults.LogCleanerDeleteRetentionMs
+  val MinCompactionLagMs = kafka.server.Defaults.LogCleanerMinCompactionLagMs
   val MinCleanableDirtyRatio = kafka.server.Defaults.LogCleanerMinCleanRatio
   val Compact = kafka.server.Defaults.LogCleanupPolicy
   val UncleanLeaderElectionEnable = kafka.server.Defaults.UncleanLeaderElectionEnable
@@ -73,6 +74,7 @@
   val indexInterval = getInt(LogConfig.IndexIntervalBytesProp)
   val fileDeleteDelayMs = getLong(LogConfig.FileDeleteDelayMsProp)
   val deleteRetentionMs = getLong(LogConfig.DeleteRetentionMsProp)
+  val compactionLagMs = getLong(LogConfig.MinCompactionLagMsProp)
   val minCleanableRatio = getDouble(LogConfig.MinCleanableDirtyRatioProp)
   val compact = getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Compact)
   val delete = getList(LogConfig.CleanupPolicyProp).asScala.map(_.toLowerCase(Locale.ROOT)).contains(LogConfig.Delete)
@@ -108,6 +110,7 @@
   val MaxMessageBytesProp = "max.message.bytes"
   val IndexIntervalBytesProp = "index.interval.bytes"
   val DeleteRetentionMsProp = "delete.retention.ms"
+  val MinCompactionLagMsProp = "min.compaction.lag.ms"
   val FileDeleteDelayMsProp = "file.delete.delay.ms"
   val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio"
   val CleanupPolicyProp = "cleanup.policy"
@@ -162,6 +165,8 @@
     "on the time in which a consumer must complete a read if they begin from offset 0 " +
     "to ensure that they get a valid snapshot of the final stage (otherwise delete " +
     "tombstones may be collected before they complete their scan)."
+  val MinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. " +
+    "Only applicable for logs that are being compacted."
   val MinCleanableRatioDoc = "This configuration controls how frequently the log " +
     "compactor will attempt to clean the log (assuming <a href=\"#compaction\">log " +
     "compaction</a> is enabled). By default we will avoid cleaning a log where more than " +
@@ -253,6 +258,8 @@
         KafkaConfig.LogIndexIntervalBytesProp)
       .define(DeleteRetentionMsProp, LONG, Defaults.DeleteRetentionMs, atLeast(0), MEDIUM,
         DeleteRetentionMsDoc, KafkaConfig.LogCleanerDeleteRetentionMsProp)
+      .define(MinCompactionLagMsProp, LONG, Defaults.MinCompactionLagMs, atLeast(0), MEDIUM, MinCompactionLagMsDoc,
+        KafkaConfig.LogCleanerMinCompactionLagMsProp)
       .define(FileDeleteDelayMsProp, LONG, Defaults.FileDeleteDelayMs, atLeast(0), MEDIUM, FileDeleteDelayMsDoc,
         KafkaConfig.LogDeleteDelayMsProp)
       .define(MinCleanableDirtyRatioProp, DOUBLE, Defaults.MinCleanableDirtyRatio, between(0, 1), MEDIUM,
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 42ae8e5..531ee62 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -89,6 +89,7 @@
   val LogCleanerMinCleanRatio = 0.5d
   val LogCleanerEnable = true
   val LogCleanerDeleteRetentionMs = 24 * 60 * 60 * 1000L
+  val LogCleanerMinCompactionLagMs = 0L
   val LogIndexSizeMaxBytes = 10 * 1024 * 1024
   val LogIndexIntervalBytes = 4096
   val LogFlushIntervalMessages = Long.MaxValue
@@ -255,6 +256,7 @@
   val LogCleanerMinCleanRatioProp = "log.cleaner.min.cleanable.ratio"
   val LogCleanerEnableProp = "log.cleaner.enable"
   val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms"
+  val LogCleanerMinCompactionLagMsProp = "log.cleaner.min.compaction.lag.ms"
   val LogIndexSizeMaxBytesProp = "log.index.size.max.bytes"
   val LogIndexIntervalBytesProp = "log.index.interval.bytes"
   val LogFlushIntervalMessagesProp = "log.flush.interval.messages"
@@ -434,6 +436,7 @@
   val LogCleanerMinCleanRatioDoc = "The minimum ratio of dirty log to total log for a log to eligible for cleaning"
   val LogCleanerEnableDoc = "Enable the log cleaner process to run on the server? Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size."
   val LogCleanerDeleteRetentionMsDoc = "How long are delete records retained?"
+  val LogCleanerMinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. Only applicable for logs that are being compacted."
   val LogIndexSizeMaxBytesDoc = "The maximum size in bytes of the offset index"
   val LogIndexIntervalBytesDoc = "The interval with which we add an entry to the offset index"
   val LogFlushIntervalMessagesDoc = "The number of messages accumulated on a log partition before messages are flushed to disk "
@@ -633,6 +636,7 @@
       .define(LogCleanerMinCleanRatioProp, DOUBLE, Defaults.LogCleanerMinCleanRatio, MEDIUM, LogCleanerMinCleanRatioDoc)
       .define(LogCleanerEnableProp, BOOLEAN, Defaults.LogCleanerEnable, MEDIUM, LogCleanerEnableDoc)
       .define(LogCleanerDeleteRetentionMsProp, LONG, Defaults.LogCleanerDeleteRetentionMs, MEDIUM, LogCleanerDeleteRetentionMsDoc)
+      .define(LogCleanerMinCompactionLagMsProp, LONG, Defaults.LogCleanerMinCompactionLagMs, MEDIUM, LogCleanerMinCompactionLagMsDoc)
       .define(LogIndexSizeMaxBytesProp, INT, Defaults.LogIndexSizeMaxBytes, atLeast(4), MEDIUM, LogIndexSizeMaxBytesDoc)
       .define(LogIndexIntervalBytesProp, INT, Defaults.LogIndexIntervalBytes, atLeast(0), MEDIUM, LogIndexIntervalBytesDoc)
       .define(LogFlushIntervalMessagesProp, LONG, Defaults.LogFlushIntervalMessages, atLeast(1), HIGH, LogFlushIntervalMessagesDoc)
@@ -833,6 +837,7 @@
   val logCleanerIoBufferSize = getInt(KafkaConfig.LogCleanerIoBufferSizeProp)
   val logCleanerIoMaxBytesPerSecond = getDouble(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp)
   val logCleanerDeleteRetentionMs = getLong(KafkaConfig.LogCleanerDeleteRetentionMsProp)
+  val logCleanerMinCompactionLagMs = getLong(KafkaConfig.LogCleanerMinCompactionLagMsProp)
   val logCleanerBackoffMs = getLong(KafkaConfig.LogCleanerBackoffMsProp)
   val logCleanerMinCleanRatio = getDouble(KafkaConfig.LogCleanerMinCleanRatioProp)
   val logCleanerEnable = getBoolean(KafkaConfig.LogCleanerEnableProp)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 00b2a8a..b83a3ee 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -68,6 +68,7 @@
     logProps.put(LogConfig.MaxMessageBytesProp, kafkaConfig.messageMaxBytes)
     logProps.put(LogConfig.IndexIntervalBytesProp, kafkaConfig.logIndexIntervalBytes)
     logProps.put(LogConfig.DeleteRetentionMsProp, kafkaConfig.logCleanerDeleteRetentionMs)
+    logProps.put(LogConfig.MinCompactionLagMsProp, kafkaConfig.logCleanerMinCompactionLagMs)
     logProps.put(LogConfig.FileDeleteDelayMsProp, kafkaConfig.logDeleteDelayMs)
     logProps.put(LogConfig.MinCleanableDirtyRatioProp, kafkaConfig.logCleanerMinCleanRatio)
     logProps.put(LogConfig.CleanupPolicyProp, kafkaConfig.logCleanupPolicy)
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 4f116ab..c7c3dab 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -48,7 +48,7 @@
   val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
   
   @After
-  def teardown() {
+  def teardown(): Unit = {
     Utils.delete(tmpdir)
   }
   
@@ -56,7 +56,7 @@
    * Test simple log cleaning
    */
   @Test
-  def testCleanSegments() {
+  def testCleanSegments(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
@@ -81,7 +81,7 @@
   }
 
   @Test
-  def testCleaningWithDeletes() {
+  def testCleaningWithDeletes(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
@@ -101,14 +101,14 @@
     while(log.numberOfSegments < 4)
       log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
       
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
     val keys = keysInLog(log).toSet
     assertTrue("None of the keys we deleted should still exist.", 
                (0 until leo.toInt by 2).forall(!keys.contains(_)))
   }
 
   @Test
-  def testPartialSegmentClean() {
+  def testPartialSegmentClean(): Unit = {
     // because loadFactor is 0.75, this means we can fit 2 messages in the map
     var cleaner = makeCleaner(2)
     val logProps = new Properties()
@@ -125,22 +125,64 @@
     log.roll()
 
     // clean the log with only one message removed
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2))
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 2, log.activeSegment.baseOffset))
     assertEquals(immutable.List(1,0,1,0), keysInLog(log))
     assertEquals(immutable.List(1,2,3,4), offsetsInLog(log))
 
     // continue to make progress, even though we can only clean one message at a time
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 3))
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 3, log.activeSegment.baseOffset))
     assertEquals(immutable.List(0,1,0), keysInLog(log))
     assertEquals(immutable.List(2,3,4), offsetsInLog(log))
 
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 4))
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 4, log.activeSegment.baseOffset))
     assertEquals(immutable.List(1,0), keysInLog(log))
     assertEquals(immutable.List(3,4), offsetsInLog(log))
   }
 
   @Test
-  def testLogToClean: Unit = {
+  def testCleaningWithUncleanableSection(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // Number of distinct keys. For an effective test this should be small enough such that each log segment contains some duplicates.
+    val N = 10
+    val numCleanableSegments = 2
+    val numTotalSegments = 7
+
+    // append messages with the keys 0 through N-1, values equal offset
+    while(log.numberOfSegments <= numCleanableSegments)
+      log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
+
+    // at this point one message past the cleanable segments has been added
+    // the entire segment containing the first uncleanable offset should not be cleaned.
+    val firstUncleanableOffset = log.logEndOffset + 1  // +1  so it is past the baseOffset
+
+    while(log.numberOfSegments < numTotalSegments - 1)
+      log.append(message(log.logEndOffset.toInt % N, log.logEndOffset.toInt))
+
+    // the last (active) segment has just one message
+
+    def distinctValuesBySegment = log.logSegments.map(s => s.log.map(m => TestUtils.readString(m.message.payload)).toSet.size).toSeq
+
+    val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
+    assertTrue("Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.",
+      distinctValuesBySegment.reverse.tail.forall(_ > N))
+
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, firstUncleanableOffset))
+
+    val distinctValuesBySegmentAfterClean = distinctValuesBySegment
+
+    assertTrue("The cleanable segments should have fewer number of values after cleaning",
+      disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean).take(numCleanableSegments).forall { case (before, after) => after < before })
+    assertTrue("The uncleanable segments should have the same number of values after cleaning", disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
+      .slice(numCleanableSegments, numTotalSegments).forall { x => x._1 == x._2 })
+  }
+
+  @Test
+  def testLogToClean(): Unit = {
     // create a log with small segment size
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
@@ -151,14 +193,44 @@
     for (i <- 0 until 6)
       log.append(messageSet, assignOffsets = true)
 
-    val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset)
+    val logToClean = LogToClean(TopicAndPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
 
     assertEquals("Total bytes of LogToClean should equal size of all segments excluding the active segment",
       logToClean.totalBytes, log.size - log.activeSegment.size)
   }
 
   @Test
-  def testCleaningWithUnkeyedMessages {
+  def testLogToCleanWithUncleanableSection(): Unit = {
+    // create a log with small segment size
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 100: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    // create 6 segments with only one message in each segment
+    val messageSet = TestUtils.singleMessageSet(payload = Array.fill[Byte](50)(0), key = 1.toString.getBytes)
+    for (i <- 0 until 6)
+      log.append(messageSet, assignOffsets = true)
+
+    // segments [0,1] are clean; segments [2, 3] are cleanable; segments [4,5] are uncleanable
+    val segs = log.logSegments.toSeq
+    val logToClean = LogToClean(TopicAndPartition("test", 0), log, segs(2).baseOffset, segs(4).baseOffset)
+
+    val expectedCleanSize = segs.take(2).map(_.size).sum
+    val expectedCleanableSize = segs.slice(2, 4).map(_.size).sum
+    val expectedUncleanableSize = segs.drop(4).map(_.size).sum
+
+    assertEquals("Uncleanable bytes of LogToClean should equal size of all segments prior the one containing first dirty",
+      logToClean.cleanBytes, expectedCleanSize)
+    assertEquals("Cleanable bytes of LogToClean should equal size of all segments from the one containing first dirty offset" +
+      " to the segment prior to the one with the first uncleanable offset",
+      logToClean.cleanableBytes, expectedCleanableSize)
+    assertEquals("Total bytes should be the sum of the clean and cleanable segments", logToClean.totalBytes, expectedCleanSize + expectedCleanableSize)
+    assertEquals("Total cleanable ratio should be the ratio of cleanable size to clean plus cleanable", logToClean.cleanableRatio,
+      expectedCleanableSize / (expectedCleanSize + expectedCleanableSize).toDouble, 1.0e-6d)
+  }
+
+  @Test
+  def testCleaningWithUnkeyedMessages(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
 
     // create a log with compaction turned off so we can append unkeyed messages
@@ -180,7 +252,7 @@
       log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))
 
     val expectedSizeAfterCleaning = log.size - sizeWithUnkeyedMessages
-    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0))
+    cleaner.clean(LogToClean(TopicAndPartition("test", 0), log, 0, log.activeSegment.baseOffset))
 
     assertEquals("Log should only contain keyed messages after cleaning.", 0, unkeyedMessageCountInLog(log))
     assertEquals("Log should only contain keyed messages after cleaning.", expectedSizeAfterCleaning, log.size)
@@ -198,7 +270,7 @@
   def unkeyedMessageCountInLog(log: Log) =
     log.logSegments.map(s => s.log.filter(!_.message.isNull).count(m => !m.message.hasKey)).sum
 
-  def abortCheckDone(topicAndPartition: TopicAndPartition) {
+  def abortCheckDone(topicAndPartition: TopicAndPartition): Unit = {
     throw new LogCleaningAbortedException()
   }
 
@@ -206,7 +278,7 @@
    * Test that abortion during cleaning throws a LogCleaningAbortedException
    */
   @Test
-  def testCleanSegmentsWithAbort() {
+  def testCleanSegmentsWithAbort(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue, abortCheckDone)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
@@ -229,7 +301,7 @@
    * Validate the logic for grouping log segments together for cleaning
    */
   @Test
-  def testSegmentGrouping() {
+  def testSegmentGrouping(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
@@ -282,7 +354,7 @@
    * stored in 4 bytes.
    */
   @Test
-  def testSegmentGroupingWithSparseOffsets() {
+  def testSegmentGroupingWithSparseOffsets(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
 
     val logProps = new Properties()
@@ -326,7 +398,7 @@
     
   }
   
-  private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]) {
+  private def checkSegmentOrder(groups: Seq[Seq[LogSegment]]): Unit = {
     val offsets = groups.flatMap(_.map(_.baseOffset))
     assertEquals("Offsets should be in increasing order.", offsets.sorted, offsets)
   }
@@ -335,7 +407,7 @@
    * Test building an offset map off the log
    */
   @Test
-  def testBuildOffsetMap() {
+  def testBuildOffsetMap(): Unit = {
     val map = new FakeOffsetMap(1000)
     val log = makeLog()
     val cleaner = makeCleaner(Int.MaxValue)
@@ -369,7 +441,7 @@
    * </ol>
    */
   @Test
-  def testRecoveryAfterCrash() {
+  def testRecoveryAfterCrash(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 300: java.lang.Integer)
@@ -428,7 +500,6 @@
     for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
       Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
     }   
-    System.out.println("here")
     log = recoverAndCheck(config, cleanedKeys)
 
     // add some more messages and clean the log again
@@ -463,7 +534,7 @@
   }
 
   @Test
-  def testBuildOffsetMapFakeLarge() {
+  def testBuildOffsetMapFakeLarge(): Unit = {
     val map = new FakeOffsetMap(1000)
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, 72: java.lang.Integer)
@@ -488,7 +559,7 @@
    * Test building a partial offset map of part of a log segment
    */
   @Test
-  def testBuildPartialOffsetMap() {
+  def testBuildPartialOffsetMap(): Unit = {
     // because loadFactor is 0.75, this means we can fit 2 messages in the map
     val map = new FakeOffsetMap(3)
     val log = makeLog()
@@ -581,7 +652,7 @@
       -1L
   }
   
-  def clear() = map.clear()
+  def clear(): Unit = map.clear()
   
   def size: Int = map.size
 
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index 0449be5..9e4951a 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -92,7 +92,7 @@
   }
 
   @Test
-  def testCleansCombinedCompactAndDeleteTopic() {
+  def testCleansCombinedCompactAndDeleteTopic(): Unit = {
     val logProps  = new Properties()
     val retentionMs: Integer = 100000
     logProps.put(LogConfig.RetentionMsProp, retentionMs: Integer)
@@ -144,7 +144,7 @@
   }
 
   @Test
-  def testCleanerWithMessageFormatV0() {
+  def testCleanerWithMessageFormatV0(): Unit = {
     val largeMessageKey = 20
     val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, Message.MagicValue_V0)
     val maxMessageSize = codec match {
@@ -232,7 +232,7 @@
   }
     
   @After
-  def tearDown() {
+  def tearDown(): Unit = {
     cleaner.shutdown()
     time.scheduler.shutdown()
     Utils.delete(logDir)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
new file mode 100644
index 0000000..14d24f7
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerLagIntegrationTest.scala
@@ -0,0 +1,188 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.log
+
+import java.io.File
+import java.util.Properties
+
+import kafka.common.TopicAndPartition
+import kafka.message._
+import kafka.utils._
+import org.apache.kafka.common.record.CompressionType
+import org.apache.kafka.common.utils.Utils
+import org.junit.Assert._
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.runners.Parameterized.Parameters
+
+import scala.collection._
+
+
+/**
+  * This is an integration test that tests the fully integrated log cleaner
+  */
+@RunWith(value = classOf[Parameterized])
+class LogCleanerLagIntegrationTest(compressionCodecName: String) extends Logging {
+  val msPerHour = 60 * 60 * 1000
+
+  val compactionLag = 1 * msPerHour
+  assertTrue("compactionLag must be divisible by 2 for this test", compactionLag % 2 == 0)
+
+  val time = new MockTime(1400000000000L)  // Tue May 13 16:53:20 UTC 2014
+  val cleanerBackOffMs = 200L
+  val segmentSize = 100
+  val deleteDelay = 1000
+  val logName = "log"
+  val logDir = TestUtils.tempDir()
+  var counter = 0
+  val topics = Array(TopicAndPartition("log", 0), TopicAndPartition("log", 1), TopicAndPartition("log", 2))
+  val compressionCodec = CompressionCodec.getCompressionCodec(compressionCodecName)
+
+  @Test
+  def cleanerTest(): Unit = {
+    val cleaner = makeCleaner(parts = 3, backOffMs = cleanerBackOffMs)
+    val log = cleaner.logs.get(topics(0))
+
+    // t = T0
+    val T0 = time.milliseconds
+    val appends0 = writeDups(numKeys = 100, numDups = 3, log, compressionCodec, timestamp = T0)
+    val startSizeBlock0 = log.size
+    debug(s"total log size at T0: $startSizeBlock0")
+
+    val activeSegAtT0 = log.activeSegment
+    debug(s"active segment at T0 has base offset: ${activeSegAtT0.baseOffset}")
+    val sizeUpToActiveSegmentAtT0 = log.logSegments(0L, activeSegAtT0.baseOffset).map(_.size).sum
+    debug(s"log size up to base offset of active segment at T0: $sizeUpToActiveSegmentAtT0")
+
+    cleaner.startup()
+
+    // T0 < t < T1
+    // advance to a time still less than one compaction lag from start
+    time.sleep(compactionLag/2)
+    Thread.sleep(5 * cleanerBackOffMs) // give cleaning thread a chance to _not_ clean
+    assertEquals("There should be no cleaning until the compaction lag has passed", startSizeBlock0, log.size)
+
+    // t = T1 > T0 + compactionLag
+    // advance to time a bit more than one compaction lag from start
+    time.sleep(compactionLag/2 + 1)
+    val T1 = time.milliseconds
+
+    // write another block of data
+    val appends1 = appends0 ++ writeDups(numKeys = 100, numDups = 3, log, compressionCodec, timestamp = T1)
+    val firstBlock1SegmentBaseOffset = activeSegAtT0.baseOffset
+
+    // the first block should get cleaned
+    cleaner.awaitCleaned("log", 0, activeSegAtT0.baseOffset)
+
+    // check the data is the same
+    val read1 = readFromLog(log)
+    assertEquals("Contents of the map shouldn't change.", appends1.toMap, read1.toMap)
+
+    val compactedSize = log.logSegments(0L, activeSegAtT0.baseOffset).map(_.size).sum
+    debug(s"after cleaning the compacted size up to active segment at T0: $compactedSize")
+    val lastCleaned = cleaner.cleanerManager.allCleanerCheckpoints.get(TopicAndPartition("log", 0)).get
+    assertTrue(s"log cleaner should have processed up to offset $firstBlock1SegmentBaseOffset, but lastCleaned=$lastCleaned", lastCleaned >= firstBlock1SegmentBaseOffset)
+    assertTrue(s"log should have been compacted: size up to offset of active segment at T0=$sizeUpToActiveSegmentAtT0 compacted size=$compactedSize",
+      sizeUpToActiveSegmentAtT0 > compactedSize)
+
+    cleaner.logs.remove(topics(0))
+    cleaner.shutdown()
+  }
+
+  private def readFromLog(log: Log): Iterable[(Int, Int)] = {
+    for (segment <- log.logSegments; entry <- segment.log; messageAndOffset <- {
+      // create single message iterator or deep iterator depending on compression codec
+      if (entry.message.compressionCodec == NoCompressionCodec)
+        Stream.cons(entry, Stream.empty).iterator
+      else
+        ByteBufferMessageSet.deepIterator(entry)
+    }) yield {
+      val key = TestUtils.readString(messageAndOffset.message.key).toInt
+      val value = TestUtils.readString(messageAndOffset.message.payload).toInt
+      key -> value
+    }
+  }
+
+  private def writeDups(numKeys: Int, numDups: Int, log: Log, codec: CompressionCodec, timestamp: Long): Seq[(Int, Int)] = {
+    for(dup <- 0 until numDups; key <- 0 until numKeys) yield {
+      val count = counter
+      val info = log.append(TestUtils.singleMessageSet(payload = counter.toString.getBytes, codec = codec, key = key.toString.getBytes, timestamp = timestamp), assignOffsets = true)
+      counter += 1
+      (key, count)
+    }
+  }
+
+  @After
+  def teardown(): Unit = {
+    time.scheduler.shutdown()
+    Utils.delete(logDir)
+  }
+
+  /* create a cleaner instance and logs with the given parameters */
+  private def makeCleaner(parts: Int,
+                  minCleanableDirtyRatio: Float = 0.0F,
+                  numThreads: Int = 1,
+                  backOffMs: Long = 200L,
+                  defaultPolicy: String = "compact",
+                  policyOverrides: Map[String, String] = Map()): LogCleaner = {
+
+    // create partitions and add them to the pool
+    val logs = new Pool[TopicAndPartition, Log]()
+    for(i <- 0 until parts) {
+      val dir = new File(logDir, "log-" + i)
+      dir.mkdirs()
+      val logProps = new Properties()
+      logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+      logProps.put(LogConfig.SegmentIndexBytesProp, 100*1024: java.lang.Integer)
+      logProps.put(LogConfig.FileDeleteDelayMsProp, deleteDelay: java.lang.Integer)
+      logProps.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Integer)
+      logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+      logProps.put(LogConfig.MinCleanableDirtyRatioProp, minCleanableDirtyRatio: java.lang.Float)
+
+      val log = new Log(dir = dir,
+        LogConfig(logProps),
+        recoveryPoint = 0L,
+        scheduler = time.scheduler,
+        time = time)
+      logs.put(TopicAndPartition("log", i), log)
+    }
+
+    new LogCleaner(CleanerConfig(numThreads = numThreads, backOffMs = backOffMs),
+      logDirs = Array(logDir),
+      logs = logs,
+      time = time)
+  }
+
+}
+
+object LogCleanerLagIntegrationTest {
+  def oneParameter: java.util.Collection[Array[String]] = {
+    val l = new java.util.ArrayList[Array[String]]()
+    l.add(Array("NONE"))
+    l
+  }
+
+  @Parameters
+  def parameters: java.util.Collection[Array[String]] = {
+    val list = new java.util.ArrayList[Array[String]]()
+    for (codec <- CompressionType.values)
+      list.add(Array(codec.name))
+    list
+  }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 5508d69..4fa73dc 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -6,7 +6,7 @@
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
-  * http://www.apache.org/licenses/LICENSE-2.0
+  *    http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,27 +14,36 @@
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
+
 package kafka.log
 
 import java.io.File
 import java.util.Properties
 
-import kafka.common.TopicAndPartition
-import kafka.message.ByteBufferMessageSet
-import kafka.utils.{MockTime, Pool, TestUtils}
+import kafka.common._
+import kafka.message._
+import kafka.utils._
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert._
 import org.junit.{After, Test}
 import org.scalatest.junit.JUnitSuite
 
-class LogCleanerManagerTest extends JUnitSuite {
+/**
+  * Unit tests for the log cleaning logic
+  */
+class LogCleanerManagerTest extends JUnitSuite with Logging {
 
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
-  val time = new MockTime()
+  val logProps = new Properties()
+  logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+  logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
+  logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+  val logConfig = LogConfig(logProps)
+  val time = new MockTime(1400000000000L)  // Tue May 13 16:53:20 UTC 2014
 
   @After
-  def tearDown() {
+  def tearDown(): Unit = {
     Utils.delete(tmpDir)
   }
 
@@ -44,7 +53,7 @@
     * as they are handled by the LogManager
     */
   @Test
-  def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs() {
+  def testLogsWithSegmentsToDeleteShouldNotConsiderCleanupPolicyDeleteLogs(): Unit = {
     val messageSet = TestUtils.singleMessageSet("test".getBytes)
     val log: Log = createLog(messageSet.sizeInBytes * 5, LogConfig.Delete)
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
@@ -80,15 +89,91 @@
     assertEquals("should have 1 logs ready to be deleted", 0, readyToDelete)
   }
 
+  /**
+    * Test computation of cleanable range with no minimum compaction lag settings active
+    */
+  @Test
+  def testCleanableOffsetsForNone(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
 
-  def createCleanerManager(log: Log): LogCleanerManager = {
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    while(log.numberOfSegments < 8)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = time.milliseconds))
+
+    val topicAndPartition = TopicAndPartition("log", 0)
+    val lastClean = Map(topicAndPartition-> 0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicAndPartition, lastClean, time.milliseconds)
+    assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
+    assertEquals("The first uncleanable offset begins with the active segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
+  }
+
+  /**
+    * Test computation of cleanable range with a minimum compaction lag time
+    */
+  @Test
+  def testCleanableOffsetsForTime(): Unit = {
+    val compactionLag = 60 * 60 * 1000
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    logProps.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val t0 = time.milliseconds
+    while(log.numberOfSegments < 4)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
+
+    val activeSegAtT0 = log.activeSegment
+
+    time.sleep(compactionLag + 1)
+    val t1 = time.milliseconds
+
+    while (log.numberOfSegments < 8)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t1))
+
+    val topicAndPartition = TopicAndPartition("log", 0)
+    val lastClean = Map(topicAndPartition-> 0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicAndPartition, lastClean, time.milliseconds)
+    assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
+    assertEquals("The first uncleanable offset begins with the second block of log entries.", activeSegAtT0.baseOffset, cleanableOffsets._2)
+  }
+
+  /**
+    * Test computation of cleanable range with a minimum compaction lag time that is small enough that
+    * the active segment contains it.
+    */
+  @Test
+  def testCleanableOffsetsForShortTime(): Unit = {
+    val compactionLag = 60 * 60 * 1000
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    logProps.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Integer)
+
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val t0 = time.milliseconds
+    while (log.numberOfSegments < 8)
+      log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt, timestamp = t0))
+
+    time.sleep(compactionLag + 1)
+
+    val topicAndPartition = TopicAndPartition("log", 0)
+    val lastClean = Map(topicAndPartition-> 0L)
+    val cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicAndPartition, lastClean, time.milliseconds)
+    assertEquals("The first cleanable offset starts at the beginning of the log.", 0L, cleanableOffsets._1)
+    assertEquals("The first uncleanable offset begins with active segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
+  }
+
+  private def createCleanerManager(log: Log): LogCleanerManager = {
     val logs = new Pool[TopicAndPartition, Log]()
     logs.put(TopicAndPartition("log", 0), log)
     val cleanerManager = new LogCleanerManager(Array(logDir), logs)
     cleanerManager
   }
 
-  def appendMessagesAndExpireSegments(set: ByteBufferMessageSet, log: Log): Unit = {
+  private def appendMessagesAndExpireSegments(set: ByteBufferMessageSet, log: Log): Unit = {
     // append some messages to create some segments
     for (i <- 0 until 100)
       log.append(set)
@@ -97,7 +182,7 @@
     log.logSegments.foreach(_.lastModified = time.milliseconds - 1000)
   }
 
-  def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = {
+  private def createLog(segmentSize: Int, cleanupPolicy: String = "delete"): Log = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: Integer)
     logProps.put(LogConfig.RetentionMsProp, 1: Integer)
@@ -113,5 +198,13 @@
     log
   }
 
+  private def makeLog(dir: File = logDir, config: LogConfig = logConfig) =
+    new Log(dir = dir, config = config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
+
+  private def message(key: Int, value: Int, timestamp: Long) =
+    new ByteBufferMessageSet(new Message(key = key.toString.getBytes,
+      bytes = value.toString.getBytes,
+      timestamp = timestamp,
+      magicValue = Message.MagicValue_V1))
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index eb4c0ea..9ee3d32 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -481,6 +481,7 @@
         case KafkaConfig.LogCleanerDedupeBufferLoadFactorProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogCleanerEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean")
         case KafkaConfig.LogCleanerDeleteRetentionMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
+        case KafkaConfig.LogCleanerMinCompactionLagMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogCleanerMinCleanRatioProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number")
         case KafkaConfig.LogIndexSizeMaxBytesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "3")
         case KafkaConfig.LogFlushIntervalMessagesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
diff --git a/docs/design.html b/docs/design.html
index 80af0bb..9a6fcb1 100644
--- a/docs/design.html
+++ b/docs/design.html
@@ -320,7 +320,7 @@
 
 Log compaction guarantees the following:
 <ol>
-<li>Any consumer that stays caught-up to within the head of the log will see every message that is written; these messages will have sequential offsets.
+<li>Any consumer that stays caught-up to within the head of the log will see every message that is written; these messages will have sequential offsets. The topic's <code>min.compaction.lag.ms</code> can be used to guarantee the minimum length of time must pass after a message is written before it could be compacted. I.e. it provides a lower bound on how long each message will remain in the (uncompacted) head.
 <li>Ordering of messages is always maintained.  Compaction will never re-order messages, just remove some.
 <li>The offset for a message never changes.  It is the permanent identifier for a position in the log.
 <li>Any read progressing from offset 0 will see at least the final state of all records in the order they were written. All delete markers for deleted records will be seen provided the reader reaches the head of the log in a time period less than the topic's delete.retention.ms setting (the default is 24 hours). This is important as delete marker removal happens concurrently with read (and thus it is important that we not remove any delete marker prior to the reader seeing it).
@@ -344,13 +344,14 @@
   <pre>  log.cleanup.policy=compact</pre>
 This can be done either at topic creation time or using the alter topic command.
 <p>
+The log cleaner can be configured to retain a minimum amount of the uncompacted "head" of the log. This is enabled by setting the compaction time lag.
+  <pre>  log.cleaner.min.compaction.lag.ms</pre>
+
+This can be used to prevent messages newer than a minimum message age from being subject to compaction. If not set, all log segments are eligible for compaction except for the last segment, i.e. the one currently being written to. The active segment will not be compacted even if all of its messages are older than the minimum compaction time lag.
+</p>
+<p>
 Further cleaner configurations are described <a href="/documentation.html#brokerconfigs">here</a>.
 
-<h4><a id="design_compactionlimitations" href="#design_compactionlimitations">Log Compaction Limitations</a></h4>
-
-<ol>
-  <li>You cannot configure yet how much log is retained without compaction (the "head" of the log).  Currently all segments are eligible except for the last segment, i.e. the one currently being written to.</li>
-</ol>
 <h3><a id="design_quotas" href="#design_quotas">4.9 Quotas</a></h3>
 <p>
     Starting in 0.9, the Kafka cluster has the ability to enforce quotas on produce and fetch requests. Quotas are basically byte-rate thresholds defined per client-id. A client-id logically identifies an application making a request. Hence a single client-id can span multiple producer and consumer instances and the quota will apply for all of them as a single entity i.e. if client-id="test-client" has a produce quota of 10MB/sec, this is shared across all instances with that same id.