blob: ea00433c352ac6c3a25b370ac0492621f3f28c03 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.checkpoint.CheckpointV1;
import org.apache.samza.checkpoint.CheckpointV2;
import org.apache.samza.checkpoint.kafka.KafkaChangelogSSPOffset;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.metrics.Timer;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TestTaskStorageCommitManager {
@Test
public void testCommitManagerStart() {
CheckpointManager checkpointManager = mock(CheckpointManager.class);
TaskBackupManager taskBackupManager1 = mock(TaskBackupManager.class);
TaskBackupManager taskBackupManager2 = mock(TaskBackupManager.class);
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
Checkpoint checkpoint = mock(Checkpoint.class);
TaskName taskName = new TaskName("task1");
Map<String, TaskBackupManager> backupManagers = ImmutableMap.of(
"factory1", taskBackupManager1,
"factory2", taskBackupManager2
);
TaskStorageCommitManager cm = new TaskStorageCommitManager(taskName, backupManagers, containerStorageManager,
Collections.emptyMap(), new Partition(1), checkpointManager, new MapConfig(),
ForkJoinPool.commonPool(), new StorageManagerUtil(), null, null);
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
cm.init();
verify(taskBackupManager1).init(eq(checkpoint));
verify(taskBackupManager2).init(eq(checkpoint));
}
@Test
public void testCommitManagerStartNullCheckpointManager() {
TaskBackupManager taskBackupManager1 = mock(TaskBackupManager.class);
TaskBackupManager taskBackupManager2 = mock(TaskBackupManager.class);
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
TaskName task = new TaskName("task1");
Map<String, TaskBackupManager> backupManagers = ImmutableMap.of(
"factory1", taskBackupManager1,
"factory2", taskBackupManager2
);
TaskStorageCommitManager cm = new TaskStorageCommitManager(task, backupManagers, containerStorageManager,
Collections.emptyMap(), new Partition(1), null, new MapConfig(),
ForkJoinPool.commonPool(), new StorageManagerUtil(), null, null);
cm.init();
verify(taskBackupManager1).init(eq(null));
verify(taskBackupManager2).init(eq(null));
}
@Test
public void testSnapshotAndCommitAllFactories() {
CheckpointManager checkpointManager = mock(CheckpointManager.class);
TaskBackupManager taskBackupManager1 = mock(TaskBackupManager.class);
TaskBackupManager taskBackupManager2 = mock(TaskBackupManager.class);
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
Checkpoint checkpoint = mock(Checkpoint.class);
TaskInstanceMetrics metrics = mock(TaskInstanceMetrics.class);
Timer checkpointTimer = mock(Timer.class);
when(metrics.storeCheckpointNs()).thenReturn(checkpointTimer);
TaskName taskName = new TaskName("task1");
Map<String, TaskBackupManager> backupManagers = ImmutableMap.of(
"factory1", taskBackupManager1,
"factory2", taskBackupManager2
);
TaskStorageCommitManager cm = new TaskStorageCommitManager(taskName, backupManagers, containerStorageManager,
Collections.emptyMap(), new Partition(1), checkpointManager, new MapConfig(),
ForkJoinPool.commonPool(), new StorageManagerUtil(), null, metrics);
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
cm.init();
verify(taskBackupManager1).init(eq(checkpoint));
verify(taskBackupManager2).init(eq(checkpoint));
CheckpointId newCheckpointId = CheckpointId.create();
Map<String, String> factory1Checkpoints = ImmutableMap.of(
"store1", "system;stream;1",
"store2", "system;stream;2"
);
Map<String, String> factory2Checkpoints = ImmutableMap.of(
"store1", "blobId1",
"store2", "blobId2"
);
when(containerStorageManager.getAllStores(taskName)).thenReturn(Collections.emptyMap());
when(taskBackupManager1.snapshot(newCheckpointId)).thenReturn(factory1Checkpoints);
when(taskBackupManager2.snapshot(newCheckpointId)).thenReturn(factory2Checkpoints);
when(taskBackupManager1.upload(newCheckpointId, factory1Checkpoints))
.thenReturn(CompletableFuture.completedFuture(factory1Checkpoints));
when(taskBackupManager2.upload(newCheckpointId, factory2Checkpoints))
.thenReturn(CompletableFuture.completedFuture(factory2Checkpoints));
Map<String, Map<String, String>> snapshotSCMs = cm.snapshot(newCheckpointId);
cm.upload(newCheckpointId, snapshotSCMs);
// Test flow for snapshot
verify(taskBackupManager1).snapshot(newCheckpointId);
verify(taskBackupManager2).snapshot(newCheckpointId);
// Test flow for upload
verify(taskBackupManager1).upload(newCheckpointId, factory1Checkpoints);
verify(taskBackupManager2).upload(newCheckpointId, factory2Checkpoints);
verify(checkpointTimer).update(anyLong());
}
@Test
public void testFlushAndCheckpointOnSnapshot() {
CheckpointManager checkpointManager = mock(CheckpointManager.class);
TaskBackupManager taskBackupManager1 = mock(TaskBackupManager.class);
TaskBackupManager taskBackupManager2 = mock(TaskBackupManager.class);
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
Checkpoint checkpoint = mock(Checkpoint.class);
StorageEngine mockLPStore = mock(StorageEngine.class);
StoreProperties lpStoreProps = mock(StoreProperties.class);
when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isDurableStore()).thenReturn(true);
Path mockPath = mock(Path.class);
when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
StorageEngine mockPStore = mock(StorageEngine.class);
StoreProperties pStoreProps = mock(StoreProperties.class);
when(mockPStore.getStoreProperties()).thenReturn(pStoreProps);
when(pStoreProps.isPersistedToDisk()).thenReturn(true);
when(pStoreProps.isDurableStore()).thenReturn(false);
StorageEngine mockLIStore = mock(StorageEngine.class);
StoreProperties liStoreProps = mock(StoreProperties.class);
when(mockLIStore.getStoreProperties()).thenReturn(liStoreProps);
when(liStoreProps.isPersistedToDisk()).thenReturn(false);
when(liStoreProps.isDurableStore()).thenReturn(true);
StorageEngine mockIStore = mock(StorageEngine.class);
StoreProperties iStoreProps = mock(StoreProperties.class);
when(mockIStore.getStoreProperties()).thenReturn(iStoreProps);
when(iStoreProps.isPersistedToDisk()).thenReturn(false);
when(iStoreProps.isDurableStore()).thenReturn(false);
TaskName taskName = new TaskName("task1");
Map<String, TaskBackupManager> backupManagers = ImmutableMap.of(
"factory1", taskBackupManager1,
"factory2", taskBackupManager2
);
Map<String, StorageEngine> storageEngines = ImmutableMap.of(
"storeLP", mockLPStore,
"storeP", mockPStore,
"storeLI", mockLIStore,
"storeI", mockIStore
);
TaskInstanceMetrics metrics = mock(TaskInstanceMetrics.class);
Timer checkpointTimer = mock(Timer.class);
when(metrics.storeCheckpointNs()).thenReturn(checkpointTimer);
TaskStorageCommitManager cm = new TaskStorageCommitManager(taskName, backupManagers, containerStorageManager,
Collections.emptyMap(), new Partition(1), checkpointManager, new MapConfig(),
ForkJoinPool.commonPool(), new StorageManagerUtil(), null, metrics);
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
cm.init();
verify(taskBackupManager1).init(eq(checkpoint));
verify(taskBackupManager2).init(eq(checkpoint));
CheckpointId newCheckpointId = CheckpointId.create();
Map<String, String> factory1Checkpoints = ImmutableMap.of(
"store1", "system;stream;1",
"store2", "system;stream;2"
);
Map<String, String> factory2Checkpoints = ImmutableMap.of(
"store1", "blobId1",
"store2", "blobId2"
);
when(containerStorageManager.getAllStores(taskName)).thenReturn(storageEngines);
when(taskBackupManager1.snapshot(newCheckpointId)).thenReturn(factory1Checkpoints);
when(taskBackupManager1.upload(newCheckpointId, factory1Checkpoints))
.thenReturn(CompletableFuture.completedFuture(factory1Checkpoints));
when(taskBackupManager2.snapshot(newCheckpointId)).thenReturn(factory2Checkpoints);
when(taskBackupManager2.upload(newCheckpointId, factory2Checkpoints))
.thenReturn(CompletableFuture.completedFuture(factory2Checkpoints));
when(mockLIStore.checkpoint(newCheckpointId)).thenReturn(Optional.empty());
cm.init();
cm.snapshot(newCheckpointId);
// Assert stores where flushed
verify(mockIStore).flush();
verify(mockPStore).flush();
verify(mockLIStore).flush();
verify(mockLPStore).flush();
// only logged and persisted stores are checkpointed
verify(mockLPStore).checkpoint(newCheckpointId);
// ensure that checkpoint is never called for non-logged persistent stores since they're
// always cleared on restart.
verify(mockPStore, never()).checkpoint(any());
// ensure that checkpoint is never called for non-persistent stores
verify(mockIStore, never()).checkpoint(any());
verify(mockLIStore, never()).checkpoint(any());
verify(checkpointTimer).update(anyLong());
}
@Test(expected = IllegalStateException.class)
public void testSnapshotFailsIfErrorCreatingCheckpoint() {
CheckpointManager checkpointManager = mock(CheckpointManager.class);
TaskBackupManager taskBackupManager1 = mock(TaskBackupManager.class);
TaskBackupManager taskBackupManager2 = mock(TaskBackupManager.class);
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
StorageEngine mockLPStore = mock(StorageEngine.class);
StoreProperties lpStoreProps = mock(StoreProperties.class);
when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isDurableStore()).thenReturn(true);
when(mockLPStore.checkpoint(any())).thenThrow(new IllegalStateException());
TaskInstanceMetrics metrics = mock(TaskInstanceMetrics.class);
Timer checkpointTimer = mock(Timer.class);
when(metrics.storeCheckpointNs()).thenReturn(checkpointTimer);
TaskName taskName = new TaskName("task1");
Map<String, TaskBackupManager> backupManagers = ImmutableMap.of(
"factory1", taskBackupManager1,
"factory2", taskBackupManager2
);
Map<String, StorageEngine> storageEngines = ImmutableMap.of(
"storeLP", mockLPStore
);
TaskStorageCommitManager cm = new TaskStorageCommitManager(taskName, backupManagers, containerStorageManager,
Collections.emptyMap(), new Partition(1), checkpointManager, new MapConfig(),
ForkJoinPool.commonPool(), new StorageManagerUtil(), null, metrics);
when(containerStorageManager.getAllStores(taskName)).thenReturn(storageEngines);
CheckpointId newCheckpointId = CheckpointId.create();
cm.init();
cm.snapshot(newCheckpointId);
// Assert stores where flushed
verify(mockLPStore).flush();
// only logged and persisted stores are checkpointed
verify(mockLPStore).checkpoint(newCheckpointId);
verify(taskBackupManager1, never()).snapshot(any());
verify(taskBackupManager2, never()).snapshot(any());
verify(taskBackupManager1, never()).upload(any(), any());
verify(taskBackupManager2, never()).upload(any(), any());
fail("Should have thrown an exception when the storageEngine#checkpoint did not succeed");
}
@Test
public void testCleanupAllBackupManagers() {
CheckpointManager checkpointManager = mock(CheckpointManager.class);
TaskBackupManager taskBackupManager1 = mock(TaskBackupManager.class);
TaskBackupManager taskBackupManager2 = mock(TaskBackupManager.class);
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
Checkpoint checkpoint = mock(Checkpoint.class);
File durableStoreDir = mock(File.class);
when(durableStoreDir.listFiles()).thenReturn(new File[0]);
TaskInstanceMetrics metrics = mock(TaskInstanceMetrics.class);
Timer checkpointTimer = mock(Timer.class);
when(metrics.storeCheckpointNs()).thenReturn(checkpointTimer);
TaskName taskName = new TaskName("task1");
Map<String, TaskBackupManager> backupManagers = ImmutableMap.of(
"factory1", taskBackupManager1,
"factory2", taskBackupManager2
);
TaskStorageCommitManager cm = new TaskStorageCommitManager(taskName, backupManagers, containerStorageManager,
Collections.emptyMap(), new Partition(1), checkpointManager, new MapConfig(),
ForkJoinPool.commonPool(), new StorageManagerUtil(), durableStoreDir, metrics);
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
when(containerStorageManager.getAllStores(taskName)).thenReturn(Collections.emptyMap());
when(taskBackupManager1.cleanUp(any(), any())).thenReturn(CompletableFuture.<Void>completedFuture(null));
when(taskBackupManager2.cleanUp(any(), any())).thenReturn(CompletableFuture.<Void>completedFuture(null));
Map<String, String> factory1Checkpoints = ImmutableMap.of(
"store1", "system;stream;1",
"store2", "system;stream;2"
);
Map<String, String> factory2Checkpoints = ImmutableMap.of(
"store1", "blobId1",
"store2", "blobId2"
);
Map<String, Map<String, String>> factoryCheckpointsMap = ImmutableMap.of(
"factory1", factory1Checkpoints,
"factory2", factory2Checkpoints
);
when(taskBackupManager1.cleanUp(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
when(taskBackupManager2.cleanUp(any(), any())).thenReturn(CompletableFuture.completedFuture(null));
CheckpointId newCheckpointId = CheckpointId.create();
cm.cleanUp(newCheckpointId, factoryCheckpointsMap).join();
verify(taskBackupManager1).cleanUp(newCheckpointId, factory1Checkpoints);
verify(taskBackupManager2).cleanUp(newCheckpointId, factory2Checkpoints);
}
@Test
public void testCleanupFailsIfBackupManagerNotInitiated() {
CheckpointManager checkpointManager = mock(CheckpointManager.class);
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
Checkpoint checkpoint = mock(Checkpoint.class);
File durableStoreDir = mock(File.class);
when(durableStoreDir.listFiles()).thenReturn(new File[0]);
TaskInstanceMetrics metrics = mock(TaskInstanceMetrics.class);
Timer checkpointTimer = mock(Timer.class);
when(metrics.storeCheckpointNs()).thenReturn(checkpointTimer);
TaskName taskName = new TaskName("task1");
when(containerStorageManager.getAllStores(taskName)).thenReturn(Collections.emptyMap());
TaskStorageCommitManager cm = new TaskStorageCommitManager(taskName, Collections.emptyMap(), containerStorageManager,
Collections.emptyMap(), new Partition(1), checkpointManager, new MapConfig(),
ForkJoinPool.commonPool(), new StorageManagerUtil(), durableStoreDir, metrics);
when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
Map<String, Map<String, String>> factoryCheckpointsMap = ImmutableMap.of(
"factory3", Collections.emptyMap() // factory 3 should be ignored
);
CheckpointId newCheckpointId = CheckpointId.create();
cm.cleanUp(newCheckpointId, factoryCheckpointsMap);
// should not fail the commit because the job should ignore any factories checkpoints not initialized
// in case the user is in a migration phase from on state backend to another
}
@Test
public void testPersistToFileSystemCheckpointV1AndV2Checkpoint() throws IOException {
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
StorageEngine mockLPStore = mock(StorageEngine.class);
StoreProperties lpStoreProps = mock(StoreProperties.class);
when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isDurableStore()).thenReturn(true);
Path mockPath = mock(Path.class);
when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
StorageEngine mockPStore = mock(StorageEngine.class);
StoreProperties pStoreProps = mock(StoreProperties.class);
when(mockPStore.getStoreProperties()).thenReturn(pStoreProps);
when(pStoreProps.isPersistedToDisk()).thenReturn(true);
when(pStoreProps.isDurableStore()).thenReturn(false);
StorageEngine mockLIStore = mock(StorageEngine.class);
StoreProperties liStoreProps = mock(StoreProperties.class);
when(mockLIStore.getStoreProperties()).thenReturn(liStoreProps);
when(liStoreProps.isPersistedToDisk()).thenReturn(false);
when(liStoreProps.isDurableStore()).thenReturn(true);
StorageEngine mockIStore = mock(StorageEngine.class);
StoreProperties iStoreProps = mock(StoreProperties.class);
when(mockIStore.getStoreProperties()).thenReturn(iStoreProps);
when(iStoreProps.isPersistedToDisk()).thenReturn(false);
when(iStoreProps.isDurableStore()).thenReturn(false);
Map<String, StorageEngine> taskStores = ImmutableMap.of(
"loggedPersistentStore", mockLPStore,
"persistentStore", mockPStore,
"loggedInMemStore", mockLIStore,
"inMemStore", mockIStore
);
Partition changelogPartition = new Partition(0);
SystemStream changelogSystemStream = new SystemStream("changelogSystem", "changelogStream");
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStream, changelogPartition);
Map<String, SystemStream> storeChangelogsStreams = ImmutableMap.of(
"loggedPersistentStore", changelogSystemStream,
"loggedInMemStore", new SystemStream("system", "stream")
);
StorageManagerUtil storageManagerUtil = mock(StorageManagerUtil.class);
File durableStoreDir = new File("durableStorePath");
when(storageManagerUtil.getTaskStoreDir(eq(durableStoreDir), any(), any(), any())).thenReturn(durableStoreDir);
TaskName taskName = new TaskName("task");
TaskInstanceMetrics metrics = mock(TaskInstanceMetrics.class);
Timer checkpointTimer = mock(Timer.class);
when(metrics.storeCheckpointNs()).thenReturn(checkpointTimer);
when(containerStorageManager.getAllStores(taskName)).thenReturn(taskStores);
TaskStorageCommitManager commitManager = spy(new TaskStorageCommitManager(taskName,
Collections.emptyMap(), containerStorageManager, storeChangelogsStreams, changelogPartition,
null, null, ForkJoinPool.commonPool(), storageManagerUtil, durableStoreDir, metrics));
doNothing().when(commitManager).writeChangelogOffsetFile(any(), any(), any(), any());
when(storageManagerUtil.getStoreCheckpointDir(any(File.class), any(CheckpointId.class)))
.thenAnswer((Answer<String>) invocation -> {
File file = invocation.getArgumentAt(0, File.class);
CheckpointId checkpointId = invocation.getArgumentAt(1, CheckpointId.class);
return file + "-" + checkpointId;
});
CheckpointId newCheckpointId = CheckpointId.create();
String newestOffset = "1";
KafkaChangelogSSPOffset kafkaChangelogSSPOffset = new KafkaChangelogSSPOffset(newCheckpointId, newestOffset);
Map<SystemStreamPartition, String> offsetsJava = ImmutableMap.of(
changelogSSP, kafkaChangelogSSPOffset.toString()
);
commitManager.init();
// invoke persist to file system for v2 checkpoint
commitManager.writeCheckpointToStoreDirectories(new CheckpointV1(offsetsJava));
verify(commitManager).writeChangelogOffsetFiles(offsetsJava);
// evoked twice, for OFFSET-V1 and OFFSET-V2
verify(commitManager).writeChangelogOffsetFile(
eq("loggedPersistentStore"), eq(changelogSSP), eq(newestOffset), eq(durableStoreDir));
File checkpointFile = Paths.get(storageManagerUtil.getStoreCheckpointDir(
durableStoreDir, kafkaChangelogSSPOffset.getCheckpointId())).toFile();
verify(commitManager).writeChangelogOffsetFile(
eq("loggedPersistentStore"), eq(changelogSSP), eq(newestOffset), eq(checkpointFile));
Map<String, String> storeSCM = ImmutableMap.of(
"loggedPersistentStore", "system;loggedPersistentStoreStream;1",
"persistentStore", "system;persistentStoreStream;1",
"loggedInMemStore", "system;loggedInMemStoreStream;1",
"inMemStore", "system;inMemStoreStream;1"
);
CheckpointV2 checkpoint = new CheckpointV2(newCheckpointId, Collections.emptyMap(), Collections.singletonMap("factory", storeSCM));
// invoke persist to file system for v2 checkpoint
commitManager.writeCheckpointToStoreDirectories(checkpoint);
// Validate only durable and persisted stores are persisted
// This should be evoked twice, for checkpointV1 and checkpointV2
verify(storageManagerUtil, times(2)).getTaskStoreDir(eq(durableStoreDir), eq("loggedPersistentStore"), eq(taskName), any());
File checkpointPath = Paths.get(storageManagerUtil.getStoreCheckpointDir(durableStoreDir, newCheckpointId)).toFile();
verify(storageManagerUtil).writeCheckpointV2File(eq(checkpointPath), eq(checkpoint));
}
@Test
public void testPersistToFileSystemCheckpointV2Only() throws IOException {
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
StorageEngine mockLPStore = mock(StorageEngine.class);
StoreProperties lpStoreProps = mock(StoreProperties.class);
when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isDurableStore()).thenReturn(true);
Path mockPath = mock(Path.class);
when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
StorageEngine mockPStore = mock(StorageEngine.class);
StoreProperties pStoreProps = mock(StoreProperties.class);
when(mockPStore.getStoreProperties()).thenReturn(pStoreProps);
when(pStoreProps.isPersistedToDisk()).thenReturn(true);
when(pStoreProps.isDurableStore()).thenReturn(false);
StorageEngine mockLIStore = mock(StorageEngine.class);
StoreProperties liStoreProps = mock(StoreProperties.class);
when(mockLIStore.getStoreProperties()).thenReturn(liStoreProps);
when(liStoreProps.isPersistedToDisk()).thenReturn(false);
when(liStoreProps.isDurableStore()).thenReturn(true);
StorageEngine mockIStore = mock(StorageEngine.class);
StoreProperties iStoreProps = mock(StoreProperties.class);
when(mockIStore.getStoreProperties()).thenReturn(iStoreProps);
when(iStoreProps.isPersistedToDisk()).thenReturn(false);
when(iStoreProps.isDurableStore()).thenReturn(false);
java.util.Map<String, StorageEngine> taskStores = ImmutableMap.of(
"loggedPersistentStore", mockLPStore,
"persistentStore", mockPStore,
"loggedInMemStore", mockLIStore,
"inMemStore", mockIStore
);
Partition changelogPartition = new Partition(0);
SystemStream changelogSystemStream = new SystemStream("changelogSystem", "changelogStream");
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStream, changelogPartition);
java.util.Map<String, SystemStream> storeChangelogsStreams = ImmutableMap.of(
"loggedPersistentStore", changelogSystemStream,
"loggedInMemStore", new SystemStream("system", "stream")
);
StorageManagerUtil storageManagerUtil = mock(StorageManagerUtil.class);
File durableStoreDir = new File("durableStorePath");
when(storageManagerUtil.getTaskStoreDir(eq(durableStoreDir), eq("loggedPersistentStore"), any(), any()))
.thenReturn(durableStoreDir);
TaskName taskName = new TaskName("task");
TaskInstanceMetrics metrics = mock(TaskInstanceMetrics.class);
Timer checkpointTimer = mock(Timer.class);
when(metrics.storeCheckpointNs()).thenReturn(checkpointTimer);
when(containerStorageManager.getAllStores(taskName)).thenReturn(taskStores);
TaskStorageCommitManager commitManager = spy(new TaskStorageCommitManager(taskName,
Collections.emptyMap(), containerStorageManager, storeChangelogsStreams, changelogPartition,
null, null, ForkJoinPool.commonPool(), storageManagerUtil, durableStoreDir, metrics));
doNothing().when(commitManager).writeChangelogOffsetFile(any(), any(), any(), any());
when(storageManagerUtil.getStoreCheckpointDir(any(File.class), any(CheckpointId.class)))
.thenAnswer((Answer<String>) invocation -> {
File file = invocation.getArgumentAt(0, File.class);
CheckpointId checkpointId = invocation.getArgumentAt(1, CheckpointId.class);
return file + "-" + checkpointId;
});
CheckpointId newCheckpointId = CheckpointId.create();
java.util.Map<String, String> storeSCM = ImmutableMap.of(
"loggedPersistentStore", "system;loggedPersistentStoreStream;1",
"persistentStore", "system;persistentStoreStream;1",
"loggedInMemStore", "system;loggedInMemStoreStream;1",
"inMemStore", "system;inMemStoreStream;1"
);
CheckpointV2 checkpoint = new CheckpointV2(newCheckpointId, Collections.emptyMap(), Collections.singletonMap("factory", storeSCM));
commitManager.init();
// invoke persist to file system
commitManager.writeCheckpointToStoreDirectories(checkpoint);
// Validate only durable and persisted stores are persisted
verify(storageManagerUtil).getTaskStoreDir(eq(durableStoreDir), eq("loggedPersistentStore"), eq(taskName), any());
File checkpointPath = Paths.get(storageManagerUtil.getStoreCheckpointDir(durableStoreDir, newCheckpointId)).toFile();
verify(storageManagerUtil).writeCheckpointV2File(eq(checkpointPath), eq(checkpoint));
}
@Test
public void testWriteChangelogOffsetFilesV1() throws IOException {
Map<String, Map<SystemStreamPartition, String>> mockFileSystem = new HashMap<>();
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
StorageEngine mockLPStore = mock(StorageEngine.class);
StoreProperties lpStoreProps = mock(StoreProperties.class);
when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isDurableStore()).thenReturn(true);
Path mockPath = mock(Path.class);
when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
TaskInstanceMetrics metrics = mock(TaskInstanceMetrics.class);
Timer checkpointTimer = mock(Timer.class);
when(metrics.storeCheckpointNs()).thenReturn(checkpointTimer);
java.util.Map<String, StorageEngine> taskStores = ImmutableMap.of("loggedPersistentStore", mockLPStore);
Partition changelogPartition = new Partition(0);
SystemStream changelogSystemStream = new SystemStream("changelogSystem", "changelogStream");
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStream, changelogPartition);
java.util.Map<String, SystemStream> storeChangelogsStreams = ImmutableMap.of("loggedPersistentStore", changelogSystemStream);
StorageManagerUtil storageManagerUtil = mock(StorageManagerUtil.class);
File tmpTestPath = new File("store-checkpoint-test");
when(storageManagerUtil.getTaskStoreDir(eq(tmpTestPath), eq("loggedPersistentStore"), any(), any())).thenReturn(tmpTestPath);
TaskName taskName = new TaskName("task");
when(containerStorageManager.getAllStores(taskName)).thenReturn(taskStores);
TaskStorageCommitManager commitManager = spy(new TaskStorageCommitManager(taskName,
Collections.emptyMap(), containerStorageManager, storeChangelogsStreams, changelogPartition,
null, null, ForkJoinPool.commonPool(), storageManagerUtil, tmpTestPath, metrics));
when(storageManagerUtil.getStoreCheckpointDir(any(File.class), any(CheckpointId.class)))
.thenAnswer((Answer<String>) invocation -> {
File file = invocation.getArgumentAt(0, File.class);
CheckpointId checkpointId = invocation.getArgumentAt(1, CheckpointId.class);
return file + "-" + checkpointId;
});
doAnswer(invocation -> {
String fileDir = invocation.getArgumentAt(3, File.class).getName();
SystemStreamPartition ssp = invocation.getArgumentAt(1, SystemStreamPartition.class);
String offset = invocation.getArgumentAt(2, String.class);
if (mockFileSystem.containsKey(fileDir)) {
mockFileSystem.get(fileDir).put(ssp, offset);
} else {
Map<SystemStreamPartition, String> sspOffsets = new HashMap<>();
sspOffsets.put(ssp, offset);
mockFileSystem.put(fileDir, sspOffsets);
}
return null;
}).when(commitManager).writeChangelogOffsetFile(any(), any(), any(), any());
CheckpointId newCheckpointId = CheckpointId.create();
String newestOffset = "1";
KafkaChangelogSSPOffset kafkaChangelogSSPOffset = new KafkaChangelogSSPOffset(newCheckpointId, newestOffset);
java.util.Map<SystemStreamPartition, String> offsetsJava = ImmutableMap.of(
changelogSSP, kafkaChangelogSSPOffset.toString()
);
commitManager.init();
// invoke persist to file system for v2 checkpoint
commitManager.writeCheckpointToStoreDirectories(new CheckpointV1(offsetsJava));
assertEquals(2, mockFileSystem.size());
// check if v2 offsets are written correctly
String v2FilePath = storageManagerUtil.getStoreCheckpointDir(tmpTestPath, newCheckpointId);
assertTrue(mockFileSystem.containsKey(v2FilePath));
assertTrue(mockFileSystem.get(v2FilePath).containsKey(changelogSSP));
assertEquals(1, mockFileSystem.get(v2FilePath).size());
assertEquals(newestOffset, mockFileSystem.get(v2FilePath).get(changelogSSP));
// check if v1 offsets are written correctly
String v1FilePath = tmpTestPath.getPath();
assertTrue(mockFileSystem.containsKey(v1FilePath));
assertTrue(mockFileSystem.get(v1FilePath).containsKey(changelogSSP));
assertEquals(1, mockFileSystem.get(v1FilePath).size());
assertEquals(newestOffset, mockFileSystem.get(v1FilePath).get(changelogSSP));
}
@Test
public void testWriteChangelogOffsetFilesV2andV1() throws IOException {
Map<String, Map<SystemStreamPartition, String>> mockFileSystem = new HashMap<>();
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
Map<String, CheckpointV2> mockCheckpointFileSystem = new HashMap<>();
StorageEngine mockLPStore = mock(StorageEngine.class);
StoreProperties lpStoreProps = mock(StoreProperties.class);
when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isDurableStore()).thenReturn(true);
Path mockPath = mock(Path.class);
when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
TaskInstanceMetrics metrics = mock(TaskInstanceMetrics.class);
Timer checkpointTimer = mock(Timer.class);
when(metrics.storeCheckpointNs()).thenReturn(checkpointTimer);
java.util.Map<String, StorageEngine> taskStores = ImmutableMap.of("loggedPersistentStore", mockLPStore);
Partition changelogPartition = new Partition(0);
SystemStream changelogSystemStream = new SystemStream("changelogSystem", "changelogStream");
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStream, changelogPartition);
java.util.Map<String, SystemStream> storeChangelogsStreams = ImmutableMap.of("loggedPersistentStore", changelogSystemStream);
StorageManagerUtil storageManagerUtil = mock(StorageManagerUtil.class);
File tmpTestPath = new File("store-checkpoint-test");
when(storageManagerUtil.getTaskStoreDir(eq(tmpTestPath), eq("loggedPersistentStore"), any(), any())).thenReturn(tmpTestPath);
TaskName taskName = new TaskName("task");
when(containerStorageManager.getAllStores(taskName)).thenReturn(taskStores);
TaskStorageCommitManager commitManager = spy(new TaskStorageCommitManager(taskName,
Collections.emptyMap(), containerStorageManager, storeChangelogsStreams, changelogPartition,
null, null, ForkJoinPool.commonPool(), storageManagerUtil, tmpTestPath, metrics));
doAnswer(invocation -> {
String fileDir = invocation.getArgumentAt(3, File.class).getName();
SystemStreamPartition ssp = invocation.getArgumentAt(1, SystemStreamPartition.class);
String offset = invocation.getArgumentAt(2, String.class);
if (mockFileSystem.containsKey(fileDir)) {
mockFileSystem.get(fileDir).put(ssp, offset);
} else {
Map<SystemStreamPartition, String> sspOffsets = new HashMap<>();
sspOffsets.put(ssp, offset);
mockFileSystem.put(fileDir, sspOffsets);
}
return null;
}).when(commitManager).writeChangelogOffsetFile(any(), any(), any(), any());
doAnswer(invocation -> {
String storeDir = invocation.getArgumentAt(0, File.class).getName();
CheckpointV2 checkpointV2 = invocation.getArgumentAt(1, CheckpointV2.class);
mockCheckpointFileSystem.put(storeDir, checkpointV2);
return null;
}).when(storageManagerUtil).writeCheckpointV2File(any(), any());
when(storageManagerUtil.getStoreCheckpointDir(any(File.class), any(CheckpointId.class)))
.thenAnswer((Answer<String>) invocation -> {
File file = invocation.getArgumentAt(0, File.class);
CheckpointId checkpointId = invocation.getArgumentAt(1, CheckpointId.class);
return file + "-" + checkpointId;
});
CheckpointId newCheckpointId = CheckpointId.create();
String newestOffset = "1";
KafkaChangelogSSPOffset kafkaChangelogSSPOffset = new KafkaChangelogSSPOffset(newCheckpointId, newestOffset);
java.util.Map<SystemStreamPartition, String> offsetsJava = ImmutableMap.of(
changelogSSP, kafkaChangelogSSPOffset.toString()
);
commitManager.init();
// invoke persist to file system for v1 checkpoint
commitManager.writeCheckpointToStoreDirectories(new CheckpointV1(offsetsJava));
assertEquals(2, mockFileSystem.size());
// check if v2 offsets are written correctly
String v2FilePath = storageManagerUtil.getStoreCheckpointDir(tmpTestPath, newCheckpointId);
assertTrue(mockFileSystem.containsKey(v2FilePath));
assertTrue(mockFileSystem.get(v2FilePath).containsKey(changelogSSP));
assertEquals(1, mockFileSystem.get(v2FilePath).size());
assertEquals(newestOffset, mockFileSystem.get(v2FilePath).get(changelogSSP));
// check if v1 offsets are written correctly
String v1FilePath = tmpTestPath.getPath();
assertTrue(mockFileSystem.containsKey(v1FilePath));
assertTrue(mockFileSystem.get(v1FilePath).containsKey(changelogSSP));
assertEquals(1, mockFileSystem.get(v1FilePath).size());
assertEquals(newestOffset, mockFileSystem.get(v1FilePath).get(changelogSSP));
java.util.Map<String, String> storeSCM = ImmutableMap.of(
"loggedPersistentStore", "system;loggedPersistentStoreStream;1",
"persistentStore", "system;persistentStoreStream;1",
"loggedInMemStore", "system;loggedInMemStoreStream;1",
"inMemStore", "system;inMemStoreStream;1"
);
CheckpointV2 checkpoint = new CheckpointV2(newCheckpointId, Collections.emptyMap(), Collections.singletonMap("factory", storeSCM));
// invoke persist to file system with checkpoint v2
commitManager.writeCheckpointToStoreDirectories(checkpoint);
assertTrue(mockCheckpointFileSystem.containsKey(v2FilePath));
assertEquals(checkpoint, mockCheckpointFileSystem.get(v2FilePath));
assertTrue(mockCheckpointFileSystem.containsKey(v1FilePath));
assertEquals(checkpoint, mockCheckpointFileSystem.get(v1FilePath));
assertEquals(2, mockCheckpointFileSystem.size());
CheckpointV2 updatedCheckpoint = new CheckpointV2(
newCheckpointId, ImmutableMap.of(
new SystemStreamPartition("inputSystem", "inputStream", changelogPartition), "5"),
Collections.singletonMap("factory", storeSCM));
commitManager.writeCheckpointToStoreDirectories(updatedCheckpoint);
assertEquals(updatedCheckpoint, mockCheckpointFileSystem.get(v2FilePath));
assertEquals(updatedCheckpoint, mockCheckpointFileSystem.get(v1FilePath));
assertEquals(2, mockCheckpointFileSystem.size());
}
@Test
public void testWriteChangelogOffsetFilesWithEmptyChangelogTopic() throws IOException {
Map<String, Map<SystemStreamPartition, String>> mockFileSystem = new HashMap<>();
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
StorageEngine mockLPStore = mock(StorageEngine.class);
StoreProperties lpStoreProps = mock(StoreProperties.class);
when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isDurableStore()).thenReturn(true);
Path mockPath = mock(Path.class);
when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
TaskInstanceMetrics metrics = mock(TaskInstanceMetrics.class);
Timer checkpointTimer = mock(Timer.class);
when(metrics.storeCheckpointNs()).thenReturn(checkpointTimer);
java.util.Map<String, StorageEngine> taskStores = ImmutableMap.of("loggedPersistentStore", mockLPStore);
Partition changelogPartition = new Partition(0);
SystemStream changelogSystemStream = new SystemStream("changelogSystem", "changelogStream");
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStream, changelogPartition);
java.util.Map<String, SystemStream> storeChangelogsStreams = ImmutableMap.of("loggedPersistentStore", changelogSystemStream);
StorageManagerUtil storageManagerUtil = mock(StorageManagerUtil.class);
File tmpTestPath = new File("store-checkpoint-test");
when(storageManagerUtil.getTaskStoreDir(eq(tmpTestPath), any(), any(), any())).thenReturn(tmpTestPath);
TaskName taskName = new TaskName("task");
when(containerStorageManager.getAllStores(taskName)).thenReturn(taskStores);
TaskStorageCommitManager commitManager = spy(new TaskStorageCommitManager(taskName,
Collections.emptyMap(), containerStorageManager, storeChangelogsStreams, changelogPartition,
null, null, ForkJoinPool.commonPool(), storageManagerUtil, tmpTestPath, metrics));
doAnswer(invocation -> {
String storeName = invocation.getArgumentAt(0, String.class);
String fileDir = invocation.getArgumentAt(3, File.class).getName();
String mockKey = storeName + fileDir;
SystemStreamPartition ssp = invocation.getArgumentAt(1, SystemStreamPartition.class);
String offset = invocation.getArgumentAt(2, String.class);
if (mockFileSystem.containsKey(mockKey)) {
mockFileSystem.get(mockKey).put(ssp, offset);
} else {
Map<SystemStreamPartition, String> sspOffsets = new HashMap<>();
sspOffsets.put(ssp, offset);
mockFileSystem.put(mockKey, sspOffsets);
}
return null;
}).when(commitManager).writeChangelogOffsetFile(any(), any(), any(), any());
CheckpointId newCheckpointId = CheckpointId.create();
String newestOffset = null;
KafkaChangelogSSPOffset kafkaChangelogSSPOffset = new KafkaChangelogSSPOffset(newCheckpointId, newestOffset);
java.util.Map<SystemStreamPartition, String> offsetsJava = ImmutableMap.of(
changelogSSP, kafkaChangelogSSPOffset.toString()
);
commitManager.init();
// invoke persist to file system for v2 checkpoint
commitManager.writeCheckpointToStoreDirectories(new CheckpointV1(offsetsJava));
assertTrue(mockFileSystem.isEmpty());
// verify that delete was called on current store dir offset file
verify(storageManagerUtil, times(1)).deleteOffsetFile(eq(tmpTestPath));
}
@Test(expected = SamzaException.class)
public void testThrowOnWriteCheckpointDirIfUnsuccessful() {
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
StorageEngine mockLPStore = mock(StorageEngine.class);
StoreProperties lpStoreProps = mock(StoreProperties.class);
when(mockLPStore.getStoreProperties()).thenReturn(lpStoreProps);
when(lpStoreProps.isPersistedToDisk()).thenReturn(true);
when(lpStoreProps.isDurableStore()).thenReturn(true);
Path mockPath = mock(Path.class);
when(mockLPStore.checkpoint(any())).thenReturn(Optional.of(mockPath));
TaskInstanceMetrics metrics = mock(TaskInstanceMetrics.class);
Timer checkpointTimer = mock(Timer.class);
when(metrics.storeCheckpointNs()).thenReturn(checkpointTimer);
java.util.Map<String, StorageEngine> taskStores = ImmutableMap.of("loggedPersistentStore", mockLPStore);
java.util.Map<String, SystemStream> storeChangelogsStreams = ImmutableMap.of("loggedPersistentStore", mock(SystemStream.class));
StorageManagerUtil storageManagerUtil = mock(StorageManagerUtil.class);
File tmpTestPath = new File("store-checkpoint-test");
when(storageManagerUtil.getTaskStoreDir(eq(tmpTestPath), eq("loggedPersistentStore"), any(), any())).thenReturn(tmpTestPath);
TaskName taskName = new TaskName("task");
TaskStorageCommitManager commitManager = spy(new TaskStorageCommitManager(taskName,
Collections.emptyMap(), containerStorageManager, storeChangelogsStreams, mock(Partition.class),
null, null, ForkJoinPool.commonPool(), storageManagerUtil, tmpTestPath, metrics));
java.util.Map<String, String> storeSCM = ImmutableMap.of(
"loggedPersistentStore", "system;loggedPersistentStoreStream;1",
"persistentStore", "system;persistentStoreStream;1",
"loggedInMemStore", "system;loggedInMemStoreStream;1",
"inMemStore", "system;inMemStoreStream;1"
);
when(containerStorageManager.getAllStores(taskName)).thenReturn(taskStores);
CheckpointV2 checkpoint = new CheckpointV2(CheckpointId.create(), Collections.emptyMap(), Collections.singletonMap("factory", storeSCM));
doThrow(IOException.class).when(storageManagerUtil).writeCheckpointV2File(eq(tmpTestPath), eq(checkpoint));
commitManager.init();
// Should throw samza exception since writeCheckpointV2 failed
commitManager.writeCheckpointToStoreDirectories(checkpoint);
}
@Test
public void testRemoveOldCheckpointsWhenBaseDirContainsRegularFiles() {
ContainerStorageManager containerStorageManager = mock(ContainerStorageManager.class);
CheckpointManager checkpointManager = mock(CheckpointManager.class);
TaskBackupManager taskBackupManager1 = mock(TaskBackupManager.class);
TaskBackupManager taskBackupManager2 = mock(TaskBackupManager.class);
File durableStoreDir = mock(File.class);
TaskInstanceMetrics metrics = mock(TaskInstanceMetrics.class);
Timer checkpointTimer = mock(Timer.class);
when(metrics.storeCheckpointNs()).thenReturn(checkpointTimer);
StorageManagerUtil storageManagerUtil = mock(StorageManagerUtil.class);
TaskName taskName = new TaskName("task1");
Map<String, TaskBackupManager> backupManagers = ImmutableMap.of(
"factory1", taskBackupManager1,
"factory2", taskBackupManager2
);
when(containerStorageManager.getAllStores(taskName)).thenReturn(Collections.emptyMap());
TaskStorageCommitManager cm = new TaskStorageCommitManager(taskName, backupManagers, containerStorageManager,
Collections.emptyMap(), new Partition(1), checkpointManager, new MapConfig(),
ForkJoinPool.commonPool(), storageManagerUtil, durableStoreDir, metrics);
File mockStoreDir = mock(File.class);
String mockStoreDirName = "notDirectory";
when(durableStoreDir.listFiles()).thenReturn(new File[] {mockStoreDir});
when(mockStoreDir.getName()).thenReturn(mockStoreDirName);
when(storageManagerUtil.getTaskStoreDir(eq(durableStoreDir), eq(mockStoreDirName), eq(taskName), eq(TaskMode.Active))).thenReturn(mockStoreDir);
// null here can happen if listFiles is called on a non-directory
when(mockStoreDir.listFiles(any(FileFilter.class))).thenReturn(null);
cm.cleanUp(CheckpointId.create(), new HashMap<>()).join();
verify(durableStoreDir).listFiles();
verify(mockStoreDir).listFiles(any(FileFilter.class));
verify(storageManagerUtil).getTaskStoreDir(eq(durableStoreDir), eq(mockStoreDirName), eq(taskName), eq(TaskMode.Active));
}
}