blob: e357fe5e0aa8618d0b4a426bbed169f705a61b1f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.container
import com.google.common.collect.{ImmutableMap, ImmutableSet}
import com.google.common.util.concurrent.MoreExecutors
import org.apache.samza.checkpoint._
import org.apache.samza.checkpoint.kafka.{KafkaChangelogSSPOffset, KafkaStateCheckpointMarker}
import org.apache.samza.config.MapConfig
import org.apache.samza.context.{TaskContext => _, _}
import org.apache.samza.job.model.TaskModel
import org.apache.samza.metrics.{Counter, Gauge, Timer}
import org.apache.samza.storage.TaskStorageCommitManager
import org.apache.samza.system.{IncomingMessageEnvelope, StreamMetadataCache, SystemAdmin, SystemConsumers, SystemStream, SystemStreamMetadata, _}
import org.apache.samza.table.TableManager
import org.apache.samza.task._
import org.apache.samza.util.FutureUtil
import org.apache.samza.{Partition, SamzaException}
import org.junit.Assert._
import org.junit.{Before, Test}
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.mockito.{ArgumentCaptor, Matchers, Mock, MockitoAnnotations}
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.mockito.MockitoSugar
import java.util
import java.util.Collections
import java.util.concurrent.{CompletableFuture, ExecutorService, Executors, ForkJoinPool}
import java.util.function.Consumer
import scala.collection.JavaConverters._
class TestTaskInstance extends AssertionsForJUnit with MockitoSugar {
// Fixtures: a single input SSP ("test-system" / "test-stream" / partition 0) consumed by the task under test.
private val SYSTEM_NAME = "test-system"
private val TASK_NAME = new TaskName("taskName")
private val SYSTEM_STREAM_PARTITION =
  new SystemStreamPartition(new SystemStream(SYSTEM_NAME, "test-stream"), new Partition(0))
private val SYSTEM_STREAM_PARTITIONS = ImmutableSet.of(SYSTEM_STREAM_PARTITION)
// Mockito-injected collaborators of TaskInstance (initialized in setup() via MockitoAnnotations.initMocks).
@Mock
private var task: AllTask = null
@Mock
private var taskModel: TaskModel = null
@Mock
private var metrics: TaskInstanceMetrics = null
@Mock
private var systemAdmins: SystemAdmins = null
@Mock
private var systemAdmin: SystemAdmin = null
@Mock
private var consumerMultiplexer: SystemConsumers = null
@Mock
private var collector: TaskInstanceCollector = null
@Mock
private var offsetManager: OffsetManager = null
@Mock
private var taskCommitManager: TaskStorageCommitManager = null
@Mock
private var checkpointManager: CheckpointManager = null
@Mock
private var taskTableManager: TableManager = null
// not a mock; using MockTaskInstanceExceptionHandler
private var taskInstanceExceptionHandler: MockTaskInstanceExceptionHandler = null
// Context objects surfaced to the task through Context in initTask.
@Mock
private var jobContext: JobContext = null
@Mock
private var containerContext: ContainerContext = null
@Mock
private var applicationContainerContext: ApplicationContainerContext = null
@Mock
private var applicationTaskContextFactory: ApplicationTaskContextFactory[ApplicationTaskContext] = null
@Mock
private var applicationTaskContext: ApplicationTaskContext = null
@Mock
private var externalContext: ExternalContext = null
// Subject under test; created in setup() via setupTaskInstance.
private var taskInstance: TaskInstance = null
private val numCheckpointVersions = 2 // checkpoint versions count
@Before
def setup(): Unit = {
  // Initializes all @Mock-annotated fields above.
  MockitoAnnotations.initMocks(this)
  // not using Mockito mock since Mockito doesn't work well with the call-by-name argument in maybeHandle
  this.taskInstanceExceptionHandler = new MockTaskInstanceExceptionHandler
  when(this.taskModel.getTaskName).thenReturn(TASK_NAME)
  // The factory must hand back our mock ApplicationTaskContext when created with the wired contexts.
  when(this.applicationTaskContextFactory.create(Matchers.eq(this.externalContext), Matchers.eq(this.jobContext),
    Matchers.eq(this.containerContext), any(), Matchers.eq(this.applicationContainerContext)))
    .thenReturn(this.applicationTaskContext)
  when(this.systemAdmins.getSystemAdmin(SYSTEM_NAME)).thenReturn(this.systemAdmin)
  // Disable timer-driven commits (task.commit.ms = -1) so each test triggers commit explicitly;
  // a large max delay keeps commits from being force-blocked during tests.
  val taskConfigsMap = new util.HashMap[String, String]()
  taskConfigsMap.put("task.commit.ms", "-1")
  taskConfigsMap.put("task.commit.max.delay.ms", "100000")
  when(this.jobContext.getConfig).thenReturn(new MapConfig(taskConfigsMap))
  setupTaskInstance(Some(this.applicationTaskContextFactory))
}
@Test
def testProcess() {
  val processes = mock[Counter]
  when(this.metrics.processes).thenReturn(processes)
  val actuallyProcessed = mock[Counter]
  when(this.metrics.messagesActuallyProcessed).thenReturn(actuallyProcessed)
  when(this.offsetManager.getStartingOffset(TASK_NAME, SYSTEM_STREAM_PARTITION)).thenReturn(Some("0"))

  val callback = mock[TaskCallback]
  val callbackFactory = mock[TaskCallbackFactory]
  when(callbackFactory.createCallback()).thenReturn(callback)
  val coordinator = mock[ReadableCoordinator]
  val envelope = new IncomingMessageEnvelope(SYSTEM_STREAM_PARTITION, "0", null, null)

  this.taskInstance.process(envelope, coordinator, callbackFactory)

  // One invocation goes through the exception handler, reaches the task, and bumps both counters.
  assertEquals(1, this.taskInstanceExceptionHandler.numTimesCalled)
  verify(this.task).processAsync(envelope, this.collector, coordinator, callback)
  verify(processes).inc()
  verify(actuallyProcessed).inc()
}
@Test
def testWindow() {
  val windows = mock[Counter]
  when(this.metrics.windows).thenReturn(windows)

  val readableCoordinator = mock[ReadableCoordinator]
  this.taskInstance.window(readableCoordinator)

  // window() is routed through the exception handler to the task and counted once.
  assertEquals(1, this.taskInstanceExceptionHandler.numTimesCalled)
  verify(this.task).window(this.collector, readableCoordinator)
  verify(windows).inc()
}
@Test
def testInitTask(): Unit = {
  this.taskInstance.initTask

  val captor = ArgumentCaptor.forClass(classOf[Context])
  verify(this.task).init(captor.capture())
  val context = captor.getValue

  // Every collaborator wired by the container must surface through the Context handed to init().
  assertEquals(this.jobContext, context.getJobContext)
  assertEquals(this.containerContext, context.getContainerContext)
  assertEquals(this.taskModel, context.getTaskContext.getTaskModel)
  assertEquals(this.applicationContainerContext, context.getApplicationContainerContext)
  assertEquals(this.applicationTaskContext, context.getApplicationTaskContext)
  assertEquals(this.externalContext, context.getExternalContext)
  // initTask must also start the per-task application context.
  verify(this.applicationTaskContext).start()
}
@Test
def testShutdownTask(): Unit = {
  // Shutdown must close the task and stop the per-task application context.
  this.taskInstance.shutdownTask
  verify(this.task).close()
  verify(this.applicationTaskContext).stop()
}
/**
 * Tests that the init() method of task can override the existing offset assignment.
 * This helps verify wiring for the task context (i.e. offset manager).
 */
@Test
def testManualOffsetReset() {
  // Simulate a task whose init() manually resets the starting offset for our SSP.
  when(this.task.init(any())).thenAnswer(new Answer[Void] {
    override def answer(invocation: InvocationOnMock): Void = {
      val ctx = invocation.getArgumentAt(0, classOf[Context])
      ctx.getTaskContext.setStartingOffset(SYSTEM_STREAM_PARTITION, "10")
      null
    }
  })

  taskInstance.initTask

  // The override performed inside init() must be propagated to the offset manager.
  verify(this.offsetManager).setStartingOffset(TASK_NAME, SYSTEM_STREAM_PARTITION, "10")
}
@Test
def testIgnoreMessagesOlderThanStartingOffsets() {
  val processes = mock[Counter]
  when(this.metrics.processes).thenReturn(processes)
  val actuallyProcessed = mock[Counter]
  when(this.metrics.messagesActuallyProcessed).thenReturn(actuallyProcessed)
  // Starting offset is "5": anything numerically smaller must be dropped.
  when(this.offsetManager.getStartingOffset(TASK_NAME, SYSTEM_STREAM_PARTITION)).thenReturn(Some("5"))
  // Compare offsets as longs so "7" > "5" > "0".
  when(this.systemAdmin.offsetComparator(any(), any())).thenAnswer(new Answer[Integer] {
    override def answer(invocation: InvocationOnMock): Integer = {
      val left = invocation.getArgumentAt(0, classOf[String])
      val right = invocation.getArgumentAt(1, classOf[String])
      left.toLong.compareTo(right.toLong)
    }
  })

  val staleEnvelope = new IncomingMessageEnvelope(SYSTEM_STREAM_PARTITION, "0", null, null)
  val atStartEnvelope = new IncomingMessageEnvelope(SYSTEM_STREAM_PARTITION, "5", null, null)
  val afterStartEnvelope = new IncomingMessageEnvelope(SYSTEM_STREAM_PARTITION, "7", null, null)
  val coordinator = mock[ReadableCoordinator]
  val callback = mock[TaskCallback]
  val callbackFactory = mock[TaskCallbackFactory]
  when(callbackFactory.createCallback()).thenReturn(callback)

  this.taskInstance.process(staleEnvelope, coordinator, callbackFactory)
  this.taskInstance.process(atStartEnvelope, coordinator, callbackFactory)
  this.taskInstance.process(afterStartEnvelope, coordinator, callbackFactory)

  // Only the envelopes at or beyond the starting offset reach the task.
  verify(this.task).processAsync(Matchers.eq(atStartEnvelope), Matchers.eq(this.collector), Matchers.eq(coordinator), Matchers.eq(callback))
  verify(this.task).processAsync(Matchers.eq(afterStartEnvelope), Matchers.eq(this.collector), Matchers.eq(coordinator), Matchers.eq(callback))
  verify(this.task, never()).processAsync(Matchers.eq(staleEnvelope), any(), any(), any())
  // All three deliveries are counted as processes, but only two are actually processed.
  verify(processes, times(3)).inc()
  verify(actuallyProcessed, times(2)).inc()
}
@Test
def testCommitOrder() {
  // Wire all commit-related metrics so commit() can record them.
  val commitsCounter = mock[Counter]
  when(this.metrics.commits).thenReturn(commitsCounter)
  val snapshotTimer = mock[Timer]
  when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
  val commitTimer = mock[Timer]
  when(this.metrics.commitNs).thenReturn(commitTimer)
  val lastCommitGauge = mock[Gauge[Long]]
  when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
  val commitSyncTimer = mock[Timer]
  when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer)
  val commitAsyncTimer = mock[Timer]
  when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer)
  val uploadTimer = mock[Timer]
  when(this.metrics.asyncUploadNs).thenReturn(uploadTimer)
  val cleanUpTimer = mock[Timer]
  when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
  val skippedCounter = mock[Gauge[Int]]
  when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
  // Input offset "4" and a Kafka state checkpoint marker at changelog offset "5".
  val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
  inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
  val changelogSSP = new SystemStreamPartition(new SystemStream(SYSTEM_NAME, "test-changelog-stream"), new Partition(0))
  val stateCheckpointMarkers: util.Map[String, String] = new util.HashMap[String, String]()
  val stateCheckpointMarker = KafkaStateCheckpointMarker.serialize(new KafkaStateCheckpointMarker(changelogSSP, "5"))
  stateCheckpointMarkers.put("storeName", stateCheckpointMarker)
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(inputOffsets)
  val snapshotSCMs = ImmutableMap.of(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME, stateCheckpointMarkers)
  when(this.taskCommitManager.snapshot(any())).thenReturn(snapshotSCMs)
  val snapshotSCMFuture: CompletableFuture[util.Map[String, util.Map[String, String]]] =
    CompletableFuture.completedFuture(snapshotSCMs)
  when(this.taskCommitManager.upload(any(), Matchers.eq(snapshotSCMs))).thenReturn(snapshotSCMFuture) // kafka is no-op
  when(this.taskCommitManager.cleanUp(any(), any())).thenReturn(CompletableFuture.completedFuture[Void](null))
  taskInstance.commit
  val mockOrder = inOrder(this.offsetManager, this.collector, this.taskTableManager, this.taskCommitManager)
  // We must first get a snapshot of the input offsets so it doesn't change while we flush. SAMZA-1384
  mockOrder.verify(this.offsetManager).getLastProcessedOffsets(TASK_NAME)
  // Producers must be flushed next and ideally the output would be flushed before the changelog
  // s.t. the changelog and checkpoints (state and inputs) are captured last
  mockOrder.verify(this.collector).flush
  // Tables should be flushed next
  mockOrder.verify(this.taskTableManager).flush()
  // Local state should be flushed next
  mockOrder.verify(this.taskCommitManager).snapshot(any())
  // Upload should be called next with the snapshot SCMs.
  mockOrder.verify(this.taskCommitManager).upload(any(), Matchers.eq(snapshotSCMs))
  // Stores checkpoints should be created next with the newest changelog offsets
  mockOrder.verify(this.taskCommitManager).writeCheckpointToStoreDirectories(any())
  // Input checkpoint should be written with the snapshot captured at the beginning of commit and the
  // newest changelog offset captured during storage manager flush
  val captor = ArgumentCaptor.forClass(classOf[Checkpoint])
  mockOrder.verify(offsetManager, times(numCheckpointVersions)).writeCheckpoint(any(), captor.capture)
  val cp = captor.getAllValues
  assertEquals(numCheckpointVersions, cp.size())
  // Validate both checkpoint versions: V2 carries state checkpoint markers; V1 inlines changelog offsets.
  cp.forEach(new Consumer[Checkpoint] {
    override def accept(c: Checkpoint): Unit = {
      assertEquals("4", c.getOffsets.get(SYSTEM_STREAM_PARTITION))
      if (c.getVersion == 2) {
        assertEquals(1, c.getOffsets.size())
        assertTrue(c.isInstanceOf[CheckpointV2])
        val checkpointedStateCheckpointMarkers = c.asInstanceOf[CheckpointV2]
          .getStateCheckpointMarkers.get(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME)
        assertTrue(checkpointedStateCheckpointMarkers.size() == 1)
        val checkpointedStateCheckpointMarker = checkpointedStateCheckpointMarkers.get("storeName")
        assertTrue(checkpointedStateCheckpointMarker.equals(stateCheckpointMarker))
        val kafkaMarker = KafkaStateCheckpointMarker.deserialize(checkpointedStateCheckpointMarker)
        assertEquals(kafkaMarker.getChangelogOffset, "5")
        assertEquals(kafkaMarker.getChangelogSSP, changelogSSP)
      } else { // c.getVersion == 1
        assertEquals(2, c.getOffsets.size())
        assertTrue(c.isInstanceOf[CheckpointV1])
        assertEquals("5", KafkaChangelogSSPOffset.fromString(c.getOffsets.get(changelogSSP)).getChangelogOffset)
      }
    }
  })
  // Old checkpointed stores should be cleared
  mockOrder.verify(this.taskCommitManager).cleanUp(any(), any())
  verify(commitsCounter).inc()
  verify(snapshotTimer).update(anyLong())
  verify(uploadTimer).update(anyLong())
  verify(commitTimer).update(anyLong())
}
@Test
def testEmptyChangelogSSPOffsetInCommit() { // e.g. if changelog topic is empty
  val commitsCounter = mock[Counter]
  when(this.metrics.commits).thenReturn(commitsCounter)
  val snapshotTimer = mock[Timer]
  when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
  val commitTimer = mock[Timer]
  when(this.metrics.commitNs).thenReturn(commitTimer)
  val lastCommitGauge = mock[Gauge[Long]]
  when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
  val commitSyncTimer = mock[Timer]
  when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer)
  val commitAsyncTimer = mock[Timer]
  when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer)
  val uploadTimer = mock[Timer]
  when(this.metrics.asyncUploadNs).thenReturn(uploadTimer)
  val skippedCounter = mock[Gauge[Int]]
  when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
  val inputOffsets = Map(SYSTEM_STREAM_PARTITION -> "4").asJava
  val changelogSSP = new SystemStreamPartition(new SystemStream(SYSTEM_NAME, "test-changelog-stream"), new Partition(0))
  // Marker with a null changelog offset models an empty changelog topic.
  val stateCheckpointMarkers: util.Map[String, String] = new util.HashMap[String, String]()
  val nullStateCheckpointMarker = KafkaStateCheckpointMarker.serialize(new KafkaStateCheckpointMarker(changelogSSP, null))
  stateCheckpointMarkers.put("storeName", nullStateCheckpointMarker)
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(inputOffsets)
  when(this.taskCommitManager.upload(any(), any()))
    .thenReturn(CompletableFuture.completedFuture(
      Collections.singletonMap(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME, stateCheckpointMarkers)))
  taskInstance.commit
  val captor = ArgumentCaptor.forClass(classOf[Checkpoint])
  // One writeCheckpoint call per checkpoint version.
  verify(offsetManager, times(numCheckpointVersions)).writeCheckpoint(any(), captor.capture)
  val cp = captor.getAllValues
  assertEquals(numCheckpointVersions, cp.size())
  // Both checkpoint versions must preserve the input offset and carry a null changelog offset.
  cp.forEach(new Consumer[Checkpoint] {
    override def accept(checkpoint: Checkpoint): Unit = {
      assertEquals("4", checkpoint.getOffsets.get(SYSTEM_STREAM_PARTITION))
      if (checkpoint.getVersion == 2) {
        assertEquals(1, checkpoint.getOffsets.size())
        assertTrue(checkpoint.isInstanceOf[CheckpointV2])
        val checkpointedStateCheckpointMarkers = checkpoint.asInstanceOf[CheckpointV2]
          .getStateCheckpointMarkers.get(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME)
        assertTrue(checkpointedStateCheckpointMarkers.size() == 1)
        val checkpointedStateCheckpointMarker = checkpointedStateCheckpointMarkers.get("storeName")
        assertTrue(checkpointedStateCheckpointMarker.equals(nullStateCheckpointMarker))
        val kafkaMarker = KafkaStateCheckpointMarker.deserialize(checkpointedStateCheckpointMarker)
        assertNull(kafkaMarker.getChangelogOffset)
        assertEquals(kafkaMarker.getChangelogSSP, changelogSSP)
      } else { // c.getVersion == 1
        assertEquals(2, checkpoint.getOffsets.size())
        assertTrue(checkpoint.isInstanceOf[CheckpointV1])
        val message = checkpoint.getOffsets.get(changelogSSP)
        val checkpointedOffset = KafkaChangelogSSPOffset.fromString(message)
        assertNull(checkpointedOffset.getChangelogOffset)
        assertNotNull(checkpointedOffset.getCheckpointId)
      }
    }
  })
  verify(commitsCounter).inc()
  verify(snapshotTimer).update(anyLong())
  verify(uploadTimer).update(anyLong())
}
@Test
def testEmptyChangelogOffsetsInCommit() { // e.g. if stores have no changelogs
  val commits = mock[Counter]
  when(this.metrics.commits).thenReturn(commits)
  val snapshotNs = mock[Timer]
  when(this.metrics.snapshotNs).thenReturn(snapshotNs)
  val commitNs = mock[Timer]
  when(this.metrics.commitNs).thenReturn(commitNs)
  val commitSyncNs = mock[Timer]
  when(this.metrics.commitSyncNs).thenReturn(commitSyncNs)
  val commitAsyncNs = mock[Timer]
  when(this.metrics.commitAsyncNs).thenReturn(commitAsyncNs)
  val asyncUploadNs = mock[Timer]
  when(this.metrics.asyncUploadNs).thenReturn(asyncUploadNs)
  val commitsSkipped = mock[Gauge[Int]]
  when(this.metrics.commitsSkipped).thenReturn(commitsSkipped)
  val lastCommitNs = mock[Gauge[Long]]
  when(this.metrics.lastCommitNs).thenReturn(lastCommitNs)

  val lastProcessed = Map(SYSTEM_STREAM_PARTITION -> "4").asJava
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(lastProcessed)
  // No per-store markers: the stores have no changelogs.
  val emptyMarkers: util.Map[String, String] = new util.HashMap[String, String]()
  when(this.taskCommitManager.upload(any(), any()))
    .thenReturn(CompletableFuture.completedFuture(
      Collections.singletonMap(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME, emptyMarkers)))

  taskInstance.commit

  val checkpointCaptor = ArgumentCaptor.forClass(classOf[Checkpoint])
  // verify the write checkpoint is evoked twice, once per checkpoint version
  verify(offsetManager, times(numCheckpointVersions)).writeCheckpoint(any(), checkpointCaptor.capture)
  val written = checkpointCaptor.getAllValues
  assertEquals(numCheckpointVersions, written.size())
  written.forEach(new Consumer[Checkpoint] {
    override def accept(checkpoint: Checkpoint): Unit = {
      // Only the input offset is present; no changelog offsets exist.
      assertEquals("4", checkpoint.getOffsets.get(SYSTEM_STREAM_PARTITION))
      assertEquals(1, checkpoint.getOffsets.size())
    }
  })
  verify(commits).inc()
  verify(snapshotNs).update(anyLong())
  verify(asyncUploadNs).update(anyLong())
}
@Test
def testCommitFailsIfErrorGettingChangelogOffset() { // required for transactional state
  val commitsCounter = mock[Counter]
  when(this.metrics.commits).thenReturn(commitsCounter)
  val snapshotTimer = mock[Timer]
  when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
  val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
  inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(inputOffsets)
  // Inject a synchronous failure in the snapshot (sync) stage of commit.
  when(this.taskCommitManager.snapshot(any())).thenThrow(new SamzaException("Error getting changelog offsets"))
  try {
    // sync stage exception should be caught and rethrown immediately
    taskInstance.commit
    // Only reached if commit did NOT throw; the trailing fail() then reports the miss.
    verify(commitsCounter).inc()
    verifyZeroInteractions(snapshotTimer)
  } catch {
    case e: SamzaException =>
      // exception is expected, container should fail if could not get changelog offsets.
      return
  }
  fail("Should have failed commit if error getting newest changelog offsets")
}
@Test
def testCommitFailsIfPreviousAsyncUploadFails() {
  val commitsCounter = mock[Counter]
  when(this.metrics.commits).thenReturn(commitsCounter)
  val snapshotTimer = mock[Timer]
  when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
  val commitTimer = mock[Timer]
  when(this.metrics.commitNs).thenReturn(commitTimer)
  val commitSyncTimer = mock[Timer]
  when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer)
  val commitAsyncTimer = mock[Timer]
  when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer)
  val uploadTimer = mock[Timer]
  when(this.metrics.asyncUploadNs).thenReturn(uploadTimer)
  val cleanUpTimer = mock[Timer]
  when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
  val skippedCounter = mock[Gauge[Int]]
  when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
  val lastCommitGauge = mock[Gauge[Long]]
  when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
  val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
  inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(inputOffsets)
  // Upload fails asynchronously via a failed future. (A previous success stub here was dead
  // code: re-stubbing the same matcher with when(...) fully replaces the earlier stubbing.)
  when(this.taskCommitManager.upload(any(), any()))
    .thenReturn(FutureUtil.failedFuture[util.Map[String, util.Map[String, String]]](new RuntimeException))
  try {
    taskInstance.commit
    verify(commitsCounter).inc()
    verify(snapshotTimer).update(anyLong())
    verifyZeroInteractions(uploadTimer)
    verifyZeroInteractions(commitTimer)
    verifyZeroInteractions(skippedCounter)
    // async stage exception in first commit should be caught and rethrown by the subsequent commit
    taskInstance.commit
    verifyNoMoreInteractions(commitsCounter)
  } catch {
    case e: SamzaException =>
      // exception is expected, container should fail if could not upload previous snapshot.
      return
  }
  fail("Should have failed commit if error uploading store contents")
}
@Test
def testCommitFailsIfAsyncStoreDirCheckpointWriteFails() { // required for transactional state
  val commitsCounter = mock[Counter]
  when(this.metrics.commits).thenReturn(commitsCounter)
  val snapshotTimer = mock[Timer]
  when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
  val commitTimer = mock[Timer]
  when(this.metrics.commitNs).thenReturn(commitTimer)
  val commitSyncTimer = mock[Timer]
  when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer)
  val commitAsyncTimer = mock[Timer]
  when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer)
  val uploadTimer = mock[Timer]
  when(this.metrics.asyncUploadNs).thenReturn(uploadTimer)
  val cleanUpTimer = mock[Timer]
  when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
  val skippedCounter = mock[Gauge[Int]]
  when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
  val lastCommitGauge = mock[Gauge[Long]]
  when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
  val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
  inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
  val stateCheckpointMarkers: util.Map[String, String] = new util.HashMap[String, String]()
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(inputOffsets)
  // Upload succeeds; the failure is injected when writing checkpoints to store directories.
  when(this.taskCommitManager.upload(any(), any()))
    .thenReturn(CompletableFuture.completedFuture(
      Collections.singletonMap(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME, stateCheckpointMarkers)))
  when(this.taskCommitManager.writeCheckpointToStoreDirectories(any()))
    .thenThrow(new SamzaException("Error creating store checkpoint"))
  try {
    taskInstance.commit
    // First commit succeeds synchronously; upload ran, but commit did not complete.
    verify(commitsCounter).inc()
    verify(snapshotTimer).update(anyLong())
    verify(uploadTimer).update(anyLong())
    verifyZeroInteractions(commitTimer)
    verifyZeroInteractions(skippedCounter)
    // async stage exception in first commit should be caught and rethrown by the subsequent commit
    taskInstance.commit
    verifyNoMoreInteractions(commitsCounter)
  } catch {
    case e: SamzaException =>
      // exception is expected, container should fail if could not get changelog offsets.
      return
  }
  fail("Should have failed commit if error writing checkpoint to store dirs")
}
@Test
def testCommitFailsIfPreviousAsyncCheckpointTopicWriteFails() {
  val commitsCounter = mock[Counter]
  when(this.metrics.commits).thenReturn(commitsCounter)
  val snapshotTimer = mock[Timer]
  when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
  val commitTimer = mock[Timer]
  when(this.metrics.commitNs).thenReturn(commitTimer)
  val commitSyncTimer = mock[Timer]
  when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer)
  val commitAsyncTimer = mock[Timer]
  when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer)
  val uploadTimer = mock[Timer]
  when(this.metrics.asyncUploadNs).thenReturn(uploadTimer)
  val cleanUpTimer = mock[Timer]
  when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
  val skippedCounter = mock[Gauge[Int]]
  when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
  val lastCommitGauge = mock[Gauge[Long]]
  when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
  val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
  inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
  val stateCheckpointMarkers: util.Map[String, String] = new util.HashMap[String, String]()
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(inputOffsets)
  // Upload and store-dir checkpoint writes succeed; the checkpoint-topic write is the failure point.
  when(this.taskCommitManager.upload(any(), any()))
    .thenReturn(CompletableFuture.completedFuture(
      Collections.singletonMap(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME, stateCheckpointMarkers)))
  doNothing().when(this.taskCommitManager).writeCheckpointToStoreDirectories(any())
  when(this.offsetManager.writeCheckpoint(any(), any()))
    .thenThrow(new SamzaException("Error writing checkpoint"))
  try {
    taskInstance.commit
    verify(commitsCounter).inc()
    verify(snapshotTimer).update(anyLong())
    verify(uploadTimer).update(anyLong())
    verifyZeroInteractions(commitTimer)
    verifyZeroInteractions(skippedCounter)
    // async stage exception in first commit should be caught and rethrown by the subsequent commit
    taskInstance.commit
    verifyNoMoreInteractions(commitsCounter)
  } catch {
    case e: SamzaException =>
      // exception is expected, container should fail if could not write previous checkpoint.
      return
  }
  fail("Should have failed commit if error writing checkpoints to checkpoint topic")
}
@Test
def testCommitFailsIfPreviousAsyncCleanUpFails() { // required for blob store backend
  val commits = mock[Counter]
  when(this.metrics.commits).thenReturn(commits)
  val snapshotNs = mock[Timer]
  when(this.metrics.snapshotNs).thenReturn(snapshotNs)
  val commitNs = mock[Timer]
  when(this.metrics.commitNs).thenReturn(commitNs)
  val commitSyncNs = mock[Timer]
  when(this.metrics.commitSyncNs).thenReturn(commitSyncNs)
  val commitAsyncNs = mock[Timer]
  when(this.metrics.commitAsyncNs).thenReturn(commitAsyncNs)
  val asyncUploadNs = mock[Timer]
  when(this.metrics.asyncUploadNs).thenReturn(asyncUploadNs)
  val asyncCleanupNs = mock[Timer]
  when(this.metrics.asyncCleanupNs).thenReturn(asyncCleanupNs)
  val commitsSkipped = mock[Gauge[Int]]
  when(this.metrics.commitsSkipped).thenReturn(commitsSkipped)
  val lastCommitNs = mock[Gauge[Long]]
  when(this.metrics.lastCommitNs).thenReturn(lastCommitNs)

  val lastProcessed = new util.HashMap[SystemStreamPartition, String]()
  lastProcessed.put(SYSTEM_STREAM_PARTITION, "4")
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(lastProcessed)
  val markers: util.Map[String, String] = new util.HashMap[String, String]()
  when(this.taskCommitManager.upload(any(), any()))
    .thenReturn(CompletableFuture.completedFuture(
      Collections.singletonMap(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME, markers)))
  doNothing().when(this.taskCommitManager).writeCheckpointToStoreDirectories(any())
  // Everything up to cleanup succeeds; cleanup fails asynchronously.
  when(this.taskCommitManager.cleanUp(any(), any()))
    .thenReturn(FutureUtil.failedFuture[Void](new SamzaException("Error during cleanup")))

  try {
    taskInstance.commit
    // First commit completes its sync stage and upload; full commit never finishes.
    verify(commits).inc()
    verify(snapshotNs).update(anyLong())
    verify(asyncUploadNs).update(anyLong())
    verifyZeroInteractions(commitNs)
    verifyZeroInteractions(commitsSkipped)
    // async stage exception in first commit should be caught and rethrown by the subsequent commit
    taskInstance.commit
    verifyNoMoreInteractions(commits)
  } catch {
    case e: SamzaException =>
      // exception is expected, container should fail if could not clean up old checkpoint.
      return
  }
  fail("Should have failed commit if error cleaning up previous commit")
}
@Test
def testCommitFailsIfPreviousAsyncUploadFailsSynchronously() {
  val commitsCounter = mock[Counter]
  when(this.metrics.commits).thenReturn(commitsCounter)
  val snapshotTimer = mock[Timer]
  when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
  val commitTimer = mock[Timer]
  when(this.metrics.commitNs).thenReturn(commitTimer)
  val commitSyncTimer = mock[Timer]
  when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer)
  val commitAsyncTimer = mock[Timer]
  when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer)
  val uploadTimer = mock[Timer]
  when(this.metrics.asyncUploadNs).thenReturn(uploadTimer)
  val cleanUpTimer = mock[Timer]
  when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
  val skippedCounter = mock[Gauge[Int]]
  when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
  val lastCommitGauge = mock[Gauge[Long]]
  when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
  val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
  inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(inputOffsets)
  // Fail synchronously instead of returning a failed future. (The earlier success stub for
  // upload was dead code: re-stubbing with when(...) fully replaces the previous stubbing.)
  when(this.taskCommitManager.upload(any(), any()))
    .thenThrow(new RuntimeException)
  try {
    taskInstance.commit
    verify(commitsCounter).inc()
    verify(snapshotTimer).update(anyLong())
    verifyZeroInteractions(uploadTimer)
    verifyZeroInteractions(commitTimer)
    verifyZeroInteractions(skippedCounter)
    // async stage exception in first commit should be caught and rethrown by the subsequent commit
    taskInstance.commit
    verifyNoMoreInteractions(commitsCounter)
  } catch {
    case e: SamzaException =>
      // exception is expected, container should fail if could not upload previous snapshot.
      return
  }
  fail("Should have failed commit if synchronous error during upload in async stage of previous commit")
}
@Test
def testCommitSucceedsIfPreviousAsyncStageSucceeds() {
  val commitsCounter = mock[Counter]
  when(this.metrics.commits).thenReturn(commitsCounter)
  val snapshotTimer = mock[Timer]
  when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
  val commitTimer = mock[Timer]
  when(this.metrics.commitNs).thenReturn(commitTimer)
  val commitSyncTimer = mock[Timer]
  when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer)
  val commitAsyncTimer = mock[Timer]
  when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer)
  val uploadTimer = mock[Timer]
  when(this.metrics.asyncUploadNs).thenReturn(uploadTimer)
  val cleanUpTimer = mock[Timer]
  when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
  val skippedCounter = mock[Gauge[Int]]
  when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
  val lastCommitGauge = mock[Gauge[Long]]
  when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
  val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
  inputOffsets.put(SYSTEM_STREAM_PARTITION,"4")
  val stateCheckpointMarkers: util.Map[String, String] = new util.HashMap[String, String]()
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(inputOffsets)
  // All async stages succeed: upload, store-dir checkpoint write and cleanup.
  when(this.taskCommitManager.upload(any(), any()))
    .thenReturn(CompletableFuture.completedFuture(
      Collections.singletonMap(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME, stateCheckpointMarkers)))
  doNothing().when(this.taskCommitManager).writeCheckpointToStoreDirectories(any())
  when(this.taskCommitManager.cleanUp(any(), any()))
    .thenReturn(CompletableFuture.completedFuture[Void](null))
  taskInstance.commit // async stage will be run by caller due to direct executor
  val asyncCommitTimeCaptor = ArgumentCaptor.forClass(classOf[Long])
  val uploadTimeCaptor = ArgumentCaptor.forClass(classOf[Long])
  val cleanUpTimeCaptor = ArgumentCaptor.forClass(classOf[Long])
  verify(commitsCounter).inc()
  verify(snapshotTimer).update(anyLong())
  verify(uploadTimer).update(anyLong())
  verify(commitTimer).update(anyLong())
  verify(commitAsyncTimer).update(asyncCommitTimeCaptor.capture())
  verify(uploadTimer).update(uploadTimeCaptor.capture())
  verify(cleanUpTimer).update(cleanUpTimeCaptor.capture())
  // The whole async stage must take at least as long as its upload + cleanup sub-stages combined.
  assertTrue((cleanUpTimeCaptor.getValue + uploadTimeCaptor.getValue) < asyncCommitTimeCaptor.getValue)
  taskInstance.commit
  // verify that all commit operations ran twice
  verify(taskCommitManager, times(2)).snapshot(any())
  verify(taskCommitManager, times(2)).upload(any(), any())
  // called 2x per commit, once for each checkpoint version
  verify(taskCommitManager, times(4)).writeCheckpointToStoreDirectories(any())
  verify(offsetManager, times(4)).writeCheckpoint(any(), any())
  verify(taskCommitManager, times(2)).cleanUp(any(), any())
  verify(commitsCounter, times(2)).inc()
}
@Test
def testCommitSkipsIfPreviousAsyncCommitInProgressWithinMaxCommitDelay() {
  // Create every metric mock the commit path touches, then wire them all up in one place.
  val commitCount = mock[Counter]
  val snapshotNs = mock[Timer]
  val commitNs = mock[Timer]
  val commitSyncNs = mock[Timer]
  val commitAsyncNs = mock[Timer]
  val asyncUploadNs = mock[Timer]
  val asyncCleanupNs = mock[Timer]
  val skippedGauge = mock[Gauge[Int]]
  val lastCommitNsGauge = mock[Gauge[Long]]
  when(this.metrics.commits).thenReturn(commitCount)
  when(this.metrics.snapshotNs).thenReturn(snapshotNs)
  when(this.metrics.commitNs).thenReturn(commitNs)
  when(this.metrics.commitSyncNs).thenReturn(commitSyncNs)
  when(this.metrics.commitAsyncNs).thenReturn(commitAsyncNs)
  when(this.metrics.asyncUploadNs).thenReturn(asyncUploadNs)
  when(this.metrics.asyncCleanupNs).thenReturn(asyncCleanupNs)
  when(this.metrics.commitsSkipped).thenReturn(skippedGauge)
  when(this.metrics.lastCommitNs).thenReturn(lastCommitNsGauge)
  // Last processed offset for the single input partition.
  val lastProcessedOffsets = new util.HashMap[SystemStreamPartition, String]()
  lastProcessedOffsets.put(SYSTEM_STREAM_PARTITION, "4")
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(lastProcessedOffsets)
  // One Kafka state checkpoint marker for "storeName", pointing at the changelog SSP.
  val changelogSSP = new SystemStreamPartition(new SystemStream(SYSTEM_NAME, "test-changelog-stream"), new Partition(0))
  val stateCheckpointMarkers: util.Map[String, String] = new util.HashMap[String, String]()
  stateCheckpointMarkers.put("storeName",
    KafkaStateCheckpointMarker.serialize(new KafkaStateCheckpointMarker(changelogSSP, "5")))
  val snapshotSCMs = ImmutableMap.of(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME, stateCheckpointMarkers)
  when(this.taskCommitManager.snapshot(any())).thenReturn(snapshotSCMs)
  val uploadedSCMsFuture: CompletableFuture[util.Map[String, util.Map[String, String]]] =
    CompletableFuture.completedFuture(snapshotSCMs)
  when(this.taskCommitManager.upload(any(), Matchers.eq(snapshotSCMs))).thenReturn(uploadedSCMsFuture) // kafka is no-op
  // Leave clean-up pending so the async stage of the first commit never finishes during the test.
  val pendingCleanUpFuture = new CompletableFuture[Void]()
  when(this.taskCommitManager.cleanUp(any(), any())).thenReturn(pendingCleanUpFuture)
  // Run async stages on a real thread pool so the caller thread observes an in-progress async stage.
  setupTaskInstance(None, ForkJoinPool.commonPool())
  taskInstance.commit // async stage stays in progress until pendingCleanUpFuture completes
  taskInstance.commit // within max commit delay, so this one should be skipped rather than blocked
  verify(skippedGauge).set(1)
  verify(commitCount, times(1)).inc() // only the initial commit increments the counter
  verify(snapshotNs).update(anyLong())
  verifyZeroInteractions(commitNs)
  pendingCleanUpFuture.complete(null) // just to unblock shared executor
}
@Test
def testCommitThrowsIfPreviousAsyncCommitInProgressAfterMaxCommitDelayAndBlockTime() {
  // Create every metric mock the commit path touches, then wire them all up in one place.
  val commitCount = mock[Counter]
  val snapshotNs = mock[Timer]
  val commitNs = mock[Timer]
  val commitSyncNs = mock[Timer]
  val commitAsyncNs = mock[Timer]
  val asyncUploadNs = mock[Timer]
  val asyncCleanupNs = mock[Timer]
  val skippedGauge = mock[Gauge[Int]]
  val lastCommitNsGauge = mock[Gauge[Long]]
  when(this.metrics.commits).thenReturn(commitCount)
  when(this.metrics.snapshotNs).thenReturn(snapshotNs)
  when(this.metrics.commitNs).thenReturn(commitNs)
  when(this.metrics.commitSyncNs).thenReturn(commitSyncNs)
  when(this.metrics.commitAsyncNs).thenReturn(commitAsyncNs)
  when(this.metrics.asyncUploadNs).thenReturn(asyncUploadNs)
  when(this.metrics.asyncCleanupNs).thenReturn(asyncCleanupNs)
  when(this.metrics.commitsSkipped).thenReturn(skippedGauge)
  when(this.metrics.lastCommitNs).thenReturn(lastCommitNsGauge)
  // Last processed offset for the single input partition.
  val lastProcessedOffsets = new util.HashMap[SystemStreamPartition, String]()
  lastProcessedOffsets.put(SYSTEM_STREAM_PARTITION, "4")
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(lastProcessedOffsets)
  // One Kafka state checkpoint marker for "storeName", pointing at the changelog SSP.
  val changelogSSP = new SystemStreamPartition(new SystemStream(SYSTEM_NAME, "test-changelog-stream"), new Partition(0))
  val stateCheckpointMarkers: util.Map[String, String] = new util.HashMap[String, String]()
  stateCheckpointMarkers.put("storeName",
    KafkaStateCheckpointMarker.serialize(new KafkaStateCheckpointMarker(changelogSSP, "5")))
  val snapshotSCMs = ImmutableMap.of(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME, stateCheckpointMarkers)
  when(this.taskCommitManager.snapshot(any())).thenReturn(snapshotSCMs)
  val uploadedSCMsFuture: CompletableFuture[util.Map[String, util.Map[String, String]]] =
    CompletableFuture.completedFuture(snapshotSCMs)
  when(this.taskCommitManager.upload(any(), Matchers.eq(snapshotSCMs))).thenReturn(uploadedSCMsFuture) // kafka is no-op
  // Leave clean-up pending so the async stage of the first commit never finishes during the test.
  val pendingCleanUpFuture = new CompletableFuture[Void]()
  when(this.taskCommitManager.cleanUp(any(), any())).thenReturn(pendingCleanUpFuture)
  // Configure the task so the second commit considers the pending async stage overdue ("block"
  // immediately) and then times out immediately instead of waiting for it.
  val taskConfigsMap = new util.HashMap[String, String]()
  taskConfigsMap.put("task.commit.ms", "-1")
  taskConfigsMap.put("task.commit.max.delay.ms", "-1")
  taskConfigsMap.put("task.commit.timeout.ms", "0") // throw exception immediately if blocked
  when(this.jobContext.getConfig).thenReturn(new MapConfig(taskConfigsMap)) // override default behavior
  // Run async stages on a real thread pool so the caller thread observes an in-progress async stage.
  setupTaskInstance(None, ForkJoinPool.commonPool())
  taskInstance.commit // async stage stays in progress until pendingCleanUpFuture completes
  try {
    taskInstance.commit // should throw exception
    fail("Should have thrown an exception if blocked for previous commit async stage.")
  } catch {
    case _: Exception => // expected: blocked past the (zero) commit timeout
  }
  verify(commitCount, times(1)).inc() // should only have been incremented once on the initial commit
  pendingCleanUpFuture.complete(null) // just to unblock shared executor
}
/**
 * Verifies that when a previous commit's async stage is still in progress past the max commit
 * delay, a subsequent commit blocks (rather than skips or throws) until the async stage
 * completes, given a sufficiently large commit timeout.
 */
@Test
def testCommitBlocksIfPreviousAsyncCommitInProgressAfterMaxCommitDelayButWithinBlockTime() {
  // Stub every metric the commit path touches.
  val commitsCounter = mock[Counter]
  when(this.metrics.commits).thenReturn(commitsCounter)
  val snapshotTimer = mock[Timer]
  when(this.metrics.snapshotNs).thenReturn(snapshotTimer)
  val commitTimer = mock[Timer]
  when(this.metrics.commitNs).thenReturn(commitTimer)
  val commitSyncTimer = mock[Timer]
  when(this.metrics.commitSyncNs).thenReturn(commitSyncTimer)
  val commitAsyncTimer = mock[Timer]
  when(this.metrics.commitAsyncNs).thenReturn(commitAsyncTimer)
  val uploadTimer = mock[Timer]
  when(this.metrics.asyncUploadNs).thenReturn(uploadTimer)
  val cleanUpTimer = mock[Timer]
  when(this.metrics.asyncCleanupNs).thenReturn(cleanUpTimer)
  val skippedCounter = mock[Gauge[Int]]
  when(this.metrics.commitsSkipped).thenReturn(skippedCounter)
  val lastCommitGauge = mock[Gauge[Long]]
  when(this.metrics.lastCommitNs).thenReturn(lastCommitGauge)
  // Last processed offset for the single input partition.
  val inputOffsets = new util.HashMap[SystemStreamPartition, String]()
  inputOffsets.put(SYSTEM_STREAM_PARTITION, "4")
  when(this.offsetManager.getLastProcessedOffsets(TASK_NAME)).thenReturn(inputOffsets)
  // One Kafka state checkpoint marker for "storeName", pointing at the changelog SSP.
  val changelogSSP = new SystemStreamPartition(new SystemStream(SYSTEM_NAME, "test-changelog-stream"), new Partition(0))
  val stateCheckpointMarkers: util.Map[String, String] = new util.HashMap[String, String]()
  val stateCheckpointMarker = KafkaStateCheckpointMarker.serialize(new KafkaStateCheckpointMarker(changelogSSP, "5"))
  stateCheckpointMarkers.put("storeName", stateCheckpointMarker)
  val snapshotSCMs = ImmutableMap.of(KafkaStateCheckpointMarker.KAFKA_STATE_BACKEND_FACTORY_NAME, stateCheckpointMarkers)
  when(this.taskCommitManager.snapshot(any())).thenReturn(snapshotSCMs)
  val snapshotSCMFuture: CompletableFuture[util.Map[String, util.Map[String, String]]] =
    CompletableFuture.completedFuture(snapshotSCMs)
  when(this.taskCommitManager.upload(any(), Matchers.eq(snapshotSCMs))).thenReturn(snapshotSCMFuture) // kafka is no-op
  // Keep clean-up pending so the first commit's async stage stays in progress.
  val cleanUpFuture = new CompletableFuture[Void]()
  when(this.taskCommitManager.cleanUp(any(), any())).thenReturn(cleanUpFuture)
  // Force the second commit to "block" immediately on the in-progress async stage, with a
  // commit timeout large enough that it waits instead of throwing.
  val taskConfigsMap = new util.HashMap[String, String]()
  taskConfigsMap.put("task.commit.ms", "-1")
  taskConfigsMap.put("task.commit.max.delay.ms", "-1")
  taskConfigsMap.put("task.commit.timeout.ms", "1000000") // block until previous stage is complete
  when(this.jobContext.getConfig).thenReturn(new MapConfig(taskConfigsMap)) // override default behavior
  // Use a separate executor to perform async operations on, to test caller thread blocking behavior.
  setupTaskInstance(None, ForkJoinPool.commonPool())
  taskInstance.commit // async stage will not complete until cleanUpFuture is completed
  val executorService = Executors.newSingleThreadExecutor()
  try {
    val secondCommitFuture = CompletableFuture.runAsync(new Runnable {
      override def run(): Unit = taskInstance.commit // will block on executor
    }, executorService)
    var retries = 0 // wait no more than ~100 millis
    while (!taskInstance.commitInProgress.hasQueuedThreads && retries < 10) {
      retries += 1
      Thread.sleep(10) // wait until commit in other thread blocks on the semaphore.
    }
    if (!taskInstance.commitInProgress.hasQueuedThreads) {
      fail("Other thread should have blocked on semaphore acquisition. " +
        "May need to increase retries if transient failure.")
    }
    cleanUpFuture.complete(null) // will eventually unblock the 2nd commit in other thread.
    secondCommitFuture.join() // will complete when the sync phase of 2nd commit is complete.
    verify(commitsCounter, times(2)).inc() // incremented twice - once for each commit
    verify(snapshotTimer, times(2)).update(anyLong())
  } finally {
    // Always unblock the pending async stage and release the test executor's thread; the
    // original code leaked the single-thread executor (and, on assertion failure, wedged its
    // worker on the never-completed cleanUpFuture).
    cleanUpFuture.complete(null) // no-op if already completed
    executorService.shutdown()
  }
}
/**
 * When the task instance is built without an application task context factory, the task
 * lifecycle (init/shutdown) must not touch the application task context at all.
 */
@Test
def testNoApplicationTaskContextFactoryProvided() {
  setupTaskInstance(None)
  taskInstance.initTask
  taskInstance.shutdownTask
  verifyZeroInteractions(applicationTaskContext)
}
@Test(expected = classOf[SystemProducerException])
def testProducerExceptionsIsPropagated() {
  when(this.metrics.commits).thenReturn(mock[Counter])
  // Flushing the collector happens during commit; make it fail.
  when(this.collector.flush).thenThrow(new SystemProducerException("systemProducerException"))
  try {
    // Must propagate the SystemProducerException (asserted via the @Test annotation above).
    taskInstance.commit
  } finally {
    // A failed flush must never result in a checkpoint being written.
    verify(offsetManager, never()).writeCheckpoint(any(), any())
  }
}
@Test
def testInitCaughtUpMapping() {
  // Starting offset equals the upcoming offset ("42"), so the SSP should be considered caught up.
  val mockOffsetManager = mock[OffsetManager]
  when(mockOffsetManager.getStartingOffset(anyObject(), anyObject())).thenReturn(Option("42"))
  val mockMetadataCache = mock[StreamMetadataCache]
  val streamMetadata = mock[SystemStreamMetadata]
  when(mockMetadataCache.getSystemStreamMetadata(anyObject(), anyBoolean()))
    .thenReturn(streamMetadata)
  val partitionMetadata = mock[SystemStreamMetadata.SystemStreamPartitionMetadata]
  when(partitionMetadata.getUpcomingOffset).thenReturn("42")
  when(streamMetadata.getSystemStreamPartitionMetadata)
    .thenReturn(Collections.singletonMap(new Partition(0), partitionMetadata))
  val ssp = new SystemStreamPartition("test-system", "test-stream", new Partition(0))
  val inputStreamMetadata = collection.Map(ssp.getSystemStream -> streamMetadata)
  // Build a task instance wired to the mocks above (local, not the shared field).
  val taskInstance = new TaskInstance(this.task,
    this.taskModel,
    this.metrics,
    this.systemAdmins,
    this.consumerMultiplexer,
    this.collector,
    offsetManager = mockOffsetManager,
    commitManager = this.taskCommitManager,
    tableManager = this.taskTableManager,
    systemStreamPartitions = ImmutableSet.of(ssp),
    exceptionHandler = this.taskInstanceExceptionHandler,
    streamMetadataCache = mockMetadataCache,
    inputStreamMetadata = Map.empty ++ inputStreamMetadata,
    jobContext = this.jobContext,
    containerContext = this.containerContext,
    applicationContainerContextOption = Some(this.applicationContainerContext),
    applicationTaskContextFactoryOption = Some(this.applicationTaskContextFactory),
    externalContextOption = Some(this.externalContext))
  taskInstance.initCaughtUpMapping()
  assertTrue(taskInstance.ssp2CaughtupMapping(ssp))
}
/**
 * Rebuilds the shared `taskInstance` field from the mock fields on this test class.
 *
 * @param applicationTaskContextFactory optional factory for the application task context
 * @param commitThreadPool executor for async commit stages; defaults to a direct (same-thread)
 *                         executor so commits complete synchronously unless a test opts out
 */
private def setupTaskInstance(
  applicationTaskContextFactory: Option[ApplicationTaskContextFactory[ApplicationTaskContext]],
  commitThreadPool: ExecutorService = MoreExecutors.newDirectExecutorService()): Unit = {
  val instance = new TaskInstance(this.task,
    this.taskModel,
    this.metrics,
    this.systemAdmins,
    this.consumerMultiplexer,
    this.collector,
    offsetManager = this.offsetManager,
    commitManager = this.taskCommitManager,
    tableManager = this.taskTableManager,
    systemStreamPartitions = SYSTEM_STREAM_PARTITIONS,
    exceptionHandler = this.taskInstanceExceptionHandler,
    commitThreadPool = commitThreadPool,
    jobContext = this.jobContext,
    containerContext = this.containerContext,
    applicationContainerContextOption = Some(this.applicationContainerContext),
    applicationTaskContextFactoryOption = applicationTaskContextFactory,
    externalContextOption = Some(this.externalContext))
  this.taskInstance = instance
}
/**
 * Composite task type mixing in every task trait, so a single mock can stand in for any of them.
 */
trait AllTask extends AsyncStreamTask with InitableTask with ClosableTask with WindowableTask
/**
 * Pass-through [TaskInstanceExceptionHandler] that records how many times it was invoked.
 * Tests use the count to verify that processing is actually routed through the handler.
 */
class MockTaskInstanceExceptionHandler extends TaskInstanceExceptionHandler {
  // Number of times maybeHandle was invoked.
  var numTimesCalled = 0

  override def maybeHandle(tryCodeBlock: => Unit): Unit = {
    numTimesCalled = numTimesCalled + 1
    tryCodeBlock // execute the block unchanged; no exception handling here
  }
}
}