// Source blob: 1672191dd2d36d2ff09b8abe7d8bc8168f976981
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.container
import java.util.concurrent.ConcurrentHashMap
import org.apache.samza.Partition
import org.apache.samza.checkpoint.{Checkpoint, OffsetManager}
import org.apache.samza.config.{Config, MapConfig}
import org.apache.samza.metrics.{Counter, Metric, MetricsRegistryMap}
import org.apache.samza.serializers.SerdeManager
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemAdmin
import org.apache.samza.system.SystemConsumer
import org.apache.samza.system.SystemConsumers
import org.apache.samza.system.SystemProducer
import org.apache.samza.system.SystemProducers
import org.apache.samza.system.SystemStream
import org.apache.samza.system.SystemStreamMetadata
import org.apache.samza.storage.TaskStorageManager
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
import org.apache.samza.system._
import org.apache.samza.system.chooser.RoundRobinChooser
import org.apache.samza.task._
import org.junit.Assert._
import org.junit.Test
import org.mockito.Matchers._
import org.mockito.Mockito
import org.mockito.Mockito._
import org.scalatest.Assertions.intercept
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._
class TestTaskInstance {
/**
 * Verifies that processing an envelope records that envelope's own offset as
 * the last processed offset in the offset manager.
 */
@Test
def testOffsetsAreUpdatedOnProcess: Unit = {
  // The task body is intentionally empty: only offset bookkeeping is under test.
  val noOpTask = new StreamTask {
    def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = ()
  }
  val config = new MapConfig
  val partition = new Partition(0)
  val consumerMultiplexer = new SystemConsumers(
    new RoundRobinChooser,
    Map[String, SystemConsumer]())
  val producerMultiplexer = new SystemProducers(
    Map[String, SystemProducer](),
    new SerdeManager)
  val systemStream = new SystemStream("test-system", "test-stream")
  val systemStreamPartition = new SystemStreamPartition(systemStream, partition)

  // Metadata for the single partition, built from the offsets "0", "1" and "2".
  val partitionMetadata = new SystemStreamPartitionMetadata("0", "1", "2")
  val streamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> partitionMetadata).asJava)
  val offsetManager = OffsetManager(Map(systemStream -> streamMetadata), config)

  val taskName = new TaskName("taskName")
  val collector = new TaskInstanceCollector(producerMultiplexer)
  val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
  val taskInstance: TaskInstance = new TaskInstance(
    noOpTask,
    taskName,
    config,
    new TaskInstanceMetrics,
    null,
    consumerMultiplexer,
    collector,
    containerContext,
    offsetManager,
    systemStreamPartitions = Set(systemStreamPartition))

  // Deliver a single message whose offset is "2".
  val coordinator = new ReadableCoordinator(taskName)
  taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator)

  // The last processed offset must now be the offset of that message, i.e. "2".
  // Comparing against Some("2") checks presence and value in one assertion.
  assertEquals(Some("2"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartition))
}
/**
 * Marker exception for the exception-count metric tests; TroublesomeTask
 * throws it when the processed offset is even.
 */
class TroublesomeException extends RuntimeException
/**
 * Marker exception for the exception-count metric tests; TroublesomeTask
 * throws it when the processed offset is odd.
 */
class NonFatalException extends RuntimeException
/**
 * Marker exception for the exception-count metric tests; TroublesomeTask
 * throws it from its window callback.
 */
class FatalException extends RuntimeException
/**
 * Task whose callbacks always throw; used to drive the exception-count
 * metrics tests.
 */
class TroublesomeTask extends StreamTask with WindowableTask {
  /**
   * Always fails: throws a TroublesomeException when the envelope's offset,
   * parsed as an Int, is even, and a NonFatalException otherwise.
   */
  def process(
    envelope: IncomingMessageEnvelope,
    collector: MessageCollector,
    coordinator: TaskCoordinator): Unit = {
    val offset = envelope.getOffset().toInt
    if (offset % 2 == 0) {
      throw new TroublesomeException
    } else {
      throw new NonFatalException
    }
  }

  /** Always fails with a FatalException. */
  def window(collector: MessageCollector, coordinator: TaskCoordinator): Unit = {
    throw new FatalException
  }
}
/**
 * Helper method used to retrieve the value of an ignored-exception counter
 * from a metrics group.
 *
 * The counter is looked up under the key
 * `"exception-ignored-" + name.toLowerCase`.
 *
 * @param group the metrics of one group, keyed by metric name
 * @param name the exception class name; it is lower-cased before the lookup
 * @return the current count of the matching counter
 * @throws AssertionError if nothing is stored under the key, or the stored
 *                        metric is not a Counter
 */
private def getCount(
  group: ConcurrentHashMap[String, Metric],
  name: String): Long = {
  val counterKey = "exception-ignored-" + name.toLowerCase
  // Pattern match instead of a blind cast so that a missing (null) or
  // mistyped metric fails with a message naming the key, rather than with a
  // bare NullPointerException / ClassCastException.
  group.get(counterKey) match {
    case counter: Counter => counter.getCount
    case other =>
      throw new AssertionError(s"Expected a Counter under metric key '$counterKey' but found: $other")
  }
}
/**
 * Test task instance exception metrics with two ignored exception types and
 * one exception type that is not ignored.
 *
 * TroublesomeException and NonFatalException are listed under
 * "task.ignored.exceptions", so each one thrown by process() must be counted.
 * FatalException is not listed, so window() must propagate it and no counter
 * may be registered for it.
 */
@Test
def testExceptionCounts: Unit = {
  val task = new TroublesomeTask
  val ignoredExceptions = classOf[TroublesomeException].getName + "," +
    classOf[NonFatalException].getName
  val config = new MapConfig(Map[String, String](
    "task.ignored.exceptions" -> ignoredExceptions).asJava)
  val partition = new Partition(0)
  val consumerMultiplexer = new SystemConsumers(
    new RoundRobinChooser,
    Map[String, SystemConsumer]())
  val producerMultiplexer = new SystemProducers(
    Map[String, SystemProducer](),
    new SerdeManager)
  val systemStream = new SystemStream("test-system", "test-stream")
  val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
  val systemStreamPartitions = Set(systemStreamPartition)
  // Metadata for the single partition, built from the offsets "0", "1" and "2".
  val testSystemStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
  val offsetManager = OffsetManager(Map(systemStream -> testSystemStreamMetadata), config)
  val taskName = new TaskName("taskName")
  val collector = new TaskInstanceCollector(producerMultiplexer)
  val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
  val registry = new MetricsRegistryMap
  val taskMetrics = new TaskInstanceMetrics(registry = registry)
  val taskInstance = new TaskInstance(
    task,
    taskName,
    config,
    taskMetrics,
    null,
    consumerMultiplexer,
    collector,
    containerContext,
    offsetManager,
    systemStreamPartitions = systemStreamPartitions,
    exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
  val coordinator = new ReadableCoordinator(taskName)

  // Offsets "1" and "3" are odd (NonFatalException); offset "2" is even
  // (TroublesomeException). All three are ignored, so process() must not throw.
  taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "1", null, null), coordinator)
  taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "2", null, null), coordinator)
  taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, "3", null, null), coordinator)

  val group = registry.getGroup(taskMetrics.group)
  assertEquals(1L, getCount(group, classOf[TroublesomeException].getName))
  assertEquals(2L, getCount(group, classOf[NonFatalException].getName))

  // FatalException is not in the ignore list, so it must reach the caller.
  intercept[FatalException] {
    taskInstance.window(coordinator)
  }

  // No counter may have been registered for the non-ignored exception.
  // ConcurrentHashMap.contains(Object) is a legacy method that tests *values*,
  // so the key has to be checked with containsKey, using the same
  // "exception-ignored-" prefixed, lower-cased name that getCount looks up.
  assertFalse(group.containsKey("exception-ignored-" + classOf[FatalException].getName.toLowerCase))
}
/**
 * Test task instance exception metrics when every exception is ignored via
 * the "*" wildcard: nothing thrown by the task may propagate, and each
 * exception type must end up with its own counter.
 */
@Test
def testIgnoreAllExceptions: Unit = {
  val troublesomeTask = new TroublesomeTask
  val wildcardSettings = Map[String, String]("task.ignored.exceptions" -> "*")
  val config = new MapConfig(wildcardSettings.asJava)
  val partition = new Partition(0)
  val consumerMultiplexer = new SystemConsumers(
    new RoundRobinChooser,
    Map[String, SystemConsumer]())
  val producerMultiplexer = new SystemProducers(
    Map[String, SystemProducer](),
    new SerdeManager)
  val systemStream = new SystemStream("test-system", "test-stream")
  val systemStreamPartition = new SystemStreamPartition(systemStream, partition)

  // Metadata for the single partition, built from the offsets "0", "1" and "2".
  val partitionMetadata = new SystemStreamPartitionMetadata("0", "1", "2")
  val streamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> partitionMetadata).asJava)
  val offsetManager = OffsetManager(Map(systemStream -> streamMetadata), config)

  val taskName = new TaskName("taskName")
  val collector = new TaskInstanceCollector(producerMultiplexer)
  val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)
  val registry = new MetricsRegistryMap
  val taskMetrics = new TaskInstanceMetrics(registry = registry)
  val taskInstance = new TaskInstance(
    troublesomeTask,
    taskName,
    config,
    taskMetrics,
    null,
    consumerMultiplexer,
    collector,
    containerContext,
    offsetManager,
    systemStreamPartitions = Set(systemStreamPartition),
    exceptionHandler = TaskInstanceExceptionHandler(taskMetrics, config))
  val coordinator = new ReadableCoordinator(taskName)

  // Odd offsets raise NonFatalException, the even one raises TroublesomeException.
  Seq("1", "2", "3").foreach { offset =>
    taskInstance.process(new IncomingMessageEnvelope(systemStreamPartition, offset, null, null), coordinator)
  }
  // window() raises FatalException, which the wildcard also swallows.
  taskInstance.window(coordinator)

  val group = registry.getGroup(taskMetrics.group)
  assertEquals(1L, getCount(group, classOf[TroublesomeException].getName))
  assertEquals(2L, getCount(group, classOf[NonFatalException].getName))
  assertEquals(1L, getCount(group, classOf[FatalException].getName))
}
/**
 * Checks that a task's init() callback can replace the starting offset
 * already assigned to one of its partitions, while the starting offset of
 * the partition it does not touch stays as it was.
 */
@Test
def testManualOffsetReset: Unit = {
  val untouchedPartition = new SystemStreamPartition("system", "stream", new Partition(0))
  val resetPartition = new SystemStreamPartition("system", "stream", new Partition(1))

  // On init, the task moves the second partition's starting offset to "10".
  val resettingTask = new StreamTask with InitableTask {
    override def init(config: Config, context: TaskContext): Unit = {
      assertTrue("Can only update offsets for assigned partition",
        context.getSystemStreamPartitions.contains(resetPartition))
      context.setStartingOffset(resetPartition, "10")
    }

    override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = ()
  }

  val config = new MapConfig()
  val chooser = new RoundRobinChooser()
  val consumerMultiplexer = new SystemConsumers(chooser, consumers = Map.empty)
  val producerMultiplexer = new SystemProducers(Map.empty, new SerdeManager())
  val metrics = new TaskInstanceMetrics()
  val taskName = new TaskName("Offset Reset Task 0")
  val collector = new TaskInstanceCollector(producerMultiplexer)
  val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)

  // Both partitions begin with starting offset "0".
  val offsetManager = new OffsetManager()
  val initialOffsets = Map(untouchedPartition -> "0", resetPartition -> "0")
  offsetManager.startingOffsets += taskName -> initialOffsets

  val taskInstance = new TaskInstance(
    resettingTask,
    taskName,
    config,
    metrics,
    null,
    consumerMultiplexer,
    collector,
    containerContext,
    offsetManager,
    systemStreamPartitions = Set(untouchedPartition, resetPartition))

  taskInstance.initTask

  assertEquals(Some("0"), offsetManager.getStartingOffset(taskName, untouchedPartition))
  assertEquals(Some("10"), offsetManager.getStartingOffset(taskName, resetPartition))
}
/**
 * Checks that an envelope whose offset is older than its partition's starting
 * offset never reaches the task, while the remaining envelopes are delivered
 * in the order they were processed.
 */
@Test
def testIgnoreMessagesOlderThanStartingOffsets: Unit = {
  val partition0 = new SystemStreamPartition("system", "stream", new Partition(0))
  val partition1 = new SystemStreamPartition("system", "stream", new Partition(1))
  val config = new MapConfig()
  val chooser = new RoundRobinChooser()
  val consumerMultiplexer = new SystemConsumers(chooser, consumers = Map.empty)
  val producerMultiplexer = new SystemProducers(Map.empty, new SerdeManager())
  val metrics = new TaskInstanceMetrics()
  val taskName = new TaskName("testing")
  val collector = new TaskInstanceCollector(producerMultiplexer)
  val containerContext = new SamzaContainerContext("0", config, Set(taskName).asJava, new MetricsRegistryMap)

  // Partition 0 starts at offset "0", partition 1 at offset "100".
  val offsetManager = new OffsetManager()
  val startingOffsets = Map(partition0 -> "0", partition1 -> "100")
  offsetManager.startingOffsets += taskName -> startingOffsets

  // MockSystemAdmin supplies the offset comparison (offsets compared as Longs).
  val systemAdmins = Mockito.mock(classOf[SystemAdmins])
  when(systemAdmins.getSystemAdmin("system")).thenReturn(new MockSystemAdmin)

  // The task simply records every envelope it is handed.
  val delivered = new ListBuffer[IncomingMessageEnvelope]
  val recordingTask = new StreamTask {
    def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator): Unit = {
      delivered += envelope
    }
  }

  val taskInstance = new TaskInstance(
    recordingTask,
    taskName,
    config,
    metrics,
    systemAdmins,
    consumerMultiplexer,
    collector,
    containerContext,
    offsetManager,
    systemStreamPartitions = Set(partition0, partition1))
  val coordinator = new ReadableCoordinator(taskName)

  val envelope1 = new IncomingMessageEnvelope(partition0, "1", null, null)
  val envelope2 = new IncomingMessageEnvelope(partition0, "2", null, null)
  // Offset "1" is older than partition 1's starting offset "100".
  val staleEnvelope = new IncomingMessageEnvelope(partition1, "1", null, null)
  val envelope4 = new IncomingMessageEnvelope(partition1, "102", null, null)

  Seq(envelope1, envelope2, staleEnvelope, envelope4).foreach { envelope =>
    taskInstance.process(envelope, coordinator)
  }

  // Everything except the stale envelope must have been delivered, in order.
  val expected = List(envelope1, envelope2, envelope4)
  assertEquals(expected, delivered.toList)
}
/**
 * Verifies the order of the steps performed by commit: snapshot the
 * checkpoint, flush the collector, flush the storage manager, and only then
 * write the snapshotted checkpoint.
 */
@Test
def testCommitOrder: Unit = {
  // Plain values
  val taskName = new TaskName("taskName")
  val systemStreamPartition =
    new SystemStreamPartition(new SystemStream("test-system", "test-stream"), new Partition(0))
  val checkpoint = new Checkpoint(Map(systemStreamPartition -> "4").asJava)

  // Collaborators whose call order is observed
  val collector = Mockito.mock(classOf[TaskInstanceCollector])
  val storageManager = Mockito.mock(classOf[TaskStorageManager])
  val offsetManager = Mockito.mock(classOf[OffsetManager])
  when(offsetManager.buildCheckpoint(any())).thenReturn(checkpoint)
  val callOrder = inOrder(offsetManager, collector, storageManager)

  val taskInstance: TaskInstance = new TaskInstance(
    Mockito.mock(classOf[StreamTask]),
    taskName,
    new MapConfig,
    new TaskInstanceMetrics,
    null,
    Mockito.mock(classOf[SystemConsumers]),
    collector,
    Mockito.mock(classOf[SamzaContainerContext]),
    offsetManager,
    storageManager,
    systemStreamPartitions = Set(systemStreamPartition))

  taskInstance.commit

  // 1. The checkpoint is snapshotted first so that it cannot change while the
  //    flushes below are running. SAMZA-1384
  callOrder.verify(offsetManager).buildCheckpoint(taskName)
  // 2. Producers are flushed next; ideally output is flushed before the
  //    changelog so that changelog and checkpoints (state and inputs) are
  //    captured last.
  callOrder.verify(collector).flush
  // 3. Local state follows, so that the state (particularly the offset file)
  //    never points to a newer changelog offset than the on-disk state reflects.
  callOrder.verify(storageManager).flush()
  // 4. Last, the inputs are checkpointed using the snapshot taken in step 1.
  callOrder.verify(offsetManager).writeCheckpoint(taskName, checkpoint)
}
/**
 * Verifies that a SystemProducerException thrown while flushing the collector
 * escapes from commit, and that no checkpoint is written in that case.
 */
@Test(expected = classOf[SystemProducerException])
def testProducerExceptionsIsPropagated: Unit = {
  // Plain values
  val taskName = new TaskName("taskName")
  val systemStreamPartition =
    new SystemStreamPartition(new SystemStream("test-system", "test-stream"), new Partition(0))

  // Mocks: the collector fails as soon as it is flushed.
  val failingCollector = Mockito.mock(classOf[TaskInstanceCollector])
  when(failingCollector.flush).thenThrow(new SystemProducerException("Test"))
  val storageManager = Mockito.mock(classOf[TaskStorageManager])
  val offsetManager = Mockito.mock(classOf[OffsetManager])

  val taskInstance: TaskInstance = new TaskInstance(
    Mockito.mock(classOf[StreamTask]),
    taskName,
    new MapConfig,
    new TaskInstanceMetrics,
    null,
    Mockito.mock(classOf[SystemConsumers]),
    failingCollector,
    Mockito.mock(classOf[SamzaContainerContext]),
    offsetManager,
    storageManager,
    systemStreamPartitions = Set(systemStreamPartition))

  try {
    // The SystemProducerException must not be swallowed by commit.
    taskInstance.commit
  } finally {
    // Runs even though commit throws: the checkpoint must never be written.
    Mockito.verify(offsetManager, never()).writeCheckpoint(any(classOf[TaskName]), any(classOf[Checkpoint]))
  }
}
}
/**
 * Minimal SystemAdmin stub for tests: it only provides a numeric offset
 * comparison, echoes back the offsets it is asked about, and has no stream
 * metadata.
 */
class MockSystemAdmin extends SystemAdmin {
  /** Returns the given offsets map unchanged. */
  override def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = offsets

  /** Always returns null; this stub carries no stream metadata. */
  override def getSystemStreamMetadata(streamNames: java.util.Set[String]) = null

  /** Compares two offsets by their value as Longs. */
  override def offsetComparator(offset1: String, offset2: String) = {
    val left = offset1.toLong
    val right = offset2.toLong
    left compare right
  }
}