blob: 244a35b82420d1ae18701610f7657e9ba18a1510 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.FileFilter;
import scala.Option;
import scala.collection.immutable.Map;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Optional;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.ScalaJavaUtil;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestTransactionalStateTaskStorageManager {
@Test
public void testFlushOrder() {
ContainerStorageManager csm = mock(ContainerStorageManager.class);
StorageEngine mockStore = mock(StorageEngine.class);
java.util.Map<String, StorageEngine> taskStores = ImmutableMap.of("mockStore", mockStore);
when(csm.getAllStores(any())).thenReturn(taskStores);
TransactionalStateTaskStorageManager tsm = spy(buildTSM(csm, mock(Partition.class), new StorageManagerUtil()));
// stub actual method call
doReturn(mock(Map.class)).when(tsm).getNewestChangelogSSPOffsets(any(), any(), any(), any());
// invoke flush
tsm.flush();
// ensure that stores are flushed before we get newest changelog offsets
InOrder inOrder = inOrder(mockStore, tsm);
inOrder.verify(mockStore).flush();
inOrder.verify(tsm).getNewestChangelogSSPOffsets(any(), any(), any(), any());
}
@Test
public void testGetNewestOffsetsReturnsCorrectOffset() {
ContainerStorageManager csm = mock(ContainerStorageManager.class);
TransactionalStateTaskStorageManager tsm = buildTSM(csm, mock(Partition.class), new StorageManagerUtil());
TaskName taskName = mock(TaskName.class);
String changelogSystemName = "systemName";
String storeName = "storeName";
String changelogStreamName = "changelogName";
String newestChangelogSSPOffset = "1";
SystemStream changelogSystemStream = new SystemStream(changelogSystemName, changelogStreamName);
Partition changelogPartition = new Partition(0);
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStream, changelogPartition);
Map<String, SystemStream> storeChangelogs =
ScalaJavaUtil.toScalaMap(ImmutableMap.of(storeName, changelogSystemStream));
SystemAdmins systemAdmins = mock(SystemAdmins.class);
SystemAdmin systemAdmin = mock(SystemAdmin.class);
SystemStreamPartitionMetadata metadata = mock(SystemStreamPartitionMetadata.class);
when(metadata.getNewestOffset()).thenReturn(newestChangelogSSPOffset);
when(systemAdmins.getSystemAdmin(changelogSystemName)).thenReturn(systemAdmin);
when(systemAdmin.getSSPMetadata(eq(ImmutableSet.of(changelogSSP)))).thenReturn(ImmutableMap.of(changelogSSP, metadata));
// invoke the method
Map<SystemStreamPartition, Option<String>> offsets =
tsm.getNewestChangelogSSPOffsets(
taskName, storeChangelogs, changelogPartition, systemAdmins);
// verify results
assertEquals(1, offsets.size());
assertEquals(Option.apply(newestChangelogSSPOffset), offsets.apply(changelogSSP));
}
@Test
public void testGetNewestOffsetsReturnsNoneForEmptyTopic() {
// empty topic == null newest offset
ContainerStorageManager csm = mock(ContainerStorageManager.class);
TransactionalStateTaskStorageManager tsm = buildTSM(csm, mock(Partition.class), new StorageManagerUtil());
TaskName taskName = mock(TaskName.class);
String changelogSystemName = "systemName";
String storeName = "storeName";
String changelogStreamName = "changelogName";
String newestChangelogSSPOffset = null;
SystemStream changelogSystemStream = new SystemStream(changelogSystemName, changelogStreamName);
Partition changelogPartition = new Partition(0);
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStream, changelogPartition);
Map<String, SystemStream> storeChangelogs =
ScalaJavaUtil.toScalaMap(ImmutableMap.of(storeName, changelogSystemStream));
SystemAdmins systemAdmins = mock(SystemAdmins.class);
SystemAdmin systemAdmin = mock(SystemAdmin.class);
SystemStreamPartitionMetadata metadata = mock(SystemStreamPartitionMetadata.class);
when(metadata.getNewestOffset()).thenReturn(newestChangelogSSPOffset);
when(systemAdmins.getSystemAdmin(changelogSystemName)).thenReturn(systemAdmin);
when(systemAdmin.getSSPMetadata(eq(ImmutableSet.of(changelogSSP)))).thenReturn(ImmutableMap.of(changelogSSP, metadata));
// invoke the method
Map<SystemStreamPartition, Option<String>> offsets =
tsm.getNewestChangelogSSPOffsets(
taskName, storeChangelogs, changelogPartition, systemAdmins);
// verify results
assertEquals(1, offsets.size());
assertEquals(Option.empty(), offsets.apply(changelogSSP));
}
@Test(expected = SamzaException.class)
public void testGetNewestOffsetsThrowsIfNullMetadata() {
// empty topic == null newest offset
ContainerStorageManager csm = mock(ContainerStorageManager.class);
TransactionalStateTaskStorageManager tsm = buildTSM(csm, mock(Partition.class), new StorageManagerUtil());
TaskName taskName = mock(TaskName.class);
String changelogSystemName = "systemName";
String storeName = "storeName";
String changelogStreamName = "changelogName";
String newestChangelogSSPOffset = null;
SystemStream changelogSystemStream = new SystemStream(changelogSystemName, changelogStreamName);
Partition changelogPartition = new Partition(0);
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStream, changelogPartition);
Map<String, SystemStream> storeChangelogs =
ScalaJavaUtil.toScalaMap(ImmutableMap.of(storeName, changelogSystemStream));
SystemAdmins systemAdmins = mock(SystemAdmins.class);
SystemAdmin systemAdmin = mock(SystemAdmin.class);
SystemStreamPartitionMetadata metadata = mock(SystemStreamPartitionMetadata.class);
when(metadata.getNewestOffset()).thenReturn(newestChangelogSSPOffset);
when(systemAdmins.getSystemAdmin(changelogSystemName)).thenReturn(systemAdmin);
when(systemAdmin.getSSPMetadata(eq(ImmutableSet.of(changelogSSP)))).thenReturn(null);
// invoke the method
Map<SystemStreamPartition, Option<String>> offsets =
tsm.getNewestChangelogSSPOffsets(
taskName, storeChangelogs, changelogPartition, systemAdmins);
// verify results
fail("Should have thrown an exception if admin didn't return any metadata");
}
@Test(expected = SamzaException.class)
public void testGetNewestOffsetsThrowsIfNullSSPMetadata() {
// empty topic == null newest offset
ContainerStorageManager csm = mock(ContainerStorageManager.class);
TransactionalStateTaskStorageManager tsm = buildTSM(csm, mock(Partition.class), new StorageManagerUtil());
TaskName taskName = mock(TaskName.class);
String changelogSystemName = "systemName";
String storeName = "storeName";
String changelogStreamName = "changelogName";
String newestChangelogSSPOffset = null;
SystemStream changelogSystemStream = new SystemStream(changelogSystemName, changelogStreamName);
Partition changelogPartition = new Partition(0);
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStream, changelogPartition);
Map<String, SystemStream> storeChangelogs =
ScalaJavaUtil.toScalaMap(ImmutableMap.of(storeName, changelogSystemStream));
SystemAdmins systemAdmins = mock(SystemAdmins.class);
SystemAdmin systemAdmin = mock(SystemAdmin.class);
SystemStreamPartitionMetadata metadata = mock(SystemStreamPartitionMetadata.class);
when(metadata.getNewestOffset()).thenReturn(newestChangelogSSPOffset);
when(systemAdmins.getSystemAdmin(changelogSystemName)).thenReturn(systemAdmin);
java.util.Map metadataMap = new HashMap() { {
put(changelogSSP, null);
} };
when(systemAdmin.getSSPMetadata(eq(ImmutableSet.of(changelogSSP)))).thenReturn(metadataMap);
// invoke the method
Map<SystemStreamPartition, Option<String>> offsets =
tsm.getNewestChangelogSSPOffsets(
taskName, storeChangelogs, changelogPartition, systemAdmins);
// verify results
fail("Should have thrown an exception if admin returned null metadata for changelog SSP");
}
@Test(expected = SamzaException.class)
public void testGetNewestOffsetsThrowsIfErrorGettingMetadata() {
// empty topic == null newest offset
ContainerStorageManager csm = mock(ContainerStorageManager.class);
TransactionalStateTaskStorageManager tsm = buildTSM(csm, mock(Partition.class), new StorageManagerUtil());
TaskName taskName = mock(TaskName.class);
String changelogSystemName = "systemName";
String storeName = "storeName";
String changelogStreamName = "changelogName";
String newestChangelogSSPOffset = null;
SystemStream changelogSystemStream = new SystemStream(changelogSystemName, changelogStreamName);
Partition changelogPartition = new Partition(0);
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStream, changelogPartition);
Map<String, SystemStream> storeChangelogs =
ScalaJavaUtil.toScalaMap(ImmutableMap.of(storeName, changelogSystemStream));
SystemAdmins systemAdmins = mock(SystemAdmins.class);
SystemAdmin systemAdmin = mock(SystemAdmin.class);
SystemStreamPartitionMetadata metadata = mock(SystemStreamPartitionMetadata.class);
when(metadata.getNewestOffset()).thenReturn(newestChangelogSSPOffset);
when(systemAdmins.getSystemAdmin(changelogSystemName)).thenThrow(new SamzaException("Error getting metadata"));
when(systemAdmin.getSSPMetadata(eq(ImmutableSet.of(changelogSSP)))).thenReturn(null);
// invoke the method
Map<SystemStreamPartition, Option<String>> offsets =
tsm.getNewestChangelogSSPOffsets(
taskName, storeChangelogs, changelogPartition, systemAdmins);
// verify results
fail("Should have thrown an exception if admin had an error getting metadata");
}
@Test
public void testCheckpoint() {
ContainerStorageManager csm = mock(ContainerStorageManager.class);
StorageEngine mockLPStore = mock(StorageEngine.class);
StoreProperties lpStoreProps = mock(StoreProperties.class);
when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isLoggedStore()).thenReturn(true);
Path mockPath = mock(Path.class);
when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
StorageEngine mockPStore = mock(StorageEngine.class);
StoreProperties pStoreProps = mock(StoreProperties.class);
when(mockPStore.getStoreProperties()).thenReturn(pStoreProps);
when(pStoreProps.isPersistedToDisk()).thenReturn(true);
when(pStoreProps.isLoggedStore()).thenReturn(false);
StorageEngine mockLIStore = mock(StorageEngine.class);
StoreProperties liStoreProps = mock(StoreProperties.class);
when(mockLIStore.getStoreProperties()).thenReturn(liStoreProps);
when(liStoreProps.isPersistedToDisk()).thenReturn(false);
when(liStoreProps.isLoggedStore()).thenReturn(true);
StorageEngine mockIStore = mock(StorageEngine.class);
StoreProperties iStoreProps = mock(StoreProperties.class);
when(mockIStore.getStoreProperties()).thenReturn(iStoreProps);
when(iStoreProps.isPersistedToDisk()).thenReturn(false);
when(iStoreProps.isLoggedStore()).thenReturn(false);
java.util.Map<String, StorageEngine> taskStores = ImmutableMap.of(
"loggedPersistentStore", mockLPStore,
"persistentStore", mockPStore,
"loggedInMemStore", mockLIStore,
"inMemStore", mockIStore
);
when(csm.getAllStores(any())).thenReturn(taskStores);
TransactionalStateTaskStorageManager tsm = spy(buildTSM(csm, mock(Partition.class), new StorageManagerUtil()));
// stub actual method call
ArgumentCaptor<Map> checkpointPathsCaptor = ArgumentCaptor.forClass(Map.class);
doNothing().when(tsm).writeChangelogOffsetFiles(any(), any(), any());
Map<SystemStreamPartition, Option<String>> offsets = ScalaJavaUtil.toScalaMap(
ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1")));
// invoke checkpoint
tsm.checkpoint(CheckpointId.create(), offsets);
// ensure that checkpoint is never called for non-logged persistent stores since they're
// always cleared on restart.
verify(mockPStore, never()).checkpoint(any());
// ensure that checkpoint is never called for in-memory stores since they're not persistent.
verify(mockIStore, never()).checkpoint(any());
verify(mockLIStore, never()).checkpoint(any());
verify(tsm).writeChangelogOffsetFiles(checkpointPathsCaptor.capture(), any(), eq(offsets));
Map<String, Path> checkpointPaths = checkpointPathsCaptor.getValue();
assertEquals(1, checkpointPaths.size());
assertEquals(mockPath, checkpointPaths.apply("loggedPersistentStore"));
}
@Test(expected = IllegalStateException.class)
public void testCheckpointFailsIfErrorCreatingCheckpoint() {
ContainerStorageManager csm = mock(ContainerStorageManager.class);
StorageEngine mockLPStore = mock(StorageEngine.class);
StoreProperties lpStoreProps = mock(StoreProperties.class);
when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isLoggedStore()).thenReturn(true);
when(mockLPStore.checkpoint(any())).thenThrow(new IllegalStateException());
java.util.Map<String, StorageEngine> taskStores =
ImmutableMap.of("loggedPersistentStore", mockLPStore);
when(csm.getAllStores(any())).thenReturn(taskStores);
TransactionalStateTaskStorageManager tsm = spy(buildTSM(csm, mock(Partition.class), new StorageManagerUtil()));
Map<SystemStreamPartition, Option<String>> offsets = ScalaJavaUtil.toScalaMap(
ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1")));
// invoke checkpoint
tsm.checkpoint(CheckpointId.create(), offsets);
verify(tsm, never()).writeChangelogOffsetFiles(any(), any(), any());
fail("Should have thrown an exception if error creating store checkpoint");
}
@Test(expected = SamzaException.class)
public void testCheckpointFailsIfErrorWritingOffsetFiles() {
ContainerStorageManager csm = mock(ContainerStorageManager.class);
StorageEngine mockLPStore = mock(StorageEngine.class);
StoreProperties lpStoreProps = mock(StoreProperties.class);
when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isLoggedStore()).thenReturn(true);
Path mockPath = mock(Path.class);
when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
java.util.Map<String, StorageEngine> taskStores =
ImmutableMap.of("loggedPersistentStore", mockLPStore);
when(csm.getAllStores(any())).thenReturn(taskStores);
TransactionalStateTaskStorageManager tsm = spy(buildTSM(csm, mock(Partition.class), new StorageManagerUtil()));
doThrow(new SamzaException("Error writing offset file"))
.when(tsm).writeChangelogOffsetFiles(any(), any(), any());
Map<SystemStreamPartition, Option<String>> offsets = ScalaJavaUtil.toScalaMap(
ImmutableMap.of(mock(SystemStreamPartition.class), Option.apply("1")));
// invoke checkpoint
tsm.checkpoint(CheckpointId.create(), offsets);
fail("Should have thrown an exception if error writing offset file.");
}
@Test
public void testWriteChangelogOffsetFiles() throws IOException {
String storeName = "mockStore";
ContainerStorageManager csm = mock(ContainerStorageManager.class);
StorageEngine mockStore = mock(StorageEngine.class);
java.util.Map<String, StorageEngine> taskStores = ImmutableMap.of(storeName, mockStore);
when(csm.getAllStores(any())).thenReturn(taskStores);
Partition changelogPartition = new Partition(0);
SystemStream changelogSS = new SystemStream("system", "changelog");
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSS, changelogPartition);
StorageManagerUtil smu = spy(new StorageManagerUtil());
File mockCurrentStoreDir = mock(File.class);
doReturn(mockCurrentStoreDir).when(smu).getTaskStoreDir(any(), eq(storeName), any(), any());
doNothing().when(smu).writeOffsetFile(eq(mockCurrentStoreDir), any(), anyBoolean());
TransactionalStateTaskStorageManager tsm = spy(buildTSM(csm, changelogPartition, smu));
String changelogNewestOffset = "1";
Map<SystemStreamPartition, Option<String>> offsets = ScalaJavaUtil.toScalaMap(
ImmutableMap.of(changelogSSP, Option.apply(changelogNewestOffset)));
Path checkpointPath = Files.createTempDirectory("store-checkpoint-test").toAbsolutePath();
Map<String, Path> checkpointPaths = ScalaJavaUtil.toScalaMap(
ImmutableMap.of(storeName, checkpointPath));
Map<String, SystemStream> storeChangelogs = ScalaJavaUtil.toScalaMap(
ImmutableMap.of(storeName, changelogSS));
// invoke method
tsm.writeChangelogOffsetFiles(checkpointPaths, storeChangelogs, offsets);
// verify that offset file was written to the checkpoint dir
java.util.Map<SystemStreamPartition, String> fileOffsets = new StorageManagerUtil()
.readOffsetFile(checkpointPath.toFile(), ImmutableSet.of(changelogSSP), false);
assertEquals(1, fileOffsets.size());
assertEquals(changelogNewestOffset, fileOffsets.get(changelogSSP));
// verify that offset file write was called on the current dir
verify(smu, times(1)).writeOffsetFile(eq(mockCurrentStoreDir), any(), anyBoolean());
}
@Test
public void testWriteChangelogOffsetFilesWithEmptyChangelogTopic() throws IOException {
String storeName = "mockStore";
ContainerStorageManager csm = mock(ContainerStorageManager.class);
StorageEngine mockStore = mock(StorageEngine.class);
java.util.Map<String, StorageEngine> taskStores = ImmutableMap.of(storeName, mockStore);
when(csm.getAllStores(any())).thenReturn(taskStores);
Partition changelogPartition = new Partition(0);
SystemStream changelogSS = new SystemStream("system", "changelog");
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSS, changelogPartition);
StorageManagerUtil mockSMU = mock(StorageManagerUtil.class);
File mockCurrentStoreDir = mock(File.class);
when(mockSMU.getTaskStoreDir(any(), eq(storeName), any(), any())).thenReturn(mockCurrentStoreDir);
TransactionalStateTaskStorageManager tsm = spy(buildTSM(csm, changelogPartition, mockSMU));
String changelogNewestOffset = null;
Map<SystemStreamPartition, Option<String>> offsets = ScalaJavaUtil.toScalaMap(
ImmutableMap.of(changelogSSP, Option.apply(changelogNewestOffset)));
Path checkpointPath = Files.createTempDirectory("store-checkpoint-test").toAbsolutePath();
Map<String, Path> checkpointPaths = ScalaJavaUtil.toScalaMap(
ImmutableMap.of(storeName, checkpointPath));
Map<String, SystemStream> storeChangelogs = ScalaJavaUtil.toScalaMap(
ImmutableMap.of(storeName, changelogSS));
// invoke method
tsm.writeChangelogOffsetFiles(checkpointPaths, storeChangelogs, offsets);
// verify that the offset files were not written to the checkpoint dir
assertFalse(Files.exists(new File(checkpointPath.toFile(), StorageManagerUtil.OFFSET_FILE_NAME_LEGACY).toPath()));
assertFalse(Files.exists(new File(checkpointPath.toFile(), StorageManagerUtil.OFFSET_FILE_NAME_NEW).toPath()));
java.util.Map<SystemStreamPartition, String> fileOffsets = new StorageManagerUtil()
.readOffsetFile(checkpointPath.toFile(), ImmutableSet.of(changelogSSP), false);
assertEquals(0, fileOffsets.size());
// verify that delete was called on current store dir offset file
verify(mockSMU, times(1)).deleteOffsetFile(eq(mockCurrentStoreDir));
}
/**
* This should never happen with CheckpointingTaskStorageManager. #getNewestChangelogSSPOffset must
* return a key for every changelog SSP. If the SSP is empty, the value should be none. If it could
* not fetch metadata, it should throw an exception instead of skipping the SSP.
* If this contract is accidentally broken, ensure that we fail the commit
*/
@Test(expected = SamzaException.class)
public void testWriteChangelogOffsetFilesWithNoChangelogOffset() throws IOException {
String storeName = "mockStore";
ContainerStorageManager csm = mock(ContainerStorageManager.class);
StorageEngine mockStore = mock(StorageEngine.class);
java.util.Map<String, StorageEngine> taskStores = ImmutableMap.of(storeName, mockStore);
when(csm.getAllStores(any())).thenReturn(taskStores);
Partition changelogPartition = new Partition(0);
SystemStream changelogSS = new SystemStream("system", "changelog");
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSS, changelogPartition);
TransactionalStateTaskStorageManager tsm = spy(buildTSM(csm, changelogPartition, new StorageManagerUtil()));
// no mapping present for changelog newest offset
Map<SystemStreamPartition, Option<String>> offsets = ScalaJavaUtil.toScalaMap(ImmutableMap.of());
Path checkpointPath = Files.createTempDirectory("store-checkpoint-test").toAbsolutePath();
Map<String, Path> checkpointPaths = ScalaJavaUtil.toScalaMap(
ImmutableMap.of(storeName, checkpointPath));
Map<String, SystemStream> storeChangelogs = ScalaJavaUtil.toScalaMap(
ImmutableMap.of(storeName, changelogSS));
// invoke method
tsm.writeChangelogOffsetFiles(checkpointPaths, storeChangelogs, offsets);
fail("Should have thrown an exception if no changelog offset found for checkpointed store");
}
@Test
public void testRemoveOldCheckpointsWhenBaseDirContainsRegularFiles() {
TaskName taskName = new TaskName("Partition 0");
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
Map<String, SystemStream> changelogSystemStreams = mock(Map.class);
SystemAdmins systemAdmins = mock(SystemAdmins.class);
File loggedStoreBaseDir = mock(File.class);
Partition changelogPartition = new Partition(0);
TaskMode taskMode = TaskMode.Active;
StorageManagerUtil storageManagerUtil = mock(StorageManagerUtil.class);
File mockStoreDir = mock(File.class);
String mockStoreDirName = "notDirectory";
when(loggedStoreBaseDir.listFiles()).thenReturn(new File[] {mockStoreDir});
when(mockStoreDir.getName()).thenReturn(mockStoreDirName);
when(storageManagerUtil.getTaskStoreDir(eq(loggedStoreBaseDir), eq(mockStoreDirName), eq(taskName), eq(taskMode))).thenReturn(mockStoreDir);
// null here can happen if listFiles is called on a non-directory
when(mockStoreDir.listFiles(any(FileFilter.class))).thenReturn(null);
TransactionalStateTaskStorageManager tsm = new TransactionalStateTaskStorageManager(taskName, containerStorageManager,
changelogSystemStreams, systemAdmins, loggedStoreBaseDir, changelogPartition, taskMode, storageManagerUtil);
tsm.removeOldCheckpoints(CheckpointId.create());
}
private TransactionalStateTaskStorageManager buildTSM(ContainerStorageManager csm, Partition changelogPartition,
StorageManagerUtil smu) {
TaskName taskName = new TaskName("Partition 0");
Map<String, SystemStream> changelogSystemStreams = mock(Map.class);
SystemAdmins systemAdmins = mock(SystemAdmins.class);
File loggedStoreBaseDir = mock(File.class);
TaskMode taskMode = TaskMode.Active;
return new TransactionalStateTaskStorageManager(
taskName, csm, changelogSystemStreams, systemAdmins,
loggedStoreBaseDir, changelogPartition, taskMode, smu);
}
}