/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.recon.tasks;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.hdds.utils.db.TypedTable;
import org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest;
import org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMUpdateEventBuilder;
import org.hadoop.ozone.recon.schema.UtilizationSchemaDefinition;
import org.hadoop.ozone.recon.schema.tables.daos.FileCountBySizeDao;
import org.hadoop.ozone.recon.schema.tables.pojos.FileCountBySize;
import org.jooq.DSLContext;
import org.jooq.Record3;
import org.junit.Before;
import org.junit.Test;
import org.mockito.AdditionalAnswers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.DELETE;
import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.PUT;
import static org.apache.hadoop.ozone.recon.tasks.OMDBUpdateEvent.OMDBUpdateAction.UPDATE;
import static org.hadoop.ozone.recon.schema.tables.FileCountBySizeTable.FILE_COUNT_BY_SIZE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Unit tests for {@link FileSizeCountTask}.
*/
public class TestFileSizeCountTask extends AbstractReconSqlDBTest {
private FileCountBySizeDao fileCountBySizeDao;
private FileSizeCountTask fileSizeCountTask;
private DSLContext dslContext;
@Before
public void setUp() {
fileCountBySizeDao = getDao(FileCountBySizeDao.class);
UtilizationSchemaDefinition utilizationSchemaDefinition =
getSchemaDefinition(UtilizationSchemaDefinition.class);
fileSizeCountTask =
new FileSizeCountTask(fileCountBySizeDao, utilizationSchemaDefinition);
dslContext = utilizationSchemaDefinition.getDSLContext();
// Truncate table before running each test
dslContext.truncate(FILE_COUNT_BY_SIZE).execute();
}
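// FileSizeCountTask is exercised through both of its entry points:
// reprocess() rebuilds the per-(volume, bucket, size) counts by iterating a
// (mocked) OM key table snapshot, while process() applies incremental
// PUT/UPDATE/DELETE events from an OMUpdateEventBatch on top of them.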
@Test
public void testReprocess() throws IOException {
OmKeyInfo omKeyInfo1 = mock(OmKeyInfo.class);
given(omKeyInfo1.getKeyName()).willReturn("key1");
given(omKeyInfo1.getVolumeName()).willReturn("vol1");
given(omKeyInfo1.getBucketName()).willReturn("bucket1");
given(omKeyInfo1.getDataSize()).willReturn(1000L);
OmKeyInfo omKeyInfo2 = mock(OmKeyInfo.class);
given(omKeyInfo2.getKeyName()).willReturn("key2");
given(omKeyInfo2.getVolumeName()).willReturn("vol1");
given(omKeyInfo2.getBucketName()).willReturn("bucket1");
given(omKeyInfo2.getDataSize()).willReturn(100000L);
OmKeyInfo omKeyInfo3 = mock(OmKeyInfo.class);
given(omKeyInfo3.getKeyName()).willReturn("key3");
given(omKeyInfo3.getVolumeName()).willReturn("vol1");
given(omKeyInfo3.getBucketName()).willReturn("bucket1");
given(omKeyInfo3.getDataSize()).willReturn(1125899906842624L * 4); // 4PB
OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
TypedTable<String, OmKeyInfo> keyTable = mock(TypedTable.class);
TypedTable.TypedTableIterator mockKeyIter = mock(TypedTable
.TypedTableIterator.class);
TypedTable.TypedKeyValue mockKeyValue = mock(
TypedTable.TypedKeyValue.class);
when(keyTable.iterator()).thenReturn(mockKeyIter);
when(omMetadataManager.getKeyTable()).thenReturn(keyTable);
when(mockKeyIter.hasNext())
.thenReturn(true)
.thenReturn(true)
.thenReturn(true)
.thenReturn(false);
when(mockKeyIter.next()).thenReturn(mockKeyValue);
when(mockKeyValue.getValue())
.thenReturn(omKeyInfo1)
.thenReturn(omKeyInfo2)
.thenReturn(omKeyInfo3);
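// The single mocked TypedKeyValue is shared across iterations; consecutive
// getValue() calls hand back key1, key2 and key3 in order, matching the
// three hasNext() == true answers above.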
// Reprocess could be called on a table that already has entries. Add an
// entry here to simulate that.
fileCountBySizeDao.insert(
new FileCountBySize("vol1", "bucket1", 1024L, 10L));
Pair<String, Boolean> result =
fileSizeCountTask.reprocess(omMetadataManager);
assertTrue(result.getRight());
assertEquals(3, fileCountBySizeDao.count());
Record3<String, String, Long> recordToFind = dslContext
.newRecord(FILE_COUNT_BY_SIZE.VOLUME,
FILE_COUNT_BY_SIZE.BUCKET,
FILE_COUNT_BY_SIZE.FILE_SIZE)
.value1("vol1")
.value2("bucket1")
.value3(1024L);
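// The expected bin is the next power of two at or above the key size
// (a presumed behavior of FileSizeCountTask, consistent with the asserts
// below): 1000L -> 1024L, 100000L -> 131072L, and anything beyond the
// largest tracked bin, such as the 4PB key, -> Long.MAX_VALUE.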
assertEquals(1L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
// file size upper bound for 100000L is 131072L (next highest power of 2)
recordToFind.value3(131072L);
assertEquals(1L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
// file size upper bound for 4PB is Long.MAX_VALUE
recordToFind.value3(Long.MAX_VALUE);
assertEquals(1L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
}
@Test
public void testProcess() {
// Write 2 keys.
OmKeyInfo toBeDeletedKey = mock(OmKeyInfo.class);
given(toBeDeletedKey.getVolumeName()).willReturn("vol1");
given(toBeDeletedKey.getBucketName()).willReturn("bucket1");
given(toBeDeletedKey.getKeyName()).willReturn("deletedKey");
given(toBeDeletedKey.getDataSize()).willReturn(2000L); // Bin 1
OMDBUpdateEvent event = new OMUpdateEventBuilder()
.setAction(PUT)
.setKey("deletedKey")
.setValue(toBeDeletedKey)
.setTable(OmMetadataManagerImpl.KEY_TABLE)
.build();
OmKeyInfo toBeUpdatedKey = mock(OmKeyInfo.class);
given(toBeUpdatedKey.getVolumeName()).willReturn("vol1");
given(toBeUpdatedKey.getBucketName()).willReturn("bucket1");
given(toBeUpdatedKey.getKeyName()).willReturn("updatedKey");
given(toBeUpdatedKey.getDataSize()).willReturn(10000L); // Bin 4
OMDBUpdateEvent event2 = new OMUpdateEventBuilder()
.setAction(PUT)
.setKey("updatedKey")
.setValue(toBeUpdatedKey)
.setTable(OmMetadataManagerImpl.KEY_TABLE)
.build();
OMUpdateEventBatch omUpdateEventBatch =
new OMUpdateEventBatch(Arrays.asList(event, event2));
fileSizeCountTask.process(omUpdateEventBatch);
// Verify 2 keys are in correct bins.
assertEquals(2, fileCountBySizeDao.count());
Record3<String, String, Long> recordToFind = dslContext
.newRecord(FILE_COUNT_BY_SIZE.VOLUME,
FILE_COUNT_BY_SIZE.BUCKET,
FILE_COUNT_BY_SIZE.FILE_SIZE)
.value1("vol1")
.value2("bucket1")
.value3(2048L);
assertEquals(1L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
// file size upper bound for 10000L is 16384L (next highest power of 2)
recordToFind.value3(16384L);
assertEquals(1L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
// Add new key.
OmKeyInfo newKey = mock(OmKeyInfo.class);
given(newKey.getVolumeName()).willReturn("vol1");
given(newKey.getBucketName()).willReturn("bucket1");
given(newKey.getKeyName()).willReturn("newKey");
given(newKey.getDataSize()).willReturn(1000L); // Bin 0
OMDBUpdateEvent putEvent = new OMUpdateEventBuilder()
.setAction(PUT)
.setKey("newKey")
.setValue(newKey)
.setTable(OmMetadataManagerImpl.KEY_TABLE)
.build();
// Update existing key.
OmKeyInfo updatedKey = mock(OmKeyInfo.class);
given(updatedKey.getVolumeName()).willReturn("vol1");
given(updatedKey.getBucketName()).willReturn("bucket1");
given(updatedKey.getKeyName()).willReturn("updatedKey");
given(updatedKey.getDataSize()).willReturn(50000L); // Bin 6
OMDBUpdateEvent updateEvent = new OMUpdateEventBuilder()
.setAction(UPDATE)
.setKey("updatedKey")
.setValue(updatedKey)
.setOldValue(toBeUpdatedKey)
.setTable(OmMetadataManagerImpl.KEY_TABLE)
.build();
// Delete another existing key.
OMDBUpdateEvent deleteEvent = new OMUpdateEventBuilder()
.setAction(DELETE)
.setKey("deletedKey")
.setValue(toBeDeletedKey)
.setTable(OmMetadataManagerImpl.KEY_TABLE)
.build();
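// Applying this batch should add newKey to the 1024L bin, move updatedKey
// from the 16384L bin to the 65536L bin (50000L rounds up to 65536L), and
// drop deletedKey from the 2048L bin, which is what the asserts below check.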
omUpdateEventBatch = new OMUpdateEventBatch(
Arrays.asList(updateEvent, putEvent, deleteEvent));
fileSizeCountTask.process(omUpdateEventBatch);
assertEquals(4, fileCountBySizeDao.count());
recordToFind.value3(1024L);
assertEquals(1, fileCountBySizeDao.findById(recordToFind)
.getCount().longValue());
recordToFind.value3(2048L);
assertEquals(0, fileCountBySizeDao.findById(recordToFind)
.getCount().longValue());
recordToFind.value3(16384L);
assertEquals(0, fileCountBySizeDao.findById(recordToFind)
.getCount().longValue());
recordToFind.value3(65536L);
assertEquals(1, fileCountBySizeDao.findById(recordToFind)
.getCount().longValue());
}
@Test
public void testReprocessAtScale() throws IOException {
// Generate mocks for 2 volumes, 500 buckets per volume,
// and 42 keys per bucket.
List<OmKeyInfo> omKeyInfoList = new ArrayList<>();
List<Boolean> hasNextAnswer = new ArrayList<>();
for (int volIndex = 1; volIndex <= 2; volIndex++) {
for (int bktIndex = 1; bktIndex <= 500; bktIndex++) {
for (int keyIndex = 1; keyIndex <= 42; keyIndex++) {
OmKeyInfo omKeyInfo = mock(OmKeyInfo.class);
given(omKeyInfo.getKeyName()).willReturn("key" + keyIndex);
given(omKeyInfo.getVolumeName()).willReturn("vol" + volIndex);
given(omKeyInfo.getBucketName()).willReturn("bucket" + bktIndex);
// Place one key in each of the 42 bins.
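// A note on the expected mapping (assuming next-power-of-two bins with
// Long.MAX_VALUE as the top catch-all): keyIndex=1 -> 1023L -> bin 1024L,
// keyIndex=2 -> 2047L -> bin 2048L, and so on, with the largest key
// (2^51 - 1) presumably landing in the Long.MAX_VALUE bin, so the 42 keys
// of each bucket land in 42 distinct bins.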
long fileSize = (long)Math.pow(2, keyIndex + 9) - 1L;
given(omKeyInfo.getDataSize()).willReturn(fileSize);
omKeyInfoList.add(omKeyInfo);
hasNextAnswer.add(true);
}
}
}
hasNextAnswer.add(false);
OMMetadataManager omMetadataManager = mock(OmMetadataManagerImpl.class);
TypedTable<String, OmKeyInfo> keyTable = mock(TypedTable.class);
TypedTable.TypedTableIterator mockKeyIter = mock(TypedTable
.TypedTableIterator.class);
TypedTable.TypedKeyValue mockKeyValue = mock(
TypedTable.TypedKeyValue.class);
when(keyTable.iterator()).thenReturn(mockKeyIter);
when(omMetadataManager.getKeyTable()).thenReturn(keyTable);
when(mockKeyIter.hasNext())
.thenAnswer(AdditionalAnswers.returnsElementsOf(hasNextAnswer));
when(mockKeyIter.next()).thenReturn(mockKeyValue);
when(mockKeyValue.getValue())
.thenAnswer(AdditionalAnswers.returnsElementsOf(omKeyInfoList));
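// AdditionalAnswers.returnsElementsOf replays the canned lists one element
// per call, so the mocked iterator walks all 2 * 500 * 42 = 42000 keys
// exactly once before hasNext() returns false.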
Pair<String, Boolean> result =
fileSizeCountTask.reprocess(omMetadataManager);
assertTrue(result.getRight());
// 2 volumes * 500 buckets * 42 bins = 42000 rows
assertEquals(42000, fileCountBySizeDao.count());
Record3<String, String, Long> recordToFind = dslContext
.newRecord(FILE_COUNT_BY_SIZE.VOLUME,
FILE_COUNT_BY_SIZE.BUCKET,
FILE_COUNT_BY_SIZE.FILE_SIZE)
.value1("vol1")
.value2("bucket1")
.value3(1024L);
assertEquals(1L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
// file size upper bound for 131071L (key8) is 131072L (next highest power of 2)
recordToFind.value1("vol1");
recordToFind.value3(131072L);
assertEquals(1L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
recordToFind.value2("bucket500");
recordToFind.value3(Long.MAX_VALUE);
assertEquals(1L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
}
@Test
public void testProcessAtScale() {
// Write 10000 keys.
List<OMDBUpdateEvent> omDbEventList = new ArrayList<>();
List<OmKeyInfo> omKeyInfoList = new ArrayList<>();
for (int volIndex = 1; volIndex <= 10; volIndex++) {
for (int bktIndex = 1; bktIndex <= 100; bktIndex++) {
for (int keyIndex = 1; keyIndex <= 10; keyIndex++) {
OmKeyInfo omKeyInfo = mock(OmKeyInfo.class);
given(omKeyInfo.getKeyName()).willReturn("key" + keyIndex);
given(omKeyInfo.getVolumeName()).willReturn("vol" + volIndex);
given(omKeyInfo.getBucketName()).willReturn("bucket" + bktIndex);
// Place one key in each of the 10 bins.
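// With keyIndex 1..10 the sizes run from 1023L to 524287L, so each bucket
// should contribute one key per bin from 1024L up to 524288L (10 distinct
// bins), giving the 10 * 100 * 10 = 10000 rows asserted below.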
long fileSize = (long)Math.pow(2, keyIndex + 9) - 1L;
given(omKeyInfo.getDataSize()).willReturn(fileSize);
omKeyInfoList.add(omKeyInfo);
omDbEventList.add(new OMUpdateEventBuilder()
.setAction(PUT)
.setKey("key" + keyIndex)
.setValue(omKeyInfo)
.setTable(OmMetadataManagerImpl.KEY_TABLE)
.build());
}
}
}
OMUpdateEventBatch omUpdateEventBatch =
new OMUpdateEventBatch(omDbEventList);
fileSizeCountTask.process(omUpdateEventBatch);
// Verify that all 10000 keys are in the correct bins.
assertEquals(10000, fileCountBySizeDao.count());
Record3<String, String, Long> recordToFind = dslContext
.newRecord(FILE_COUNT_BY_SIZE.VOLUME,
FILE_COUNT_BY_SIZE.BUCKET,
FILE_COUNT_BY_SIZE.FILE_SIZE)
.value1("vol1")
.value2("bucket1")
.value3(2048L);
assertEquals(1L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
recordToFind.value1("vol10");
recordToFind.value2("bucket100");
// file size upper bound for 16383L (key5) is 16384L (next highest power of 2)
recordToFind.value3(16384L);
assertEquals(1L,
fileCountBySizeDao.findById(recordToFind).getCount().longValue());
// Process 500 deletes and 500 updates
omDbEventList = new ArrayList<>();
for (int volIndex = 1; volIndex <= 1; volIndex++) {
for (int bktIndex = 1; bktIndex <= 100; bktIndex++) {
for (int keyIndex = 1; keyIndex <= 10; keyIndex++) {
OmKeyInfo omKeyInfo = mock(OmKeyInfo.class);
given(omKeyInfo.getKeyName()).willReturn("key" + keyIndex);
given(omKeyInfo.getVolumeName()).willReturn("vol" + volIndex);
given(omKeyInfo.getBucketName()).willReturn("bucket" + bktIndex);
if (keyIndex <= 5) {
long fileSize = (long)Math.pow(2, keyIndex + 9) - 1L;
given(omKeyInfo.getDataSize()).willReturn(fileSize);
omDbEventList.add(new OMUpdateEventBuilder()
.setAction(DELETE)
.setKey("key" + keyIndex)
.setValue(omKeyInfo)
.setTable(OmMetadataManagerImpl.KEY_TABLE)
.build());
} else {
// Update all keys with keyIndex > 5 to file size 1023L
// so that they land in the first bin (1024L).
given(omKeyInfo.getDataSize()).willReturn(1023L);
omDbEventList.add(new OMUpdateEventBuilder()
.setAction(UPDATE)
.setKey("key" + keyIndex)
.setValue(omKeyInfo)
.setTable(OmMetadataManagerImpl.KEY_TABLE)
.setOldValue(
omKeyInfoList.get((volIndex * bktIndex) + keyIndex))
.build());
}
}
}
}
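// After this batch the 1024L bin of vol1's buckets should hold the 5
// updated keys (checked below for bucket1 and bucket100), the 2048L bin
// should drop back to 0, and vol2 through vol10 should be untouched. The
// row count stays at 10000, apparently because emptied bins keep their rows.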
omUpdateEventBatch = new OMUpdateEventBatch(omDbEventList);
fileSizeCountTask.process(omUpdateEventBatch);
assertEquals(10000, fileCountBySizeDao.count());
recordToFind = dslContext
.newRecord(FILE_COUNT_BY_SIZE.VOLUME,
FILE_COUNT_BY_SIZE.BUCKET,
FILE_COUNT_BY_SIZE.FILE_SIZE)
.value1("vol1")
.value2("bucket1")
.value3(1024L);
// The update events on keys 6-10 should now place them in the first bin (1024L)
assertEquals(5, fileCountBySizeDao.findById(recordToFind)
.getCount().longValue());
recordToFind.value2("bucket100");
assertEquals(5, fileCountBySizeDao.findById(recordToFind)
.getCount().longValue());
recordToFind.value3(2048L);
assertEquals(0, fileCountBySizeDao.findById(recordToFind)
.getCount().longValue());
// Volumes 2 - 10 should not be affected by this process
recordToFind.value1("vol2");
assertEquals(1, fileCountBySizeDao.findById(recordToFind)
.getCount().longValue());
}
}