| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.ozone.recon.tasks; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.ArgumentMatchers.any; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| |
| import java.util.HashSet; |
| |
| import org.apache.commons.lang3.tuple.ImmutablePair; |
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; |
| import org.apache.hadoop.ozone.om.OMMetadataManager; |
| import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest; |
| import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; |
| import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao; |
| import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| /** |
| * Class used to test ReconTaskControllerImpl. |
| */ |
| public class TestReconTaskControllerImpl extends AbstractReconSqlDBTest { |
| |
| private ReconTaskController reconTaskController; |
| private ReconTaskStatusDao reconTaskStatusDao; |
| |
| @Before |
| public void setUp() { |
| OzoneConfiguration ozoneConfiguration = new OzoneConfiguration(); |
| reconTaskStatusDao = getDao(ReconTaskStatusDao.class); |
| reconTaskController = new ReconTaskControllerImpl(ozoneConfiguration, |
| reconTaskStatusDao, new HashSet<>()); |
| reconTaskController.start(); |
| } |
| |
| @Test |
| public void testRegisterTask() { |
| String taskName = "Dummy_" + System.currentTimeMillis(); |
| DummyReconDBTask dummyReconDBTask = |
| new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.ALWAYS_PASS); |
| reconTaskController.registerTask(dummyReconDBTask); |
| assertTrue(reconTaskController.getRegisteredTasks().size() == 1); |
| assertTrue(reconTaskController.getRegisteredTasks() |
| .get(dummyReconDBTask.getTaskName()) == dummyReconDBTask); |
| } |
| |
| @Test |
| public void testConsumeOMEvents() throws Exception { |
| ReconOmTask reconOmTaskMock = getMockTask("MockTask"); |
| when(reconOmTaskMock.process(any(OMUpdateEventBatch.class))) |
| .thenReturn(new ImmutablePair<>("MockTask", true)); |
| reconTaskController.registerTask(reconOmTaskMock); |
| OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class); |
| when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L); |
| when(omUpdateEventBatchMock.isEmpty()).thenReturn(false); |
| |
| long startTime = System.currentTimeMillis(); |
| reconTaskController.consumeOMEvents( |
| omUpdateEventBatchMock, |
| mock(OMMetadataManager.class)); |
| |
| verify(reconOmTaskMock, times(1)) |
| .process(any()); |
| long endTime = System.currentTimeMillis(); |
| |
| reconTaskStatusDao = getDao(ReconTaskStatusDao.class); |
| ReconTaskStatus reconTaskStatus = reconTaskStatusDao.findById("MockTask"); |
| long taskTimeStamp = reconTaskStatus.getLastUpdatedTimestamp(); |
| long seqNumber = reconTaskStatus.getLastUpdatedSeqNumber(); |
| |
| Assert.assertTrue(startTime <= taskTimeStamp |
| && taskTimeStamp <= endTime); |
| Assert.assertEquals(seqNumber, |
| omUpdateEventBatchMock.getLastSequenceNumber()); |
| } |
| |
| @Test |
| public void testFailedTaskRetryLogic() throws Exception { |
| String taskName = "Dummy_" + System.currentTimeMillis(); |
| |
| DummyReconDBTask dummyReconDBTask = |
| new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.FAIL_ONCE); |
| reconTaskController.registerTask(dummyReconDBTask); |
| |
| long currentTime = System.currentTimeMillis(); |
| OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class); |
| when(omUpdateEventBatchMock.isEmpty()).thenReturn(false); |
| when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L); |
| |
| reconTaskController.consumeOMEvents(omUpdateEventBatchMock, |
| mock(OMMetadataManager.class)); |
| assertFalse(reconTaskController.getRegisteredTasks().isEmpty()); |
| assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks() |
| .get(dummyReconDBTask.getTaskName())); |
| |
| reconTaskStatusDao = getDao(ReconTaskStatusDao.class); |
| ReconTaskStatus dbRecord = reconTaskStatusDao.findById(taskName); |
| |
| Assert.assertEquals(taskName, dbRecord.getTaskName()); |
| Assert.assertTrue( |
| dbRecord.getLastUpdatedTimestamp() > currentTime); |
| |
| Assert.assertEquals(Long.valueOf(100L), dbRecord.getLastUpdatedSeqNumber()); |
| } |
| |
| @Test |
| public void testBadBehavedTaskIsIgnored() throws Exception { |
| String taskName = "Dummy_" + System.currentTimeMillis(); |
| DummyReconDBTask dummyReconDBTask = |
| new DummyReconDBTask(taskName, DummyReconDBTask.TaskType.ALWAYS_FAIL); |
| reconTaskController.registerTask(dummyReconDBTask); |
| |
| OMUpdateEventBatch omUpdateEventBatchMock = mock(OMUpdateEventBatch.class); |
| when(omUpdateEventBatchMock.isEmpty()).thenReturn(false); |
| when(omUpdateEventBatchMock.getLastSequenceNumber()).thenReturn(100L); |
| |
| OMMetadataManager omMetadataManagerMock = mock(OMMetadataManager.class); |
| for (int i = 0; i < 2; i++) { |
| reconTaskController.consumeOMEvents(omUpdateEventBatchMock, |
| omMetadataManagerMock); |
| |
| assertFalse(reconTaskController.getRegisteredTasks().isEmpty()); |
| assertEquals(dummyReconDBTask, reconTaskController.getRegisteredTasks() |
| .get(dummyReconDBTask.getTaskName())); |
| } |
| |
| //Should be ignored now. |
| reconTaskController.consumeOMEvents(omUpdateEventBatchMock, |
| omMetadataManagerMock); |
| assertTrue(reconTaskController.getRegisteredTasks().isEmpty()); |
| |
| reconTaskStatusDao = getDao(ReconTaskStatusDao.class); |
| ReconTaskStatus dbRecord = reconTaskStatusDao.findById(taskName); |
| |
| Assert.assertEquals(taskName, dbRecord.getTaskName()); |
| Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedTimestamp()); |
| Assert.assertEquals(Long.valueOf(0L), dbRecord.getLastUpdatedSeqNumber()); |
| } |
| |
| |
| @Test |
| public void testReInitializeTasks() throws Exception { |
| |
| ReconOMMetadataManager omMetadataManagerMock = mock( |
| ReconOMMetadataManager.class); |
| ReconOmTask reconOmTaskMock = |
| getMockTask("MockTask2"); |
| when(reconOmTaskMock.reprocess(omMetadataManagerMock)) |
| .thenReturn(new ImmutablePair<>("MockTask2", true)); |
| when(omMetadataManagerMock.getLastSequenceNumberFromDB() |
| ).thenReturn(100L); |
| |
| long startTime = System.currentTimeMillis(); |
| reconTaskController.registerTask(reconOmTaskMock); |
| reconTaskController.reInitializeTasks(omMetadataManagerMock); |
| long endTime = System.currentTimeMillis(); |
| |
| verify(reconOmTaskMock, times(1)) |
| .reprocess(omMetadataManagerMock); |
| |
| verify(omMetadataManagerMock, times(1) |
| ).getLastSequenceNumberFromDB(); |
| |
| ReconTaskStatus reconTaskStatus = reconTaskStatusDao.findById("MockTask2"); |
| long taskTimeStamp = reconTaskStatus.getLastUpdatedTimestamp(); |
| long seqNumber = reconTaskStatus.getLastUpdatedSeqNumber(); |
| |
| Assert.assertTrue(startTime <= taskTimeStamp |
| && taskTimeStamp <= endTime); |
| Assert.assertEquals(seqNumber, |
| omMetadataManagerMock.getLastSequenceNumberFromDB()); |
| } |
| |
| /** |
| * Helper method for getting a mocked Task. |
| * @param taskName name of the task. |
| * @return instance of reconOmTask. |
| */ |
| private ReconOmTask getMockTask(String taskName) { |
| ReconOmTask reconOmTaskMock = mock(ReconOmTask.class); |
| when(reconOmTaskMock.getTaskName()).thenReturn(taskName); |
| return reconOmTaskMock; |
| } |
| } |