blob: a79eccaa8fc18d197b77f9363f1814fefc4ac40d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.checkpoint
import java.util
import org.apache.samza.container.TaskName
import org.apache.samza.Partition
import org.apache.samza.system.SystemStream
import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.system.SystemStreamMetadata.{OffsetType, SystemStreamPartitionMetadata}
import org.apache.samza.system.SystemStreamPartition
import org.junit.Assert._
import org.junit.{Ignore, Test}
import org.apache.samza.SamzaException
import org.apache.samza.config.MapConfig
import org.apache.samza.system.SystemAdmin
import org.scalatest.Assertions.intercept
import scala.collection.JavaConversions._
class TestOffsetManager {
@Test
def testSystemShouldUseDefaults {
val taskName = new TaskName("c")
val systemStream = new SystemStream("test-system", "test-stream")
val partition = new Partition(0)
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "oldest"))
val offsetManager = OffsetManager(systemStreamMetadata, config)
offsetManager.register(taskName, Set(systemStreamPartition))
offsetManager.start
assertFalse(offsetManager.getLastProcessedOffset(systemStreamPartition).isDefined)
assertTrue(offsetManager.getStartingOffset(systemStreamPartition).isDefined)
assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
}
@Test
def testShouldLoadFromAndSaveWithCheckpointManager {
val taskName = new TaskName("c")
val systemStream = new SystemStream("test-system", "test-stream")
val partition = new Partition(0)
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
val config = new MapConfig
val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
val systemAdmins = Map("test-system" -> getSystemAdmin)
val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
offsetManager.register(taskName, Set(systemStreamPartition))
offsetManager.start
assertTrue(checkpointManager.isStarted)
assertEquals(1, checkpointManager.registered.size)
assertEquals(taskName, checkpointManager.registered.head)
assertEquals(checkpointManager.checkpoints.head._2, checkpointManager.readLastCheckpoint(taskName))
// Should get offset 45 back from the checkpoint manager, which is last processed, and system admin should return 46 as starting offset.
assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
assertEquals("45", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
offsetManager.update(systemStreamPartition, "46")
assertEquals("46", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
offsetManager.update(systemStreamPartition, "47")
assertEquals("47", offsetManager.getLastProcessedOffset(systemStreamPartition).get)
// Should never update starting offset.
assertEquals("46", offsetManager.getStartingOffset(systemStreamPartition).get)
offsetManager.checkpoint(taskName)
val expectedCheckpoint = new Checkpoint(Map(systemStreamPartition -> "47"))
assertEquals(expectedCheckpoint, checkpointManager.readLastCheckpoint(taskName))
}
@Test
def testShouldResetStreams {
val taskName = new TaskName("c")
val systemStream = new SystemStream("test-system", "test-stream")
val partition = new Partition(0)
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
val config = new MapConfig(Map(
"systems.test-system.samza.offset.default" -> "oldest",
"systems.test-system.streams.test-stream.samza.reset.offset" -> "true"))
val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager)
offsetManager.register(taskName, Set(systemStreamPartition))
offsetManager.start
assertTrue(checkpointManager.isStarted)
assertEquals(1, checkpointManager.registered.size)
assertEquals(taskName, checkpointManager.registered.head)
assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName))
// Should be zero even though the checkpoint has an offset of 45, since we're forcing a reset.
assertEquals("0", offsetManager.getStartingOffset(systemStreamPartition).get)
}
@Test
def testOffsetManagerShouldHandleNullCheckpoints {
val systemStream = new SystemStream("test-system", "test-stream")
val partition1 = new Partition(0)
val partition2 = new Partition(1)
val taskName1 = new TaskName("P0")
val taskName2 = new TaskName("P1")
val systemStreamPartition1 = new SystemStreamPartition(systemStream, partition1)
val systemStreamPartition2 = new SystemStreamPartition(systemStream, partition2)
val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(
partition1 -> new SystemStreamPartitionMetadata("0", "1", "2"),
partition2 -> new SystemStreamPartitionMetadata("3", "4", "5")))
val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
val checkpoint = new Checkpoint(Map(systemStreamPartition1 -> "45"))
// Checkpoint manager only has partition 1.
val checkpointManager = getCheckpointManager(systemStreamPartition1, taskName1)
val systemAdmins = Map("test-system" -> getSystemAdmin)
val config = new MapConfig
val offsetManager = OffsetManager(systemStreamMetadata, config, checkpointManager, systemAdmins)
// Register both partitions. Partition 2 shouldn't have a checkpoint.
offsetManager.register(taskName1, Set(systemStreamPartition1))
offsetManager.register(taskName2, Set(systemStreamPartition2))
offsetManager.start
assertTrue(checkpointManager.isStarted)
assertEquals(2, checkpointManager.registered.size)
assertEquals(checkpoint, checkpointManager.readLastCheckpoint(taskName1))
assertNull(checkpointManager.readLastCheckpoint(taskName2))
}
@Test
def testShouldFailWhenMissingMetadata {
val taskName = new TaskName("c")
val systemStream = new SystemStream("test-system", "test-stream")
val partition = new Partition(0)
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
val offsetManager = new OffsetManager
offsetManager.register(taskName, Set(systemStreamPartition))
intercept[SamzaException] {
offsetManager.start
}
}
@Ignore("OffsetManager.start is supposed to throw an exception - but it doesn't") @Test
def testShouldFailWhenMissingDefault {
val taskName = new TaskName("c")
val systemStream = new SystemStream("test-system", "test-stream")
val partition = new Partition(0)
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig(Map[String, String]()))
offsetManager.register(taskName, Set(systemStreamPartition))
intercept[SamzaException] {
offsetManager.start
}
}
@Test
def testDefaultSystemShouldFailWhenFailIsSpecified {
val systemStream = new SystemStream("test-system", "test-stream")
val partition = new Partition(0)
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
val config = new MapConfig(Map("systems.test-system.samza.offset.default" -> "fail"))
intercept[IllegalArgumentException] {
OffsetManager(systemStreamMetadata, config)
}
}
@Test
def testDefaultStreamShouldFailWhenFailIsSpecified {
val systemStream = new SystemStream("test-system", "test-stream")
val partition = new Partition(0)
val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")))
val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
val config = new MapConfig(Map("systems.test-system.streams.test-stream.samza.offset.default" -> "fail"))
intercept[IllegalArgumentException] {
OffsetManager(systemStreamMetadata, config)
}
}
@Test
def testOutdatedStreamInCheckpoint {
val taskName = new TaskName("c")
val systemStream0 = new SystemStream("test-system-0", "test-stream")
val systemStream1 = new SystemStream("test-system-1", "test-stream")
val partition0 = new Partition(0)
val systemStreamPartition0 = new SystemStreamPartition(systemStream0, partition0)
val systemStreamPartition1 = new SystemStreamPartition(systemStream1, partition0)
val testStreamMetadata = new SystemStreamMetadata(systemStream0.getStream, Map(partition0 -> new SystemStreamPartitionMetadata("0", "1", "2")))
val systemStreamMetadata = Map(systemStream0 -> testStreamMetadata)
val offsetSettings = Map(systemStream0 -> OffsetSetting(testStreamMetadata, OffsetType.UPCOMING, false))
val checkpointManager = getCheckpointManager(systemStreamPartition1)
val offsetManager = new OffsetManager(offsetSettings, checkpointManager)
offsetManager.register(taskName, Set(systemStreamPartition0))
offsetManager.start
assertTrue(checkpointManager.isStarted)
assertEquals(1, checkpointManager.registered.size)
assertNull(offsetManager.getLastProcessedOffset(systemStreamPartition1).getOrElse(null))
}
private def getCheckpointManager(systemStreamPartition: SystemStreamPartition, taskName:TaskName = new TaskName("taskName")) = {
val checkpoint = new Checkpoint(Map(systemStreamPartition -> "45"))
new CheckpointManager {
var isStarted = false
var isStopped = false
var registered = Set[TaskName]()
var checkpoints: Map[TaskName, Checkpoint] = Map(taskName -> checkpoint)
var taskNameToPartitionMapping: util.Map[TaskName, java.lang.Integer] = new util.HashMap[TaskName, java.lang.Integer]()
def start { isStarted = true }
def register(taskName: TaskName) { registered += taskName }
def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) { checkpoints += taskName -> checkpoint }
def readLastCheckpoint(taskName: TaskName) = checkpoints.getOrElse(taskName, null)
def stop { isStopped = true }
override def writeChangeLogPartitionMapping(mapping: util.Map[TaskName, java.lang.Integer]): Unit = taskNameToPartitionMapping = mapping
override def readChangeLogPartitionMapping(): util.Map[TaskName, java.lang.Integer] = taskNameToPartitionMapping
}
}
private def getSystemAdmin = {
new SystemAdmin {
def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) =
offsets.mapValues(offset => (offset.toLong + 1).toString)
def getSystemStreamMetadata(streamNames: java.util.Set[String]) =
Map[String, SystemStreamMetadata]()
}
}
}