// blob: 972598f86a3444509e1e89a371364035fb83b5ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.monitor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.samza.config.MapConfig;
import org.apache.samza.rest.model.JobStatus;
import org.apache.samza.rest.model.Task;
import org.apache.samza.rest.proxy.job.JobInstance;
import org.apache.samza.storage.StorageManagerUtil;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static junit.framework.TestCase.*;
/**
 * Unit tests for {@link LocalStoreMonitor}.
 *
 * <p>Every test runs against a freshly created temporary local store directory laid out as
 * {@code <localStoreDir>/test-jobName-jobId/test-store/test-task} and a mocked {@link JobsClient}.
 * By default the mock reports {@link JobStatus#STOPPED} for any job and returns a single task
 * named "test-task" (created with host "localHost" and the store "test-store").
 */
public class TestLocalStoreMonitor {
  private static final Logger LOG = LoggerFactory.getLogger(TestLocalStoreMonitor.class);

  // Mock for the jobs client; default stubbing is installed in setUp().
  private final JobsClient jobsClientMock = Mockito.mock(JobsClient.class);

  private File localStoreDir;
  private File jobDir;
  private File taskStoreDir;
  private Map<String, String> config;
  private LocalStoreMonitor localStoreMonitor;
  private LocalStoreMonitorMetrics localStoreMonitorMetrics;

  // Value of File#getTotalSpace() for the task store directory, captured before the monitor runs.
  // NOTE(review): getTotalSpace() reports the size of the partition holding the path rather than
  // the size of the directory itself; the disk-space assertions below compare the metric against
  // this same value, presumably mirroring how LocalStoreMonitor measures it - confirm there.
  private long taskStoreSize;

  /**
   * Builds the scaffold directories, installs the default mock behaviour and creates the monitor
   * under test.
   */
  @Before
  public void setUp() throws Exception {
    // Create a different local store directory every time the test is executed to avoid
    // intermittent test failures due to previous unsuccessful test re-runs.
    localStoreDir = Files.createTempDir();
    jobDir = new File(localStoreDir, "test-jobName-jobId");
    taskStoreDir = new File(new File(jobDir, "test-store"), "test-task");
    config = ImmutableMap.of(LocalStoreMonitorConfig.CONFIG_LOCAL_STORE_DIR, localStoreDir.getCanonicalPath());

    // Make scaffold directories for testing.
    FileUtils.forceMkdir(taskStoreDir);
    taskStoreSize = taskStoreDir.getTotalSpace();

    // Set default return values for methods.
    Mockito.when(jobsClientMock.getJobStatus(Mockito.any())).thenReturn(JobStatus.STOPPED);
    Task task = new Task("localHost", "test-task", "0", new ArrayList<>(), ImmutableList.of("test-store"));
    Mockito.when(jobsClientMock.getTasks(Mockito.any())).thenReturn(ImmutableList.of(task));

    localStoreMonitorMetrics = new LocalStoreMonitorMetrics("TestMonitorName", new NoOpMetricsRegistry());

    // Initialize the local store monitor with mock and config.
    localStoreMonitor =
        new LocalStoreMonitor(new LocalStoreMonitorConfig(new MapConfig(config)), localStoreMonitorMetrics,
            jobsClientMock);
  }

  /**
   * Removes the temporary local store directory and everything underneath it, including any extra
   * job or store directories that individual tests created inside it.
   */
  @After
  public void cleanUp() {
    try {
      FileUtils.deleteDirectory(localStoreDir);
    } catch (IOException e) {
      // Happens when task store can't be deleted after test finishes. The exception is passed as
      // the trailing logger argument and as the error's cause so that its stack trace is kept.
      LOG.error("Deletion of directory: {} resulted in an exception.", localStoreDir, e);
      throw new AssertionError("Failed to delete local store directory: " + localStoreDir, e);
    }
  }

  /** A task store without an offset file is deleted and accounted for in the metrics. */
  @Test
  public void shouldDeleteLocalTaskStoreWhenItHasNoOffsetFile() throws Exception {
    localStoreMonitor.monitor();

    Assert.assertFalse("Task store directory should not exist.", taskStoreDir.exists());
    Assert.assertEquals(taskStoreSize, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
    Assert.assertEquals(1, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
  }

  /** An offset file with a very old modification time is removed; no disk space is reported freed. */
  @Test
  public void shouldDeleteLocalStoreWhenLastModifiedTimeOfOffsetFileIsGreaterThanOffsetTTL() throws Exception {
    File offsetFile = createOffsetFile(taskStoreDir);
    // Back-date the file to the epoch so it is as old as possible. The TTL itself is not
    // configured in this test, so whatever default LocalStoreMonitorConfig supplies applies.
    Assert.assertTrue("Could not back-date offset file: " + offsetFile, offsetFile.setLastModified(0));

    localStoreMonitor.monitor();

    Assert.assertFalse("Offset file should not exist.", offsetFile.exists());
    Assert.assertEquals(0, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
  }

  /** A store directory that is not listed in any task's stores is deleted along with the task store. */
  @Test
  public void shouldDeleteInActiveLocalStoresOfTheJob() throws Exception {
    File inActiveStoreDir = new File(jobDir, "inActiveStore");
    FileUtils.forceMkdir(inActiveStoreDir);
    File inActiveTaskDir = new File(inActiveStoreDir, "test-task");
    FileUtils.forceMkdir(inActiveTaskDir);
    long inActiveTaskDirSize = inActiveTaskDir.getTotalSpace();

    localStoreMonitor.monitor();

    Assert.assertFalse("Inactive task store directory should not exist.", inActiveTaskDir.exists());
    Assert.assertEquals(taskStoreSize + inActiveTaskDirSize,
        localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
    Assert.assertEquals(2, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
  }

  /** A freshly created offset file is left untouched and nothing is reported freed. */
  @Test
  public void shouldDoNothingWhenLastModifiedTimeOfOffsetFileIsLessThanOffsetTTL() throws Exception {
    File offsetFile = createOffsetFile(taskStoreDir);

    localStoreMonitor.monitor();

    Assert.assertTrue("Offset file should exist.", offsetFile.exists());
    Assert.assertEquals(0, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
  }

  /** When the jobs client reports the job as started, the offset file is left untouched. */
  @Test
  public void shouldDoNothingWhenTheJobIsRunning() throws Exception {
    Mockito.when(jobsClientMock.getJobStatus(Mockito.any())).thenReturn(JobStatus.STARTED);
    File offsetFile = createOffsetFile(taskStoreDir);

    localStoreMonitor.monitor();

    Assert.assertTrue("Offset file should exist.", offsetFile.exists());
    Assert.assertEquals(0, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
  }

  /** A task store is deleted when the task returned by the jobs client names a different host. */
  @Test
  public void shouldDeleteTaskStoreWhenTaskPreferredStoreIsNotLocalHost() throws Exception {
    Task task = new Task("notLocalHost", "test-task", "0", new ArrayList<>(), ImmutableList.of("test-store"));
    Mockito.when(jobsClientMock.getTasks(Mockito.any())).thenReturn(ImmutableList.of(task));

    localStoreMonitor.monitor();

    Assert.assertFalse("Task store directory should not exist.", taskStoreDir.exists());
    Assert.assertEquals(taskStoreSize, localStoreMonitorMetrics.diskSpaceFreedInBytes.getCount());
    Assert.assertEquals(1, localStoreMonitorMetrics.noOfDeletedTaskPartitionStores.getCount());
  }

  /**
   * With {@link LocalStoreMonitorConfig#CONFIG_IGNORE_FAILURES} set to "true", a failure while
   * handling one job must not prevent the clean up of another job's stores.
   */
  @Test
  public void shouldContinueLocalStoreCleanUpAfterFailureToCleanUpStoreOfAJob() throws Exception {
    File testFailingJobDir = new File(localStoreDir, "test-jobName-jobId-1");
    File testFailingTaskStoreDir = new File(new File(testFailingJobDir, "test-store"), "test-task");
    FileUtils.forceMkdir(testFailingTaskStoreDir);

    // For job: test-jobName-jobId-1, throw up in getTasks call and
    // expect the cleanup to succeed for other job: test-jobName-jobId.
    Mockito.doThrow(new RuntimeException("Dummy exception message."))
        .when(jobsClientMock)
        .getTasks(new JobInstance("test-jobName", "jobId-1"));
    Task task = new Task("notLocalHost", "test-task", "0", new ArrayList<>(), ImmutableList.of("test-store"));
    Mockito.when(jobsClientMock.getTasks(new JobInstance("test-jobName", "jobId"))).thenReturn(ImmutableList.of(task));

    Map<String, String> configMap = new HashMap<>(config);
    configMap.put(LocalStoreMonitorConfig.CONFIG_IGNORE_FAILURES, "true");

    // A dedicated monitor is needed because the one built in setUp() uses the default config.
    LocalStoreMonitor failureTolerantMonitor =
        new LocalStoreMonitor(new LocalStoreMonitorConfig(new MapConfig(configMap)), localStoreMonitorMetrics,
            jobsClientMock);
    failureTolerantMonitor.monitor();

    // Non failing job directory should be cleaned up.
    Assert.assertFalse("Task store directory should not exist.", taskStoreDir.exists());
  }

  /**
   * Creates an empty offset file named {@link StorageManagerUtil#OFFSET_FILE_NAME_NEW} inside the
   * given task store directory.
   *
   * @param taskStoreDir existing directory in which the offset file is created
   * @return the newly created offset file
   * @throws Exception if the file cannot be created
   */
  private static File createOffsetFile(File taskStoreDir) throws Exception {
    File offsetFile = new File(taskStoreDir, StorageManagerUtil.OFFSET_FILE_NAME_NEW);
    // createNewFile() returns false when the file already exists; fail fast so that a broken
    // fixture is not mistaken for a monitor defect.
    Assert.assertTrue("Offset file should have been newly created: " + offsetFile, offsetFile.createNewFile());
    return offsetFile;
  }
}