blob: 574a8f586235a8266f215e79cd87ad807bc2b889 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log
import java.io.File
import kafka.utils.TestUtils
import org.apache.kafka.common.requests.FetchResponse.AbortedTransaction
import org.junit.Assert._
import org.junit.{After, Before, Test}
import org.scalatest.junit.JUnitSuite
class TransactionIndexTest extends JUnitSuite {
var file: File = _
var index: TransactionIndex = _
val offset = 0L
@Before
def setup(): Unit = {
file = TestUtils.tempFile()
index = new TransactionIndex(offset, file)
}
@After
def teardown(): Unit = {
index.close()
}
@Test
def testPositionSetCorrectlyWhenOpened(): Unit = {
val abortedTxns = List(
new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
abortedTxns.foreach(index.append)
index.close()
val reopenedIndex = new TransactionIndex(0L, file)
val anotherAbortedTxn = new AbortedTxn(producerId = 3L, firstOffset = 50, lastOffset = 60, lastStableOffset = 55)
reopenedIndex.append(anotherAbortedTxn)
assertEquals(abortedTxns ++ List(anotherAbortedTxn), reopenedIndex.allAbortedTxns)
}
@Test(expected = classOf[CorruptIndexException])
def testSanityCheck(): Unit = {
val abortedTxns = List(
new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
abortedTxns.foreach(index.append)
index.close()
// open the index with a different starting offset to fake invalid data
val reopenedIndex = new TransactionIndex(100L, file)
reopenedIndex.sanityCheck()
}
@Test(expected = classOf[IllegalArgumentException])
def testLastOffsetMustIncrease(): Unit = {
index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13))
index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 15, lastStableOffset = 11))
}
@Test(expected = classOf[IllegalArgumentException])
def testLastOffsetCannotDecrease(): Unit = {
index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13))
index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11))
}
@Test
def testCollectAbortedTransactions(): Unit = {
val abortedTransactions = List(
new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
abortedTransactions.foreach(index.append)
var result = index.collectAbortedTxns(0L, 100L)
assertEquals(abortedTransactions, result.abortedTransactions)
assertFalse(result.isComplete)
result = index.collectAbortedTxns(0L, 32)
assertEquals(abortedTransactions.take(3), result.abortedTransactions)
assertTrue(result.isComplete)
result = index.collectAbortedTxns(0L, 35)
assertEquals(abortedTransactions, result.abortedTransactions)
assertTrue(result.isComplete)
result = index.collectAbortedTxns(10, 35)
assertEquals(abortedTransactions, result.abortedTransactions)
assertTrue(result.isComplete)
result = index.collectAbortedTxns(11, 35)
assertEquals(abortedTransactions.slice(1, 4), result.abortedTransactions)
assertTrue(result.isComplete)
result = index.collectAbortedTxns(20, 41)
assertEquals(abortedTransactions.slice(2, 4), result.abortedTransactions)
assertFalse(result.isComplete)
}
@Test
def testTruncate(): Unit = {
val abortedTransactions = List(
new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2),
new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16),
new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
abortedTransactions.foreach(index.append)
index.truncateTo(51)
assertEquals(abortedTransactions, index.collectAbortedTxns(0L, 100L).abortedTransactions)
index.truncateTo(50)
assertEquals(abortedTransactions.take(3), index.collectAbortedTxns(0L, 100L).abortedTransactions)
index.reset()
assertEquals(List.empty[AbortedTransaction], index.collectAbortedTxns(0L, 100L).abortedTransactions)
}
@Test
def testAbortedTxnSerde(): Unit = {
val pid = 983493L
val firstOffset = 137L
val lastOffset = 299L
val lastStableOffset = 200L
val abortedTxn = new AbortedTxn(pid, firstOffset, lastOffset, lastStableOffset)
assertEquals(AbortedTxn.CurrentVersion, abortedTxn.version)
assertEquals(pid, abortedTxn.producerId)
assertEquals(firstOffset, abortedTxn.firstOffset)
assertEquals(lastOffset, abortedTxn.lastOffset)
assertEquals(lastStableOffset, abortedTxn.lastStableOffset)
}
@Test
def testRenameIndex(): Unit = {
val renamed = TestUtils.tempFile()
index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2))
index.renameTo(renamed)
index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16))
val abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions
assertEquals(2, abortedTxns.size)
assertEquals(0, abortedTxns(0).firstOffset)
assertEquals(5, abortedTxns(1).firstOffset)
}
}