| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.samza.storage |
| |
| import java.io.{File, FileOutputStream, ObjectOutputStream} |
| import java.util |
| |
| import org.apache.samza.Partition |
| import org.apache.samza.config._ |
| import org.apache.samza.container.{SamzaContainerMetrics, TaskInstanceMetrics, TaskName} |
| import org.apache.samza.context.{ContainerContext, JobContext} |
| import org.apache.samza.job.model.{ContainerModel, TaskMode, TaskModel} |
| import org.apache.samza.serializers.{Serde, StringSerdeFactory} |
| import org.apache.samza.storage.StoreProperties.StorePropertiesBuilder |
| import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata |
| import org.apache.samza.system._ |
| import org.apache.samza.task.TaskInstanceCollector |
| import org.apache.samza.util.{FileUtil, SystemClock} |
| import org.junit.Assert._ |
| import org.junit.runner.RunWith |
| import org.junit.runners.Parameterized |
| import org.junit.runners.Parameterized.Parameters |
| import org.junit.{After, Before, Test} |
| import org.mockito.Matchers._ |
| import org.mockito.{Mockito} |
| import org.mockito.Mockito._ |
| import org.mockito.invocation.InvocationOnMock |
| import org.mockito.stubbing.Answer |
| import org.scalatest.mockito.MockitoSugar |
| |
| import scala.collection.JavaConverters._ |
| import scala.collection.immutable.HashMap |
| import scala.collection.mutable |
| import com.google.common.collect.{ImmutableMap, ImmutableSet} |
| import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager} |
| |
| /** |
| * This test is parameterized on the offsetFileName and is run for both |
| * StorageManagerUtil.OFFSET_FILE_NAME_LEGACY and StorageManagerUtil.OFFSET_FILE_NAME_NEW. |
| * |
| * @param offsetFileName the name of the offset file. |
| */ |
| @RunWith(value = classOf[Parameterized]) |
| class TestNonTransactionalStateTaskStorageManager(offsetFileName: String) extends MockitoSugar { |
| |
| val store = "store1" |
| val loggedStore = "loggedStore1" |
| val taskName = new TaskName("testTask") |
| val storageManagerUtil = new StorageManagerUtil |
| val fileUtil = new FileUtil |
| |
| @Before |
| def setupTestDirs() { |
| storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName, TaskMode.Active) |
| .mkdirs() |
| storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) |
| .mkdirs() |
| } |
| |
| @After |
| def tearDownTestDirs() { |
| fileUtil.rm(TaskStorageManagerBuilder.defaultStoreBaseDir) |
| fileUtil.rm(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir) |
| } |
| |
| def getStreamName(storeName : String): String = { |
| "testStream-"+storeName |
| } |
| |
| /** |
| * This tests the entire TaskStorageManager lifecycle for a Persisted Logged Store |
| * For example, a RocksDb store with changelog needs to continuously update the offset file on flush & stop |
| * When the task is restarted, it should restore correctly from the offset in the OFFSET file on disk (if available) |
| */ |
| @Test |
| def testStoreLifecycleForLoggedPersistedStore(): Unit = { |
| // Basic test setup of SystemStream, SystemStreamPartition for this task |
| val ss = new SystemStream("kafka", getStreamName(loggedStore)) |
| val partition = new Partition(0) |
| val ssp = new SystemStreamPartition(ss, partition) |
| val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, |
| loggedStore, taskName, TaskMode.Active) |
| val storeFile = new File(storeDirectory, "store.sst") |
| val offsetFile = new File(storeDirectory, offsetFileName) |
| |
| val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile) |
| |
| // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin |
| val mockStreamMetadataCache = mock[StreamMetadataCache] |
| val mockSystemConsumer = mock[SystemConsumer] |
| val mockSystemAdmin = mock[SystemAdmin] |
| val changelogSpec = StreamSpec.createChangeLogStreamSpec(getStreamName(loggedStore), "kafka", 1) |
| doNothing().when(mockSystemAdmin).validateStream(changelogSpec) |
| doNothing().when(mockSystemConsumer).stop() |
| |
| // Test 1: Initial invocation - No store on disk (only changelog has data) |
| // Setup initial sspMetadata |
| var sspMetadata = new SystemStreamPartitionMetadata("0", "50", "51") |
| var metadata = new SystemStreamMetadata(getStreamName(loggedStore), new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { |
| { |
| put(partition, sspMetadata) |
| } |
| }) |
| when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) |
| when(mockSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp))).thenReturn(ImmutableMap.of(ssp, sspMetadata)) |
| |
| var taskManager = new TaskStorageManagerBuilder() |
| .addStore(loggedStore, mockStorageEngine, mockSystemConsumer) |
| .setStreamMetadataCache(mockStreamMetadataCache) |
| .setSystemAdmin("kafka", mockSystemAdmin) |
| .initializeContainerStorageManager() |
| .build |
| |
| assertTrue(storeFile.exists()) |
| assertFalse(offsetFile.exists()) |
| verify(mockSystemConsumer).register(ssp, "0") |
| |
| // Test 2: flush should update the offset file |
| taskManager.flush() |
| assertTrue(offsetFile.exists()) |
| validateOffsetFileContents(offsetFile, "kafka.testStream-loggedStore1.0", "50") |
| |
| // Test 3: Update sspMetadata before shutdown and verify that offset file is not updated |
| when(mockSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp))) |
| .thenReturn(ImmutableMap.of(ssp, new SystemStreamPartitionMetadata("0", "100", "101"))) |
| taskManager.stop() |
| verify(mockStorageEngine, times(1)).flush() // only called once during Test 2. |
| assertTrue(storeFile.exists()) |
| assertTrue(offsetFile.exists()) |
| validateOffsetFileContents(offsetFile, "kafka.testStream-loggedStore1.0", "50") |
| |
| // Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the correct offset |
| sspMetadata = new SystemStreamPartitionMetadata("0", "150", "151") |
| metadata = new SystemStreamMetadata(getStreamName(loggedStore), new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { |
| { |
| put(partition, sspMetadata) |
| } |
| }) |
| when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) |
| when(mockSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp))) |
| .thenReturn(ImmutableMap.of(ssp, sspMetadata)) |
| when(mockSystemAdmin.getOffsetsAfter(Map(ssp -> "50").asJava)).thenReturn(Map(ssp -> "51").asJava) |
| Mockito.reset(mockSystemConsumer) |
| |
| taskManager = new TaskStorageManagerBuilder() |
| .addStore(loggedStore, mockStorageEngine, mockSystemConsumer) |
| .setStreamMetadataCache(mockStreamMetadataCache) |
| .setSystemAdmin("kafka", mockSystemAdmin) |
| .initializeContainerStorageManager() |
| .build |
| |
| assertTrue(storeFile.exists()) |
| assertTrue(offsetFile.exists()) |
| verify(mockSystemConsumer).register(ssp, "51") |
| } |
| |
| /** |
| * This tests the entire TaskStorageManager lifecycle for an InMemory Logged Store |
| * For example, an InMemory KV store with changelog should not update the offset file on flush & stop |
| * When the task is restarted, it should ALWAYS restore correctly from the earliest offset |
| */ |
| @Test |
| def testStoreLifecycleForLoggedInMemoryStore(): Unit = { |
| // Basic test setup of SystemStream, SystemStreamPartition for this task |
| val ss = new SystemStream("kafka", getStreamName(store)) |
| val partition = new Partition(0) |
| val ssp = new SystemStreamPartition(ss, partition) |
| val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName, TaskMode.Active) |
| |
| val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = false, null) |
| |
| // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin |
| val mockStreamMetadataCache = mock[StreamMetadataCache] |
| val mockSystemAdmin = mock[SystemAdmin] |
| val changelogSpec = StreamSpec.createChangeLogStreamSpec(getStreamName(store), "kafka", 1) |
| doNothing().when(mockSystemAdmin).validateStream(changelogSpec) |
| |
| val mockSystemConsumer = mock[SystemConsumer] |
| doNothing().when(mockSystemConsumer).stop() |
| |
| // Test 1: Initial invocation - No store data (only changelog has data) |
| // Setup initial sspMetadata |
| val sspMetadata = new SystemStreamPartitionMetadata("0", "50", "51") |
| var metadata = new SystemStreamMetadata(getStreamName(store), new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { |
| { |
| put(partition, sspMetadata) |
| } |
| }) |
| when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) |
| when(mockSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp))).thenReturn(ImmutableMap.of(ssp, sspMetadata)) |
| var taskManager = new TaskStorageManagerBuilder() |
| .addStore(store, mockStorageEngine, mockSystemConsumer) |
| .setStreamMetadataCache(mockStreamMetadataCache) |
| .setSystemAdmin("kafka", mockSystemAdmin) |
| .initializeContainerStorageManager() |
| .build |
| |
| // Verify that the store directory doesn't have ANY files |
| assertTrue(storeDirectory.list().isEmpty) |
| verify(mockSystemConsumer).register(ssp, "0") |
| |
| // Test 2: flush should NOT create/update the offset file. Store directory has no files |
| taskManager.flush() |
| assertTrue(storeDirectory.list().isEmpty) |
| |
| // Test 3: Update sspMetadata before shutdown and verify that offset file is NOT created |
| metadata = new SystemStreamMetadata(getStreamName(store), new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { |
| { |
| put(partition, new SystemStreamPartitionMetadata("0", "100", "101")) |
| } |
| }) |
| when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) |
| when(mockSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp))).thenReturn(ImmutableMap.of(ssp, sspMetadata)) |
| taskManager.stop() |
| assertTrue(storeDirectory.list().isEmpty) |
| |
| // Test 4: Initialize again with an updated sspMetadata; Verify that it restores from the earliest offset |
| metadata = new SystemStreamMetadata(getStreamName(store), new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { |
| { |
| put(partition, new SystemStreamPartitionMetadata("0", "150", "151")) |
| } |
| }) |
| when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) |
| |
| taskManager = new TaskStorageManagerBuilder() |
| .addStore(store, mockStorageEngine, mockSystemConsumer) |
| .setStreamMetadataCache(mockStreamMetadataCache) |
| .setSystemAdmin("kafka", mockSystemAdmin) |
| .initializeContainerStorageManager() |
| .build |
| |
| assertTrue(storeDirectory.list().isEmpty) |
| // second time to register; make sure it starts from beginning |
| verify(mockSystemConsumer, times(2)).register(ssp, "0") |
| } |
| |
| @Test |
| def testStoreDirsWithoutOffsetFileAreDeletedInCleanBaseDirs() { |
| val checkFilePath1 = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultStoreBaseDir, store, taskName, TaskMode.Active), "check") |
| checkFilePath1.createNewFile() |
| val checkFilePath2 = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), "check") |
| checkFilePath2.createNewFile() |
| |
| val taskStorageManager = new TaskStorageManagerBuilder() |
| .addStore(store, false) |
| .addLoggedStore(loggedStore, true) |
| .setStreamMetadataCache(createMockStreamMetadataCache(null, null, null)) //empty store |
| .initializeContainerStorageManager() |
| .build |
| |
| assertTrue("check file was found in store partition directory. Clean up failed!", !checkFilePath1.exists()) |
| assertTrue("check file was found in logged store partition directory. Clean up failed!", !checkFilePath2.exists()) |
| } |
| |
| @Test |
| def testLoggedStoreDirsWithOffsetFileAreNotDeletedInCleanBaseDirs() { |
| val offsetFilePath = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), offsetFileName) |
| fileUtil.writeWithChecksum(offsetFilePath, "100") |
| |
| val taskStorageManager = new TaskStorageManagerBuilder() |
| .addLoggedStore(loggedStore, true) |
| .setStreamMetadataCache(createMockStreamMetadataCache(null, null, null)) // empty store |
| .initializeContainerStorageManager() |
| .build |
| |
| assertTrue("Offset file was removed. Clean up failed!", offsetFilePath.exists()) |
| } |
| |
| @Test |
| def testStoreDeletedWhenOffsetFileOlderThanDeleteRetention() { |
| // This test ensures that store gets deleted when lastModifiedTime of the offset file |
| // is older than deletionRetention of the changeLog. |
| val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) |
| storeDirectory.setLastModified(0) |
| val offsetFile = new File(storeDirectory, offsetFileName) |
| offsetFile.createNewFile() |
| fileUtil.writeWithChecksum(offsetFile, "Test Offset Data") |
| offsetFile.setLastModified(0) |
| |
| val taskStorageManager = new TaskStorageManagerBuilder().addStore(store, false) |
| .addLoggedStore(loggedStore, true) |
| .setStreamMetadataCache(createMockStreamMetadataCache("0", "1", "2")) |
| .initializeContainerStorageManager() |
| .build |
| |
| assertTrue("Offset file was found in store partition directory. Clean up failed!", !offsetFile.exists()) |
| assertTrue("Store directory should be deleted and re-created with new last modified time", storeDirectory.lastModified() > 0) |
| } |
| |
| @Test |
| def testStoreDeletedWhenCleanDirsFlagSet() { |
| // This test ensures that store gets deleted when the stores.container.start.clean config is set, |
| // and new dir is created with a new last modified time |
| val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) |
| val offsetFile = new File(storeDirectory, offsetFileName) |
| offsetFile.createNewFile() |
| fileUtil.writeWithChecksum(offsetFile, "Test Offset Data") |
| |
| val taskStorageManager = new TaskStorageManagerBuilder().addStore(store, false) |
| .addLoggedStore(loggedStore, true) |
| .setStreamMetadataCache(createMockStreamMetadataCache("0", "1", "2")) |
| .initializeContainerStorageManager(true) |
| .build |
| |
| assertTrue("Offset file was found in store partition directory. Clean up failed!", !offsetFile.exists()) |
| assertTrue("Store directory should be deleted and re-created with new last modified time", storeDirectory.lastModified() > 0) |
| } |
| |
| @Test |
| def testOffsetFileIsRemovedInCleanBaseDirsForInMemoryLoggedStore() { |
| val offsetFilePath = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active), offsetFileName) |
| fileUtil.writeWithChecksum(offsetFilePath, "100") |
| |
| val taskStorageManager = new TaskStorageManagerBuilder() |
| .addLoggedStore(loggedStore, false) |
| .setStreamMetadataCache(createMockStreamMetadataCache(null, null, null)) // empty store |
| .initializeContainerStorageManager() |
| .build |
| |
| assertFalse("Offset file was not removed. Clean up failed!", offsetFilePath.exists()) |
| } |
| |
| @Test |
| def testStopDoesNotCreatesOffsetFileForLoggedStore() { |
| val partition = new Partition(0) |
| |
| val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) |
| val offsetFile = new File(storeDirectory, offsetFileName) |
| |
| val ssp = new SystemStreamPartition("kafka", getStreamName(loggedStore), partition) |
| val mockSystemAdmin = mock[SystemAdmin] |
| val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101") |
| when(mockSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp))).thenReturn(ImmutableMap.of(ssp, sspMetadata)) |
| |
| var metadata = new SystemStreamMetadata(getStreamName(loggedStore), new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { |
| { |
| put(partition, sspMetadata) |
| } |
| }) |
| |
| val mockStreamMetadataCache = mock[StreamMetadataCache] |
| when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(new SystemStream("kafka", getStreamName(loggedStore)) -> metadata)) |
| |
| //Build TaskStorageManager |
| val taskStorageManager = new TaskStorageManagerBuilder() |
| .addLoggedStore(loggedStore, true) |
| .setStreamMetadataCache(mockStreamMetadataCache) |
| .setSystemAdmin("kafka", mockSystemAdmin) |
| .setPartition(partition) |
| .initializeContainerStorageManager() |
| .build |
| |
| //Invoke test method |
| taskStorageManager.stop() |
| |
| //Check conditions |
| assertFalse("Offset file doesn't exist!", offsetFile.exists()) |
| } |
| |
| /** |
| * Given that the SSPMetadataCache returns metadata, flush should create the offset files. |
| */ |
| @Test |
| def testFlushCreatesOffsetFileForLoggedStore() { |
| val partition = new Partition(0) |
| |
| val offsetFilePath = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName) |
| val anotherOffsetPath = new File( |
| storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, store, taskName, TaskMode.Active) + File.separator + offsetFileName) |
| |
| val ssp1 = new SystemStreamPartition("kafka", getStreamName(loggedStore), partition) |
| val ssp2 = new SystemStreamPartition("kafka", getStreamName(store), partition) |
| val sspMetadata = new SystemStreamPartitionMetadata("20", "100", "101") |
| |
| val mockSystemAdmin = mock[SystemAdmin] |
| when(mockSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp1))).thenReturn(ImmutableMap.of(ssp1, sspMetadata)) |
| when(mockSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp2))).thenReturn(ImmutableMap.of(ssp2, sspMetadata)) |
| |
| //Build TaskStorageManager |
| val taskStorageManager = new TaskStorageManagerBuilder() |
| .addLoggedStore(loggedStore, true) |
| .addStore(store, false) |
| .setSystemAdmin("kafka", mockSystemAdmin) |
| .setStreamMetadataCache(createMockStreamMetadataCache("20", "100", "101")) |
| .setPartition(partition) |
| .initializeContainerStorageManager() |
| .build |
| |
| //Invoke test method |
| taskStorageManager.flush() |
| |
| //Check conditions |
| assertTrue("Offset file doesn't exist!", offsetFilePath.exists()) |
| validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "100") |
| |
| assertTrue("Offset file got created for a store that is not persisted to the disk!!", !anotherOffsetPath.exists()) |
| } |
| |
| /** |
| * Flush should delete the existing OFFSET file if the changelog partition (for some reason) becomes empty |
| */ |
| @Test |
| def testFlushDeletesOffsetFileForLoggedStoreForEmptyPartition() { |
| val partition = new Partition(0) |
| |
| val offsetFilePath = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName) |
| |
| val ssp = new SystemStreamPartition("kafka", getStreamName(loggedStore), partition) |
| val sspMetadata = new SystemStreamPartitionMetadata("0", "100", "101") |
| val nullSspMetadata = new SystemStreamPartitionMetadata(null, null, null) |
| val mockSystemAdmin = mock[SystemAdmin] |
| when(mockSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp))) |
| .thenReturn(ImmutableMap.of(ssp, sspMetadata)) |
| .thenReturn(ImmutableMap.of(ssp, nullSspMetadata)) |
| |
| var metadata = new SystemStreamMetadata(getStreamName(loggedStore), new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { |
| { |
| put(partition, sspMetadata) |
| } |
| }) |
| |
| val mockStreamMetadataCache = mock[StreamMetadataCache] |
| when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(new SystemStream("kafka", getStreamName(loggedStore)) -> metadata)) |
| |
| //Build TaskStorageManager |
| val taskStorageManager = new TaskStorageManagerBuilder() |
| .addLoggedStore(loggedStore, true) |
| .setSystemAdmin("kafka", mockSystemAdmin) |
| .setStreamMetadataCache(mockStreamMetadataCache) |
| .setPartition(partition) |
| .initializeContainerStorageManager() |
| .build |
| |
| //Invoke test method |
| taskStorageManager.flush() |
| |
| //Check conditions |
| assertTrue("Offset file doesn't exist!", offsetFilePath.exists()) |
| validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "100") |
| |
| //Invoke test method again |
| taskStorageManager.flush() |
| |
| //Check conditions |
| assertFalse("Offset file for null offset exists!", offsetFilePath.exists()) |
| } |
| |
| @Test |
| def testFlushOverwritesOffsetFileForLoggedStore() { |
| val partition = new Partition(0) |
| val ssp = new SystemStreamPartition("kafka", getStreamName(loggedStore), partition) |
| |
| val offsetFilePath = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName) |
| fileUtil.writeWithChecksum(offsetFilePath, "100") |
| |
| val sspMetadata = new SystemStreamPartitionMetadata("20", "139", "140") |
| val mockSystemAdmin = mock[SystemAdmin] |
| when(mockSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp))).thenReturn(ImmutableMap.of(ssp, sspMetadata)) |
| |
| |
| var metadata = new SystemStreamMetadata(getStreamName(loggedStore), new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { |
| { |
| put(partition, sspMetadata) |
| } |
| }) |
| |
| val mockStreamMetadataCache = mock[StreamMetadataCache] |
| when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(new SystemStream("kafka", getStreamName(loggedStore)) -> metadata)) |
| |
| //Build TaskStorageManager |
| val taskStorageManager = new TaskStorageManagerBuilder() |
| .addLoggedStore(loggedStore, true) |
| .setSystemAdmin("kafka", mockSystemAdmin) |
| .setPartition(partition) |
| .setStreamMetadataCache(mockStreamMetadataCache) |
| .initializeContainerStorageManager() |
| .build |
| |
| //Invoke test method |
| taskStorageManager.flush() |
| |
| //Check conditions |
| assertTrue("Offset file doesn't exist!", offsetFilePath.exists()) |
| validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "139") |
| |
| // Flush again |
| when(mockSystemAdmin.getSSPMetadata(ImmutableSet.of(ssp))) |
| .thenReturn(ImmutableMap.of(ssp, new SystemStreamPartitionMetadata("20", "193", "194"))) |
| |
| //Invoke test method |
| taskStorageManager.flush() |
| |
| //Check conditions |
| assertTrue("Offset file doesn't exist!", offsetFilePath.exists()) |
| validateOffsetFileContents(offsetFilePath, "kafka.testStream-loggedStore1.0", "193") |
| } |
| |
| /** |
| * Validates the contents of the offsetFile against the given ssp and offset. |
| * The legacy offset file only contains the offset as a string, while the new offset file contains a map of |
| * ssp to offset in json format. |
| * The name of the two offset files are given in {@link StorageManagerUtil.OFFSET_FILE_NAME_NEW} and |
| * {@link StorageManagerUtil.OFFSET_FILE_LEGACY}. |
| */ |
| private def validateOffsetFileContents(offsetFile: File, ssp: String, offset: String): Unit = { |
| |
| if (offsetFile.getCanonicalFile.getName.equals(StorageManagerUtil.OFFSET_FILE_NAME_NEW)) { |
| assertEquals("Found incorrect value in offset file!", "{\"" + ssp + "\":\"" + offset + "\"}", fileUtil.readWithChecksum(offsetFile)) |
| } else if (offsetFile.getCanonicalFile.getName.equals(StorageManagerUtil.OFFSET_FILE_NAME_LEGACY)) { |
| assertEquals("Found incorrect value in offset file!", offset, fileUtil.readWithChecksum(offsetFile)) |
| } else { |
| throw new IllegalArgumentException("Invalid offset file name"); |
| } |
| } |
| |
| @Test |
| def testStopShouldNotCreateOffsetFileForEmptyStore() { |
| val partition = new Partition(0) |
| |
| val offsetFilePath = new File(storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) + File.separator + offsetFileName) |
| |
| |
| val sspMetadataCache = mock[SSPMetadataCache] |
| when(sspMetadataCache.getMetadata(new SystemStreamPartition("kafka", getStreamName(loggedStore), partition))).thenReturn(null) |
| |
| //Build TaskStorageManager |
| val taskStorageManager = new TaskStorageManagerBuilder() |
| .addLoggedStore(loggedStore, true) |
| .setPartition(partition) |
| .setStreamMetadataCache(createMockStreamMetadataCache(null, null, null)) // null offsets for empty store |
| .initializeContainerStorageManager() |
| .build |
| |
| //Invoke test method |
| taskStorageManager.stop() |
| |
| //Check conditions |
| assertTrue("Offset file should not exist!", !offsetFilePath.exists()) |
| } |
| |
| @Test |
| def testCleanBaseDirsShouldNotAddNullOffsetsToFileOffsetsMap(): Unit = { |
| // If a null file offset were allowed, and the full Map passed to SystemAdmin.getOffsetsAfter an NPE could |
| // occur for some SystemAdmin implementations |
| val writeOffsetFile = true |
| val fileOffset = null |
| val oldestOffset = "3" |
| val newestOffset = "150" |
| val upcomingOffset = "151" |
| val expectedRegisteredOffset = "3" |
| |
| testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile) |
| } |
| |
| @Test |
| def testStartConsumersShouldRegisterCorrectOffsetWhenFileOffsetValid(): Unit = { |
| // We should register the offset AFTER the stored file offset. |
| // The file offset represents the last changelog message that is also reflected in the store. So start with next one. |
| val writeOffsetFile = true |
| val fileOffset = "139" |
| val oldestOffset = "3" |
| val newestOffset = "150" |
| val upcomingOffset = "151" |
| val expectedRegisteredOffset = "140" |
| |
| testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile) |
| } |
| |
| @Test |
| def testStartConsumersShouldRegisterCorrectOffsetWhenFileOffsetOlderThanOldestOffset(): Unit = { |
| // We should register the oldest offset if it is less than the file offset |
| val writeOffsetFile = true |
| val fileOffset = "139" |
| val oldestOffset = "145" |
| val newestOffset = "150" |
| val upcomingOffset = "151" |
| val expectedRegisteredOffset = "145" |
| |
| testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile) |
| } |
| |
| @Test |
| def testStartConsumersShouldRegisterCorrectOffsetWhenOldestOffsetGreaterThanZero(): Unit = { |
| val writeOffsetFile = false |
| val fileOffset = null |
| val oldestOffset = "3" |
| val newestOffset = "150" |
| val upcomingOffset = "151" |
| val expectedRegisteredOffset = "3" |
| |
| testChangelogConsumerOffsetRegistration(oldestOffset, newestOffset, upcomingOffset, expectedRegisteredOffset, fileOffset, writeOffsetFile) |
| } |
| |
| @Test |
| def testReadOfOldOffsetFormat(): Unit = { |
| // Create a file in old single-offset format, with a sample offset |
| val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) |
| val storeFile = new File(storeDirectory, "store.sst") |
| val offsetFile = new File(storeDirectory, offsetFileName) |
| val sampleOldOffset = "912321" |
| fileUtil.writeWithChecksum(offsetFile, sampleOldOffset) |
| |
| |
| // read offset against a given ssp from the file |
| var ssp = new SystemStreamPartition("kafka", "test-stream", new Partition(0)) |
| val offsets = storageManagerUtil.readOffsetFile(storeDirectory, Set(ssp).asJava, false) |
| assertTrue(offsets.get(ssp).equals(sampleOldOffset)) |
| } |
| |
| @Test |
| def testReadOfOffsetInCaseOfBothFilesPresent(): Unit = { |
| // Create a file in old single-offset format, with a sample offset, and another with the new-offset format |
| val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active) |
| val storeFile = new File(storeDirectory, "store.sst") |
| val sampleOldOffset = "100000001" |
| val sampleNewOffset = "{\"kafka.test-stream.0\":\"200000002\"}" |
| fileUtil.writeWithChecksum(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_LEGACY), sampleOldOffset) |
| fileUtil.writeWithChecksum(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW), sampleNewOffset) |
| |
| // Ensure that the files exist |
| assertTrue(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_LEGACY).exists()) |
| assertTrue(new File(storeDirectory, StorageManagerUtil.OFFSET_FILE_NAME_NEW).exists()) |
| |
| // read offset against a given ssp from the file, and check that the one in the new file should be read |
| var ssp = new SystemStreamPartition("kafka", "test-stream", new Partition(0)) |
| val offsets = storageManagerUtil.readOffsetFile(storeDirectory, Set(ssp).asJava, false) |
| |
| assertEquals(1, offsets.size()) |
| assertEquals("200000002", offsets.get(ssp)) |
| } |
| |
| private def testChangelogConsumerOffsetRegistration(oldestOffset: String, newestOffset: String, upcomingOffset: String, expectedRegisteredOffset: String, fileOffset: String, writeOffsetFile: Boolean): Unit = { |
| val systemName = "kafka" |
| val streamName = getStreamName(loggedStore) |
| val partitionCount = 1 |
| // Basic test setup of SystemStream, SystemStreamPartition for this task |
| val ss = new SystemStream(systemName, streamName) |
| val partition = new Partition(0) |
| val ssp = new SystemStreamPartition(ss, partition) |
| val storeDirectory = storageManagerUtil.getTaskStoreDir(TaskStorageManagerBuilder.defaultLoggedStoreBaseDir, loggedStore, taskName, TaskMode.Active ) |
| val storeFile = new File(storeDirectory, "store.sst") |
| |
| if (writeOffsetFile) { |
| val offsetFile = new File(storeDirectory, offsetFileName) |
| if (fileOffset != null) { |
| fileUtil.writeWithChecksum(offsetFile, fileOffset) |
| } else { |
| // Write garbage to produce a null result when it's read |
| val fos = new FileOutputStream(offsetFile) |
| val oos = new ObjectOutputStream(fos) |
| oos.writeLong(1) |
| oos.writeUTF("Bad Offset") |
| oos.close() |
| fos.close() |
| } |
| } |
| |
| val mockStorageEngine: StorageEngine = createMockStorageEngine(isLoggedStore = true, isPersistedStore = true, storeFile) |
| |
| // Mock for StreamMetadataCache, SystemConsumer, SystemAdmin |
| val mockStreamMetadataCache = mock[StreamMetadataCache] |
| |
| val mockSystemAdmin = mock[SystemAdmin] |
| val changelogSpec = StreamSpec.createChangeLogStreamSpec(streamName, systemName, partitionCount) |
| doNothing().when(mockSystemAdmin).validateStream(changelogSpec) |
| when(mockSystemAdmin.getOffsetsAfter(any())).thenAnswer(new Answer[util.Map[SystemStreamPartition, String]] { |
| override def answer(invocation: InvocationOnMock): util.Map[SystemStreamPartition, String] = { |
| val originalOffsets = invocation.getArgumentAt(0, classOf[util.Map[SystemStreamPartition, String]]) |
| originalOffsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava |
| } |
| }) |
| when(mockSystemAdmin.offsetComparator(any(), any())).thenAnswer(new Answer[Integer] { |
| override def answer(invocation: InvocationOnMock): Integer = { |
| val offset1 = invocation.getArgumentAt(0, classOf[String]) |
| val offset2 = invocation.getArgumentAt(1, classOf[String]) |
| offset1.toLong compare offset2.toLong |
| } |
| }) |
| |
| val mockSystemConsumer = mock[SystemConsumer] |
| when(mockSystemConsumer.register(any(classOf[SystemStreamPartition]), any(classOf[String]))).thenAnswer(new Answer[Unit] { |
| override def answer(invocation: InvocationOnMock): Unit = { |
| val args = invocation.getArguments |
| if (ssp.equals(args.apply(0).asInstanceOf[SystemStreamPartition])) { |
| val offset = args.apply(1).asInstanceOf[String] |
| assertNotNull(offset) |
| assertEquals(expectedRegisteredOffset, offset) |
| } |
| } |
| }) |
| doNothing().when(mockSystemConsumer).stop() |
| |
| // Test 1: Initial invocation - No store on disk (only changelog has data) |
| // Setup initial sspMetadata |
| val sspMetadata = new SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset) |
| var metadata = new SystemStreamMetadata(streamName, new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { |
| { |
| put(partition, sspMetadata) |
| } |
| }) |
| when(mockStreamMetadataCache.getStreamMetadata(any(), any())).thenReturn(Map(ss -> metadata)) |
| when(mockSystemAdmin.getSystemStreamMetadata(any())).thenReturn(new util.HashMap[String, SystemStreamMetadata]() { |
| { |
| put(streamName, metadata) |
| } |
| }) |
| |
| val taskManager = new TaskStorageManagerBuilder() |
| .addStore(loggedStore, mockStorageEngine, mockSystemConsumer) |
| .setStreamMetadataCache(mockStreamMetadataCache) |
| .setSystemAdmin(systemName, mockSystemAdmin) |
| .initializeContainerStorageManager() |
| .build |
| |
| verify(mockSystemConsumer).register(any(classOf[SystemStreamPartition]), anyString()) |
| } |
| |
| private def createMockStreamMetadataCache(oldestOffset: String, newestOffset: String, upcomingOffset: String) = { |
| // an empty store would return a SSPMetadata with oldest, newest and upcoming offset set to null |
| var metadata1 = new SystemStreamMetadata(getStreamName(loggedStore), new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { |
| { |
| put(new Partition(0), new SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset)) |
| } |
| }) |
| |
| var metadata2 = new SystemStreamMetadata(getStreamName(store), new java.util.HashMap[Partition, SystemStreamPartitionMetadata]() { |
| { |
| put(new Partition(0), new SystemStreamPartitionMetadata(oldestOffset, newestOffset, upcomingOffset)) |
| } |
| }) |
| |
| val mockStreamMetadataCache = mock[StreamMetadataCache] |
| when(mockStreamMetadataCache.getStreamMetadata(org.mockito.Matchers.eq(Set(new SystemStream("kafka", getStreamName(loggedStore)))), any())).thenReturn(Map(new SystemStream("kafka", getStreamName(loggedStore)) -> metadata1)) |
| when(mockStreamMetadataCache.getStreamMetadata(org.mockito.Matchers.eq(Set(new SystemStream("kafka", getStreamName(store)))), any())).thenReturn(Map(new SystemStream("kafka", getStreamName(store)) -> metadata2)) |
| when(mockStreamMetadataCache.getStreamMetadata(org.mockito.Matchers.eq(Set(new SystemStream("kafka", getStreamName(store)), new SystemStream("kafka", getStreamName(loggedStore)))), any())). |
| thenReturn(Map(new SystemStream("kafka", getStreamName(store)) -> metadata2, new SystemStream("kafka", getStreamName(loggedStore)) -> metadata1)) |
| |
| mockStreamMetadataCache |
| } |
| |
| private def createMockStorageEngine(isLoggedStore: Boolean, isPersistedStore: Boolean, storeFile: File) = { |
| val mockStorageEngine = mock[StorageEngine] |
| // getStoreProperties should always return the same StoreProperties |
| when(mockStorageEngine.getStoreProperties).thenAnswer(new Answer[StoreProperties] { |
| override def answer(invocation: InvocationOnMock): StoreProperties = { |
| new StorePropertiesBuilder().setLoggedStore(isLoggedStore).setPersistedToDisk(isPersistedStore).build() |
| } |
| }) |
| // Restore simply creates the file |
| if (storeFile != null) { |
| when(mockStorageEngine.restore(any())).thenAnswer(new Answer[Unit] { |
| override def answer(invocation: InvocationOnMock): Unit = { |
| storeFile.createNewFile() |
| } |
| }) |
| } else { |
| doNothing().when(mockStorageEngine).restore(any()) |
| } |
| mockStorageEngine |
| } |
| } |
| |
| object TestNonTransactionalStateTaskStorageManager { |
| |
| @Parameters def parameters: util.Collection[Array[String]] = { |
| val offsetFileNames = new util.ArrayList[Array[String]]() |
| offsetFileNames.add(Array(StorageManagerUtil.OFFSET_FILE_NAME_NEW)) |
| offsetFileNames.add(Array(StorageManagerUtil.OFFSET_FILE_NAME_LEGACY)) |
| offsetFileNames |
| } |
| } |
| |
| |
| object TaskStorageManagerBuilder { |
| val defaultStoreBaseDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "store") |
| val defaultLoggedStoreBaseDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "loggedStore") |
| } |
| |
| class TaskStorageManagerBuilder extends MockitoSugar { |
| var taskStores: Map[String, StorageEngine] = Map() |
| var storeConsumers: Map[String, SystemConsumer] = Map() |
| var changeLogSystemStreams: Map[String, SystemStream] = Map() |
| var streamMetadataCache = mock[StreamMetadataCache] |
| var partition: Partition = new Partition(0) |
| var systemAdminsMap: Map[String, SystemAdmin] = Map("kafka" -> mock[SystemAdmin]) |
| var taskName: TaskName = new TaskName("testTask") |
| var storeBaseDir: File = TaskStorageManagerBuilder.defaultStoreBaseDir |
| var loggedStoreBaseDir: File = TaskStorageManagerBuilder.defaultLoggedStoreBaseDir |
| var changeLogStreamPartitions: Int = 1 |
| var containerStorageManager: ContainerStorageManager = mock[ContainerStorageManager] |
| |
| def addStore(storeName: String, storageEngine: StorageEngine, systemConsumer: SystemConsumer): TaskStorageManagerBuilder = { |
| taskStores = taskStores ++ Map(storeName -> storageEngine) |
| storeConsumers = storeConsumers ++ Map("kafka" -> systemConsumer) |
| changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> new SystemStream("kafka", getStreamName(storeName))) |
| this |
| } |
| |
| def getStreamName(storeName : String): String = { |
| "testStream-"+storeName |
| } |
| |
| def addStore(storeName: String, isPersistedToDisk: Boolean): TaskStorageManagerBuilder = { |
| val mockStorageEngine = mock[StorageEngine] |
| when(mockStorageEngine.getStoreProperties) |
| .thenReturn(new StorePropertiesBuilder().setPersistedToDisk(isPersistedToDisk).setLoggedStore(false).build()) |
| addStore(storeName, mockStorageEngine, mock[SystemConsumer]) |
| } |
| |
| def addLoggedStore(storeName: String, isPersistedToDisk: Boolean): TaskStorageManagerBuilder = { |
| val mockStorageEngine = mock[StorageEngine] |
| when(mockStorageEngine.getStoreProperties) |
| .thenReturn(new StorePropertiesBuilder().setPersistedToDisk(isPersistedToDisk).setLoggedStore(true).build()) |
| addStore(storeName, mockStorageEngine, mock[SystemConsumer]) |
| } |
| |
| def setPartition(p: Partition) = { |
| partition = p |
| this |
| } |
| |
| def setChangeLogSystemStreams(storeName: String, systemStream: SystemStream) = { |
| changeLogSystemStreams = changeLogSystemStreams ++ Map(storeName -> systemStream) |
| this |
| } |
| |
| def setSystemAdmin(system: String, systemAdmin: SystemAdmin) = { |
| systemAdminsMap = systemAdminsMap ++ Map(system -> systemAdmin) |
| this |
| } |
| |
| def setTaskName(tn: TaskName) = { |
| taskName = tn |
| this |
| } |
| |
| def setStreamMetadataCache(metadataCache: StreamMetadataCache) = { |
| streamMetadataCache = metadataCache |
| this |
| } |
| |
| /** |
| * This method creates and starts a {@link ContainerStorageManager} |
| */ |
| def initializeContainerStorageManager(cleanStoreDirsOnStart : Boolean = false) = { |
| var tasks: Map[TaskName, TaskModel] = HashMap[TaskName, TaskModel]((taskName, new TaskModel(taskName, new util.HashSet[SystemStreamPartition], new Partition(0)))) |
| var containerModel = new ContainerModel("container", tasks.asJava) |
| |
| val mockSystemAdmins = Mockito.mock(classOf[SystemAdmins]) |
| Mockito.when(mockSystemAdmins.getSystemAdmin(org.mockito.Matchers.eq("kafka"))).thenReturn(systemAdminsMap.get("kafka").get) |
| |
| var mockStorageEngineFactory : StorageEngineFactory[AnyRef, AnyRef] = Mockito.mock(classOf[StorageEngineFactory[AnyRef, AnyRef]]) |
| |
| var storageEngineFactories : mutable.Map[String, StorageEngineFactory[AnyRef, AnyRef]] = scala.collection.mutable.Map[String, StorageEngineFactory[AnyRef, AnyRef]]() |
| |
| if(taskStores.contains("store1")) { |
| Mockito.when(mockStorageEngineFactory.getStorageEngine(org.mockito.Matchers.eq("store1"), any(), any(), any(), any(), any(), any(), any(), any(), any())) |
| .thenReturn(taskStores.get("store1").get) |
| storageEngineFactories += ("store1" -> mockStorageEngineFactory) |
| } |
| |
| if(taskStores.contains("loggedStore1")) { |
| Mockito.when(mockStorageEngineFactory.getStorageEngine(org.mockito.Matchers.eq("loggedStore1"), any(), any(), any(), any(), any(), any(), any(), any(), any())) |
| .thenReturn(taskStores.get("loggedStore1").get) |
| storageEngineFactories += ("loggedStore1" -> mockStorageEngineFactory) |
| } |
| |
| |
| var mockSystemFactory = Mockito.mock(classOf[SystemFactory]) |
| Mockito.when(mockSystemFactory.getConsumer(org.mockito.Matchers.eq("kafka"),any(), any())).thenReturn(storeConsumers.get("kafka").get) |
| var systemFactories : Map[String, SystemFactory] = HashMap[String, SystemFactory](("kafka", mockSystemFactory)) |
| |
| var config = new MapConfig(mutable.Map( |
| "stores.store1.clean.on.container.start" -> cleanStoreDirsOnStart.toString, |
| "stores.loggedStore1.clean.on.container.start" -> cleanStoreDirsOnStart.toString, |
| "stores.store1.key.serde" -> classOf[StringSerdeFactory].getCanonicalName, |
| "stores.store1.msg.serde" -> classOf[StringSerdeFactory].getCanonicalName, |
| "stores.loggedStore1.key.serde" -> classOf[StringSerdeFactory].getCanonicalName, |
| "stores.loggedStore1.msg.serde" -> classOf[StringSerdeFactory].getCanonicalName, |
| TaskConfig.TRANSACTIONAL_STATE_RESTORE_ENABLED -> "false").asJava) |
| |
| var mockSerdes: Map[String, Serde[AnyRef]] = HashMap[String, Serde[AnyRef]]((classOf[StringSerdeFactory].getCanonicalName, Mockito.mock(classOf[Serde[AnyRef]]))) |
| |
| val mockCheckpointManager = Mockito.mock(classOf[CheckpointManager]) |
| when(mockCheckpointManager.readLastCheckpoint(any(classOf[TaskName]))) |
| .thenReturn(new Checkpoint(new util.HashMap[SystemStreamPartition, String]())) |
| |
| val mockSSPMetadataCache = Mockito.mock(classOf[SSPMetadataCache]) |
| |
| containerStorageManager = new ContainerStorageManager( |
| mockCheckpointManager, |
| containerModel, |
| streamMetadataCache, |
| mockSSPMetadataCache, |
| mockSystemAdmins, |
| changeLogSystemStreams.asJava, |
| Map[String, util.Set[SystemStream]]().asJava, |
| storageEngineFactories.asJava, |
| systemFactories.asJava, |
| mockSerdes.asJava, |
| config, |
| new HashMap[TaskName, TaskInstanceMetrics]().asJava, |
| Mockito.mock(classOf[SamzaContainerMetrics]), |
| Mockito.mock(classOf[JobContext]), |
| Mockito.mock(classOf[ContainerContext]), |
| new HashMap[TaskName, TaskInstanceCollector].asJava, |
| loggedStoreBaseDir, |
| TaskStorageManagerBuilder.defaultStoreBaseDir, |
| 1, |
| null, |
| new SystemClock) |
| this |
| } |
| |
| |
| |
| def build: NonTransactionalStateTaskStorageManager = { |
| |
| if (containerStorageManager != null) { |
| containerStorageManager.start() |
| } |
| |
| new NonTransactionalStateTaskStorageManager( |
| taskName = taskName, |
| containerStorageManager = containerStorageManager, |
| storeChangelogs = changeLogSystemStreams, |
| systemAdmins = buildSystemAdmins(systemAdminsMap), |
| loggedStoreBaseDir = loggedStoreBaseDir, |
| partition = partition |
| ) |
| } |
| |
| private def buildSystemAdmins(systemAdminsMap: Map[String, SystemAdmin]): SystemAdmins = { |
| val systemAdmins = mock[SystemAdmins] |
| systemAdminsMap.foreach { case (system, systemAdmin) => |
| when(systemAdmins.getSystemAdmin(system)).thenReturn(systemAdmin) |
| } |
| systemAdmins |
| } |
| } |