blob: 3ba26d349812c0491d401664c12cbb2e13e8c66a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.File
import kafka.utils._
import kafka.server.{KafkaConfig, KafkaServer}
import junit.framework.Assert._
import java.util.{Random, Properties}
import kafka.api.{FetchRequest, OffsetRequest}
import collection.mutable.WrappedArray
import kafka.consumer.SimpleConsumer
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import org.apache.log4j._
/**
 * Companion holding state shared by all [[LogOffsetTest]] instances.
 */
object LogOffsetTest {
  /** Unseeded random source; the tests use it to pick a partition number. */
  val random: Random = new Random()
}
/**
 * Tests for offset lookups (`getOffsetsBefore`) performed both directly on a
 * log and through a [[kafka.consumer.SimpleConsumer]] talking to a broker.
 *
 * Every test gets its own broker, started in `setUp` on `brokerPort` with a
 * `log.file.size` of `logSize`, and torn down again in `tearDown`.
 */
class LogOffsetTest extends JUnitSuite {
  var logDir: File = null
  var topicLogDir: File = null
  var server: KafkaServer = null
  var logSize: Int = 100
  val brokerPort: Int = 9099
  var simpleConsumer: SimpleConsumer = null

  /**
   * Starts a broker from a fresh configuration and connects a consumer to it.
   * `logDir` is taken from the configuration's `log.dir`, so it always points
   * at the directory the broker was configured with.
   */
  @Before
  def setUp(): Unit = {
    val config: Properties = createBrokerConfig(1, brokerPort)
    logDir = new File(config.getProperty("log.dir"))
    server = TestUtils.createServer(new KafkaConfig(config))
    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024)
  }

  /**
   * Closes the consumer, shuts the broker down and removes the log directory.
   *
   * Each step runs even if a previous one throws, and each is skipped when
   * `setUp` did not get far enough to create the resource, so that a failure
   * here does not leave the fixed broker port bound for the following tests.
   */
  @After
  def tearDown(): Unit = {
    try {
      if (simpleConsumer != null) simpleConsumer.close
    } finally {
      try {
        if (server != null) server.shutdown
      } finally {
        if (logDir != null) Utils.rm(logDir)
      }
    }
  }

  /**
   * For a topic that was never written to, a fetch returns nothing, the
   * latest/earliest lookups return the single offset 0, a lookup by the
   * current time returns no offsets, and none of these requests creates the
   * `test-0` entry under the log directory.
   */
  @Test
  def testEmptyLogs(): Unit = {
    val name = "test"
    val messageSet: ByteBufferMessageSet = simpleConsumer.fetch(
      new FetchRequest(name, 0, 0, 300 * 1024))
    assertFalse(messageSet.iterator.hasNext)

    val logFile = new File(logDir, name + "-0")

    val latest = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.LatestTime, 10)
    assertOffsets(List(0L), latest)
    assertFalse(logFile.exists())

    val earliest = simpleConsumer.getOffsetsBefore(name, 0, OffsetRequest.EarliestTime, 10)
    assertOffsets(List(0L), earliest)
    assertFalse(logFile.exists())

    val beforeNow = simpleConsumer.getOffsetsBefore(name, 0, SystemTime.milliseconds, 10)
    assertEquals(0, beforeNow.length)
    assertFalse(logFile.exists())
  }

  /**
   * After 20 messages are appended, a `LatestTime` lookup returns the same
   * offsets from the log and from the consumer, and fetching from the newest
   * of those offsets yields no messages.
   */
  @Test
  def testGetOffsetsBeforeLatestTime(): Unit = {
    val topic = "kafka"
    val part = 0
    val log = createPopulatedLog(topic, part)
    // NOTE(review): the reason for this pause is not visible in this file;
    // kept unchanged because the expected offsets may depend on timing.
    Thread.sleep(100)

    // NOTE(review): the literal offsets presumably follow from the serialized
    // message size combined with the 100-byte log.file.size - confirm.
    val expected = List(240L, 216L, 108L, 0L)

    val offsetRequest = new OffsetRequest(topic, part, OffsetRequest.LatestTime, 10)
    assertOffsets(expected, log.getOffsetsBefore(offsetRequest))

    val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
      OffsetRequest.LatestTime, 10)
    assertOffsets(expected, consumerOffsets)

    // try to fetch using latest offset
    val messageSet: ByteBufferMessageSet = simpleConsumer.fetch(
      new FetchRequest(topic, part, consumerOffsets.head, 300 * 1024))
    assertFalse(messageSet.iterator.hasNext)
  }

  /**
   * Creates an empty directory for a random partition of topic `kafka` and
   * checks that 14 consecutive `EarliestTime` lookups never report offset 1.
   */
  @Test
  def testEmptyLogsGetOffsets(): Unit = {
    val topic = "kafka"
    val part = LogOffsetTest.random.nextInt(10)
    // Create the directory below the broker's own log directory. The previous
    // code built the path from getLogDir, which calls TestUtils.tempDir()
    // again; that presumably returns a different directory, unrelated to the
    // running broker and not removed by tearDown.
    // NOTE(review): confirm this matches the intent of the test.
    topicLogDir = new File(logDir, topic + "-" + part)
    topicLogDir.mkdir

    val offsetChanged = (1 to 14).exists { _ =>
      val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
        OffsetRequest.EarliestTime, 1)
      consumerOffsets(0) == 1
    }
    assertFalse(offsetChanged)
  }

  /**
   * After 20 messages are appended, a lookup by a timestamp taken right after
   * the flush returns the same offsets from the log and from the consumer.
   */
  @Test
  def testGetOffsetsBeforeNow(): Unit = {
    val topic = "kafka"
    val part = LogOffsetTest.random.nextInt(10)
    val log = createPopulatedLog(topic, part)
    val now = System.currentTimeMillis
    // NOTE(review): the reason for this pause is not visible in this file;
    // kept unchanged because the expected offsets may depend on timing.
    Thread.sleep(100)

    val expected = List(216L, 108L, 0L)

    val offsetRequest = new OffsetRequest(topic, part, now, 10)
    assertOffsets(expected, log.getOffsetsBefore(offsetRequest))

    val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part, now, 10)
    assertOffsets(expected, consumerOffsets)
  }

  /**
   * After 20 messages are appended, an `EarliestTime` lookup returns only
   * offset 0, both from the log and from the consumer.
   */
  @Test
  def testGetOffsetsBeforeEarliestTime(): Unit = {
    val topic = "kafka"
    val part = LogOffsetTest.random.nextInt(10)
    val log = createPopulatedLog(topic, part)
    // NOTE(review): the reason for this pause is not visible in this file;
    // kept unchanged because the expected offsets may depend on timing.
    Thread.sleep(100)

    val offsetRequest = new OffsetRequest(topic, part,
      OffsetRequest.EarliestTime, 10)
    assertOffsets(List(0L), log.getOffsetsBefore(offsetRequest))

    val consumerOffsets = simpleConsumer.getOffsetsBefore(topic, part,
      OffsetRequest.EarliestTime, 10)
    // Check the consumer's answer; this used to re-assert the log's offsets.
    assertOffsets(List(0L), consumerOffsets)
  }

  /**
   * Gets or creates the log for `topic`/`part` on the test broker, appends
   * the same small message 20 times, flushes, and returns the log.
   */
  private def createPopulatedLog(topic: String, part: Int) = {
    val log = server.getLogManager.getOrCreateLog(topic, part)
    val message = new Message(Integer.toString(42).getBytes())
    for (i <- 0 until 20)
      log.append(new ByteBufferMessageSet(NoCompressionCodec, message))
    log.flush()
    log
  }

  /**
   * Asserts that `actual` holds exactly the offsets in `expected`, in order.
   * Comparing as lists through `assertEquals` makes a failure report both
   * sequences instead of a bare "assertion failed".
   */
  private def assertOffsets(expected: List[Long], actual: WrappedArray[Long]): Unit =
    assertEquals(expected, actual.toList)

  /**
   * Builds the broker properties used by `setUp`.
   *
   * @param nodeId value for `brokerid`
   * @param port   value for `port`
   * @return properties with `log.dir` set to the directory from `getLogDir`
   *         and `log.file.size` set to `logSize`
   */
  private def createBrokerConfig(nodeId: Int, port: Int): Properties = {
    val props = new Properties
    props.put("brokerid", nodeId.toString)
    props.put("port", port.toString)
    props.put("log.dir", getLogDir().getAbsolutePath)
    props.put("log.flush.interval", "1")
    props.put("enable.zookeeper", "false")
    props.put("num.partitions", "20")
    props.put("log.retention.hours", "10")
    props.put("log.cleanup.interval.mins", "5")
    props.put("log.file.size", logSize.toString)
    props
  }

  /**
   * Returns the result of `TestUtils.tempDir()`. The call is repeated on each
   * invocation, so use the `logDir` field to refer to the broker's directory.
   */
  private def getLogDir(): File = TestUtils.tempDir()
}