blob: fc7d21ee79b11f5dcd0685ee3265f4cd7f1f53ab [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointId;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.checkpoint.CheckpointV1;
import org.apache.samza.checkpoint.CheckpointV2;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstance;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerdeFactory;
import org.apache.samza.storage.blobstore.BlobStoreManager;
import org.apache.samza.storage.blobstore.BlobStoreManagerFactory;
import org.apache.samza.storage.blobstore.BlobStoreRestoreManager;
import org.apache.samza.storage.blobstore.BlobStoreStateBackendFactory;
import org.apache.samza.storage.blobstore.Metadata;
import org.apache.samza.storage.blobstore.exceptions.DeletedException;
import org.apache.samza.storage.blobstore.index.DirIndex;
import org.apache.samza.storage.blobstore.index.SnapshotIndex;
import org.apache.samza.storage.blobstore.index.SnapshotMetadata;
import org.apache.samza.storage.blobstore.index.serde.SnapshotIndexSerde;
import org.apache.samza.system.SSPMetadataCache;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.TaskInstanceCollector;
import org.apache.samza.util.ReflectionUtil;
import org.apache.samza.util.SystemClock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import scala.collection.JavaConverters;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ReflectionUtil.class, ContainerStorageManagerRestoreUtil.class})
public class TestContainerStorageManager {
// Name of the single store configured by setUp(); also the key of the changelog-stream map.
private static final String STORE_NAME = "store";
// System name of the changelog stream; the mocked SystemFactory is registered under this name.
private static final String SYSTEM_NAME = "kafka";
// Changelog stream name backing STORE_NAME.
private static final String STREAM_NAME = "store-stream";
// Store base directory handed to ContainerStorageManager, rooted under java.io.tmpdir.
private static final File DEFAULT_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "store");
// Logged-store base directory handed to ContainerStorageManager, rooted under java.io.tmpdir.
private static final File
DEFAULT_LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "loggedStore");
// Manager under test; rebuilt before every test by setUp().
private ContainerStorageManager containerStorageManager;
// Per-task (mocked) restoration-time gauges, returned by samzaContainerMetrics.taskStoreRestorationMetrics().
private Map<TaskName, Gauge<Object>> taskRestoreMetricGauges;
// Per-task mocked TaskInstanceMetrics passed to ContainerStorageManager.
private Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
// Mocked container metrics; stubbed to expose taskRestoreMetricGauges.
private SamzaContainerMetrics samzaContainerMetrics;
// Task models (name + changelog partition) that make up the container model.
private Map<TaskName, TaskModel> tasks;
// Fixtures (store-to-changelog map, standby container model) used by the changelog-stream tests.
private StandbyTestContext testContext;
// Counters incremented from mock answers and asserted by the tests.
// NOTE(review): volatile only guarantees visibility; `++` on a volatile int is not atomic, so increments from
// mock answers running on several threads at once could be lost.
private volatile int systemConsumerCreationCount;
private volatile int systemConsumerStartCount;
private volatile int systemConsumerStopCount;
private volatile int storeRestoreCallCount;
/**
 * Registers a task in {@link #tasks}, along with a mocked restoration-time gauge in
 * {@link #taskRestoreMetricGauges} and a mocked {@link TaskInstanceMetrics} in {@link #taskInstanceMetrics}.
 *
 * @param taskname the desired task name.
 * @param changelogPartition the changelog partition assigned to the task's {@link TaskModel}.
 */
private void addMockedTask(String taskname, int changelogPartition) {
  TaskName name = new TaskName(taskname);
  // The gauge is a mock so tests can inspect whether it was touched via mockingDetails(...).
  @SuppressWarnings("unchecked")
  Gauge<Object> restoreGauge = mock(Gauge.class);
  this.tasks.put(name, new TaskModel(name, new HashSet<>(), new Partition(changelogPartition)));
  this.taskRestoreMetricGauges.put(name, restoreGauge);
  this.taskInstanceMetrics.put(name, mock(TaskInstanceMetrics.class));
}
/**
 * Builds {@link #containerStorageManager} with mocked dependencies before each test.
 *
 * <p>The fixture has two tasks ("task 0" and "task 1" on changelog partitions 0 and 1) and one logged store,
 * {@link #STORE_NAME}, whose changelog is {@code SYSTEM_NAME + "." + STREAM_NAME}. A mocked Kafka state backend
 * returns a restore manager whose {@code restore()} calls are counted in {@link #storeRestoreCallCount}.
 * System consumer creation, start and stop calls are counted in the corresponding fields.
 */
@Before
public void setUp() throws InterruptedException {
  taskRestoreMetricGauges = new HashMap<>();
  this.tasks = new HashMap<>();
  this.taskInstanceMetrics = new HashMap<>();
  // Add two mocked tasks
  addMockedTask("task 0", 0);
  addMockedTask("task 1", 1);
  // Mock container metrics
  samzaContainerMetrics = mock(SamzaContainerMetrics.class);
  when(samzaContainerMetrics.taskStoreRestorationMetrics()).thenReturn(taskRestoreMetricGauges);
  // Create a map of test changeLogSSPs
  Map<String, SystemStream> changelogSystemStreams = new HashMap<>();
  changelogSystemStreams.put(STORE_NAME, new SystemStream(SYSTEM_NAME, STREAM_NAME));
  // Create mocked storage engine factories
  Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories = new HashMap<>();
  StorageEngineFactory mockStorageEngineFactory =
      (StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class);
  StorageEngine mockStorageEngine = mock(StorageEngine.class);
  // The mocked store reports itself as logged (changelog-backed); persistedToDisk is not set here.
  doAnswer(invocation -> new StoreProperties.StorePropertiesBuilder().setLoggedStore(true).build())
      .when(mockStorageEngine).getStoreProperties();
  doAnswer(invocation -> mockStorageEngine)
      .when(mockStorageEngineFactory).getStorageEngine(anyString(), any(), any(), any(), any(),
          any(), any(), any(), any(), any());
  storageEngineFactories.put(STORE_NAME, mockStorageEngineFactory);
  // Record the number of store.restore() calls. Increments are synchronized because `++` on a volatile field is
  // not atomic, in case mock answers run on more than one thread.
  doAnswer(invocation -> {
    synchronized (this) {
      storeRestoreCallCount++;
    }
    return CompletableFuture.completedFuture(null);
  }).when(mockStorageEngine).restore(any());
  // Mock and set up system consumers, counting start() and stop() calls
  SystemConsumer mockSystemConsumer = mock(SystemConsumer.class);
  doAnswer(invocation -> {
    synchronized (this) {
      systemConsumerStartCount++;
    }
    return null;
  }).when(mockSystemConsumer).start();
  doAnswer(invocation -> {
    synchronized (this) {
      systemConsumerStopCount++;
    }
    return null;
  }).when(mockSystemConsumer).stop();
  // Create mocked system factories, counting the number of system consumers created
  Map<String, SystemFactory> systemFactories = new HashMap<>();
  SystemFactory mockSystemFactory = mock(SystemFactory.class);
  doAnswer(invocation -> {
    synchronized (this) {
      this.systemConsumerCreationCount++;
    }
    return mockSystemConsumer;
  }).when(mockSystemFactory).getConsumer(anyString(), any(), any());
  systemFactories.put(SYSTEM_NAME, mockSystemFactory);
  // Create mocked configs for specifying serdes
  Map<String, String> configMap = new HashMap<>();
  configMap.put("stores." + STORE_NAME + ".key.serde", "stringserde");
  configMap.put("stores." + STORE_NAME + ".msg.serde", "stringserde");
  configMap.put("stores." + STORE_NAME + ".factory", mockStorageEngineFactory.getClass().getName());
  configMap.put("stores." + STORE_NAME + ".changelog", SYSTEM_NAME + "." + STREAM_NAME);
  configMap.put("serializers.registry.stringserde.class", StringSerdeFactory.class.getName());
  configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
  Config config = new MapConfig(configMap);
  Map<String, Serde<Object>> serdes = new HashMap<>();
  serdes.put("stringserde", mock(Serde.class));
  // Create mocked system admins; validateStream() only logs its arguments
  SystemAdmin mockSystemAdmin = mock(SystemAdmin.class);
  doAnswer(invocation -> {
    System.out.println("called with arguments: " + Arrays.toString(invocation.getArguments()));
    return null;
  }).when(mockSystemAdmin).validateStream(any());
  SystemAdmins mockSystemAdmins = mock(SystemAdmins.class);
  when(mockSystemAdmins.getSystemAdmin(SYSTEM_NAME)).thenReturn(mockSystemAdmin);
  // Create a mocked StreamMetadataCache reporting metadata for both changelog partitions
  SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
      new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "50", "51");
  Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
  partitionMetadata.put(new Partition(0), sspMetadata);
  partitionMetadata.put(new Partition(1), sspMetadata);
  SystemStreamMetadata systemStreamMetadata = new SystemStreamMetadata(STREAM_NAME, partitionMetadata);
  StreamMetadataCache mockStreamMetadataCache = mock(StreamMetadataCache.class);
  when(mockStreamMetadataCache.
      getStreamMetadata(JavaConverters.
          asScalaSetConverter(new HashSet<SystemStream>(changelogSystemStreams.values())).asScala().toSet(), false))
      .thenReturn(
          new scala.collection.immutable.Map.Map1(new SystemStream(SYSTEM_NAME, STREAM_NAME), systemStreamMetadata));
  CheckpointManager checkpointManager = mock(CheckpointManager.class);
  when(checkpointManager.readLastCheckpoint(any(TaskName.class))).thenReturn(new CheckpointV1(new HashMap<>()));
  ContainerContext mockContainerContext = mock(ContainerContext.class);
  ContainerModel mockContainerModel = new ContainerModel("samza-container-test", tasks);
  when(mockContainerContext.getContainerModel()).thenReturn(mockContainerModel);
  // Reset the expected number of sysConsumer create, start and stop calls, and store.restore() calls
  this.systemConsumerCreationCount = 0;
  this.systemConsumerStartCount = 0;
  this.systemConsumerStopCount = 0;
  this.storeRestoreCallCount = 0;
  // Kafka state backend whose restore manager counts restore() calls
  StateBackendFactory backendFactory = mock(StateBackendFactory.class);
  TaskRestoreManager restoreManager = mock(TaskRestoreManager.class);
  when(backendFactory.getRestoreManager(any(), any(), any(), any(), any(), any(), any(), any(), any(), any(), any()))
      .thenReturn(restoreManager);
  doAnswer(invocation -> {
    synchronized (this) {
      storeRestoreCallCount++;
    }
    return CompletableFuture.completedFuture(null);
  }).when(restoreManager).restore();
  Map<TaskName, TaskInstanceCollector> taskInstanceCollectors = new HashMap<>();
  tasks.keySet().forEach(taskName -> taskInstanceCollectors.put(taskName, mock(TaskInstanceCollector.class)));
  // Create the container storage manager
  this.containerStorageManager = new ContainerStorageManager(
      checkpointManager,
      mockContainerModel,
      mockStreamMetadataCache,
      mockSystemAdmins,
      changelogSystemStreams,
      new HashMap<>(),
      storageEngineFactories,
      systemFactories,
      serdes,
      config,
      taskInstanceMetrics,
      samzaContainerMetrics,
      mock(JobContext.class),
      mockContainerContext,
      ImmutableMap.of(StorageConfig.KAFKA_STATE_BACKEND_FACTORY, backendFactory),
      taskInstanceCollectors,
      DEFAULT_LOGGED_STORE_BASE_DIR,
      DEFAULT_STORE_BASE_DIR,
      null,
      SystemClock.instance());
  this.testContext = new StandbyTestContext();
}
/**
 * Starts and then shuts down the manager built in setUp(). Checks that:
 * <ul>
 *   <li>every task's restoration gauge was touched;</li>
 *   <li>each of the two tasks was restored once;</li>
 *   <li>exactly one system consumer was created, started and stopped.</li>
 * </ul>
 */
@Test
public void testParallelismAndMetrics() throws InterruptedException {
  containerStorageManager.start();
  containerStorageManager.shutdown();

  taskRestoreMetricGauges.values().forEach(restoreGauge ->
      assertFalse("Restoration time gauge value should be invoked atleast once",
          mockingDetails(restoreGauge).getInvocations().isEmpty()));

  assertEquals("Store restore count should be 2 because there are 2 tasks", 2, storeRestoreCallCount);
  assertEquals("systemConsumerCreation count should be 1 (1 consumer per system)", 1,
      systemConsumerCreationCount);
  assertEquals("systemConsumerStopCount count should be 1", 1, systemConsumerStopCount);
  assertEquals("systemConsumerStartCount count should be 1", 1, systemConsumerStartCount);
}
/**
 * Verifies a container whose config declares no stores. It must never touch the per-task restoration gauges,
 * must perform no restores, and must create, start and stop no system consumers.
 */
@Test
public void testNoConfiguredDurableStores() throws InterruptedException {
  taskRestoreMetricGauges = new HashMap<>();
  this.tasks = new HashMap<>();
  this.taskInstanceMetrics = new HashMap<>();
  // Add two mocked tasks
  addMockedTask("task 0", 0);
  addMockedTask("task 1", 1);
  // Mock container metrics
  samzaContainerMetrics = mock(SamzaContainerMetrics.class);
  when(samzaContainerMetrics.taskStoreRestorationMetrics()).thenReturn(taskRestoreMetricGauges);
  // Config registers a serde but declares no stores
  Map<String, String> configMap = new HashMap<>();
  configMap.put("serializers.registry.stringserde.class", StringSerdeFactory.class.getName());
  configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
  Config config = new MapConfig(configMap);
  Map<String, Serde<Object>> serdes = new HashMap<>();
  serdes.put("stringserde", mock(Serde.class));
  CheckpointManager checkpointManager = mock(CheckpointManager.class);
  when(checkpointManager.readLastCheckpoint(any(TaskName.class))).thenReturn(new CheckpointV1(new HashMap<>()));
  ContainerContext mockContainerContext = mock(ContainerContext.class);
  ContainerModel mockContainerModel = new ContainerModel("samza-container-test", tasks);
  when(mockContainerContext.getContainerModel()).thenReturn(mockContainerModel);
  // Reset the expected number of sysConsumer create, start and stop calls, and store.restore() calls
  this.systemConsumerCreationCount = 0;
  this.systemConsumerStartCount = 0;
  this.systemConsumerStopCount = 0;
  this.storeRestoreCallCount = 0;
  // No changelogs, storage engine factories, system factories or state backend factories are supplied.
  // Named distinctly so it does not shadow the containerStorageManager field.
  ContainerStorageManager noStoresStorageManager = new ContainerStorageManager(
      checkpointManager,
      mockContainerModel,
      mock(StreamMetadataCache.class),
      mock(SystemAdmins.class),
      new HashMap<>(),
      new HashMap<>(),
      new HashMap<>(),
      new HashMap<>(),
      serdes,
      config,
      taskInstanceMetrics,
      samzaContainerMetrics,
      mock(JobContext.class),
      mockContainerContext,
      new HashMap<>(),
      mock(Map.class),
      DEFAULT_LOGGED_STORE_BASE_DIR,
      DEFAULT_STORE_BASE_DIR,
      null,
      SystemClock.instance());
  noStoresStorageManager.start();
  noStoresStorageManager.shutdown();
  for (Gauge<Object> gauge : taskRestoreMetricGauges.values()) {
    Assert.assertTrue("Restoration time gauge value should never be invoked",
        mockingDetails(gauge).getInvocations().isEmpty());
  }
  Assert.assertEquals("Store restore count should be 0 because there are 0 stores", 0, this.storeRestoreCallCount);
  Assert.assertEquals("systemConsumerCreation count should be 0 because there are 0 stores", 0,
      this.systemConsumerCreationCount);
  Assert.assertEquals("systemConsumerStopCount count should be 0", 0, this.systemConsumerStopCount);
  Assert.assertEquals("systemConsumerStartCount count should be 0", 0, this.systemConsumerStartCount);
}
/**
 * Resolves backend factories for three stores in two cases.
 *
 * <p>With a V1 checkpoint, only the stores that have a changelog are expected, all under the Kafka backend
 * factory. With no checkpoint, each store is expected under its first configured restore factory. A store with
 * no restore factories (storeName2) is then absent.
 */
@Test
public void testCheckpointBasedRestoreFactoryCreation() {
  StorageConfig storageConfig = mock(StorageConfig.class);
  // storeName0: three restore factories, no changelog
  when(storageConfig.getStoreRestoreFactories("storeName0"))
      .thenReturn(ImmutableList.of("factory0", "factory1", "factory2"));
  when(storageConfig.getChangelogStream("storeName0")).thenReturn(Optional.empty());
  // storeName1: two restore factories, with a changelog
  when(storageConfig.getStoreRestoreFactories("storeName1")).thenReturn(ImmutableList.of("factory2", "factory1"));
  when(storageConfig.getChangelogStream("storeName1")).thenReturn(Optional.of("changelog"));
  // storeName2: no restore factories, with a changelog
  when(storageConfig.getStoreRestoreFactories("storeName2")).thenReturn(Collections.emptyList());
  when(storageConfig.getChangelogStream("storeName2")).thenReturn(Optional.of("changelog"));

  Set<String> allStores = ImmutableSet.of("storeName0", "storeName1", "storeName2");
  CheckpointV1 v1Checkpoint = mock(CheckpointV1.class);
  when(v1Checkpoint.getVersion()).thenReturn((short) 1);

  Map<String, Set<String>> fromV1Checkpoint =
      ContainerStorageManagerUtil.getBackendFactoryStoreNames(allStores, v1Checkpoint, storageConfig);
  assertEquals(1, fromV1Checkpoint.size());
  assertEquals(ImmutableSet.of("storeName1", "storeName2"),
      fromV1Checkpoint.get(StorageConfig.KAFKA_STATE_BACKEND_FACTORY));

  Map<String, Set<String>> withoutCheckpoint =
      ContainerStorageManagerUtil.getBackendFactoryStoreNames(allStores, null, storageConfig);
  assertEquals(2, withoutCheckpoint.size());
  assertEquals(ImmutableSet.of("storeName0"), withoutCheckpoint.get("factory0"));
  assertEquals(ImmutableSet.of("storeName1"), withoutCheckpoint.get("factory2"));
}
/**
 * Resolves backend factories for three stores against a V2 checkpoint. The checkpoint's state checkpoint
 * markers change between scenarios, and each scenario asserts which factory every store is expected to map to.
 */
@Test
public void testCheckpointV2BasedRestoreFactoryCreation() {
  StorageConfig storageConfig = mock(StorageConfig.class);
  // storeName0: three restore factories, no changelog
  when(storageConfig.getStoreRestoreFactories("storeName0"))
      .thenReturn(ImmutableList.of("factory0", "factory1", "factory2"));
  when(storageConfig.getChangelogStream("storeName0")).thenReturn(Optional.empty());
  // storeName1: two restore factories, with a changelog
  when(storageConfig.getStoreRestoreFactories("storeName1")).thenReturn(ImmutableList.of("factory2", "factory1"));
  when(storageConfig.getChangelogStream("storeName1")).thenReturn(Optional.of("changelog"));
  // storeName2: no restore factories, with a changelog
  when(storageConfig.getStoreRestoreFactories("storeName2")).thenReturn(Collections.emptyList());
  when(storageConfig.getChangelogStream("storeName2")).thenReturn(Optional.of("changelog"));

  Set<String> allStores = ImmutableSet.of("storeName0", "storeName1", "storeName2");
  CheckpointV2 v2Checkpoint = mock(CheckpointV2.class);
  when(v2Checkpoint.getVersion()).thenReturn((short) 2);

  // Every factory has a marker for every store
  Map<String, Set<String>> resolved = stubMarkersAndResolve(allStores, v2Checkpoint, storageConfig,
      ImmutableMap.of(
          "factory0", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""),
          "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""),
          "factory2", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", "")));
  assertEquals(2, resolved.size());
  assertEquals(ImmutableSet.of("storeName0"), resolved.get("factory0"));
  assertEquals(ImmutableSet.of("storeName1"), resolved.get("factory2"));

  // Only factory2 has markers
  resolved = stubMarkersAndResolve(allStores, v2Checkpoint, storageConfig,
      ImmutableMap.of(
          "factory2", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", "")));
  assertEquals(1, resolved.size());
  assertEquals(ImmutableSet.of("storeName1", "storeName0"), resolved.get("factory2"));

  // factory1 and factory2 have markers for every store
  resolved = stubMarkersAndResolve(allStores, v2Checkpoint, storageConfig,
      ImmutableMap.of(
          "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""),
          "factory2", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", "")));
  assertEquals(2, resolved.size());
  assertEquals(ImmutableSet.of("storeName0"), resolved.get("factory1"));
  assertEquals(ImmutableSet.of("storeName1"), resolved.get("factory2"));

  // factory2 is missing the marker for storeName1
  resolved = stubMarkersAndResolve(allStores, v2Checkpoint, storageConfig,
      ImmutableMap.of(
          "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", ""),
          "factory2", ImmutableMap.of("storeName0", "", "storeName2", "")));
  assertEquals(1, resolved.size());
  assertEquals(ImmutableSet.of("storeName0", "storeName1"), resolved.get("factory1"));

  // Only factory1 has markers
  resolved = stubMarkersAndResolve(allStores, v2Checkpoint, storageConfig,
      ImmutableMap.of(
          "factory1", ImmutableMap.of("storeName0", "", "storeName1", "", "storeName2", "")));
  assertEquals(1, resolved.size());
  assertEquals(ImmutableSet.of("storeName0", "storeName1"), resolved.get("factory1"));

  // No markers at all
  resolved = stubMarkersAndResolve(allStores, v2Checkpoint, storageConfig, Collections.emptyMap());
  assertEquals(2, resolved.size());
  assertEquals(ImmutableSet.of("storeName0"), resolved.get("factory0"));
  assertEquals(ImmutableSet.of("storeName1"), resolved.get("factory2"));

  // Each factory is missing markers for some stores
  resolved = stubMarkersAndResolve(allStores, v2Checkpoint, storageConfig,
      ImmutableMap.of(
          "factory0", ImmutableMap.of("storeName1", "", "storeName2", ""),
          "factory1", ImmutableMap.of("storeName1", "", "storeName2", ""),
          "factory2", ImmutableMap.of("storeName0", "", "storeName2", "")));
  assertEquals(2, resolved.size());
  assertEquals(ImmutableSet.of("storeName1"), resolved.get("factory1"));
  assertEquals(ImmutableSet.of("storeName0"), resolved.get("factory2"));
}

/**
 * Stubs {@code checkpoint}'s state checkpoint markers, then resolves the backend factory for each store.
 *
 * @param storeNames stores to resolve
 * @param checkpoint mocked V2 checkpoint whose markers are (re)stubbed
 * @param config mocked storage config
 * @param markers factory name to (store name to marker) map returned by the checkpoint
 * @return factory name to the set of stores it should restore
 */
private static Map<String, Set<String>> stubMarkersAndResolve(Set<String> storeNames, CheckpointV2 checkpoint,
    StorageConfig config, Map<String, Map<String, String>> markers) {
  when(checkpoint.getStateCheckpointMarkers()).thenReturn(markers);
  return ContainerStorageManagerUtil.getBackendFactoryStoreNames(storeNames, checkpoint, config);
}
/**
 * Verifies the init-failure path. When {@code init(checkpoint)} throws a {@link DeletedException} (wrapped in
 * {@link CompletionException} and {@link SamzaException}), init must be called again with the get-deleted flag,
 * and restore must run only through {@code restore(true)}.
 */
@Test
public void testInitRecoversFromDeletedException() {
  TaskName taskName = new TaskName("task");
  Set<String> stores = Collections.singleton("store");
  BlobStoreRestoreManager taskRestoreManager = mock(BlobStoreRestoreManager.class);
  Throwable deletedException = new SamzaException(new CompletionException(new DeletedException("410 gone")));
  doThrow(deletedException).when(taskRestoreManager).init(any(Checkpoint.class));
  when(taskRestoreManager.restore()).thenReturn(CompletableFuture.completedFuture(null));
  when(taskRestoreManager.restore(true)).thenReturn(CompletableFuture.completedFuture(null));
  // mock ReflectionUtil.getObj
  PowerMockito.mockStatic(ReflectionUtil.class);
  BlobStoreManagerFactory blobStoreManagerFactory = mock(BlobStoreManagerFactory.class);
  BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
  PowerMockito.when(ReflectionUtil.getObj(anyString(), eq(BlobStoreManagerFactory.class)))
      .thenReturn(blobStoreManagerFactory);
  when(blobStoreManagerFactory.getRestoreBlobStoreManager(any(Config.class), any(ExecutorService.class)))
      .thenReturn(blobStoreManager);
  Map<String, TaskRestoreManager> storeTaskRestoreManager = ImmutableMap.of("store", taskRestoreManager);
  CheckpointManager checkpointManager = mock(CheckpointManager.class);
  JobContext jobContext = mock(JobContext.class);
  when(jobContext.getJobModel()).thenReturn(mock(JobModel.class));
  TaskModel taskModel = mock(TaskModel.class);
  when(taskModel.getTaskName()).thenReturn(new TaskName("test"));
  ContainerModel containerModel = mock(ContainerModel.class);
  when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, taskModel));
  Checkpoint checkpoint = mock(CheckpointV2.class);
  Map<TaskName, Checkpoint> taskCheckpoints = ImmutableMap.of(taskName, checkpoint);
  Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames =
      ImmutableMap.of(taskName, ImmutableMap.of(BlobStoreStateBackendFactory.class.getName(), stores));
  Config config = new MapConfig(ImmutableMap.of("job.name", "test"), ImmutableMap.of("stores.store.backup.factories", BlobStoreStateBackendFactory.class.getName()));
  SystemConsumer systemConsumer = mock(SystemConsumer.class);
  ExecutorService executor = Executors.newSingleThreadExecutor();
  try {
    ContainerStorageManagerRestoreUtil.initAndRestoreTaskInstances(ImmutableMap.of(taskName, storeTaskRestoreManager),
        samzaContainerMetrics, checkpointManager, jobContext, containerModel, taskCheckpoints,
        taskBackendFactoryToStoreNames, config, executor, new HashMap<>(), null,
        ImmutableMap.of("store", systemConsumer));
    // verify init() is called twice -> once without getDeleted flag, once with getDeleted flag
    verify(taskRestoreManager, times(1)).init(any(Checkpoint.class));
    verify(taskRestoreManager, times(1)).init(any(Checkpoint.class), anyBoolean());
    // verify restore is called with getDeletedFlag only
    verify(taskRestoreManager, times(0)).restore();
    verify(taskRestoreManager, times(1)).restore(true);
  } finally {
    // Release the executor thread so it does not outlive the test.
    executor.shutdownNow();
  }
}
/**
 * Verifies the restore-failure path, where {@code restore()} fails with a {@link DeletedException}. The test
 * expects that:
 * <ul>
 *   <li>the store is restored again via {@code restore(true)};</li>
 *   <li>the recovered store is backed up (stubbed to return {@code "blobId"});</li>
 *   <li>the new snapshot index has its TTL removed and the old one is deleted;</li>
 *   <li>the task checkpoint is replaced;</li>
 *   <li>the restore manager is closed only once, after the restore future completes.</li>
 * </ul>
 */
@Test
public void testRestoreRecoversFromDeletedException() throws Exception {
  TaskName taskName = new TaskName("task");
  String storeName = "store";
  Set<String> stores = Collections.singleton(storeName);
  String jobName = "job";
  String jobId = "jobId";
  BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
  doNothing().when(blobStoreManager).init();
  doNothing().when(blobStoreManager).close();
  // The restore manager's init()/close() delegate to the blob store manager so their ordering can be verified
  BlobStoreRestoreManager taskRestoreManager = mock(BlobStoreRestoreManager.class);
  doAnswer(invocation -> {
    blobStoreManager.init();
    return null;
  }).when(taskRestoreManager).init(any(Checkpoint.class));
  doAnswer(invocation -> {
    blobStoreManager.close();
    return null;
  }).when(taskRestoreManager).close();
  // restore() fails with DeletedException; restore(true) succeeds
  CompletableFuture<Void> failedFuture = CompletableFuture.completedFuture(null)
      .thenCompose(v -> { throw new DeletedException("410 Gone"); });
  when(taskRestoreManager.restore()).thenReturn(failedFuture);
  when(taskRestoreManager.restore(true)).thenReturn(CompletableFuture.completedFuture(null));
  Map<String, TaskRestoreManager> factoryToTaskRestoreManager = ImmutableMap.of(
      BlobStoreStateBackendFactory.class.getName(), taskRestoreManager);
  JobContext jobContext = mock(JobContext.class);
  TaskModel taskModel = mock(TaskModel.class);
  when(taskModel.getTaskName()).thenReturn(taskName);
  when(taskModel.getTaskMode()).thenReturn(TaskMode.Active);
  ContainerModel containerModel = mock(ContainerModel.class);
  when(containerModel.getTasks()).thenReturn(ImmutableMap.of(taskName, taskModel));
  // The checkpoint points the blob store backend at the old snapshot index
  String expectedOldBlobId = "oldBlobId";
  CheckpointV2 checkpoint = mock(CheckpointV2.class);
  when(checkpoint.getOffsets()).thenReturn(ImmutableMap.of());
  when(checkpoint.getCheckpointId()).thenReturn(CheckpointId.create());
  when(checkpoint.getStateCheckpointMarkers()).thenReturn(ImmutableMap.of(
      BlobStoreStateBackendFactory.class.getName(), ImmutableMap.of(storeName, expectedOldBlobId)));
  CheckpointManager checkpointManager = mock(CheckpointManager.class);
  when(checkpointManager.readLastCheckpoint(taskName)).thenReturn(checkpoint);
  Map<TaskName, Checkpoint> taskCheckpoints = ImmutableMap.of(taskName, checkpoint);
  Map<TaskName, Map<String, Set<String>>> taskBackendFactoryToStoreNames =
      ImmutableMap.of(taskName, ImmutableMap.of(
          BlobStoreStateBackendFactory.class.getName(), stores));
  Config config = new MapConfig(ImmutableMap.of(
      "blob.store.manager.factory", BlobStoreStateBackendFactory.class.getName(),
      "job.name", jobName));
  SystemConsumer systemConsumer = mock(SystemConsumer.class);
  // mock ReflectionUtil.getObj
  PowerMockito.mockStatic(ReflectionUtil.class);
  BlobStoreManagerFactory blobStoreManagerFactory = mock(BlobStoreManagerFactory.class);
  PowerMockito.when(ReflectionUtil.getObj(anyString(), eq(BlobStoreManagerFactory.class)))
      .thenReturn(blobStoreManagerFactory);
  when(blobStoreManagerFactory.getRestoreBlobStoreManager(any(Config.class), any(ExecutorService.class)))
      .thenReturn(blobStoreManager);
  // To verify order of operations
  InOrder inOrder = inOrder(blobStoreManager, taskRestoreManager);
  // mock ContainerStorageManagerRestoreUtil.backupRecoveredStore
  String expectedBlobId = "blobId";
  PowerMockito.spy(ContainerStorageManagerRestoreUtil.class);
  PowerMockito.doReturn(CompletableFuture.completedFuture(ImmutableMap.of(storeName, expectedBlobId)))
      .when(ContainerStorageManagerRestoreUtil.class, "backupRecoveredStore",
          any(JobContext.class), any(ContainerModel.class), any(Config.class),
          any(TaskName.class), any(Set.class), any(Checkpoint.class), any(File.class),
          any(BlobStoreManager.class), any(MetricsRegistry.class), any(ExecutorService.class));
  SnapshotIndex snapshotIndex = new SnapshotIndex(System.currentTimeMillis(),
      new SnapshotMetadata(CheckpointId.create(), jobName, jobId, taskName.getTaskName(), storeName),
      new DirIndex("test", new ArrayList<>(), new ArrayList<>(), new ArrayList<>(), new ArrayList<>()),
      Optional.empty());
  ArgumentCaptor<String> getBlobIdCaptor = ArgumentCaptor.forClass(String.class);
  when(blobStoreManager.get(getBlobIdCaptor.capture(), any(ByteArrayOutputStream.class),
      any(Metadata.class), any(Boolean.class)))
      .thenAnswer(invocation -> {
        // Read the stream from this invocation's own arguments. A shared captor's getValue() returns the most
        // recently captured stream, which may belong to a different get() running concurrently on the executor.
        ByteArrayOutputStream outputStream = (ByteArrayOutputStream) invocation.getArguments()[1];
        outputStream.write(new SnapshotIndexSerde().toBytes(snapshotIndex));
        return CompletableFuture.completedFuture(null);
      });
  ArgumentCaptor<String> removeTTLBlobIdCaptor = ArgumentCaptor.forClass(String.class);
  when(blobStoreManager.removeTTL(removeTTLBlobIdCaptor.capture(), any(Metadata.class)))
      .thenAnswer(invocation -> CompletableFuture.completedFuture(null));
  ArgumentCaptor<String> deleteBlobIdCaptor = ArgumentCaptor.forClass(String.class);
  when(blobStoreManager.delete(deleteBlobIdCaptor.capture(), any(Metadata.class)))
      .thenAnswer(invocation -> CompletableFuture.completedFuture(null));
  ExecutorService executor = Executors.newFixedThreadPool(5);
  try {
    CompletableFuture<Map<TaskName, Checkpoint>> updatedTaskCheckpointsFuture =
        ContainerStorageManagerRestoreUtil.initAndRestoreTaskInstances(ImmutableMap.of(taskName, factoryToTaskRestoreManager),
            samzaContainerMetrics, checkpointManager, jobContext, containerModel, taskCheckpoints,
            taskBackendFactoryToStoreNames, config, executor, new HashMap<>(), null,
            ImmutableMap.of(storeName, systemConsumer));
    // verify close is not called until init and restore futures are complete
    verify(taskRestoreManager, never()).close();
    Map<TaskName, Checkpoint> updatedTaskCheckpoints = updatedTaskCheckpointsFuture.get();
    // verify close is called only once after restore future is complete
    verify(taskRestoreManager, times(1)).close();
    // verify taskCheckpoint is updated
    assertNotEquals(((CheckpointV2) taskCheckpoints.get(taskName)).getCheckpointId(),
        ((CheckpointV2) updatedTaskCheckpoints.get(taskName)).getCheckpointId());
    // verify init is not retried with getDeleted
    verify(taskRestoreManager, times(0)).init(any(Checkpoint.class), anyBoolean());
    // verify restore is call twice - once without getDeleted flag, once with getDeleted flag
    verify(taskRestoreManager, times(1)).restore();
    verify(taskRestoreManager, times(1)).restore(true);
    // verify the GET and removeTTL was called on the new SnapshotIndex
    assertEquals(expectedBlobId, getBlobIdCaptor.getAllValues().get(0));
    assertEquals(expectedBlobId, removeTTLBlobIdCaptor.getAllValues().get(0));
    // verify that GET and delete was called on the old SnapshotIndex
    assertEquals(expectedOldBlobId, getBlobIdCaptor.getAllValues().get(1));
    assertEquals(expectedOldBlobId, deleteBlobIdCaptor.getValue());
    // verify the order of operations in taskRestoreManager and blobStoreManager
    // Verifies that close is called after restore(true)
    inOrder.verify(taskRestoreManager).init(any(Checkpoint.class));
    inOrder.verify(blobStoreManager).init(); // init called on blobStoreManager passed to taskRestoreManager
    inOrder.verify(taskRestoreManager).restore();
    inOrder.verify(blobStoreManager).init(); // init called on blobStoreManager created in ContainerStorageManagerRestoreUtil#restoreDeletedSnapshot
    inOrder.verify(taskRestoreManager).restore(true);
    inOrder.verify(blobStoreManager).close(); // close called on blobStoreManager created in ContainerStorageManagerRestoreUtil#restoreDeletedSnapshot
    inOrder.verify(blobStoreManager).close(); // close called on blobStoreManager passed to taskRestoreManager
  } finally {
    // Release the pool threads so they do not outlive the test.
    executor.shutdownNow();
  }
}
/**
 * Verifies that a container made up solely of standby tasks yields an empty
 * store-to-changelog mapping from {@code getActiveTaskChangelogSystemStreams}.
 */
@Test
public void getActiveTaskChangelogSystemStreams() {
  Map<String, SystemStream> actual = ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(
      testContext.storesToSystemStreams, testContext.standbyContainerModel);
  assertEquals("Standby container should have no active change log", Collections.emptyMap(), actual);
}
/**
 * Verifies that a container hosting both an active and a standby task reports the full
 * store-to-changelog mapping (identical to the configured {@code storesToSystemStreams}).
 */
@Test
public void getActiveTaskChangelogSystemStreamsForActiveAndStandbyContainer() {
  Map<String, SystemStream> configured = testContext.storesToSystemStreams;
  Map<String, SystemStream> actual =
      ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(configured,
          testContext.activeAndStandbyContainerModel);
  assertEquals("Active and standby container model should have non empty store to changelog mapping",
      configured, actual);
}
/**
 * Verifies that a container hosting only an active task reports the full store-to-changelog
 * mapping (identical to the configured {@code storesToSystemStreams}).
 *
 * <p>NOTE(review): despite "ForStandbyContainer" in its name, this test exercises
 * {@code activeContainerModel}; the standby-only case is covered by
 * {@code getActiveTaskChangelogSystemStreams}. Consider renaming to
 * {@code ...ForActiveContainer}.
 */
@Test
public void getActiveTaskChangelogSystemStreamsForStandbyContainer() {
  Map<String, SystemStream> configured = testContext.storesToSystemStreams;
  Map<String, SystemStream> actual =
      ContainerStorageManagerUtil.getActiveTaskChangelogSystemStreams(configured,
          testContext.activeContainerModel);
  assertEquals("Active container model should have non empty store to changelog mapping",
      configured, actual);
}
/**
 * Verifies that, for an active-only container, {@code getSideInputStoreNames} returns exactly
 * the fixture's expected active store set.
 */
@Test
public void getSideInputStoresForActiveContainer() {
  Set<String> actual = ContainerStorageManagerUtil.getSideInputStoreNames(
      testContext.sideInputStoresToSystemStreams,
      testContext.storesToSystemStreams,
      testContext.activeContainerModel);
  assertEquals("Mismatch in stores", testContext.activeStores, actual);
}
/**
 * Verifies that, for a standby-only container, {@code getSideInputStoreNames} returns the
 * fixture's expected standby store set, which includes the changelog-backed store in addition
 * to the configured side-input store.
 */
@Test
public void getSideInputStoresForStandbyContainer() {
  Set<String> actual = ContainerStorageManagerUtil.getSideInputStoreNames(
      testContext.sideInputStoresToSystemStreams,
      testContext.storesToSystemStreams,
      testContext.standbyContainerModel);
  assertEquals("Mismatch in side input stores", testContext.standbyStores, actual);
}
/**
 * Verifies that an active-only container with no side-input stores configured maps its task
 * to an empty store-to-SSP map.
 */
@Test
public void getTaskSideInputSSPsForActiveContainer() {
  Map<TaskName, Map<String, Set<SystemStreamPartition>>> actual = SideInputsManager.getTaskSideInputSSPs(
      Collections.emptyMap(), testContext.storesToSystemStreams, testContext.activeContainerModel);
  assertEquals("Mismatch in task name --> store --> SSP mapping", testContext.activeSideInputSSPs, actual);
}
/**
 * Verifies that a standby task that also consumes a side-input stream is mapped to both its
 * changelog SSP (for the changelog-backed store) and its side-input SSP (for the side-input store).
 */
@Test
public void getTaskSideInputSSPsForStandbyContainerWithSideInput() {
  Map<TaskName, Map<String, Set<SystemStreamPartition>>> actual = SideInputsManager.getTaskSideInputSSPs(
      testContext.sideInputStoresToSystemStreams,
      testContext.storesToSystemStreams,
      testContext.standbyContainerModelWithSideInputs);
  assertEquals("Mismatch in task name --> store --> SSP mapping", testContext.standbyWithSideInputSSPs, actual);
}
/**
 * Verifies that, with no side-input stores configured, a standby task is still mapped to its
 * changelog SSP for the changelog-backed store.
 */
@Test
public void getTaskSideInputSSPsForStandbyContainerWithoutSideInputs() {
  Map<TaskName, Map<String, Set<SystemStreamPartition>>> actual = SideInputsManager.getTaskSideInputSSPs(
      Collections.emptyMap(), testContext.storesToSystemStreams, testContext.standbyContainerModel);
  assertEquals("Mismatch in task name --> store --> SSP mapping", testContext.standbyChangelogSSPs, actual);
}
/**
 * Fixture for the standby and side-input tests: container models built from one active task and
 * two standby tasks, the store-to-stream configuration, and the expected results each test
 * compares against. All instance fields are final and hold Guava immutable collections.
 */
private static class StandbyTestContext {
  // ---- Stores and the streams backing them ----
  private static final String TEST_STORE = "test-store";
  private static final SystemStream TEST_SYSTEM_STREAM = new SystemStream("test", "stream");
  private static final String SIDE_INPUT_STORE = "side-input-store";
  private static final SystemStream SIDE_INPUT_SYSTEM_STREAM = new SystemStream("test", "side-input-stream");

  // ---- Active task: no input SSPs, partition 1 ----
  private static final String ACTIVE_TASK = "active-task-1";
  private static final TaskName ACTIVE_TASK_NAME = new TaskName(ACTIVE_TASK);
  private static final TaskModel ACTIVE_TASK_MODEL =
      new TaskModel(ACTIVE_TASK_NAME, Collections.emptySet(), new Partition(1));

  // ---- Standby task without side inputs: no input SSPs, partition 0 ----
  private static final String STANDBY_TASK = "standby-task";
  private static final TaskName STANDBY_TASK_NAME = new TaskName(STANDBY_TASK);
  private static final TaskModel STANDBY_TASK_MODEL =
      new TaskModel(STANDBY_TASK_NAME, Collections.emptySet(), new Partition(0), TaskMode.Standby);
  // SSP on the test store's stream at partition 0, matching the standby tasks' partition.
  private static final SystemStreamPartition STANDBY_CHANGELOG_SSP =
      new SystemStreamPartition("test", "stream", new Partition(0));

  // ---- Standby task consuming the side-input stream at partition 2 ----
  private static final String STANDBY_TASK_2 = "standby-task-2";
  private static final TaskName STANDBY_TASK_NAME_2 = new TaskName(STANDBY_TASK_2);
  private static final Set<SystemStreamPartition> STANDBY_TASK_INPUT_SSP =
      ImmutableSet.of(new SystemStreamPartition("test", "side-input-stream", new Partition(2)));
  private static final TaskModel STANDBY_TASK_MODEL_WITH_SIDE_INPUT =
      new TaskModel(STANDBY_TASK_NAME_2, STANDBY_TASK_INPUT_SSP, new Partition(0), TaskMode.Standby);

  // Container models under test.
  private final ContainerModel activeContainerModel;
  private final ContainerModel activeAndStandbyContainerModel;
  private final ContainerModel standbyContainerModel;
  private final ContainerModel standbyContainerModelWithSideInputs;

  // Store configuration passed to the code under test.
  private final Map<String, SystemStream> storesToSystemStreams;
  private final Map<String, Set<SystemStream>> sideInputStoresToSystemStreams;

  // Expected results.
  private final Set<String> activeStores;
  private final Set<String> standbyStores;
  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> activeSideInputSSPs;
  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> standbyChangelogSSPs;
  private final Map<TaskName, Map<String, Set<SystemStreamPartition>>> standbyWithSideInputSSPs;

  public StandbyTestContext() {
    Map<TaskName, TaskModel> activeTasks = ImmutableMap.of(ACTIVE_TASK_NAME, ACTIVE_TASK_MODEL);
    Map<TaskName, TaskModel> standbyTasks = ImmutableMap.of(STANDBY_TASK_NAME, STANDBY_TASK_MODEL);
    Map<TaskName, TaskModel> mixedTasks = ImmutableMap.<TaskName, TaskModel>builder()
        .putAll(activeTasks)
        .putAll(standbyTasks)
        .build();
    Map<TaskName, TaskModel> sideInputStandbyTasks =
        ImmutableMap.of(STANDBY_TASK_NAME_2, STANDBY_TASK_MODEL_WITH_SIDE_INPUT);

    this.activeContainerModel = new ContainerModel("active-container-model", activeTasks);
    this.activeAndStandbyContainerModel = new ContainerModel("active-standby-container-model", mixedTasks);
    this.standbyContainerModel = new ContainerModel("standby-container-model", standbyTasks);
    this.standbyContainerModelWithSideInputs =
        new ContainerModel("standby-container-with-side-input", sideInputStandbyTasks);

    this.storesToSystemStreams = ImmutableMap.of(TEST_STORE, TEST_SYSTEM_STREAM);
    this.sideInputStoresToSystemStreams =
        ImmutableMap.of(SIDE_INPUT_STORE, ImmutableSet.of(SIDE_INPUT_SYSTEM_STREAM));

    this.activeStores = ImmutableSet.of(SIDE_INPUT_STORE);
    this.standbyStores = ImmutableSet.of(SIDE_INPUT_STORE, TEST_STORE);

    this.activeSideInputSSPs = ImmutableMap.of(ACTIVE_TASK_NAME, Collections.emptyMap());
    this.standbyChangelogSSPs =
        ImmutableMap.of(STANDBY_TASK_NAME, ImmutableMap.of(TEST_STORE, ImmutableSet.of(STANDBY_CHANGELOG_SSP)));
    this.standbyWithSideInputSSPs = ImmutableMap.of(STANDBY_TASK_NAME_2,
        ImmutableMap.of(TEST_STORE, ImmutableSet.of(STANDBY_CHANGELOG_SSP), SIDE_INPUT_STORE, STANDBY_TASK_INPUT_SSP));
  }
}
}