// blob: 63b414071d77a1c3b5c57888b781d5b739b24328 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.recon.spi.impl;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
import static org.apache.hadoop.ozone.recon.ReconUtils.createTarFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Paths;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.recon.AbstractOMMetadataManagerTest;
import org.apache.hadoop.ozone.recon.ReconUtils;
import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager;
import org.apache.hadoop.ozone.recon.tasks.OMDBUpdatesHandler;
import org.apache.hadoop.ozone.recon.tasks.OMUpdateEventBatch;
import org.apache.hadoop.ozone.recon.tasks.ReconTaskController;
import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
import org.apache.hadoop.hdds.utils.db.DBUpdatesWrapper;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.hadoop.ozone.recon.schema.tables.pojos.ReconTaskStatus;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.rocksdb.RocksDB;
import org.rocksdb.TransactionLogIterator;
import org.rocksdb.WriteBatch;
/**
 * Class to test Ozone Manager Service Provider Implementation.
 *
 * <p>Each test builds an {@link OzoneManagerServiceProviderImpl} from a fresh
 * {@link OzoneConfiguration} plus mocked collaborators and exercises one of
 * its entry points: pulling a full OM DB snapshot, fetching and applying
 * delta updates, and the {@code syncDataFromOM} dispatch between the two.
 */
public class TestOzoneManagerServiceProviderImpl extends
    AbstractOMMetadataManagerTest {

  private OzoneConfiguration configuration;
  private OzoneManagerProtocol ozoneManagerProtocol;

  /**
   * Points the Recon OM snapshot DB dir and the Recon DB dir at fresh
   * temporary folders, sets the OM address, and prepares a mock OM client
   * that answers {@code getDBUpdates} with an empty
   * {@link DBUpdatesWrapper}.
   *
   * @throws Exception if a temporary folder or the mock cannot be created.
   */
  @Before
  public void setUp() throws Exception {
    configuration = new OzoneConfiguration();
    configuration.set(OZONE_RECON_OM_SNAPSHOT_DB_DIR,
        temporaryFolder.newFolder().getAbsolutePath());
    configuration.set(OZONE_RECON_DB_DIR,
        temporaryFolder.newFolder().getAbsolutePath());
    configuration.set("ozone.om.address", "localhost:9862");
    ozoneManagerProtocol = getMockOzoneManagerClient(new DBUpdatesWrapper());
  }

  /**
   * Verifies that {@code updateReconOmDBWithNewSnapshot} makes keys written
   * to the source OM DB visible through Recon's OM metadata manager.
   *
   * <p>The "HTTP call" to OM is mocked to return a tar of a checkpoint taken
   * from the source OM DB.
   *
   * @throws Exception on any unexpected failure.
   */
  @Test
  public void testUpdateReconOmDBWithNewSnapshot() throws Exception {
    OMMetadataManager omMetadataManager = initializeNewOmMetadataManager();
    ReconOMMetadataManager reconOMMetadataManager =
        getTestMetadataManager(omMetadataManager);

    writeDataToOm(omMetadataManager, "key_one");
    writeDataToOm(omMetadataManager, "key_two");

    DBCheckpoint checkpoint = omMetadataManager.getStore()
        .getCheckpoint(true);
    File tarFile = createTarFile(checkpoint.getCheckpointLocation());

    // The stream is owned by the test, so make sure it is released even if
    // an assertion below fails.
    try (InputStream inputStream = new FileInputStream(tarFile)) {
      ReconUtils reconUtilsMock = getMockReconUtils();
      when(reconUtilsMock.makeHttpCall(any(), anyString()))
          .thenReturn(inputStream);

      ReconTaskController reconTaskController = getMockTaskController();
      OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
          new OzoneManagerServiceProviderImpl(configuration,
              reconOMMetadataManager, reconTaskController, reconUtilsMock,
              ozoneManagerProtocol);

      // Before the snapshot is applied, Recon's key table has neither key.
      Assert.assertNull(reconOMMetadataManager.getKeyTable()
          .get("/sampleVol/bucketOne/key_one"));
      Assert.assertNull(reconOMMetadataManager.getKeyTable()
          .get("/sampleVol/bucketOne/key_two"));

      assertTrue(ozoneManagerServiceProvider.updateReconOmDBWithNewSnapshot());

      // After the snapshot is applied, both keys must be present.
      assertNotNull(reconOMMetadataManager.getKeyTable()
          .get("/sampleVol/bucketOne/key_one"));
      assertNotNull(reconOMMetadataManager.getKeyTable()
          .get("/sampleVol/bucketOne/key_two"));
    }
  }

  /**
   * Verifies that {@code getOzoneManagerDBSnapshot} turns the tar stream
   * returned by the (mocked) HTTP call into a checkpoint directory that
   * contains the two files that were put into the tar.
   *
   * @throws Exception on any unexpected failure.
   */
  @Test
  public void testGetOzoneManagerDBSnapshot() throws Exception {
    File reconOmSnapshotDbDir = temporaryFolder.newFolder();

    File checkpointDir = Paths.get(reconOmSnapshotDbDir.getAbsolutePath(),
        "testGetOzoneManagerDBSnapshot").toFile();
    // Fail fast here instead of with a confusing error further down.
    assertTrue(checkpointDir.mkdir());

    writeTextFile(checkpointDir, "file1", "File1 Contents");
    writeTextFile(checkpointDir, "file2", "File2 Contents");

    //Create test tar file.
    File tarFile = createTarFile(checkpointDir.toPath());

    try (InputStream fileInputStream = new FileInputStream(tarFile)) {
      ReconUtils reconUtilsMock = getMockReconUtils();
      when(reconUtilsMock.makeHttpCall(any(), anyString()))
          .thenReturn(fileInputStream);

      ReconOMMetadataManager reconOMMetadataManager =
          mock(ReconOMMetadataManager.class);
      ReconTaskController reconTaskController = getMockTaskController();
      OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
          new OzoneManagerServiceProviderImpl(configuration,
              reconOMMetadataManager, reconTaskController, reconUtilsMock,
              ozoneManagerProtocol);

      DBCheckpoint checkpoint = ozoneManagerServiceProvider
          .getOzoneManagerDBSnapshot();
      assertNotNull(checkpoint);

      File checkpointLocation = checkpoint.getCheckpointLocation().toFile();
      assertTrue(checkpointLocation.isDirectory());

      File[] checkpointFiles = checkpointLocation.listFiles();
      assertNotNull(checkpointFiles);
      assertEquals(2, checkpointFiles.length);
    }
  }

  /**
   * Verifies both halves of {@code getAndApplyDeltaUpdatesFromOM}: the
   * updates handler sees the events contained in the write batches returned
   * by the (mocked) OM client, and the provider's own OM DB ends up with the
   * written keys.
   *
   * @throws Exception on any unexpected failure.
   */
  @Test
  public void testGetAndApplyDeltaUpdatesFromOM() throws Exception {
    // Writing 2 Keys into a source OM DB and collecting it in a
    // DBUpdatesWrapper.
    OMMetadataManager sourceOMMetadataMgr = initializeNewOmMetadataManager();
    writeDataToOm(sourceOMMetadataMgr, "key_one");
    writeDataToOm(sourceOMMetadataMgr, "key_two");

    RocksDB rocksDB = ((RDBStore) sourceOMMetadataMgr.getStore()).getDb();
    DBUpdatesWrapper dbUpdatesWrapper = new DBUpdatesWrapper();
    // The log iterator wraps a native handle; close it once the batches
    // have been copied out as byte arrays.
    try (TransactionLogIterator transactionLogIterator =
        rocksDB.getUpdatesSince(0L)) {
      while (transactionLogIterator.isValid()) {
        TransactionLogIterator.BatchResult result =
            transactionLogIterator.getBatch();
        WriteBatch writeBatch = result.writeBatch();
        writeBatch.markWalTerminationPoint();
        dbUpdatesWrapper.addWriteBatch(writeBatch.data(),
            result.sequenceNumber());
        transactionLogIterator.next();
      }
    }

    // OM Service Provider's Metadata Manager.
    OMMetadataManager omMetadataManager = initializeNewOmMetadataManager();

    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
        new OzoneManagerServiceProviderImpl(configuration,
            getTestMetadataManager(omMetadataManager),
            getMockTaskController(), new ReconUtils(),
            getMockOzoneManagerClient(dbUpdatesWrapper));

    OMDBUpdatesHandler updatesHandler =
        new OMDBUpdatesHandler(omMetadataManager);
    ozoneManagerServiceProvider.getAndApplyDeltaUpdatesFromOM(
        0L, updatesHandler);

    // In this method, we have to assert the "GET" path and the "APPLY" path.

    // Assert GET path --> verify if the OMDBUpdatesHandler picked up the 4
    // events ( 1 Vol PUT + 1 Bucket PUT + 2 Key PUTs).
    assertEquals(4, updatesHandler.getEvents().size());

    // Assert APPLY path --> Verify if the OM service provider's RocksDB got
    // the changes.
    String fullKey = omMetadataManager.getOzoneKey("sampleVol",
        "bucketOne", "key_one");
    assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
        .getKeyTable().isExist(fullKey));
    fullKey = omMetadataManager.getOzoneKey("sampleVol",
        "bucketOne", "key_two");
    assertTrue(ozoneManagerServiceProvider.getOMMetadataManagerInstance()
        .getKeyTable().isExist(fullKey));
  }

  /**
   * Verifies that {@code syncDataFromOM} on an empty OM DB records exactly
   * one {@code OM_DB_FULL_SNAPSHOT} task status and re-initializes the Recon
   * tasks once.
   *
   * <p>A {@link MockOzoneServiceProvider} is used so that the snapshot
   * update itself is stubbed out and always reports success.
   *
   * @throws Exception on any unexpected failure.
   */
  @Test
  public void testSyncDataFromOMFullSnapshot() throws Exception {
    // Empty OM DB to start with.
    ReconOMMetadataManager omMetadataManager = getTestMetadataManager(
        initializeEmptyOmMetadataManager());
    ReconTaskStatusDao reconTaskStatusDaoMock =
        mock(ReconTaskStatusDao.class);
    doNothing().when(reconTaskStatusDaoMock)
        .update(any(ReconTaskStatus.class));

    ReconTaskController reconTaskControllerMock = getMockTaskController();
    when(reconTaskControllerMock.getReconTaskStatusDao())
        .thenReturn(reconTaskStatusDaoMock);
    doNothing().when(reconTaskControllerMock)
        .reInitializeTasks(omMetadataManager);

    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
        new MockOzoneServiceProvider(configuration, omMetadataManager,
            reconTaskControllerMock, new ReconUtils(), ozoneManagerProtocol);

    // Should trigger full snapshot request.
    ozoneManagerServiceProvider.syncDataFromOM();

    ArgumentCaptor<ReconTaskStatus> captor =
        ArgumentCaptor.forClass(ReconTaskStatus.class);
    verify(reconTaskStatusDaoMock, times(1))
        .update(captor.capture());
    // assertEquals reports the actual task name on failure and copes with
    // a null name.
    assertEquals("OM_DB_FULL_SNAPSHOT", captor.getValue().getTaskName());
    verify(reconTaskControllerMock, times(1))
        .reInitializeTasks(omMetadataManager);
  }

  /**
   * Verifies that {@code syncDataFromOM} on a non-empty OM DB records
   * exactly one {@code OM_DB_DELTA_UPDATES} task status and hands one event
   * batch to the task controller.
   *
   * @throws Exception on any unexpected failure.
   */
  @Test
  public void testSyncDataFromOMDeltaUpdates() throws Exception {
    // Non-Empty OM DB to start with.
    ReconOMMetadataManager omMetadataManager = getTestMetadataManager(
        initializeNewOmMetadataManager());
    ReconTaskStatusDao reconTaskStatusDaoMock =
        mock(ReconTaskStatusDao.class);
    doNothing().when(reconTaskStatusDaoMock)
        .update(any(ReconTaskStatus.class));

    ReconTaskController reconTaskControllerMock = getMockTaskController();
    when(reconTaskControllerMock.getReconTaskStatusDao())
        .thenReturn(reconTaskStatusDaoMock);
    doNothing().when(reconTaskControllerMock)
        .consumeOMEvents(any(OMUpdateEventBatch.class),
            any(OMMetadataManager.class));

    OzoneManagerServiceProviderImpl ozoneManagerServiceProvider =
        new OzoneManagerServiceProviderImpl(configuration, omMetadataManager,
            reconTaskControllerMock, new ReconUtils(), ozoneManagerProtocol);

    // Should trigger delta updates.
    ozoneManagerServiceProvider.syncDataFromOM();

    ArgumentCaptor<ReconTaskStatus> captor =
        ArgumentCaptor.forClass(ReconTaskStatus.class);
    verify(reconTaskStatusDaoMock, times(1))
        .update(captor.capture());
    assertEquals("OM_DB_DELTA_UPDATES", captor.getValue().getTaskName());
    verify(reconTaskControllerMock, times(1))
        .consumeOMEvents(any(OMUpdateEventBatch.class),
            any(OMMetadataManager.class));
  }

  /**
   * Writes {@code contents} to a file called {@code name} inside
   * {@code dir}, closing the writer even if the write fails.
   *
   * @param dir directory that will hold the file.
   * @param name file name relative to {@code dir}.
   * @param contents text to write.
   * @throws IOException if the file cannot be created or written.
   */
  private static void writeTextFile(File dir, String name, String contents)
      throws IOException {
    File target = Paths.get(dir.getAbsolutePath(), name).toFile();
    try (BufferedWriter writer =
        new BufferedWriter(new FileWriter(target.getAbsolutePath()))) {
      writer.write(contents);
    }
  }

  /**
   * @return a plain Mockito mock of {@link ReconTaskController}.
   */
  private ReconTaskController getMockTaskController() {
    return mock(ReconTaskController.class);
  }

  /**
   * Builds a {@link ReconUtils} mock whose {@code getReconDbDir} and
   * {@code untarCheckpointFile} delegate to the real implementations; every
   * other method keeps Mockito's default stubbing until a test overrides it.
   *
   * @return the partially real {@link ReconUtils} mock.
   * @throws IOException declared by the stubbed methods.
   */
  private ReconUtils getMockReconUtils() throws IOException {
    ReconUtils reconUtilsMock = mock(ReconUtils.class);
    when(reconUtilsMock.getReconDbDir(any(), anyString()))
        .thenCallRealMethod();
    doCallRealMethod().when(reconUtilsMock)
        .untarCheckpointFile(any(), any());
    return reconUtilsMock;
  }

  /**
   * Builds an {@link OzoneManagerProtocol} mock that answers every
   * {@code getDBUpdates} request with the supplied wrapper.
   *
   * @param dbUpdatesWrapper updates to return from {@code getDBUpdates}.
   * @return the mocked OM client.
   * @throws IOException declared by {@code getDBUpdates}.
   */
  private OzoneManagerProtocol getMockOzoneManagerClient(
      DBUpdatesWrapper dbUpdatesWrapper) throws IOException {
    OzoneManagerProtocol ozoneManagerProtocolMock =
        mock(OzoneManagerProtocol.class);
    when(ozoneManagerProtocolMock.getDBUpdates(any(OzoneManagerProtocolProtos
        .DBUpdatesRequest.class))).thenReturn(dbUpdatesWrapper);
    return ozoneManagerProtocolMock;
  }
}
/**
 * Test double for {@link OzoneManagerServiceProviderImpl} that replaces
 * {@code updateReconOmDBWithNewSnapshot} with a stub which always reports
 * success, so tests can drive the surrounding sync logic without performing
 * the snapshot update itself.
 */
class MockOzoneServiceProvider extends OzoneManagerServiceProviderImpl {

  /**
   * Forwards every argument, unchanged, to the superclass constructor.
   *
   * @param configuration configuration handed to the real provider.
   * @param omMetadataManager Recon's OM metadata manager.
   * @param reconTaskController task controller handed to the real provider.
   * @param reconUtils utility instance handed to the real provider.
   * @param ozoneManagerClient OM client handed to the real provider.
   * @throws IOException if the superclass constructor throws it.
   */
  MockOzoneServiceProvider(
      OzoneConfiguration configuration,
      ReconOMMetadataManager omMetadataManager,
      ReconTaskController reconTaskController,
      ReconUtils reconUtils,
      OzoneManagerProtocol ozoneManagerClient) throws IOException {
    super(configuration, omMetadataManager, reconTaskController, reconUtils,
        ozoneManagerClient);
  }

  /**
   * Stubbed snapshot update.
   *
   * @return always {@code true}.
   */
  @Override
  public boolean updateReconOmDBWithNewSnapshot() {
    return true;
  }
}