blob: c36a3bebdeb978c0ca60062f67b45539b98e60ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.storage;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.samza.Partition;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.checkpoint.CheckpointManager;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.container.SamzaContainerMetrics;
import org.apache.samza.container.TaskInstance;
import org.apache.samza.container.TaskInstanceMetrics;
import org.apache.samza.container.TaskName;
import org.apache.samza.context.ContainerContext;
import org.apache.samza.context.JobContext;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.StringSerdeFactory;
import org.apache.samza.system.SSPMetadataCache;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.SystemClock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import scala.collection.JavaConverters;
import static org.mockito.Mockito.*;
public class TestContainerStorageManager {
private static final String STORE_NAME = "store";
private static final String SYSTEM_NAME = "kafka";
private static final String STREAM_NAME = "store-stream";
private static final File DEFAULT_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "store");
private static final File
DEFAULT_LOGGED_STORE_BASE_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "loggedStore");
private ContainerStorageManager containerStorageManager;
private Map<TaskName, Gauge<Object>> taskRestoreMetricGauges;
private Map<TaskName, TaskInstanceMetrics> taskInstanceMetrics;
private SamzaContainerMetrics samzaContainerMetrics;
private Map<TaskName, TaskModel> tasks;
private volatile int systemConsumerCreationCount;
private volatile int systemConsumerStartCount;
private volatile int systemConsumerStopCount;
private volatile int storeRestoreCallCount;
/**
* Utility method for creating a mocked taskInstance and taskStorageManager and adding it to the map.
* @param taskname the desired taskname.
*/
private void addMockedTask(String taskname, int changelogPartition) {
TaskInstance mockTaskInstance = mock(TaskInstance.class);
doAnswer(invocation -> {
return new TaskName(taskname);
}).when(mockTaskInstance).taskName();
Gauge testGauge = mock(Gauge.class);
this.tasks.put(new TaskName(taskname),
new TaskModel(new TaskName(taskname), new HashSet<>(), new Partition(changelogPartition)));
this.taskRestoreMetricGauges.put(new TaskName(taskname), testGauge);
this.taskInstanceMetrics.put(new TaskName(taskname), mock(TaskInstanceMetrics.class));
}
/**
* Method to create a containerStorageManager with mocked dependencies
*/
@Before
public void setUp() throws InterruptedException {
taskRestoreMetricGauges = new HashMap<>();
this.tasks = new HashMap<>();
this.taskInstanceMetrics = new HashMap<>();
// Add two mocked tasks
addMockedTask("task 0", 0);
addMockedTask("task 1", 1);
// Mock container metrics
samzaContainerMetrics = mock(SamzaContainerMetrics.class);
when(samzaContainerMetrics.taskStoreRestorationMetrics()).thenReturn(taskRestoreMetricGauges);
// Create a map of test changeLogSSPs
Map<String, SystemStream> changelogSystemStreams = new HashMap<>();
changelogSystemStreams.put(STORE_NAME, new SystemStream(SYSTEM_NAME, STREAM_NAME));
// Create mocked storage engine factories
Map<String, StorageEngineFactory<Object, Object>> storageEngineFactories = new HashMap<>();
StorageEngineFactory mockStorageEngineFactory =
(StorageEngineFactory<Object, Object>) mock(StorageEngineFactory.class);
StorageEngine mockStorageEngine = mock(StorageEngine.class);
when(mockStorageEngine.getStoreProperties())
.thenReturn(new StoreProperties.StorePropertiesBuilder().setLoggedStore(true).setPersistedToDisk(true).build());
doAnswer(invocation -> {
return mockStorageEngine;
}).when(mockStorageEngineFactory).getStorageEngine(anyString(), any(), any(), any(), any(),
any(), any(), any(), any(), any());
storageEngineFactories.put(STORE_NAME, mockStorageEngineFactory);
// Add instrumentation to mocked storage engine, to record the number of store.restore() calls
doAnswer(invocation -> {
storeRestoreCallCount++;
return null;
}).when(mockStorageEngine).restore(any());
// Set the mocked stores' properties to be persistent
doAnswer(invocation -> {
return new StoreProperties.StorePropertiesBuilder().setLoggedStore(true).build();
}).when(mockStorageEngine).getStoreProperties();
// Mock and setup sysconsumers
SystemConsumer mockSystemConsumer = mock(SystemConsumer.class);
doAnswer(invocation -> {
systemConsumerStartCount++;
return null;
}).when(mockSystemConsumer).start();
doAnswer(invocation -> {
systemConsumerStopCount++;
return null;
}).when(mockSystemConsumer).stop();
// Create mocked system factories
Map<String, SystemFactory> systemFactories = new HashMap<>();
// Count the number of sysConsumers created
SystemFactory mockSystemFactory = mock(SystemFactory.class);
doAnswer(invocation -> {
this.systemConsumerCreationCount++;
return mockSystemConsumer;
}).when(mockSystemFactory).getConsumer(anyString(), any(), any());
systemFactories.put(SYSTEM_NAME, mockSystemFactory);
// Create mocked configs for specifying serdes
Map<String, String> configMap = new HashMap<>();
configMap.put("stores." + STORE_NAME + ".key.serde", "stringserde");
configMap.put("stores." + STORE_NAME + ".msg.serde", "stringserde");
configMap.put("serializers.registry.stringserde.class", StringSerdeFactory.class.getName());
configMap.put(TaskConfig.TRANSACTIONAL_STATE_RETAIN_EXISTING_STATE, "true");
Config config = new MapConfig(configMap);
Map<String, Serde<Object>> serdes = new HashMap<>();
serdes.put("stringserde", mock(Serde.class));
// Create mocked system admins
SystemAdmin mockSystemAdmin = mock(SystemAdmin.class);
doAnswer(new Answer<Void>() {
public Void answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
System.out.println("called with arguments: " + Arrays.toString(args));
return null;
}
}).when(mockSystemAdmin).validateStream(any());
SystemAdmins mockSystemAdmins = mock(SystemAdmins.class);
when(mockSystemAdmins.getSystemAdmin("kafka")).thenReturn(mockSystemAdmin);
// Create a mocked mockStreamMetadataCache
SystemStreamMetadata.SystemStreamPartitionMetadata sspMetadata =
new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "50", "51");
Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata = new HashMap<>();
partitionMetadata.put(new Partition(0), sspMetadata);
partitionMetadata.put(new Partition(1), sspMetadata);
SystemStreamMetadata systemStreamMetadata = new SystemStreamMetadata(STREAM_NAME, partitionMetadata);
StreamMetadataCache mockStreamMetadataCache = mock(StreamMetadataCache.class);
when(mockStreamMetadataCache.
getStreamMetadata(JavaConverters.
asScalaSetConverter(new HashSet<SystemStream>(changelogSystemStreams.values())).asScala().toSet(), false))
.thenReturn(
new scala.collection.immutable.Map.Map1(new SystemStream(SYSTEM_NAME, STREAM_NAME), systemStreamMetadata));
CheckpointManager checkpointManager = mock(CheckpointManager.class);
when(checkpointManager.readLastCheckpoint(any(TaskName.class))).thenReturn(new Checkpoint(new HashMap<>()));
SSPMetadataCache mockSSPMetadataCache = mock(SSPMetadataCache.class);
when(mockSSPMetadataCache.getMetadata(any(SystemStreamPartition.class)))
.thenReturn(new SystemStreamMetadata.SystemStreamPartitionMetadata("0", "10", "11"));
// Reset the expected number of sysConsumer create, start and stop calls, and store.restore() calls
this.systemConsumerCreationCount = 0;
this.systemConsumerStartCount = 0;
this.systemConsumerStopCount = 0;
this.storeRestoreCallCount = 0;
// Create the container storage manager
this.containerStorageManager = new ContainerStorageManager(
checkpointManager,
new ContainerModel("samza-container-test", tasks),
mockStreamMetadataCache,
mockSSPMetadataCache,
mockSystemAdmins,
changelogSystemStreams,
new HashMap<>(),
storageEngineFactories,
systemFactories,
serdes,
config,
taskInstanceMetrics,
samzaContainerMetrics,
mock(JobContext.class),
mock(ContainerContext.class),
mock(Map.class),
DEFAULT_LOGGED_STORE_BASE_DIR,
DEFAULT_STORE_BASE_DIR,
2,
null,
new SystemClock());
}
@Test
public void testParallelismAndMetrics() throws InterruptedException {
this.containerStorageManager.start();
this.containerStorageManager.shutdown();
for (Gauge gauge : taskRestoreMetricGauges.values()) {
Assert.assertTrue("Restoration time gauge value should be invoked atleast once",
mockingDetails(gauge).getInvocations().size() >= 1);
}
Assert.assertTrue("Store restore count should be 2 because there are 2 tasks", this.storeRestoreCallCount == 2);
Assert.assertTrue("systemConsumerCreation count should be 1 (1 consumer per system)",
this.systemConsumerCreationCount == 1);
Assert.assertTrue("systemConsumerStopCount count should be 1", this.systemConsumerStopCount == 1);
Assert.assertTrue("systemConsumerStartCount count should be 1", this.systemConsumerStartCount == 1);
}
}