blob: a89a4aea35b13f778f875f3bb270a763cbe25b29 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.recon.tasks;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashSet;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Class used to test ReconTaskControllerImpl.
*/
public class TestReconTaskControllerImpl extends AbstractReconSqlDBTest {
private ReconTaskController reconTaskController;
private ReconTaskStatusDao reconTaskStatusDao;
@Before
public void setUp() {
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
reconTaskStatusDao = getDao(ReconTaskStatusDao.class);
reconTaskController = new ReconTaskControllerImpl(ozoneConfiguration,
reconTaskStatusDao, new HashSet<>());
reconTaskController.start();
}
@Test
public void testRegisterTask() {
String taskName = "Dummy_" + System.currentTimeMillis();
DummyReconDBTask dummyReconDBTask =
new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.ALWAYS_PASS);
reconTaskController.registerTask(dummyReconDBTask);
assertTrue(reconTaskController.getRegisteredTasks().size() == 1);
assertTrue(reconTaskController.getRegisteredTasks()
.get(dummyReconDBTask.getTaskName()) == dummyReconDBTask);
}
@Test
public void testConsumeOMEvents() throws Exception {
ReconOmTask reconOmTaskMock = getMockTask("MockTask");
when(reconOmTaskMock.process(any(OMUpdateEventBatch.class)))
.thenReturn(new ImmutablePair<>("MockTask", true));
reconTaskController.registerTask(reconOmTaskMock);
OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
long startTime = System.currentTimeMillis();
reconTaskController.consumeOMEvents(
omUpdateEventBatchMock,
mock(OMMetadataManager.class));
verify(reconOmTaskMock, times(1))
.process(any());
long endTime = System.currentTimeMillis();
reconTaskStatusDao = getDao(ReconTaskStatusDao.class);
ReconTaskStatus reconTaskStatus = reconTaskStatusDao.findById("MockTask");
long taskTimeStamp = reconTaskStatus.getLastUpdatedTimestamp();
long seqNumber = reconTaskStatus.getLastUpdatedSeqNumber();
Assert.assertTrue(startTime <= taskTimeStamp
&& taskTimeStamp <= endTime);
Assert.assertEquals(seqNumber,
omUpdateEventBatchMock.getLastSequenceNumber());
}
@Test
public void testFailedTaskRetryLogic() throws Exception {
String taskName = "Dummy_" + System.currentTimeMillis();
DummyReconDBTask dummyReconDBTask =
new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.FAIL_ONCE);
reconTaskController.registerTask(dummyReconDBTask);
long currentTime = System.currentTimeMillis();
OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
mock(OMMetadataManager.class));
assertFalse(reconTaskController.getRegisteredTasks().isEmpty());
assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
.get(dummyReconDBTask.getTaskName()));
reconTaskStatusDao = getDao(ReconTaskStatusDao.class);
ReconTaskStatus dbRecord = reconTaskStatusDao.findById(taskName);
Assert.assertEquals(taskName, dbRecord.getTaskName());
Assert.assertTrue(
dbRecord.getLastUpdatedTimestamp() > currentTime);
Assert.assertEquals(Long.valueOf(100L), dbRecord.getLastUpdatedSeqNumber());
}
@Test
public void testBadBehavedTaskIsIgnored() throws Exception {
String taskName = "Dummy_" + System.currentTimeMillis();
DummyReconDBTask dummyReconDBTask =
new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.ALWAYS_FAIL);
reconTaskController.registerTask(dummyReconDBTask);
OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class);
when(omUpdateEventBatchMock.isEmpty()).thenReturn(false);
when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L);
OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class);
for (int i = 0; i < 2; i++) {
reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
omMetadataManagerMock);
assertFalse(reconTaskController.getRegisteredTasks().isEmpty());
assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks()
.get(dummyReconDBTask.getTaskName()));
}
//Should be ignored now.
reconTaskController.consumeOMEvents(omUpdateEventBatchMock,
omMetadataManagerMock);
assertTrue(reconTaskController.getRegisteredTasks().isEmpty());
reconTaskStatusDao = getDao(ReconTaskStatusDao.class);
ReconTaskStatus dbRecord = reconTaskStatusDao.findById(taskName);
Assert.assertEquals(taskName, dbRecord.getTaskName());
Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedTimestamp());
Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedSeqNumber());
}
@Test
public void testReInitializeTasks() throws Exception {
ReconOMMetadataManager omMetadataManagerMock = mock(
ReconOMMetadataManager.class);
ReconOmTask reconOmTaskMock =
getMockTask("MockTask2");
when(reconOmTaskMock.reprocess(omMetadataManagerMock))
.thenReturn(new ImmutablePair<>("MockTask2", true));
when(omMetadataManagerMock.getLastSequenceNumberFromDB()
).thenReturn(100L);
long startTime = System.currentTimeMillis();
reconTaskController.registerTask(reconOmTaskMock);
reconTaskController.reInitializeTasks(omMetadataManagerMock);
long endTime = System.currentTimeMillis();
verify(reconOmTaskMock, times(1))
.reprocess(omMetadataManagerMock);
verify(omMetadataManagerMock, times(1)
).getLastSequenceNumberFromDB();
ReconTaskStatus reconTaskStatus = reconTaskStatusDao.findById("MockTask2");
long taskTimeStamp = reconTaskStatus.getLastUpdatedTimestamp();
long seqNumber = reconTaskStatus.getLastUpdatedSeqNumber();
Assert.assertTrue(startTime <= taskTimeStamp
&& taskTimeStamp <= endTime);
Assert.assertEquals(seqNumber,
omMetadataManagerMock.getLastSequenceNumberFromDB());
}
/**
* Helper method for getting a mocked Task.
* @param taskName name of the task.
* @return instance of reconOmTask.
*/
private ReconOmTask getMockTask(String taskName) {
ReconOmTask reconOmTaskMock = mock(ReconOmTask.class);
when(reconOmTaskMock.getTaskName()).thenReturn(taskName);
return reconOmTaskMock;
}
}