blob: 777a6a06d5b7e7c78f6997199be8572a163d244a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.checkpoint
import java.util.Properties
import org.apache.samza.Partition
import org.apache.samza.checkpoint.CheckpointTool.{CheckpointToolCommandLine, TaskNameToCheckpointMap}
import org.apache.samza.checkpoint.TestCheckpointTool.{MockCheckpointManagerFactory, MockSystemFactory}
import org.apache.samza.config._
import org.apache.samza.container.TaskName
import org.apache.samza.coordinator.metadatastore.{CoordinatorStreamStore, CoordinatorStreamStoreTestUtil}
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
import org.apache.samza.execution.JobPlanner
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system._
import org.junit.{Before, Test}
import org.mockito.Matchers._
import org.mockito.Mockito
import org.mockito.Mockito._
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mockito.MockitoSugar
import scala.collection.JavaConverters._
object TestCheckpointTool {
var checkpointManager: CheckpointManager = _
var systemConsumer: SystemConsumer = _
var systemProducer: SystemProducer = _
var systemAdmin: SystemAdmin = _
var coordinatorConfig: Config = new MapConfig()
class MockCheckpointManagerFactory extends CheckpointManagerFactory {
def getCheckpointManager(config: Config, registry: MetricsRegistry): CheckpointManager = {
coordinatorConfig = config
checkpointManager
}
}
class MockSystemFactory extends SystemFactory {
override def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = systemConsumer
override def getProducer(systemName: String, config: Config, registry: MetricsRegistry) = systemProducer
override def getAdmin(systemName: String, config: Config) = systemAdmin
}
}
class TestCheckpointTool extends AssertionsForJUnit with MockitoSugar {
var config: MapConfig = _
val tn0 = new TaskName("Partition 0")
val tn1 = new TaskName("Partition 1")
val p0 = new Partition(0)
val p1 = new Partition(1)
@Before
def setup() {
val userDefinedConfig: MapConfig = new MapConfig(Map(
ApplicationConfig.APP_NAME -> "test",
JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
TaskConfig.INPUT_STREAMS -> "test.foo",
TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getName,
SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
).asJava)
config = JobPlanner.generateSingleJobConfig(userDefinedConfig)
val metadata = new SystemStreamMetadata("foo", Map[Partition, SystemStreamPartitionMetadata](
new Partition(0) -> new SystemStreamPartitionMetadata("0", "100", "101"),
new Partition(1) -> new SystemStreamPartitionMetadata("0", "200", "201")
).asJava)
TestCheckpointTool.checkpointManager = mock[CheckpointManager]
TestCheckpointTool.systemAdmin = mock[SystemAdmin]
when(TestCheckpointTool.systemAdmin.getSystemStreamPartitionCounts(Set("foo").asJava, 0))
.thenReturn(Map("foo" -> metadata).asJava)
when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn0))
.thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "1234").asJava))
when(TestCheckpointTool.checkpointManager.readLastCheckpoint(tn1))
.thenReturn(new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "4321").asJava))
}
@Test
def testReadLatestCheckpoint() {
val checkpointTool = CheckpointTool(config, null)
checkpointTool.run()
verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn0)
verify(TestCheckpointTool.checkpointManager).readLastCheckpoint(tn1)
verify(TestCheckpointTool.checkpointManager, never()).writeCheckpoint(any(), any())
}
@Test
def testOverwriteCheckpoint() {
val toOverwrite = Map(tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
val checkpointTool = CheckpointTool(config, toOverwrite)
checkpointTool.run()
verify(TestCheckpointTool.checkpointManager)
.writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42").asJava))
verify(TestCheckpointTool.checkpointManager)
.writeCheckpoint(tn1, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "43").asJava))
}
@Test
def testGrouping(): Unit = {
val config : java.util.Properties = new Properties()
config.put("tasknames.Partition 0.systems.kafka-atc-repartitioned-requests.streams.ArticleRead.partitions.0", "0000")
config.put("tasknames.Partition 0.systems.kafka-atc-repartitioned-requests.streams.CommunicationRequest.partitions.0", "1111")
config.put("tasknames.Partition 1.systems.kafka-atc-repartitioned-requests.streams.ArticleRead.partitions.1", "2222")
config.put("tasknames.Partition 1.systems.kafka-atc-repartitioned-requests.streams.CommunicationRequest.partitions.1", "44444")
config.put("tasknames.Partition 1.systems.kafka-atc-repartitioned-requests.streams.StateChange.partitions.1", "5555")
val ccl = new CheckpointToolCommandLine
val result = ccl.parseOffsets(config)
assert(result(new TaskName("Partition 0")).size == 2)
assert(result(new TaskName("Partition 1")).size == 3)
}
@Test
def testShouldInstantiateCheckpointManagerWithConfigurationFromCoordinatorStream(): Unit = {
val offsetMap: TaskNameToCheckpointMap = Map(tn0 -> Map(new SystemStreamPartition("test", "foo", p0) -> "42"),
tn1 -> Map(new SystemStreamPartition("test", "foo", p1) -> "43"))
val userDefinedConfig: MapConfig = new MapConfig(Map(
ApplicationConfig.APP_NAME -> "test",
JobConfig.JOB_COORDINATOR_SYSTEM -> "coordinator",
TaskConfig.INPUT_STREAMS -> "test.foo",
TaskConfig.CHECKPOINT_MANAGER_FACTORY -> classOf[MockCheckpointManagerFactory].getName,
SystemConfig.SYSTEM_FACTORY_FORMAT.format("test") -> classOf[MockSystemFactory].getName,
SystemConfig.SYSTEM_FACTORY_FORMAT.format("coordinator") -> classOf[MockCoordinatorStreamSystemFactory].getName,
TaskConfig.GROUPER_FACTORY -> "org.apache.samza.container.grouper.task.GroupByContainerCountFactory"
).asJava)
val generatedConfigs: MapConfig = JobPlanner.generateSingleJobConfig(userDefinedConfig)
val coordinatorStreamStoreTestUtil: CoordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(config)
val coordinatorStreamStore: CoordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore
val checkpointTool: CheckpointTool = Mockito.spy(new CheckpointTool(offsetMap, coordinatorStreamStore, generatedConfigs))
Mockito.when(checkpointTool.getConfigFromCoordinatorStream(coordinatorStreamStore)).thenReturn(userDefinedConfig)
checkpointTool.run()
verify(TestCheckpointTool.checkpointManager)
.writeCheckpoint(tn0, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p0) -> "42").asJava))
verify(TestCheckpointTool.checkpointManager)
.writeCheckpoint(tn1, new Checkpoint(Map(new SystemStreamPartition("test", "foo", p1) -> "43").asJava))
// Two configurations job.id, job.name are populated in the coordinator config by SamzaRuntime and it is not present in generated config.
assert(generatedConfigs.entrySet().containsAll(TestCheckpointTool.coordinatorConfig.entrySet()))
}
}