/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.log.remote

import kafka.log.UnifiedLog
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{OffsetIndex, OffsetPosition, TimeIndex}
import org.apache.kafka.test.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito._
import java.io.{File, FileInputStream}
import java.nio.file.Files
import java.util.Collections
import scala.collection.mutable

class RemoteIndexCacheTest {
val time = new MockTime()
val partition = new TopicPartition("foo", 0)
val idPartition = new TopicIdPartition(Uuid.randomUuid(), partition)
val logDir: File = TestUtils.tempDirectory("kafka-logs")
val tpDir: File = new File(logDir, partition.toString)
val brokerId = 1
val baseOffset = 45L
val lastOffset = 75L
val segmentSize = 1024
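// RemoteStorageManager is mocked; setup() stubs fetchIndex to serve locally generated index files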
val rsm: RemoteStorageManager = mock(classOf[RemoteStorageManager])
val cache: RemoteIndexCache = new RemoteIndexCache(remoteStorageManager = rsm, logDir = logDir.toString)
val remoteLogSegmentId = new RemoteLogSegmentId(idPartition, Uuid.randomUuid())
val rlsMetadata: RemoteLogSegmentMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
@BeforeEach
def setup(): Unit = {
Files.createDirectory(tpDir.toPath)
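// create an empty transaction index file that the fetchIndex stub streams back for TRANSACTION requests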
val txnIdxFile = new File(tpDir, "txn-index" + UnifiedLog.TxnIndexFileSuffix)
txnIdxFile.createNewFile()
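// stub fetchIndex to build and populate an offset index and a time index for the requested segment
// and return the matching index file as a stream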
when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
.thenAnswer(ans => {
val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
val indexType = ans.getArgument[IndexType](1)
val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
val offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix),
metadata.startOffset(), maxEntries * 8)
val timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix),
metadata.startOffset(), maxEntries * 12)
maybeAppendIndexEntries(offsetIdx, timeIdx)
indexType match {
case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
case IndexType.TRANSACTION => new FileInputStream(txnIdxFile)
case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
}
})
}
@AfterEach
def cleanup(): Unit = {
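// reset the mock so invocation counts do not leak across tests, then release the cache resources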
reset(rsm)
cache.entries.forEach((_, v) => v.cleanup())
cache.close()
}
@Test
def testFetchIndexFromRemoteStorage(): Unit = {
val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex
val offsetPosition1 = offsetIndex.entry(1)
// the getIndexEntry call above should have fetched the offset and timestamp indexes from remote storage exactly once
val resultPosition = cache.lookupOffset(rlsMetadata, offsetPosition1.offset)
assertEquals(offsetPosition1.position, resultPosition)
verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
// the index is already cached, so the lookups below should not fetch from the RemoteStorageManager again
// (reset the mock so this can be checked with verifyNoInteractions)
reset(rsm)
val offsetPosition2 = offsetIndex.entry(2)
val resultPosition2 = cache.lookupOffset(rlsMetadata, offsetPosition2.offset)
assertEquals(offsetPosition2.position, resultPosition2)
assertNotNull(cache.getIndexEntry(rlsMetadata))
verifyNoInteractions(rsm)
}
@Test
def testPositionForNonExistingIndexFromRemoteStorage(): Unit = {
val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex
val lastOffsetPosition = cache.lookupOffset(rlsMetadata, offsetIndex.lastOffset)
val greaterOffsetThanLastOffset = offsetIndex.lastOffset + 1
assertEquals(lastOffsetPosition, cache.lookupOffset(rlsMetadata, greaterOffsetThanLastOffset))
// offsetIndex.lookup() returns OffsetPosition(baseOffset, 0) for offsets smaller than the smallest entry in the offset index.
val nonExistentOffsetPosition = new OffsetPosition(baseOffset, 0)
val lowerOffsetThanBaseOffset = offsetIndex.baseOffset - 1
assertEquals(nonExistentOffsetPosition.position, cache.lookupOffset(rlsMetadata, lowerOffsetThanBaseOffset))
}
@Test
def testCacheEntryExpiry(): Unit = {
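// a cache bounded to two entries; fetching a third key evicts the least recently used entry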
val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = logDir.toString)
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
assertEquals(0, cache.entries.size())
// the first getIndexEntry call for a key will call rsm#fetchIndex
cache.getIndexEntry(metadataList.head)
assertEquals(1, cache.entries.size())
// Calling getIndexEntry on the same key should not call rsm#fetchIndex again; the entry is served from the cache
cache.getIndexEntry(metadataList.head)
assertEquals(1, cache.entries.size())
verifyFetchIndexInvocation(count = 1)
// Accessing metadataList.head again hits the cache, while the new key metadataList(1) calls rsm#fetchIndex, bringing the invocation count to 2
cache.getIndexEntry(metadataList.head)
cache.getIndexEntry(metadataList(1))
assertEquals(2, cache.entries.size())
verifyFetchIndexInvocation(count = 2)
// Getting the index for metadataList.last should call rsm#fetchIndex and evict an older entry; metadataList(1) is already in the cache.
cache.getIndexEntry(metadataList.last)
cache.getIndexEntry(metadataList(1))
assertEquals(2, cache.entries.size())
assertTrue(cache.entries.containsKey(metadataList.last.remoteLogSegmentId().id()))
assertTrue(cache.entries.containsKey(metadataList(1).remoteLogSegmentId().id()))
verifyFetchIndexInvocation(count = 3)
// Getting the index for metadataList.head should call rsm#fetchIndex again because that entry was evicted earlier,
// while metadataList(1) is still cached; metadataList.last gets evicted to make room.
cache.getIndexEntry(metadataList(1))
cache.getIndexEntry(metadataList.head)
assertEquals(2, cache.entries.size())
assertFalse(cache.entries.containsKey(metadataList.last.remoteLogSegmentId().id()))
verifyFetchIndexInvocation(count = 4)
}
@Test
def testGetIndexAfterCacheClose(): Unit = {
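// a dedicated bounded cache is used here so that closing it does not affect the shared fixture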
val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = logDir.toString)
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
assertEquals(0, cache.entries.size())
cache.getIndexEntry(metadataList.head)
assertEquals(1, cache.entries.size())
verifyFetchIndexInvocation(count = 1)
cache.close()
// An IllegalStateException should be thrown when an index entry is requested after the cache has been closed.
assertThrows(classOf[IllegalStateException], () => cache.getIndexEntry(metadataList.head))
}
@Test
def testReloadCacheAfterClose(): Unit = {
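// a dedicated cache whose entries are later reloaded from the index files left on disk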
val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = logDir.toString)
val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
assertEquals(0, cache.entries.size())
// the first getIndexEntry call for a key will call rsm#fetchIndex
cache.getIndexEntry(metadataList.head)
assertEquals(1, cache.entries.size())
// Calling getIndexEntry on the same key should not call rsm#fetchIndex again; the entry is served from the cache
cache.getIndexEntry(metadataList.head)
assertEquals(1, cache.entries.size())
verifyFetchIndexInvocation(count = 1)
// Fetching a new key, metadataList(1), should call rsm#fetchIndex, bringing the invocation count to 2
cache.getIndexEntry(metadataList(1))
assertEquals(2, cache.entries.size())
// Calling getIndexEntry on the same key should not call rsm#fetchIndex again; the entry is served from the cache
cache.getIndexEntry(metadataList(1))
assertEquals(2, cache.entries.size())
verifyFetchIndexInvocation(count = 2)
// Fetching a new key, metadataList(2), should call rsm#fetchIndex and evict an older entry, bringing the invocation count to 3
cache.getIndexEntry(metadataList(2))
assertEquals(2, cache.entries.size())
// Calling getIndexEntry on the same key should not call rsm#fetchIndex again; the entry is served from the cache
cache.getIndexEntry(metadataList(2))
assertEquals(2, cache.entries.size())
verifyFetchIndexInvocation(count = 3)
// Close the cache
cache.close()
// Reload the cache from the index files on disk and check that its size matches the size before the close
val reloadedCache = new RemoteIndexCache(maxSize = 2, rsm, logDir = logDir.toString)
assertEquals(2, reloadedCache.entries.size())
reloadedCache.close()
}
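// verifies that rsm.fetchIndex was invoked `count` times for each of the given index types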
private def verifyFetchIndexInvocation(count: Int,
indexTypes: Seq[IndexType] =
Seq(IndexType.OFFSET, IndexType.TIMESTAMP, IndexType.TRANSACTION)): Unit = {
for (indexType <- indexTypes) {
verify(rsm, times(count)).fetchIndex(any(classOf[RemoteLogSegmentMetadata]), ArgumentMatchers.eq(indexType))
}
}
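// builds `size` remote log segment metadata entries for the given partition, each covering a distinct offset range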
private def generateRemoteLogSegmentMetadata(size: Int,
tpId: TopicIdPartition): List[RemoteLogSegmentMetadata] = {
val metadataList = mutable.Buffer.empty[RemoteLogSegmentMetadata]
for (i <- 0 until size) {
metadataList.append(new RemoteLogSegmentMetadata(new RemoteLogSegmentId(tpId, Uuid.randomUuid()), baseOffset * i,
baseOffset * i + 10, time.milliseconds(), brokerId, time.milliseconds(), segmentSize,
Collections.singletonMap(0, 0L)))
}
metadataList.toList
}
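// fills the offset and time indexes with one entry per offset, if not already full, so that lookups in the tests resolve to real positions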
private def maybeAppendIndexEntries(offsetIndex: OffsetIndex,
timeIndex: TimeIndex): Unit = {
if (!offsetIndex.isFull) {
val curTime = time.milliseconds()
for (i <- 0 until offsetIndex.maxEntries) {
val offset = offsetIndex.baseOffset + i
offsetIndex.append(offset, i)
timeIndex.maybeAppend(curTime + i, offset, true)
}
offsetIndex.flush()
timeIndex.flush()
}
}
}